Building Agent Observability: From Distributed Tracing to Automated Evaluation

A systematic guide to the three pillars of agent observability — distributed tracing, metrics monitoring, and automated evaluation — for building production-grade agent monitoring.

AgentList Team · April 21, 2026
AI Agent可观测性链路追踪监控评估

Building Agent Observability: From Distributed Tracing to Automated Evaluation

The first question after deploying an agent is not "does it work?" but "what did it just do, why did it do that, and how well did it do it?" An agent system without observability is an un-debuggable, un-optimizable, un-trustable black box. This article breaks down the three pillars of agent observability with implementation code you can build from scratch.

Agent Observability Is Not Traditional Application Monitoring

Traditional application monitoring focuses on "did the request succeed, what is the latency, what is the error rate." Agent systems need to answer harder questions:

  • Distributed tracing: Which decision nodes did the agent pass through? What were the inputs and outputs of each tool call? What does the LLM reasoning chain look like?
  • Metrics monitoring: What is the agent's task success rate? How many tokens does each task consume? What is the tool call failure rate?
  • Automated evaluation: How good is the agent's output quality? Is it better or worse than last week? Which task types perform worst?

These three dimensions answer "what happened," "what is the trend," and "how is the quality" respectively.

Pillar 1: Structured Distributed Tracing

Tracing is the foundation of agent observability. Not simple logging, but capturing the complete decision chain.

Design Pattern: The Span Tree

Each agent task is a span tree:

TaskSpan (root)
├── LLMCallSpan (model call)
│   ├── input_tokens: 1250
│   ├── output_tokens: 380
│   ├── model: gpt-4o
│   └── latency_ms: 2300
├── ToolCallSpan (tool call)
│   ├── tool: search_docs
│   ├── input: {"query": "RAG best practices"}
│   ├── output: {"results": [...]}
│   └── latency_ms: 450
└── DecisionSpan (decision node)
    ├── reasoning: "Need more information to complete the answer"
    └── action: "Call search_docs tool"
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
import uuid
import json

@dataclass
class Span:
    trace_id: str
    span_id: str
    parent_id: str | None
    name: str
    kind: str  # "llm", "tool", "decision", "task"
    start_time: datetime
    end_time: datetime | None = None
    attributes: dict[str, Any] = field(default_factory=dict)
    status: str = "ok"  # "ok", "error", "timeout"

    @property
    def duration_ms(self) -> float:
        if not self.end_time:
            return 0.0
        return (self.end_time - self.start_time).total_seconds() * 1000

class AgentTracer:
    def __init__(self):
        self.traces: dict[str, list[Span]] = {}
        self.active_spans: dict[str, Span] = {}

    def start_trace(self, task_name: str) -> str:
        trace_id = str(uuid.uuid4())
        span_id = str(uuid.uuid4())
        span = Span(
            trace_id=trace_id,
            span_id=span_id,
            parent_id=None,
            name=task_name,
            kind="task",
            start_time=datetime.now(),
        )
        self.traces[trace_id] = [span]
        self.active_spans[span_id] = span
        return trace_id

    def start_span(self, trace_id: str, parent_id: str,
                   name: str, kind: str, **attrs) -> str:
        span_id = str(uuid.uuid4())
        span = Span(
            trace_id=trace_id,
            span_id=span_id,
            parent_id=parent_id,
            name=name,
            kind=kind,
            start_time=datetime.now(),
            attributes=attrs,
        )
        self.traces[trace_id].append(span)
        self.active_spans[span_id] = span
        return span_id

    def end_span(self, span_id: str, status: str = "ok", **attrs):
        span = self.active_spans.pop(span_id, None)
        if span:
            span.end_time = datetime.now()
            span.status = status
            span.attributes.update(attrs)

    def get_trace(self, trace_id: str) -> list[Span]:
        return self.traces.get(trace_id, [])

    def export_trace(self, trace_id: str) -> str:
        spans = self.get_trace(trace_id)
        return json.dumps([
            {
                "span_id": s.span_id,
                "parent_id": s.parent_id,
                "name": s.name,
                "kind": s.kind,
                "duration_ms": s.duration_ms,
                "status": s.status,
                "attributes": s.attributes,
            }
            for s in spans
        ], default=str, indent=2)

Pillar 2: Metrics Monitoring

Tracing answers "what happened this time." Metrics answer "what is the trend."

from collections import defaultdict
from datetime import datetime, timedelta

@dataclass
class MetricPoint:
    timestamp: datetime
    value: float
    tags: dict[str, str]

class AgentMetrics:
    def __init__(self):
        self.metrics: dict[str, list[MetricPoint]] = defaultdict(list)

    def record(self, name: str, value: float, **tags):
        point = MetricPoint(timestamp=datetime.now(), value=value, tags=tags)
        self.metrics[name].append(point)

    def aggregate(self, name: str, window_minutes: int = 60) -> dict:
        cutoff = datetime.now() - timedelta(minutes=window_minutes)
        points = [p for p in self.metrics.get(name, []) if p.timestamp > cutoff]
        if not points:
            return {"count": 0}
        values = [p.value for p in points]
        return {
            "count": len(values),
            "mean": sum(values) / len(values),
            "p50": sorted(values)[len(values) // 2],
            "p95": sorted(values)[int(len(values) * 0.95)] if len(values) >= 20 else max(values),
            "min": min(values),
            "max": max(values),
        }

Key Metrics Dashboard

Metric Calculation Alert Threshold
Task success rate success=1 / total < 80%
P95 task latency 95th percentile latency > 30s
Avg token cost Mean tokens per task Week-over-week increase > 20%
Tool call failure rate tool.error=1 / tool.total > 5%
Tool calls per task Total tool calls / total tasks > 15 (possible infinite loop)

Pillar 3: Automated Evaluation

Metrics tell you the success rate, but not the quality. Automated evaluation closes this gap.

from abc import ABC, abstractmethod

class Evaluator(ABC):
    @abstractmethod
    def evaluate(self, input_text: str, output: str,
                 context: str | None = None) -> dict:
        pass

class RelevanceEvaluator(Evaluator):
    def evaluate(self, input_text: str, output: str,
                 context: str | None = None) -> dict:
        input_words = set(input_text.lower().split())
        output_words = set(output.lower().split())
        overlap = len(input_words & output_words) / max(len(input_words), 1)
        return {
            "name": "relevance",
            "score": min(overlap * 3, 1.0),
            "passed": overlap > 0.15,
            "detail": f"Keyword overlap: {overlap:.2%}",
        }

class LengthEvaluator(Evaluator):
    def __init__(self, min_words: int = 10, max_words: int = 500):
        self.min_words = min_words
        self.max_words = max_words

    def evaluate(self, input_text: str, output: str,
                 context: str | None = None) -> dict:
        word_count = len(output.split())
        passed = self.min_words <= word_count <= self.max_words
        return {
            "name": "output_length",
            "score": 1.0 if passed else 0.0,
            "passed": passed,
            "detail": f"Output length: {word_count} words (range: {self.min_words}-{self.max_words})",
        }

class EvaluationPipeline:
    def __init__(self, evaluators: list[Evaluator]):
        self.evaluators = evaluators

    def run(self, input_text: str, output: str,
            context: str | None = None) -> dict:
        results = [ev.evaluate(input_text, output, context) for ev in self.evaluators]
        overall_passed = all(r["passed"] for r in results)
        avg_score = sum(r["score"] for r in results) / len(results) if results else 0
        return {"passed": overall_passed, "score": avg_score, "details": results}

Detecting Silent Degradation

The most dangerous problem in agent systems is not crashes (those are easy to spot) but silent degradation: the agent keeps running but output quality gradually declines.

class DegradationDetector:
    def __init__(self, window_size: int = 50, threshold: float = 0.15):
        self.window_size = window_size
        self.threshold = threshold
        self.recent_scores: list[float] = []

    def record(self, score: float):
        self.recent_scores.append(score)
        if len(self.recent_scores) > self.window_size * 2:
            self.recent_scores = self.recent_scores[-self.window_size * 2:]

    def check(self) -> dict | None:
        if len(self.recent_scores) < self.window_size:
            return None
        mid = len(self.recent_scores) // 2
        recent = self.recent_scores[mid:]
        previous = self.recent_scores[:mid]
        recent_avg = sum(recent) / len(recent)
        previous_avg = sum(previous) / len(previous)
        if previous_avg == 0:
            return None
        drop = (previous_avg - recent_avg) / previous_avg
        if drop > self.threshold:
            return {
                "alert": "quality_degradation",
                "previous_avg": round(previous_avg, 3),
                "recent_avg": round(recent_avg, 3),
                "drop_pct": f"{drop:.1%}",
                "message": f"Agent quality dropped {drop:.1%} over the last {self.window_size} tasks",
            }
        return None

Common Mistakes

Mistake 1: "Logs are enough, no structured tracing needed" Text logs help debug individual requests but cannot answer "what was the P95 latency last week" or "what is the tool call failure rate trend." Structured tracing makes every record a queryable, aggregatable data point.

Mistake 2: "Only monitor success rate and latency" Success rate and latency are necessary but insufficient. An agent might succeed 100% of the time while giving empty answers. Automated evaluation adds the quality dimension.

Mistake 3: "Evaluation equals writing unit tests" Unit tests verify deterministic behavior; agent output is probabilistic. Evaluation needs to handle "partially correct" cases with scores rather than pass/fail.

Summary

  • Distributed tracing captures "what happened": use span trees to record complete decision chains including LLM calls, tool calls, and reasoning
  • Metrics monitoring answers "what is the trend": track success rate, latency, cost, and tool call patterns
  • Automated evaluation judges "how is the quality": rule-based and semantic evaluation pipelines cover dimensions that metrics cannot
  • Silent degradation detection is the most critical alert: an agent not crashing does not mean nothing is wrong
  • All three pillars working together form a complete observability stack: missing any one is a blind spot

Prepared by AgentList. Explore more agent observability projects in our directory.