第37章 可扩展性与高可用
"系统的可用性不是测试出来的,是设计出来的。每一次故障都是一次免费的架构课。" —— Google SRE 团队
37.1 概述:为什么高可用对 Agent 系统尤其重要
Agent 系统与传统的 Web 应用有一个根本区别:它依赖于外部 AI 模型服务(如 OpenAI、Anthropic 等)。这意味着你的系统不仅要面对自身基础设施的故障,还要应对上游供应商的不确定性。一个典型的 Agent 请求可能涉及 3-5 个内部服务 + 1-2 个外部 LLM 调用,任何一个环节的故障都会导致用户体验受损。
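上述"串联依赖放大故障"的效应可以量化:假设各环节故障相互独立,整条链路的可用性约等于各环节可用性的乘积。下面用几个假设数值做个粗略估算(示例数值非实测数据):

```python
# 串联链路的整体可用性 ≈ 各环节可用性的乘积(假设故障相互独立)
def serial_availability(components: list) -> float:
    result = 1.0
    for a in components:
        result *= a
    return result

# 假设:4 个内部服务各 99.95%,1 个外部 LLM 服务 99.5%
chain = [0.9995] * 4 + [0.995]
print(f"{serial_availability(chain):.4f}")  # 0.9930
```

可以看到,即使每个内部服务都达到 99.95%,一个 99.5% 的外部 LLM 依赖就会把整体拉低到约 99.3%——这正是 Agent 系统需要熔断与降级的原因。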
本章将系统地讨论如何构建一个可扩展、高可用的 Agent 平台。
37.1.1 可用性目标定义
| 级别 | 可用性 | 年停机时间 | 适用场景 |
|---|---|---|---|
| 基础 | 99% | 3.65 天 | 内部工具、开发环境 |
| 标准 | 99.9% | 8.76 小时 | 一般业务系统 |
| 高可用 | 99.95% | 4.38 小时 | 核心业务系统 |
| 极高可用 | 99.99% | 52.56 分钟 | 金融级 Agent 平台 |
Agent 平台推荐目标:99.95%(月停机 < 22 分钟)
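可用性目标可以直接换算为停机预算,便于与告警阈值和演练指标对齐。一个纯算术的换算函数(结果与上表一致):

```python
def downtime_budget(availability: float) -> dict:
    """把可用性目标换算为允许的停机时长(分钟)"""
    down_ratio = 1 - availability
    return {
        "per_month_min": round(30 * 24 * 60 * down_ratio, 1),
        "per_year_min": round(365 * 24 * 60 * down_ratio, 1),
    }

print(downtime_budget(0.9995))
# {'per_month_min': 21.6, 'per_year_min': 262.8}
```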
37.2 水平扩展策略
37.2.1 无状态设计原则
水平扩展的前提是服务必须是无状态(Stateless)的。所有的会话状态、用户上下文都应该存储在外部(Redis、数据库),而不是服务进程内存中。
```python
# ❌ 错误:有状态设计
class ChatService:
def __init__(self):
self.sessions = {} # 状态存储在内存中
def chat(self, session_id, message):
if session_id not in self.sessions:
self.sessions[session_id] = []
self.sessions[session_id].append(message)
# 问题:多实例间状态不共享
# ✅ 正确:无状态设计
class ChatService:
def __init__(self, redis_client, db):
self.redis = redis_client
self.db = db
async def chat(self, session_id, message):
# 状态存储在 Redis 中,所有实例可访问
context = await self.redis.get(f"ctx:{session_id}")
context = json.loads(context) if context else []
context.append(message)
await self.redis.setex(
f"ctx:{session_id}", 3600, json.dumps(context)
        )
```

37.2.2 有状态服务的特殊处理
某些服务天然是有状态的,需要特殊的扩展策略:
| 服务类型 | 状态类型 | 扩展策略 |
|---|---|---|
| Redis | 缓存数据 | Redis Cluster(分片 + 副本) |
| PostgreSQL | 持久化数据 | 读写分离 + 分库分表 |
| 向量数据库 | Embedding | Milvus 分布式集群 |
| WebSocket | 连接状态 | Sticky Session + Pub/Sub |
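以表中的 WebSocket 为例,Sticky Session + Pub/Sub 的要点是:连接固定在某个实例上,而发往该会话的消息通过发布/订阅转发给持有连接的实例。下面用一个进程内的 Pub/Sub 替身勾勒这一模式(仅作演示,生产环境应换成 Redis Pub/Sub;类名与接口均为本示例假设):

```python
from collections import defaultdict
from typing import Callable, Dict, List

class FakePubSub:
    """进程内模拟的 Pub/Sub,生产环境应替换为 Redis Pub/Sub"""
    def __init__(self):
        self._subs: Dict[str, List[Callable[[str], None]]] = defaultdict(list)

    def subscribe(self, channel: str, handler: Callable[[str], None]):
        self._subs[channel].append(handler)

    def publish(self, channel: str, message: str) -> int:
        for h in self._subs[channel]:
            h(message)
        return len(self._subs[channel])

class WsInstance:
    """一个网关实例:只持有部分会话的 WebSocket 连接"""
    def __init__(self, name: str, bus: FakePubSub):
        self.name = name
        self.bus = bus
        self.delivered = []  # 模拟推送给客户端的消息

    def attach(self, session_id: str):
        # 连接建立时订阅该会话的频道
        self.bus.subscribe(f"ws:{session_id}",
                           lambda msg: self.delivered.append((session_id, msg)))

bus = FakePubSub()
a, b = WsInstance("gw-a", bus), WsInstance("gw-b", bus)
a.attach("s1")            # 会话 s1 的连接粘在实例 A 上
bus.publish("ws:s1", "hello")  # 任意实例都可发布,由持有连接的实例推送
print(a.delivered)        # [('s1', 'hello')]
print(b.delivered)        # []
```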
37.2.3 数据库扩展
读写分离配置:
```yaml
# database_config.yaml
primary:
host: pg-primary.internal
port: 5432
pool_size: 20
replicas:
- host: pg-replica-1.internal
port: 5432
pool_size: 30
weight: 1 # 读权重
- host: pg-replica-2.internal
port: 5432
pool_size: 30
weight: 1
routing_rules:
# 写操作路由到主库
write: primary
# 读操作路由到从库(负载均衡)
read: replicas
# 强一致性读(写后立即读)
read_after_write: primary
# 事务内所有操作走主库
  transaction: primary
```

读写分离实现(Python):
```python
# db_router.py
import threading
from contextvars import ContextVar

from psycopg_pool import AsyncConnectionPool

# 使用 ContextVar 跟踪当前线程/协程的路由决策
_route_context: ContextVar[str] = ContextVar('db_route', default='read')

class DatabaseRouter:
    """数据库读写分离路由"""

    def __init__(self, primary_config, replica_configs):
        # 使用异步连接池,与下文的 async with 用法保持一致
        self.primary_pool = AsyncConnectionPool(
            **primary_config, min_size=5, max_size=20
        )
        self.replica_pools = [
            AsyncConnectionPool(
                **cfg, min_size=5, max_size=15
            ) for cfg in replica_configs
        ]
        self._replica_index = 0
        self._lock = threading.Lock()

    def get_connection(self):
        """根据路由规则获取连接(返回异步连接上下文管理器)"""
route = _route_context.get()
if route == 'write' or route == 'transaction':
return self.primary_pool.connection()
if route == 'read_after_write':
# 写后立即读,强制走主库(确保数据一致性)
return self.primary_pool.connection()
# 普通读,轮询从库
with self._lock:
idx = self._replica_index
self._replica_index = (
(idx + 1) % len(self.replica_pools)
)
return self.replica_pools[idx].connection()
async def execute_write(self, query, params=None):
"""执行写操作"""
token = _route_context.set('write')
try:
async with self.get_connection() as conn:
return await conn.execute(query, params)
finally:
_route_context.reset(token)
async def execute_read(self, query, params=None):
"""执行读操作"""
token = _route_context.set('read')
try:
async with self.get_connection() as conn:
return await conn.execute(query, params)
finally:
_route_context.reset(token)
# 使用示例
db = DatabaseRouter(primary_config, replica_configs)
# 写操作自动走主库
await db.execute_write(
"INSERT INTO messages (session_id, role, content) VALUES (%s, %s, %s)",
(session_id, 'user', message)
)
# 读操作自动走从库
rows = await db.execute_read(
"SELECT * FROM messages WHERE session_id = %s ORDER BY created_at",
(session_id,)
)
```

37.3 负载均衡
37.3.1 多层负载均衡
```mermaid
graph TB
Client[客户端] --> L4[L4 负载均衡<br/>TCP/UDP]
L4 --> L7[L7 负载均衡<br/>HTTP/gRPC]
L7 --> SvcA[Service A<br/>Pod 1-3]
L7 --> SvcB[Service B<br/>Pod 1-5]
L7 --> SvcC[Service C<br/>Pod 1-2]
style L4 fill:#e3f2fd
    style L7 fill:#fff3e0
```

| 层级 | 技术 | 职责 |
|---|---|---|
| L4 | Nginx / AWS NLB | TCP 连接分发、TLS 终止 |
| L7 | Nginx / AWS ALB | HTTP 路由、gRPC 代理、限流 |
| L7 Intra | Envoy / K8s Service | 集群内部服务间负载均衡 |
37.3.2 负载均衡算法选择
| 算法 | 适用场景 | 优势 | 劣势 |
|---|---|---|---|
| Round Robin | 同构服务 | 简单公平 | 不考虑实例负载差异 |
| Weighted Round Robin | 异构实例 | 考虑实例能力 | 权重需要手动调整 |
| Least Connections | 长连接服务 | 动态均衡 | 需要连接数监控 |
| Consistent Hashing | 有状态服务 | 会话亲和性强 | 节点增减时负载可能分布不均 |
| Adaptive | 混合负载 | 动态适应负载变化 | 实现复杂 |
推荐:L4 使用 Round Robin,L7 使用 Least Connections + Health Check。
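表中的 Consistent Hashing 可以用带虚拟节点的哈希环实现:同一会话键总是路由到同一实例,节点增减时只有相邻区间的键需要迁移。一个最小示例(节点名与虚拟节点数均为假设值):

```python
import bisect
import hashlib

class ConsistentHashRing:
    """一致性哈希环:会话粘性路由的最小实现示例"""
    def __init__(self, nodes, vnodes: int = 100):
        self._ring = []
        for node in nodes:
            for i in range(vnodes):  # 虚拟节点,让分布更平滑
                h = self._hash(f"{node}#{i}")
                self._ring.append((h, node))
        self._ring.sort()
        self._keys = [h for h, _ in self._ring]

    @staticmethod
    def _hash(key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def route(self, key: str) -> str:
        """顺时针找到第一个哈希值 >= hash(key) 的虚拟节点"""
        idx = bisect.bisect(self._keys, self._hash(key)) % len(self._ring)
        return self._ring[idx][1]

ring = ConsistentHashRing(["pod-1", "pod-2", "pod-3"])
# 同一 session 始终落到同一实例(亲和性)
assert ring.route("session-42") == ring.route("session-42")
```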
37.3.3 健康检查配置
```yaml
# health_check_config.yaml
health_checks:
# L7 层健康检查
agent_service:
http:
path: /health
interval: 5s
timeout: 3s
healthy_threshold: 2
unhealthy_threshold: 3
# 深度健康检查(包含依赖检查)
deep:
path: /health/deep
interval: 30s
timeout: 10s
# 深度检查失败不影响流量(仅告警)
affect_routing: false
# 服务端健康检查实现
health_check_endpoints:
/health:
# 浅层检查:仅检查进程状态
checks:
- type: process
description: "进程存活"
/health/ready:
# 就绪检查:检查依赖是否可用
checks:
- type: database
description: "数据库连接"
timeout: 2s
- type: redis
description: "Redis 连接"
timeout: 1s
/health/deep:
# 深度检查:检查所有依赖
checks:
- type: database
description: "数据库连接"
query: "SELECT 1"
timeout: 2s
- type: redis
description: "Redis 连接"
timeout: 1s
- type: vector_db
description: "向量数据库连接"
timeout: 3s
- type: llm_provider
description: "LLM Provider 可达性"
        timeout: 5s
```

37.4 自动扩缩容
37.4.1 HPA 配置
Kubernetes Horizontal Pod Autoscaler 是最常见的自动扩缩容方案:
```yaml
# hpa-config.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: agent-service-hpa
namespace: agent-platform
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: agent-service
minReplicas: 3
maxReplicas: 50
behavior:
scaleUp:
stabilizationWindowSeconds: 60 # 扩容稳定窗口
policies:
- type: Pods
value: 4 # 每次最多增加4个Pod
periodSeconds: 60
- type: Percent
value: 100 # 或者增加当前数量的100%
periodSeconds: 60
selectPolicy: Max
scaleDown:
stabilizationWindowSeconds: 300 # 缩容稳定窗口(5分钟)
policies:
- type: Pods
value: 2 # 每次最多减少2个Pod
periodSeconds: 120
metrics:
# CPU 使用率
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
# 自定义指标:并发请求数
- type: Pods
pods:
metric:
name: http_requests_in_flight
target:
type: AverageValue
averageValue: "100"
# 自定义指标:消息队列积压长度
- type: External
external:
metric:
name: rabbitmq_queue_messages_ready
selector:
matchLabels:
queue: agent.tasks
target:
type: AverageValue
        averageValue: "50"
```

37.4.2 基于预测的扩缩容
简单的阈值触发往往不够智能。基于历史数据的预测扩缩容可以提前应对流量高峰:
```python
# predictive_scaler.py
import numpy as np
from datetime import datetime, timedelta
from typing import List, Tuple
class PredictiveScaler:
"""基于时间序列预测的智能扩缩容"""
def __init__(self, history_days: int = 14):
self.history_days = history_days
def predict_load(self, historical_data: List[Tuple[datetime, float]]
) -> List[Tuple[datetime, float]]:
"""预测未来24小时的负载"""
# 使用加权移动平均 + 周期性检测
timestamps = np.array([d.timestamp() for d, _ in historical_data])
values = np.array([v for _, v in historical_data])
predictions = []
now = datetime.utcnow()
for hour in range(24):
future_time = now + timedelta(hours=hour)
# 1. 趋势分量(线性回归)
trend = self._compute_trend(timestamps, values, future_time)
# 2. 周期分量(同小时历史平均)
cycle = self._compute_cycle(historical_data, future_time)
            # 3. 预测值:趋势与周期分量的加权融合(权重为经验值)
            predicted = trend * 0.4 + cycle * 0.6
predictions.append((future_time, max(predicted, 0)))
return predictions
def recommend_replicas(
self,
predictions: List[Tuple[datetime, float]],
target_qps_per_pod: float = 500,
safety_margin: float = 1.3
) -> int:
"""根据预测推荐副本数"""
if not predictions:
return 3 # 默认最小值
peak_predicted = max(v for _, v in predictions)
recommended = int(np.ceil(
peak_predicted * safety_margin / target_qps_per_pod
))
return max(3, recommended) # 最少3个副本
def _compute_trend(self, timestamps, values, future_time):
"""线性趋势预测"""
if len(timestamps) < 2:
return np.mean(values)
coeffs = np.polyfit(timestamps, values, 1)
future_ts = future_time.timestamp()
return coeffs[0] * future_ts + coeffs[1]
def _compute_cycle(self, historical_data, future_time):
        """周期性分量(同小时历史数据的加权平均)"""
        future_hour = future_time.hour
# 筛选相同时段的历史数据
matching = []
for dt, val in historical_data:
if dt.hour == future_hour:
# 越近的数据权重越高
age_days = (datetime.utcnow() - dt).days
weight = 1.0 / (1 + age_days)
matching.append((val, weight))
if not matching:
return np.mean([v for _, v in historical_data])
values = np.array([v for v, w in matching])
weights = np.array([w for v, w in matching])
        return np.average(values, weights=weights)
```

37.4.3 扩缩容策略矩阵
| 指标 | 扩容触发 | 缩容触发 | 冷却期 |
|---|---|---|---|
| CPU > 70% | 立即 | CPU < 30% 持续 5 分钟 | 扩容 60s / 缩容 300s |
| 并发请求数 > 100/pod | 立即 | < 50/pod 持续 5 分钟 | 同上 |
| 队列积压 > 50 | 立即 | < 10 持续 10 分钟 | 扩容 30s / 缩容 600s |
| 预测未来1h流量 > 容量80% | 提前 30 分钟 | - | - |
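表中的阈值触发本质上是 HPA 的比例算法:desiredReplicas = ceil(currentReplicas × 当前指标 / 目标指标),再受 min/max 副本数约束。一个最小演算(数值为假设):

```python
import math

def desired_replicas(current: int, metric: float, target: float,
                     min_r: int = 3, max_r: int = 50) -> int:
    """HPA 比例算法:期望副本 = ceil(当前副本 * 当前指标 / 目标指标)"""
    desired = math.ceil(current * metric / target)
    return max(min_r, min(max_r, desired))

# 当前 5 个 Pod,平均并发 160,目标 100/Pod → 扩到 8
print(desired_replicas(5, 160, 100))  # 8
# 平均并发降到 40 → 缩到 3(受 minReplicas 保护)
print(desired_replicas(5, 40, 100))   # 3
```

实际的 HPA 还会叠加稳定窗口和 scaleUp/scaleDown policies(见上文配置),避免指标抖动导致副本数震荡。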
37.5 服务降级与熔断
37.5.1 降级策略设计
当系统面临异常流量或依赖服务故障时,降级是保障核心功能可用的最后防线。
```mermaid
graph TD
Request[用户请求] --> Check{依赖检查}
Check -->|所有正常| FullService[完整功能]
Check -->|LLM 不可用| FallbackLLM[降级:备用模型]
Check -->|RAG 不可用| FallbackRAG[降级:纯对话模式]
Check -->|工具服务不可用| FallbackTool[降级:仅对话]
    Check -->|所有依赖不可用| StaticResponse[静态响应 + 友好提示]
```

降级层级定义:
| 级别 | 触发条件 | 降级行为 | 用户体验 |
|---|---|---|---|
| L0 | 正常 | 完整功能 | ★★★★★ |
| L1 | LLM 响应超时 | 切换备用模型 | ★★★★☆ |
| L2 | 所有 LLM 不可用 | 返回缓存响应 | ★★★☆☆ |
| L3 | RAG 服务不可用 | 跳过检索,纯对话 | ★★★☆☆ |
| L4 | 所有依赖故障 | 静态维护页面 | ★★☆☆☆ |
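上表的层级可以落成一个自上而下(从最严重到最轻微)的判定函数,输入为各依赖的健康状态(由健康检查提供;字段名为本示例假设):

```python
from dataclasses import dataclass

@dataclass
class DependencyHealth:
    primary_llm_ok: bool = True
    backup_llm_ok: bool = True
    rag_ok: bool = True
    tools_ok: bool = True

def degradation_level(h: DependencyHealth) -> str:
    """按上表的层级从重到轻判定当前应采用的降级级别"""
    if not (h.primary_llm_ok or h.backup_llm_ok or h.rag_ok or h.tools_ok):
        return "L4"  # 全部依赖故障:静态维护页面
    if not h.primary_llm_ok and not h.backup_llm_ok:
        return "L2"  # 所有 LLM 不可用:返回缓存响应
    if not h.rag_ok:
        return "L3"  # RAG 不可用:跳过检索,纯对话
    if not h.primary_llm_ok:
        return "L1"  # 主模型超时:切换备用模型
    return "L0"

print(degradation_level(DependencyHealth()))                      # L0
print(degradation_level(DependencyHealth(primary_llm_ok=False)))  # L1
```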
37.5.2 熔断器实现
```python
# circuit_breaker.py
import time
import threading
from enum import Enum
from typing import Callable, Any
from dataclasses import dataclass
from collections import deque
class CircuitState(Enum):
CLOSED = "closed" # 正常
OPEN = "open" # 熔断(拒绝请求)
HALF_OPEN = "half_open" # 半开(试探恢复)
@dataclass
class CircuitConfig:
failure_threshold: int = 5 # 连续失败次数触发熔断
recovery_timeout: float = 30.0 # 熔断恢复超时(秒)
half_open_max_calls: int = 3 # 半开状态最大试探请求数
success_threshold: int = 2 # 半开状态成功次数恢复关闭
timeout: float = 10.0 # 单次调用超时
class CircuitBreaker:
"""熔断器"""
    def __init__(self, name: str, config: CircuitConfig | None = None):
        self.name = name
        self.config = config or CircuitConfig()  # 避免实例间共享同一默认配置
self._state = CircuitState.CLOSED
self._failure_count = 0
self._success_count = 0
self._last_failure_time = 0
self._half_open_calls = 0
self._lock = threading.Lock()
# 滑动窗口记录最近调用结果
self._window = deque(maxlen=100)
    def call(self, func: Callable, *args, **kwargs) -> Any:
        """通过熔断器执行同步函数"""
        if not self.allow():
            raise CircuitOpenError(
                f"Circuit '{self.name}' is open. "
                f"Retry after {self._retry_after():.1f}s"
            )
        start = time.time()
        try:
            result = func(*args, **kwargs)
            self._on_success(time.time() - start)
            return result
        except Exception as e:
            self._on_failure(e, time.time() - start)
            raise

    async def call_async(self, func: Callable, *args, **kwargs) -> Any:
        """异步版本:用 call 包装协程函数会在协程真正执行前就记录成功,
        必须等 await 完成后再记录结果"""
        if not self.allow():
            raise CircuitOpenError(
                f"Circuit '{self.name}' is open. "
                f"Retry after {self._retry_after():.1f}s"
            )
        start = time.time()
        try:
            result = await func(*args, **kwargs)
            self._on_success(time.time() - start)
            return result
        except Exception as e:
            self._on_failure(e, time.time() - start)
            raise
def allow(self) -> bool:
"""检查是否允许请求通过"""
with self._lock:
if self._state == CircuitState.CLOSED:
return True
if self._state == CircuitState.OPEN:
if time.time() - self._last_failure_time > self.config.recovery_timeout:
self._state = CircuitState.HALF_OPEN
self._half_open_calls = 0
return True
return False
if self._state == CircuitState.HALF_OPEN:
if self._half_open_calls < self.config.half_open_max_calls:
self._half_open_calls += 1
return True
return False
return False
def _on_success(self, latency: float):
with self._lock:
self._window.append(('success', latency, time.time()))
self._failure_count = 0
if self._state == CircuitState.HALF_OPEN:
self._success_count += 1
if self._success_count >= self.config.success_threshold:
self._state = CircuitState.CLOSED
self._success_count = 0
def _on_failure(self, error: Exception, latency: float):
with self._lock:
self._window.append(('failure', latency, time.time()))
self._failure_count += 1
self._last_failure_time = time.time()
if self._state == CircuitState.HALF_OPEN:
# 半开状态下失败,立即重新熔断
self._state = CircuitState.OPEN
self._success_count = 0
elif self._failure_count >= self.config.failure_threshold:
self._state = CircuitState.OPEN
def _retry_after(self) -> float:
elapsed = time.time() - self._last_failure_time
return max(0, self.config.recovery_timeout - elapsed)
def get_stats(self) -> dict:
"""获取熔断器统计信息"""
total = len(self._window)
failures = sum(1 for s, _, _ in self._window if s == 'failure')
latencies = [l for _, l, _ in self._window]
return {
"name": self.name,
"state": self._state.value,
"failure_count": self._failure_count,
"total_calls": total,
"failure_rate": failures / total if total else 0,
"avg_latency": sum(latencies) / len(latencies) if latencies else 0,
"retry_after": self._retry_after() if self._state == CircuitState.OPEN else 0
}
class CircuitOpenError(Exception):
"""熔断器打开异常"""
pass
# 使用示例
llm_breaker = CircuitBreaker("llm_provider", CircuitConfig(
failure_threshold=5,
recovery_timeout=30.0,
timeout=15.0
))
async def call_llm_with_protection(prompt: str, model: str):
    try:
        # 异步函数需走熔断器的异步入口,确保协程执行完成后才记录成败
        return await llm_breaker.call_async(
            llm_client.chat_completion,
            model=model, messages=[{"role": "user", "content": prompt}]
        )
    except CircuitOpenError:
        # 降级:使用缓存或备用方案
        return get_cached_response(prompt) or "系统繁忙,请稍后重试。"
```

37.5.3 超时与重试策略
```python
# retry_policy.py
import asyncio
import random
import time
from typing import Callable, List, Type, TypeVar
T = TypeVar('T')
class RetryPolicy:
"""智能重试策略"""
def __init__(
self,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0,
exponential_base: float = 2.0,
jitter: bool = True,
retryable_exceptions: List[Type[Exception]] = None,
retryable_status_codes: List[int] = None,
):
self.max_retries = max_retries
self.base_delay = base_delay
self.max_delay = max_delay
self.exponential_base = exponential_base
self.jitter = jitter
self.retryable_exceptions = retryable_exceptions or [
ConnectionError, TimeoutError
]
        # 状态码列表供 HTTP 客户端包装层判断是否重试(本类只保存配置)
        self.retryable_status_codes = retryable_status_codes or [429, 502, 503, 504]
def execute(self, func: Callable[..., T], *args, **kwargs) -> T:
"""执行带重试的函数"""
last_exception = None
for attempt in range(self.max_retries + 1):
try:
return func(*args, **kwargs)
except tuple(self.retryable_exceptions) as e:
last_exception = e
if attempt < self.max_retries:
delay = self._calculate_delay(attempt)
self._log_retry(func.__name__, attempt, delay, e)
time.sleep(delay)
else:
raise
except Exception as e:
raise # 非重试异常直接抛出
raise last_exception
async def execute_async(self, func: Callable, *args, **kwargs) -> T:
"""异步版本的执行"""
last_exception = None
for attempt in range(self.max_retries + 1):
try:
return await func(*args, **kwargs)
except tuple(self.retryable_exceptions) as e:
last_exception = e
if attempt < self.max_retries:
delay = self._calculate_delay(attempt)
self._log_retry(func.__name__, attempt, delay, e)
await asyncio.sleep(delay)
else:
raise
except Exception as e:
raise
raise last_exception
def _calculate_delay(self, attempt: int) -> float:
"""指数退避 + 抖动"""
delay = self.base_delay * (self.exponential_base ** attempt)
delay = min(delay, self.max_delay)
if self.jitter:
delay = delay * (0.5 + random.random() * 0.5)
return delay
def _log_retry(self, func_name, attempt, delay, error):
import logging
logging.warning(
f"Retry {attempt + 1}/{self.max_retries} for {func_name} "
f"after {delay:.1f}s: {error}"
)
# 针对 LLM API 的专门重试策略
llm_retry = RetryPolicy(
max_retries=3,
base_delay=1.0,
max_delay=30.0,
retryable_exceptions=[ConnectionError, TimeoutError],
retryable_status_codes=[429, 500, 502, 503],
)
```

37.6 多区域部署
37.6.1 部署架构
```mermaid
graph TB
DNS[Global DNS<br/>Anycast/GSLB]
subgraph RegionA["华东区域 (上海)"]
GW_A[Gateway]
Core_A[Agent Core]
DB_A[(DB Primary)]
Redis_A[(Redis)]
end
subgraph RegionB["华北区域 (北京)"]
GW_B[Gateway]
Core_B[Agent Core]
DB_B[(DB Replica)]
Redis_B[(Redis)]
end
subgraph RegionC["海外区域 (新加坡)"]
GW_C[Gateway]
Core_C[Agent Core]
DB_C[(DB Replica)]
Redis_C[(Redis)]
end
DNS -->|就近路由| GW_A & GW_B & GW_C
Core_A --> Core_B & Core_C
    DB_A -->|异步复制| DB_B & DB_C
```

37.6.2 跨区域数据同步
```python
# cross_region_sync.py
import asyncio
from enum import Enum
class SyncMode(Enum):
SYNC = "sync" # 同步复制(强一致性)
ASYNC = "async" # 异步复制(最终一致性)
SEMI_SYNC = "semi" # 半同步
class CrossRegionReplicator:
"""跨区域数据复制器"""
def __init__(self, local_region: str, remote_regions: list):
self.local_region = local_region
self.remotes = {}
for region in remote_regions:
self.remotes[region] = self._connect(region)
async def replicate_write(
self,
table: str,
operation: str, # INSERT, UPDATE, DELETE
data: dict,
mode: SyncMode = SyncMode.ASYNC
) -> bool:
"""跨区域写入复制"""
        # 1. 先完成本地写入
        await self._local_write(table, operation, data)
if mode == SyncMode.SYNC:
# 同步模式:等待所有区域确认
tasks = [
self._remote_write(region, table, operation, data)
for region in self.remotes
]
results = await asyncio.gather(*tasks, return_exceptions=True)
return all(not isinstance(r, Exception) for r in results)
elif mode == SyncMode.SEMI_SYNC:
# 半同步:至少一个远程区域确认
tasks = [
self._remote_write(region, table, operation, data)
for region in self.remotes
]
results = await asyncio.gather(*tasks, return_exceptions=True)
success_count = sum(
1 for r in results if not isinstance(r, Exception)
)
return success_count >= 1
else:
# 异步模式:fire-and-forget(通过消息队列)
await self._queue_replication(table, operation, data)
return True
def route_read(self, consistency: str = "eventual") -> str:
"""根据一致性要求路由读请求"""
if consistency == "strong":
return self.local_region # 强一致性读走主区域
# 最终一致性读:就近区域
        return self._get_nearest_region()
```

37.7 灾难恢复
37.7.1 灾备等级
| 等级 | RPO(数据丢失) | RTO(恢复时间) | 成本 | 方案 |
|---|---|---|---|---|
| 冷备 | 24小时 | 4-24小时 | 低 | 定期备份 + 异地存储 |
| 温备 | 1小时 | 1-4小时 | 中 | 异步复制 + 热待机 |
| 热备 | 分钟级 | 分钟级 | 高 | 同步复制 + 自动故障转移 |
| 多活 | 0 | 0(秒级切换) | 极高 | 多区域活跃-活跃 |
推荐:Agent 平台采用"温备"方案,核心数据 RPO < 1小时,RTO < 1小时。
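RPO 是否达标可以自动校验:比较最近一次成功备份的时间与 RPO 预算,超出窗口即告警。一个校验函数的示意(时间数值为假设):

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

def check_rpo(last_backup_at: datetime, rpo: timedelta,
              now: Optional[datetime] = None) -> bool:
    """若最近一次成功备份早于 now - RPO,则 RPO 已被击穿,应当告警"""
    now = now or datetime.now(timezone.utc)
    return now - last_backup_at <= rpo

now = datetime(2026, 3, 15, 12, 0, tzinfo=timezone.utc)
last = datetime(2026, 3, 15, 11, 20, tzinfo=timezone.utc)
print(check_rpo(last, timedelta(hours=1), now))     # True:40 分钟前,达标
print(check_rpo(last, timedelta(minutes=30), now))  # False:超出 30 分钟预算
```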
37.7.2 备份策略
```bash
#!/bin/bash
# backup_agent_platform.sh
# Agent 平台自动化备份脚本
set -euo pipefail
BACKUP_DIR="/data/backups/$(date +%Y-%m-%d_%H%M%S)"
RETENTION_DAYS=30
log() { echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*"; }
# 1. PostgreSQL 全量备份
log "Starting PostgreSQL backup..."
mkdir -p "${BACKUP_DIR}/postgres"
pg_dump -Fc -f "${BACKUP_DIR}/postgres/agent_platform.dump" \
--no-owner --no-privileges \
postgresql://agent_app:PASSWORD@pg-primary:5432/agent_platform
# 2. Redis RDB 备份
log "Starting Redis backup..."
mkdir -p "${BACKUP_DIR}/redis"
redis-cli -h redis-primary --rdb "${BACKUP_DIR}/redis/dump.rdb"
# 3. 向量数据库备份
log "Starting vector DB backup..."
mkdir -p "${BACKUP_DIR}/vectors"
# Milvus 不提供 pg_dump 式的导出 API,这里假定部署了官方的
# milvus-backup 工具(zilliztech/milvus-backup),具体参数以实际版本为准
milvus-backup create -n "agent_$(date +%Y%m%d)" \
  || log "WARNING: Milvus backup failed, continuing..."
# 4. 配置文件备份
log "Starting config backup..."
mkdir -p "${BACKUP_DIR}/configs"
cp -r /etc/agent-platform/* "${BACKUP_DIR}/configs/"
# 5. 上传到对象存储(跨区域)
log "Uploading to remote storage..."
aws s3 sync "${BACKUP_DIR}" \
"s3://agent-platform-backups/$(date +%Y-%m)/" \
--storage-class STANDARD_IA
# 6. 清理旧备份
log "Cleaning old backups..."
find /data/backups -mindepth 1 -maxdepth 1 -type d -mtime +${RETENTION_DAYS} -exec rm -rf {} +
# 7. 验证备份完整性
log "Verifying backup integrity..."
if [ -f "${BACKUP_DIR}/postgres/agent_platform.dump" ]; then
pg_restore --list "${BACKUP_DIR}/postgres/agent_platform.dump" > /dev/null
log "PostgreSQL backup verified ✓"
else
log "ERROR: PostgreSQL backup failed!"
exit 1
fi
log "Backup completed successfully. Size: $(du -sh "${BACKUP_DIR}" | cut -f1)"
```

37.7.3 故障转移演练
```yaml
# disaster_recovery_playbook.yaml
# 灾难恢复演练手册
name: Agent Platform DR Drill
version: 1.0
last_drill: "2026-03-15"
next_drill: "2026-04-15"
scenarios:
- name: "区域级故障"
description: "华东区域完全不可用"
steps:
1:
action: "修改 DNS 将流量切换到华北区域"
owner: SRE
timeout: 5m
2:
action: "验证华北区域服务正常"
owner: QA
timeout: 10m
3:
action: "确认数据延迟在可接受范围"
owner: DBA
timeout: 15m
4:
action: "通知客户并更新状态页面"
owner: Support
timeout: 30m
recovery_time_target: 30min
- name: "数据库主节点故障"
description: "PostgreSQL 主节点宕机"
steps:
1:
action: "Promote 只读副本为主节点"
owner: DBA
timeout: 5m
2:
action: "更新应用数据库连接配置"
owner: SRE
timeout: 5m
3:
action: "验证读写功能正常"
owner: QA
timeout: 10m
recovery_time_target: 20min
- name: "LLM Provider 全面故障"
description: "所有 LLM 服务不可用"
steps:
1:
action: "熔断器自动打开"
owner: System
timeout: 0m
2:
action: "启用备用 LLM Provider"
owner: AI Team
timeout: 10m
3:
action: "降级为缓存响应模式"
owner: System
timeout: 1m
    recovery_time_target: 10min
```

37.8 混沌工程
37.8.1 Chaos Monkey 实践
```python
# chaos_experiment.py
"""
Agent 平台混沌工程实验
谨慎在生产环境中使用!建议先在预发布环境充分验证。
"""
import random
import time
import logging
from typing import List, Callable
logger = logging.getLogger("chaos")
class ChaosExperiment:
"""混沌工程实验框架"""
def __init__(self, target_services: List[str]):
self.targets = target_services
self.safety_hooks = []
def add_safety_hook(self, hook: Callable):
"""添加安全钩子(实验前的检查条件)"""
self.safety_hooks.append(hook)
def check_safety(self) -> bool:
"""执行所有安全检查"""
for hook in self.safety_hooks:
if not hook():
logger.warning(f"Safety check failed: {hook.__name__}")
return False
return True
    def inject_latency(self, service: str, duration_ms: int,
                       probability: float = 0.1):
        """注入延迟(调用方应先通过 check_safety() 确认安全条件)"""
        if random.random() > probability:
            return
logger.info(
f"[CHAOS] Injecting {duration_ms}ms latency to {service}"
)
time.sleep(duration_ms / 1000)
    def simulate_failure(self, service: str, probability: float = 0.05):
        """模拟服务故障"""
        if random.random() > probability:
            return
        logger.warning(f"[CHAOS] Simulating failure of {service}")
        raise SimulatedFailure(
            f"Chaos experiment: {service} is unavailable"
        )

class SimulatedFailure(Exception):
    """混沌实验注入的模拟故障"""
    pass
# 安全钩子示例
def check_not_peak_hours():
"""确保不在高峰期执行"""
hour = time.localtime().tm_hour
return hour < 9 or hour > 22 # 非高峰时段
def check_error_rate_below_threshold():
    """确保当前错误率低于阈值"""
    # get_current_error_rate() 假定由监控系统提供(此处为占位)
    current_error_rate = get_current_error_rate()
    return current_error_rate < 0.01  # 错误率低于1%
```

37.9 本章小结
本章系统地介绍了 Agent 平台的可扩展性与高可用策略:
- 无状态设计:水平扩展的基础,所有状态外置
- 负载均衡:多层 LB 架构,智能健康检查
- 自动扩缩容:基于 HPA + 预测的智能扩缩容
- 服务降级与熔断:多层降级策略 + 熔断器模式
- 多区域部署:跨区域流量调度和数据同步
- 灾难恢复:备份策略 + 故障转移 + 混沌工程
高可用不是一个目标,而是一个持续的过程。它需要设计、实现、测试、演练的闭环。下一章我们将讨论监控与告警体系——没有监控的可用性只是盲目的自信。