第35章:企业级设计模式
概述
将 Agent 从原型推向生产环境,面临的是一组截然不同的挑战:可靠性要求从"基本能用"提升到 99.9% 以上的高可用,安全从"Demo够用"升级到"企业级合规",成本从"不计代价"转变为"精打细算"。本章将系统讲解企业级 Agent 系统的设计模式,涵盖 API 网关、注册发现、多租户架构、版本管理、审计合规、高可用、成本管理、安全等核心主题。
35.1 企业级Agent的特殊需求
| 维度 | 原型阶段 | 企业级 |
|---|---|---|
| 可用性 | 90%+ | 99.9%+ |
| 响应延迟 | <30秒 | <5秒 P95 |
| 安全 | API Key | RBAC + 审计 + 加密 |
| 多租户 | 单租户 | 1000+租户隔离 |
| 成本 | 不关心 | 精确计量 + 预算控制 |
| 部署 | 手动 | CI/CD + 蓝/绿 + 回滚 |
| 监控 | 打印日志 | Dashboard + 告警 + SLO |
35.2 Agent Gateway模式
35.2.1 API网关设计
python
from dataclasses import dataclass
@dataclass
class GatewayConfig:
    """Limits and switches consumed by the agent gateway.

    All fields have defaults and no validation is performed on them.
    """

    # Passed to the gateway's rate limiter as its per-minute budget.
    max_requests_per_minute: int = 1_000
    # Size of the semaphore bounding simultaneous agent runs.
    max_concurrent_requests: int = 100
    # Seconds a single agent run may take before it is reported as a timeout.
    request_timeout: float = 60.0
    # NOTE(review): not read by any code shown alongside this class;
    # presumably consulted by authentication middleware -- confirm.
    auth_required: bool = True
class AgentGateway:
    """API gateway placed in front of a single agent.

    A request passes through the registered middleware, then the rate
    limiter, then a concurrency-limiting semaphore, and finally the agent,
    which runs under a timeout.
    """

    def __init__(self, config: "GatewayConfig", agent):
        """Store the collaborators and build the limiter and semaphore.

        Args:
            config: Gateway limits (rate, concurrency, timeout).
            agent: Object exposing an awaitable ``run(input)`` method.
        """
        self.config = config
        self.agent = agent
        self.rate_limiter = TokenBucketRateLimiter(
            config.max_requests_per_minute
        )
        self.semaphore = asyncio.Semaphore(config.max_concurrent_requests)
        self._middleware: list[Callable] = []

    def use(self, middleware: Callable):
        """Append *middleware* to the chain and return ``self`` for chaining."""
        self._middleware.append(middleware)
        return self

    async def handle_request(self, request: dict) -> dict:
        """Run one request through the middleware chain and the agent.

        Args:
            request: Request payload; must contain an ``"input"`` key.

        Returns:
            The first non-``None`` middleware result if a middleware
            short-circuits; ``{"error": "bad_request", ...}`` when
            ``"input"`` is missing; ``{"error": "rate_limit_exceeded"}``
            when the limiter refuses; ``{"error": "timeout"}`` or
            ``{"error": "internal", "message": ...}`` when the agent run
            fails; otherwise ``{"success": True, "result": ...}``.
        """
        context = {"request": request, "response": None}
        for mw in self._middleware:
            result = await mw(context)
            if result is not None:
                return result

        # Reject malformed requests here so they neither consume a rate-limit
        # token nor surface later as a confusing "internal" KeyError.
        if "input" not in request:
            return {
                "error": "bad_request",
                "message": "missing required field: input",
            }

        if not self.rate_limiter.acquire():
            return {"error": "rate_limit_exceeded"}

        async with self.semaphore:
            try:
                result = await asyncio.wait_for(
                    self.agent.run(request["input"]),
                    timeout=self.config.request_timeout,
                )
            except asyncio.TimeoutError:
                return {"error": "timeout"}
            except Exception as e:
                # Boundary handler: any agent failure becomes an error payload.
                return {"error": "internal", "message": str(e)}
            return {"success": True, "result": result}
# Middleware
async def auth_middleware(ctx: dict) -> dict | None:
    """Authenticate the request and attach the decoded user to *ctx*.

    Returns:
        ``{"error": "unauthorized"}`` when ``auth_token`` is missing/empty or
        ``validate_token`` rejects it; otherwise ``None`` after storing
        ``decode_token(token)`` under ``ctx["user"]``.
    """
    token = ctx["request"].get("auth_token")
    if token and validate_token(token):
        ctx["user"] = decode_token(token)
        return None
    return {"error": "unauthorized"}
async def audit_middleware(ctx: dict) -> dict | None:
    """Record an ``agent_request`` audit entry for the current request.

    The entry carries the ``id`` of ``ctx["user"]`` (``None`` when no user has
    been attached) and a timezone-aware UTC timestamp, so entries written by
    hosts in different time zones remain comparable.

    Returns:
        Always ``None``; this middleware never short-circuits the chain.
    """
    # Function-scope import: only this function needs ``timezone``.
    from datetime import datetime, timezone

    await audit_log.record({
        "user": ctx.get("user", {}).get("id"),
        "action": "agent_request",
        "timestamp": datetime.now(timezone.utc).isoformat(),
    })
    return None
async def pii_filter_middleware(ctx: dict) -> dict | None:
    """Overwrite the request's ``input`` with its ``sanitize_pii`` result.

    Returns:
        Always ``None``; the request dict is modified in place.
    """
    request = ctx["request"]
    request["input"] = sanitize_pii(request["input"])
    return None


# 35.2.2 路由与负载均衡
python
class AgentRouter:
    """Agent router: maps a path, or a classified task type, to an agent."""

    def __init__(self):
        self._routes: dict[str, Agent] = {}

    def register(self, path: str, agent: "Agent"):
        """Bind *agent* to *path*, replacing any previous binding."""
        self._routes[path] = agent

    def route(self, path: str) -> "Agent":
        """Return the agent bound to *path*.

        Raises:
            ValueError: If nothing is registered under *path*.
        """
        if path in self._routes:
            return self._routes[path]
        raise ValueError(f"未找到路由: {path}")

    def route_by_capability(self, task: str) -> "Agent":
        """Classify *task* with ``classify_task`` and route to the matching agent.

        Task types without a dedicated entry fall back to the general agent.
        """
        fallback_path = "/agents/general-agent"
        path_for_type = {
            "code": "/agents/code-agent",
            "data": "/agents/data-agent",
            "general": fallback_path,
        }
        return self.route(path_for_type.get(classify_task(task), fallback_path))


# 35.3 Agent Registry与发现
35.3.1 服务注册
python
def _registration_timestamp() -> str:
    """Return the current local (naive) time as an ISO-8601 string."""
    return datetime.now().isoformat()


@dataclass
class AgentRegistration:
    """Record describing one agent instance known to the registry."""

    agent_id: str
    name: str
    version: str
    endpoint: str
    # Capability names matched by membership during discovery.
    capabilities: list[str]
    max_concurrent: int
    # URL requested with GET by the registry's health check.
    health_check_url: str
    # Filled in at construction time unless supplied explicitly.
    registered_at: str = field(default_factory=_registration_timestamp)
class AgentRegistry:
    """In-memory agent registry with capability lookup and health checks."""

    def __init__(self):
        self._agents: dict[str, AgentRegistration] = {}

    async def register(self, reg: "AgentRegistration"):
        """Store *reg*, replacing any entry with the same ``agent_id``."""
        self._agents[reg.agent_id] = reg

    async def discover(self, capability: str) -> "list[AgentRegistration]":
        """Return every registered agent that lists *capability*."""
        return [
            a for a in self._agents.values()
            if capability in a.capabilities
        ]

    async def health_check_all(self) -> dict[str, bool]:
        """Probe every agent's health URL and report which ones are healthy.

        An agent is healthy when a GET on its ``health_check_url`` answers
        with status 200 within 5 seconds; any exception counts as unhealthy.

        Returns:
            Mapping of ``agent_id`` to health status, in registration order.
        """
        # Snapshot first: the loop below awaits, and a concurrent register()
        # would otherwise mutate the dict while it is being iterated.
        agents = list(self._agents.items())
        if not agents:
            return {}

        # One shared client, probes run concurrently, so total latency is
        # bounded by the slowest probe instead of the sum of all timeouts.
        async with httpx.AsyncClient() as client:

            async def probe(agent) -> bool:
                try:
                    resp = await client.get(agent.health_check_url, timeout=5.0)
                except Exception:
                    return False
                return resp.status_code == 200

            statuses = await asyncio.gather(
                *(probe(agent) for _, agent in agents)
            )
        return {aid: ok for (aid, _), ok in zip(agents, statuses)}


# 35.4 多租户Agent架构
35.4.1 租户隔离
python
@dataclass
class Tenant:
    """A tenant and the usage limits that apply to it.

    When ``limits`` is not supplied, it is filled from the built-in table for
    ``plan``. In the limit tables ``-1`` means "unlimited".

    Raises:
        ValueError: If ``limits`` is empty and ``plan`` is not a built-in
            plan. Previously this produced an empty ``limits`` dict, which
            ``TenantManager.check_limit`` treats as unlimited for every metric.
    """

    tenant_id: str
    name: str
    plan: str  # free, pro, enterprise
    limits: dict = field(default_factory=dict)

    def __post_init__(self):
        # Explicit limits always win, whatever the plan name is.
        if self.limits:
            return
        plan_defaults = {
            "free": {"max_requests_per_day": 100, "max_tokens_per_request": 4000, "max_concurrent": 2},
            "pro": {"max_requests_per_day": 10000, "max_tokens_per_request": 32000, "max_concurrent": 10},
            "enterprise": {"max_requests_per_day": -1, "max_tokens_per_request": 128000, "max_concurrent": 100},
        }
        if self.plan not in plan_defaults:
            raise ValueError(f"未知的套餐: {self.plan}")
        # The table is rebuilt per call, so each tenant owns its own dict.
        self.limits = plan_defaults[self.plan]
class TenantManager:
    """Tenant manager: checks a tenant's usage against its limits.

    NOTE(review): ``_get_usage`` is called below but not defined in the class
    as shown; presumably supplied elsewhere -- confirm.
    """

    def __init__(self):
        self._tenants: dict[str, Tenant] = {}

    async def check_limit(self, tenant_id: str, metric: str, value: int) -> bool:
        """Tell whether adding *value* to *metric* stays within the limit.

        Returns:
            ``False`` for an unknown tenant. ``True`` when the metric's limit
            is ``-1`` or the metric is absent from the tenant's limits (both
            mean unlimited). Otherwise whether current usage plus *value*
            does not exceed the limit.
        """
        tenant = self._tenants.get(tenant_id)
        if not tenant:
            return False
        ceiling = tenant.limits.get(metric, -1)
        is_unlimited = ceiling == -1
        # Usage is only looked up when a finite limit applies.
        return is_unlimited or self._get_usage(tenant_id, metric) + value <= ceiling


# 35.4.2 数据隔离
python
class TenantDataIsolator:
    """Tenant data isolation: every key is prefixed with its tenant id."""

    def isolate_key(self, tenant_id: str, key: str) -> str:
        """Return *key* namespaced as ``tenant:<tenant_id>:<key>``.

        Args:
            tenant_id: Identifier made only of ASCII letters, digits, ``_``
                and ``-``. The ``:`` separator is excluded, so the prefix of
                one tenant can never be forged through another tenant's id.
            key: The un-namespaced key.

        Raises:
            ValueError: If *tenant_id* is empty or contains any other character.
        """
        # fullmatch, not match + "$": "$" also matches just before a trailing
        # newline, which would let an id such as "acme\n" through.
        if not re.fullmatch(r'[a-zA-Z0-9_-]+', tenant_id):
            raise ValueError(f"无效的租户ID: {tenant_id}")
        return f"tenant:{tenant_id}:{key}"


# 35.5 Agent版本管理与回滚
python
class AgentVersionManager:
    """Track the active version of each agent and support one-step rollback.

    NOTE(review): ``create_agent`` and ``_health_check`` are used by the
    blue-green path but are not defined in the class as shown -- confirm
    where they come from.
    """

    def __init__(self):
        self._active: dict[str, str] = {}
        self._configs: dict[str, dict] = {}
        # Version that was active before the current one, per agent. Kept
        # apart from ``_configs`` so that dict only ever holds config dicts.
        self._previous: dict[str, str] = {}

    async def deploy(self, agent_id: str, version: str,
                     config: dict, strategy: str = "rolling") -> bool:
        """Store *config* and make *version* the active one for *agent_id*.

        With ``strategy="blue_green"`` a candidate agent is created from
        *config* and must pass the health check before it becomes active.
        Any other strategy activates the version immediately.

        Returns:
            ``True`` if *version* is now active, ``False`` if the blue-green
            health check failed and the previous version stays active.
        """
        self._configs[f"{agent_id}:{version}"] = config
        if strategy == "blue_green":
            green = create_agent(config)
            if not await self._health_check(green):
                return False
        self._activate(agent_id, version)
        return True

    def _activate(self, agent_id: str, version: str) -> None:
        """Switch the active version, remembering the one it replaces."""
        current = self._active.get(agent_id)
        # Redeploying the same version must not overwrite the rollback target.
        if current is not None and current != version:
            self._previous[agent_id] = current
        self._active[agent_id] = version

    async def rollback(self, agent_id: str) -> str:
        """Re-activate the version that preceded the current one.

        Returns:
            The version that is active after the rollback.

        Raises:
            ValueError: If no earlier version was recorded for *agent_id*.
        """
        previous = self._previous.get(agent_id)
        if not previous:
            raise ValueError("没有可回滚的版本")
        self._active[agent_id] = previous
        return previous


# 35.6 审计与合规模式
35.6.1 操作审计
python
class AuditLogger:
    """Operation audit log: appends structured entries to ``self.storage``.

    NOTE(review): ``storage`` is not assigned in the class as shown;
    presumably injected by the owner -- confirm.
    """

    async def log(self, event: dict):
        """Build an audit entry from *event* and append it to ``audit_log``.

        Args:
            event: Must contain ``actor_id`` and ``action``. ``resource_type``
                (default ``"agent"``), ``input`` and ``metadata`` are optional.

        The raw input is never stored: only the first 16 hex characters of
        the SHA-256 of its key-sorted JSON form are recorded. The timestamp is
        timezone-aware UTC so entries from different hosts stay comparable.

        Raises:
            KeyError: If ``actor_id`` or ``action`` is missing.
        """
        # Function-scope import: only this method needs ``timezone``.
        from datetime import datetime, timezone

        entry = {
            "event_id": str(uuid.uuid4()),
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "actor_id": event["actor_id"],
            "action": event["action"],
            "resource_type": event.get("resource_type", "agent"),
            "input_hash": hashlib.sha256(
                json.dumps(event.get("input", ""), sort_keys=True).encode()
            ).hexdigest()[:16],
            "metadata": event.get("metadata", {}),
        }
        await self.storage.append("audit_log", entry)
# Events that must be audited.
AUDIT_EVENTS = [
    "agent.run",
    "tool.call",
    "config.change",
    "data.export",
    "data.delete",
]


# 35.6.2 PII处理
python
class PIIGuard:
    """PII protection: detect, mask or remove phone numbers, e-mails and ID numbers."""

    # The patterns avoid ``\b`` and ``\w`` on purpose: in Python both treat
    # CJK characters as word characters, so a number directly after Chinese
    # text was never matched and an e-mail swallowed the Chinese text before
    # it. Digit lookarounds and ASCII classes behave the same next to any script.
    PATTERNS = {
        "phone": r"(?<!\d)1[3-9]\d{9}(?!\d)",
        "email": r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}",
        "id_card": r"(?<!\d)\d{17}[\dXx](?!\d)",
    }

    def process(self, text: str, mode: str = "mask") -> tuple[str, list]:
        """Find PII in *text* and optionally redact it.

        Args:
            text: Text to scan.
            mode: ``"mask"`` keeps the first two characters of each match and
                appends ``****``; ``"remove"`` substitutes
                ``[<type>_REDACTED]``; any other value (e.g. ``"detect"``)
                leaves the text unchanged.

        Returns:
            ``(processed_text, detections)`` where each detection is a dict
            with ``type`` and ``value``, grouped by pattern in ``PATTERNS`` order.
        """
        detections = [
            {"type": pii_type, "value": match.group()}
            for pii_type, pattern in self.PATTERNS.items()
            for match in re.finditer(pattern, text)
        ]
        if mode not in ("mask", "remove"):
            return text, detections

        processed = text
        # Redact longer matches first: when one match contains another (a
        # phone number used as an e-mail local part), replacing the short one
        # first would stop the long one from being found, leaving its
        # remainder in the clear.
        for hit in sorted(detections, key=lambda d: len(d["value"]), reverse=True):
            value = hit["value"]
            if mode == "mask":
                replacement = value[:2] + "****"
            else:
                replacement = f"[{hit['type']}_REDACTED]"
            processed = processed.replace(value, replacement)
        return processed, detections


# 35.7 高可用与容错模式
35.7.1 多模型容错
python
class MultiModelFallback:
"""多模型容错"""
def __init__(self, models: list[dict]):
self.models = sorted(models, key=lambda m: m.get("priority", 99))
async def generate(self, messages: list[dict], **kwargs) -> str:
for model_cfg in self.models:
try:
client = self._get_client(model_cfg)
resp = await client.chat.completions.create(
model=model_cfg["model"], messages=messages, **kwargs
)
return resp.choices[0].message.content
except Exception as e:
print(f"模型 {model_cfg['model']} 失败: {e}")
continue
raise RuntimeError("所有模型均失败")
# Usage
fallback = MultiModelFallback(
    [
        dict(model="claude-3.5-sonnet", priority=1, provider="anthropic"),
        dict(model="gpt-4o", priority=2, provider="openai"),
        dict(model="gpt-4o-mini", priority=3, provider="openai"),
    ]
)


# 35.7.2 优雅降级
python
class DegradationManager:
    """Graceful degradation: full agent -> lite agent -> static message.

    A failure lowers the service level for later calls. After
    ``recovery_seconds`` without a new failure the next call probes the full
    agent again. Without this, a single transient error degraded the manager
    for the rest of its lifetime.
    """

    def __init__(self, agent, recovery_seconds: float = 30.0):
        """
        Args:
            agent: Object exposing awaitable ``run(task)`` and ``run_lite(task)``.
            recovery_seconds: Cool-down after the most recent failure before
                the full agent is tried again.
        """
        self.agent = agent
        self.level = 0  # 0 = normal, 1 = partially degraded, 2 = fully degraded
        self.recovery_seconds = recovery_seconds
        # Most recent exception that caused a degradation, for diagnostics.
        self.last_error: Exception | None = None
        self._degraded_at: float | None = None

    @staticmethod
    def _now() -> float:
        """Return a monotonic clock reading in seconds."""
        import time  # function-scope import: only the recovery timer needs it

        return time.monotonic()

    def _degrade(self, level: int, error: Exception) -> None:
        """Lower the service level and remember why and when."""
        self.level = level
        self.last_error = error
        self._degraded_at = self._now()

    def _maybe_recover(self) -> None:
        """Return to normal service once the cool-down has elapsed."""
        # A level set by hand (no recorded failure time) is left untouched.
        if self.level == 0 or self._degraded_at is None:
            return
        if self._now() - self._degraded_at >= self.recovery_seconds:
            self.level = 0
            self._degraded_at = None

    async def run(self, task: str) -> str:
        """Run *task* at the best service level currently available.

        Returns:
            The full agent's result, else the lite agent's result, else a
            fixed "temporarily unavailable" message.
        """
        self._maybe_recover()
        if self.level == 0:
            try:
                return await self.agent.run(task)
            except Exception as exc:
                self._degrade(1, exc)
        if self.level == 1:
            try:
                return await self.agent.run_lite(task)
            except Exception as exc:
                self._degrade(2, exc)
        return "服务暂时不可用,请稍后重试。"


# 35.8 成本管理模式
python
class CostManager:
    """Cost management: per-tenant daily budgets and usage accounting.

    NOTE(review): ``_get_daily_usage`` and ``_calc_cost`` are called but not
    defined in the class as shown -- confirm where they are provided.
    """

    def __init__(self):
        self._budgets: dict[str, float] = {}
        # Keyed by "<tenant_id>:<ISO date>".
        self._usage: dict[str, float] = {}

    def set_budget(self, tenant_id: str, daily_budget: float):
        """Set (or replace) the daily budget of *tenant_id*."""
        self._budgets[tenant_id] = daily_budget

    async def check_budget(self, tenant_id: str, est_cost: float) -> bool:
        """Tell whether spending *est_cost* more keeps the tenant within budget.

        A tenant without a configured budget is treated as unlimited.
        """
        budget = self._budgets.get(tenant_id, float('inf'))
        projected = self._get_daily_usage(tenant_id) + est_cost
        return projected <= budget

    async def record_cost(self, tenant_id: str, model: str, tokens: int):
        """Add the cost of *tokens* on *model* to the tenant's usage for today."""
        cost = self._calc_cost(model, tokens)
        today = date.today().isoformat()
        usage_key = f"{tenant_id}:{today}"
        self._usage.setdefault(usage_key, 0.0)
        self._usage[usage_key] += cost

    def get_report(self, tenant_id: str) -> dict:
        """Summarise today's budget consumption for *tenant_id*.

        Returns:
            Dict with ``budget``, ``used``, ``remaining`` and ``percent``.
            A tenant without a configured budget is reported with budget 0
            and percent 0 (note this differs from ``check_budget``, which
            treats such a tenant as unlimited).
        """
        budget = self._budgets.get(tenant_id, 0)
        used = self._get_daily_usage(tenant_id)
        if budget:
            percent = used / budget * 100
        else:
            percent = 0
        return {
            "budget": budget,
            "used": round(used, 4),
            "remaining": round(budget - used, 4),
            "percent": percent,
        }


# 35.9 企业级安全模式
35.9.1 RBAC权限控制
python
from enum import Enum
class Permission(Enum):
    """Permissions that a role can carry; values use a ``scope:action`` form."""

    AGENT_RUN = "agent:run"
    AGENT_CONFIG = "agent:config"
    DATA_EXPORT = "data:export"
    ADMIN = "admin"
# Role name -> permissions granted to that role; "admin" holds every permission.
ROLE_PERMISSIONS = {
    "viewer": [Permission.AGENT_RUN],
    "operator": [Permission.AGENT_RUN, Permission.AGENT_CONFIG],
    "admin": list(Permission),
}
class RBACManager:
    """RBAC permission manager: users hold roles, roles grant permissions."""

    def __init__(self):
        self._user_roles: dict[str, list[str]] = {}

    def assign_role(self, user_id: str, role: str):
        """Append *role* to the roles held by *user_id*."""
        roles = self._user_roles.setdefault(user_id, [])
        roles.append(role)

    def check(self, user_id: str, perm: Permission) -> bool:
        """Tell whether any role held by *user_id* grants *perm*.

        Users without roles, and roles missing from ``ROLE_PERMISSIONS``,
        grant nothing.
        """
        return any(
            perm in ROLE_PERMISSIONS.get(role, [])
            for role in self._user_roles.get(user_id, [])
        )


# 35.9.2 零信任安全
python
class ZeroTrustGuard:
    """Zero-trust guard: every request is verified from scratch.

    NOTE(review): ``self.rbac`` and ``_verify_rate`` are used but not defined
    in the class as shown -- confirm where they are provided.
    """

    async def verify_every_request(self, request: dict) -> bool:
        """Verify every request; no context is trusted.

        Checks run in order (identity, permissions, payload, rate) and stop
        at the first failure, so an unauthenticated request never reaches the
        permission or rate checks.

        Returns:
            ``True`` only if every check passes.
        """
        return bool(
            self._verify_identity(request)
            and self._verify_permissions(request)
            and self._verify_payload(request)
            and self._verify_rate(request)
        )

    def _verify_identity(self, request) -> bool:
        """Require an ``auth_token`` that ``validate_token`` accepts."""
        token = request.get("auth_token")
        return token is not None and validate_token(token)

    def _verify_permissions(self, request) -> bool:
        """Check that the user may perform the requested action.

        An ``action`` that is missing or not a known ``Permission`` value is
        denied rather than raising.
        """
        # NOTE(review): ``user_id`` comes from the request body, not from the
        # validated token -- confirm callers cannot supply it freely.
        user_id = request.get("user_id")
        action = request.get("action")
        try:
            permission = Permission(action)
        except ValueError:
            return False
        return self.rbac.check(user_id, permission)

    def _verify_payload(self, request) -> bool:
        """Accept only requests whose JSON form is shorter than 10 MiB characters.

        Requests that cannot be serialised to JSON fail the check.
        """
        try:
            encoded = json.dumps(request)
        except (TypeError, ValueError):
            return False
        return len(encoded) < 10 * 1024 * 1024


# 最佳实践
- 安全先行:设计阶段就考虑安全、合规和隐私
- 配置外置:Prompt、模型、工具列表通过配置管理
- 渐进部署:金丝雀发布,先5%再全量
- 全面审计:所有关键操作记录日志,保留90天
- 成本可视化:实时展示每个租户的成本消耗
常见陷阱
- 忽略租户隔离:多租户数据混存导致隐私泄露
- 硬编码配置:变更需要重新部署
- 无回滚机制:上线出问题无法快速回退
- 审计不足:失败操作无日志,无法追溯
- 成本失控:无预算限制导致成本爆炸
小结
企业级 Agent 的核心要求是:可靠性是基础,安全性是底线,成本可控是前提。本章介绍的模式——网关、注册发现、多租户、版本管理、审计合规、高可用、成本管理、安全——构成了企业级 Agent 的基础设施骨架。
延伸阅读
- 书籍: "Designing Data-Intensive Applications" (Martin Kleppmann)
- 书籍: "Microservices Patterns" (Chris Richardson)
- 方法论: "The Twelve-Factor App" (Adam Wiggins, 12factor.net)
- NIST AI RMF: AI风险管理框架
- SOC 2: 安全合规标准