# 發布訂閱 (Pub/Sub)
Redis Toolkit 簡化了 Redis 的發布訂閱功能,提供自動序列化、背景監聽、錯誤處理等增強功能,讓訊息傳遞變得簡單可靠。
# 🎯 什麼是發布訂閱?
發布訂閱(Pub/Sub)是一種訊息傳遞模式:
- 發布者(Publisher):發送訊息到特定頻道
- 訂閱者(Subscriber):監聽一個或多個頻道的訊息
- 頻道(Channel):訊息傳遞的管道
graph LR
P1[發布者 1] -->|訊息| C1[頻道: news]
P2[發布者 2] -->|訊息| C1
C1 --> S1[訂閱者 1]
C1 --> S2[訂閱者 2]
C1 --> S3[訂閱者 3]
1
2
3
4
5
6
2
3
4
5
6
# 🚀 快速開始
# 基本訂閱者
from redis_toolkit import RedisToolkit
# 定義訊息處理函數
def handle_message(channel, data):
print(f"收到來自 {channel} 的訊息:")
print(f"內容: {data}")
# 創建訂閱者
subscriber = RedisToolkit(
channels=["news", "updates"], # 訂閱多個頻道
message_handler=handle_message
)
# 訂閱者會在背景自動監聽訊息
print("訂閱者正在監聽...")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 基本發布者
# 創建發布者
publisher = RedisToolkit()
# 發送訊息(自動序列化)
message = {
"type": "breaking_news",
"title": "重要更新",
"content": "Redis Toolkit 發布新版本!",
"timestamp": "2024-01-01 10:00:00"
}
publisher.publisher("news", message)
print("訊息已發送")
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
# 📡 進階訂閱功能
# 訂閱模式匹配
# 使用模式訂閱多個相關頻道
def pattern_handler(channel, data):
print(f"頻道 {channel}: {data}")
# 訂閱所有以 "user:" 開頭的頻道
pattern_subscriber = RedisToolkit(
channels=["user:*"], # 模式匹配
message_handler=pattern_handler
)
# 這些訊息都會被接收
publisher.publisher("user:login", {"user_id": 1001})
publisher.publisher("user:logout", {"user_id": 1002})
publisher.publisher("user:update", {"user_id": 1003})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
# 多頻道處理
# 根據頻道分發不同處理邏輯
def multi_channel_handler(channel, data):
if channel == "orders":
process_order(data)
elif channel == "payments":
process_payment(data)
elif channel.startswith("notifications:"):
send_notification(channel, data)
def process_order(order_data):
print(f"處理訂單: {order_data['order_id']}")
def process_payment(payment_data):
print(f"處理付款: ${payment_data['amount']}")
def send_notification(channel, notification):
user_id = channel.split(":")[1]
print(f"發送通知給用戶 {user_id}: {notification['message']}")
# 訂閱多種類型的頻道
subscriber = RedisToolkit(
channels=["orders", "payments", "notifications:*"],
message_handler=multi_channel_handler
)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 🔄 雙向通訊範例
# 請求-回應模式
import uuid
import threading
import time
class RequestResponse:
def __init__(self):
self.toolkit = RedisToolkit()
self.responses = {}
# 啟動回應監聽器
self.response_listener = RedisToolkit(
channels=["responses:*"],
message_handler=self._handle_response
)
def send_request(self, request_data, timeout=5):
"""發送請求並等待回應"""
request_id = str(uuid.uuid4())
response_channel = f"responses:{request_id}"
# 準備等待回應
event = threading.Event()
self.responses[request_id] = {"event": event, "data": None}
# 發送請求
request = {
"id": request_id,
"response_channel": response_channel,
"data": request_data
}
self.toolkit.publisher("requests", request)
# 等待回應
if event.wait(timeout):
return self.responses[request_id]["data"]
else:
raise TimeoutError("請求超時")
def _handle_response(self, channel, data):
"""處理回應"""
request_id = channel.split(":")[1]
if request_id in self.responses:
self.responses[request_id]["data"] = data
self.responses[request_id]["event"].set()
# 請求處理器(另一個進程)
def request_processor():
def handle_request(channel, request):
print(f"處理請求: {request['id']}")
# 處理請求
result = {"status": "success", "result": len(request['data'])}
# 發送回應
toolkit = RedisToolkit()
toolkit.publisher(request['response_channel'], result)
processor = RedisToolkit(
channels=["requests"],
message_handler=handle_request
)
# 使用範例
client = RequestResponse()
response = client.send_request({"action": "calculate", "data": [1, 2, 3, 4, 5]})
print(f"收到回應: {response}")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# 🎨 實際應用場景
# 1. 即時通知系統
class NotificationSystem:
def __init__(self):
self.publisher = RedisToolkit()
def send_notification(self, user_id, notification):
"""發送通知給特定用戶"""
channel = f"notifications:user:{user_id}"
message = {
"id": str(uuid.uuid4()),
"type": notification['type'],
"title": notification['title'],
"body": notification['body'],
"timestamp": time.time(),
"read": False
}
self.publisher.publisher(channel, message)
# 同時發送到全域頻道供監控
self.publisher.publisher("notifications:all", {
"user_id": user_id,
"notification_id": message['id']
})
def broadcast(self, notification):
"""廣播通知給所有用戶"""
self.publisher.publisher("notifications:broadcast", {
"id": str(uuid.uuid4()),
"title": notification['title'],
"body": notification['body'],
"timestamp": time.time()
})
# 用戶端訂閱
class UserNotificationClient:
def __init__(self, user_id):
self.user_id = user_id
def handle_notification(channel, data):
if channel == f"notifications:user:{user_id}":
self.show_personal_notification(data)
elif channel == "notifications:broadcast":
self.show_broadcast_notification(data)
self.subscriber = RedisToolkit(
channels=[
f"notifications:user:{user_id}",
"notifications:broadcast"
],
message_handler=handle_notification
)
def show_personal_notification(self, notification):
print(f"[個人通知] {notification['title']}: {notification['body']}")
def show_broadcast_notification(self, notification):
print(f"[廣播] {notification['title']}: {notification['body']}")
# 使用範例
notifier = NotificationSystem()
client = UserNotificationClient(user_id=1001)
# 發送個人通知
notifier.send_notification(1001, {
"type": "order",
"title": "訂單更新",
"body": "您的訂單已發貨"
})
# 廣播通知
notifier.broadcast({
"title": "系統維護",
"body": "系統將於今晚 10 點進行維護"
})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# 2. 聊天室系統
class ChatRoom:
def __init__(self, room_id):
self.room_id = room_id
self.toolkit = RedisToolkit()
self.channel = f"chat:room:{room_id}"
def join(self, user_name, on_message):
"""加入聊天室"""
# 發送加入訊息
self.toolkit.publisher(self.channel, {
"type": "join",
"user": user_name,
"timestamp": time.time()
})
# 訂閱聊天室頻道
def message_handler(channel, data):
on_message(data)
subscriber = RedisToolkit(
channels=[self.channel],
message_handler=message_handler
)
return subscriber
def send_message(self, user_name, message):
"""發送訊息"""
self.toolkit.publisher(self.channel, {
"type": "message",
"user": user_name,
"text": message,
"timestamp": time.time()
})
def leave(self, user_name):
"""離開聊天室"""
self.toolkit.publisher(self.channel, {
"type": "leave",
"user": user_name,
"timestamp": time.time()
})
# 使用範例
def display_message(data):
if data["type"] == "join":
print(f"*** {data['user']} 加入聊天室 ***")
elif data["type"] == "message":
print(f"{data['user']}: {data['text']}")
elif data["type"] == "leave":
print(f"*** {data['user']} 離開聊天室 ***")
# 創建聊天室
room = ChatRoom("general")
# Alice 加入
alice_sub = room.join("Alice", display_message)
room.send_message("Alice", "大家好!")
# Bob 加入
bob_sub = room.join("Bob", display_message)
room.send_message("Bob", "嗨 Alice!")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# 3. 任務佇列系統
class TaskQueue:
def __init__(self, queue_name):
self.queue_name = queue_name
self.toolkit = RedisToolkit()
self.channel = f"tasks:{queue_name}"
def submit_task(self, task_type, payload, priority="normal"):
"""提交任務"""
task = {
"id": str(uuid.uuid4()),
"type": task_type,
"payload": payload,
"priority": priority,
"submitted_at": time.time(),
"status": "pending"
}
# 根據優先級發送到不同頻道
channel = f"{self.channel}:{priority}"
self.toolkit.publisher(channel, task)
return task["id"]
def create_worker(self, worker_id, task_handler):
"""創建工作者"""
def handle_task(channel, task):
print(f"Worker {worker_id} 處理任務: {task['id']}")
try:
# 更新任務狀態
task["status"] = "processing"
task["worker_id"] = worker_id
task["started_at"] = time.time()
# 執行任務
result = task_handler(task)
# 完成任務
task["status"] = "completed"
task["result"] = result
task["completed_at"] = time.time()
# 發布完成事件
self.toolkit.publisher(f"tasks:completed", task)
except Exception as e:
# 任務失敗
task["status"] = "failed"
task["error"] = str(e)
task["failed_at"] = time.time()
# 發布失敗事件
self.toolkit.publisher(f"tasks:failed", task)
# 訂閱不同優先級的任務
return RedisToolkit(
channels=[
f"{self.channel}:high",
f"{self.channel}:normal",
f"{self.channel}:low"
],
message_handler=handle_task
)
# 使用範例
queue = TaskQueue("image_processing")
# 定義任務處理函數
def process_image(task):
print(f"處理圖片: {task['payload']['image_path']}")
time.sleep(2) # 模擬處理
return {"status": "processed", "size": "1024x768"}
# 創建工作者
worker1 = queue.create_worker("worker-1", process_image)
worker2 = queue.create_worker("worker-2", process_image)
# 提交任務
task_id = queue.submit_task("resize", {
"image_path": "/images/photo.jpg",
"width": 800,
"height": 600
}, priority="high")
print(f"任務已提交: {task_id}")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# 🛡️ 錯誤處理與重連
# 自動重連機制
from redis_toolkit import RedisToolkit, RedisOptions
# 配置重連選項
options = RedisOptions(
subscriber_retry_delay=5, # 重連延遲(秒)
is_logger_info=True # 啟用日誌
)
def resilient_handler(channel, data):
try:
# 處理訊息
process_message(data)
except Exception as e:
# 記錄錯誤但不中斷訂閱
print(f"處理訊息時發生錯誤: {e}")
# 訂閱者會自動處理連線中斷和重連
subscriber = RedisToolkit(
channels=["important_events"],
message_handler=resilient_handler,
options=options
)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 訊息確認機制
class ReliableMessaging:
def __init__(self):
self.toolkit = RedisToolkit()
def publish_with_ack(self, channel, message, timeout=5):
"""發布訊息並等待確認"""
msg_id = str(uuid.uuid4())
ack_channel = f"ack:{msg_id}"
# 準備接收確認
ack_received = threading.Event()
def ack_handler(ch, data):
if data.get("msg_id") == msg_id:
ack_received.set()
ack_subscriber = RedisToolkit(
channels=[ack_channel],
message_handler=ack_handler
)
# 發送訊息
message["_msg_id"] = msg_id
message["_ack_channel"] = ack_channel
self.toolkit.publisher(channel, message)
# 等待確認
if ack_received.wait(timeout):
print(f"訊息 {msg_id} 已確認")
return True
else:
print(f"訊息 {msg_id} 未收到確認")
return False
def create_ack_subscriber(self, channel):
"""創建會發送確認的訂閱者"""
def handler_with_ack(ch, data):
try:
# 處理訊息
process_message(data)
# 發送確認
if "_msg_id" in data and "_ack_channel" in data:
self.toolkit.publisher(data["_ack_channel"], {
"msg_id": data["_msg_id"],
"status": "acknowledged"
})
except Exception as e:
print(f"處理失敗: {e}")
return RedisToolkit(
channels=[channel],
message_handler=handler_with_ack
)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# 📊 效能優化
# 批次發布
# 批次發送多個訊息
def batch_publish(channel, messages):
toolkit = RedisToolkit()
pipe = toolkit.client.pipeline()
for msg in messages:
serialized = json.dumps(msg)
pipe.publish(channel, serialized)
pipe.execute()
# 使用範例
messages = [
{"id": i, "data": f"Message {i}"}
for i in range(1000)
]
batch_publish("bulk_updates", messages)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 訊息過濾
def filtered_handler(channel, data):
# 在處理器中過濾訊息
if data.get("priority") != "high":
return # 忽略非高優先級訊息
if data.get("user_id") not in allowed_users:
return # 忽略未授權用戶
# 處理符合條件的訊息
process_important_message(data)
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
# 🎯 最佳實踐
頻道命名規範
# 使用結構化的頻道名稱 "users:1001:notifications" # 用戶通知 "orders:status:pending" # 訂單狀態 "system:alerts:critical" # 系統警報
1
2
3
4訊息格式標準化
# 統一的訊息格式 message = { "id": str(uuid.uuid4()), "type": "event_type", "timestamp": time.time(), "version": "1.0", "data": {...} }
1
2
3
4
5
6
7
8避免阻塞操作
def non_blocking_handler(channel, data): # 將耗時操作放入佇列 task_queue.put({ "channel": channel, "data": data }) # 立即返回,不阻塞訂閱
1
2
3
4
5
6
7資源清理
# 正確清理訂閱者 try: # 使用訂閱者 subscriber = RedisToolkit(...) finally: # 確保清理 subscriber.cleanup()
1
2
3
4
5
6
7
# 📚 下一步
掌握了發布訂閱功能後,您可以:
小結
Redis Toolkit 的 Pub/Sub 功能讓即時通訊變得簡單。記住:合理設計頻道結構,標準化訊息格式,注意錯誤處理!