# Chapter 25: Debugging and Diagnostics

## Overview

Debugging a traditional program is deterministic: breakpoints, single-stepping, and variable inspection suffice to locate most problems. Debugging an agent system poses unique challenges: non-deterministic LLM output, complex multi-step reasoning chains, side effects from tool calls, and LLM-specific failure modes such as hallucination. This chapter walks through the methodology and tooling for agent debugging, from setting up a local environment to troubleshooting production incidents, so you can locate and fix problems in an agent system efficiently.
## 25.1 Challenges of Agent Debugging

### 25.1.1 Non-determinism
```python
class NonDeterminismDebugger:
    """Utilities for debugging non-deterministic behavior."""

    def __init__(self):
        self._execution_history: list[dict] = []

    async def debug_with_replay(self, agent, input_data: str,
                                num_runs: int = 5) -> dict:
        """Run the same input several times and compare the results."""
        results = []
        for i in range(num_runs):
            result = await agent.run(input_data)
            results.append({
                "run": i + 1,
                "output": result,
                "steps": agent.get_last_trace(),
            })
        # Consistency analysis: fraction of runs that produced the most common output
        outputs = [r["output"] for r in results]
        unique_outputs = set(outputs)
        most_common = max(unique_outputs, key=outputs.count)
        return {
            "consistency_rate": outputs.count(most_common) / num_runs,
            "unique_outputs": len(unique_outputs),
            "total_runs": num_runs,
            "results": results,
            "diagnosis": self._diagnose_inconsistency(results),
        }

    def _diagnose_inconsistency(self, results: list[dict]) -> str:
        if all(r["output"] == results[0]["output"] for r in results):
            return "Outputs are identical; non-determinism is not the source of the problem."
        steps_variations = set(
            tuple(s["tool"] for s in r["steps"])
            for r in results
        )
        if len(steps_variations) > 1:
            return ("Inconsistent reasoning paths: the agent chose different "
                    "tools/steps across runs.")
        return ("Same reasoning path but different outputs: likely LLM sampling "
                "variance; try lowering temperature.")
```

### 25.1.2 Debugging Difficulty Levels
| Problem type | Difficulty | Typical symptoms | Debugging approach |
|---|---|---|---|
| Code bug | ⭐ | Exceptions, crashes | Traditional debugger |
| Prompt issue | ⭐⭐ | Malformed output, ignored instructions | Prompt version testing |
| Tool call failure | ⭐⭐ | Timeouts, bad parameters | Tool log analysis |
| Wrong reasoning path | ⭐⭐⭐ | Picks the wrong tool/direction | Chain-of-thought visualization |
| Hallucination | ⭐⭐⭐⭐ | Fabricated information | Fact checking, RAG grounding |
| Looping reasoning | ⭐⭐⭐⭐ | Agent repeats the same action | Step counter, termination conditions |
| Context overflow | ⭐⭐⭐ | Token limit exceeded, truncation | Context monitoring |
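The "step counter, termination condition" remedy listed for looping reasoning can be as small as a guard object that every agent step ticks. A minimal sketch; the class and method names here are illustrative, not from any particular framework:

```python
class StepGuard:
    """Illustrative step counter: aborts a run once a step budget is spent."""

    def __init__(self, max_steps: int = 10):
        self.max_steps = max_steps
        self.steps = 0

    def tick(self) -> None:
        """Call once per agent step; raises when the budget is exceeded."""
        self.steps += 1
        if self.steps > self.max_steps:
            raise RuntimeError(f"step budget exceeded ({self.max_steps})")


guard = StepGuard(max_steps=3)
for _ in range(3):
    guard.tick()  # a fourth call would raise RuntimeError
```

Wiring such a guard into the agent loop turns an infinite loop (a hard-to-spot bug) into a loud, immediate failure.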
## 25.2 Setting Up a Local Debugging Environment

### 25.2.1 Mock LLM
````python
from datetime import datetime
from typing import Any


class MockLLM:
    """Mock LLM: stands in for real LLM calls while debugging."""

    def __init__(self):
        self._responses: list[str] = []
        self._call_log: list[dict] = []
        self._mode = "record"  # record / replay / fixed
        self._fixed_response = ""
        self._index = 0

    def set_fixed_response(self, response: str):
        """Always return the same response."""
        self._mode = "fixed"
        self._fixed_response = response

    def set_replay_responses(self, responses: list[str]):
        """Return a scripted sequence of responses."""
        self._mode = "replay"
        self._responses = responses
        self._index = 0

    async def chat(self, messages: list[dict], **kwargs) -> str:
        """Simulate an LLM call."""
        # Log the call
        call_info = {
            "messages": messages,
            "model": kwargs.get("model", "mock"),
            "timestamp": datetime.now().isoformat(),
        }
        self._call_log.append(call_info)
        if self._mode == "fixed":
            return self._fixed_response
        elif self._mode == "replay":
            if self._index < len(self._responses):
                response = self._responses[self._index]
                self._index += 1
                return response
            return "Mock response exhausted"
        else:
            return "[MOCK] This is a mock response"

    def get_call_log(self) -> list[dict]:
        return self._call_log


# Usage example
mock_llm = MockLLM()
mock_llm.set_fixed_response(
    'I need to call the search tool to answer this question.\n'
    '```json\n{"tool": "search", "args": {"query": "Python Agent"}}\n```'
)
agent = Agent(llm=mock_llm, tools=[search_tool])
result = await agent.run("What is a Python Agent?")
print(mock_llm.get_call_log())  # Inspect how the LLM was called
````

### 25.2.2 Replay Mechanism
```python
import os
import pickle
from datetime import datetime
from typing import Any


class ExecutionReplayer:
    """Records and replays complete agent executions."""

    def __init__(self, storage_path: str = "replays/"):
        self.storage_path = storage_path
        os.makedirs(storage_path, exist_ok=True)

    async def record(self, trace_id: str,
                     agent: Any, input_data: str,
                     result: str):
        """Record a complete execution."""
        recording = {
            "trace_id": trace_id,
            "input": input_data,
            "result": result,
            "llm_calls": agent.llm_call_log,
            "tool_calls": agent.tool_call_log,
            "decisions": agent.decision_log,
            "timestamp": datetime.now().isoformat(),
        }
        filepath = os.path.join(self.storage_path, f"{trace_id}.pkl")
        with open(filepath, "wb") as f:
            pickle.dump(recording, f)

    async def replay(self, trace_id: str) -> dict:
        """Replay a recorded execution."""
        filepath = os.path.join(self.storage_path, f"{trace_id}.pkl")
        with open(filepath, "rb") as f:
            recording = pickle.load(f)
        print(f"📋 Replaying trace: {trace_id}")
        print(f"  Input: {recording['input'][:100]}...")
        print(f"  Result: {recording['result'][:100]}...")
        print(f"  LLM calls: {len(recording['llm_calls'])}")
        print(f"  Tool calls: {len(recording['tool_calls'])}")
        return recording
```

### 25.2.3 Interactive Debugger
```python
import json


class AgentDebugger:
    """Interactive debugger for agents."""

    def __init__(self, agent):
        self.agent = agent
        self.breakpoints: set[str] = set()
        self._step_mode = False

    def set_breakpoint(self, event_type: str):
        """Break whenever this event type fires."""
        self.breakpoints.add(event_type)

    async def debug_run(self, input_data: str) -> str:
        """Run the agent with debugging hooks attached."""
        self.agent.on("llm_call", self._on_llm_call)
        self.agent.on("tool_call", self._on_tool_call)
        self.agent.on("decision", self._on_decision)
        result = await self.agent.run(input_data)
        return result

    async def _on_llm_call(self, event: dict):
        if "llm_call" in self.breakpoints or self._step_mode:
            print("\n🧠 LLM call:")
            print(f"  Input: {event['input'][:200]}...")
            print(f"  Model: {event.get('model', 'unknown')}")
            if self._step_mode:
                action = input("  [c]ontinue [s]tep [m]odify [q]uit: ")
                if action == 'q':
                    raise KeyboardInterrupt()
                elif action == 'm':
                    new_input = input("  New input: ")
                    event['input'] = new_input

    async def _on_tool_call(self, event: dict):
        if "tool_call" in self.breakpoints or self._step_mode:
            print(f"\n🔧 Tool call: {event['tool_name']}")
            print(f"  Args: {event['args']}")
            if self._step_mode:
                action = input("  [c]ontinue [s]kip [e]dit args [q]uit: ")
                if action == 's':
                    event['skip'] = True
                elif action == 'e':
                    new_args = input("  New args (JSON): ")
                    event['args'] = json.loads(new_args)

    async def _on_decision(self, event: dict):
        if "decision" in self.breakpoints or self._step_mode:
            print(f"\n💭 Decision: {event['reasoning'][:200]}...")
```

## 25.3 Chain-of-Thought Debugging

### 25.3.1 Visualizing the CoT
```python
import json


class ChainOfThoughtDebugger:
    """Chain-of-thought debugger."""

    def visualize(self, trace: list[dict]) -> str:
        """Render the chain of thought as text."""
        lines = ["\n" + "=" * 70, "🧠 Agent Chain of Thought", "=" * 70]
        for i, step in enumerate(trace):
            step_type = step.get("type", "unknown")
            if step_type == "reasoning":
                lines.append(f"\n📌 Step {i+1}: reasoning")
                lines.append(f"  {step['content']}")
            elif step_type == "tool_call":
                lines.append(f"\n🔧 Step {i+1}: tool call")
                lines.append(f"  Tool: {step['tool_name']}")
                lines.append(f"  Args: {json.dumps(step['args'], ensure_ascii=False)}")
                status = "✅" if step.get("success") else "❌"
                lines.append(f"  Result: {status} {str(step.get('result', ''))[:200]}")
            elif step_type == "observation":
                lines.append(f"\n👁️ Step {i+1}: observation")
                lines.append(f"  {step['content'][:300]}")
            elif step_type == "final_answer":
                lines.append("\n✅ Final answer:")
                lines.append(f"  {step['content']}")
        lines.append("\n" + "=" * 70)
        return "\n".join(lines)

    def find_anomalies(self, trace: list[dict]) -> list[dict]:
        """Detect anomalies in the chain of thought."""
        anomalies = []
        for i, step in enumerate(trace):
            # Loop detection: same tool called with the same args within the last 5 steps
            if i > 0 and step.get("type") == "tool_call":
                for j in range(max(0, i - 5), i):
                    if (trace[j].get("tool_name") == step.get("tool_name") and
                            trace[j].get("args") == step.get("args")):
                        anomalies.append({
                            "type": "loop_detected",
                            "step": i,
                            "detail": f"Repeated call to {step['tool_name']} with identical args"
                        })
            # Failed tool calls
            if step.get("type") == "tool_call" and not step.get("success"):
                anomalies.append({
                    "type": "tool_failure",
                    "step": i,
                    "detail": f"{step['tool_name']} failed: {step.get('error', '')}"
                })
            # Overly long reasoning
            if step.get("type") == "reasoning":
                if len(step.get("content", "")) > 2000:
                    anomalies.append({
                        "type": "verbose_reasoning",
                        "step": i,
                        "detail": "Reasoning is very long and may waste tokens"
                    })
        return anomalies
```

### 25.3.2 Inspecting Reasoning Steps
```python
class StepInspector:
    """Inspects reasoning steps."""

    def check_prompt_drift(self, trace: list[dict]) -> list[str]:
        """Check for prompt drift: has the agent strayed from the original goal?"""
        warnings = []
        if not trace:
            return warnings
        original_goal = None
        for step in trace:
            if step.get("type") == "user_input":
                original_goal = step["content"]
                break
        if not original_goal:
            return warnings
        # Check whether later steps still relate to the original goal
        for i, step in enumerate(trace):
            if step.get("type") == "tool_call":
                tool_name = step["tool_name"]
                # Flag tool calls that look unrelated to the goal
                if not self._is_relevant(tool_name, original_goal):
                    warnings.append(
                        f"Step {i}: tool {tool_name} may be unrelated to the goal"
                    )
        return warnings

    def _is_relevant(self, tool_name: str, goal: str) -> bool:
        """Naive relevance check via keyword matching."""
        goal_lower = goal.lower()
        tool_lower = tool_name.lower()
        # Keywords cover both Chinese and English goals
        keywords_map = {
            "search": ["搜索", "查找", "查询", "search", "find"],
            "calculate": ["计算", "统计", "分析", "calculate"],
            "database": ["数据库", "记录", "数据", "database"],
            "email": ["邮件", "发送", "通知", "email", "send"],
        }
        for key, keywords in keywords_map.items():
            if key in tool_lower:
                return any(kw in goal_lower for kw in keywords)
        return True  # Unknown tools are assumed relevant
```

## 25.4 Debugging Tool Calls
### 25.4.1 Tracing Tool Execution

```python
from datetime import datetime
from typing import Any


class ToolCallTracer:
    """Traces tool calls."""

    def __init__(self):
        self._traces: list[dict] = []

    def trace_call(self, tool_name: str, args: dict,
                   result: Any, duration_ms: float,
                   success: bool, error: str | None = None):
        self._traces.append({
            "tool_name": tool_name,
            "args": args,
            "result_preview": str(result)[:200] if result else None,
            "duration_ms": duration_ms,
            "success": success,
            "error": error,
            "timestamp": datetime.now().isoformat(),
        })

    def get_failed_calls(self) -> list[dict]:
        return [t for t in self._traces if not t["success"]]

    def get_slow_calls(self, threshold_ms: float = 2000) -> list[dict]:
        return [t for t in self._traces if t["duration_ms"] > threshold_ms]

    def get_summary(self) -> dict:
        if not self._traces:
            return {"total_calls": 0}
        return {
            "total_calls": len(self._traces),
            "success_rate": sum(1 for t in self._traces if t["success"]) / len(self._traces),
            "avg_duration_ms": sum(t["duration_ms"] for t in self._traces) / len(self._traces),
            "failed_calls": self.get_failed_calls(),
            "slow_calls": self.get_slow_calls(),
            "tools_called": list(set(t["tool_name"] for t in self._traces)),
        }
```

### 25.4.2 Parameter Validation
```python
from pydantic import BaseModel, ValidationError


class ToolParameterValidator:
    """Validates tool parameters against Pydantic schemas."""

    def __init__(self, tool_schemas: dict[str, type[BaseModel]]):
        self.schemas = tool_schemas

    def validate(self, tool_name: str, args: dict) -> tuple[bool, str]:
        """Validate the arguments of a tool call."""
        if tool_name not in self.schemas:
            return True, ""  # Skip validation for unregistered tools
        schema = self.schemas[tool_name]
        try:
            schema(**args)
            return True, ""
        except ValidationError as e:
            return False, self._format_error(e)

    def _format_error(self, error: ValidationError) -> str:
        errors = []
        for err in error.errors():
            field = ".".join(str(loc) for loc in err["loc"])
            errors.append(f"  {field}: {err['msg']}")
        return "Parameter validation failed:\n" + "\n".join(errors)


# Example: define a parameter schema for a tool
class SearchArgs(BaseModel):
    query: str
    max_results: int = 5
    language: str = "zh"


validator = ToolParameterValidator({"search": SearchArgs})
valid, error_msg = validator.validate("search", {"query": "test"})
```

### 25.4.3 Replaying Side Effects
```python
class SideEffectRecorder:
    """Records side effects so they can later be replayed without re-executing."""

    def __init__(self):
        self._recordings: list[dict] = []
        self._replaying = False
        self._replay_index = 0

    def start_recording(self):
        self._recordings = []
        self._replaying = False

    def start_replaying(self):
        self._replaying = True
        self._replay_index = 0

    async def execute_with_recording(self, func, *args, **kwargs):
        if self._replaying:
            # Replay mode: return the recorded result
            recording = self._recordings[self._replay_index]
            self._replay_index += 1
            return recording["result"]
        # Record mode: execute and record
        result = await func(*args, **kwargs)
        self._recordings.append({
            "func": func.__name__,
            "args": str(args),
            "kwargs": str(kwargs),
            "result": result,
        })
        return result
```

## 25.5 Prompt Debugging
### 25.5.1 A/B Testing

```python
class PromptABTest:
    """A/B testing for prompts."""

    async def run_test(self, prompt_a: str, prompt_b: str,
                       test_cases: list[dict],
                       evaluator) -> dict:
        """Run an A/B test over a set of test cases."""
        results_a = []
        results_b = []
        for case in test_cases:
            input_data = case["input"]
            expected = case["expected"]
            # Test prompt A
            # (_run_with_prompt, not shown, runs the agent with the given system prompt)
            output_a = await self._run_with_prompt(prompt_a, input_data)
            score_a = await evaluator.evaluate(expected, output_a)
            results_a.append(score_a)
            # Test prompt B
            output_b = await self._run_with_prompt(prompt_b, input_data)
            score_b = await evaluator.evaluate(expected, output_b)
            results_b.append(score_b)
        return {
            "prompt_a": {
                "avg_score": sum(results_a) / len(results_a),
                "scores": results_a,
            },
            "prompt_b": {
                "avg_score": sum(results_b) / len(results_b),
                "scores": results_b,
            },
            "winner": "A" if sum(results_a) > sum(results_b) else "B",
            "improvement": (
                (sum(results_b) - sum(results_a)) / sum(results_a) * 100
            )
        }
```

### 25.5.2 Prompt Version Management
```python
import glob
import hashlib
import json
import os
from datetime import datetime


class PromptVersionManager:
    """Manages prompt versions on disk."""

    def __init__(self, storage_path: str = "prompts/"):
        self.storage_path = storage_path
        os.makedirs(storage_path, exist_ok=True)

    def save_version(self, prompt_name: str, version: str,
                     content: str, description: str = ""):
        """Save a prompt version."""
        version_data = {
            "name": prompt_name,
            "version": version,
            "content": content,
            "description": description,
            "created_at": datetime.now().isoformat(),
            "content_hash": hashlib.md5(content.encode()).hexdigest(),
        }
        filepath = os.path.join(
            self.storage_path, f"{prompt_name}_v{version}.json"
        )
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(version_data, f, ensure_ascii=False, indent=2)

    def load_version(self, prompt_name: str,
                     version: str) -> str:
        """Load a prompt version."""
        filepath = os.path.join(
            self.storage_path, f"{prompt_name}_v{version}.json"
        )
        with open(filepath, "r", encoding="utf-8") as f:
            data = json.load(f)
        return data["content"]

    def list_versions(self, prompt_name: str) -> list[dict]:
        """List all versions of a prompt."""
        versions = []
        pattern = os.path.join(
            self.storage_path, f"{prompt_name}_v*.json"
        )
        for filepath in glob.glob(pattern):
            with open(filepath, "r", encoding="utf-8") as f:
                data = json.load(f)
            versions.append({
                "version": data["version"],
                "description": data["description"],
                "created_at": data["created_at"],
                "hash": data["content_hash"],
            })
        return sorted(versions, key=lambda x: x["version"])

    def diff_versions(self, prompt_name: str,
                      v1: str, v2: str) -> str:
        """Diff two versions."""
        c1 = self.load_version(prompt_name, v1)
        c2 = self.load_version(prompt_name, v2)
        import difflib
        diff = difflib.unified_diff(
            c1.splitlines(), c2.splitlines(),
            fromfile=f"v{v1}", tofile=f"v{v2}", lineterm=""
        )
        return "\n".join(diff)
```

## 25.6 Diagnosing Common Agent Failures
### 25.6.1 Looping Reasoning

```python
class LoopDetector:
    """Detects looping reasoning."""

    def __init__(self, max_repeated_steps: int = 3):
        self.max_repeated = max_repeated_steps

    def detect(self, trace: list[dict]) -> dict | None:
        """Detect a repeating pattern in the recent trace."""
        if len(trace) < self.max_repeated * 2:
            return None
        # Look for a repeating pattern within the last N steps
        recent = trace[-self.max_repeated * 2:]
        for pattern_len in range(1, self.max_repeated + 1):
            pattern = recent[:pattern_len]
            repetitions = 0
            for i in range(0, len(recent), pattern_len):
                segment = recent[i:i + pattern_len]
                if self._steps_match(pattern, segment):
                    repetitions += 1
                else:
                    break
            if repetitions >= self.max_repeated:
                return {
                    "type": "loop",
                    "pattern_length": pattern_len,
                    "repetitions": repetitions,
                    "pattern": [
                        s.get("tool_name", s.get("type", "?"))
                        for s in pattern
                    ],
                    "suggestion": "Check whether tool results ever change, or add a termination condition"
                }
        return None

    def _steps_match(self, a: list[dict], b: list[dict]) -> bool:
        if len(a) != len(b):
            return False
        for sa, sb in zip(a, b):
            if sa.get("tool_name") != sb.get("tool_name"):
                return False
            if sa.get("args") != sb.get("args"):
                return False
        return True
```

### 25.6.2 Tool Call Failures
```python
class ToolFailureDiagnoser:
    """Diagnoses tool failures from error messages."""

    COMMON_FAILURES = {
        "timeout": {
            # Symptom strings are match patterns, so Chinese variants are kept
            "symptoms": ["TimeoutError", "timed out", "连接超时"],
            "solutions": [
                "Increase the timeout",
                "Check network connectivity",
                "Add a retry mechanism",
                "Use asynchronous calls",
            ]
        },
        "auth_failure": {
            "symptoms": ["401", "403", "Unauthorized", "Authentication"],
            "solutions": [
                "Check whether the API key has expired",
                "Verify the permission scope",
                "Check the token refresh logic",
                "Confirm the account status",
            ]
        },
        "rate_limit": {
            "symptoms": ["429", "Rate limit", "Too many requests"],
            "solutions": [
                "Implement a rate limiter",
                "Increase the interval between requests",
                "Use caching to reduce calls",
                "Request a higher rate limit",
            ]
        },
        "invalid_params": {
            "symptoms": ["400", "Bad Request", "Invalid", "required"],
            "solutions": [
                "Check parameter types and formats",
                "Verify that required parameters are provided",
                "Check that enum values are correct",
                "Add parameter validation",
            ]
        },
    }

    def diagnose(self, error: Exception) -> dict:
        """Diagnose the cause of a tool failure."""
        error_str = str(error)
        for failure_type, info in self.COMMON_FAILURES.items():
            for symptom in info["symptoms"]:
                if symptom.lower() in error_str.lower():
                    return {
                        "type": failure_type,
                        "solutions": info["solutions"],
                        "error": error_str,
                    }
        return {
            "type": "unknown",
            "solutions": [
                "Inspect the detailed error logs",
                "Check the API documentation",
                "Contact the service provider",
            ],
            "error": error_str,
        }
```

### 25.6.3 Context Overflow
```python
class ContextOverflowHandler:
    """Handles context overflow."""

    def __init__(self, max_tokens: int = 128000):
        self.max_tokens = max_tokens

    def check_and_fix(self, messages: list[dict]) -> tuple[list[dict], str]:
        """Check for context overflow and trim if necessary."""
        total = sum(self._estimate_tokens(m["content"]) for m in messages)
        if total <= self.max_tokens:
            return messages, "ok"
        # Repair strategy
        messages = self._emergency_trim(messages)
        new_total = sum(self._estimate_tokens(m["content"]) for m in messages)
        return messages, (
            f"context_overflow: {total} → {new_total} tokens "
            f"(trimmed {total - new_total} tokens)"
        )

    def _emergency_trim(self, messages: list[dict]) -> list[dict]:
        """Emergency trim: keep the system messages plus the last 3 turns."""
        system = [m for m in messages if m["role"] == "system"]
        non_system = [m for m in messages if m["role"] != "system"]
        kept = non_system[-6:] if len(non_system) > 6 else non_system
        return system + kept

    def _estimate_tokens(self, text: str) -> int:
        # Rough heuristic: about 3 characters per token
        return len(text) // 3
```

## 25.7 Log Analysis and Problem Localization
### 25.7.1 Correlating Logs

```python
class LogCorrelator:
    """Correlates logs to locate root causes."""

    def find_root_cause(self, trace_id: str,
                        logs: list[dict]) -> dict:
        """Correlate the logs of one trace to find the root cause."""
        trace_logs = [l for l in logs if l.get("trace_id") == trace_id]
        # Sort into a timeline
        trace_logs.sort(key=lambda l: l.get("timestamp", ""))
        # Find the first error
        first_error = None
        for log in trace_logs:
            if log.get("level") in ("ERROR", "WARNING"):
                first_error = log
                break
        # Find the slowest step
        slowest = max(
            (l for l in trace_logs if l.get("duration_ms")),
            key=lambda l: l.get("duration_ms", 0),
            default=None
        )
        # Find the step with the highest token consumption
        most_tokens = max(
            (l for l in trace_logs if l.get("tokens")),
            key=lambda l: l.get("tokens", 0),
            default=None
        )
        return {
            "trace_id": trace_id,
            "total_steps": len(trace_logs),
            "first_error": first_error,
            "slowest_step": slowest,
            "most_tokens_step": most_tokens,
            "root_cause_hypothesis": self._hypothesize(
                first_error, slowest, most_tokens
            ),
        }

    def _hypothesize(self, error, slowest, most_tokens) -> str:
        if error:
            return f"First error: {error.get('message', 'unknown')}"
        if slowest and slowest.get("duration_ms", 0) > 5000:
            return (f"Performance bottleneck: {slowest.get('type', 'unknown')} "
                    f"took {slowest.get('duration_ms')}ms")
        return "No obvious anomaly found"
```

## 25.8 The Agent Debugging Tool Ecosystem
### 25.8.1 Comparison of Mainstream Tools

| Tool | Type | Core features | Best fit | Open source |
|---|---|---|---|---|
| LangSmith | Cloud service | Tracing, evaluation, debugging | LangChain ecosystem | ❌ |
| Weave | Open source | Tracing, evaluation, experiment management | General LLM apps | ✅ |
| PromptFoo | Open source | Prompt testing, evaluation, red teaming | Prompt engineering | ✅ |
| Arize Phoenix | Open source | Tracing, evaluation, observability | Production monitoring | ✅ |
| Langfuse | Open source | Tracing, prompt management, evaluation | Team collaboration | ✅ |
| Helicone | Open source | Caching, logging, cost tracking | Cost optimization | ✅ |
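Whichever platform you adopt, the underlying mechanism is the same: wrap each call and emit a structured span. A minimal, vendor-neutral sketch of that idea; the `traced` decorator and in-memory `SPANS` list are illustrative stand-ins, not any particular SDK's API:

```python
import functools
import time

SPANS: list[dict] = []  # Stand-in for a real span exporter (Langfuse, Phoenix, ...)


def traced(name: str):
    """Decorator that records a structured span for every call."""
    def wrap(func):
        @functools.wraps(func)
        def inner(*args, **kwargs):
            start = time.time()
            ok = False
            try:
                result = func(*args, **kwargs)
                ok = True
                return result
            finally:
                SPANS.append({
                    "name": name,
                    "duration_ms": (time.time() - start) * 1000,
                    "success": ok,
                })
        return inner
    return wrap


@traced("search")
def search(query: str) -> str:
    return f"results for {query}"


search("python agent")  # appends one span to SPANS
```

A real integration would replace the `SPANS.append` with the platform's exporter, but the decorator shape stays the same.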
### 25.8.2 Langfuse Integration Example

```python
class LangfuseDebugger:
    """Langfuse integration for debugging."""

    def __init__(self, public_key: str, secret_key: str,
                 host: str = "https://cloud.langfuse.com"):
        from langfuse import Langfuse
        self.langfuse = Langfuse(
            public_key=public_key, secret_key=secret_key, host=host
        )

    def trace_agent(self, trace_id: str, agent_name: str,
                    input_data: str, output_data: str,
                    metadata: dict | None = None):
        """Record an agent execution trace."""
        self.langfuse.trace(
            id=trace_id,
            name=f"{agent_name}_execution",
            input=input_data,
            output=output_data,
            metadata=metadata or {},
        )

    def log_llm_call(self, trace_id: str, span_id: str,
                     model: str, prompt: str, completion: str,
                     usage: dict):
        """Record an LLM call."""
        self.langfuse.generation(
            trace_id=trace_id,
            id=span_id,
            model=model,
            input=prompt,
            output=completion,
            usage=usage,
        )
```

## 25.9 Troubleshooting in Production
### 25.9.1 Real-Time Health Dashboard

```python
class AgentHealthDashboard:
    """Dashboard for agent health status."""

    def __init__(self, metrics_collector):
        self.metrics = metrics_collector

    def get_health_status(self) -> dict:
        """Compute the current health status."""
        summary = self.metrics.get_summary()
        health = "healthy"
        issues = []
        # Check the error rate (success_rate is a percentage)
        error_rate = 1 - summary["requests"]["success_rate"] / 100
        if error_rate > 0.1:
            health = "unhealthy"
            issues.append(f"Error rate too high: {error_rate:.1%}")
        elif error_rate > 0.05:
            health = "degraded"
            issues.append(f"Error rate elevated: {error_rate:.1%}")
        # Check latency
        p95 = summary["latency_ms"]["e2e_p95"]
        if p95 > 30000:
            health = "unhealthy"
            issues.append(f"P95 latency too high: {p95:.0f}ms")
        elif p95 > 15000:
            health = "degraded"
            issues.append(f"P95 latency elevated: {p95:.0f}ms")
        return {
            "status": health,
            "issues": issues,
            "metrics": summary,
        }
```

### 25.9.2 Fault Injection Testing
```python
import time


class FaultInjector:
    """Fault injector: proactively tests an agent's fault tolerance."""

    def __init__(self, agent):
        self.agent = agent
        self._fault_config: dict = {}

    def inject_delay(self, tool_name: str, delay_ms: float):
        """Inject a delay."""
        self._fault_config[f"delay_{tool_name}"] = delay_ms

    def inject_error(self, tool_name: str, error_msg: str):
        """Inject an error."""
        self._fault_config[f"error_{tool_name}"] = error_msg

    def inject_timeout(self, tool_name: str):
        """Inject a timeout."""
        self._fault_config[f"timeout_{tool_name}"] = True

    async def run_with_faults(self, task: str) -> dict:
        """Run the task under the configured fault conditions."""
        start = time.time()
        try:
            result = await self.agent.run(task)
            return {
                "success": True,
                "result": result,
                "duration_ms": (time.time() - start) * 1000,
                "faults_applied": self._fault_config,
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "duration_ms": (time.time() - start) * 1000,
                "faults_applied": self._fault_config,
            }
```

## Best Practices
- Structured tracing: record complete execution traces (LLM I/O, tool calls, decisions) from day one
- Mock first: replace real LLM calls with a mock LLM during local debugging for fast iteration
- Version your prompts: save a version on every prompt change to enable rollback and A/B testing
- Set guardrails: maximum step counts, timeouts, and cost caps keep a runaway agent in check
- Inject faults: proactively inject failures to verify the agent's fault tolerance
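The timeout and cost-cap guardrails above can be sketched in a few lines; the `CostCap` class, limits, and helper names below are illustrative assumptions, not part of the chapter's framework:

```python
import asyncio


class CostCap:
    """Illustrative cost guardrail: accumulate spend and stop at a cap."""

    def __init__(self, max_usd: float = 1.0):
        self.max_usd = max_usd
        self.spent = 0.0

    def add(self, usd: float) -> None:
        """Call after each billed LLM call; raises once the cap is exceeded."""
        self.spent += usd
        if self.spent > self.max_usd:
            raise RuntimeError(f"cost cap exceeded: ${self.spent:.2f}")


async def run_with_timeout(coro, seconds: float = 60.0):
    """Wall-clock guardrail around an agent run."""
    return await asyncio.wait_for(coro, timeout=seconds)


# Usage
cap = CostCap(max_usd=0.05)
cap.add(0.02)
cap.add(0.02)  # still under the cap; one more call of this size would raise
```

In practice the cap check goes right next to the LLM client, so no code path can spend without being counted.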
## Common Pitfalls

- Debugging only in production: debugging live systems directly is dangerous; build a local debugging environment
- Ignoring temperature: forgetting to set temperature=0 while debugging makes results irreproducible
- Over-verbose logs: logging complete prompts makes log storage costs explode; redact and truncate
- Not saving execution records: when something breaks there is no replay data; always persist traces
- Ignoring intermittent failures: troubleshooting only sustained failures misses them; intermittent failures require long-running monitoring
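The "redact and truncate" advice can be a single helper applied before anything reaches the log sink. A minimal sketch; the regex patterns are illustrative examples only and should be extended for your own secret formats:

```python
import re


def sanitize_for_log(text: str, max_len: int = 500) -> str:
    """Redact obvious secrets and truncate before logging."""
    # Illustrative patterns: OpenAI-style keys and email addresses
    text = re.sub(r"sk-[A-Za-z0-9]{10,}", "[REDACTED_KEY]", text)
    text = re.sub(r"[\w.+-]+@[\w-]+\.[\w.]+", "[REDACTED_EMAIL]", text)
    if len(text) > max_len:
        text = text[:max_len] + f"... [truncated {len(text) - max_len} chars]"
    return text


print(sanitize_for_log("call with key sk-abcdefghijklmnop from a@b.com"))
```

Routing every prompt and tool result through such a helper keeps traces useful while bounding storage cost and exposure.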
## Summary

Agent debugging takes both technique and patience. Mock LLMs, execution replay, chain-of-thought visualization, tool call tracing, and prompt version management together let you pinpoint most problems in an agent system. The key principle: observability comes first. Without logs and traces, debugging is groping in the dark.
## Further Reading

- LangSmith documentation: https://docs.smith.langchain.com/
- Langfuse documentation: https://langfuse.com/docs
- PromptFoo (prompt testing framework): https://promptfoo.dev/
- Weave (LLM tracing tool): https://wandb.ai/weave
- Paper: "Debugging LLM-as-a-Judge" (debugging and validating evaluation methods)