Chapter 37: Scalability and High Availability

"Availability is designed into a system, not tested into it. Every outage is a free architecture lesson." — the Google SRE team

37.1 Overview: Why High Availability Matters Especially for Agent Systems

Agent systems differ from traditional web applications in one fundamental way: they depend on external AI model services (OpenAI, Anthropic, and so on). Your system therefore has to cope not only with failures in its own infrastructure, but also with the unpredictability of upstream providers. A typical Agent request may touch 3-5 internal services plus 1-2 external LLM calls, and a failure anywhere along that chain degrades the user experience.

This chapter systematically covers how to build a scalable, highly available Agent platform.

37.1.1 Defining Availability Targets

| Tier | Availability | Annual downtime | Typical use |
|------|--------------|-----------------|-------------|
| Basic | 99% | 3.65 days | internal tools, dev environments |
| Standard | 99.9% | 8.76 hours | general business systems |
| High availability | 99.95% | 4.38 hours | core business systems |
| Very high availability | 99.99% | 52.56 minutes | finance-grade Agent platforms |

Recommended target for Agent platforms: 99.95% (monthly downtime < 22 minutes)
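The downtime figures above follow directly from the availability percentage; a small helper (illustrative, not from the original text) makes the arithmetic explicit:

```python
def downtime_minutes(availability_pct: float, period_hours: float) -> float:
    """Allowed downtime (in minutes) for a given availability over a period."""
    return period_hours * 60 * (1 - availability_pct / 100)

# 99.95% over a 30-day month allows about 21.6 minutes of downtime,
# consistent with the "< 22 minutes" target above.
monthly_budget = downtime_minutes(99.95, 30 * 24)
```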

37.2 Horizontal Scaling Strategies

37.2.1 The Stateless Design Principle

Horizontal scaling requires services to be stateless: all session state and user context must live in external stores (Redis, a database), never in process memory.

python
# ❌ Wrong: stateful design
class ChatService:
    def __init__(self):
        self.sessions = {}  # state kept in process memory
    
    def chat(self, session_id, message):
        if session_id not in self.sessions:
            self.sessions[session_id] = []
        self.sessions[session_id].append(message)
        # Problem: state is not shared across instances

# ✅ Right: stateless design
import json

class ChatService:
    def __init__(self, redis_client, db):
        self.redis = redis_client
        self.db = db
    
    async def chat(self, session_id, message):
        # State lives in Redis, reachable from every instance
        context = await self.redis.get(f"ctx:{session_id}")
        context = json.loads(context) if context else []
        context.append(message)
        await self.redis.setex(
            f"ctx:{session_id}", 3600, json.dumps(context)
        )

37.2.2 Handling Stateful Services

Some services are inherently stateful and need dedicated scaling strategies:

| Service | State type | Scaling strategy |
|---------|------------|------------------|
| Redis | cached data | Redis Cluster (sharding + replicas) |
| PostgreSQL | durable data | read/write splitting + sharding |
| Vector database | embeddings | distributed Milvus cluster |
| WebSocket | connection state | sticky sessions + Pub/Sub |
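For the sticky-session row, consistent hashing is what keeps a given session pinned to the same WebSocket instance while limiting reshuffling when instances come and go. A minimal hash-ring sketch (instance names and the vnode count are illustrative):

```python
import bisect
import hashlib

class ConsistentHashRing:
    """Map keys (e.g. session IDs) onto nodes via a virtual-node hash ring."""

    def __init__(self, nodes, vnodes=100):
        self._ring = sorted(
            (self._hash(f"{node}#{i}"), node)
            for node in nodes
            for i in range(vnodes)
        )
        self._hashes = [h for h, _ in self._ring]

    @staticmethod
    def _hash(key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def get_node(self, key: str) -> str:
        # First vnode clockwise from the key's hash position
        idx = bisect.bisect(self._hashes, self._hash(key)) % len(self._ring)
        return self._ring[idx][1]

ring = ConsistentHashRing(["ws-1", "ws-2", "ws-3"])
node = ring.get_node("session-42")  # stable for the session's lifetime
```

Removing a node only remaps the keys that lived on it; every other session keeps its instance, which is the property plain modulo hashing lacks.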

37.2.3 Database Scaling

Read/write splitting configuration:

yaml
# database_config.yaml
primary:
  host: pg-primary.internal
  port: 5432
  pool_size: 20
  
replicas:
  - host: pg-replica-1.internal
    port: 5432
    pool_size: 30
    weight: 1  # read weight
  - host: pg-replica-2.internal
    port: 5432
    pool_size: 30
    weight: 1

routing_rules:
  # writes go to the primary
  write: primary
  # reads go to replicas (load balanced)
  read: replicas
  # strongly consistent reads (read-after-write)
  read_after_write: primary
  # everything inside a transaction goes to the primary
  transaction: primary

Read/write splitting implementation (Python):

python
# db_router.py
import threading
from contextvars import ContextVar

from psycopg_pool import AsyncConnectionPool

# Track the routing decision for the current thread/coroutine
_route_context: ContextVar[str] = ContextVar('db_route', default='read')

class DatabaseRouter:
    """Read/write splitting router"""
    
    def __init__(self, primary_config, replica_configs):
        # connection parameters are forwarded to psycopg.connect via `kwargs`
        self.primary_pool = AsyncConnectionPool(
            kwargs=primary_config, min_size=5, max_size=20
        )
        self.replica_pools = [
            AsyncConnectionPool(
                kwargs=cfg, min_size=5, max_size=15
            ) for cfg in replica_configs
        ]
        self._replica_index = 0
        self._lock = threading.Lock()
    
    def get_connection(self):
        """Pick a pool per the routing rules; returns a connection context manager"""
        route = _route_context.get()
        
        if route in ('write', 'transaction'):
            return self.primary_pool.connection()
        
        if route == 'read_after_write':
            # Read-your-writes: force the primary for consistency
            return self.primary_pool.connection()
        
        # Plain reads: round-robin across replicas
        with self._lock:
            idx = self._replica_index
            self._replica_index = (
                (idx + 1) % len(self.replica_pools)
            )
        return self.replica_pools[idx].connection()
    
    async def execute_write(self, query, params=None):
        """Execute a write (routed to the primary)"""
        token = _route_context.set('write')
        try:
            async with self.get_connection() as conn:
                return await conn.execute(query, params)
        finally:
            _route_context.reset(token)
    
    async def execute_read(self, query, params=None):
        """Execute a read (routed to a replica)"""
        token = _route_context.set('read')
        try:
            async with self.get_connection() as conn:
                return await conn.execute(query, params)
        finally:
            _route_context.reset(token)

# Usage example
db = DatabaseRouter(primary_config, replica_configs)

# Writes are routed to the primary automatically
await db.execute_write(
    "INSERT INTO messages (session_id, role, content) VALUES (%s, %s, %s)",
    (session_id, 'user', message)
)

# Reads are routed to replicas automatically
rows = await db.execute_read(
    "SELECT * FROM messages WHERE session_id = %s ORDER BY created_at",
    (session_id,)
)

37.3 Load Balancing

37.3.1 Multi-Tier Load Balancing

mermaid
graph TB
    Client[Client] --> L4[L4 load balancer<br/>TCP/UDP]
    L4 --> L7[L7 load balancer<br/>HTTP/gRPC]
    L7 --> SvcA[Service A<br/>Pod 1-3]
    L7 --> SvcB[Service B<br/>Pod 1-5]
    L7 --> SvcC[Service C<br/>Pod 1-2]
    
    style L4 fill:#e3f2fd
    style L7 fill:#fff3e0

| Tier | Technology | Responsibility |
|------|------------|----------------|
| L4 | Nginx / AWS NLB | TCP connection distribution, TLS termination |
| L7 | Nginx / AWS ALB | HTTP routing, gRPC proxying, rate limiting |
| L7 (intra-cluster) | Envoy / K8s Service | service-to-service load balancing inside the cluster |

37.3.2 Choosing a Load Balancing Algorithm

| Algorithm | Best for | Strengths | Weaknesses |
|-----------|----------|-----------|------------|
| Round Robin | homogeneous services | simple and fair | ignores per-instance load |
| Weighted Round Robin | heterogeneous instances | reflects instance capacity | weights need manual tuning |
| Least Connections | long-lived connections | balances dynamically | requires connection-count metrics |
| Consistent Hashing | stateful services | affinity | distribution can be uneven |
| Adaptive | mixed workloads | near-optimal | complex to implement |

Recommendation: Round Robin at L4; Least Connections plus health checks at L7.
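Per request, the Least Connections choice reduces to picking the backend with the fewest in-flight connections; a toy selection function (instance names are made up):

```python
import random

def pick_least_connections(active_conns: dict) -> str:
    """Pick the instance with the fewest active connections; break ties randomly."""
    fewest = min(active_conns.values())
    candidates = [name for name, n in active_conns.items() if n == fewest]
    return random.choice(candidates)

backend = pick_least_connections({"pod-1": 37, "pod-2": 12, "pod-3": 55})
# always "pod-2" here, since it uniquely has the fewest connections
```

Randomized tie-breaking avoids herding all traffic onto the first instance in iteration order when counts are equal.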

37.3.3 Health Check Configuration

yaml
# health_check_config.yaml
health_checks:
  # L7 health checks
  agent_service:
    http:
      path: /health
      interval: 5s
      timeout: 3s
      healthy_threshold: 2
      unhealthy_threshold: 3
    
    # deep health check (includes dependency probes)
    deep:
      path: /health/deep
      interval: 30s
      timeout: 10s
      # a failing deep check only alerts; it does not drain traffic
      affect_routing: false

# Server-side health check endpoints
health_check_endpoints:
  /health:
    # liveness: process status only
    checks:
      - type: process
        description: "process is alive"
  
  /health/ready:
    # readiness: are the dependencies reachable?
    checks:
      - type: database
        description: "database connection"
        timeout: 2s
      - type: redis
        description: "Redis connection"
        timeout: 1s
  
  /health/deep:
    # deep check: probe every dependency
    checks:
      - type: database
        description: "database connection"
        query: "SELECT 1"
        timeout: 2s
      - type: redis
        description: "Redis connection"
        timeout: 1s
      - type: vector_db
        description: "vector database connection"
        timeout: 3s
      - type: llm_provider
        description: "LLM provider reachability"
        timeout: 5s
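Server-side, the readiness and deep endpoints boil down to running each dependency check under its own timeout and aggregating the results. A minimal asyncio sketch (the two probe functions are placeholders for real database/Redis checks):

```python
import asyncio

async def check_database() -> bool:
    await asyncio.sleep(0.01)  # placeholder for a real "SELECT 1"
    return True

async def check_redis() -> bool:
    await asyncio.sleep(0.01)  # placeholder for a real PING
    return True

async def run_health_checks(checks, default_timeout: float = 2.0) -> dict:
    """Run named checks concurrently; a timeout or exception marks that check failed."""
    async def guarded(name, probe, timeout):
        try:
            return name, bool(await asyncio.wait_for(probe(), timeout))
        except Exception:
            return name, False

    results = dict(await asyncio.gather(
        *(guarded(name, probe, timeout or default_timeout)
          for name, probe, timeout in checks)
    ))
    status = "ok" if all(results.values()) else "degraded"
    return {"status": status, "checks": results}

report = asyncio.run(run_health_checks([
    ("database", check_database, 2.0),
    ("redis", check_redis, 1.0),
]))
```

Catching exceptions inside each probe keeps one flaky dependency from turning the whole endpoint into a 500.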

37.4 Autoscaling

37.4.1 HPA Configuration

The Kubernetes Horizontal Pod Autoscaler is the most common autoscaling mechanism:

yaml
# hpa-config.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: agent-service-hpa
  namespace: agent-platform
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: agent-service
  minReplicas: 3
  maxReplicas: 50
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60   # scale-up stabilization window
      policies:
      - type: Pods
        value: 4                       # add at most 4 pods per period
        periodSeconds: 60
      - type: Percent
        value: 100                     # or grow by 100% of the current count
        periodSeconds: 60
      selectPolicy: Max
    scaleDown:
      stabilizationWindowSeconds: 300  # scale-down stabilization window (5 min)
      policies:
      - type: Pods
        value: 2                       # remove at most 2 pods per period
        periodSeconds: 120
  metrics:
  # CPU utilization
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  
  # Custom metric: in-flight requests
  - type: Pods
    pods:
      metric:
        name: http_requests_in_flight
      target:
        type: AverageValue
        averageValue: "100"
  
  # External metric: message queue backlog
  - type: External
    external:
      metric:
        name: rabbitmq_queue_messages_ready
        selector:
          matchLabels:
            queue: agent.tasks
      target:
        type: AverageValue
        averageValue: "50"

37.4.2 Prediction-Based Scaling

Plain threshold triggers are often not smart enough. Predictive scaling based on historical data can add capacity ahead of traffic peaks:

python
# predictive_scaler.py
import numpy as np
from datetime import datetime, timedelta
from typing import List, Tuple

class PredictiveScaler:
    """Autoscaling driven by time-series prediction"""
    
    def __init__(self, history_days: int = 14):
        self.history_days = history_days
    
    def predict_load(self, historical_data: List[Tuple[datetime, float]]
                     ) -> List[Tuple[datetime, float]]:
        """Predict load for the next 24 hours"""
        # Weighted moving average plus periodicity detection
        timestamps = np.array([d.timestamp() for d, _ in historical_data])
        values = np.array([v for _, v in historical_data])
        
        predictions = []
        now = datetime.utcnow()
        
        for hour in range(24):
            future_time = now + timedelta(hours=hour)
            
            # 1. Trend component (linear regression)
            trend = self._compute_trend(timestamps, values, future_time)
            
            # 2. Cyclic component (historical average for the same hour)
            cycle = self._compute_cycle(historical_data, future_time)
            
            # 3. Prediction = weighted trend + cycle
            predicted = trend * 0.4 + cycle * 0.6
            predictions.append((future_time, max(predicted, 0)))
        
        return predictions
    
    def recommend_replicas(
        self,
        predictions: List[Tuple[datetime, float]],
        target_qps_per_pod: float = 500,
        safety_margin: float = 1.3
    ) -> int:
        """Recommend a replica count from the predictions"""
        if not predictions:
            return 3  # default minimum
        
        peak_predicted = max(v for _, v in predictions)
        recommended = int(np.ceil(
            peak_predicted * safety_margin / target_qps_per_pod
        ))
        return max(3, recommended)  # never fewer than 3 replicas
    
    def _compute_trend(self, timestamps, values, future_time):
        """Linear trend extrapolation"""
        if len(timestamps) < 2:
            return np.mean(values)
        
        coeffs = np.polyfit(timestamps, values, 1)
        return coeffs[0] * future_time.timestamp() + coeffs[1]
    
    def _compute_cycle(self, historical_data, future_time):
        """Cyclic component: weighted average of samples from the same hour of day"""
        future_hour = future_time.hour
        
        # Collect samples from the same hour, weighting recent days higher
        matching = []
        for dt, val in historical_data:
            if dt.hour == future_hour:
                age_days = (datetime.utcnow() - dt).days
                matching.append((val, 1.0 / (1 + age_days)))
        
        if not matching:
            return np.mean([v for _, v in historical_data])
        
        vals = np.array([v for v, _ in matching])
        weights = np.array([w for _, w in matching])
        return np.average(vals, weights=weights)

37.4.3 Scaling Policy Matrix

| Metric | Scale-up trigger | Scale-down trigger | Cooldown |
|--------|------------------|--------------------|----------|
| CPU > 70% | immediate | CPU < 30% for 5 min | 60s up / 300s down |
| Concurrent requests > 100/pod | immediate | < 50/pod for 5 min | same as above |
| Queue backlog > 50 | immediate | < 10 for 10 min | 30s up / 600s down |
| Predicted next-hour load > 80% of capacity | 30 min in advance | - | - |
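The cooldown column can be enforced with a small decision wrapper. This sketch keeps only the CPU rule with the two cooldown windows from the table, and leaves out the "sustained for N minutes" condition for brevity:

```python
import time

class ScalingDecider:
    """Threshold-based scaling decision with separate up/down cooldowns."""

    def __init__(self, up_cooldown: float = 60.0, down_cooldown: float = 300.0):
        self.up_cooldown = up_cooldown
        self.down_cooldown = down_cooldown
        self._last_up = float("-inf")
        self._last_down = float("-inf")

    def decide(self, cpu_pct: float, now=None) -> str:
        now = time.monotonic() if now is None else now
        if cpu_pct > 70 and now - self._last_up >= self.up_cooldown:
            self._last_up = now
            return "scale_up"
        if cpu_pct < 30 and now - self._last_down >= self.down_cooldown:
            self._last_down = now
            return "scale_down"
        return "hold"

decider = ScalingDecider()
```

The asymmetric cooldowns encode the usual bias: scale up eagerly, scale down cautiously.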

37.5 Degradation and Circuit Breaking

37.5.1 Designing a Degradation Strategy

When the system faces abnormal traffic or failing dependencies, degradation is the last line of defense for keeping core functionality available.

mermaid
graph TD
    Request[User request] --> Check{Dependency check}
    Check -->|all healthy| FullService[Full functionality]
    Check -->|LLM unavailable| FallbackLLM[Degrade: backup model]
    Check -->|RAG unavailable| FallbackRAG[Degrade: chat-only mode]
    Check -->|tool service unavailable| FallbackTool[Degrade: conversation only]
    Check -->|all dependencies down| StaticResponse[Static response + friendly notice]

Degradation levels:

| Level | Trigger | Behavior | User experience |
|-------|---------|----------|-----------------|
| L0 | normal | full functionality | ★★★★★ |
| L1 | LLM responses timing out | switch to the backup model | ★★★★☆ |
| L2 | all LLMs unavailable | serve cached responses | ★★★☆☆ |
| L3 | RAG service unavailable | skip retrieval, plain chat | ★★★☆☆ |
| L4 | all dependencies down | static maintenance page | ★★☆☆☆ |
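The ladder in the table above maps naturally onto a small selection function. This sketch (dependency health is simplified to booleans) picks the lowest-numbered level the current state allows:

```python
def select_degradation_level(primary_llm_ok: bool, backup_llm_ok: bool,
                             rag_ok: bool, tools_ok: bool) -> str:
    """Return the degradation level (L0 best, L4 worst) for the given health flags."""
    if not any((primary_llm_ok, backup_llm_ok, rag_ok, tools_ok)):
        return "L4"  # everything is down: static maintenance page
    if not primary_llm_ok and not backup_llm_ok:
        return "L2"  # no LLM at all: serve cached responses
    if not primary_llm_ok:
        return "L1"  # primary timing out: switch to the backup model
    if not rag_ok:
        return "L3"  # retrieval down: plain chat without RAG
    return "L0"      # full functionality

level = select_degradation_level(True, True, True, True)  # "L0"
```

Evaluating the worst cases first keeps the branches short and makes the precedence between levels explicit.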

37.5.2 Implementing a Circuit Breaker

python
# circuit_breaker.py
import time
import threading
from enum import Enum
from typing import Callable, Any, Optional
from dataclasses import dataclass
from collections import deque

class CircuitState(Enum):
    CLOSED = "closed"       # normal operation
    OPEN = "open"           # tripped (requests rejected)
    HALF_OPEN = "half_open" # probing for recovery

@dataclass
class CircuitConfig:
    failure_threshold: int = 5       # consecutive failures that trip the breaker
    recovery_timeout: float = 30.0   # seconds before probing for recovery
    half_open_max_calls: int = 3     # max probe requests while half-open
    success_threshold: int = 2       # successes needed to close again
    timeout: float = 10.0            # per-call timeout (enforcement left to the caller)

class CircuitBreaker:
    """Circuit breaker"""
    
    def __init__(self, name: str, config: Optional[CircuitConfig] = None):
        self.name = name
        self.config = config or CircuitConfig()
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._success_count = 0
        self._last_failure_time = 0.0
        self._half_open_calls = 0
        self._lock = threading.Lock()
        # sliding window of recent call results
        self._window = deque(maxlen=100)
    
    def call(self, func: Callable, *args, **kwargs) -> Any:
        """Execute a function through the breaker"""
        if not self.allow():
            raise CircuitOpenError(
                f"Circuit '{self.name}' is open. "
                f"Retry after {self._retry_after():.1f}s"
            )
        
        start = time.time()
        try:
            result = func(*args, **kwargs)
            self._on_success(time.time() - start)
            return result
        except Exception as e:
            self._on_failure(e, time.time() - start)
            raise
    
    async def call_async(self, func: Callable, *args, **kwargs) -> Any:
        """Async variant: awaits the coroutine so outcomes are recorded correctly"""
        if not self.allow():
            raise CircuitOpenError(
                f"Circuit '{self.name}' is open. "
                f"Retry after {self._retry_after():.1f}s"
            )
        
        start = time.time()
        try:
            result = await func(*args, **kwargs)
            self._on_success(time.time() - start)
            return result
        except Exception as e:
            self._on_failure(e, time.time() - start)
            raise
    
    def allow(self) -> bool:
        """Check whether a request may pass"""
        with self._lock:
            if self._state == CircuitState.CLOSED:
                return True
            
            if self._state == CircuitState.OPEN:
                if time.time() - self._last_failure_time > self.config.recovery_timeout:
                    self._state = CircuitState.HALF_OPEN
                    self._half_open_calls = 0
                    return True
                return False
            
            if self._state == CircuitState.HALF_OPEN:
                if self._half_open_calls < self.config.half_open_max_calls:
                    self._half_open_calls += 1
                    return True
                return False
            
            return False
    
    def _on_success(self, latency: float):
        with self._lock:
            self._window.append(('success', latency, time.time()))
            self._failure_count = 0
            
            if self._state == CircuitState.HALF_OPEN:
                self._success_count += 1
                if self._success_count >= self.config.success_threshold:
                    self._state = CircuitState.CLOSED
                    self._success_count = 0
    
    def _on_failure(self, error: Exception, latency: float):
        with self._lock:
            self._window.append(('failure', latency, time.time()))
            self._failure_count += 1
            self._last_failure_time = time.time()
            
            if self._state == CircuitState.HALF_OPEN:
                # a failure while half-open trips the breaker again immediately
                self._state = CircuitState.OPEN
                self._success_count = 0
            elif self._failure_count >= self.config.failure_threshold:
                self._state = CircuitState.OPEN
    
    def _retry_after(self) -> float:
        elapsed = time.time() - self._last_failure_time
        return max(0, self.config.recovery_timeout - elapsed)
    
    def get_stats(self) -> dict:
        """Return breaker statistics"""
        total = len(self._window)
        failures = sum(1 for s, _, _ in self._window if s == 'failure')
        latencies = [l for _, l, _ in self._window]
        
        return {
            "name": self.name,
            "state": self._state.value,
            "failure_count": self._failure_count,
            "total_calls": total,
            "failure_rate": failures / total if total else 0,
            "avg_latency": sum(latencies) / len(latencies) if latencies else 0,
            "retry_after": self._retry_after() if self._state == CircuitState.OPEN else 0
        }

class CircuitOpenError(Exception):
    """Raised when the breaker is open"""
    pass

# Usage example
llm_breaker = CircuitBreaker("llm_provider", CircuitConfig(
    failure_threshold=5,
    recovery_timeout=30.0,
    timeout=15.0
))

async def call_llm_with_protection(prompt: str, model: str):
    try:
        # llm_client.chat_completion is assumed to be an async client method
        return await llm_breaker.call_async(
            llm_client.chat_completion,
            model=model, messages=[{"role": "user", "content": prompt}]
        )
    except CircuitOpenError:
        # Degrade: fall back to a cache (get_cached_response is illustrative)
        return get_cached_response(prompt) or "The system is busy, please try again later."

37.5.3 Timeouts and Retry Policies

python
# retry_policy.py
import asyncio
import logging
import random
import time
from typing import Callable, List, Optional, Type, TypeVar

T = TypeVar('T')

class RetryPolicy:
    """Retry policy with exponential backoff"""
    
    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 30.0,
        exponential_base: float = 2.0,
        jitter: bool = True,
        retryable_exceptions: Optional[List[Type[Exception]]] = None,
        retryable_status_codes: Optional[List[int]] = None,
    ):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter
        self.retryable_exceptions = retryable_exceptions or [
            ConnectionError, TimeoutError
        ]
        # status codes are for HTTP callers to consult; this sketch retries on exceptions only
        self.retryable_status_codes = retryable_status_codes or [429, 502, 503, 504]
    
    def execute(self, func: Callable[..., T], *args, **kwargs) -> T:
        """Run a function with retries; non-retryable exceptions propagate immediately"""
        for attempt in range(self.max_retries + 1):
            try:
                return func(*args, **kwargs)
            except tuple(self.retryable_exceptions) as e:
                if attempt < self.max_retries:
                    delay = self._calculate_delay(attempt)
                    self._log_retry(func.__name__, attempt, delay, e)
                    time.sleep(delay)
                else:
                    raise
    
    async def execute_async(self, func: Callable, *args, **kwargs) -> T:
        """Async variant"""
        for attempt in range(self.max_retries + 1):
            try:
                return await func(*args, **kwargs)
            except tuple(self.retryable_exceptions) as e:
                if attempt < self.max_retries:
                    delay = self._calculate_delay(attempt)
                    self._log_retry(func.__name__, attempt, delay, e)
                    await asyncio.sleep(delay)
                else:
                    raise
    
    def _calculate_delay(self, attempt: int) -> float:
        """Exponential backoff plus jitter"""
        delay = self.base_delay * (self.exponential_base ** attempt)
        delay = min(delay, self.max_delay)
        
        if self.jitter:
            delay = delay * (0.5 + random.random() * 0.5)
        
        return delay
    
    def _log_retry(self, func_name, attempt, delay, error):
        logging.warning(
            f"Retry {attempt + 1}/{self.max_retries} for {func_name} "
            f"after {delay:.1f}s: {error}"
        )

# Retry policy tailored to LLM APIs
llm_retry = RetryPolicy(
    max_retries=3,
    base_delay=1.0,
    max_delay=30.0,
    retryable_exceptions=[ConnectionError, TimeoutError],
    retryable_status_codes=[429, 500, 502, 503],
)
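With jitter disabled, the schedule those parameters produce is easy to tabulate. A standalone helper (not part of RetryPolicy) shows the capped exponential curve:

```python
def backoff_schedule(max_retries: int = 3, base: float = 1.0,
                     factor: float = 2.0, cap: float = 30.0) -> list:
    """Upper-bound delay before each retry: base * factor**attempt, capped at `cap`."""
    return [min(base * factor ** attempt, cap) for attempt in range(max_retries)]

print(backoff_schedule())   # [1.0, 2.0, 4.0]
print(backoff_schedule(6))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]  (32.0 hits the cap)
```

The jitter multiplier then spreads each actual sleep uniformly between half of the tabulated value and the value itself, which keeps retrying clients from synchronizing into thundering herds.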

37.6 Multi-Region Deployment

37.6.1 Deployment Architecture

mermaid
graph TB
    DNS[Global DNS<br/>Anycast/GSLB]
    
    subgraph RegionA["East China (Shanghai)"]
        GW_A[Gateway]
        Core_A[Agent Core]
        DB_A[(DB Primary)]
        Redis_A[(Redis)]
    end
    
    subgraph RegionB["North China (Beijing)"]
        GW_B[Gateway]
        Core_B[Agent Core]
        DB_B[(DB Replica)]
        Redis_B[(Redis)]
    end
    
    subgraph RegionC["Overseas (Singapore)"]
        GW_C[Gateway]
        Core_C[Agent Core]
        DB_C[(DB Replica)]
        Redis_C[(Redis)]
    end
    
    DNS -->|nearest-region routing| GW_A & GW_B & GW_C
    Core_A --> Core_B & Core_C
    DB_A -->|async replication| DB_B & DB_C

37.6.2 Cross-Region Data Synchronization

python
# cross_region_sync.py
from enum import Enum
import asyncio

class SyncMode(Enum):
    SYNC = "sync"         # synchronous replication (strong consistency)
    ASYNC = "async"       # asynchronous replication (eventual consistency)
    SEMI_SYNC = "semi"    # semi-synchronous

class CrossRegionReplicator:
    """Cross-region data replicator (transport helpers such as _connect,
    _local_write, and _remote_write are left as implementation details)"""
    
    def __init__(self, local_region: str, remote_regions: list):
        self.local_region = local_region
        self.remotes = {}
        for region in remote_regions:
            self.remotes[region] = self._connect(region)
    
    async def replicate_write(
        self,
        table: str,
        operation: str,  # INSERT, UPDATE, DELETE
        data: dict,
        mode: SyncMode = SyncMode.ASYNC
    ) -> bool:
        """Replicate a write across regions"""
        # 1. Local write (always committed first)
        await self._local_write(table, operation, data)
        
        if mode == SyncMode.SYNC:
            # Sync mode: wait for every region to acknowledge
            tasks = [
                self._remote_write(region, table, operation, data)
                for region in self.remotes
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            return all(not isinstance(r, Exception) for r in results)
        
        elif mode == SyncMode.SEMI_SYNC:
            # Semi-sync: at least one remote region must acknowledge
            tasks = [
                self._remote_write(region, table, operation, data)
                for region in self.remotes
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            success_count = sum(
                1 for r in results if not isinstance(r, Exception)
            )
            return success_count >= 1
        
        else:
            # Async mode: fire-and-forget via a message queue
            await self._queue_replication(table, operation, data)
            return True
    
    def route_read(self, consistency: str = "eventual") -> str:
        """Route reads according to the required consistency"""
        if consistency == "strong":
            return self.local_region  # strongly consistent reads go to the primary region
        
        # Eventually consistent reads: nearest region
        return self._get_nearest_region()

37.7 Disaster Recovery

37.7.1 Disaster Recovery Tiers

| Tier | RPO (data loss) | RTO (recovery time) | Cost | Approach |
|------|-----------------|---------------------|------|----------|
| Cold standby | 24 hours | 4-24 hours | Low | periodic backups + offsite storage |
| Warm standby | 1 hour | 1-4 hours | Medium | async replication + hot standby |
| Hot standby | minutes | minutes | High | sync replication + automatic failover |
| Active-active | 0 | 0 (seconds to switch) | Very high | multi-region active-active |

Recommendation: a warm-standby setup for Agent platforms, with RPO < 1 hour and RTO < 1 hour for core data.

37.7.2 Backup Strategy

bash
#!/bin/bash
# backup_agent_platform.sh
# Automated backup script for the Agent platform

set -euo pipefail

BACKUP_DIR="/data/backups/$(date +%Y-%m-%d_%H%M%S)"
RETENTION_DAYS=30

log() { echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*"; }

# 1. Full PostgreSQL backup
log "Starting PostgreSQL backup..."
mkdir -p "${BACKUP_DIR}/postgres"
pg_dump -Fc -f "${BACKUP_DIR}/postgres/agent_platform.dump" \
    --no-owner --no-privileges \
    postgresql://agent_app:PASSWORD@pg-primary:5432/agent_platform

# 2. Redis RDB backup
log "Starting Redis backup..."
mkdir -p "${BACKUP_DIR}/redis"
redis-cli -h redis-primary --rdb "${BACKUP_DIR}/redis/dump.rdb"

# 3. Vector database backup
log "Starting vector DB backup..."
mkdir -p "${BACKUP_DIR}/vectors"
# pymilvus has no collection dump API; Milvus backups are normally taken
# with the milvus-backup tool (assumed installed and configured here)
milvus-backup create -n "agent_platform_$(date +%Y%m%d)"

# 4. Config backup
log "Starting config backup..."
mkdir -p "${BACKUP_DIR}/configs"
cp -r /etc/agent-platform/* "${BACKUP_DIR}/configs/"

# 5. Upload to (cross-region) object storage
log "Uploading to remote storage..."
aws s3 sync "${BACKUP_DIR}" \
    "s3://agent-platform-backups/$(date +%Y-%m)/" \
    --storage-class STANDARD_IA

# 6. Prune old backups (top-level backup dirs only)
log "Cleaning old backups..."
find /data/backups -mindepth 1 -maxdepth 1 -type d -mtime +${RETENTION_DAYS} -exec rm -rf {} +

# 7. Verify backup integrity
log "Verifying backup integrity..."
if [ -f "${BACKUP_DIR}/postgres/agent_platform.dump" ]; then
    pg_restore --list "${BACKUP_DIR}/postgres/agent_platform.dump" > /dev/null
    log "PostgreSQL backup verified ✓"
else
    log "ERROR: PostgreSQL backup failed!"
    exit 1
fi

log "Backup completed successfully. Size: $(du -sh "${BACKUP_DIR}" | cut -f1)"

37.7.3 Failover Drills

yaml
# disaster_recovery_playbook.yaml
# Disaster recovery drill playbook

name: Agent Platform DR Drill
version: 1.0
last_drill: "2026-03-15"
next_drill: "2026-04-15"

scenarios:
  - name: "Region-level outage"
    description: "The East China region is completely unavailable"
    steps:
      1:
        action: "Switch DNS to route traffic to the North China region"
        owner: SRE
        timeout: 5m
      2:
        action: "Verify services in the North China region are healthy"
        owner: QA
        timeout: 10m
      3:
        action: "Confirm replication lag is within acceptable bounds"
        owner: DBA
        timeout: 15m
      4:
        action: "Notify customers and update the status page"
        owner: Support
        timeout: 30m
    recovery_time_target: 30min
    
  - name: "Database primary failure"
    description: "The PostgreSQL primary node goes down"
    steps:
      1:
        action: "Promote a read replica to primary"
        owner: DBA
        timeout: 5m
      2:
        action: "Update the application's database connection config"
        owner: SRE
        timeout: 5m
      3:
        action: "Verify reads and writes work"
        owner: QA
        timeout: 10m
    recovery_time_target: 20min
    
  - name: "Total LLM provider outage"
    description: "All LLM services are unavailable"
    steps:
      1:
        action: "Circuit breakers open automatically"
        owner: System
        timeout: 0m
      2:
        action: "Enable the backup LLM provider"
        owner: AI Team
        timeout: 10m
      3:
        action: "Degrade to cached-response mode"
        owner: System
        timeout: 1m
    recovery_time_target: 10min

37.8 Chaos Engineering

37.8.1 Chaos Monkey in Practice

python
# chaos_experiment.py
"""
Chaos engineering experiments for the Agent platform.
Use with extreme care in production! Validate thoroughly in staging first.
"""
import logging
import random
import time
from typing import Callable, List

logger = logging.getLogger("chaos")

class SimulatedFailure(Exception):
    """Failure injected by a chaos experiment"""
    pass

class ChaosExperiment:
    """Chaos experiment framework"""
    
    def __init__(self, target_services: List[str]):
        self.targets = target_services
        self.safety_hooks = []
    
    def add_safety_hook(self, hook: Callable):
        """Register a safety hook (a precondition checked before experiments run)"""
        self.safety_hooks.append(hook)
    
    def check_safety(self) -> bool:
        """Run every safety check"""
        for hook in self.safety_hooks:
            if not hook():
                logger.warning(f"Safety check failed: {hook.__name__}")
                return False
        return True
    
    def inject_latency(self, service: str, duration_ms: int,
                       probability: float = 0.1):
        """Inject latency"""
        if random.random() > probability:
            return
        
        logger.info(
            f"[CHAOS] Injecting {duration_ms}ms latency to {service}"
        )
        time.sleep(duration_ms / 1000)
    
    def simulate_failure(self, service: str, probability: float = 0.05):
        """Simulate a service failure"""
        if random.random() > probability:
            return
        
        logger.warning(f"[CHAOS] Simulating failure of {service}")
        raise SimulatedFailure(
            f"Chaos experiment: {service} is unavailable"
        )

# Example safety hooks
def check_not_peak_hours():
    """Never run during peak hours"""
    hour = time.localtime().tm_hour
    return hour < 9 or hour > 22  # off-peak only

def check_error_rate_below_threshold():
    """Only run while the current error rate is below threshold
    (get_current_error_rate stands in for your metrics client)"""
    current_error_rate = get_current_error_rate()
    return current_error_rate < 0.01  # below 1%

37.9 Chapter Summary

This chapter walked through the scalability and high-availability toolkit for an Agent platform:

  1. Stateless design: the foundation of horizontal scaling; all state lives externally
  2. Load balancing: a multi-tier LB architecture with smart health checks
  3. Autoscaling: HPA combined with prediction-based scaling
  4. Degradation and circuit breaking: layered fallbacks plus the circuit breaker pattern
  5. Multi-region deployment: cross-region traffic routing and data synchronization
  6. Disaster recovery: backup strategy, failover drills, and chaos engineering

High availability is not a goal but a continuous practice: a closed loop of design, implementation, testing, and drills. The next chapter covers monitoring and alerting — availability without monitoring is just blind confidence.

Released under the MIT License