Building Agent Observability: From Distributed Tracing to Automated Evaluation
A systematic guide to the three pillars of agent observability — distributed tracing, metrics monitoring, and automated evaluation — for building production-grade agent monitoring.
The first question after deploying an agent is not "does it work?" but "what did it just do, why did it do that, and how well did it do it?" An agent system without observability is a black box that cannot be debugged, optimized, or trusted. This article breaks down the three pillars of agent observability with implementation code you can build from scratch.
Agent Observability Is Not Traditional Application Monitoring
Traditional application monitoring focuses on "did the request succeed, what is the latency, what is the error rate." Agent systems need to answer harder questions:
- Distributed tracing: Which decision nodes did the agent pass through? What were the inputs and outputs of each tool call? What does the LLM reasoning chain look like?
- Metrics monitoring: What is the agent's task success rate? How many tokens does each task consume? What is the tool call failure rate?
- Automated evaluation: How good is the agent's output quality? Is it better or worse than last week? Which task types perform worst?
These three dimensions answer "what happened," "what is the trend," and "how is the quality" respectively.
Pillar 1: Structured Distributed Tracing
Tracing is the foundation of agent observability. It is not simple logging; it captures the complete decision chain.
Design Pattern: The Span Tree
Each agent task is a span tree:
```
TaskSpan (root)
├── LLMCallSpan (model call)
│   ├── input_tokens: 1250
│   ├── output_tokens: 380
│   ├── model: gpt-4o
│   └── latency_ms: 2300
├── ToolCallSpan (tool call)
│   ├── tool: search_docs
│   ├── input: {"query": "RAG best practices"}
│   ├── output: {"results": [...]}
│   └── latency_ms: 450
└── DecisionSpan (decision node)
    ├── reasoning: "Need more information to complete the answer"
    └── action: "Call search_docs tool"
```
A minimal in-memory implementation of this span model:

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
import uuid
import json


@dataclass
class Span:
    trace_id: str
    span_id: str
    parent_id: str | None
    name: str
    kind: str  # "llm", "tool", "decision", "task"
    start_time: datetime
    end_time: datetime | None = None
    attributes: dict[str, Any] = field(default_factory=dict)
    status: str = "ok"  # "ok", "error", "timeout"

    @property
    def duration_ms(self) -> float:
        if not self.end_time:
            return 0.0
        return (self.end_time - self.start_time).total_seconds() * 1000


class AgentTracer:
    def __init__(self):
        self.traces: dict[str, list[Span]] = {}      # trace_id -> all spans in the trace
        self.active_spans: dict[str, Span] = {}      # span_id -> spans not yet ended

    def start_trace(self, task_name: str) -> str:
        """Open a root task span and return the new trace_id."""
        trace_id = str(uuid.uuid4())
        span_id = str(uuid.uuid4())
        span = Span(
            trace_id=trace_id,
            span_id=span_id,
            parent_id=None,
            name=task_name,
            kind="task",
            start_time=datetime.now(),
        )
        self.traces[trace_id] = [span]
        self.active_spans[span_id] = span
        return trace_id

    def start_span(self, trace_id: str, parent_id: str,
                   name: str, kind: str, **attrs) -> str:
        """Open a child span under parent_id and return its span_id."""
        span_id = str(uuid.uuid4())
        span = Span(
            trace_id=trace_id,
            span_id=span_id,
            parent_id=parent_id,
            name=name,
            kind=kind,
            start_time=datetime.now(),
            attributes=attrs,
        )
        self.traces[trace_id].append(span)
        self.active_spans[span_id] = span
        return span_id

    def end_span(self, span_id: str, status: str = "ok", **attrs):
        """Close a span, recording its status and any final attributes."""
        span = self.active_spans.pop(span_id, None)
        if span:
            span.end_time = datetime.now()
            span.status = status
            span.attributes.update(attrs)

    def get_trace(self, trace_id: str) -> list[Span]:
        return self.traces.get(trace_id, [])

    def export_trace(self, trace_id: str) -> str:
        """Serialize a trace to JSON for storage or inspection."""
        spans = self.get_trace(trace_id)
        return json.dumps([
            {
                "span_id": s.span_id,
                "parent_id": s.parent_id,
                "name": s.name,
                "kind": s.kind,
                "duration_ms": s.duration_ms,
                "status": s.status,
                "attributes": s.attributes,
            }
            for s in spans
        ], default=str, indent=2)
```
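For illustration, here is a sketch of how the tracer might wrap one step of an agent loop. The LLM and tool calls are left as comments because they are hypothetical placeholders, not part of the tracer itself.

```python
# Sketch only: the commented-out llm/tool calls stand in for your own clients.
tracer = AgentTracer()
trace_id = tracer.start_trace("answer_user_question")
root = tracer.get_trace(trace_id)[0]  # the root task span

llm_span = tracer.start_span(trace_id, root.span_id, "plan_answer", "llm",
                             model="gpt-4o")
# response = llm.complete("What are RAG best practices?")
tracer.end_span(llm_span, input_tokens=1250, output_tokens=380)

tool_span = tracer.start_span(trace_id, root.span_id, "search_docs", "tool",
                              query="RAG best practices")
# results = search_docs("RAG best practices")
tracer.end_span(tool_span, status="ok")

tracer.end_span(root.span_id)
print(tracer.export_trace(trace_id))
```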
Pillar 2: Metrics Monitoring
Tracing answers "what happened this time." Metrics answer "what is the trend."
A minimal in-memory metrics store:

```python
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class MetricPoint:
    timestamp: datetime
    value: float
    tags: dict[str, str]


class AgentMetrics:
    def __init__(self):
        # metric name -> list of recorded points
        self.metrics: dict[str, list[MetricPoint]] = defaultdict(list)

    def record(self, name: str, value: float, **tags):
        point = MetricPoint(timestamp=datetime.now(), value=value, tags=tags)
        self.metrics[name].append(point)

    def aggregate(self, name: str, window_minutes: int = 60) -> dict:
        """Summarize a metric over a trailing time window."""
        cutoff = datetime.now() - timedelta(minutes=window_minutes)
        points = [p for p in self.metrics.get(name, []) if p.timestamp > cutoff]
        if not points:
            return {"count": 0}
        values = [p.value for p in points]
        return {
            "count": len(values),
            "mean": sum(values) / len(values),
            "p50": sorted(values)[len(values) // 2],
            "p95": sorted(values)[int(len(values) * 0.95)] if len(values) >= 20 else max(values),
            "min": min(values),
            "max": max(values),
        }
```
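A short usage sketch. The metric names (`task.success`, `task.latency_ms`, `task.tokens`) are illustrative conventions, not a required schema.

```python
metrics = AgentMetrics()

# Record one completed task (illustrative metric names and tags).
metrics.record("task.success", 1.0, agent="support_bot")
metrics.record("task.latency_ms", 8400, agent="support_bot")
metrics.record("task.tokens", 1630, agent="support_bot", model="gpt-4o")

# Later, inspect the trailing hour.
print(metrics.aggregate("task.latency_ms", window_minutes=60))
```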
Key Metrics Dashboard
| Metric | Calculation | Alert Threshold |
|---|---|---|
| Task success rate | tasks with success=1 / total tasks | < 80% |
| P95 task latency | 95th percentile task latency | > 30s |
| Avg token cost | mean tokens per task | week-over-week increase > 20% |
| Tool call failure rate | calls with tool.error=1 / total tool calls | > 5% |
| Tool calls per task | total tool calls / total tasks | > 15 (possible infinite loop) |
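A sketch of how these thresholds could be checked against `AgentMetrics.aggregate()`. The metric names and the mapping from table rows to checks are assumptions for illustration.

```python
def check_alerts(metrics: AgentMetrics) -> list[str]:
    """Compare the last hour's aggregates against the dashboard thresholds."""
    alerts = []
    success = metrics.aggregate("task.success")
    if success["count"] and success["mean"] < 0.80:
        alerts.append(f"task success rate {success['mean']:.0%} < 80%")
    latency = metrics.aggregate("task.latency_ms")
    if latency["count"] and latency["p95"] > 30_000:
        alerts.append(f"p95 task latency {latency['p95']:.0f}ms > 30s")
    tool_errors = metrics.aggregate("tool.error_rate")
    if tool_errors["count"] and tool_errors["mean"] > 0.05:
        alerts.append(f"tool call failure rate {tool_errors['mean']:.1%} > 5%")
    return alerts
```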
Pillar 3: Automated Evaluation
Metrics tell you the success rate, but not the quality. Automated evaluation closes this gap.
A simple rule-based evaluator pipeline:

```python
from abc import ABC, abstractmethod


class Evaluator(ABC):
    @abstractmethod
    def evaluate(self, input_text: str, output: str,
                 context: str | None = None) -> dict:
        pass


class RelevanceEvaluator(Evaluator):
    """Cheap relevance proxy: keyword overlap between input and output."""

    def evaluate(self, input_text: str, output: str,
                 context: str | None = None) -> dict:
        input_words = set(input_text.lower().split())
        output_words = set(output.lower().split())
        overlap = len(input_words & output_words) / max(len(input_words), 1)
        return {
            "name": "relevance",
            "score": min(overlap * 3, 1.0),
            "passed": overlap > 0.15,
            "detail": f"Keyword overlap: {overlap:.2%}",
        }


class LengthEvaluator(Evaluator):
    """Checks that the output falls inside an expected word-count range."""

    def __init__(self, min_words: int = 10, max_words: int = 500):
        self.min_words = min_words
        self.max_words = max_words

    def evaluate(self, input_text: str, output: str,
                 context: str | None = None) -> dict:
        word_count = len(output.split())
        passed = self.min_words <= word_count <= self.max_words
        return {
            "name": "output_length",
            "score": 1.0 if passed else 0.0,
            "passed": passed,
            "detail": f"Output length: {word_count} words (range: {self.min_words}-{self.max_words})",
        }


class EvaluationPipeline:
    def __init__(self, evaluators: list[Evaluator]):
        self.evaluators = evaluators

    def run(self, input_text: str, output: str,
            context: str | None = None) -> dict:
        results = [ev.evaluate(input_text, output, context) for ev in self.evaluators]
        overall_passed = all(r["passed"] for r in results)
        avg_score = sum(r["score"] for r in results) / len(results) if results else 0
        return {"passed": overall_passed, "score": avg_score, "details": results}
```
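A quick usage sketch of the pipeline on a single input/output pair:

```python
pipeline = EvaluationPipeline([RelevanceEvaluator(), LengthEvaluator(min_words=5)])

result = pipeline.run(
    input_text="What are RAG best practices?",
    output="Chunk documents sensibly, use hybrid retrieval, and evaluate RAG answers against ground truth.",
)
print(result["passed"], round(result["score"], 2))
for detail in result["details"]:
    print(detail["name"], detail["detail"])
```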
Detecting Silent Degradation
The most dangerous problem in agent systems is not crashes (those are easy to spot) but silent degradation: the agent keeps running but output quality gradually declines.
A sliding-window detector that compares the average score of the most recent tasks against the window before them:

```python
class DegradationDetector:
    """Alerts when the recent average score drops relative to the previous window."""

    def __init__(self, window_size: int = 50, threshold: float = 0.15):
        self.window_size = window_size
        self.threshold = threshold
        self.recent_scores: list[float] = []

    def record(self, score: float):
        self.recent_scores.append(score)
        # Keep at most two windows of history.
        if len(self.recent_scores) > self.window_size * 2:
            self.recent_scores = self.recent_scores[-self.window_size * 2:]

    def check(self) -> dict | None:
        if len(self.recent_scores) < self.window_size:
            return None
        mid = len(self.recent_scores) // 2
        recent = self.recent_scores[mid:]
        previous = self.recent_scores[:mid]
        recent_avg = sum(recent) / len(recent)
        previous_avg = sum(previous) / len(previous)
        if previous_avg == 0:
            return None
        drop = (previous_avg - recent_avg) / previous_avg
        if drop > self.threshold:
            return {
                "alert": "quality_degradation",
                "previous_avg": round(previous_avg, 3),
                "recent_avg": round(recent_avg, 3),
                "drop_pct": f"{drop:.1%}",
                "message": f"Agent quality dropped {drop:.1%} over the last {self.window_size} tasks",
            }
        return None
```
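Wiring the detector to the evaluation pipeline is one line per task. The sketch below reuses the `pipeline` and `detector` objects defined above; the alerting hook is left as a print for illustration.

```python
detector = DegradationDetector(window_size=50, threshold=0.15)

def evaluate_and_track(task_input: str, task_output: str) -> None:
    # Score every completed task and feed the score to the detector.
    result = pipeline.run(task_input, task_output)
    detector.record(result["score"])
    alert = detector.check()
    if alert:
        # Replace with your alerting channel (pager, Slack, etc.).
        print(alert["message"])
```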
Common Mistakes
Mistake 1: "Logs are enough, no structured tracing needed." Text logs help debug individual requests but cannot answer "what was the P95 latency last week" or "what is the tool call failure rate trend." Structured tracing makes every record a queryable, aggregatable data point.
Mistake 2: "Only monitor success rate and latency." Success rate and latency are necessary but insufficient. An agent might succeed 100% of the time while giving empty answers. Automated evaluation adds the quality dimension.
Mistake 3: "Evaluation equals writing unit tests." Unit tests verify deterministic behavior; agent output is probabilistic. Evaluation needs to handle "partially correct" cases with scores rather than pass/fail.
Summary
- Distributed tracing captures "what happened": use span trees to record complete decision chains including LLM calls, tool calls, and reasoning
- Metrics monitoring answers "what is the trend": track success rate, latency, cost, and tool call patterns
- Automated evaluation judges "how is the quality": rule-based and semantic evaluation pipelines cover dimensions that metrics cannot
- Silent degradation detection is the most critical alert: an agent not crashing does not mean nothing is wrong
- All three pillars working together form a complete observability stack: missing any one is a blind spot
Prepared by AgentList. Explore more agent observability projects in our directory.
Projects in this article
- Arize Phoenix (9.6k ⭐): Phoenix is an open-source observability and evaluation tool for LLM and agent applications, supporting online tracing and offline diagnosis.
- AgentOps (5.5k ⭐): AgentOps is an observability platform for AI agents, providing monitoring, debugging, and evaluation to help developers optimize agent performance.
- Logfire (4.2k ⭐): AI observability platform for production LLM and agent systems by the Pydantic team. Provides real-time monitoring, tracing, and debugging capabilities.
- OpenInference (965 ⭐): OpenTelemetry instrumentation for AI observability, providing standardized tracing, metrics collection, and span definitions for LLM inference processes to help developers monitor and debug AI agent systems.
- Langfuse (27.0k ⭐): Langfuse is an open-source observability platform for LLM applications, supporting tracing, evaluation, prompt versioning, and cost analytics.