第36章 生产环境架构设计
"架构不是画出来的,是演进出来的。但好的初始设计,能让演进更顺畅。" —— 一位匿名架构师
36.1 概述:从原型到生产的跨越
将一个 Agent 系统从实验室推向生产环境,绝不仅仅是"多加几台服务器"这么简单。生产环境意味着你面对的是一个真实、复杂、不可预测的世界:用户流量忽高忽低、上游 LLM 服务偶尔不稳定、数据需要持久化和加密、不同租户之间的数据必须隔离、每次发布都可能引入新的风险。
本章将从架构设计的角度,系统地讨论如何构建一个生产级的 Agent 平台。我们的讨论将围绕以下几个核心问题展开:
- 服务应该如何拆分? —— 微服务 vs. 单体 vs. 模块化单体
- 请求如何路由? —— API Gateway 的设计哲学
- 组件之间如何通信? —— 同步调用 vs. 异步消息
- 数据如何存储? —— 多种数据引擎的协作
- 多租户如何隔离? —— 数据隔离与资源隔离的策略
在深入具体设计之前,我们先看一个典型的生产级 Agent 平台的整体架构图。
36.1.1 全局架构总览
graph TB
subgraph Clients["客户端层"]
Web[Web 应用]
Mobile[移动端]
API_Consumer[第三方 API]
CLI[CLI 工具]
end
subgraph Edge["边缘层"]
CDN[CDN]
WAF[WAF 防火墙]
LB[负载均衡器]
end
subgraph Gateway["API 网关层"]
AG[API Gateway]
Auth[认证服务]
RateLimit[限流服务]
Route[路由服务]
end
subgraph Core["核心服务层"]
ChatSvc[Chat 服务]
AgentSvc[Agent 编排服务]
ToolSvc[Tool 执行服务]
RAGSvc[RAG 检索服务]
SessionSvc[Session 管理服务]
SkillSvc[Skill 管理服务]
KBService[知识库服务]
end
subgraph AI["AI 推理层"]
Router[模型路由]
Cache[推理缓存]
LLM_A[LLM Provider A]
LLM_B[LLM Provider B]
LLM_C[LLM Provider C]
EmbeddingSvc[Embedding 服务]
end
subgraph Data["数据层"]
PG[(PostgreSQL)]
Redis[(Redis Cluster)]
VectorDB[(向量数据库)]
ObjectStore[(对象存储)]
MQ[消息队列]
end
subgraph Infra["基础设施层"]
K8s[Kubernetes]
Monitor[监控系统]
LogCenter[日志中心]
end
Clients --> Edge
Edge --> Gateway
Gateway --> Core
Core --> AI
Core --> Data
Core --> MQ
AI --> LLM_A & LLM_B & LLM_C
AI --> EmbeddingSvc
Infra -.-> Core
Infra -.-> Gateway
Infra -.-> Data这个架构图展示了一个完整的生产级 Agent 平台的各个层次。接下来,我们将逐层深入分析。
36.2 微服务架构下的 Agent 服务
36.2.1 服务拆分原则
Agent 系统的服务拆分需要遵循领域驱动设计(DDD)的原则,同时考虑实际运维的复杂度。过早的微服务化会导致运维成本急剧上升,而过晚的拆分则可能让系统陷入"大泥球"。
推荐策略:先模块化单体,再按需拆分
阶段一:模块化单体(0-1K QPS)
┌─────────────────────────────────────┐
│ Agent Platform │
│ ┌─────────┬──────────┬───────────┐ │
│ │ Chat │ Agent │ Session │ │
│ │ Module │ Module │ Module │ │
│ ├─────────┼──────────┼───────────┤ │
│ │ RAG │ Skill │ Tool │ │
│ │ Module │ Module │ Module │ │
│ └─────────┴──────────┴───────────┘ │
└─────────────────────────────────────┘
阶段二:核心服务拆分(1K-10K QPS)
┌──────────┐ ┌──────────┐ ┌──────────┐
│Chat Svc │ │Agent Svc │ │RAG Svc │
└────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │
┌────┴──────────────┴──────────────┴────┐
│ 共享模块(Session/Skill/Tool) │
└───────────────────────────────────────┘
阶段三:完全微服务化(10K+ QPS)
每个核心能力独立部署,通过消息队列和 API 通信服务拆分的具体建议:
| 服务 | 职责 | 拆分时机 | 数据库策略 |
|---|---|---|---|
| Chat Service | 处理用户对话请求 | 启动时即独立 | 独立 Schema |
| Agent Orchestrator | Agent 编排和任务调度 | QPS > 500 | 独立 Schema |
| RAG Service | 检索增强生成 | 知识库功能上线时 | 共享向量库 |
| Tool Execution | 工具调用执行 | 工具数量 > 20 | 独立 Schema |
| Session Manager | 会话状态管理 | 需要跨服务共享状态 | Redis + PG |
| Skill Registry | 技能注册与管理 | 技能系统上线时 | 独立 Schema |
| Knowledge Base | 知识库管理 | RAG 上线时 | 独立 Schema |
| Model Router | LLM 模型路由 | 多模型支持时 | 配置存储 |
36.2.2 服务间通信
在微服务架构中,服务间通信是最关键的设计决策之一。
同步通信 vs. 异步通信的选择矩阵:
graph LR
A[请求类型] --> B{是否需要即时响应?}
B -->|是| C{是否可以容忍失败?}
B -->|否| D[异步消息]
C -->|否| E[同步 HTTP/gRPC]
C -->|是| F{调用链深度 > 3?}
F -->|是| D
F -->|否| E具体实践:
同步调用(HTTP/gRPC):适用于需要即时响应的场景
- 用户发送消息 → Chat Service → Agent Service → 返回响应
- 推荐使用 gRPC 进行内部服务间调用,性能优于 HTTP/JSON
异步消息(MQ):适用于不需要即时响应的场景
- Agent 执行结果通知
- 知识库文档索引构建
- 使用统计和分析
- 审计日志收集
gRPC 服务定义示例:
// agent_service.proto
syntax = "proto3";
package agent.v1;
service AgentService {
// 执行 Agent 任务
rpc ExecuteTask(ExecuteTaskRequest) returns (stream TaskResponse);
// 获取任务状态
rpc GetTaskStatus(TaskStatusRequest) returns (TaskStatus);
// 取消任务
rpc CancelTask(CancelTaskRequest) returns (CancelResponse);
}
message ExecuteTaskRequest {
string session_id = 1;
string user_message = 2;
string agent_type = 3; // "chat", "rag", "tool_call"
map<string, string> context = 4;
int32 timeout_seconds = 5;
}
message TaskResponse {
string task_id = 1;
oneof payload {
ChunkResponse chunk = 2;
ToolCallResponse tool_call = 3;
FinalResponse final = 4;
ErrorResponse error = 5;
}
}
message FinalResponse {
string content = 1;
int32 total_tokens = 2;
float latency_ms = 3;
repeated ToolCallResult tool_results = 4;
}36.2.3 服务治理
服务注册与发现配置示例(Consul):
# consul-agent.hcl
datacenter = "agent-platform"
data_dir = "/opt/consul/data"
server = true
bootstrap_expect = 3
services {
id = "agent-service-1"
name = "agent-service"
tags = ["v2.1.0", "production"]
port = 50051
check {
id = "agent-health"
name = "Agent Service Health"
http = "http://localhost:8080/health"
interval = "10s"
timeout = "3s"
}
}Service Mesh 选型建议:
| 方案 | 适用场景 | 优势 | 劣势 |
|---|---|---|---|
| Istio | 大规模集群(>100 节点) | 功能全面 | 运维复杂 |
| Linkerd | 中小规模集群 | 轻量、易用 | 功能较少 |
| 不使用 Service Mesh | 团队 < 10 人 | 简单直接 | 缺少流量治理 |
建议:对于 Agent 平台初期的团队规模,推荐先不引入 Service Mesh,通过 API Gateway + 服务注册中心实现基本的流量管理。
36.3 API Gateway 设计
36.3.1 Gateway 的核心职责
API Gateway 是整个 Agent 平台的统一入口,承担着流量管理、认证授权、限流熔断等关键职责。
graph LR
Request[客户端请求] --> GW[API Gateway]
GW --> Auth{认证检查}
Auth -->|通过| RL{限流检查}
Auth -->|失败| Reject401[401 Unauthorized]
RL -->|通过| Route{路由匹配}
RL -->|超限| Reject429[429 Too Many Requests]
Route --> SVC[后端服务]
subgraph Gateway 内部
Auth
RL
Route
end36.3.2 Gateway 架构选型
主流 API Gateway 对比:
| 方案 | 语言 | 性能 | 可扩展性 | 运维复杂度 |
|---|---|---|---|---|
| Kong + Lua | Lua | 高 | 高(插件丰富) | 中 |
| APISIX | Lua | 极高 | 高(原生 Nginx) | 中 |
| Tyk | Go | 高 | 中 | 低 |
| 自研 Gateway | Go/Rust | 最高 | 完全可控 | 高 |
推荐方案:APISIX 或自研轻量 Gateway。对于 Agent 平台的特殊需求(如 SSE 流式响应、Token 级别的限流),自研 Gateway 可能是更好的选择。
自研 Gateway 核心代码示例(Go):
// gateway/main.go
package main
import (
"context"
"io"
"net/http"
"strings"
"sync"
"time"
"github.com/redis/go-redis/v9"
)
type AgentGateway struct {
router *http.ServeMux
authService AuthService
rateLimiter *TokenRateLimiter
circuitBreakers map[string]*CircuitBreaker
config *GatewayConfig
}
type GatewayConfig struct {
ListenAddr string `yaml:"listen_addr"`
AuthServiceAddr string `yaml:"auth_service_addr"`
Routes []RouteConfig `yaml:"routes"`
RateLimits RateLimitConfig `yaml:"rate_limits"`
}
type RouteConfig struct {
Path string `yaml:"path"`
Upstream string `yaml:"upstream"`
StripPrefix bool `yaml:"strip_prefix"`
Timeout int `yaml:"timeout_ms"`
MaxRetries int `yaml:"max_retries"`
}
func (gw *AgentGateway) handleChat(w http.ResponseWriter, r *http.Request) {
start := time.Now()
// 1. 认证
userID, _, err := gw.authService.Authenticate(r)
if err != nil {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
// 2. Token 级别限流
if !gw.rateLimiter.Allow(userID, "chat") {
w.Header().Set("Retry-After", "60")
http.Error(w, "Rate limit exceeded", http.StatusTooManyRequests)
return
}
// 3. 熔断检查
cb := gw.circuitBreakers["agent-service"]
if !cb.Allow() {
http.Error(w, "Service unavailable", http.StatusServiceUnavailable)
return
}
// 4. 转发请求(支持 SSE 流式)
gw.proxyChatRequest(w, r, userID)
_ = time.Since(start).Seconds() // 记录延迟指标
}
// proxyChatRequest 代理聊天请求,支持 SSE 流式传输
func (gw *AgentGateway) proxyChatRequest(w http.ResponseWriter, r *http.Request, userID string) {
upstream := gw.config.Routes[0].Upstream
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Minute)
defer cancel()
proxyReq, _ := http.NewRequestWithContext(ctx, r.Method, upstream+r.URL.Path, r.Body)
proxyReq.Header = r.Header.Clone()
proxyReq.Header.Set("X-User-ID", userID)
resp, err := http.DefaultClient.Do(proxyReq)
if err != nil {
http.Error(w, "Upstream error", http.StatusBadGateway)
return
}
defer resp.Body.Close()
contentType := resp.Header.Get("Content-Type")
if strings.Contains(contentType, "text/event-stream") {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
flusher, _ := w.(http.Flusher)
buf := make([]byte, 4096)
for {
n, err := resp.Body.Read(buf)
if n > 0 {
w.Write(buf[:n])
flusher.Flush()
}
if err == io.EOF {
break
}
if err != nil {
break
}
}
} else {
w.Header().Set("Content-Type", contentType)
io.Copy(w, resp.Body)
}
}36.3.3 限流策略
Agent 平台的限流需要考虑多个维度:
# rate_limit_config.yaml
rate_limits:
global:
requests_per_second: 10000
burst: 15000
user:
free_tier:
requests_per_minute: 20
tokens_per_day: 100000
max_concurrent: 2
pro_tier:
requests_per_minute: 100
tokens_per_day: 2000000
max_concurrent: 10
enterprise_tier:
requests_per_minute: 500
tokens_per_day: -1 # 无限制
max_concurrent: 50
api:
/api/v1/chat:
requests_per_minute: 60
token_cost_multiplier: 1.0
/api/v1/agent/execute:
requests_per_minute: 30
token_cost_multiplier: 1.5
/api/v1/rag/search:
requests_per_minute: 120
token_cost_multiplier: 0.3Token 级别限流实现(Python + Redis):
# rate_limiter.py
import time
import redis
class TokenAwareRateLimiter:
"""基于 Token 消耗的限流器"""
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
self._lua_script = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local cost = tonumber(ARGV[3])
local now = tonumber(ARGV[4])
local window_start = now - window
redis.call('ZREMRANGEBYSCORE', key, 0, window_start)
local current = tonumber(redis.call('ZCARD', key))
if current + cost <= limit then
redis.call('ZADD', key, now, now .. ':' .. cost)
redis.call('EXPIRE', key, window)
return {1, limit - current - cost, now + window}
else
local oldest = tonumber(
redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')[2])
local retry_after = oldest + window - now
return {0, 0, now + window, retry_after}
end
"""
def check(self, user_id: str, token_cost: int = 1,
limit_type: str = "minute") -> dict:
"""检查请求是否被允许"""
configs = {
"minute": (f"rl:{user_id}:minute", 60),
"day": (f"rl:{user_id}:day", 86400),
}
key, window = configs.get(limit_type, configs["minute"])
# 根据用户等级获取不同限额
limit = self._get_user_limit(user_id, limit_type)
result = self.redis.eval(
self._lua_script, 1, key, limit, window,
token_cost, time.time()
)
return {
"allowed": bool(result[0]),
"remaining": int(result[1]),
"reset_at": float(result[2]),
"retry_after": float(result[3]) if len(result) > 3 else None
}
def _get_user_limit(self, user_id: str, limit_type: str) -> int:
"""获取用户限额(从配置或数据库中读取)"""
tier = self._get_user_tier(user_id)
limits = {
"free": {"minute": 20, "day": 100000},
"pro": {"minute": 100, "day": 2000000},
"enterprise": {"minute": 500, "day": 999999999},
}
return limits.get(tier, limits["free"]).get(limit_type, 60)36.4 消息队列集成
36.4.1 消息队列在 Agent 平台中的作用
Agent 系统的许多操作具有天然异步性:文档索引、任务执行结果通知、使用统计分析、审计日志收集等。消息队列将这些操作解耦,提升系统的可靠性和响应速度。
graph TB
subgraph Producers["生产者"]
ChatSvc[Chat Service]
AgentSvc[Agent Service]
RAGSvc[RAG Service]
end
subgraph MQ["消息队列"]
T1[topic: task.result]
T2[topic: document.index]
T3[topic: usage.stats]
T4[topic: audit.log]
T5[topic: notification]
end
subgraph Consumers["消费者"]
NotifSvc[通知服务]
Analytics[分析服务]
IndexSvc[索引服务]
AuditSvc[审计服务]
end
ChatSvc --> T1 & T3 & T5
AgentSvc --> T1 & T3 & T4
RAGSvc --> T2 & T3
T1 --> NotifSvc & Analytics
T2 --> IndexSvc
T3 --> Analytics
T4 --> AuditSvc
T5 --> NotifSvc36.4.2 消息队列选型
| 方案 | 吞吐量 | 延迟 | 持久化 | 适用场景 |
|---|---|---|---|---|
| Kafka | 极高(百万/秒) | 中 | 强 | 审计日志、使用统计 |
| RabbitMQ | 高(万/秒) | 低 | 可选 | 任务通知、事件驱动 |
| Redis Streams | 高 | 极低 | 可选 | 轻量级实时通知 |
| NATS | 极高 | 极低 | JetStream | 高性能内部通信 |
推荐组合:Kafka 用于高吞吐量的数据流(审计、统计),RabbitMQ 用于任务分发和事件通知。
36.4.3 事件消息格式设计
统一的事件消息格式是系统可靠性的基础:
{
"event_id": "evt_8f7d2a1b3c4d",
"event_type": "agent.task.completed",
"event_version": "1.0",
"timestamp": "2026-04-01T00:00:00.000Z",
"source": "agent-service",
"correlation_id": "corr_5e6f7a8b9c0d",
"data": {
"task_id": "task_abc123",
"session_id": "sess_def456",
"user_id": "user_ghi789",
"agent_type": "rag_agent",
"status": "completed",
"result": {
"response": "根据检索结果...",
"sources": ["doc_1", "doc_3"],
"tokens_used": 1523,
"latency_ms": 2340
}
},
"metadata": {
"schema_version": "1.0",
"partition_key": "user_ghi789",
"retry_count": 0
}
}36.4.4 消息消费者幂等性
生产环境中,消息可能被重复投递。消费者必须实现幂等性:
import hashlib
import json
import redis
from datetime import datetime
from typing import Any, Callable
class IdempotentConsumer:
"""幂等消息消费者"""
def __init__(self, redis_client: redis.Redis, handler: Callable):
self.redis = redis_client
self.handler = handler
def consume(self, message: dict) -> dict:
event_id = message["event_id"]
msg_hash = self._compute_hash(message)
# 检查是否已处理
if self._is_processed(event_id, msg_hash):
return {"status": "duplicate", "event_id": event_id}
# 获取处理锁(防止并发重复处理)
lock_key = f"lock:msg:{event_id}"
if not self.redis.set(lock_key, "1", nx=True, ex=300):
return {"status": "locked", "event_id": event_id}
try:
# Double check
if self._is_processed(event_id, msg_hash):
return {"status": "duplicate", "event_id": event_id}
result = self.handler(message)
self._mark_processed(event_id, msg_hash, result)
return {"status": "processed", "result": result}
except Exception as e:
return {"status": "error", "error": str(e)}
finally:
self.redis.delete(lock_key)
def _compute_hash(self, message: dict) -> str:
canonical = json.dumps(message["data"], sort_keys=True)
return hashlib.sha256(canonical.encode()).hexdigest()[:16]
def _is_processed(self, event_id: str, msg_hash: str) -> bool:
key = f"processed:msg:{event_id}"
stored = self.redis.hget(key, "hash")
return stored == msg_hash.encode() if stored else False
def _mark_processed(self, event_id, msg_hash, result):
key = f"processed:msg:{event_id}"
self.redis.hset(key, mapping={
"hash": msg_hash,
"processed_at": datetime.utcnow().isoformat(),
"result_summary": str(result)[:200]
})
self.redis.expire(key, 86400 * 7)36.5 数据层设计
Agent 平台的数据层需要支持多种数据模型:结构化数据(用户、配置)、半结构化数据(会话历史)、向量数据(Embedding)、缓存数据(热数据)以及大对象(文档、模型文件)。
36.5.1 多引擎数据架构
graph TB
subgraph Services["服务层"]
S1[Chat Service]
S2[Agent Service]
S3[RAG Service]
end
subgraph DataLayer["数据层"]
subgraph Cache["缓存层"]
R1[(Redis Cluster<br/>会话/限流)]
R2[(本地缓存<br/>热点配置)]
end
subgraph Relational["关系型"]
PG[(PostgreSQL<br/>用户/配置/审计)]
PGR[(只读副本<br/>报表查询)]
end
subgraph Vector["向量存储"]
VDB[(Milvus/Qdrant<br/>Embedding)]
end
subgraph Document["文档存储"]
ES[(Elasticsearch<br/>全文检索)]
S3Store[(MinIO/S3<br/>原始文档)]
end
end
S1 --> R1 & PG
S2 --> R1 & PG & VDB
S3 --> VDB & ES & S3Store
PG <--> PGR36.5.2 数据库设计
核心表结构(PostgreSQL):
-- 用户表
CREATE TABLE users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
email VARCHAR(255) UNIQUE NOT NULL,
display_name VARCHAR(100),
tier VARCHAR(20) DEFAULT 'free'
CHECK (tier IN ('free', 'pro', 'enterprise')),
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
last_login_at TIMESTAMPTZ,
is_active BOOLEAN DEFAULT true
);
-- 租户表(多租户支持)
CREATE TABLE tenants (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(100) NOT NULL,
slug VARCHAR(50) UNIQUE NOT NULL,
plan VARCHAR(20) DEFAULT 'starter',
config JSONB DEFAULT '{}',
rate_limit_config JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- 会话表
CREATE TABLE sessions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id),
tenant_id UUID REFERENCES tenants(id),
title VARCHAR(500),
agent_type VARCHAR(50) DEFAULT 'chat',
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
is_archived BOOLEAN DEFAULT false
);
-- 消息表(按时间范围分区)
CREATE TABLE messages (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
session_id UUID NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
role VARCHAR(20) NOT NULL
CHECK (role IN ('user', 'assistant', 'system', 'tool')),
content TEXT NOT NULL,
token_count INTEGER DEFAULT 0,
model VARCHAR(50),
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
is_starred BOOLEAN DEFAULT false
) PARTITION BY RANGE (created_at);
-- 按月分区
CREATE TABLE messages_2026_04 PARTITION OF messages
FOR VALUES FROM ('2026-04-01') TO ('2026-05-01');
CREATE TABLE messages_2026_05 PARTITION OF messages
FOR VALUES FROM ('2026-05-01') TO ('2026-06-01');
-- 关键索引
CREATE INDEX idx_messages_session
ON messages (session_id, created_at DESC);
CREATE INDEX idx_sessions_user
ON sessions (user_id, updated_at DESC);
-- Token 使用统计表(分区)
CREATE TABLE token_usage (
id BIGSERIAL PRIMARY KEY,
user_id UUID NOT NULL REFERENCES users(id),
tenant_id UUID REFERENCES tenants(id),
session_id UUID REFERENCES sessions(id),
model VARCHAR(50) NOT NULL,
prompt_tokens INTEGER NOT NULL,
completion_tokens INTEGER NOT NULL,
total_tokens INTEGER NOT NULL,
cost_usd DECIMAL(10, 6),
created_at TIMESTAMPTZ DEFAULT NOW()
) PARTITION BY RANGE (created_at);
CREATE INDEX idx_token_usage_user_date
ON token_usage (user_id, created_at);
CREATE INDEX idx_token_usage_tenant_date
ON token_usage (tenant_id, created_at);36.5.3 缓存策略
# cache_manager.py
import json
import hashlib
import redis
from dataclasses import dataclass
from functools import wraps
from typing import Optional
@dataclass
class CacheConfig:
session_ttl: int = 3600 # 会话缓存 1小时
context_ttl: int = 1800 # 上下文缓存 30分钟
embedding_ttl: int = 86400 * 7 # Embedding 缓存 7天
config_ttl: int = 300 # 配置缓存 5分钟
tool_result_ttl: int = 600 # 工具结果缓存 10分钟
class AgentCacheManager:
"""Agent 平台统一缓存管理"""
def __init__(self, redis_client: redis.Redis, config: CacheConfig):
self.redis = redis_client
self.config = config
def get_session_context(self, session_id: str) -> Optional[dict]:
"""获取会话上下文"""
key = f"session:ctx:{session_id}"
data = self.redis.get(key)
return json.loads(data) if data else None
def set_session_context(self, session_id: str, context: dict):
"""设置会话上下文"""
key = f"session:ctx:{session_id}"
self.redis.setex(
key, self.config.session_ttl, json.dumps(context)
)
def get_embedding_cache(self, text: str) -> Optional[list]:
"""获取 Embedding 缓存"""
h = hashlib.md5(text.encode()).hexdigest()[:16]
data = self.redis.get(f"embedding:{h}")
return json.loads(data) if data else None
def set_embedding_cache(self, text: str, embedding: list):
"""设置 Embedding 缓存"""
h = hashlib.md5(text.encode()).hexdigest()[:16]
self.redis.setex(
f"embedding:{h}",
self.config.embedding_ttl,
json.dumps(embedding)
)
def cached(self, key_prefix: str, ttl: Optional[int] = None):
"""缓存装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
arg_str = json.dumps(args, default=str)
kwarg_str = json.dumps(kwargs, sort_keys=True, default=str)
raw = f"{key_prefix}:{func.__name__}:{arg_str}:{kwarg_str}"
cache_key = f"cache:{hashlib.md5(raw.encode()).hexdigest()[:24]}"
cached = self.redis.get(cache_key)
if cached:
return json.loads(cached)
result = func(*args, **kwargs)
if result is not None:
self.redis.setex(
cache_key, ttl or self.config.config_ttl,
json.dumps(result)
)
return result
return wrapper
return decorator36.5.4 向量数据库设计
# vector_store.py
from typing import List, Optional
@dataclass
class VectorSearchResult:
id: str
score: float
metadata: dict
content: str
class VectorStoreManager:
"""向量数据库管理(基于 Milvus)"""
def create_collection(self, name: str, dim: int = 1536):
"""创建向量集合"""
from pymilvus import (
CollectionSchema, FieldSchema, DataType, Collection
)
fields = [
FieldSchema(name="id", dtype=DataType.VARCHAR,
is_primary=True, max_length=64),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=dim),
FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=65535),
FieldSchema(name="metadata", dtype=DataType.JSON),
FieldSchema(name="tenant_id", dtype=DataType.VARCHAR, max_length=64),
]
schema = CollectionSchema(fields=fields, description=name)
collection = Collection(name=name, schema=schema)
# IVF_FLAT 索引适合中等规模
collection.create_index(
field_name="embedding",
index_params={
"index_type": "IVF_FLAT",
"metric_type": "COSINE",
"params": {"nlist": 1024}
}
)
return collection
def hybrid_search(self, query_embedding: List[float],
tenant_id: str, top_k: int = 10,
vector_weight: float = 0.7,
keyword: Optional[str] = None
) -> List[VectorSearchResult]:
"""混合检索:向量相似度 + 关键词匹配(RRF 融合)"""
vector_results = self._vector_search(
query_embedding, tenant_id, top_k * 2
)
keyword_results = (
self._keyword_search(keyword, tenant_id, top_k * 2)
if keyword else []
)
return self._reciprocal_rank_fusion(
vector_results, keyword_results, top_k, vector_weight
)
def _reciprocal_rank_fusion(self, vector_results, keyword_results,
top_k, vector_weight):
"""RRF 结果融合"""
scores = {}
k = 60
for rank, r in enumerate(vector_results):
scores.setdefault(r.id, 0)
scores[r.id] += vector_weight / (k + rank + 1)
for rank, r in enumerate(keyword_results):
scores.setdefault(r.id, 0)
scores[r.id] += (1 - vector_weight) / (k + rank + 1)
all_results = {r.id: r for r in vector_results + keyword_results}
sorted_ids = sorted(scores, key=scores.get, reverse=True)[:top_k]
return [all_results[i] for i in sorted_ids if i in all_results]36.6 多租户架构
36.6.1 租户隔离策略
多租户是 Agent 平台的必经之路。不同的租户(企业客户)需要数据隔离、配置隔离和资源隔离。
隔离级别光谱:
最小隔离 ←————————————————————————→ 最大隔离
共享数据库 共享 Schema 独立 Schema 独立实例
共享表 行级隔离 命名空间 物理隔离
成本最低 ←————————————————————————→ 成本最高
运维最简 ←————————————————————————→ 运维最复杂推荐方案:共享数据库 + Schema 隔离 + 资源配额
36.6.2 租户管理实现
# tenant_manager.py
from dataclasses import dataclass
from typing import Optional
from enum import Enum
class TenantPlan(Enum):
STARTER = "starter"
PROFESSIONAL = "professional"
ENTERPRISE = "enterprise"
PLAN_LIMITS = {
TenantPlan.STARTER: {
"max_sessions": 100,
"max_tokens_per_day": 500_000,
"max_documents": 1_000,
"max_concurrent": 3,
"allowed_models": ["gpt-4o-mini", "claude-3-haiku"],
},
TenantPlan.PROFESSIONAL: {
"max_sessions": 10_000,
"max_tokens_per_day": 10_000_000,
"max_documents": 100_000,
"max_concurrent": 20,
"allowed_models": ["gpt-4o", "claude-3-sonnet", "claude-3-haiku"],
},
TenantPlan.ENTERPRISE: {
"max_sessions": -1, # 无限制
"max_tokens_per_day": -1,
"max_documents": -1,
"max_concurrent": 100,
"allowed_models": ["*"], # 全部模型
},
}
class TenantManager:
"""多租户管理器"""
def check_quota(self, tenant_id: str, resource: str) -> bool:
"""检查租户配额"""
plan = self._get_tenant_plan(tenant_id)
limits = PLAN_LIMITS[plan]
limit = limits[resource]
if limit == -1:
return True # 无限制
usage = self._get_current_usage(tenant_id, resource)
return usage < limit
def enforce_tenant_isolation(self, query, tenant_id: str):
"""在查询中强制加入租户隔离条件"""
# 所有涉及租户数据的查询都必须加入 tenant_id 过滤
if hasattr(query, 'where'):
query = query.where(tenant_id=tenant_id)
return query36.6.3 资源配额执行
# tenant_resource_quota.yaml
tenant_quotas:
- tenant_id: "tenant_acme"
plan: "professional"
resources:
sessions:
limit: 10000
current: 4523
alert_threshold: 0.8 # 80%时告警
tokens_daily:
limit: 10000000
current: 6500000
alert_threshold: 0.9
concurrent_requests:
limit: 20
current: 8
knowledge_base:
documents_limit: 100000
storage_limit_gb: 50
current_documents: 23456
current_storage_gb: 12.336.7 架构演进路线
36.7.1 演进阶段
graph LR
A[阶段一<br/>单体应用<br/>0-1K QPS] --> B[阶段二<br/>模块化拆分<br/>1K-10K QPS]
B --> C[阶段三<br/>微服务化<br/>10K-100K QPS]
C --> D[阶段四<br/>多区域部署<br/>100K+ QPS]
style A fill:#e1f5fe
style B fill:#fff3e0
style C fill:#fce4ec
style D fill:#e8f5e9各阶段关键决策点:
| 阶段 | QPS | 关键变化 | 基础设施 |
|---|---|---|---|
| 阶段一 | 0-1K | 单体部署,共享数据库 | 单台服务器 + RDS |
| 阶段二 | 1K-10K | 核心服务拆分,Redis 缓存 | K8s 集群(3-5 节点) |
| 阶段三 | 10K-100K | 完全微服务,消息队列 | K8s 集群(10+ 节点)+ 专线 |
| 阶段四 | 100K+ | 多区域部署,全球路由 | 多云 + CDN + Anycast |
36.7.2 架构决策记录(ADR)模板
每次重大架构决策都应该留下记录:
# ADR-001: 选择 gRPC 作为内部服务间通信协议
## 状态
已采纳
## 背景
Agent 平台的服务间通信需要高性能、强类型的协议。
## 决策
使用 gRPC + Protocol Buffers 作为内部服务间通信协议。
## 理由
1. 性能:比 HTTP/JSON 快 5-10 倍(Protobuf 序列化)
2. 流式支持:原生支持 Server Streaming(适配 SSE 响应)
3. 强类型:Protobuf 提供编译时类型检查
4. 代码生成:自动生成多语言客户端代码
## 后果
- 正面:性能提升、类型安全、开发效率
- 负面:调试比 REST 困难、浏览器不支持 gRPC
- 缓解:保留 HTTP/REST 用于外部 API
## 决策人
@架构组 2026-03-1536.7.3 技术债务管理
生产环境中不可避免会产生技术债务。关键是要有意识地管理它:
# 技术债务看板
## 高优先级
- [ ] Agent Service 的同步调用链过长(最大深度 7)→ 引入异步编排
- [ ] 消息表缺少分区策略 → 按月分区
## 中优先级
- [ ] 缓存失效策略不一致 → 统一 CacheManager
- [ ] 部分服务缺少健康检查端点 → 统一健康检查框架
## 低优先级
- [ ] API 版本管理不够规范 → 制定版本策略
- [ ] 日志格式不完全统一 → 引入结构化日志标准36.8 部署架构示例
36.8.1 Kubernetes 部署配置
# k8s/agent-platform-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: agent-service
namespace: agent-platform
labels:
app: agent-service
version: v2.1.0
spec:
replicas: 3
strategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 1
maxUnavailable: 0 # 零停机
selector:
matchLabels:
app: agent-service
template:
metadata:
labels:
app: agent-service
version: v2.1.0
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "9090"
spec:
containers:
- name: agent-service
image: registry.example.com/agent-service:v2.1.0
ports:
- containerPort: 8080
name: http
- containerPort: 9090
name: metrics
resources:
requests:
cpu: "500m"
memory: "512Mi"
limits:
cpu: "2000m"
memory: "2Gi"
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: agent-secrets
key: database-url
- name: REDIS_URL
valueFrom:
secretKeyRef:
name: agent-secrets
key: redis-url
- name: JWT_SECRET
valueFrom:
secretKeyRef:
name: agent-secrets
key: jwt-secret
livenessProbe:
httpGet:
path: /health/live
port: 8080
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /health/ready
port: 8080
initialDelaySeconds: 5
periodSeconds: 5
volumeMounts:
- name: config
mountPath: /app/config
volumes:
- name: config
configMap:
name: agent-config
---
apiVersion: v1
kind: Service
metadata:
name: agent-service
namespace: agent-platform
spec:
selector:
app: agent-service
ports:
- port: 80
targetPort: 8080
type: ClusterIP36.8.2 基础设施即代码(Terraform)
# terraform/main.tf
terraform {
required_version = ">= 1.5"
required_providers {
kubernetes = {
source = "hashicorp/kubernetes"
version = "~> 2.23"
}
rediscloud = {
source = "rediscloud/rediscloud"
version = "~> 0.13"
}
}
}
# Redis Cluster 配置
resource "rediscloud_subscription" "agent_platform" {
name = "agent-platform-cache"
payment_method = "credit-card"
memory_storage = "ram"
redis_version = "7.2"
plan {
memory_limit_in_gb = 10
quantity = 3 # 3节点集群
throughput_measurement {
by = "operations-per-second"
value = 50000
}
}
}
# PostgreSQL 数据库
resource "aws_db_instance" "agent_platform" {
identifier = "agent-platform-db"
engine = "postgres"
engine_version = "16.1"
instance_class = "db.r6g.xlarge"
allocated_storage = 500
max_allocated_storage = 1000
storage_type = "gp3"
multi_az = true
db_subnet_group_name = aws_db_subnet_group.agent.name
vpc_security_group_ids = [aws_security_group.db.id]
backup_retention_period = 30
backup_window = "03:00-04:00"
maintenance_window = "Mon:04:00-Mon:05:00"
skip_final_snapshot = false
final_snapshot_identifier = "agent-platform-final"
tags = {
Project = "agent-platform"
Env = "production"
}
}36.9 本章小结
本章从全局视角介绍了生产级 Agent 平台的架构设计,涵盖了以下核心内容:
- 微服务架构:采用"先模块化单体,再按需拆分"的渐进式策略
- API Gateway:作为统一入口处理认证、限流、路由等横切关注点
- 消息队列:通过异步解耦提升系统的可靠性和响应速度
- 数据层设计:多引擎协作(PostgreSQL + Redis + 向量库 + 对象存储)
- 多租户架构:通过 Schema 隔离和资源配额实现安全的租户隔离
- 架构演进:从单体到微服务再到多区域部署的清晰路线图
架构设计没有银弹。关键是要根据团队规模、业务阶段和技术能力做出合理的选择,并持续演进。下一章我们将深入讨论可扩展性与高可用的具体实现策略。