Chapter 22: Error Handling and Retry Strategies

Building Agent Systems That Are Not Afraid to Fail


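22.1 Introduction

In traditional software, an error is an exception. In an Agent system, errors are the norm. Nondeterministic LLM output, unreliable external APIs, network jitter, and exceeded token limits are "failures" that happen all the time.

A robust Agent system is not one that never makes mistakes, but one that recovers gracefully when it does. This chapter walks through error handling strategies for Agent systems, from parsing LLM output to handling failed tool calls, and from timeouts and degradation to exponential backoff, building up a fault-tolerant system that is "not afraid to fail".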

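Learning objectives for this chapter

  • Understand the categories and characteristics of errors in Agent systems
  • Handle LLM output parsing errors
  • Implement fault tolerance for failed tool calls
  • Design timeout and degradation strategies
  • Build an exponential backoff retry framework
  • Set up error logging and alerting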

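22.2 Error Taxonomy

22.2.1 Categories of Agent Errors

Agent errors
├── Input errors
│   ├── Invalid or incomplete user input
│   ├── Prompt injection
│   └── Overlong input (token limit exceeded)
├── Model errors
│   ├── Malformed output (JSON parse failures, etc.)
│   ├── Off-target output (hallucination, irrelevant content)
│   ├── Truncated output (max_tokens too small)
│   └── Model refusal (safety filtering)
├── Tool errors
│   ├── API call failure (network, authentication, rate limiting)
│   ├── Invalid tool arguments
│   ├── Tool execution timeout
│   └── Tool returns invalid data
├── System errors
│   ├── Out of memory
│   ├── Concurrency limit exceeded
│   ├── Service unavailable
│   └── Misconfiguration
└── Business errors
    ├── Insufficient permissions
    ├── Data not found
    ├── Business rule conflict
    └── Inconsistent state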
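
One way to put this taxonomy to work is to map raised exceptions onto its top-level categories. The sketch below is illustrative only: `classify_error`, `CATEGORY_RULES`, and the specific exception-to-category pairings are assumptions made for this example, and a real system would likely use its own exception classes.

```python
import json

# Hypothetical mapping from exception types to the taxonomy's top-level
# categories. Order matters: json.JSONDecodeError is a subclass of
# ValueError, so the more specific rule has to come first.
CATEGORY_RULES = [
    (json.JSONDecodeError, "model"),   # malformed model output
    (TimeoutError, "tool"),            # tool execution timeout
    (ConnectionError, "tool"),         # network failure during an API call
    (PermissionError, "business"),     # insufficient permissions
    (MemoryError, "system"),           # out of memory
    (ValueError, "input"),             # invalid user input
]


def classify_error(exc: BaseException) -> str:
    """Return the first matching category, or "unknown" if none matches."""
    for exc_type, category in CATEGORY_RULES:
        if isinstance(exc, exc_type):
            return category
    return "unknown"


print(classify_error(ConnectionResetError("peer reset")))  # tool
print(classify_error(ValueError("empty question")))        # input
```

Keeping the rules in an ordered list makes the precedence between overlapping exception types explicit and easy to review.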

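22.2.2 Error Severity Levels

python
import time
from enum import Enum
from typing import Optional


class ErrorSeverity(Enum):
    """Error severity levels."""
    LOW = "low"           # Ignorable; does not affect the main flow
    MEDIUM = "medium"     # Affects some features; can be handled by degrading
    HIGH = "high"         # Affects the main flow; needs a retry or degradation
    CRITICAL = "critical" # System-level failure; needs immediate intervention


class AgentError:
    """Unified error record (a plain data object, not an Exception subclass)."""
    
    def __init__(
        self,
        error_type: str,
        message: str,
        severity: ErrorSeverity = ErrorSeverity.MEDIUM,
        recoverable: bool = True,
        context: Optional[dict] = None,
        cause: Optional[Exception] = None,
    ):
        self.error_type = error_type
        self.message = message
        self.severity = severity
        self.recoverable = recoverable
        self.context = context or {}
        self.cause = cause
        self.timestamp = time.time()  # Unix time at which the error was recorded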
    
    def __str__(self):
        return f"[{self.severity.value}] {self.error_type}: {self.message}"
    
    def to_dict(self) -> dict:
        return {
            "type": self.error_type,
            "message": self.message,
            "severity": self.severity.value,
            "recoverable": self.recoverable,
            "context": self.context,
            "cause": str(self.cause) if self.cause else None,
        }
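
The severity comments above imply a handling policy: ignore low-severity errors, degrade on medium ones, retry on high ones, and escalate critical ones. A minimal sketch of that policy follows. `decide_action` and its return labels are hypothetical names, and the enum is repeated only so the example runs on its own.

```python
from enum import Enum


class ErrorSeverity(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


def decide_action(severity: ErrorSeverity, recoverable: bool = True) -> str:
    """Hypothetical policy: turn severity and recoverability into a next step."""
    if severity is ErrorSeverity.LOW:
        return "ignore"      # does not affect the main flow
    if severity is ErrorSeverity.CRITICAL or not recoverable:
        return "escalate"    # stop and alert a human
    if severity is ErrorSeverity.HIGH:
        return "retry"       # retry first; the caller degrades if retries run out
    return "degrade"         # MEDIUM: continue with reduced functionality


print(decide_action(ErrorSeverity.HIGH))                       # retry
print(decide_action(ErrorSeverity.MEDIUM, recoverable=False))  # escalate
```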

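22.3 Handling LLM Output Parsing Errors

22.3.1 Handling JSON Parse Failures

The JSON returned by an LLM is often imperfect: missing quotes, trailing commas, natural language mixed in, and so on:

python
import json
import re
from typing import Any, TypeVar, Type


T = TypeVar('T')


class OutputParser:
    """Fault-tolerant parser for LLM output."""
    
    @staticmethod
    def parse_json(text: str, strict: bool = False) -> dict:
        """
        Parse JSON from LLM output, with several fallback modes.
        
        Strategies, in priority order:
        1. Parse directly
        2. Extract a fenced JSON code block
        3. Extract the outermost {...} or [...] and repair common format problems
        4. Fall back to key-value extraction from the text
        """
        # Strategy 1: parse directly
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            pass
        
        if strict:
            raise ValueError(f"Strict JSON parsing failed: {text[:200]}")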
        
        # Strategy 2: extract a ```json ... ``` code block
        json_match = re.search(r'```(?:json)?\s*\n?(.*?)\n?\s*```', text, re.DOTALL)
        if json_match:
            try:
                return json.loads(json_match.group(1))
            except json.JSONDecodeError:
                pass
        
        # Strategy 3: extract { ... } or [ ... ], repairing it if needed
        brace_match = re.search(r'\{.*\}', text, re.DOTALL)
        bracket_match = re.search(r'\[.*\]', text, re.DOTALL)
        
        for match in [brace_match, bracket_match]:
            if match:
                try:
                    return json.loads(match.group(0))
                except json.JSONDecodeError:
                    cleaned = OutputParser._fix_common_issues(match.group(0))
                    try:
                        return json.loads(cleaned)
                    except json.JSONDecodeError:
                        pass
        
        # Strategy 4: fall back to key-value extraction
        return OutputParser._fallback_parse(text)
    
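    @staticmethod
    def _fix_common_issues(text: str) -> str:
        """Repair common JSON format problems (heuristic; may alter string values)."""
        # Strip // comments. Caveat: this also cuts string values containing "//", such as URLs
        text = re.sub(r'//.*', '', text)
        # Remove trailing commas
        text = re.sub(r',\s*([}\]])', r'\1', text)
        # Single quotes -> double quotes (also rewrites apostrophes inside strings)
        text = re.sub(r"'", '"', text)
        # None -> null
        text = re.sub(r'\bNone\b', 'null', text)
        # True/False -> true/false
        text = re.sub(r'\bTrue\b', 'true', text)
        text = re.sub(r'\bFalse\b', 'false', text)
        return text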
    
    @staticmethod
    def _fallback_parse(text: str) -> dict:
        """Fallback parsing: extract key-value pairs."""
        result = {}
        # Match key: value or key=value
        pairs = re.findall(r'["\']?(\w+)["\']?\s*[:=]\s*["\']?([^"\',\n]+)["\']?', text)
        for key, value in pairs:
            result[key.strip()] = value.strip()
        return result if result else {"raw_text": text}
    
    @staticmethod
    def parse_with_schema(text: str, schema: dict, max_retries: int = 2) -> dict:
        """
        Parse under a schema constraint. If the parsed result does not
        satisfy the schema, try to repair it using information from the schema.
        """
        result = OutputParser.parse_json(text)
        
        if not OutputParser._validate_schema(result, schema):
            # Try to fill in missing fields from the schema's default values
            for key, config in schema.get("properties", {}).items():
                if key not in result and "default" in config:
                    result[key] = config["default"]
            
            if not OutputParser._validate_schema(result, schema):
                raise ValueError(
                    f"Output does not match the schema. Expected fields: {list(schema.get('properties', {}).keys())}, "
                    f"actual fields: {list(result.keys())}"
                )
        
        return result
    
    @staticmethod
    def _validate_schema(data: dict, schema: dict) -> bool:
        """Minimal schema validation: checks required keys only."""
        required = schema.get("required", [])
        return all(k in data for k in required)


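# Usage example
raw_output = """
Let me analyze this request:

```json
{
    "sentiment": "positive",
    "confidence": 0.85,
    "reason": "The user expressed satisfaction",
    // this is a comment
}
```
"""

result = OutputParser.parse_json(raw_output)
print(result)
# {'sentiment': 'positive', 'confidence': 0.85, 'reason': 'The user expressed satisfaction'}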
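
The greedy `\{.*\}` pattern in strategy 3 spans from the first `{` to the last `}`, so it fails when a reply contains two separate JSON objects or a stray brace in the trailing prose. As an alternative technique, not the parser above, the standard library's `json.JSONDecoder.raw_decode` can decode the first complete JSON value starting at a given index. The helper name `extract_first_json` is an assumption for this sketch.

```python
import json
from typing import Any, Optional


def extract_first_json(text: str) -> Optional[Any]:
    """Return the first decodable JSON object or array embedded in text."""
    decoder = json.JSONDecoder()
    for index, char in enumerate(text):
        if char not in "{[":
            continue
        try:
            # raw_decode parses one JSON value starting at `index` and
            # ignores whatever follows it
            value, _end = decoder.raw_decode(text, index)
            return value
        except json.JSONDecodeError:
            continue  # not valid JSON from here; try the next opening bracket
    return None


reply = 'Sure! {"city": "Paris", "days": 3} Another option: {"city": "Rome"}'
print(extract_first_json(reply))  # {'city': 'Paris', 'days': 3}
```

This only handles strictly valid JSON, so it complements rather than replaces the repair step for trailing commas and comments.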


22.3.2 Structured Output Retry

python
from typing import Callable, Optional


class StructuredOutputRetry:
    """Retries an LLM call until it yields structured output."""
    
    def __init__(self, llm, max_retries: int = 3):
        self.llm = llm
        self.max_retries = max_retries
    
    def call(
        self,
        prompt: str,
        output_schema: dict,
        fix_hint: str = None,
    ) -> dict:
        """Call the LLM and try to ensure it returns structured output."""
        last_error = None
        raw = ""  # keeps the final return well-defined even if max_retries is 0
        
        for attempt in range(self.max_retries):
            # Build a prompt that embeds the schema
            schema_prompt = self._build_schema_prompt(prompt, output_schema)
            
            if attempt > 0 and last_error:
                schema_prompt += f"\n\nNote: the previous attempt failed to parse ({last_error}). Output strictly valid JSON."
                if fix_hint:
                    schema_prompt += f"\n{fix_hint}"
            
            raw = self.llm.generate(user=schema_prompt, temperature=0.1)
            
            try:
                result = OutputParser.parse_json(raw)
                if OutputParser._validate_schema(result, output_schema):
                    return result
                last_error = "missing required fields"
            except ValueError as e:
                last_error = str(e)
        
        # All retries failed: return an error record with the last raw output
        return {"error": "Could not obtain structured output", "raw": raw, "attempts": self.max_retries}
    
    def _build_schema_prompt(self, prompt: str, schema: dict) -> str:
        """Build a prompt constrained by the schema."""
        schema_str = json.dumps(schema, ensure_ascii=False, indent=2)
        return f"""{prompt}

Reply in JSON that follows this schema:
{schema_str}

Output only the JSON, with no other text."""
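
The retry-with-feedback loop can be exercised without a real model. The sketch below is a simplified, self-contained version of the idea: `FakeLLM` is a stand-in that answers in prose first and in JSON on the second call, and `call_with_feedback` is a hypothetical helper that uses strict `json.loads` rather than the tolerant parser.

```python
import json


class FakeLLM:
    """Stand-in model: returns prose first, then valid JSON on the second call."""

    def __init__(self):
        self.calls = 0

    def generate(self, prompt: str) -> str:
        self.calls += 1
        if self.calls == 1:
            return "Sure! The sentiment looks positive."
        return '{"sentiment": "positive", "confidence": 0.9}'


def call_with_feedback(llm, prompt: str, required: list, max_retries: int = 3) -> dict:
    """Re-ask the model, telling it why the previous reply was rejected."""
    last_error = None
    for attempt in range(1, max_retries + 1):
        full_prompt = prompt
        if last_error:
            full_prompt += f"\n\nThe previous reply was rejected: {last_error}. Reply with JSON only."
        raw = llm.generate(full_prompt)
        try:
            data = json.loads(raw)
        except json.JSONDecodeError as exc:
            last_error = f"invalid JSON ({exc.msg})"
            continue
        missing = [key for key in required if key not in data]
        if not missing:
            return {"data": data, "attempts": attempt}
        last_error = f"missing fields {missing}"
    return {"error": last_error, "attempts": max_retries}


outcome = call_with_feedback(FakeLLM(), "Classify the sentiment.", ["sentiment", "confidence"])
print(outcome)
# {'data': {'sentiment': 'positive', 'confidence': 0.9}, 'attempts': 2}
```

Feeding the concrete rejection reason back into the prompt tends to be more useful to the model than a generic "try again", which is why the loop records `last_error` on every failed attempt.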

22.4 Handling Tool Call Failures

22.4.1 Tool Executor

python
import time
from typing import Any, Callable, Optional
from dataclasses import dataclass, field


@dataclass
class ToolResult:
    """Result of a tool execution."""
    success: bool
    data: Any = None
    error: str = ""
    duration_ms: float = 0
    retries_used: int = 0


@dataclass
class ToolConfig:
    """Tool configuration."""
    name: str
    handler: Callable
    timeout_seconds: float = 30.0
    max_retries: int = 3
    retry_delay_seconds: float = 1.0
    required_params: list[str] = field(default_factory=list)
    fallback: Optional[Callable] = None  # Fallback (degraded) function


class ToolExecutor:
    """Tool executor with timeout, retry, and fallback."""
    
    def __init__(self):
        self._tools: dict[str, ToolConfig] = {}
        self._execution_log: list[dict] = []
    
    def register(self, config: ToolConfig):
        self._tools[config.name] = config
    
    async def execute(self, tool_name: str, params: dict = None) -> ToolResult:
        config = self._tools.get(tool_name)
        if not config:
            return ToolResult(success=False, error=f"Tool {tool_name} is not registered")
        
        params = params or {}
        
        # Validate parameters: a key counts as missing if absent or None,
        # so legitimate falsy values such as 0 or "" are accepted
        missing = [p for p in config.required_params if params.get(p) is None]
        if missing:
            return ToolResult(
                success=False, 
                error=f"Missing required parameters: {missing}"
            )
        
        last_error = ""
        retries = 0
        
        for attempt in range(config.max_retries + 1):
            try:
                t0 = time.time()
                
                # Run asynchronously, with a timeout
                result = await self._execute_with_timeout(
                    config.handler, params, config.timeout_seconds
                )
                
                duration = (time.time() - t0) * 1000
                
                self._log(tool_name, True, duration, retries)
                
                return ToolResult(
                    success=True,
                    data=result,
                    duration_ms=duration,
                    retries_used=retries,
                )
                
            except TimeoutError:
                last_error = f"Execution timed out ({config.timeout_seconds}s)"
                retries += 1
                
            except Exception as e:
                last_error = str(e)
                retries += 1
            
            # Exponential delay before the next attempt (skipped after the last one)
            if retries <= config.max_retries:
                await self._wait(config.retry_delay_seconds * (2 ** (retries - 1)))
        
        # All attempts failed: try the fallback
        if config.fallback:
            try:
                fallback_result = await config.fallback(params)
                return ToolResult(
                    success=True, data=fallback_result,
                    error=f"Primary tool failed; used fallback: {last_error}",
                    retries_used=retries,
                )
            except Exception as e:
                last_error = f"Fallback also failed: {e}"
        
        self._log(tool_name, False, 0, retries)
        
        return ToolResult(
            success=False,
            error=f"Tool {tool_name} failed after {retries} attempts: {last_error}",
            retries_used=retries,
        )
    
    async def _execute_with_timeout(self, fn, params, timeout):
        """Run an async handler with a timeout."""
        import asyncio
        try:
            return await asyncio.wait_for(fn(params), timeout=timeout)
        except asyncio.TimeoutError:
            raise TimeoutError()
    
    @staticmethod
    async def _wait(seconds: float):
        import asyncio
        await asyncio.sleep(seconds)
    
    def _log(self, tool_name: str, success: bool, duration_ms: float, retries: int):
        self._execution_log.append({
            "tool": tool_name,
            "success": success,
            "duration_ms": duration_ms,
            "retries": retries,
            "timestamp": time.time(),
        })
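
The core of the executor is a timeout wrapped in a bounded retry loop. The sketch below isolates that core so the behavior can be observed end to end. `call_with_retry`, `flaky_tool`, and the very short delays are assumptions for this demonstration, and results are plain dicts instead of `ToolResult`.

```python
import asyncio


async def call_with_retry(handler, params, timeout=0.05, max_retries=2, base_delay=0.01):
    """Run an async handler with a per-attempt timeout and exponential delays."""
    last_error = ""
    for attempt in range(max_retries + 1):
        try:
            data = await asyncio.wait_for(handler(params), timeout=timeout)
            return {"success": True, "data": data, "retries_used": attempt}
        except asyncio.TimeoutError:
            last_error = f"timed out after {timeout}s"
        except Exception as exc:
            last_error = str(exc)
        if attempt < max_retries:
            # Delay doubles on every retry: base, 2*base, 4*base, ...
            await asyncio.sleep(base_delay * 2 ** attempt)
    return {"success": False, "error": last_error, "retries_used": max_retries}


calls = {"n": 0}


async def flaky_tool(params):
    """Hypothetical tool that fails twice, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary network failure")
    return {"echo": params["q"]}


result = asyncio.run(call_with_retry(flaky_tool, {"q": "weather"}))
print(result)
# {'success': True, 'data': {'echo': 'weather'}, 'retries_used': 2}
```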

22.4.2 A TypeScript Tool Executor

typescript
interface ToolConfig {
  name: string;
  handler: (params: any) => Promise<any>;
  timeoutMs: number;
  maxRetries: number;
  fallback?: (params: any) => Promise<any>;
}

interface ToolResult {
  success: boolean;
  data?: any;
  error?: string;
  durationMs: number;
  retriesUsed: number;
}

export class ToolExecutor {
  private tools = new Map<string, ToolConfig>();

  register(config: ToolConfig): void {
    this.tools.set(config.name, config);
  }

  async execute(toolName: string, params: any): Promise<ToolResult> {
    const config = this.tools.get(toolName);
    if (!config) return { success: false, error: `Tool ${toolName} not found`, durationMs: 0, retriesUsed: 0 };

    let lastError = "";
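    for (let attempt = 0; attempt <= config.maxRetries; attempt++) {
      // Keep a handle on the timer so it can be cleared once the race settles
      let timer: ReturnType<typeof setTimeout> | undefined;
      try {
        const t0 = Date.now();
        const result = await Promise.race([
          config.handler(params),
          new Promise((_, reject) => {
            timer = setTimeout(() => reject(new Error('Timeout')), config.timeoutMs);
          }),
        ]);
        return { success: true, data: result, durationMs: Date.now() - t0, retriesUsed: attempt };
      } catch (e: any) {
        lastError = e?.message ?? String(e);
        if (attempt < config.maxRetries) {
          // Exponential delay: 1s, 2s, 4s, ...
          await new Promise(r => setTimeout(r, 1000 * Math.pow(2, attempt)));
        }
      } finally {
        // Without this, every successful call leaves a pending timeout behind
        clearTimeout(timer);
      }
    }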

    // Fallback
    if (config.fallback) {
      try {
        const data = await config.fallback(params);
        return { success: true, data, error: `Fallback used: ${lastError}`, durationMs: 0, retriesUsed: config.maxRetries };
      } catch {}
    }

    return { success: false, error: lastError, durationMs: 0, retriesUsed: config.maxRetries };
  }
}

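22.5 Timeout and Degradation Strategies

22.5.1 Layered Timeout Design

python
import inspect
from typing import Any, Callable


class TimeoutConfig:
    """Layered timeout configuration (all values in seconds)."""
    # Overall request timeout
    REQUEST_TIMEOUT = 60.0
    
    # LLM call timeout
    LLM_CALL_TIMEOUT = 30.0
    
    # Tool execution timeout
    TOOL_EXECUTION_TIMEOUT = 15.0
    
    # External API timeout
    EXTERNAL_API_TIMEOUT = 10.0
    
    # Timeout for the first token of a streaming response
    FIRST_TOKEN_TIMEOUT = 5.0


class GracefulDegradation:
    """Graceful degradation strategy."""
    
    def __init__(self):
        self._strategies: dict[str, list[Callable]] = {}
    
    def register(self, service: str, strategies: list[Callable]):
        """
        Register degradation strategies in priority order.
        The first is the preferred strategy; the last is the final fallback.
        """
        self._strategies[service] = strategies
    
    async def call(self, service: str, **kwargs) -> Any:
        """Call a service, degrading through its strategies on failure."""
        strategies = self._strategies.get(service, [])
        
        if not strategies:
            raise RuntimeError(f"No strategies registered for service {service}")
        
        errors = []
        for i, strategy in enumerate(strategies):
            try:
                result = strategy(**kwargs)
                # Strategies may be sync or async; await only when needed
                if inspect.isawaitable(result):
                    result = await result
                if i > 0:
                    print(f"[degraded] Service {service} used fallback strategy #{i + 1}")
                return result
            except Exception as e:
                errors.append(str(e))
                continue
        
        raise RuntimeError(
            f"All strategies failed for service {service}: {errors}"
        )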


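# Usage example (realtime_search, cached_search, llm_translate, and
# rule_based_translate are not defined in this chapter; supply your own)
degradation = GracefulDegradation()

degradation.register("search", [
    # Strategy 1: live search
    lambda query: realtime_search(query),
    # Strategy 2: cached search
    lambda query: cached_search(query),
    # Strategy 3: return a notice
    lambda query: {"results": [], "message": "Search is temporarily unavailable; please try again later"},
])

degradation.register("translation", [
    lambda text, lang: llm_translate(text, lang),        # LLM translation
    lambda text, lang: rule_based_translate(text, lang), # Rule-based translation
    lambda text, lang: {"text": text, "note": "Translation unavailable; returning the original text"},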
])
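
The layered timeouts in `TimeoutConfig` only help if each inner budget is smaller than the one enclosing it, so an inner step can time out and be handled while the outer request still has time left. The sketch below nests two `asyncio.wait_for` calls to show this. The timeout values are scaled down so the example finishes quickly, and `slow_tool`, `run_tool_step`, and `handle_request` are hypothetical names.

```python
import asyncio

# Scaled-down stand-ins for REQUEST_TIMEOUT and TOOL_EXECUTION_TIMEOUT
REQUEST_TIMEOUT = 0.2
TOOL_TIMEOUT = 0.05


async def slow_tool() -> str:
    """A tool that takes far longer than its budget."""
    await asyncio.sleep(1)
    return "tool result"


async def run_tool_step() -> str:
    # Inner layer: the tool gets its own, smaller budget
    try:
        return await asyncio.wait_for(slow_tool(), timeout=TOOL_TIMEOUT)
    except asyncio.TimeoutError:
        return "tool timed out; continuing without it"


async def handle_request() -> str:
    async def pipeline() -> str:
        tool_output = await run_tool_step()
        return f"answer ({tool_output})"

    # Outer layer: the whole request is capped as well
    return await asyncio.wait_for(pipeline(), timeout=REQUEST_TIMEOUT)


print(asyncio.run(handle_request()))
# answer (tool timed out; continuing without it)
```

Because the tool budget expires first, the request degrades to a partial answer instead of failing with the outer timeout.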

22.5.2 Circuit Breaker Pattern

python
import time
from enum import Enum
from collections import deque


class CircuitState(Enum):
    CLOSED = "closed"      # Operating normally
    OPEN = "open"          # Tripped (requests rejected)
    HALF_OPEN = "half_open" # Half-open (trial requests allowed)


class CircuitBreaker:
    """
    Circuit breaker: keeps failures from cascading.
    
    How it works:
    1. CLOSED: requests pass through normally while failures are counted
    2. OPEN: failures reached the threshold, so requests are rejected
    3. HALF_OPEN: after the recovery timeout, a few trial requests are let
       through; success restores the circuit, failure opens it again
    """
    
    def __init__(
        self,
        failure_threshold: int = 5,       # Failures needed to trip the breaker
        recovery_timeout: float = 30.0,    # Seconds to wait before probing again
        half_open_max_calls: int = 3,      # Max trial requests while half-open
        window_seconds: float = 60.0,      # Sliding window for counting failures
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls
        self.window_seconds = window_seconds
        
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._success_count = 0
        self._last_failure_time: float = 0
        self._half_open_calls = 0
        self._window: deque = deque()  # (timestamp, success)
    
    @property
    def state(self) -> CircuitState:
        self._check_state()
        return self._state
    
    def _check_state(self):
        """Move from OPEN to HALF_OPEN once the recovery timeout has elapsed."""
        if self._state == CircuitState.OPEN:
            if time.time() - self._last_failure_time > self.recovery_timeout:
                self._state = CircuitState.HALF_OPEN
                self._half_open_calls = 0
    
    def _prune_window(self):
        """Drop entries that have aged out of the statistics window."""
        cutoff = time.time() - self.window_seconds
        while self._window and self._window[0][0] < cutoff:
            self._window.popleft()
    
    def allow_request(self) -> bool:
        """Decide whether a request may pass."""
        state = self.state
        
        if state == CircuitState.CLOSED:
            return True
        
        if state == CircuitState.HALF_OPEN:
            if self._half_open_calls < self.half_open_max_calls:
                self._half_open_calls += 1
                return True
            return False
        
        return False  # OPEN
    
    def record_success(self):
        """Record a success."""
        self._window.append((time.time(), True))
        self._success_count += 1
        
        if self._state == CircuitState.HALF_OPEN:
            # A success while half-open restores the circuit. Clear the window so
            # failures from before the trip cannot re-open it on the next error.
            self._state = CircuitState.CLOSED
            self._failure_count = 0
            self._window.clear()
    
    def record_failure(self):
        """Record a failure."""
        self._window.append((time.time(), False))
        self._failure_count += 1
        self._last_failure_time = time.time()
        
        self._prune_window()
        recent_failures = sum(1 for _, s in self._window if not s)
        
        if recent_failures >= self.failure_threshold:
            self._state = CircuitState.OPEN
        
        if self._state == CircuitState.HALF_OPEN:
            # A failure while half-open sends the breaker back to OPEN
            self._state = CircuitState.OPEN
    
    def get_stats(self) -> dict:
        self._prune_window()
        return {
            "state": self._state.value,
            "failure_count": self._failure_count,
            "success_count": self._success_count,
            "window_size": len(self._window),
            "recent_failures": sum(1 for _, s in self._window if not s),
        }


# Usage example
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=10)

async def call_with_breaker(fn, *args, **kwargs):
    if not breaker.allow_request():
        return {"error": "Service temporarily unavailable (circuit breaker open)", "fallback": True}
    
    try:
        result = await fn(*args, **kwargs)
        breaker.record_success()
        return result
    except Exception as e:
        breaker.record_failure()
        raise
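
The state transitions are easier to follow with a controllable clock. The sketch below is a deliberately stripped-down breaker, not the class above: `MiniBreaker` counts consecutive failures instead of using a sliding window and takes an injectable `clock` (an assumption added for testability), which lets the CLOSED, OPEN, HALF_OPEN, CLOSED cycle be traced without waiting.

```python
import time


class MiniBreaker:
    """Minimal three-state breaker with an injectable clock (illustrative only)."""

    def __init__(self, failure_threshold=3, recovery_timeout=10.0, clock=None):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock or time.monotonic
        self.state = "closed"
        self.failures = 0
        self.opened_at = 0.0

    def allow_request(self) -> bool:
        # After the recovery timeout, an open breaker lets trial requests through
        if self.state == "open" and self.clock() - self.opened_at >= self.recovery_timeout:
            self.state = "half_open"
        return self.state != "open"

    def record_success(self):
        self.state = "closed"
        self.failures = 0

    def record_failure(self):
        self.failures += 1
        # Any failure while half-open, or too many while closed, trips the breaker
        if self.state == "half_open" or self.failures >= self.failure_threshold:
            self.state = "open"
            self.opened_at = self.clock()


now = [0.0]  # fake clock, advanced by hand
breaker = MiniBreaker(failure_threshold=3, recovery_timeout=10.0, clock=lambda: now[0])

trace = []
for _ in range(3):
    breaker.record_failure()
trace.append(breaker.state)            # tripped after three failures
trace.append(breaker.allow_request())  # rejected while open
now[0] = 10.0                          # recovery timeout elapses
trace.append(breaker.allow_request())  # trial request allowed (half-open)
breaker.record_success()
trace.append(breaker.state)            # trial succeeded, circuit restored
print(trace)  # ['open', False, True, 'closed']
```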

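22.6 Exponential Backoff Retries

22.6.1 Implementing Exponential Backoff

python
import random
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class RetryResult:
    success: bool
    data: Any = None
    error: str = ""
    attempts: int = 0
    total_delay_ms: float = 0


class ExponentialBackoff:
    """
    Exponential backoff retrier.
    
    Backoff formula: delay = random(0, min(base * 2^attempt, max_delay))
    Without jitter:  delay = min(base * 2^attempt, max_delay)
    
    Features:
    - Exponentially growing delay
    - Random jitter to avoid the thundering herd effect
    - Configurable backoff parameters
    - Filtering of retryable errors
    """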
    
    def __init__(
        self,
        max_retries: int = 5,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        jitter: bool = True,
        retryable_errors: tuple = None,
    ):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.jitter = jitter
        self.retryable_errors = retryable_errors or (
            TimeoutError, ConnectionError, 
        )
    
    def calculate_delay(self, attempt: int) -> float:
        """Compute the delay before retry number `attempt` (0-based)."""
        delay = self.base_delay * (2 ** attempt)
        delay = min(delay, self.max_delay)
        
        if self.jitter:
            # Full jitter: pick uniformly from [0, delay]
            delay = random.uniform(0, delay)
        
        return delay
    
    def should_retry(self, error: Exception) -> bool:
        """Decide whether this error is worth retrying."""
        return isinstance(error, self.retryable_errors)
    
    async def execute(self, fn, *args, **kwargs) -> RetryResult:
        """Run an operation with retries."""
        total_delay = 0
        
        for attempt in range(self.max_retries + 1):
            try:
                result = await fn(*args, **kwargs)
                return RetryResult(
                    success=True, data=result,
                    attempts=attempt + 1, total_delay_ms=total_delay * 1000,
                )
            except Exception as e:
                if attempt == self.max_retries or not self.should_retry(e):
                    return RetryResult(
                        success=False, error=str(e),
                        attempts=attempt + 1, total_delay_ms=total_delay * 1000,
                    )
                
                delay = self.calculate_delay(attempt)
                total_delay += delay
                print(f"[retry] Attempt {attempt + 1} failed ({e}); "
                      f"retrying in {delay:.1f}s...")
                await asyncio.sleep(delay)
    
    def get_schedule(self) -> list[float]:
        """Preview the retry schedule (one sampled delay per retry)."""
        return [self.calculate_delay(i) for i in range(self.max_retries)]


# Usage example
backoff = ExponentialBackoff(max_retries=5, base_delay=1.0, max_delay=60)
print(f"Retry schedule: {backoff.get_schedule()}")
# e.g. [0.3, 1.7, 3.2, 7.8, 15.4]  (jittered, so different on every run)
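
Because jitter makes every schedule different, it helps to look at the deterministic ceiling that the jittered delay is drawn from. The sketch below recomputes the formula as a standalone function. `backoff_delay` is a hypothetical helper that mirrors `calculate_delay` with the same defaults.

```python
import random


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0,
                  jitter: bool = True, rng=random) -> float:
    """Full-jitter backoff: uniform in [0, min(base * 2**attempt, cap)]."""
    ceiling = min(base * 2 ** attempt, cap)
    return rng.uniform(0, ceiling) if jitter else ceiling


# Without jitter the ceilings double until they hit the cap
print([backoff_delay(i, jitter=False) for i in range(8)])
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]

# With jitter each delay is random but never exceeds its ceiling
samples = [backoff_delay(3) for _ in range(1000)]
print(max(samples) <= 8.0)  # True
```

With full jitter the expected delay is half the ceiling, so clients that failed at the same moment spread their retries across the whole interval instead of returning in lockstep.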

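22.6.2 Adaptive Backoff

python
class AdaptiveBackoff:
    """
    Adaptive backoff: adjusts the delay based on how the server responds.
    
    If the server sends a Retry-After header, use it first;
    if rate limiting (429) is detected, increase the delay;
    after consecutive successes, gradually reduce the delay.
    """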
    
    def __init__(
        self,
        max_retries: int = 5,
        initial_delay: float = 1.0,
        max_delay: float = 120.0,
        backoff_factor: float = 2.0,
    ):
        self.max_retries = max_retries
        self.initial_delay = initial_delay
        self.max_delay = max_delay
        self.backoff_factor = backoff_factor
        self._current_delay = initial_delay
        self._consecutive_successes = 0
    
    def get_delay(self, response_headers: dict = None) -> float:
        """Compute the delay before the next retry."""
        # Prefer the server's Retry-After (only the numeric seconds form is handled here)
        if response_headers:
            retry_after = response_headers.get("Retry-After")
            if retry_after:
                try:
                    return float(retry_after)
                except ValueError:
                    pass
        
        return min(self._current_delay, self.max_delay)
    
    def record_success(self):
        """Record a success: step the delay back toward its initial value."""
        self._consecutive_successes += 1
        if self._consecutive_successes >= 3:
            self._current_delay = max(
                self.initial_delay, 
                self._current_delay / self.backoff_factor
            )
            self._consecutive_successes = 0
    
    def record_failure(self, status_code: int = 0):
        """Record a failure: increase the delay."""
        if status_code == 429:  # Rate limited
            self._current_delay *= self.backoff_factor * 1.5
            self._current_delay *= self.backoff_factor * 1.5
        else:
            self._current_delay *= self.backoff_factor
        
        self._consecutive_successes = 0
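
HTTP allows `Retry-After` to carry either a number of seconds or an HTTP date, and `get_delay` above only handles the numeric form. The sketch below covers both using the standard library's `email.utils.parsedate_to_datetime`. The function name `parse_retry_after` and its `now` parameter (added so the example is deterministic) are assumptions.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(value: str, now: Optional[datetime] = None) -> Optional[float]:
    """Return the delay in seconds, or None if the header is unparseable."""
    # Form 1: delay in seconds, e.g. "120"
    try:
        return max(0.0, float(value))
    except ValueError:
        pass
    # Form 2: HTTP date, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
    try:
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):  # the exception type differs across Python versions
        return None
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0.0, (when - now).total_seconds())


reference = datetime(2015, 10, 21, 7, 27, 0, tzinfo=timezone.utc)
print(parse_retry_after("120"))                                           # 120.0
print(parse_retry_after("Wed, 21 Oct 2015 07:28:00 GMT", now=reference))  # 60.0
print(parse_retry_after("soon"))                                          # None
```

Returning `None` for an unparseable header lets the caller fall back to its own computed delay, matching the `except ValueError: pass` path in `get_delay`.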

22.7 Error Logging and Alerting

22.7.1 Structured Error Logs

python
import logging
import json
import time
from datetime import datetime


class AgentErrorLogger:
    """Error logger dedicated to Agents."""
    
    def __init__(self, service_name: str):
        self.service_name = service_name
        self._logger = logging.getLogger(f"agent.{service_name}")
        self._error_counts: dict[str, int] = {}
        self._alert_thresholds: dict[str, tuple[int, float]] = {}
        # {error_type: (count_threshold, time_window_seconds)}
        self._last_alert_time: dict[str, float] = {}
    
    def set_alert_threshold(
        self, error_type: str, count: int, window_seconds: float = 300
    ):
        """Set the alert threshold for an error type."""
        self._alert_thresholds[error_type] = (count, window_seconds)
    
    def log_error(
        self,
        error_type: str,
        message: str,
        severity: str = "medium",
        context: dict = None,
    ):
        """Record an error."""
        entry = {
            "timestamp": datetime.now().isoformat(),
            "service": self.service_name,
            "error_type": error_type,
            "severity": severity,
            "message": message,
            "context": context or {},
        }
        
        # Write the log entry as one JSON line
        self._logger.error(json.dumps(entry, ensure_ascii=False))
        
        # Update the counter
        self._error_counts[error_type] = self._error_counts.get(error_type, 0) + 1
        
        # Check whether to alert
        self._check_alert(error_type)
    
    def _check_alert(self, error_type: str):
        """Check whether an alert should fire."""
        if error_type not in self._alert_thresholds:
            return
        
        threshold, window = self._alert_thresholds[error_type]
        count = self._error_counts.get(error_type, 0)
        now = time.time()
        
        # Skip if still in the cooldown period (the window doubles as the cooldown;
        # the count itself accumulates since the last alert, not per window)
        last_alert = self._last_alert_time.get(error_type, 0)
        if now - last_alert < window:
            return
        
        if count >= threshold:
            self._send_alert(error_type, count, threshold)
            self._last_alert_time[error_type] = now
            self._error_counts[error_type] = 0
    
    def _send_alert(self, error_type: str, count: int, threshold: int):
        """Send an alert (in practice, hook this up to a notification system)."""
        alert_msg = (
            f"🚨 [Agent alert] Service: {self.service_name}\n"
            f"Error type: {error_type}\n"
            f"Occurrences: {count} (threshold: {threshold})\n"
            f"Time: {datetime.now().isoformat()}"
        )
        print(alert_msg)  # In practice, send to Slack, email, SMS, etc.
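
The logger above counts errors cumulatively and uses the window only as a cooldown between alerts. If the intent is "N errors within the last W seconds", a sliding window of timestamps is one way to express it. The sketch below shows that alternative technique. `WindowedAlert` is a hypothetical class, and timestamps are passed in explicitly so the behavior is reproducible.

```python
from collections import deque


class WindowedAlert:
    """Fires when at least `threshold` events fall inside the last `window_seconds`."""

    def __init__(self, threshold: int, window_seconds: float):
        self.threshold = threshold
        self.window_seconds = window_seconds
        self._events = deque()  # timestamps of recent errors, oldest first

    def record(self, now: float) -> bool:
        """Record one error at time `now`; return True if the alert should fire."""
        self._events.append(now)
        cutoff = now - self.window_seconds
        # Drop timestamps that have aged out of the window
        while self._events and self._events[0] < cutoff:
            self._events.popleft()
        return len(self._events) >= self.threshold


alert = WindowedAlert(threshold=3, window_seconds=60)
print([alert.record(t) for t in (0, 30, 100, 110, 120)])
# [False, False, False, False, True]
```

The first two errors expire before the third arrives, so only the burst at 100 to 120 seconds triggers the alert, which a cumulative counter would not distinguish from a slow trickle.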

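22.7.2 Data Structure for an Error Dashboard

python
from dataclasses import dataclass, field


@dataclass
class ErrorMetrics:
    """Error metrics."""
    period: str
    total_errors: int = 0
    by_type: dict[str, int] = field(default_factory=dict)
    by_severity: dict[str, int] = field(default_factory=dict)
    top_errors: list[dict] = field(default_factory=list)
    total_requests: int = 0  # Assumed field: the denominator for the error rate
    
    def _calc_error_rate(self) -> float:
        """Errors per request in this period (0.0 if no requests were recorded)."""
        if self.total_requests == 0:
            return 0.0
        return self.total_errors / self.total_requests
    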
    def to_dashboard(self) -> dict:
        return {
            "period": self.period,
            "total_errors": self.total_errors,
            "error_rate": self._calc_error_rate(),
            "by_type": dict(sorted(
                self.by_type.items(), key=lambda x: -x[1]
            )[:10]),
            "by_severity": self.by_severity,
            "top_errors": self.top_errors[:5],
        }
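
The dashboard fields can be filled by aggregating the structured log entries. The sketch below uses `collections.Counter` for this. The sample entries, the `aggregate` helper, and the shape of each `top_errors` item are assumptions for illustration.

```python
from collections import Counter

# Hypothetical log entries, shaped like the JSON lines written by the logger
entries = [
    {"error_type": "json_parse", "severity": "medium"},
    {"error_type": "tool_timeout", "severity": "high"},
    {"error_type": "json_parse", "severity": "medium"},
    {"error_type": "rate_limit", "severity": "high"},
    {"error_type": "json_parse", "severity": "low"},
    {"error_type": "tool_timeout", "severity": "high"},
]


def aggregate(entries: list) -> dict:
    """Summarize log entries into counts by type and by severity."""
    by_type = Counter(e["error_type"] for e in entries)
    by_severity = Counter(e["severity"] for e in entries)
    return {
        "total_errors": len(entries),
        "by_type": dict(by_type),
        "by_severity": dict(by_severity),
        # most_common sorts by descending count
        "top_errors": [
            {"error_type": name, "count": count}
            for name, count in by_type.most_common(2)
        ],
    }


summary = aggregate(entries)
print(summary["total_errors"])  # 6
print(summary["top_errors"])
# [{'error_type': 'json_parse', 'count': 3}, {'error_type': 'tool_timeout', 'count': 2}]
```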

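22.8 Best Practices

22.8.1 Error Handling Strategy Checklist

| Scenario | Strategy | Implementation |
| --- | --- | --- |
| Malformed LLM output | Retry + self-repair | OutputParser + StructuredOutputRetry |
| Tool API failure | Exponential backoff retry | ExponentialBackoff |
| Persistent tool API failure | Circuit breaker | CircuitBreaker |
| Service degradation | Multi-level degradation | GracefulDegradation |
| Timeouts | Layered timeouts | TimeoutConfig |
| Invalid user input | Friendly prompt | Input validation + guidance |
| Unknown errors | Degraded response + logging | try-catch + AgentErrorLogger |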

22.8.2 Common Pitfalls

Pitfall 1: Swallowing errors

python
# ❌ Swallows every error
try:
    result = call_llm()
except:
    result = "Something went wrong"

# ✅ Handle errors by category
try:
    result = call_llm()
except LLMTimeoutError:
    result = fallback_response()
    logger.warning("LLM timed out; using fallback")
except Exception as e:
    result = error_response()
    logger.error(f"Unknown error: {e}")

Pitfall 2: Retrying forever

python
# ❌ Unbounded retries
while True:
    try:
        return call_api()
    except:
        pass

# ✅ Bounded retries + backoff
backoff = ExponentialBackoff(max_retries=3)
result = await backoff.execute(call_api)

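Pitfall 3: Retrying non-idempotent operations

python
# ❌ Retrying a non-idempotent operation (may charge the customer twice)
await backoff.execute(process_payment, amount=100)

# ✅ Non-idempotent operations should check state first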
async def safe_process_payment(params):
    if await check_payment_exists(params["order_id"]):
        return {"status": "already_processed"}
    return await process_payment(params)
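
A check-then-act guard can still race if two retries run at the same time. A common complement is an idempotency key: the caller sends the same key on every retry, and the backend stores the result under that key so a repeat returns the stored result instead of charging again. The sketch below illustrates the idea with an in-memory stand-in. `PaymentService` and its `charge` method are hypothetical, and a real backend would need an atomic store (for example a unique constraint) rather than a dict.

```python
import asyncio


class PaymentService:
    """In-memory stand-in for a payment backend (illustrative only)."""

    def __init__(self):
        self._results = {}  # idempotency key -> stored result
        self.charges = 0    # how many times money actually moved

    async def charge(self, idempotency_key: str, amount: int) -> dict:
        # A repeated key returns the stored result instead of charging again
        if idempotency_key in self._results:
            return self._results[idempotency_key]
        await asyncio.sleep(0)  # stand-in for the real network call
        self.charges += 1
        result = {"status": "charged", "amount": amount}
        self._results[idempotency_key] = result
        return result


async def main():
    service = PaymentService()
    first = await service.charge("order-42", 100)
    retry = await service.charge("order-42", 100)  # e.g. retried after a lost response
    return service.charges, first == retry


charges, same_result = asyncio.run(main())
print(charges, same_result)  # 1 True
```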

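22.9 Chapter Summary

This chapter covered error handling strategies for Agent systems:

  1. Error classification comes first: you can only prescribe the right remedy once you understand the error type
  2. Output fault tolerance (JSON repair, schema validation) copes with the nondeterminism of LLMs
  3. The tool executor (timeout, retry, fallback) is the safety net for tool calls
  4. Circuit breakers keep failures from cascading
  5. Exponential backoff with jitter avoids the thundering herd effect
  6. Structured logs are the foundation of failure diagnosis

Remember: in an Agent system, assume everything will fail. Your job is to make sure that behavior under failure is predictable and recoverable.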

Released under the MIT License