Building an Agent Observability Stack: From Tracing to Automated Evaluation

A systematic walkthrough of the three pillars of Agent observability (tracing, metrics, and automated evaluation) to help you build a production-grade Agent monitoring stack.

AgentList Team · April 21, 2026
Tags: AI Agent, Observability, Tracing, Monitoring, Evaluation

The first question after an Agent ships is not "does it work?" but "what did it just do, why did it do that, and how well did it do it?" An Agent system without observability is a black box: impossible to debug, impossible to optimize, impossible to trust. This article breaks down the three pillars of Agent observability and walks through working code for building each one from scratch.

Agent observability ≠ traditional application monitoring

Traditional application monitoring asks whether requests succeed, how long they take, and how often they fail. Agent systems must answer harder questions:

  • Tracing: which decision points did the Agent pass through? What were the inputs and outputs of each tool call? What did the LLM's reasoning chain look like?
  • Metrics: what is the Agent's task success rate? How many tokens does each task consume? How often do tool calls fail?
  • Automated evaluation: how good is the Agent's output? Is it better or worse than last week? Which task types perform worst?

These three dimensions answer "what happened", "how is it trending", and "how good is it".

Pillar one: structured tracing

Tracing is the foundation of Agent observability. It is not plain logging; it captures the complete decision chain.

Design pattern: the span tree

Each Agent task is a tree of spans:

TaskSpan (root)
├── LLMCallSpan (model call)
│   ├── input_tokens: 1250
│   ├── output_tokens: 380
│   ├── model: gpt-4o
│   └── latency_ms: 2300
├── ToolCallSpan (tool call)
│   ├── tool: search_docs
│   ├── input: {"query": "RAG best practices"}
│   ├── output: {"results": [...]}
│   └── latency_ms: 450
└── DecisionSpan (decision node)
    ├── reasoning: "need more information to answer"
    └── action: "call the search_docs tool"
A minimal tracer implementing this span tree:

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
import uuid
import json

@dataclass
class Span:
    trace_id: str
    span_id: str
    parent_id: str | None
    name: str
    kind: str  # "llm", "tool", "decision", "task"
    start_time: datetime
    end_time: datetime | None = None
    attributes: dict[str, Any] = field(default_factory=dict)
    status: str = "ok"  # "ok", "error", "timeout"

    @property
    def duration_ms(self) -> float:
        if not self.end_time:
            return 0.0
        return (self.end_time - self.start_time).total_seconds() * 1000

class AgentTracer:
    def __init__(self):
        self.traces: dict[str, list[Span]] = {}
        self.active_spans: dict[str, Span] = {}

    def start_trace(self, task_name: str) -> str:
        trace_id = str(uuid.uuid4())
        span_id = str(uuid.uuid4())
        span = Span(
            trace_id=trace_id,
            span_id=span_id,
            parent_id=None,
            name=task_name,
            kind="task",
            start_time=datetime.now(),
        )
        self.traces[trace_id] = [span]
        self.active_spans[span_id] = span
        return trace_id

    def start_span(self, trace_id: str, parent_id: str,
                   name: str, kind: str, **attrs) -> str:
        span_id = str(uuid.uuid4())
        span = Span(
            trace_id=trace_id,
            span_id=span_id,
            parent_id=parent_id,
            name=name,
            kind=kind,
            start_time=datetime.now(),
            attributes=attrs,
        )
        self.traces[trace_id].append(span)
        self.active_spans[span_id] = span
        return span_id

    def end_span(self, span_id: str, status: str = "ok", **attrs):
        span = self.active_spans.pop(span_id, None)
        if span:
            span.end_time = datetime.now()
            span.status = status
            span.attributes.update(attrs)

    def get_trace(self, trace_id: str) -> list[Span]:
        return self.traces.get(trace_id, [])

    def export_trace(self, trace_id: str) -> str:
        spans = self.get_trace(trace_id)
        return json.dumps([
            {
                "span_id": s.span_id,
                "parent_id": s.parent_id,
                "name": s.name,
                "kind": s.kind,
                "duration_ms": s.duration_ms,
                "status": s.status,
                "attributes": s.attributes,
            }
            for s in spans
        ], default=str, indent=2)

Usage example

tracer = AgentTracer()

def run_agent_task(user_query: str):
    trace_id = tracer.start_trace("answer_user_query")
    root_span = tracer.traces[trace_id][0]

    try:
        # LLM call (call_llm is a placeholder for your model client)
        llm_span = tracer.start_span(
            trace_id, root_span.span_id, "llm_call", "llm",
            model="gpt-4o", input_preview=user_query[:100],
        )
        response = call_llm(user_query)
        tracer.end_span(llm_span, token_input=response.usage.input_tokens,
                        token_output=response.usage.output_tokens)

        # Tool calls, if the model requested any (execute_tool is a placeholder)
        if response.tool_calls:
            for tc in response.tool_calls:
                tool_span = tracer.start_span(
                    trace_id, root_span.span_id,
                    f"tool_{tc.name}", "tool", input=tc.arguments,
                )
                result = execute_tool(tc.name, tc.arguments)
                tracer.end_span(tool_span, output_preview=str(result)[:200])

        tracer.end_span(root_span.span_id, status="ok")
    except Exception as e:
        tracer.end_span(root_span.span_id, status="error", error=str(e))

    return tracer.export_trace(trace_id)

Pillar two: metrics

Tracing answers "what happened this time"; metrics answer "how is it trending". The core metrics an Agent system should track:

from collections import defaultdict
from datetime import datetime, timedelta

@dataclass
class MetricPoint:
    timestamp: datetime
    value: float
    tags: dict[str, str]

class AgentMetrics:
    def __init__(self):
        self.metrics: dict[str, list[MetricPoint]] = defaultdict(list)

    def record(self, name: str, value: float, **tags):
        point = MetricPoint(
            timestamp=datetime.now(),
            value=value,
            tags=tags,
        )
        self.metrics[name].append(point)

    def aggregate(self, name: str, window_minutes: int = 60) -> dict:
        cutoff = datetime.now() - timedelta(minutes=window_minutes)
        points = [p for p in self.metrics.get(name, []) if p.timestamp > cutoff]
        if not points:
            return {"count": 0}

        values = sorted(p.value for p in points)  # sort once, reuse for all stats
        n = len(values)
        return {
            "count": n,
            "mean": sum(values) / n,
            "p50": values[n // 2],
            "p95": values[int(n * 0.95)] if n >= 20 else values[-1],
            "min": values[0],
            "max": values[-1],
        }

# Usage example
metrics = AgentMetrics()

# Record after each task completes
metrics.record("task.duration_ms", duration_ms, task_type="qa", agent="research")
metrics.record("task.token_cost", token_count, model="gpt-4o")
metrics.record("task.success", 1.0 if success else 0.0, task_type="qa")
metrics.record("tool.call_duration_ms", tool_duration, tool_name="search_docs")
metrics.record("tool.error", 1.0 if error else 0.0, tool_name="search_docs")
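The window math in aggregate() boils down to index-based percentiles over a sorted list. A standalone check of that logic, with made-up latency values:

```python
def percentile(values: list[float], q: float) -> float:
    # Same index-based percentile aggregate() uses: sort, then take the
    # element at floor(n * q). Crude but dependency-free.
    ordered = sorted(values)
    return ordered[min(int(len(ordered) * q), len(ordered) - 1)]

# Hypothetical task durations in milliseconds
latencies = [120.0, 95.0, 400.0, 210.0, 180.0]
print(percentile(latencies, 0.50))  # → 180.0
print(percentile(latencies, 0.95))  # → 400.0
```

Note this estimator is biased high for small samples, which is why aggregate() falls back to the max below 20 points.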

Key metrics dashboard

Metric | How it is computed | Suggested alert threshold
Task success rate | tasks with success=1 / total tasks | < 80%
P95 task latency | 95th-percentile task latency | > 30s
Mean token cost | average tokens consumed per task | week-over-week growth > 20%
Tool call failure rate | calls with tool.error=1 / total tool calls | > 5%
Tool calls per task | total tool calls / total tasks | > 15 (possible runaway loop)
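The thresholds above can be wired directly to the dicts AgentMetrics.aggregate() returns. A sketch under that assumption — the stats dicts here are hand-made stand-ins for real aggregates, and the function names are illustrative:

```python
def check_thresholds(success_stats: dict, latency_stats: dict,
                     tool_error_stats: dict) -> list[str]:
    """Compare aggregate() outputs against the dashboard thresholds."""
    alerts = []
    # success/error metrics record 0.0 or 1.0, so mean == rate
    if success_stats.get("count") and success_stats["mean"] < 0.80:
        alerts.append(f"task success rate {success_stats['mean']:.0%} below 80%")
    if latency_stats.get("count") and latency_stats["p95"] > 30_000:
        alerts.append(f"P95 latency {latency_stats['p95']:.0f}ms above 30s")
    if tool_error_stats.get("count") and tool_error_stats["mean"] > 0.05:
        alerts.append(f"tool failure rate {tool_error_stats['mean']:.0%} above 5%")
    return alerts

alerts = check_thresholds(
    {"count": 40, "mean": 0.75},     # e.g. metrics.aggregate("task.success")
    {"count": 40, "p95": 12_000.0},  # e.g. metrics.aggregate("task.duration_ms")
    {"count": 90, "mean": 0.02},     # e.g. metrics.aggregate("tool.error")
)
print(alerts)  # only the success-rate threshold trips here
```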

Pillar three: automated evaluation

Metrics tell you the success rate, but not the quality of what succeeded. Automated evaluation is how you answer that question.

Evaluation patterns: rule-based + LLM-based

from abc import ABC, abstractmethod

class Evaluator(ABC):
    @abstractmethod
    def evaluate(self, input_text: str, output: str,
                 context: str | None = None) -> dict:
        pass

class RelevanceEvaluator(Evaluator):
    """检查输出是否与输入相关(基于关键词重叠的快速评估)"""
    def evaluate(self, input_text: str, output: str,
                 context: str | None = None) -> dict:
        input_words = set(input_text.lower().split())
        output_words = set(output.lower().split())
        overlap = len(input_words & output_words) / max(len(input_words), 1)

        return {
            "name": "relevance",
            "score": min(overlap * 3, 1.0),  # 归一化到 0-1
            "passed": overlap > 0.15,
            "detail": f"关键词重叠率: {overlap:.2%}",
        }

class LengthEvaluator(Evaluator):
    """检查输出长度是否合理"""
    def __init__(self, min_words: int = 10, max_words: int = 500):
        self.min_words = min_words
        self.max_words = max_words

    def evaluate(self, input_text: str, output: str,
                 context: str | None = None) -> dict:
        word_count = len(output.split())
        passed = self.min_words <= word_count <= self.max_words
        return {
            "name": "output_length",
            "score": 1.0 if passed else 0.0,
            "passed": passed,
            "detail": f"输出长度: {word_count} 词 (范围: {self.min_words}-{self.max_words})",
        }

class ToolUsageEvaluator(Evaluator):
    """检查工具调用是否合理——是否调用了不需要的工具"""
    def __init__(self, allowed_tools: set[str], forbidden_tools: set[str]):
        self.allowed_tools = allowed_tools
        self.forbidden_tools = forbidden_tools

    def evaluate(self, input_text: str, output: str,
                 context: str | None = None) -> dict:
        # the context is expected to carry the tool-call log, one "tool: <name>" per line
        tools_used = set()
        if context:
            for line in context.split("\n"):
                if line.startswith("tool:"):
                    tools_used.add(line.split(":")[1].strip())

        forbidden_used = tools_used & self.forbidden_tools
        unapproved = tools_used - self.allowed_tools
        passed = not forbidden_used and not unapproved
        return {
            "name": "tool_usage",
            "score": 1.0 if passed else 0.0,
            "passed": passed,
            "detail": f"tools used: {tools_used}, forbidden: {forbidden_used}, outside allow-list: {unapproved}",
        }

class EvaluationPipeline:
    def __init__(self, evaluators: list[Evaluator]):
        self.evaluators = evaluators

    def run(self, input_text: str, output: str,
            context: str | None = None) -> dict:
        results = []
        for ev in self.evaluators:
            result = ev.evaluate(input_text, output, context)
            results.append(result)

        overall_passed = all(r["passed"] for r in results)
        avg_score = sum(r["score"] for r in results) / len(results) if results else 0

        return {
            "passed": overall_passed,
            "score": avg_score,
            "details": results,
        }

Usage example

pipeline = EvaluationPipeline([
    RelevanceEvaluator(),
    LengthEvaluator(min_words=20, max_words=300),
    ToolUsageEvaluator(
        allowed_tools={"search_docs", "read_file"},
        forbidden_tools={"delete_file", "execute_sql"},
    ),
])

# Run the evaluation after each task completes
eval_result = pipeline.run(
    input_text=user_query,
    output=agent_response,
    context=tool_call_log,
)

if not eval_result["passed"]:
    print(f"评估未通过: {[d['detail'] for d in eval_result['details'] if not d['passed']]}")

Alerting: detecting silent degradation

The most dangerous failure mode of an Agent system is not a crash (crashes are easy to spot) but silent degradation: the Agent keeps running while output quality slowly declines.

class DegradationDetector:
    def __init__(self, window_size: int = 50, threshold: float = 0.15):
        self.window_size = window_size
        self.threshold = threshold  # a 15% drop triggers an alert
        self.recent_scores: list[float] = []

    def record(self, score: float):
        self.recent_scores.append(score)
        if len(self.recent_scores) > self.window_size * 2:
            self.recent_scores = self.recent_scores[-self.window_size * 2:]

    def check(self) -> dict | None:
        if len(self.recent_scores) < self.window_size:
            return None

        mid = len(self.recent_scores) // 2
        recent = self.recent_scores[mid:]
        previous = self.recent_scores[:mid]

        recent_avg = sum(recent) / len(recent)
        previous_avg = sum(previous) / len(previous)

        if previous_avg == 0:
            return None

        drop = (previous_avg - recent_avg) / previous_avg
        if drop > self.threshold:
            return {
                "alert": "quality_degradation",
                "previous_avg": round(previous_avg, 3),
                "recent_avg": round(recent_avg, 3),
                "drop_pct": f"{drop:.1%}",
                "message": f"Agent quality dropped {drop:.1%} over the last {len(recent)} tasks",
            }
        return None
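A quick sanity check of the detector's window math, using synthetic scores: 25 healthy tasks (~0.9) followed by 25 degraded ones (~0.7). The same halves comparison check() performs should report a drop above the 15% threshold:

```python
# Synthetic quality scores: first half healthy, second half degraded
scores = [0.9] * 25 + [0.7] * 25

# Mirror DegradationDetector.check(): split the window in half and
# compare the average of the recent half against the earlier half
mid = len(scores) // 2
previous_avg = sum(scores[:mid]) / mid
recent_avg = sum(scores[mid:]) / (len(scores) - mid)
drop = (previous_avg - recent_avg) / previous_avg

print(f"{drop:.1%}")  # (0.9 - 0.7) / 0.9 ≈ 22.2%, above the 15% threshold
```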

Common pitfalls

Pitfall one: "logs are enough; we don't need structured tracing." Text logs help when debugging a single request, but they cannot answer "what was P95 latency last week?" or "how is the tool failure rate trending?" Structured tracing makes every record a queryable, aggregatable data point.

Pitfall two: "just monitor success rate and latency." Success rate and latency are necessary but not sufficient. An Agent can succeed 100% of the time while giving hollow answers. Automated evaluation adds the quality dimension, so you know the Agent not only finished the job but did it well.

Pitfall three: "evaluation is just unit testing." Unit tests verify deterministic behavior; Agent output is probabilistic. Evaluation must handle "partially correct" results, measuring quality with scores rather than pass/fail.

Summary

  • Tracing captures "what happened": use a span tree to record the full decision chain, including LLM calls, tool calls, and reasoning
  • Metrics answer "how is it trending": track success rate, latency, cost, and tool-call patterns
  • Automated evaluation judges "how good is it": rule-based and semantic evaluation pipelines cover the quality dimension that metrics cannot
  • Silent-degradation detection is the most important alert: an Agent that is not crashing can still be going wrong
  • Only together do the three form a complete observability stack: missing any one leaves a blind spot

Compiled by the AgentList team. For more Agent observability projects, browse the project list on this site.