第22章:错误处理与重试策略
构建不怕失败的Agent系统
22.1 引言
在传统软件中,错误是例外(Exception)。在 Agent 系统中,错误是常态(Norm)。LLM 的输出不确定性、外部 API 的不可靠性、网络波动、Token 超限——这些"故障"时刻都在发生。
一个健壮的 Agent 系统不是不犯错,而是犯了错能优雅地恢复。本章将系统讲解 Agent 系统中的错误处理策略,从 LLM 输出解析到工具调用失败,从超时降级到指数退避,构建一个"不怕失败"的容错体系。
本章学习目标
- 理解 Agent 系统中错误的分类和特征
- 掌握 LLM 输出解析错误的处理方法
- 实现工具调用失败的容错机制
- 设计超时与降级策略
- 构建指数退避重试框架
- 建立错误日志与报警体系
22.2 错误分类体系
22.2.1 Agent错误分类
Agent 错误
├── 输入错误
│ ├── 用户输入无效/不完整
│ ├── Prompt 注入
│ └── 输入超长(Token 超限)
├── 模型错误
│ ├── 输出格式不符(JSON 解析失败等)
│ ├── 输出内容偏离(幻觉、无关内容)
│ ├── 输出被截断(max_tokens 不足)
│ └── 模型拒绝回答(安全过滤)
├── 工具错误
│ ├── API 调用失败(网络、认证、限流)
│ ├── 工具参数错误
│ ├── 工具执行超时
│ └── 工具返回无效数据
├── 系统错误
│ ├── 内存不足
│ ├── 并发超限
│ ├── 服务不可用
│ └── 配置错误
└── 业务错误
├── 权限不足
├── 数据不存在
├── 业务规则冲突
└── 状态不一致22.2.2 错误严重等级
python
from enum import Enum
class ErrorSeverity(Enum):
"""错误严重等级"""
LOW = "low" # 可忽略,不影响主流程
MEDIUM = "medium" # 影响部分功能,可降级处理
HIGH = "high" # 影响主流程,需要重试或降级
CRITICAL = "critical" # 系统级故障,需要立即干预
class AgentError:
"""统一的错误类"""
def __init__(
self,
error_type: str,
message: str,
severity: ErrorSeverity = ErrorSeverity.MEDIUM,
recoverable: bool = True,
context: dict = None,
cause: Exception = None,
):
self.error_type = error_type
self.message = message
self.severity = severity
self.recoverable = recoverable
self.context = context or {}
self.cause = cause
self.timestamp = __import__('time').time()
def __str__(self):
return f"[{self.severity.value}] {self.error_type}: {self.message}"
def to_dict(self) -> dict:
return {
"type": self.error_type,
"message": self.message,
"severity": self.severity.value,
"recoverable": self.recoverable,
"context": self.context,
"cause": str(self.cause) if self.cause else None,
}22.3 LLM输出解析错误处理
22.3.1 JSON解析失败的处理
LLM 返回的 JSON 经常格式不完美——缺少引号、多余逗号、混合了自然语言等:
python
import json
import re
from typing import Any, TypeVar, Type
T = TypeVar('T')
class OutputParser:
"""LLM 输出解析器——容错版"""
@staticmethod
def parse_json(text: str, strict: bool = False) -> dict:
"""
解析 LLM 输出的 JSON,支持多种容错模式。
容错策略按优先级:
1. 直接解析
2. 提取JSON代码块
3. 修复常见格式问题
4. 降级为结构化文本
"""
# 策略1:直接解析
try:
return json.loads(text)
except json.JSONDecodeError:
pass
if strict:
raise ValueError(f"严格的JSON解析失败: {text[:200]}")
# 策略2:提取 ```json ... ``` 代码块
json_match = re.search(r'```(?:json)?\s*\n?(.*?)\n?\s*```', text, re.DOTALL)
if json_match:
try:
return json.loads(json_match.group(1))
except json.JSONDecodeError:
pass
# 策略3:提取 { ... } 或 [ ... ]
brace_match = re.search(r'\{.*\}', text, re.DOTALL)
bracket_match = re.search(r'\[.*\]', text, re.DOTALL)
for match in [brace_match, bracket_match]:
if match:
try:
return json.loads(match.group(0))
except json.JSONDecodeError:
cleaned = OutputParser._fix_common_issues(match.group(0))
try:
return json.loads(cleaned)
except json.JSONDecodeError:
pass
# 策略4:降级为键值对提取
return OutputParser._fallback_parse(text)
@staticmethod
def _fix_common_issues(text: str) -> str:
"""修复常见的JSON格式问题"""
# 移除注释(// ...)
text = re.sub(r'//.*', '', text)
# 移除尾随逗号
text = re.sub(r',\s*([}\]])', r'\1', text)
# 单引号转双引号
text = re.sub(r"'", '"', text)
# None -> null
text = re.sub(r'\bNone\b', 'null', text)
# True/False -> true/false
text = re.sub(r'\bTrue\b', 'true', text)
re.sub(r'\bFalse\b', 'false', text)
return text
@staticmethod
def _fallback_parse(text: str) -> dict:
"""降级解析——提取键值对"""
result = {}
# 匹配 key: value 或 key=value
pairs = re.findall(r'["\']?(\w+)["\']?\s*[:=]\s*["\']?([^"\',\n]+)["\']?', text)
for key, value in pairs:
result[key.strip()] = value.strip()
return result if result else {"raw_text": text}
@staticmethod
def parse_with_schema(text: str, schema: dict, max_retries: int = 2) -> dict:
"""
使用Schema约束解析。如果直接解析失败,
使用schema信息尝试修复。
"""
result = OutputParser.parse_json(text)
if not OutputParser._validate_schema(result, schema):
# 尝试用schema的默认值补全缺失字段
for key, config in schema.get("properties", {}).items():
if key not in result and "default" in config:
result[key] = config["default"]
if not OutputParser._validate_schema(result, schema):
raise ValueError(
f"输出不符合Schema。期望字段: {list(schema.get('properties', {}).keys())},"
f"实际字段: {list(result.keys())}"
)
return result
@staticmethod
def _validate_schema(data: dict, schema: dict) -> bool:
"""简易Schema验证"""
required = schema.get("required", [])
return all(k in data for k in required)
# 使用示例
raw_output = """
让我分析一下这个请求:
```json
{
"sentiment": "positive",
"confidence": 0.85,
"reason": "用户表达了满意",
// 这是一个注释
}"""
result = OutputParser.parse_json(raw_output) print(result) #
### 22.3.2 结构化输出重试
```python
from typing import Callable, Optional
class StructuredOutputRetry:
"""结构化输出重试器"""
def __init__(self, llm, max_retries: int = 3):
self.llm = llm
self.max_retries = max_retries
def call(
self,
prompt: str,
output_schema: dict,
fix_hint: str = None,
) -> dict:
"""调用LLM并确保返回结构化输出"""
last_error = None
for attempt in range(self.max_retries):
# 构建带Schema的Prompt
schema_prompt = self._build_schema_prompt(prompt, output_schema)
if attempt > 0 and last_error:
schema_prompt += f"\n\n注意:上次解析失败({last_error})。请严格按照JSON格式输出。"
raw = self.llm.generate(user=schema_prompt, temperature=0.1)
try:
result = OutputParser.parse_json(raw)
if OutputParser._validate_schema(result, output_schema):
return result
last_error = f"缺少必需字段"
except ValueError as e:
last_error = str(e)
# 所有重试失败,返回最佳尝试
return {"error": "无法获取结构化输出", "raw": raw, "attempts": self.max_retries}
def _build_schema_prompt(self, prompt: str, schema: dict) -> str:
"""构建带Schema约束的Prompt"""
schema_str = json.dumps(schema, ensure_ascii=False, indent=2)
return f"""{prompt}
请以JSON格式回复,Schema如下:
{schema_str}
只输出JSON,不要包含其他文字。"""22.4 工具调用失败处理
22.4.1 工具执行器
python
import time
from typing import Any, Callable, Optional
from dataclasses import dataclass, field
@dataclass
class ToolResult:
"""工具执行结果"""
success: bool
data: Any = None
error: str = ""
duration_ms: float = 0
retries_used: int = 0
@dataclass
class ToolConfig:
"""工具配置"""
name: str
handler: Callable
timeout_seconds: float = 30.0
max_retries: int = 3
retry_delay_seconds: float = 1.0
required_params: list[str] = field(default_factory=list)
fallback: Optional[Callable] = None # 降级函数
class ToolExecutor:
"""工具执行器——带超时、重试和降级"""
def __init__(self):
self._tools: dict[str, ToolConfig] = {}
self._execution_log: list[dict] = []
def register(self, config: ToolConfig):
self._tools[config.name] = config
async def execute(self, tool_name: str, params: dict = None) -> ToolResult:
config = self._tools.get(tool_name)
if not config:
return ToolResult(success=False, error=f"工具 {tool_name} 未注册")
# 参数验证
missing = [p for p in config.required_params if not params.get(p)]
if missing:
return ToolResult(
success=False,
error=f"缺少必需参数: {missing}"
)
last_error = ""
retries = 0
for attempt in range(config.max_retries + 1):
try:
t0 = time.time()
# 异步执行(带超时)
result = await self._execute_with_timeout(
config.handler, params, config.timeout_seconds
)
duration = (time.time() - t0) * 1000
self._log(tool_name, True, duration, retries)
return ToolResult(
success=True,
data=result,
duration_ms=duration,
retries_used=retries,
)
except TimeoutError:
last_error = f"执行超时({config.timeout_seconds}s)"
retries += 1
except Exception as e:
last_error = str(e)
retries += 1
if retries <= config.max_retries:
await self._wait(config.retry_delay_seconds * (2 ** (retries - 1)))
# 所有重试失败——尝试降级
if config.fallback:
try:
fallback_result = await config.fallback(params)
return ToolResult(
success=True, data=fallback_result,
error=f"主工具失败,使用降级: {last_error}",
retries_used=retries,
)
except Exception as e:
last_error = f"降级也失败: {e}"
self._log(tool_name, False, 0, retries)
return ToolResult(
success=False,
error=f"工具 {tool_name} 执行失败({retries}次重试后): {last_error}",
retries_used=retries,
)
async def _execute_with_timeout(self, fn, params, timeout):
"""带超时的异步执行"""
import asyncio
try:
return await asyncio.wait_for(fn(params), timeout=timeout)
except asyncio.TimeoutError:
raise TimeoutError()
@staticmethod
async def _wait(seconds: float):
import asyncio
await asyncio.sleep(seconds)
def _log(self, tool_name: str, success: bool, duration_ms: float, retries: int):
self._execution_log.append({
"tool": tool_name,
"success": success,
"duration_ms": duration_ms,
"retries": retries,
"timestamp": time.time(),
})22.4.2 TypeScript工具执行器
typescript
interface ToolConfig {
name: string;
handler: (params: any) => Promise<any>;
timeoutMs: number;
maxRetries: number;
fallback?: (params: any) => Promise<any>;
}
interface ToolResult {
success: boolean;
data?: any;
error?: string;
durationMs: number;
retriesUsed: number;
}
export class ToolExecutor {
private tools = new Map<string, ToolConfig>();
register(config: ToolConfig): void {
this.tools.set(config.name, config);
}
async execute(toolName: string, params: any): Promise<ToolResult> {
const config = this.tools.get(toolName);
if (!config) return { success: false, error: `Tool ${toolName} not found`, durationMs: 0, retriesUsed: 0 };
let lastError = "";
for (let attempt = 0; attempt <= config.maxRetries; attempt++) {
try {
const t0 = Date.now();
const result = await Promise.race([
config.handler(params),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Timeout')), config.timeoutMs)
),
]);
return { success: true, data: result, durationMs: Date.now() - t0, retriesUsed: attempt };
} catch (e: any) {
lastError = e.message;
if (attempt < config.maxRetries) {
await new Promise(r => setTimeout(r, 1000 * Math.pow(2, attempt)));
}
}
}
// Fallback
if (config.fallback) {
try {
const data = await config.fallback(params);
return { success: true, data, error: `Fallback used: ${lastError}`, durationMs: 0, retriesUsed: config.maxRetries };
} catch {}
}
return { success: false, error: lastError, durationMs: 0, retriesUsed: config.maxRetries };
}
}22.5 超时与降级策略
22.5.1 分层超时设计
python
class TimeoutConfig:
"""分层超时配置"""
# 整体请求超时
REQUEST_TIMEOUT = 60.0 # 秒
# LLM 调用超时
LLM_CALL_TIMEOUT = 30.0
# 工具执行超时
TOOL_EXECUTION_TIMEOUT = 15.0
# 外部 API 超时
EXTERNAL_API_TIMEOUT = 10.0
# 流式响应首个 Token 超时
FIRST_TOKEN_TIMEOUT = 5.0
class GracefulDegradation:
"""优雅降级策略"""
def __init__(self):
self._strategies: dict[str, list[Callable]] = {}
def register(self, service: str, strategies: list[Callable]):
"""
注册降级策略,按优先级排列。
第一个是首选策略,最后是最终降级方案。
"""
self._strategies[service] = strategies
async def call(self, service: str, **kwargs) -> any:
"""尝试调用服务,失败时按策略降级"""
strategies = self._strategies.get(service, [])
if not strategies:
raise RuntimeError(f"服务 {service} 没有注册任何策略")
errors = []
for i, strategy in enumerate(strategies):
try:
result = await strategy(**kwargs)
if i > 0:
print(f"[降级] 服务 {service} 使用了第 {i+1} 降级策略")
return result
except Exception as e:
errors.append(str(e))
continue
raise RuntimeError(
f"服务 {service} 所有策略均失败: {errors}"
)
# 使用示例
degradation = GracefulDegradation()
degradation.register("search", [
# 策略1:实时搜索
lambda query: realtime_search(query),
# 策略2:缓存搜索
lambda query: cached_search(query),
# 策略3:返回提示信息
lambda query: {"results": [], "message": "搜索服务暂时不可用,请稍后重试"},
])
degradation.register("translation", [
lambda text, lang: llm_translate(text, lang), # LLM翻译
lambda text, lang: rule_based_translate(text, lang), # 规则翻译
lambda text, lang: {"text": text, "note": "翻译服务不可用,返回原文"},
])22.5.2 断路器模式
python
import time
from enum import Enum
from collections import deque
class CircuitState(Enum):
CLOSED = "closed" # 正常工作
OPEN = "open" # 断开(拒绝请求)
HALF_OPEN = "half_open" # 半开(试探性放行)
class CircuitBreaker:
"""
断路器——防止故障级联扩散。
工作原理:
1. CLOSED(关闭):正常转发请求,统计失败次数
2. OPEN(打开):失败次数超过阈值,拒绝请求
3. HALF_OPEN(半开):超时后尝试放行一个请求,
成功则恢复,失败则继续打开
"""
def __init__(
self,
failure_threshold: int = 5, # 触发断路的失败次数
recovery_timeout: float = 30.0, # 断路恢复超时(秒)
half_open_max_calls: int = 3, # 半开状态下最多试探请求数
window_seconds: float = 60.0, # 统计窗口
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.half_open_max_calls = half_open_max_calls
self.window_seconds = window_seconds
self._state = CircuitState.CLOSED
self._failure_count = 0
self._success_count = 0
self._last_failure_time: float = 0
self._half_open_calls = 0
self._window: deque = deque() # (timestamp, success)
@property
def state(self) -> CircuitState:
self._check_state()
return self._state
def _check_state(self):
"""检查是否应该从OPEN转到HALF_OPEN"""
if self._state == CircuitState.OPEN:
if time.time() - self._last_failure_time > self.recovery_timeout:
self._state = CircuitState.HALF_OPEN
self._half_open_calls = 0
def _prune_window(self):
"""清理过期的统计窗口"""
cutoff = time.time() - self.window_seconds
while self._window and self._window[0][0] < cutoff:
self._window.popleft()
def allow_request(self) -> bool:
"""判断是否允许请求通过"""
state = self.state
if state == CircuitState.CLOSED:
return True
if state == CircuitState.HALF_OPEN:
if self._half_open_calls < self.half_open_max_calls:
self._half_open_calls += 1
return True
return False
return False # OPEN
def record_success(self):
"""记录成功"""
self._window.append((time.time(), True))
self._success_count += 1
if self._state == CircuitState.HALF_OPEN:
# 半开状态下成功,恢复
self._state = CircuitState.CLOSED
self._failure_count = 0
def record_failure(self):
"""记录失败"""
self._window.append((time.time(), False))
self._failure_count += 1
self._last_failure_time = time.time()
self._prune_window()
recent_failures = sum(1 for _, s in self._window if not s)
if recent_failures >= self.failure_threshold:
self._state = CircuitState.OPEN
if self._state == CircuitState.HALF_OPEN:
# 半开状态下失败,回到OPEN
self._state = CircuitState.OPEN
def get_stats(self) -> dict:
self._prune_window()
return {
"state": self._state.value,
"failure_count": self._failure_count,
"success_count": self._success_count,
"window_size": len(self._window),
"recent_failures": sum(1 for _, s in self._window if not s),
}
# 使用示例
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=10)
async def call_with_breaker(fn, *args, **kwargs):
if not breaker.allow_request():
return {"error": "服务暂时不可用(断路器已打开)", "fallback": True}
try:
result = await fn(*args, **kwargs)
breaker.record_success()
return result
except Exception as e:
breaker.record_failure()
raise22.6 指数退避重试
22.6.1 指数退避实现
python
import random
import asyncio
from dataclasses import dataclass
@dataclass
class RetryResult:
success: bool
data: any = None
error: str = ""
attempts: int = 0
total_delay_ms: float = 0
class ExponentialBackoff:
"""
指数退避重试器。
退避公式: delay = min(base * 2^attempt + jitter, max_delay)
支持以下特性:
- 指数增长延迟
- 随机抖动(Jitter)避免雷群效应
- 可配置的退避参数
- 可重试错误过滤
"""
def __init__(
self,
max_retries: int = 5,
base_delay: float = 1.0,
max_delay: float = 60.0,
jitter: bool = True,
retryable_errors: tuple = None,
):
self.max_retries = max_retries
self.base_delay = base_delay
self.max_delay = max_delay
self.jitter = jitter
self.retryable_errors = retryable_errors or (
TimeoutError, ConnectionError,
)
def calculate_delay(self, attempt: int) -> float:
"""计算第N次重试的延迟"""
delay = self.base_delay * (2 ** attempt)
delay = min(delay, self.max_delay)
if self.jitter:
# 全抖动:在 [0, delay] 范围内随机
delay = random.uniform(0, delay)
return delay
def should_retry(self, error: Exception) -> bool:
"""判断错误是否应该重试"""
return isinstance(error, self.retryable_errors)
async def execute(self, fn, *args, **kwargs) -> RetryResult:
"""执行带重试的操作"""
total_delay = 0
for attempt in range(self.max_retries + 1):
try:
result = await fn(*args, **kwargs)
return RetryResult(
success=True, data=result,
attempts=attempt + 1, total_delay_ms=total_delay * 1000,
)
except Exception as e:
if attempt == self.max_retries or not self.should_retry(e):
return RetryResult(
success=False, error=str(e),
attempts=attempt + 1, total_delay_ms=total_delay * 1000,
)
delay = self.calculate_delay(attempt)
total_delay += delay
print(f"[重试] 第 {attempt + 1} 次失败 ({e}),"
f"{delay:.1f}s 后重试...")
await asyncio.sleep(delay)
def get_schedule(self) -> list[float]:
"""获取重试时间表(预览)"""
return [self.calculate_delay(i) for i in range(self.max_retries)]
# 使用示例
backoff = ExponentialBackoff(max_retries=5, base_delay=1.0, max_delay=60)
print(f"重试时间表: {backoff.get_schedule()}")
# [0.3, 1.7, 3.2, 7.8, 15.4] (带抖动,每次不同)22.6.2 自适应退避
python
class AdaptiveBackoff:
"""
自适应退避——根据服务器响应动态调整退避策略。
如果服务器返回 Retry-After 头,优先使用;
如果检测到限流(429),增加退避时间;
如果连续成功,逐步减少退避时间。
"""
def __init__(
self,
max_retries: int = 5,
initial_delay: float = 1.0,
max_delay: float = 120.0,
backoff_factor: float = 2.0,
):
self.max_retries = max_retries
self.initial_delay = initial_delay
self.max_delay = max_delay
self.backoff_factor = backoff_factor
self._current_delay = initial_delay
self._consecutive_successes = 0
def get_delay(self, response_headers: dict = None) -> float:
"""计算下次重试的延迟"""
# 优先使用服务器的 Retry-After
if response_headers:
retry_after = response_headers.get("Retry-After")
if retry_after:
try:
return float(retry_after)
except ValueError:
pass
return min(self._current_delay, self.max_delay)
def record_success(self):
"""记录成功——逐步恢复初始延迟"""
self._consecutive_successes += 1
if self._consecutive_successes >= 3:
self._current_delay = max(
self.initial_delay,
self._current_delay / self.backoff_factor
)
self._consecutive_successes = 0
def record_failure(self, status_code: int = 0):
"""记录失败——增加退避时间"""
if status_code == 429: # 限流
self._current_delay *= self.backoff_factor * 1.5
else:
self._current_delay *= self.backoff_factor
self._consecutive_successes = 022.7 错误日志与报警
22.7.1 结构化错误日志
python
import logging
import json
from datetime import datetime
class AgentErrorLogger:
"""Agent 专用错误日志器"""
def __init__(self, service_name: str):
self.service_name = service_name
self._logger = logging.getLogger(f"agent.{service_name}")
self._error_counts: dict[str, int] = {}
self._alert_thresholds: dict[str, tuple[int, float]] = {}
# {error_type: (count_threshold, time_window_seconds)}
self._last_alert_time: dict[str, float] = {}
def set_alert_threshold(
self, error_type: str, count: int, window_seconds: float = 300
):
"""设置报警阈值"""
self._alert_thresholds[error_type] = (count, window_seconds)
def log_error(
self,
error_type: str,
message: str,
severity: str = "medium",
context: dict = None,
):
"""记录错误"""
entry = {
"timestamp": datetime.now().isoformat(),
"service": self.service_name,
"error_type": error_type,
"severity": severity,
"message": message,
"context": context or {},
}
# 写入日志
self._logger.error(json.dumps(entry, ensure_ascii=False))
# 更新计数
self._error_counts[error_type] = self._error_counts.get(error_type, 0) + 1
# 检查报警
self._check_alert(error_type)
def _check_alert(self, error_type: str):
"""检查是否需要报警"""
if error_type not in self._alert_thresholds:
return
threshold, window = self._alert_thresholds[error_type]
count = self._error_counts.get(error_type, 0)
now = time.time()
# 检查是否在冷却期
last_alert = self._last_alert_time.get(error_type, 0)
if now - last_alert < window:
return
if count >= threshold:
self._send_alert(error_type, count, threshold)
self._last_alert_time[error_type] = now
self._error_counts[error_type] = 0
def _send_alert(self, error_type: str, count: int, threshold: int):
"""发送报警(实际中对接通知系统)"""
alert_msg = (
f"🚨 [Agent报警] 服务: {self.service_name}\n"
f"错误类型: {error_type}\n"
f"发生次数: {count}(阈值: {threshold})\n"
f"时间: {datetime.now().isoformat()}"
)
print(alert_msg) # 实际中发送到 Slack/邮件/短信等22.7.2 错误仪表板数据结构
python
@dataclass
class ErrorMetrics:
"""错误指标"""
period: str
total_errors: int = 0
by_type: dict[str, int] = field(default_factory=dict)
by_severity: dict[str, int] = field(default_factory=dict)
top_errors: list[dict] = field(default_factory=list)
def to_dashboard(self) -> dict:
return {
"period": self.period,
"total_errors": self.total_errors,
"error_rate": self._calc_error_rate(),
"by_type": dict(sorted(
self.by_type.items(), key=lambda x: -x[1]
)[:10]),
"by_severity": self.by_severity,
"top_errors": self.top_errors[:5],
}22.8 最佳实践
22.8.1 错误处理策略清单
| 场景 | 策略 | 实现方式 |
|---|---|---|
| LLM输出格式错误 | 重试 + 自修复 | OutputParser + StructuredOutputRetry |
| 工具API失败 | 指数退避重试 | ExponentialBackoff |
| 工具API持续故障 | 断路器 | CircuitBreaker |
| 服务降级 | 多级降级 | GracefulDegradation |
| 超时 | 分层超时 | TimeoutConfig |
| 用户输入无效 | 友好提示 | 输入验证 + 引导 |
| 未知错误 | 降级响应 + 日志 | try-catch + AgentErrorLogger |
22.8.2 常见陷阱
陷阱1:吞掉错误
python
# ❌ 吞掉所有错误
try:
result = call_llm()
except:
result = "出错了"
# ✅ 分类处理
try:
result = call_llm()
except LLMTimeoutError:
result = fallback_response()
logger.warning("LLM超时,使用降级")
except Exception as e:
result = error_response()
logger.error(f"未知错误: {e}")陷阱2:无限重试
python
# ❌ 无限重试
while True:
try:
return call_api()
except:
pass
# ✅ 有限重试 + 退避
backoff = ExponentialBackoff(max_retries=3)
result = await backoff.execute(call_api)陷阱3:重试非幂等操作
python
# ❌ 重试非幂等操作(可能导致重复扣款)
backoff.execute(process_payment, amount=100)
# ✅ 非幂等操作需要先检查状态
async def safe_process_payment(params):
if await check_payment_exists(params["order_id"]):
return {"status": "already_processed"}
return await process_payment(params)22.9 本章小结
本章系统介绍了 Agent 系统的错误处理策略:
- 错误分类是第一步——理解错误类型才能对症下药
- 输出容错(JSON修复、Schema验证)应对 LLM 的不确定性
- 工具执行器(超时、重试、降级)是工具调用的安全网
- 断路器防止故障级联扩散
- 指数退避避免雷群效应
- 结构化日志是故障诊断的基础
记住:在 Agent 系统中,假设一切都会失败。你的任务是确保失败时的行为是可预测和可恢复的。