Skip to content

第35章:企业级设计模式

概述

将 Agent 从原型推向生产环境,面临的是一组截然不同的挑战:可靠性要求从"基本能用"提升到"五个九"可用性,安全从"Demo够用"升级到"企业级合规",成本从"不计代价"转变为"精打细算"。本章将系统讲解企业级 Agent 系统的设计模式,涵盖 API 网关、多租户架构、版本管理、审计合规、高可用、成本管理、安全等核心主题。

35.1 企业级Agent的特殊需求

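| 维度 | 原型阶段 | 企业级 |
| --- | --- | --- |
| 可用性 | 90%+ | 99.9%+ |
| 响应延迟 | <30秒 | <5秒 P95 |
| 安全 | API Key | RBAC + 审计 + 加密 |
| 多租户 | 单租户 | 1000+租户隔离 |
| 成本 | 不关心 | 精确计量 + 预算控制 |
| 部署 | 手动 | CI/CD + 蓝/绿 + 回滚 |
| 监控 | 打印日志 | Dashboard + 告警 + SLO |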

35.2 Agent Gateway模式

35.2.1 API网关设计

python
from dataclasses import dataclass

@dataclass
class GatewayConfig:
    """Limits and switches consumed by ``AgentGateway``."""

    # Handed to the gateway's TokenBucketRateLimiter.
    max_requests_per_minute: int = 1000
    # Size of the semaphore that bounds simultaneously running requests.
    max_concurrent_requests: int = 100
    # Timeout given to asyncio.wait_for around a single agent run.
    request_timeout: float = 60.0
    # Authentication switch; not read by the gateway code shown in this chapter.
    auth_required: bool = True

class AgentGateway:
    """API gateway placed in front of a single agent.

    A request flows through the registered middleware chain, then the rate
    limiter, then a concurrency-bounding semaphore, and is finally executed
    by ``agent.run`` under a timeout.
    """

    def __init__(self, config: GatewayConfig, agent):
        self.config = config
        self.agent = agent
        self.rate_limiter = TokenBucketRateLimiter(config.max_requests_per_minute)
        self.semaphore = asyncio.Semaphore(config.max_concurrent_requests)
        self._middleware: list[Callable] = []

    def use(self, middleware: Callable):
        """Append *middleware* to the chain and return ``self`` for chaining."""
        self._middleware.append(middleware)
        return self

    async def handle_request(self, request: dict) -> dict:
        """Run *request* through the gateway and return a response dict.

        Middleware is awaited in registration order with a shared context
        dict; the first middleware that returns a non-``None`` value ends
        processing and that value becomes the response. Failures are reported
        as ``{"error": ...}`` dicts rather than raised.
        """
        context = {"request": request, "response": None}

        for middleware in self._middleware:
            early_response = await middleware(context)
            if early_response is not None:
                return early_response

        admitted = self.rate_limiter.acquire()
        if not admitted:
            return {"error": "rate_limit_exceeded"}

        async with self.semaphore:
            try:
                output = await asyncio.wait_for(
                    self.agent.run(request["input"]),
                    timeout=self.config.request_timeout,
                )
            except asyncio.TimeoutError:
                return {"error": "timeout"}
            except Exception as exc:
                # Any other failure (including a missing "input" key) is
                # reported to the caller instead of propagating.
                return {"error": "internal", "message": str(exc)}
            else:
                return {"success": True, "result": output}

# 中间件
async def auth_middleware(ctx: dict) -> dict | None:
    """Authenticate the request carried in *ctx*.

    Returns an ``unauthorized`` error response when ``auth_token`` is missing
    or fails ``validate_token``; otherwise stores the decoded token under
    ``ctx["user"]`` and returns ``None`` so the chain continues.
    """
    token = ctx["request"].get("auth_token")
    if token and validate_token(token):
        ctx["user"] = decode_token(token)
        return None
    return {"error": "unauthorized"}

async def audit_middleware(ctx: dict) -> dict | None:
    """Record an ``agent_request`` audit entry and let the chain continue.

    The user id is read from ``ctx["user"]["id"]`` and is ``None`` when no
    user has been attached to the context.
    """
    user = ctx.get("user", {})
    entry = {
        "user": user.get("id"),
        "action": "agent_request",
        "timestamp": datetime.now().isoformat(),
    }
    await audit_log.record(entry)
    return None

async def pii_filter_middleware(ctx: dict) -> dict | None:
    """Replace the request's ``input`` with its ``sanitize_pii`` result, in place."""
    raw_input = ctx["request"]["input"]
    ctx["request"]["input"] = sanitize_pii(raw_input)
    return None

35.2.2 路由与负载均衡

python
class AgentRouter:
    """Agent router: maps route paths to agents and picks one per task type."""

    # Task type (as returned by classify_task) -> route path.
    _CAPABILITY_ROUTES = {
        "code": "/agents/code-agent",
        "data": "/agents/data-agent",
        "general": "/agents/general-agent",
    }
    # Route used when the task type has no entry above.
    _FALLBACK_ROUTE = "/agents/general-agent"

    def __init__(self):
        self._routes: dict[str, Agent] = {}

    def register(self, path: str, agent: Agent):
        """Bind *agent* to *path*, replacing any previous binding."""
        self._routes[path] = agent

    def route(self, path: str) -> Agent:
        """Return the agent bound to *path*.

        Raises:
            ValueError: if no agent is registered under *path*.
        """
        if path in self._routes:
            return self._routes[path]
        raise ValueError(f"未找到路由: {path}")

    def route_by_capability(self, task: str) -> Agent:
        """Classify *task* and return the agent registered for that task type.

        Unrecognised task types fall back to the general agent's route.
        """
        target = self._CAPABILITY_ROUTES.get(classify_task(task), self._FALLBACK_ROUTE)
        return self.route(target)

35.3 Agent Registry与发现

35.3.1 服务注册

python
@dataclass
class AgentRegistration:
    """Record describing one agent as stored in ``AgentRegistry``."""

    # Registry key; a later registration with the same id replaces the earlier one.
    agent_id: str
    name: str
    version: str
    endpoint: str
    # Capability names matched by AgentRegistry.discover.
    capabilities: list[str]
    max_concurrent: int
    # URL requested by AgentRegistry.health_check_all.
    health_check_url: str
    # ISO-8601 string taken from datetime.now() when the record is created.
    registered_at: str = field(
        default_factory=lambda: datetime.now().isoformat(),
    )

class AgentRegistry:
    """In-memory agent registry keyed by ``agent_id``."""

    def __init__(self):
        self._agents: dict[str, AgentRegistration] = {}

    async def register(self, reg: AgentRegistration):
        """Store *reg*, replacing any existing record with the same ``agent_id``."""
        self._agents[reg.agent_id] = reg

    async def discover(self, capability: str) -> list[AgentRegistration]:
        """Return every registration whose ``capabilities`` contains *capability*."""
        return [
            a for a in self._agents.values()
            if capability in a.capabilities
        ]

    async def health_check_all(self) -> dict[str, bool]:
        """Probe each registered agent's ``health_check_url``.

        Returns:
            Mapping of ``agent_id`` to ``True`` when the probe answered with
            HTTP 200, ``False`` for any other status or any request error.
        """
        # Iterate over a snapshot: each probe awaits, and a register() call
        # running in the meantime would otherwise mutate the dict mid-iteration
        # and raise "dictionary changed size during iteration".
        snapshot = list(self._agents.items())
        results: dict[str, bool] = {}
        if not snapshot:
            return results

        # One client for the whole sweep instead of one per agent.
        async with httpx.AsyncClient() as client:
            for agent_id, registration in snapshot:
                try:
                    resp = await client.get(registration.health_check_url, timeout=5.0)
                    results[agent_id] = resp.status_code == 200
                except Exception:
                    # A failing probe marks only that agent unhealthy.
                    results[agent_id] = False
        return results

35.4 多租户Agent架构

35.4.1 租户隔离

python
@dataclass
class Tenant:
    """A tenant together with the quota limits that apply to it.

    When ``limits`` is empty at construction time it is filled from the
    built-in defaults for ``plan``; an unrecognised plan leaves it empty.
    A limit value of ``-1`` is used by the enterprise plan for its
    per-day request count.
    """

    tenant_id: str
    name: str
    plan: str  # free, pro, enterprise
    limits: dict = field(default_factory=dict)

    # Built-in per-plan defaults (not a dataclass field: no annotation).
    _PLAN_LIMITS = {
        "free": {"max_requests_per_day": 100, "max_tokens_per_request": 4000, "max_concurrent": 2},
        "pro": {"max_requests_per_day": 10000, "max_tokens_per_request": 32000, "max_concurrent": 10},
        "enterprise": {"max_requests_per_day": -1, "max_tokens_per_request": 128000, "max_concurrent": 100},
    }

    def __post_init__(self):
        if self.limits:
            return
        # Copy so that each tenant owns an independent dict.
        self.limits = dict(self._PLAN_LIMITS.get(self.plan, {}))

class TenantManager:
    """Tenant manager: looks up tenants and checks usage against their limits."""

    def __init__(self):
        self._tenants: dict[str, Tenant] = {}

    async def check_limit(self, tenant_id: str, metric: str, value: int) -> bool:
        """Return whether consuming *value* more of *metric* stays within the limit.

        Unknown tenants are refused. A limit of ``-1``, or a metric that has
        no entry in the tenant's limits, is treated as unlimited. Current
        usage comes from ``self._get_usage``, which is not defined in this
        snippet.
        """
        tenant = self._tenants.get(tenant_id)
        if not tenant:
            return False

        ceiling = tenant.limits.get(metric, -1)
        unlimited = ceiling == -1
        return unlimited or self._get_usage(tenant_id, metric) + value <= ceiling

35.4.2 数据隔离

python
class TenantDataIsolator:
    """Tenant data isolation: every storage key is prefixed with the tenant id."""

    # Allowed tenant-id alphabet. Used with fullmatch(): a "^...$" pattern with
    # re.match() would also accept an id ending in "\n", because "$" matches
    # just before a trailing newline.
    _TENANT_ID_RE = re.compile(r'[a-zA-Z0-9_-]+')

    def isolate_key(self, tenant_id: str, key: str) -> str:
        """Return *key* namespaced as ``tenant:<tenant_id>:<key>``.

        Raises:
            ValueError: if *tenant_id* is empty or contains anything other
                than ASCII letters, digits, ``_`` or ``-``.
        """
        if not self._TENANT_ID_RE.fullmatch(tenant_id):
            raise ValueError(f"无效的租户ID: {tenant_id}")
        return f"tenant:{tenant_id}:{key}"

35.5 Agent版本管理与回滚

python
class AgentVersionManager:
    """Agent version management: tracks the active version and one rollback target."""

    def __init__(self):
        self._active: dict[str, str] = {}
        self._configs: dict[str, dict] = {}
        # Version that was active immediately before the current one. Kept in
        # its own map so it cannot collide with a "<agent_id>:<version>" config
        # key and so that _configs only ever holds config dicts.
        self._previous: dict[str, str] = {}

    async def deploy(self, agent_id: str, version: str,
                     config: dict, strategy: str = "rolling") -> bool:
        """Store *config* and make *version* the active version of *agent_id*.

        With ``strategy="blue_green"`` a new agent is created from *config*
        and must pass ``self._health_check`` before the switch happens; any
        other strategy switches immediately.

        Returns:
            ``True`` if *version* is now active, ``False`` if the blue/green
            health check failed and the active version was left unchanged.
        """
        self._configs[f"{agent_id}:{version}"] = config

        if strategy == "blue_green":
            green = create_agent(config)
            if not await self._health_check(green):
                return False

        self._activate(agent_id, version)
        return True

    def _activate(self, agent_id: str, version: str) -> None:
        """Switch the active version, remembering the one it replaces."""
        current = self._active.get(agent_id)
        # Re-deploying the already-active version must not overwrite the
        # rollback target with that same version.
        if current is not None and current != version:
            self._previous[agent_id] = current
        self._active[agent_id] = version

    async def rollback(self, agent_id: str) -> str:
        """Re-activate the previously active version and return it.

        Raises:
            ValueError: if no earlier version has been recorded for *agent_id*.
        """
        previous = self._previous.get(agent_id)
        if not previous:
            raise ValueError("没有可回滚的版本")
        self._active[agent_id] = previous
        return previous

35.6 审计与合规模式

35.6.1 操作审计

python
class AuditLogger:
    """Operation audit log writer."""

    @staticmethod
    def _fingerprint(payload) -> str:
        """Return the first 16 hex chars of the SHA-256 of *payload* as sorted-key JSON."""
        canonical = json.dumps(payload, sort_keys=True)
        return hashlib.sha256(canonical.encode()).hexdigest()[:16]

    async def log(self, event: dict):
        """Append one audit entry built from *event* to the ``audit_log`` stream.

        *event* must contain ``actor_id`` and ``action``. ``resource_type``
        defaults to ``"agent"`` and ``metadata`` to ``{}``. The input itself
        is not stored, only a truncated hash of it. ``self.storage`` is
        expected to be provided elsewhere; it is not set in this snippet.
        """
        entry = {
            "event_id": str(uuid.uuid4()),
            "timestamp": datetime.now().isoformat(),
            "actor_id": event["actor_id"],
            "action": event["action"],
            "resource_type": event.get("resource_type", "agent"),
            "input_hash": self._fingerprint(event.get("input", "")),
            "metadata": event.get("metadata", {}),
        }
        await self.storage.append("audit_log", entry)

# Events that must be audited.
AUDIT_EVENTS = [
    "agent.run",
    "tool.call",
    "config.change",
    "data.export",
    "data.delete",
]

35.6.2 PII处理

python
class PIIGuard:
    """PII protection: detects and optionally masks or removes PII in text."""

    # The numeric patterns are delimited with digit lookarounds, not "\b".
    # In Python "\b" is Unicode-aware and CJK characters count as word
    # characters, so "\b" never matches between a Chinese character and a
    # digit and numbers written directly next to Chinese text were missed.
    PATTERNS = {
        "phone": r"(?<!\d)1[3-9]\d{9}(?!\d)",
        "email": r"\b[\w.-]+@[\w.-]+\.\w+\b",
        "id_card": r"(?<!\d)\d{17}[\dXx](?!\d)",
    }

    def process(self, text: str, mode: str = "mask") -> tuple[str, list]:
        """Scan *text* for PII and return ``(processed_text, detections)``.

        Args:
            text: Text to scan.
            mode: ``"mask"`` keeps the first two characters of each hit and
                appends ``****``; ``"remove"`` replaces each hit with
                ``[<type>_REDACTED]``; ``"detect"`` (or any other value)
                leaves the text unchanged.

        Returns:
            The processed text and a list of ``{"type", "value"}`` dicts, one
            per match, in pattern order and then position order.
        """
        detections = []
        spans = []
        for pii_type, pattern in self.PATTERNS.items():
            for match in re.finditer(pattern, text):
                detections.append({"type": pii_type, "value": match.group()})
                spans.append((match.start(), match.end(), pii_type))

        if mode not in ("mask", "remove") or not spans:
            return text, detections

        # Redact by position rather than str.replace(): replace() also rewrote
        # non-matching occurrences of the same substring, and when two hits
        # overlapped (a phone number inside an e-mail address) the second
        # replacement found nothing, leaving part of the PII in the output.
        spans.sort(key=lambda span: (span[0], -span[1]))
        pieces = []
        cursor = 0
        for start, end, pii_type in spans:
            if start < cursor:
                # Overlaps a span that is already redacted: swallow any tail
                # that sticks out so nothing of it leaks through.
                cursor = max(cursor, end)
                continue
            pieces.append(text[cursor:start])
            if mode == "mask":
                pieces.append(text[start:start + 2] + "****")
            else:
                pieces.append(f"[{pii_type}_REDACTED]")
            cursor = end
        pieces.append(text[cursor:])
        return "".join(pieces), detections

35.7 高可用与容错模式

35.7.1 多模型容错

python
class MultiModelFallback:
    """Multi-model fallback: tries each configured model until one answers."""

    def __init__(self, models: list[dict]):
        # Lower "priority" values are tried first; configs without one go last.
        self.models = sorted(models, key=lambda m: m.get("priority", 99))

    async def generate(self, messages: list[dict], **kwargs) -> str:
        """Return the first successful completion, trying models in priority order.

        Extra keyword arguments are forwarded to the completion call. The
        client for each model comes from ``self._get_client``, which is not
        defined in this snippet.

        Raises:
            RuntimeError: if every model failed (or none is configured). The
                last underlying error is attached as ``__cause__`` so the
                caller can see why the chain was exhausted.
        """
        last_error = None
        for model_cfg in self.models:
            try:
                client = self._get_client(model_cfg)
                resp = await client.chat.completions.create(
                    model=model_cfg["model"], messages=messages, **kwargs
                )
                return resp.choices[0].message.content
            except Exception as e:
                # .get() so that a config lacking "model" is reported and
                # skipped instead of raising a second KeyError from here.
                print(f"模型 {model_cfg.get('model')} 失败: {e}")
                last_error = e
        raise RuntimeError("所有模型均失败") from last_error

# Usage: MultiModelFallback sorts by "priority" ascending, so the model with
# priority 1 is tried first and the others only if the ones before them fail.
fallback = MultiModelFallback([
    {"model": "claude-3.5-sonnet", "priority": 1, "provider": "anthropic"},
    {"model": "gpt-4o", "priority": 2, "provider": "openai"},
    {"model": "gpt-4o-mini", "priority": 3, "provider": "openai"},
])

35.7.2 优雅降级

python
class DegradationManager:
    """Graceful degradation: full agent -> lite agent -> static apology.

    The level rises automatically when a tier fails. Without recovery a
    single transient error would pin the manager at a degraded level for
    good, so once ``recovery_interval`` seconds have passed since the last
    automatic downgrade the next call probes the full agent again.
    """

    def __init__(self, agent, recovery_interval: float = 30.0):
        """
        Args:
            agent: Object exposing async ``run(task)`` and ``run_lite(task)``.
            recovery_interval: Seconds to stay degraded before retrying the
                full agent. ``0`` retries on every call.
        """
        self.agent = agent
        self.level = 0  # 0 = normal, 1 = partially degraded, 2 = fully degraded
        self.recovery_interval = recovery_interval
        # time.monotonic() of the last automatic downgrade. None means the
        # current level was not set by this class (e.g. assigned by an
        # operator), in which case it is never auto-reset.
        self._degraded_at: float | None = None

    def _degrade(self, level: int) -> None:
        """Move to *level* and remember when the downgrade happened."""
        import time

        self.level = level
        self._degraded_at = time.monotonic()

    def _maybe_recover(self) -> None:
        """Return to level 0 once the recovery interval has elapsed."""
        import time

        if self.level == 0 or self._degraded_at is None:
            return
        if time.monotonic() - self._degraded_at >= self.recovery_interval:
            self.level = 0
            self._degraded_at = None

    async def run(self, task: str) -> str:
        """Run *task* at the best tier currently available.

        Never raises for agent failures: a failing tier downgrades the level
        and the same call falls through to the next tier, ending with a
        fixed "temporarily unavailable" message.
        """
        self._maybe_recover()

        if self.level == 0:
            try:
                return await self.agent.run(task)
            except Exception:
                self._degrade(1)

        if self.level == 1:
            try:
                return await self.agent.run_lite(task)
            except Exception:
                self._degrade(2)

        return "服务暂时不可用,请稍后重试。"

35.8 成本管理模式

python
class CostManager:
    """Cost management: per-tenant budgets and accumulated spend.

    ``self._get_daily_usage`` and ``self._calc_cost`` are used here but not
    defined in this snippet.
    """

    def __init__(self):
        self._budgets: dict[str, float] = {}
        self._usage: dict[str, float] = {}

    def set_budget(self, tenant_id: str, daily_budget: float):
        """Set (or overwrite) the budget for *tenant_id*."""
        self._budgets[tenant_id] = daily_budget

    async def check_budget(self, tenant_id: str, est_cost: float) -> bool:
        """Return whether spending *est_cost* more keeps the tenant within budget.

        Tenants without a configured budget are never refused.
        """
        ceiling = self._budgets.get(tenant_id, float('inf'))
        projected = self._get_daily_usage(tenant_id) + est_cost
        return projected <= ceiling

    async def record_cost(self, tenant_id: str, model: str, tokens: int):
        """Add the cost of *tokens* on *model* to the tenant's bucket for today."""
        cost = self._calc_cost(model, tokens)
        today = date.today().isoformat()
        bucket = f"{tenant_id}:{today}"
        self._usage[bucket] = self._usage.get(bucket, 0.0) + cost

    def get_report(self, tenant_id: str) -> dict:
        """Return budget, usage, remaining amount and percentage used.

        A tenant without a configured budget is reported with budget ``0``
        and ``percent`` ``0``.
        """
        budget = self._budgets.get(tenant_id, 0)
        used = self._get_daily_usage(tenant_id)
        report = {"budget": budget}
        report["used"] = round(used, 4)
        report["remaining"] = round(budget - used, 4)
        # Guard against dividing by a zero budget.
        report["percent"] = (used / budget * 100) if budget else 0
        return report

35.9 企业级安全模式

35.9.1 RBAC权限控制

python
from enum import Enum

class Permission(Enum):
    """Permissions that can be granted through a role."""

    AGENT_RUN = "agent:run"
    AGENT_CONFIG = "agent:config"
    DATA_EXPORT = "data:export"
    ADMIN = "admin"

# Role name -> permissions granted by that role.
ROLE_PERMISSIONS = {
    "viewer": [Permission.AGENT_RUN],
    "operator": [Permission.AGENT_RUN, Permission.AGENT_CONFIG],
    # The admin role holds every member of Permission, in definition order.
    "admin": list(Permission),
}

class RBACManager:
    """RBAC permission management: users hold roles, roles grant permissions."""

    def __init__(self):
        self._user_roles: dict[str, list[str]] = {}

    def assign_role(self, user_id: str, role: str):
        """Append *role* to the roles held by *user_id*."""
        roles = self._user_roles.setdefault(user_id, [])
        roles.append(role)

    def check(self, user_id: str, perm: Permission) -> bool:
        """Return whether any role held by *user_id* grants *perm*.

        Users without roles, and roles missing from ``ROLE_PERMISSIONS``,
        grant nothing.
        """
        held_roles = self._user_roles.get(user_id, [])
        return any(perm in ROLE_PERMISSIONS.get(role, []) for role in held_roles)

35.9.2 零信任安全

python
class ZeroTrustGuard:
    """Zero-trust guard: every request is re-verified from scratch.

    ``self.rbac`` and ``self._verify_rate`` are used here but not defined in
    this snippet.
    """

    # Upper bound (exclusive) on the length of the request serialised as JSON.
    MAX_PAYLOAD_SIZE = 10 * 1024 * 1024

    async def verify_every_request(self, request: dict) -> bool:
        """Return ``True`` only if identity, permission, payload and rate checks all pass.

        The checks run in that order and stop at the first failure, so
        permissions are never evaluated for an unauthenticated request.
        """
        return bool(
            self._verify_identity(request)
            and self._verify_permissions(request)
            and self._verify_payload(request)
            and self._verify_rate(request)
        )

    def _verify_identity(self, request) -> bool:
        """Check that the request carries a token accepted by ``validate_token``."""
        token = request.get("auth_token")
        return token is not None and validate_token(token)

    def _verify_permissions(self, request) -> bool:
        """Check that the requesting user may perform the requested action.

        An ``action`` that is missing or is not a ``Permission`` value is
        denied rather than raising.
        """
        # NOTE(review): user_id is taken from the request body, not derived
        # from the verified token — confirm the caller cannot supply an
        # arbitrary user_id alongside its own valid token.
        user_id = request.get("user_id")
        try:
            permission = Permission(request.get("action"))
        except ValueError:
            return False
        return self.rbac.check(user_id, permission)

    def _verify_payload(self, request) -> bool:
        """Check that the request serialises to JSON and stays under the size limit."""
        try:
            serialized = json.dumps(request)
        except (TypeError, ValueError):
            # Not JSON-serialisable (or circular): reject instead of crashing.
            return False
        return len(serialized) < self.MAX_PAYLOAD_SIZE

最佳实践

  1. 安全先行:设计阶段就考虑安全、合规和隐私
  2. 配置外置:Prompt、模型、工具列表通过配置管理
  3. 渐进部署:金丝雀发布,先5%再全量
  4. 全面审计:所有关键操作记录日志,保留90天
  5. 成本可视化:实时展示每个租户的成本消耗

常见陷阱

  1. 忽略租户隔离:多租户数据混存导致隐私泄露
  2. 硬编码配置:变更需要重新部署
  3. 无回滚机制:上线出问题无法快速回退
  4. 审计不足:失败操作无日志,无法追溯
  5. 成本失控:无预算限制导致成本爆炸

小结

企业级 Agent 的核心要求是:可靠性是基础,安全性是底线,成本可控是前提。本章介绍的模式——网关、注册发现、多租户、版本管理、审计合规、高可用、成本管理、安全——构成了企业级 Agent 的基础设施骨架。

延伸阅读

  1. 书籍: "Designing Data-Intensive Applications" (Martin Kleppmann)
  2. 书籍: "Microservices Patterns" (Chris Richardson)
  3. 方法论: "The Twelve-Factor App" (12factor.net)
  4. NIST AI RMF: AI风险管理框架
  5. SOC 2: 安全合规标准

基于 MIT 许可发布