
第25章:调试与诊断

概述

传统程序的调试是确定性的——断点、单步执行、变量检查足以定位大多数问题。但 Agent 系统调试面临独特挑战:LLM 的非确定性输出、多步推理的复杂链路、工具调用的副作用、以及"幻觉"等 LLM 特有的问题。本章将系统地介绍 Agent 调试的方法论和工具链,从本地环境搭建到生产环境故障排查,帮助你高效地定位和修复 Agent 系统中的各种问题。

25.1 Agent调试的挑战

25.1.1 非确定性

python
class NonDeterminismDebugger:
    """Debugging aid for non-deterministic agent behavior.

    Runs the same input several times, then compares outputs and reasoning
    traces to decide whether non-determinism is the source of a problem.
    """

    def __init__(self):
        # Reserved for accumulating runs across debug sessions.
        self._execution_history: list[dict] = []

    async def debug_with_replay(self, agent, input_data: str,
                                 num_runs: int = 5) -> dict:
        """Execute the same input ``num_runs`` times and compare the results.

        Args:
            agent: object exposing ``await run(input)`` and ``get_last_trace()``.
            input_data: input fed to the agent on every run.
            num_runs: number of repetitions (must be >= 1).

        Returns:
            Report dict with consistency metrics, per-run results and a
            textual diagnosis.
        """
        from collections import Counter

        if num_runs < 1:
            raise ValueError("num_runs must be >= 1")

        results = []
        for i in range(num_runs):
            result = await agent.run(input_data)
            results.append({
                "run": i + 1,
                "output": result,
                "steps": agent.get_last_trace(),
            })

        outputs = [r["output"] for r in results]
        unique_outputs = set(outputs)

        # FIX: the old formula len(unique)/num_runs was inverted — it reported
        # 1.0 for maximally *inconsistent* behavior. The consistency rate is
        # now the fraction of runs that produced the modal (most common)
        # output: 1.0 == fully consistent, 1/num_runs == every run differed.
        modal_count = Counter(outputs).most_common(1)[0][1]

        return {
            "consistency_rate": modal_count / num_runs,
            "unique_outputs": len(unique_outputs),
            "total_runs": num_runs,
            "results": results,
            "diagnosis": self._diagnose_inconsistency(results),
        }

    def _diagnose_inconsistency(self, results: list[dict]) -> str:
        """Classify the inconsistency: none, divergent paths, or output jitter."""
        if all(r["output"] == results[0]["output"] for r in results):
            return "输出一致,非确定性不是问题来源。"

        # Compare the tool sequences of each run to separate "different
        # reasoning path" from "same path, different wording".
        steps_variations = set(
            tuple(s["tool"] for s in r["steps"]) 
            for r in results
        )

        if len(steps_variations) > 1:
            return "推理路径不一致:Agent在不同运行中选择了不同的工具/步骤。"
        return "推理路径一致但输出不同:可能是LLM生成波动,尝试降低temperature。"

25.1.2 调试难度等级

| 问题类型 | 难度 | 常见表现 | 调试方法 |
| --- | --- | --- | --- |
| 代码Bug | ⭐ | 异常、崩溃 | 传统调试器 |
| Prompt问题 | ⭐⭐ | 输出格式错误、不遵循指令 | Prompt版本测试 |
| 工具调用失败 | ⭐⭐ | 超时、参数错误 | 工具日志分析 |
| 推理路径错误 | ⭐⭐⭐ | 选择了错误的工具/方向 | 思维链可视化 |
| 幻觉 | ⭐⭐⭐⭐ | 编造信息 | 事实核查、RAG增强 |
| 循环推理 | ⭐⭐⭐⭐ | Agent反复执行相同操作 | 步骤计数器、终止条件 |
| 上下文溢出 | ⭐⭐⭐ | Token超限、截断 | 上下文监控 |

25.2 本地调试环境搭建

25.2.1 Mock LLM

python
from typing import Any

class MockLLM:
    """Mock LLM that replaces real LLM calls while debugging.

    Three modes:
      * ``"fixed"``  — always return the same canned response;
      * ``"replay"`` — return a preset list of responses in order;
      * ``"record"`` — default; return a generic placeholder.

    Every call is logged and can be inspected via :meth:`get_call_log`.
    """

    def __init__(self):
        self._responses: list[str] = []
        self._call_log: list[dict] = []
        self._mode = "record"  # record / replay / fixed
        # FIX: initialize mode-specific state up front so chat() can never
        # hit an AttributeError if a mode is entered without its setter.
        self._fixed_response = ""
        self._index = 0

    def set_fixed_response(self, response: str):
        """Switch to fixed mode: every chat() returns ``response``."""
        self._mode = "fixed"
        self._fixed_response = response

    def set_replay_responses(self, responses: list[str]):
        """Switch to replay mode: chat() returns ``responses`` one by one."""
        self._mode = "replay"
        self._responses = responses
        self._index = 0

    async def chat(self, messages: list[dict], **kwargs) -> str:
        """Simulate an LLM chat call; the request is logged before answering."""
        self._call_log.append({
            "messages": messages,
            "model": kwargs.get("model", "mock"),
            "timestamp": datetime.now().isoformat(),
        })

        if self._mode == "fixed":
            return self._fixed_response
        if self._mode == "replay":
            if self._index < len(self._responses):
                response = self._responses[self._index]
                self._index += 1
                return response
            # Replay list used up — make the exhaustion visible to the caller.
            return "Mock response exhausted"
        return "[MOCK] This is a mock response"

    def get_call_log(self) -> list[dict]:
        """All recorded chat() invocations, in call order."""
        return self._call_log

# Usage example
# NOTE: this snippet uses top-level `await`, so it must run inside an async
# context (async REPL or an `async def main()`); `Agent` and `search_tool`
# are assumed to be defined elsewhere in the project.
mock_llm = MockLLM()
mock_llm.set_fixed_response(
    '我需要调用搜索工具来回答这个问题。\n'
    '```json\n{"tool": "search", "args": {"query": "Python Agent"}}\n```'
)
agent = Agent(llm=mock_llm, tools=[search_tool])
result = await agent.run("什么是Python Agent?")
print(mock_llm.get_call_log())  # inspect how the LLM was called

25.2.2 Replay机制

python
import pickle

class ExecutionReplayer:
    """Persists complete agent executions to disk for later replay.

    Recordings are stored as pickle files named ``<trace_id>.pkl`` under
    ``storage_path``. NOTE: pickle is only safe for files you created
    yourself — never load recordings from untrusted sources.
    """

    def __init__(self, storage_path: str = "replays/"):
        self.storage_path = storage_path
        os.makedirs(storage_path, exist_ok=True)

    def _file_for(self, trace_id: str) -> str:
        # Single place mapping a trace id to its on-disk location.
        return os.path.join(self.storage_path, f"{trace_id}.pkl")

    async def record(self, trace_id: str, 
                     agent: Any, input_data: str,
                     result: str):
        """Snapshot one full execution (I/O, LLM/tool calls, decisions)."""
        snapshot = {
            "trace_id": trace_id,
            "input": input_data,
            "result": result,
            "llm_calls": agent.llm_call_log,
            "tool_calls": agent.tool_call_log,
            "decisions": agent.decision_log,
            "timestamp": datetime.now().isoformat(),
        }
        with open(self._file_for(trace_id), "wb") as fh:
            pickle.dump(snapshot, fh)

    async def replay(self, trace_id: str) -> dict:
        """Load a recording, print a short summary, and return it."""
        with open(self._file_for(trace_id), "rb") as fh:
            recording = pickle.load(fh)

        print(f"📋 回放 Trace: {trace_id}")
        print(f"   输入: {recording['input'][:100]}...")
        print(f"   结果: {recording['result'][:100]}...")
        print(f"   LLM调用: {len(recording['llm_calls'])}次")
        print(f"   工具调用: {len(recording['tool_calls'])}次")

        return recording

25.2.3 交互式调试器

python
class AgentDebugger:
    """Interactive debugger that hooks into agent lifecycle events.

    Supports event breakpoints and a step mode in which the user can
    inspect, modify, skip, or abort individual LLM/tool/decision steps.
    """

    def __init__(self, agent):
        self.agent = agent
        self.breakpoints: set[str] = set()
        self._step_mode = False

    def set_breakpoint(self, event_type: str):
        """Pause (print details / prompt) whenever ``event_type`` fires."""
        self.breakpoints.add(event_type)

    async def debug_run(self, input_data: str) -> str:
        """Run the agent with the debug hooks attached."""
        for event_type, handler in (
            ("llm_call", self._on_llm_call),
            ("tool_call", self._on_tool_call),
            ("decision", self._on_decision),
        ):
            self.agent.on(event_type, handler)
        return await self.agent.run(input_data)

    def _should_pause(self, event_type: str) -> bool:
        # Pause when a breakpoint is set for this event, or in step mode.
        return event_type in self.breakpoints or self._step_mode

    async def _on_llm_call(self, event: dict):
        if not self._should_pause("llm_call"):
            return
        print(f"\n🧠 LLM调用:")
        print(f"   输入: {event['input'][:200]}...")
        print(f"   模型: {event.get('model', 'unknown')}")
        if self._step_mode:
            action = input("   [c]继续 [s]单步 [m]修改 [q]退出: ")
            if action == 'q':
                raise KeyboardInterrupt()
            if action == 'm':
                event['input'] = input("   新输入: ")

    async def _on_tool_call(self, event: dict):
        if not self._should_pause("tool_call"):
            return
        print(f"\n🔧 工具调用: {event['tool_name']}")
        print(f"   参数: {event['args']}")
        if self._step_mode:
            action = input("   [c]继续 [s]跳过 [e]编辑参数 [q]退出: ")
            if action == 's':
                # Mark the call so the agent can skip executing it.
                event['skip'] = True
            elif action == 'e':
                event['args'] = json.loads(input("   新参数(JSON): "))

    async def _on_decision(self, event: dict):
        if self._should_pause("decision"):
            print(f"\n💭 决策: {event['reasoning'][:200]}...")

25.3 思维链调试

25.3.1 CoT可视化

python
class ChainOfThoughtDebugger:
    """Renders and sanity-checks an agent's chain-of-thought trace."""

    def visualize(self, trace: list[dict]) -> str:
        """Render the trace as a human-readable text report."""
        rule = "=" * 70
        out = ["\n" + rule, "🧠 Agent 思维链", rule]

        for idx, step in enumerate(trace):
            kind = step.get("type", "unknown")

            if kind == "reasoning":
                out += [f"\n📌 步骤 {idx+1}: 推理", f"   {step['content']}"]
            elif kind == "tool_call":
                out += [
                    f"\n🔧 步骤 {idx+1}: 调用工具",
                    f"   工具: {step['tool_name']}",
                    f"   参数: {json.dumps(step['args'], ensure_ascii=False)}",
                ]
                mark = "✅" if step.get("success") else "❌"
                out.append(f"   结果: {mark} {str(step.get('result', ''))[:200]}")
            elif kind == "observation":
                out += [f"\n👁️ 步骤 {idx+1}: 观察", f"   {step['content'][:300]}"]
            elif kind == "final_answer":
                out += [f"\n✅ 最终回答:", f"   {step['content']}"]

        out.append("\n" + rule)
        return "\n".join(out)

    def find_anomalies(self, trace: list[dict]) -> list[dict]:
        """Scan the trace for loops, tool failures, and overly long reasoning."""
        issues: list[dict] = []

        for idx, step in enumerate(trace):
            kind = step.get("type")

            if kind == "tool_call":
                # A tool call repeated with identical args within the last
                # five steps usually signals the agent is stuck in a loop.
                if idx > 0:
                    for prev in trace[max(0, idx - 5):idx]:
                        if (prev.get("tool_name") == step.get("tool_name")
                                and prev.get("args") == step.get("args")):
                            issues.append({
                                "type": "loop_detected",
                                "step": idx,
                                "detail": f"重复调用 {step['tool_name']},参数相同"
                            })
                if not step.get("success"):
                    issues.append({
                        "type": "tool_failure",
                        "step": idx,
                        "detail": f"{step['tool_name']} 失败: {step.get('error', '')}"
                    })
            elif kind == "reasoning" and len(step.get("content", "")) > 2000:
                issues.append({
                    "type": "verbose_reasoning",
                    "step": idx,
                    "detail": "推理内容过长,可能导致Token浪费"
                })

        return issues

25.3.2 推理步骤检查

python
class StepInspector:
    """Inspects reasoning steps for drift away from the user's goal."""

    # Tool-name fragment -> goal keywords that make the tool plausibly relevant.
    _KEYWORD_MAP = {
        "search": ["搜索", "查找", "查询", "search", "find"],
        "calculate": ["计算", "统计", "分析", "calculate"],
        "database": ["数据库", "记录", "数据", "database"],
        "email": ["邮件", "发送", "通知", "email", "send"],
    }

    def check_prompt_drift(self, trace: list[dict]) -> list[str]:
        """Check whether the agent's tool calls drifted from the original goal.

        Returns a list of human-readable warnings; empty when no drift is
        detected or no ``user_input`` step could be found in the trace.
        """
        warnings: list[str] = []

        if not trace:
            return warnings

        # The original goal is the first user_input step in the trace.
        original_goal = None
        for step in trace:
            if step.get("type") == "user_input":
                original_goal = step["content"]
                break

        if not original_goal:
            return warnings

        # Flag tool calls whose category keywords never appear in the goal.
        for i, step in enumerate(trace):
            if step.get("type") == "tool_call":
                tool_name = step["tool_name"]
                if not self._is_relevant(tool_name, original_goal):
                    warnings.append(
                        f"步骤{i}: 工具 {tool_name} 可能与目标不相关"
                    )

        return warnings

    def _is_relevant(self, tool_name: str, goal: str) -> bool:
        """Keyword-based relevance heuristic.

        FIX: the original fell through to ``return True`` even when the tool
        matched a known category but none of that category's keywords
        appeared in the goal — so irrelevant tools were never reported.
        """
        goal_lower = goal.lower()
        tool_lower = tool_name.lower()

        for key, keywords in self._KEYWORD_MAP.items():
            if key in tool_lower:
                # Known category: relevant only if one of its keywords
                # appears in the goal text.
                return any(kw in goal_lower for kw in keywords)

        return True  # unknown tool categories: assume relevant

25.4 工具调用调试

25.4.1 工具执行追踪

python
class ToolCallTracer:
    """工具调用追踪器"""
    
    def __init__(self):
        self._traces: list[dict] = []
    
    def trace_call(self, tool_name: str, args: dict,
                   result: Any, duration_ms: float,
                   success: bool, error: str = None):
        self._traces.append({
            "tool_name": tool_name,
            "args": args,
            "result_preview": str(result)[:200] if result else None,
            "duration_ms": duration_ms,
            "success": success,
            "error": error,
            "timestamp": datetime.now().isoformat(),
        })
    
    def get_failed_calls(self) -> list[dict]:
        return [t for t in self._traces if not t["success"]]
    
    def get_slow_calls(self, threshold_ms: float = 2000) -> list[dict]:
        return [t for t in self._traces if t["duration_ms"] > threshold_ms]
    
    def get_summary(self) -> dict:
        if not self._traces:
            return {"total_calls": 0}
        
        return {
            "total_calls": len(self._traces),
            "success_rate": sum(1 for t in self._traces if t["success"]) / len(self._traces),
            "avg_duration_ms": sum(t["duration_ms"] for t in self._traces) / len(self._traces),
            "failed_calls": self.get_failed_calls(),
            "slow_calls": self.get_slow_calls(),
            "tools_called": list(set(t["tool_name"] for t in self._traces)),
        }

25.4.2 参数验证

python
from pydantic import BaseModel, ValidationError

class ToolParameterValidator:
    """Validates tool-call arguments against registered pydantic schemas."""

    def __init__(self, tool_schemas: dict[str, type[BaseModel]]):
        self.schemas = tool_schemas

    def validate(self, tool_name: str, args: dict) -> tuple[bool, str]:
        """Return ``(is_valid, error_message)``; unregistered tools always pass."""
        schema = self.schemas.get(tool_name)
        if schema is None:
            # Tools without a registered schema are not validated.
            return True, ""

        try:
            schema(**args)
        except ValidationError as exc:
            return False, self._format_error(exc)
        return True, ""

    def _format_error(self, error: ValidationError) -> str:
        """Render pydantic validation errors as an indented list."""
        lines = [
            f"  {'.'.join(str(part) for part in err['loc'])}: {err['msg']}"
            for err in error.errors()
        ]
        return "参数验证失败:\n" + "\n".join(lines)

# Example: define a parameter schema for the "search" tool
class SearchArgs(BaseModel):
    """Arguments accepted by the `search` tool (defaults: 5 results, Chinese)."""

    query: str
    max_results: int = 5
    language: str = "zh"

# "search" is validated against SearchArgs; unregistered tools are skipped.
# Optional fields have defaults, so {"query": "test"} yields (True, "").
validator = ToolParameterValidator({"search": SearchArgs})
valid, error_msg = validator.validate("search", {"query": "test"})

25.4.3 副作用回放

python
class SideEffectRecorder:
    """Record/replay wrapper for side-effecting async calls.

    In recording mode each call is executed and its result stored; in
    replay mode the stored results are returned in order without
    re-executing the underlying functions.
    """

    def __init__(self):
        self._recordings: list[dict] = []
        self._replaying = False
        self._replay_index = 0

    def start_recording(self):
        """Clear previous recordings and switch to record mode."""
        self._recordings = []
        self._replaying = False

    def start_replaying(self):
        """Switch to replay mode, starting from the first recording."""
        self._replaying = True
        self._replay_index = 0

    async def execute_with_recording(self, func, *args, **kwargs):
        """Execute ``func`` (record mode) or return its recorded result (replay)."""
        if self._replaying:
            entry = self._recordings[self._replay_index]
            self._replay_index += 1
            return entry["result"]

        outcome = await func(*args, **kwargs)
        self._recordings.append({
            "func": func.__name__,
            "args": str(args),
            "kwargs": str(kwargs),
            "result": outcome,
        })
        return outcome

25.5 Prompt调试

25.5.1 A/B测试

python
class PromptABTest:
    """A/B test harness comparing two prompts over a shared test set."""

    async def run_test(self, prompt_a: str, prompt_b: str,
                      test_cases: list[dict],
                      evaluator) -> dict:
        """Run both prompts over ``test_cases`` and score them with ``evaluator``.

        Args:
            prompt_a / prompt_b: the two prompt variants under test.
            test_cases: dicts with ``"input"`` and ``"expected"`` keys.
            evaluator: object exposing ``await evaluate(expected, output) -> float``.

        Returns:
            Per-prompt average scores, the winner ("B" wins ties), and the
            relative improvement of B over A in percent.

        Raises:
            ValueError: if ``test_cases`` is empty (the old code crashed
                with ZeroDivisionError instead).
        """
        if not test_cases:
            raise ValueError("test_cases must not be empty")

        results_a: list[float] = []
        results_b: list[float] = []

        for case in test_cases:
            input_data = case["input"]
            expected = case["expected"]

            # Score prompt A on this case.
            output_a = await self._run_with_prompt(prompt_a, input_data)
            results_a.append(await evaluator.evaluate(expected, output_a))

            # Score prompt B on the same case.
            output_b = await self._run_with_prompt(prompt_b, input_data)
            results_b.append(await evaluator.evaluate(expected, output_b))

        total_a, total_b = sum(results_a), sum(results_b)
        return {
            "prompt_a": {
                "avg_score": total_a / len(results_a),
                "scores": results_a,
            },
            "prompt_b": {
                "avg_score": total_b / len(results_b),
                "scores": results_b,
            },
            "winner": "A" if total_a > total_b else "B",
            # FIX: guard the zero baseline (old code raised ZeroDivisionError
            # when prompt A scored 0 overall).
            "improvement": (
                (total_b - total_a) / total_a * 100 if total_a else float("inf")
            ),
        }

25.5.2 Prompt版本管理

python
class PromptVersionManager:
    """File-backed prompt version store (one JSON file per version)."""

    def __init__(self, storage_path: str = "prompts/"):
        self.storage_path = storage_path
        os.makedirs(storage_path, exist_ok=True)

    def _version_path(self, prompt_name: str, version: str) -> str:
        # Canonical on-disk location for one prompt version.
        return os.path.join(self.storage_path, f"{prompt_name}_v{version}.json")

    def save_version(self, prompt_name: str, version: str,
                     content: str, description: str = ""):
        """Persist one version of a prompt with an MD5 content fingerprint."""
        version_data = {
            "name": prompt_name,
            "version": version,
            "content": content,
            "description": description,
            "created_at": datetime.now().isoformat(),
            # MD5 is used as a cheap change fingerprint, not for security.
            "content_hash": hashlib.md5(content.encode()).hexdigest(),
        }

        with open(self._version_path(prompt_name, version), "w", encoding="utf-8") as f:
            json.dump(version_data, f, ensure_ascii=False, indent=2)

    def load_version(self, prompt_name: str, 
                     version: str) -> str:
        """Return the stored prompt text for ``version``."""
        with open(self._version_path(prompt_name, version), "r", encoding="utf-8") as f:
            return json.load(f)["content"]

    def list_versions(self, prompt_name: str) -> list[dict]:
        """List all stored versions, ordered by natural version number."""
        import re

        versions = []
        pattern = os.path.join(
            self.storage_path, f"{prompt_name}_v*.json"
        )
        for filepath in glob.glob(pattern):
            with open(filepath, "r", encoding="utf-8") as f:
                data = json.load(f)
            versions.append({
                "version": data["version"],
                "description": data["description"],
                "created_at": data["created_at"],
                "hash": data["content_hash"],
            })

        def natural_key(entry: dict):
            # FIX: the plain string sort ordered "10" before "2"; compare the
            # numeric chunks of the version string as integers instead.
            return [
                int(chunk) if chunk.isdigit() else chunk
                for chunk in re.split(r"(\d+)", entry["version"])
            ]

        return sorted(versions, key=natural_key)

    def diff_versions(self, prompt_name: str,
                      v1: str, v2: str) -> str:
        """Unified diff between two stored versions of the same prompt."""
        c1 = self.load_version(prompt_name, v1)
        c2 = self.load_version(prompt_name, v2)

        import difflib
        diff = difflib.unified_diff(
            c1.splitlines(), c2.splitlines(),
            fromfile=f"v{v1}", tofile=f"v{v2}", lineterm=""
        )
        return "\n".join(diff)

25.6 常见Agent故障诊断

25.6.1 循环推理

python
class LoopDetector:
    """循环推理检测器"""
    
    def __init__(self, max_repeated_steps: int = 3):
        self.max_repeated = max_repeated_steps
    
    def detect(self, trace: list[dict]) -> dict | None:
        """检测循环推理"""
        if len(trace) < self.max_repeated * 2:
            return None
        
        # 检查最近N步是否有重复模式
        recent = trace[-self.max_repeated * 2:]
        
        for pattern_len in range(1, self.max_repeated + 1):
            pattern = recent[:pattern_len]
            repetitions = 0
            
            for i in range(0, len(recent), pattern_len):
                segment = recent[i:i + pattern_len]
                if self._steps_match(pattern, segment):
                    repetitions += 1
                else:
                    break
            
            if repetitions >= self.max_repeated:
                return {
                    "type": "loop",
                    "pattern_length": pattern_len,
                    "repetitions": repetitions,
                    "pattern": [
                        s.get("tool_name", s.get("type", "?"))
                        for s in pattern
                    ],
                    "suggestion": "检查工具返回值是否有变化,或添加终止条件"
                }
        
        return None
    
    def _steps_match(self, a: list[dict], b: list[dict]) -> bool:
        if len(a) != len(b):
            return False
        for sa, sb in zip(a, b):
            if sa.get("tool_name") != sb.get("tool_name"):
                return False
            if sa.get("args") != sb.get("args"):
                return False
        return True

25.6.2 工具调用失败

python
class ToolFailureDiagnoser:
    """Maps tool-call exceptions to known failure classes and remedies.

    Matching is order-sensitive: the first class whose symptom substring
    appears in the error text (case-insensitive) wins.
    """

    COMMON_FAILURES = {
        "timeout": {
            "symptoms": ["TimeoutError", "timed out", "连接超时"],
            "solutions": [
                "增加超时时间",
                "检查网络连接",
                "添加重试机制",
                "使用异步调用"
            ]
        },
        "auth_failure": {
            "symptoms": ["401", "403", "Unauthorized", "Authentication"],
            "solutions": [
                "检查API Key是否过期",
                "验证权限范围",
                "检查Token刷新逻辑",
                "确认账户状态"
            ]
        },
        "rate_limit": {
            "symptoms": ["429", "Rate limit", "Too many requests"],
            "solutions": [
                "实现限流器",
                "增加请求间隔",
                "使用缓存减少调用",
                "申请更高的速率限制"
            ]
        },
        "invalid_params": {
            "symptoms": ["400", "Bad Request", "Invalid", "required"],
            "solutions": [
                "检查参数类型和格式",
                "验证必需参数是否提供",
                "检查枚举值是否正确",
                "添加参数校验"
            ]
        },
    }

    def diagnose(self, error: Exception) -> dict:
        """Classify ``error`` and suggest remediation steps."""
        message = str(error)
        lowered = message.lower()

        for failure_type, info in self.COMMON_FAILURES.items():
            if any(symptom.lower() in lowered for symptom in info["symptoms"]):
                return {
                    "type": failure_type,
                    "solutions": info["solutions"],
                    "error": message,
                }

        # Nothing matched — fall back to generic guidance.
        return {
            "type": "unknown",
            "solutions": ["查看详细错误日志", "检查API文档", "联系服务提供方"],
            "error": message,
        }

25.6.3 上下文溢出

python
class ContextOverflowHandler:
    """Detects context-window overflow and trims the message list."""

    def __init__(self, max_tokens: int = 128000):
        self.max_tokens = max_tokens

    def check_and_fix(self, messages: list[dict]) -> tuple[list[dict], str]:
        """Return (possibly trimmed messages, status string).

        Status is ``"ok"`` when the estimated token count fits, otherwise a
        ``"context_overflow: ..."`` description of the trim that was applied.
        Note: a single emergency trim may still exceed ``max_tokens`` when
        the remaining individual messages are very long.
        """
        total = sum(self._estimate_tokens(m["content"]) for m in messages)

        if total <= self.max_tokens:
            return messages, "ok"

        messages = self._emergency_trim(messages)
        new_total = sum(self._estimate_tokens(m["content"]) for m in messages)

        # FIX: the status string previously ran both numbers together
        # ("{total}{new_total}") — the separator had been lost.
        return messages, (
            f"context_overflow: {total} -> {new_total} tokens "
            f"(trimmed {total - new_total} tokens)"
        )

    def _emergency_trim(self, messages: list[dict]) -> list[dict]:
        """Keep all system messages plus the last 3 exchanges (6 messages)."""
        system = [m for m in messages if m["role"] == "system"]
        non_system = [m for m in messages if m["role"] != "system"]

        kept = non_system[-6:] if len(non_system) > 6 else non_system
        return system + kept

    def _estimate_tokens(self, text: str) -> int:
        # Rough heuristic: ~3 characters per token for mixed CJK/ASCII text.
        return len(text) // 3

25.7 日志分析与问题定位

25.7.1 日志关联分析

python
class LogCorrelator:
    """Correlates all log entries of one trace to hypothesize a root cause."""

    def find_root_cause(self, trace_id: str, 
                        logs: list[dict]) -> dict:
        """Build a root-cause report from every log entry with ``trace_id``."""
        # Filter to this trace and order chronologically.
        entries = sorted(
            (entry for entry in logs if entry.get("trace_id") == trace_id),
            key=lambda entry: entry.get("timestamp", ""),
        )

        # Earliest ERROR/WARNING on the timeline.
        first_error = next(
            (e for e in entries if e.get("level") in ("ERROR", "WARNING")),
            None,
        )

        # Step with the highest recorded latency.
        slowest = max(
            (e for e in entries if e.get("duration_ms")),
            key=lambda e: e.get("duration_ms", 0),
            default=None,
        )

        # Step that consumed the most tokens.
        most_tokens = max(
            (e for e in entries if e.get("tokens")),
            key=lambda e: e.get("tokens", 0),
            default=None,
        )

        return {
            "trace_id": trace_id,
            "total_steps": len(entries),
            "first_error": first_error,
            "slowest_step": slowest,
            "most_tokens_step": most_tokens,
            "root_cause_hypothesis": self._hypothesize(
                first_error, slowest, most_tokens
            ),
        }

    def _hypothesize(self, error, slowest, most_tokens) -> str:
        """Priority: first error > slow step (>5s) > nothing remarkable."""
        if error:
            return f"首个错误: {error.get('message', 'unknown')}"
        if slowest and slowest.get("duration_ms", 0) > 5000:
            return f"性能瓶颈: {slowest.get('type', 'unknown')} 耗时 {slowest.get('duration_ms')}ms"
        return "未发现明显异常"

25.8 Agent调试工具生态

25.8.1 主流工具对比

| 工具 | 类型 | 核心功能 | 适用场景 |
| --- | --- | --- | --- |
| LangSmith | 云服务 | 追踪、评估、调试 | LangChain生态 |
| Weave | 开源 | 追踪、评估、实验管理 | 通用LLM应用 |
| PromptFoo | 开源 | Prompt测试、评估、红队 | Prompt工程 |
| Arize Phoenix | 开源 | 追踪、评估、可观测 | 生产监控 |
| Langfuse | 开源 | 追踪、Prompt管理、评估 | 团队协作 |
| Helicone | 开源 | 缓存、日志、成本追踪 | 成本优化 |

25.8.2 Langfuse集成示例

python
class LangfuseDebugger:
    """Thin wrapper around the Langfuse SDK for trace and LLM-call logging."""

    def __init__(self, public_key: str, secret_key: str,
                 host: str = "https://cloud.langfuse.com"):
        # Imported lazily so this module loads even without langfuse installed.
        from langfuse import Langfuse
        self.langfuse = Langfuse(public_key, secret_key, host)

    def trace_agent(self, trace_id: str, agent_name: str,
                    input_data: str, output_data: str,
                    metadata: dict = None):
        """Record one complete agent execution as a Langfuse trace."""
        if metadata is None:
            metadata = {}
        self.langfuse.trace(
            id=trace_id,
            name=f"{agent_name}_execution",
            input=input_data,
            output=output_data,
            metadata=metadata,
        )

    def log_llm_call(self, trace_id: str, span_id: str,
                     model: str, prompt: str, completion: str,
                     usage: dict):
        """Attach a single LLM generation (with token usage) to a trace."""
        self.langfuse.generation(
            trace_id=trace_id,
            id=span_id,
            model=model,
            input=prompt,
            output=completion,
            usage=usage,
        )

25.9 生产环境故障排查

25.9.1 实时监控仪表盘

python
class AgentHealthDashboard:
    """Derives an overall health verdict from collected agent metrics."""

    def __init__(self, metrics_collector):
        # NOTE(review): metrics_collector must expose get_summary() returning
        # {"requests": {"success_rate": <percent>}, "latency_ms": {"e2e_p95": <ms>}}.
        self.metrics = metrics_collector

    def get_health_status(self) -> dict:
        """Return ``{"status", "issues", "metrics"}`` from error rate and P95 latency.

        FIX: the old code assigned ``health`` per check, so a later, milder
        latency finding could *downgrade* an earlier "unhealthy" verdict to
        "degraded". The overall status is now the worst severity observed.
        """
        summary = self.metrics.get_summary()
        findings: list[tuple[str, str]] = []  # (severity, message)

        # success_rate is expressed in percent (0-100).
        error_rate = 1 - summary["requests"]["success_rate"] / 100
        if error_rate > 0.1:
            findings.append(("unhealthy", f"错误率过高: {error_rate:.1%}"))
        elif error_rate > 0.05:
            findings.append(("degraded", f"错误率偏高: {error_rate:.1%}"))

        p95 = summary["latency_ms"]["e2e_p95"]
        if p95 > 30000:
            findings.append(("unhealthy", f"P95延迟过高: {p95:.0f}ms"))
        elif p95 > 15000:
            findings.append(("degraded", f"P95延迟偏高: {p95:.0f}ms"))

        # Overall status = worst severity among all findings.
        rank = {"healthy": 0, "degraded": 1, "unhealthy": 2}
        status = "healthy"
        for severity, _ in findings:
            if rank[severity] > rank[status]:
                status = severity

        return {
            "status": status,
            "issues": [message for _, message in findings],
            "metrics": summary,
        }

25.9.2 故障注入测试

python
class FaultInjector:
    """Injects faults (delay/error/timeout) to probe an agent's resilience.

    NOTE(review): this class only records the fault configuration; nothing
    here applies the faults — presumably the agent (or a wrapped tool
    layer) consults ``_fault_config`` during execution. Confirm with the
    agent implementation.
    """

    def __init__(self, agent):
        self.agent = agent
        self._fault_config: dict = {}

    def inject_delay(self, tool_name: str, delay_ms: float):
        """Configure an artificial latency for ``tool_name``."""
        self._fault_config[f"delay_{tool_name}"] = delay_ms

    def inject_error(self, tool_name: str, error_msg: str):
        """Configure a forced error for ``tool_name``."""
        self._fault_config[f"error_{tool_name}"] = error_msg

    def inject_timeout(self, tool_name: str):
        """Configure a forced timeout for ``tool_name``."""
        self._fault_config[f"timeout_{tool_name}"] = True

    async def run_with_faults(self, task: str) -> dict:
        """Run ``task`` under the configured faults and report the outcome."""
        started = time.time()
        report: dict = {"faults_applied": self._fault_config}
        try:
            outcome = await self.agent.run(task)
            report.update(success=True, result=outcome)
        except Exception as exc:
            report.update(success=False, error=str(exc))
        report["duration_ms"] = (time.time() - started) * 1000
        return report

最佳实践

  1. 结构化追踪:从第一天起就记录完整的执行追踪(LLM I/O、工具调用、决策)
  2. Mock优先:本地调试时用Mock LLM替代真实调用,快速迭代
  3. 版本化Prompt:每次修改Prompt都保存版本,方便回滚和A/B测试
  4. 设置护栏:最大步数、超时、成本上限,防止Agent失控
  5. 故障注入:主动注入故障,验证Agent的容错能力

常见陷阱

  1. 只在线上调试:在生产环境直接调试是危险的。搭建本地调试环境
  2. 忽略temperature:调试时忘记设temperature=0,导致结果不可复现
  3. 日志过于详细:记录了完整Prompt导致日志存储成本巨大。脱敏+截断
  4. 不保存执行记录:出了问题没有回放数据。始终保存追踪记录
  5. 忽略间歇性故障:只在持续故障时才排查。间歇性故障需要长时间监控

小结

Agent 调试是一个兼具技术和耐心的过程。通过 Mock LLM、执行回放、思维链可视化、工具调用追踪、Prompt 版本管理等工具和方法,我们可以有效定位 Agent 系统中的各种问题。关键原则是:可观测性先行——没有日志和追踪,调试就是盲人摸象。

延伸阅读

  1. LangSmith文档: https://docs.smith.langchain.com/
  2. Langfuse文档: https://langfuse.com/docs
  3. PromptFoo: https://promptfoo.dev/ — Prompt测试框架
  4. Weave: https://wandb.ai/weave — LLM追踪工具
  5. 论文: "Debugging LLM-as-a-Judge" — 评估方法的调试验证

基于 MIT 许可发布