Agent 可观测性体系构建:从链路追踪到自动评估
系统讲解 Agent 可观测性的三大支柱——链路追踪、指标监控和自动评估,帮你构建生产级 Agent 监控体系。
Agent 可观测性体系构建:从链路追踪到自动评估
Agent 上线后的第一个问题不是"它能不能用",而是"它刚才做了什么,为什么那样做,做得怎么样"。没有可观测性的 Agent 系统,本质上是一个无法调试、无法优化、无法信任的黑箱。本文将拆解 Agent 可观测性的三大支柱,并给出从零构建的实战代码。
Agent 可观测性 ≠ 传统应用监控
传统应用的监控关注"请求是否成功、延迟多少、错误率多高"。Agent 系统需要回答更复杂的问题:
- 链路追踪:Agent 经过了哪些决策节点?每次工具调用的输入输出是什么?LLM 的推理链路是怎样的?
- 指标监控:Agent 的任务成功率是多少?每次任务花费多少 token?工具调用失败率多高?
- 自动评估:Agent 的输出质量如何?相比上周是变好了还是变差了?哪些类型的任务表现最差?
这三个维度分别回答"发生了什么"、"趋势如何"和"质量怎样"。
支柱一:结构化链路追踪
链路追踪是 Agent 可观测性的基础。不是简单地记录日志,而是捕获完整的决策链路。
设计模式:Span 树
每个 Agent 任务是一棵 span 树:
TaskSpan (根节点)
├── LLMCallSpan (模型调用)
│ ├── input_tokens: 1250
│ ├── output_tokens: 380
│ ├── model: gpt-4o
│ └── latency_ms: 2300
├── ToolCallSpan (工具调用)
│ ├── tool: search_docs
│ ├── input: {"query": "RAG 最佳实践"}
│ ├── output: {"results": [...]}
│ └── latency_ms: 450
└── DecisionSpan (决策节点)
├── reasoning: "需要更多信息来完成回答"
└── action: "调用 search_docs 工具"
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
import uuid
import json
@dataclass
class Span:
trace_id: str
span_id: str
parent_id: str | None
name: str
kind: str # "llm", "tool", "decision", "task"
start_time: datetime
end_time: datetime | None = None
attributes: dict[str, Any] = field(default_factory=dict)
status: str = "ok" # "ok", "error", "timeout"
@property
def duration_ms(self) -> float:
if not self.end_time:
return 0.0
return (self.end_time - self.start_time).total_seconds() * 1000
class AgentTracer:
def __init__(self):
self.traces: dict[str, list[Span]] = {}
self.active_spans: dict[str, Span] = {}
def start_trace(self, task_name: str) -> str:
trace_id = str(uuid.uuid4())
span_id = str(uuid.uuid4())
span = Span(
trace_id=trace_id,
span_id=span_id,
parent_id=None,
name=task_name,
kind="task",
start_time=datetime.now(),
)
self.traces[trace_id] = [span]
self.active_spans[span_id] = span
return trace_id
def start_span(self, trace_id: str, parent_id: str,
name: str, kind: str, **attrs) -> str:
span_id = str(uuid.uuid4())
span = Span(
trace_id=trace_id,
span_id=span_id,
parent_id=parent_id,
name=name,
kind=kind,
start_time=datetime.now(),
attributes=attrs,
)
self.traces[trace_id].append(span)
self.active_spans[span_id] = span
return span_id
def end_span(self, span_id: str, status: str = "ok", **attrs):
span = self.active_spans.pop(span_id, None)
if span:
span.end_time = datetime.now()
span.status = status
span.attributes.update(attrs)
def get_trace(self, trace_id: str) -> list[Span]:
return self.traces.get(trace_id, [])
def export_trace(self, trace_id: str) -> str:
spans = self.get_trace(trace_id)
return json.dumps([
{
"span_id": s.span_id,
"parent_id": s.parent_id,
"name": s.name,
"kind": s.kind,
"duration_ms": s.duration_ms,
"status": s.status,
"attributes": s.attributes,
}
for s in spans
], default=str, indent=2)
使用示例
tracer = AgentTracer()
def run_agent_task(user_query: str):
trace_id = tracer.start_trace("answer_user_query")
root_span = tracer.traces[trace_id][0]
try:
# LLM 调用
llm_span = tracer.start_span(
trace_id, root_span.span_id, "llm_call", "llm",
model="gpt-4o", input_preview=user_query[:100],
)
response = call_llm(user_query)
tracer.end_span(llm_span, token_input=response.usage.input_tokens,
token_output=response.usage.output_tokens)
# 工具调用(如果需要)
if response.tool_calls:
for tc in response.tool_calls:
tool_span = tracer.start_span(
trace_id, root_span.span_id,
f"tool_{tc.name}", "tool", input=tc.arguments,
)
result = execute_tool(tc.name, tc.arguments)
tracer.end_span(tool_span, output_preview=str(result)[:200])
tracer.end_span(root_span.span_id, status="ok")
except Exception as e:
tracer.end_span(root_span.span_id, status="error", error=str(e))
return tracer.export_trace(trace_id)
支柱二:指标监控
链路追踪回答"这次发生了什么",指标监控回答"趋势如何"。Agent 系统需要关注的核心指标:
from collections import defaultdict
from datetime import datetime, timedelta
@dataclass
class MetricPoint:
timestamp: datetime
value: float
tags: dict[str, str]
class AgentMetrics:
def __init__(self):
self.metrics: dict[str, list[MetricPoint]] = defaultdict(list)
def record(self, name: str, value: float, **tags):
point = MetricPoint(
timestamp=datetime.now(),
value=value,
tags=tags,
)
self.metrics[name].append(point)
def aggregate(self, name: str, window_minutes: int = 60) -> dict:
cutoff = datetime.now() - timedelta(minutes=window_minutes)
points = [p for p in self.metrics.get(name, []) if p.timestamp > cutoff]
if not points:
return {"count": 0}
values = [p.value for p in points]
return {
"count": len(values),
"mean": sum(values) / len(values),
"p50": sorted(values)[len(values) // 2],
"p95": sorted(values)[int(len(values) * 0.95)] if len(values) >= 20 else max(values),
"min": min(values),
"max": max(values),
}
# 使用示例
metrics = AgentMetrics()
# 每次任务完成后记录
metrics.record("task.duration_ms", duration_ms, task_type="qa", agent="research")
metrics.record("task.token_cost", token_count, model="gpt-4o")
metrics.record("task.success", 1.0 if success else 0.0, task_type="qa")
metrics.record("tool.call_duration_ms", tool_duration, tool_name="search_docs")
metrics.record("tool.error", 1.0 if error else 0.0, tool_name="search_docs")
关键指标看板
| 指标 | 计算方式 | 告警阈值建议 |
|---|---|---|
| 任务成功率 | success=1 / total |
< 80% |
| P95 任务延迟 | 第 95 百分位延迟 | > 30s |
| 平均 token 成本 | 每任务 token 消耗均值 | 周环比增长 > 20% |
| 工具调用失败率 | tool.error=1 / tool.total |
> 5% |
| 每任务工具调用次数 | 工具调用总数 / 任务总数 | > 15 次(可能死循环) |
支柱三:自动评估
指标告诉"成功率多少",但不知道"质量如何"。自动评估是回答这个问题的关键。
评估模式:基于规则 + 基于 LLM
from abc import ABC, abstractmethod
class Evaluator(ABC):
@abstractmethod
def evaluate(self, input_text: str, output: str,
context: str | None = None) -> dict:
pass
class RelevanceEvaluator(Evaluator):
"""检查输出是否与输入相关(基于关键词重叠的快速评估)"""
def evaluate(self, input_text: str, output: str,
context: str | None = None) -> dict:
input_words = set(input_text.lower().split())
output_words = set(output.lower().split())
overlap = len(input_words & output_words) / max(len(input_words), 1)
return {
"name": "relevance",
"score": min(overlap * 3, 1.0), # 归一化到 0-1
"passed": overlap > 0.15,
"detail": f"关键词重叠率: {overlap:.2%}",
}
class LengthEvaluator(Evaluator):
"""检查输出长度是否合理"""
def __init__(self, min_words: int = 10, max_words: int = 500):
self.min_words = min_words
self.max_words = max_words
def evaluate(self, input_text: str, output: str,
context: str | None = None) -> dict:
word_count = len(output.split())
passed = self.min_words <= word_count <= self.max_words
return {
"name": "output_length",
"score": 1.0 if passed else 0.0,
"passed": passed,
"detail": f"输出长度: {word_count} 词 (范围: {self.min_words}-{self.max_words})",
}
class ToolUsageEvaluator(Evaluator):
"""检查工具调用是否合理——是否调用了不需要的工具"""
def __init__(self, allowed_tools: set[str], forbidden_tools: set[str]):
self.allowed_tools = allowed_tools
self.forbidden_tools = forbidden_tools
def evaluate(self, input_text: str, output: str,
context: str | None = None) -> dict:
# context 中应包含工具调用记录
tools_used = set()
if context:
for line in context.split("\n"):
if line.startswith("tool:"):
tools_used.add(line.split(":")[1].strip())
forbidden_used = tools_used & self.forbidden_tools
return {
"name": "tool_usage",
"score": 0.0 if forbidden_used else 1.0,
"passed": len(forbidden_used) == 0,
"detail": f"使用的工具: {tools_used}, 违规工具: {forbidden_used}",
}
class EvaluationPipeline:
def __init__(self, evaluators: list[Evaluator]):
self.evaluators = evaluators
def run(self, input_text: str, output: str,
context: str | None = None) -> dict:
results = []
for ev in self.evaluators:
result = ev.evaluate(input_text, output, context)
results.append(result)
overall_passed = all(r["passed"] for r in results)
avg_score = sum(r["score"] for r in results) / len(results) if results else 0
return {
"passed": overall_passed,
"score": avg_score,
"details": results,
}
使用示例
pipeline = EvaluationPipeline([
RelevanceEvaluator(),
LengthEvaluator(min_words=20, max_words=300),
ToolUsageEvaluator(
allowed_tools={"search_docs", "read_file"},
forbidden_tools={"delete_file", "execute_sql"},
),
])
# 每次任务完成后运行评估
eval_result = pipeline.run(
input_text=user_query,
output=agent_response,
context=tool_call_log,
)
if not eval_result["passed"]:
print(f"评估未通过: {[d['detail'] for d in eval_result['details'] if not d['passed']]}")
告警策略:检测静默退化
Agent 系统最危险的问题不是崩溃(崩溃容易发现),而是静默退化:Agent 仍在运行,但输出质量在逐渐下降。
class DegradationDetector:
def __init__(self, window_size: int = 50, threshold: float = 0.15):
self.window_size = window_size
self.threshold = threshold # 15% 的下降触发告警
self.recent_scores: list[float] = []
def record(self, score: float):
self.recent_scores.append(score)
if len(self.recent_scores) > self.window_size * 2:
self.recent_scores = self.recent_scores[-self.window_size * 2:]
def check(self) -> dict | None:
if len(self.recent_scores) < self.window_size:
return None
mid = len(self.recent_scores) // 2
recent = self.recent_scores[mid:]
previous = self.recent_scores[:mid]
recent_avg = sum(recent) / len(recent)
previous_avg = sum(previous) / len(previous)
if previous_avg == 0:
return None
drop = (previous_avg - recent_avg) / previous_avg
if drop > self.threshold:
return {
"alert": "quality_degradation",
"previous_avg": round(previous_avg, 3),
"recent_avg": round(recent_avg, 3),
"drop_pct": f"{drop:.1%}",
"message": f"Agent 质量在过去 {self.window_size} 次任务中下降了 {drop:.1%}",
}
return None
常见误区
误区一:"有日志就够了,不需要结构化追踪" 文本日志在 debug 单个请求时有用,但无法回答"过去一周 P95 延迟是多少"或"工具调用失败率趋势如何"。结构化追踪让每条记录都是可查询、可聚合的数据点。
误区二:"只监控成功率和延迟" 成功率和延迟是必要但不充分的指标。一个 Agent 可能 100% 成功但给出空洞的回答。自动评估补充了质量维度,让你知道 Agent 不仅"完成了",还"做好了"。
误区三:"评估等于写单元测试" 单元测试验证确定性行为,Agent 的输出是概率性的。评估需要处理"部分正确"的情况,用评分而非 pass/fail 来衡量质量。
总结
- 链路追踪捕获"发生了什么":用 span 树记录完整的决策链路,包括 LLM 调用、工具调用和推理过程
- 指标监控回答"趋势如何":关注成功率、延迟、成本和工具调用模式
- 自动评估判断"质量怎样":基于规则和语义的评估管线补充了指标无法覆盖的质量维度
- 静默退化检测是最关键的告警:Agent 不崩溃不代表没出问题
- 三者协同才构成完整的可观测性体系:缺少任何一块都是盲区
本文由 AgentList 团队整理,更多 Agent 可观测性相关项目请浏览本站项目列表。
本文涉及的项目
Arize Phoenix
9.6k ⭐Phoenix 是面向 LLM 与 Agent 应用的开源观测与评估工具,支持在线追踪与离线诊断。
AgentOps
5.5k ⭐AgentOps 是一个 AI Agent 可观测性平台,提供 Agent 监控、调试和评估功能,帮助开发者优化 Agent 性能。
Logfire
4.2k ⭐面向生产环境LLM和Agent系统的AI可观测性平台,由Pydantic团队打造,提供实时监控、追踪和调试能力。
OpenInference
965 ⭐OpenInference 是一个基于 OpenTelemetry 的 AI 可观测性检测规范和工具包,为 LLM 应用的推理过程提供标准化追踪、指标采集和 Span 定义,帮助开发者监控和调试 AI Agent 系统。
Langfuse
27.0k ⭐Langfuse 是开源 LLM 可观测性平台,支持 trace、评估、提示词版本管理与成本分析。