生產環境部署
將 pretty-loguru 部署到生產環境需要考慮可靠性、效能、安全性和可維護性。本指南提供企業級部署的最佳實踐。
🏗️ 部署架構
單機部署
python
# production_config.py
import os
from pretty_loguru import create_logger
def create_production_logger(service_name):
    """Build a logger configured for production use.

    ``LOG_LEVEL``, ``LOG_PATH`` and ``ENVIRONMENT`` are read from the process
    environment, falling back to ``INFO``, ``/var/log/<service_name>`` and
    ``production``. All other options are fixed and forwarded unchanged to
    ``create_logger``.

    Args:
        service_name: Used in the logger name, the default log directory and
            the log file name.

    Returns:
        Whatever ``create_logger`` returns for the assembled options.
    """
    env = os.environ
    base_dir = env.get("LOG_PATH", f"/var/log/{service_name}")
    deployment = env.get("ENVIRONMENT", "production")

    options = dict(
        name=f"{service_name}_{deployment}",
        level=env.get("LOG_LEVEL", "INFO"),
        log_path=f"{base_dir}/{service_name}.log",
        rotation="100 MB",    # moderate file size
        retention="30 days",  # retention period required for compliance
        compression=True,     # compress rotated files to save space
        enqueue=True,         # queue records instead of writing inline
        backtrace=False,      # full traces are not wanted in production
        diagnose=False,       # diagnostic detail switched off
        catch=True,           # catch exceptions rather than crash
        # Production line layout
        format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {process} | {name} | {function}:{line} - {message}",
        serialize=True,       # JSON output for log aggregation
    )
    return create_logger(**options)
# Usage example: one production logger per component
app_logger = create_production_logger("webapp")
db_logger = create_production_logger("database")
cache_logger = create_production_logger("redis")
微服務部署
python
# microservice_logging.py
import os
import socket
from pretty_loguru import create_logger
class MicroserviceLogger:
    """Logger wrapper that attaches service identity to every record.

    Each instance owns one logger built with ``create_logger`` and exposes
    ``info`` / ``error`` / ``warning`` helpers. Keyword arguments passed to
    those helpers are merged with the service context (service, instance,
    version, environment) and handed to the underlying logger as ``extra``.
    """

    def __init__(self, service_name, version="1.0.0"):
        """Store identity fields, derive the instance id and build the logger.

        Args:
            service_name: Name used for the logger, log directory and file.
            version: Service version string recorded with each entry.
        """
        self.service_name = service_name
        self.version = version
        self.instance_id = self._get_instance_id()
        self.logger = self._create_logger()

    @staticmethod
    def _escape_braces(text):
        """Double ``{`` and ``}`` so *text* stays literal in a format template."""
        return str(text).replace("{", "{{").replace("}", "}}")

    def _get_instance_id(self):
        """Return ``<service_name>-<POD_NAME>``, using the hostname as fallback."""
        hostname = socket.gethostname()
        pod_name = os.getenv("POD_NAME", hostname)
        return f"{self.service_name}-{pod_name}"

    def _build_format(self):
        """Return the JSON-shaped record template for this service.

        Literal braces are doubled and the interpolated identity values are
        escaped. Without that, a ``str.format``-style template treats the
        outer ``{...}`` as a single replacement field and fails to render.
        """
        esc = self._escape_braces
        return "".join([
            '{{"timestamp": "{time:YYYY-MM-DD HH:mm:ss.SSS}", ',
            '"level": "{level}", ',
            '"service": "', esc(self.service_name), '", ',
            '"instance": "', esc(self.instance_id), '", ',
            '"version": "', esc(self.version), '", ',
            '"message": "{message}", "extra": {extra}}}',
        ])

    def _create_logger(self):
        """Create the per-service logger (daily rotation, 7-day retention)."""
        return create_logger(
            name=self.service_name,
            level=os.getenv("LOG_LEVEL", "INFO"),
            log_path=f"/var/log/{self.service_name}/{self.service_name}.log",
            rotation="daily",
            retention="7 days",
            compression=True,
            enqueue=True,
            # Standard microservice record layout
            format=self._build_format(),
            serialize=True,
        )

    def info(self, message, **kwargs):
        """Log *message* at INFO level with the service context attached."""
        self.logger.info(message, extra=self._add_context(kwargs))

    def error(self, message, **kwargs):
        """Log *message* at ERROR level with the service context attached."""
        self.logger.error(message, extra=self._add_context(kwargs))

    def warning(self, message, **kwargs):
        """Log *message* at WARNING level with the service context attached."""
        self.logger.warning(message, extra=self._add_context(kwargs))

    def _add_context(self, extra_data):
        """Return the service context merged with *extra_data*.

        Keys in *extra_data* override the context keys of the same name.
        """
        context = {
            "service": self.service_name,
            "instance": self.instance_id,
            "version": self.version,
            "environment": os.getenv("ENVIRONMENT", "production"),
        }
        context.update(extra_data)
        return context
# Use the microservice logger; keyword arguments become part of the record's extra data
logger = MicroserviceLogger("user-service", "2.1.0")
logger.info("服務啟動", port=8080, health_check="/health")
🐳 容器化部署
Docker 配置
dockerfile
# Dockerfile
FROM python:3.11-slim

# Install system dependencies
RUN apt-get update && apt-get install -y \
        logrotate \
    && rm -rf /var/lib/apt/lists/*

# Create the application user
RUN useradd -m -u 1000 appuser

# Create the log directory and hand it to the application user
RUN mkdir -p /var/log/app && \
    chown -R appuser:appuser /var/log/app

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application
WORKDIR /app
COPY --chown=appuser:appuser . .

# Switch to the non-root user
USER appuser

# Health check. urllib.request.urlopen raises on connection errors AND on
# HTTP 4xx/5xx responses, so a failing /health endpoint marks the container
# unhealthy. It is part of the standard library, so the check does not depend
# on the `requests` package being installed.
HEALTHCHECK --interval=30s --timeout=10s --start-period=30s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8080/health', timeout=5)"

# Start the application
EXPOSE 8080
CMD ["python", "app.py"]
Kubernetes 部署
yaml
# k8s-deployment.yaml
# Deployment running three replicas of the webapp container.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: webapp
  labels:
    app: webapp
spec:
  replicas: 3
  selector:
    matchLabels:
      app: webapp
  template:
    metadata:
      labels:
        app: webapp
    spec:
      containers:
        - name: webapp
          image: myapp:latest
          ports:
            - containerPort: 8080
          env:
            - name: LOG_LEVEL
              value: "INFO"
            - name: ENVIRONMENT
              value: "production"
            # Read by the pre-deployment check script and ProductionConfig.
            - name: SERVICE_NAME
              value: "webapp"
            # Point the loggers at the mounted "logs" volume; without this the
            # Python examples default to /var/log/<service_name>, which is not
            # the directory mounted below.
            - name: LOG_PATH
              value: "/var/log/app"
            # Pod name, used by MicroserviceLogger to build the instance id.
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          volumeMounts:
            - name: logs
              mountPath: /var/log/app
            - name: config
              mountPath: /app/config
          readinessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 10
            periodSeconds: 5
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
      volumes:
        - name: logs
          emptyDir: {}
        - name: config
          configMap:
            name: webapp-config
---
# ConfigMap holding the logging settings as a single JSON document.
apiVersion: v1
kind: ConfigMap
metadata:
  name: webapp-config
data:
  # Exposed to the container as the file "logging.json".
  logging.json: |
    {
      "log_level": "INFO",
      "rotation": "100 MB",
      "retention": "7 days",
      "compression": true
    }
📊 監控與觀測
日誌聚合
python
# log_aggregation.py
import json
from datetime import datetime
from pretty_loguru import create_logger
class LogAggregator:
    """Structured event logger intended for ELK-style log aggregation.

    Wraps one logger created with ``create_logger`` and offers helpers that
    log HTTP requests, database queries and business events, each with a
    fixed ``event_type`` and a nested payload passed as ``extra``.
    """

    def __init__(self, service_name):
        """Create the aggregated logger for *service_name*.

        Args:
            service_name: Used in the logger name, the log file name and the
                ``service`` field of every event.
        """
        self.service_name = service_name
        self.logger = create_logger(
            name=f"aggregated_{service_name}",
            level="INFO",
            log_path=f"/var/log/aggregated/{service_name}.log",
            rotation="hourly",
            retention="24 hours",
            # ELK-friendly record layout
            format=self._build_format(),
            serialize=True,
        )

    @staticmethod
    def _build_format():
        """Return the ELK-style record template.

        Loguru records carry no ``host`` field, so the hostname is resolved
        here and written into the template as literal text. Literal JSON
        braces are doubled so only the record fields are substituted.
        """
        import socket  # local import: this module does not import socket at file level

        host = socket.gethostname().replace("{", "{{").replace("}", "}}")
        return "".join([
            '{{"@timestamp": "{time:YYYY-MM-DDTHH:mm:ss.SSSZ}", ',
            '"@version": "1", ',
            '"host": "', host, '", ',
            '"level": "{level}", "logger_name": "{name}", ',
            '"thread": "{thread}", "message": "{message}", ',
            '"fields": {extra}}}',
        ])

    def log_request(self, method, path, status_code, response_time, user_id=None):
        """Log one HTTP request.

        Args:
            method: HTTP method.
            path: Request path.
            status_code: Response status code.
            response_time: Duration; multiplied by 1000 and rounded to two
                decimals for ``response_time_ms``.
            user_id: Optional user identifier, logged as given.
        """
        self.logger.info(
            f"{method} {path} - {status_code}",
            extra={
                "event_type": "http_request",
                "http": {
                    "method": method,
                    "path": path,
                    "status_code": status_code,
                    "response_time_ms": round(response_time * 1000, 2),
                },
                "user_id": user_id,
                "service": self.service_name,
            },
        )

    def log_database_query(self, query_type, table, duration, rows_affected=None):
        """Log one database query.

        Args:
            query_type: Kind of statement, e.g. ``"SELECT"``.
            table: Table the query touched.
            duration: Duration; multiplied by 1000 and rounded to two
                decimals for ``duration_ms``.
            rows_affected: Optional affected-row count.
        """
        self.logger.info(
            f"DB {query_type} on {table}",
            extra={
                "event_type": "database_query",
                "database": {
                    "query_type": query_type,
                    "table": table,
                    "duration_ms": round(duration * 1000, 2),
                    "rows_affected": rows_affected,
                },
                "service": self.service_name,
            },
        )

    def log_business_event(self, event_name, event_data=None):
        """Log a named business event; a falsy *event_data* is logged as ``{}``."""
        self.logger.info(
            f"Business event: {event_name}",
            extra={
                "event_type": "business_event",
                "event_name": event_name,
                "event_data": event_data or {},
                "service": self.service_name,
            },
        )
# Use the log aggregator
aggregator = LogAggregator("user-service")
# Record different kinds of events
aggregator.log_request("GET", "/api/users/123", 200, 0.045, user_id="user_123")
aggregator.log_database_query("SELECT", "users", 0.023, rows_affected=1)
aggregator.log_business_event("user_login", {"login_method": "oauth", "provider": "google"})
指標收集
python
# metrics_collection.py
import time
from collections import defaultdict, Counter
from pretty_loguru import create_logger
class MetricsCollector:
    """In-process metrics store that also logs every update.

    Counters, gauges, histograms and timers are kept in dictionaries keyed by
    ``"<name>:<labels>"``. Every update except ``start_timer`` is written to
    the metrics logger with the metric data passed as ``extra``.
    """

    def __init__(self):
        """Create the metrics logger and the empty metric stores."""
        self.logger = create_logger(
            name="metrics",
            level="INFO",
            log_path="/var/log/metrics/metrics.log",
            rotation="hourly",
            # Literal JSON braces are doubled so that only {time} and {extra}
            # are treated as record fields by the format template.
            format='{{"timestamp": "{time:YYYY-MM-DDTHH:mm:ss.SSSZ}", "metric_type": "application", "data": {extra}}}',
            serialize=True,
        )
        # Metric storage
        self.counters = defaultdict(int)
        self.gauges = defaultdict(float)
        self.histograms = defaultdict(list)
        self.timers = {}

    @staticmethod
    def _metric_key(name, labels):
        """Return the storage key for *name* with *labels*.

        Labels are ordered by key so the same label set always maps to the
        same series, whatever order the caller built the dict in.
        """
        ordered = dict(sorted((labels or {}).items(), key=lambda item: str(item[0])))
        return f"{name}:{ordered}"

    def increment_counter(self, name, labels=None, value=1):
        """Add *value* to a counter and log the new total."""
        key = self._metric_key(name, labels)
        self.counters[key] += value
        self.logger.info(
            f"Counter {name} incremented",
            extra={
                "metric_name": name,
                "metric_type": "counter",
                "value": value,
                "labels": labels or {},
                "total": self.counters[key],
            },
        )

    def set_gauge(self, name, value, labels=None):
        """Set a gauge to *value* and log it."""
        key = self._metric_key(name, labels)
        self.gauges[key] = value
        self.logger.info(
            f"Gauge {name} set",
            extra={
                "metric_name": name,
                "metric_type": "gauge",
                "value": value,
                "labels": labels or {},
            },
        )

    def record_histogram(self, name, value, labels=None):
        """Append *value* to a histogram and log count, sum and average."""
        key = self._metric_key(name, labels)
        self.histograms[key].append(value)
        # Compute summary statistics over everything recorded so far.
        # NOTE(review): observations are kept for the collector's lifetime,
        # so memory grows with each call; consider bounding this.
        values = self.histograms[key]
        count = len(values)
        sum_val = sum(values)
        avg_val = sum_val / count
        self.logger.info(
            f"Histogram {name} recorded",
            extra={
                "metric_name": name,
                "metric_type": "histogram",
                "value": value,
                "count": count,
                "sum": sum_val,
                "average": avg_val,
                "labels": labels or {},
            },
        )

    def start_timer(self, name, labels=None):
        """Start (or restart) the timer for *name* and *labels*."""
        # A monotonic clock keeps durations correct across system clock changes.
        self.timers[self._metric_key(name, labels)] = time.monotonic()

    def end_timer(self, name, labels=None):
        """Stop a timer and record its duration.

        The elapsed seconds are recorded in the ``<name>_duration`` histogram.

        Returns:
            The elapsed seconds, or ``None`` when no matching timer is running.
        """
        key = self._metric_key(name, labels)
        if key in self.timers:
            duration = time.monotonic() - self.timers[key]
            del self.timers[key]
            self.record_histogram(f"{name}_duration", duration, labels)
            return duration
        return None
# Use the metrics collector
metrics = MetricsCollector()
# Record several kinds of metrics
metrics.increment_counter("http_requests_total", {"method": "GET", "status": "200"})
metrics.set_gauge("active_connections", 42)
metrics.start_timer("request_processing", {"endpoint": "/api/users"})
time.sleep(0.1) # Simulate processing time
metrics.end_timer("request_processing", {"endpoint": "/api/users"})
🔒 安全配置
日誌安全
python
# secure_logging.py
import hashlib
import re
from pretty_loguru import create_logger
class SecureLogger:
    """Logger for security and audit events that masks sensitive values.

    Free-text values are scrubbed with the regular expressions in
    ``sensitive_patterns``. Identifiers (user id, e-mail, IP address) are
    replaced by a truncated SHA-256 digest before being logged.
    """

    # Patterns whose whole match is replaced by ``***REDACTED***``.
    SENSITIVE_PATTERNS = (
        r'password\s*[=:]\s*["\']([^"\']+)["\']',
        r'token\s*[=:]\s*["\']([^"\']+)["\']',
        r'api[_-]?key\s*[=:]\s*["\']([^"\']+)["\']',
        r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',  # credit card number
        r'\b\d{3}-\d{2}-\d{4}\b',  # SSN
    )

    # Keys in ``details`` whose values are hashed instead of pattern-scrubbed.
    _HASHED_DETAIL_KEYS = frozenset({"user_id", "email", "ip_address"})

    def __init__(self, service_name):
        """Create the secure logger for *service_name*.

        Args:
            service_name: Used in the logger name, the log file name and the
                ``service`` field of every event.
        """
        self.service_name = service_name
        self.sensitive_patterns = list(self.SENSITIVE_PATTERNS)
        self.logger = create_logger(
            name=f"secure_{service_name}",
            level="INFO",
            log_path=f"/var/log/secure/{service_name}.log",
            rotation="daily",
            retention="90 days",  # security logs are kept long-term
            compression=True,
            # NOTE(review): enqueue only queues records before writing; it
            # does not restrict file permissions. Secure the log directory
            # separately (e.g. restrictive directory mode / umask).
            enqueue=True,
        )

    def sanitize_message(self, message):
        """Return *message* with every sensitive pattern match redacted."""
        sanitized = message
        for pattern in self.sensitive_patterns:
            sanitized = re.sub(pattern, r'***REDACTED***', sanitized, flags=re.IGNORECASE)
        return sanitized

    def hash_pii(self, data):
        """Return the first 16 hex chars of SHA-256 for a string.

        Non-string values are returned unchanged.
        """
        if isinstance(data, str):
            return hashlib.sha256(data.encode()).hexdigest()[:16]
        return data

    def _hash_identifier(self, value):
        """Hash an identifier of any type; ``None`` stays ``None``.

        Non-string identifiers (for example integer user ids) are converted
        to text first so they are never written to the log in clear form.
        """
        if value is None:
            return None
        return self.hash_pii(str(value))

    def log_security_event(self, event_type, user_id, details=None):
        """Log a security event at WARNING level.

        Args:
            event_type: Short event name, logged as given.
            user_id: Identifier of the acting user; hashed before logging.
            details: Optional mapping. Values under ``user_id``, ``email``
                and ``ip_address`` are hashed; all others are converted to
                text and pattern-scrubbed.
        """
        sanitized_details = {}
        if details:
            for key, value in details.items():
                if key in self._HASHED_DETAIL_KEYS:
                    sanitized_details[key] = self.hash_pii(str(value))
                else:
                    sanitized_details[key] = self.sanitize_message(str(value))
        self.logger.warning(
            f"Security event: {event_type}",
            extra={
                "event_type": event_type,
                "user_id": self._hash_identifier(user_id),
                "details": sanitized_details,
                "service": self.service_name,
            },
        )

    def log_audit_event(self, action, resource, user_id, before_state=None, after_state=None):
        """Log an audit event at INFO level.

        Args:
            action: What was done.
            resource: What it was done to.
            user_id: Identifier of the acting user; hashed before logging.
            before_state: Optional state before the action, logged as given.
            after_state: Optional state after the action, logged as given.
        """
        # NOTE(review): before_state / after_state are logged unmodified;
        # callers must not put secrets in them.
        self.logger.info(
            f"Audit: {action} on {resource}",
            extra={
                "event_type": "audit",
                "action": action,
                "resource": resource,
                "user_id": self._hash_identifier(user_id),
                "before_state": before_state,
                "after_state": after_state,
                "service": self.service_name,
            },
        )
# Use the secure logger: identifiers are hashed before they reach the log
secure_logger = SecureLogger("auth-service")
secure_logger.log_security_event(
"login_attempt",
"user@example.com",
{"ip_address": "192.168.1.100", "user_agent": "Mozilla/5.0..."}
)
secure_logger.log_audit_event(
"update_profile",
"user_profile",
"user@example.com",
before_state={"name": "John"},
after_state={"name": "John Doe"}
)
🚨 錯誤處理與恢復
容錯機制
python
# fault_tolerance.py
import time
import threading
from queue import Queue, Empty
from pretty_loguru import create_logger
class FaultTolerantLogger:
    """Logger that falls back to a backup logger and finally to stdout.

    Records are sent to the primary logger first. If that raises, the record
    is queued for retry by a background daemon thread and written to the
    backup logger. If both fail or are missing, the record is printed.
    """

    def __init__(self, service_name, backup_enabled=True):
        """Create the primary and optional backup logger, then start retries.

        A logger that cannot be created is left as ``None``; construction
        itself never raises for that reason.

        Args:
            service_name: Used in logger names and log file paths.
            backup_enabled: When true, also create the backup logger.
        """
        self.service_name = service_name
        self.backup_enabled = backup_enabled
        self.failed_logs = Queue()
        self.retry_thread = None
        # Both attributes always exist, so log() can test them safely even
        # when the backup is disabled or creation fails.
        self.primary_logger = None
        self.backup_logger = None

        # Primary logger
        try:
            self.primary_logger = create_logger(
                name=f"primary_{service_name}",
                level="INFO",
                log_path=f"/var/log/{service_name}/{service_name}.log",
                rotation="100 MB",
                retention="30 days",
                compression=True,
                enqueue=True,
            )
        except Exception as e:
            print(f"主要日誌記錄器初始化失敗: {e}")

        # Backup logger (alternative location)
        if backup_enabled:
            try:
                self.backup_logger = create_logger(
                    name=f"backup_{service_name}",
                    level="INFO",
                    log_path=f"/tmp/{service_name}_backup.log",
                    rotation="50 MB",
                    retention="7 days",
                )
            except Exception as e:
                print(f"備份日誌記錄器初始化失敗: {e}")

        # Start the retry mechanism
        self._start_retry_mechanism()

    @staticmethod
    def _emit(target, level, message, extra):
        """Call the *level*-named method of *target* with *message* and *extra*."""
        getattr(target, level.lower())(message, **extra)

    def log(self, level, message, **kwargs):
        """Write one record, degrading through the available outputs.

        Args:
            level: Level name such as ``"INFO"``; its lowercase form selects
                the logger method.
            message: Text to log.
            **kwargs: Forwarded to the logger method.

        Returns:
            ``True`` if the primary or backup logger accepted the record,
            ``False`` if it could only be printed.
        """
        log_entry = {
            "level": level,
            "message": message,
            "extra": kwargs,
            "timestamp": time.time(),
        }
        try:
            # Try the primary logger first
            if self.primary_logger:
                self._emit(self.primary_logger, level, message, kwargs)
                return True
        except Exception as e:
            print(f"主要日誌記錄失敗: {e}")
            # Queue the failed record for the retry thread
            self.failed_logs.put(log_entry)

        try:
            # Fall back to the backup logger
            if self.backup_logger:
                self._emit(self.backup_logger, level, f"[BACKUP] {message}", kwargs)
                return True
        except Exception as e:
            print(f"備份日誌記錄失敗: {e}")

        # Every logger failed or is missing: print to the console
        print(f"[{level}] {message} | Extra: {kwargs}")
        return False

    def info(self, message, **kwargs):
        """Log at INFO level; see :meth:`log` for the return value."""
        return self.log("INFO", message, **kwargs)

    def error(self, message, **kwargs):
        """Log at ERROR level; see :meth:`log` for the return value."""
        return self.log("ERROR", message, **kwargs)

    def warning(self, message, **kwargs):
        """Log at WARNING level; see :meth:`log` for the return value."""
        return self.log("WARNING", message, **kwargs)

    def _retry_worker(self):
        """Re-send queued records to the primary logger, forever.

        A record whose retry raises is queued again until it has been
        re-queued three times. Records taken while no primary logger exists
        are discarded.
        """
        while True:
            try:
                # Take a failed record; wake up every 30 s when idle
                log_entry = self.failed_logs.get(timeout=30)
                if self.primary_logger:
                    try:
                        self._emit(
                            self.primary_logger,
                            log_entry["level"],
                            f"[RETRY] {log_entry['message']}",
                            log_entry["extra"],
                        )
                    except Exception:
                        # Retry failed: queue again, with a bounded retry count
                        retry_count = log_entry.get("retry_count", 0)
                        if retry_count < 3:
                            log_entry["retry_count"] = retry_count + 1
                            self.failed_logs.put(log_entry)
            except Empty:
                # Nothing to retry; keep waiting
                continue
            except Exception as e:
                print(f"重試機制錯誤: {e}")

    def _start_retry_mechanism(self):
        """Start the daemon thread that runs :meth:`_retry_worker`."""
        self.retry_thread = threading.Thread(target=self._retry_worker, daemon=True)
        self.retry_thread.start()
# Use the fault-tolerant logger
fault_tolerant_logger = FaultTolerantLogger("critical-service")
# Intended to keep logging even when disk space runs out or permissions are wrong
fault_tolerant_logger.info("應用程式啟動", version="2.0.0")
fault_tolerant_logger.error("處理請求時發生錯誤", error_code=500, user_id="user123")
📋 部署檢查清單
部署前檢查
bash
#!/bin/bash
# pre_deployment_check.sh
# Prints a pass/warn line for each production logging prerequisite:
# disk space, log directory permissions, environment variables,
# logrotate configuration and a real log write through pretty_loguru.
echo "🔍 生產環境部署檢查"
echo "===================="

# 1. Check disk space
echo "1. 檢查磁碟空間..."
df -h /var/log
# -P forces one line per filesystem, so a long device name cannot push the
# usage column onto a second line; NR==2 selects the data row.
disk_usage=$(df -P /var/log 2>/dev/null | awk 'NR==2 {gsub(/%/, "", $5); print $5}')
if ! [[ "$disk_usage" =~ ^[0-9]+$ ]]; then
    # df failed or printed something unexpected: do not report success.
    echo "❌ 警告:無法取得日誌磁碟空間使用率"
elif [ "$disk_usage" -gt 80 ]; then
    echo "❌ 警告:日誌磁碟空間使用率超過 80%"
else
    echo "✅ 磁碟空間充足"
fi

# 2. Check log directory permissions
echo "2. 檢查日誌目錄權限..."
if [ -w /var/log/app ]; then
    echo "✅ 日誌目錄可寫"
else
    echo "❌ 警告:日誌目錄不可寫"
fi

# 3. Check required environment variables
echo "3. 檢查環境變數..."
required_vars=("LOG_LEVEL" "ENVIRONMENT" "SERVICE_NAME")
for var in "${required_vars[@]}"; do
    # ${!var} is indirect expansion: the value of the variable named by $var.
    if [ -z "${!var}" ]; then
        echo "❌ 警告:缺少環境變數 $var"
    else
        echo "✅ $var = ${!var}"
    fi
done

# 4. Check the log rotation configuration
echo "4. 檢查 logrotate 配置..."
if [ -f /etc/logrotate.d/app ]; then
    echo "✅ logrotate 配置存在"
else
    echo "❌ 警告:缺少 logrotate 配置"
fi

# 5. Test an actual log write
echo "5. 測試日誌寫入..."
python3 -c "
from pretty_loguru import create_logger
try:
    logger = create_logger('test', log_path='/var/log/app/test.log')
    logger.info('部署測試')
    print('✅ 日誌寫入測試成功')
except Exception as e:
    print(f'❌ 日誌寫入測試失敗: {e}')
"
echo "===================="
echo "部署檢查完成"
生產環境配置範本
python
# production_template.py
"""
生產環境配置範本
適用於大多數 Web 應用程式和微服務
"""
import os
from pretty_loguru import create_logger
class ProductionConfig:
    """Factory for the application, access, error and audit loggers.

    Settings are read once from the environment in ``__init__``; each
    ``create_*_logger`` method builds a new logger from them.
    """

    def __init__(self):
        """Read ``SERVICE_NAME``, ``ENVIRONMENT``, ``LOG_LEVEL`` and ``LOG_PATH``.

        Defaults are ``app``, ``production``, ``INFO`` and
        ``/var/log/<service_name>``.
        """
        self.service_name = os.getenv("SERVICE_NAME", "app")
        self.environment = os.getenv("ENVIRONMENT", "production")
        self.log_level = os.getenv("LOG_LEVEL", "INFO")
        self.log_path = os.getenv("LOG_PATH", f"/var/log/{self.service_name}")

    def _audit_format(self):
        """Return the JSON-shaped template used by the audit logger.

        Literal braces are doubled and the service name is escaped so that
        only ``{time}``, ``{message}`` and ``{extra}`` are substituted.
        """
        service = str(self.service_name).replace("{", "{{").replace("}", "}}")
        return "".join([
            '{{"timestamp": "{time:YYYY-MM-DD HH:mm:ss.SSS}", ',
            '"service": "', service, '", ',
            '"event": "audit", "message": "{message}", "data": {extra}}}',
        ])

    def create_application_logger(self):
        """Create the general application logger (``application.log``)."""
        return create_logger(
            name=f"{self.service_name}_app",
            level=self.log_level,
            log_path=f"{self.log_path}/application.log",
            rotation="100 MB",
            retention="30 days",
            compression=True,
            enqueue=True,
            format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name} | {function}:{line} - {message}",
            catch=True,
        )

    def create_access_logger(self):
        """Create the access logger (``access.log``, serialized output)."""
        return create_logger(
            name=f"{self.service_name}_access",
            level="INFO",
            log_path=f"{self.log_path}/access.log",
            rotation="daily",
            retention="90 days",
            compression=True,
            format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {message}",
            serialize=True,
        )

    def create_error_logger(self):
        """Create the error logger (``error.log``, WARNING and above)."""
        return create_logger(
            name=f"{self.service_name}_error",
            level="WARNING",
            log_path=f"{self.log_path}/error.log",
            rotation="50 MB",
            retention="90 days",
            compression=True,
            backtrace=True,
            # diagnose prints variable values inside tracebacks, which can
            # expose secrets; keep it off in production.
            diagnose=False,
        )

    def create_audit_logger(self):
        """Create the audit logger (``audit.log``, long-term retention)."""
        return create_logger(
            name=f"{self.service_name}_audit",
            level="INFO",
            log_path=f"{self.log_path}/audit.log",
            rotation="daily",
            retention="2555 days",  # 7-year compliance requirement
            compression=True,
            format=self._audit_format(),
            serialize=True,
        )
# Use the configuration: build one logger per log category
config = ProductionConfig()
app_logger = config.create_application_logger()
access_logger = config.create_access_logger()
error_logger = config.create_error_logger()
audit_logger = config.create_audit_logger()
# Write one entry to each category
app_logger.info("應用程式啟動完成")
access_logger.info("用戶訪問", extra={"user_id": "123", "endpoint": "/api/users"})
error_logger.error("資料庫連接失敗", extra={"database": "postgres", "error": "timeout"})
audit_logger.info("用戶資料更新", extra={"user_id": "123", "action": "update_profile"})