# 批次操作
批次操作是提升 Redis 效能的關鍵技術,能夠大幅減少網路往返次數,提高資料處理效率。
# 🚀 為什麼需要批次操作?
單筆操作 vs 批次操作的效能差異:
import time
# ❌ 單筆操作:效能低下
start = time.time()
for i in range(10000):
toolkit.setter(f"key:{i}", f"value:{i}")
print(f"單筆操作耗時: {time.time() - start:.2f} 秒")
# ✅ 批次操作:效能提升 10 倍以上
start = time.time()
batch_data = {f"key:{i}": f"value:{i}" for i in range(10000)}
toolkit.batch_set(batch_data)
print(f"批次操作耗時: {time.time() - start:.2f} 秒")
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
# 📝 批次操作方法
# batch_set - 批次設定
# 準備批次資料
users = {
"user:1001": {"name": "Alice", "score": 95},
"user:1002": {"name": "Bob", "score": 87},
"user:1003": {"name": "Charlie", "score": 92}
}
# 一次設定多個鍵值
toolkit.batch_set(users)
# 支援設定過期時間
toolkit.batch_set(users, ex=3600) # 全部設定 1 小時過期
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
# batch_get - 批次讀取
# 批次讀取多個鍵
keys = ["user:1001", "user:1002", "user:1003"]
results = toolkit.batch_get(keys)
# results 是字典格式
for key, value in results.items():
if value: # 檢查鍵是否存在
print(f"{key}: {value['name']} - 分數: {value['score']}")
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# 🔧 使用 Pipeline
Pipeline 提供更靈活的批次操作方式:
# 創建管線
pipe = toolkit.client.pipeline()
# 排隊多個不同類型的命令
pipe.set("counter", 0)
pipe.incr("counter")
pipe.incr("counter")
pipe.get("counter")
# 執行所有命令
results = pipe.execute()
print(results) # [True, 1, 2, b'2']
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
# Pipeline 事務支援
# 使用事務確保原子性
pipe = toolkit.client.pipeline(transaction=True)
try:
# 監視鍵(樂觀鎖)
pipe.watch("account:balance")
# 獲取當前餘額
balance = int(pipe.get("account:balance") or 0)
# 開始事務
pipe.multi()
# 執行轉帳操作
if balance >= 100:
pipe.decrby("account:balance", 100)
pipe.incrby("account:savings", 100)
pipe.execute()
print("轉帳成功")
else:
pipe.reset() # 取消事務
print("餘額不足")
except redis.WatchError:
print("餘額在交易期間被修改")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 🎯 實用範例
# 批次更新用戶積分
class ScoreManager:
def __init__(self):
self.toolkit = RedisToolkit()
def update_scores(self, score_updates):
"""批次更新用戶積分"""
# 先批次讀取現有資料
user_ids = list(score_updates.keys())
keys = [f"user:{uid}" for uid in user_ids]
existing_users = self.toolkit.batch_get(keys)
# 更新積分
updated_data = {}
for uid, score_delta in score_updates.items():
key = f"user:{uid}"
user_data = existing_users.get(key, {"score": 0})
user_data["score"] = user_data.get("score", 0) + score_delta
updated_data[key] = user_data
# 批次寫回
self.toolkit.batch_set(updated_data)
return updated_data
# 使用範例
manager = ScoreManager()
# 批次更新多個用戶的積分
updates = {
"1001": 10, # 用戶 1001 加 10 分
"1002": -5, # 用戶 1002 減 5 分
"1003": 20 # 用戶 1003 加 20 分
}
result = manager.update_scores(updates)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 批次快取預熱
def cache_warmup(data_loader, cache_keys):
"""批次預熱快取"""
# 從資料來源批次載入資料
data = data_loader.load_batch(cache_keys)
# 轉換為 Redis 鍵值格式
cache_data = {}
for item in data:
key = f"cache:{item['id']}"
cache_data[key] = item
# 批次寫入快取,設定過期時間
toolkit.batch_set(cache_data, ex=3600)
print(f"預熱了 {len(cache_data)} 個快取項目")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 🚀 效能優化技巧
# 1. 分批處理大量資料
def batch_process_large_dataset(data, batch_size=1000):
"""分批處理大型資料集"""
total = len(data)
for i in range(0, total, batch_size):
batch = dict(list(data.items())[i:i + batch_size])
toolkit.batch_set(batch)
print(f"已處理 {min(i + batch_size, total)}/{total}")
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# 2. 並行批次操作
from concurrent.futures import ThreadPoolExecutor
import threading
def parallel_batch_operations(datasets):
"""並行執行多個批次操作"""
def process_batch(batch_id, data):
local_toolkit = RedisToolkit()
local_toolkit.batch_set(data)
print(f"批次 {batch_id} 完成")
with ThreadPoolExecutor(max_workers=4) as executor:
futures = []
for i, dataset in enumerate(datasets):
future = executor.submit(process_batch, i, dataset)
futures.append(future)
# 等待所有批次完成
for future in futures:
future.result()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 📊 效能比較
操作類型 | 10,000 筆資料耗時 | 網路往返次數 |
---|---|---|
單筆操作 | ~5.2 秒 | 10,000 次 |
批次操作 | ~0.3 秒 | 1 次 |
Pipeline | ~0.4 秒 | 1 次 |
分批處理(1000筆/批) | ~0.5 秒 | 10 次 |
# 🎯 最佳實踐
選擇適當的批次大小
# 建議批次大小:100-1000 筆 # 太小:效能提升有限 # 太大:可能超過 Redis 限制或記憶體 optimal_batch_size = 500
1
2
3
4處理部分失敗
try: toolkit.batch_set(large_batch) except Exception as e: # 降級為逐筆處理 for key, value in large_batch.items(): try: toolkit.setter(key, value) except: logger.error(f"Failed to set {key}")
1
2
3
4
5
6
7
8
9監控批次操作
import time start = time.time() toolkit.batch_set(data) elapsed = time.time() - start logger.info(f"批次寫入 {len(data)} 筆,耗時 {elapsed:.3f} 秒")
1
2
3
4
5
6
7
# 📚 延伸閱讀
小結
批次操作是提升 Redis 效能的利器。記住選擇適當的批次大小,處理錯誤情況,並監控操作效能!
← 媒體處理