Agent 工具调用容错:超时、重试、熔断、降级与幂等
系统梳理 Agent 工具调用的 7 大容错模式:超时分级、指数退避 + 抖动、熔断器、备用 Provider 链、可恢复错误分类、结构化校验、幂等键设计,让 Agent 在不稳定的真实环境中保持稳定输出。
Agent 系统的可靠性瓶颈几乎都集中在工具调用上:HTTP 超时、429 限流、5xx 错误、Schema 解析失败、部分成功——这些边缘情况决定了 Agent 是"勉强能用"还是"生产可用"。本文从工程实战角度系统梳理工具调用的容错模式:超时分级、重试退避、熔断降级、幂等设计、可恢复错误分类,让 Agent 在不稳定的真实环境中保持稳定输出。
为什么工具调用是 Agent 的最大可靠性瓶颈
LLM 本身是无状态的推理服务,可用性通常 99.9% 以上。但 Agent 调用的工具——搜索 API、数据库、邮件服务、第三方 SaaS——每多一个依赖,失败率就指数级叠加。一次涉及 10 个工具调用的 Agent 任务,即使每个工具的可用性是 99.5%,端到端的成功率也只有 95.1%。
更糟糕的是工具失败的"长尾"现象:
- 偶发超时:5% 的请求 5xx,但 P99 延迟 30 秒
- 间歇限流:每天 9 点到 10 点集中 429,其他时段正常
- 半成功:订单创建成功但扣款失败,状态不一致
- Schema 漂移:第三方 API 升级后字段类型变化,Agent 解析崩溃
- 网络分区:DNS 故障、TLS 握手失败、CDN 节点宕机
这些错误模式不能简单靠"重试一次"解决。需要建立一套分层容错架构。
容错模式 1:超时分级(Timeout Hierarchy)
最常见的错误是"没有设置超时"或者"超时设置得过长"。Agent 工具调用必须分级设置超时:
from dataclasses import dataclass
from enum import Enum
class ToolCriticality(Enum):
BLOCKING = "blocking" # 必须等待,直接影响主流程
ENHANCING = "enhancing" # 增强体验,失败可降级
OPTIONAL = "optional" # 完全可选,失败忽略
TIMEOUT_CONFIG = {
ToolCriticality.BLOCKING: {
"connect_timeout": 2.0, # TCP 连接建立
"read_timeout": 10.0, # 首次响应
"total_timeout": 30.0, # 整个调用
},
ToolCriticality.ENHANCING: {
"connect_timeout": 1.0,
"read_timeout": 5.0,
"total_timeout": 15.0,
},
ToolCriticality.OPTIONAL: {
"connect_timeout": 1.0,
"read_timeout": 3.0,
"total_timeout": 5.0,
},
}
async def call_with_timeout(tool_name, criticality, *args, **kwargs):
cfg = TIMEOUT_CONFIG[criticality]
return await asyncio.wait_for(
tool_registry[tool_name](*args, **kwargs),
timeout=cfg["total_timeout"]
)
分级原则:
- BLOCKING(如订单查询、支付、核心业务 API):宽裕超时,重试 + 降级到备用 Provider
- ENHANCING(如推荐、个性化、上下文增强):中等超时,失败用兜底数据
- OPTIONAL(如分析埋点、用户行为追踪):极短超时,失败直接吞掉
容错模式 2:指数退避 + 抖动(Exponential Backoff with Jitter)
重试不能简单循环,需要指数退避和随机抖动来避免"thundering herd":
import random
import asyncio
from typing import Callable, Awaitable, TypeVar
T = TypeVar("T")
class RetryConfig:
def __init__(
self,
max_attempts: int = 3,
initial_delay: float = 0.5,
max_delay: float = 8.0,
exponential_base: float = 2.0,
jitter: float = 0.1,
):
self.max_attempts = max_attempts
self.initial_delay = initial_delay
self.max_delay = max_delay
self.exponential_base = exponential_base
self.jitter = jitter
RETRYABLE_EXCEPTIONS = (
asyncio.TimeoutError,
ConnectionError,
)
async def retry_with_backoff(
func: Callable[..., Awaitable[T]],
*args,
config: RetryConfig = RetryConfig(),
is_retryable: Callable[[Exception], bool] = lambda e: isinstance(e, RETRYABLE_EXCEPTIONS),
**kwargs,
) -> T:
last_exc = None
for attempt in range(1, config.max_attempts + 1):
try:
return await func(*args, **kwargs)
except Exception as e:
last_exc = e
if attempt == config.max_attempts or not is_retryable(e):
raise
delay = min(
config.initial_delay * (config.exponential_base ** (attempt - 1)),
config.max_delay,
)
delay = delay * (1 + random.uniform(-config.jitter, config.jitter))
# 尊重 Retry-After header
if hasattr(e, "retry_after") and e.retry_after is not None:
delay = max(delay, e.retry_after)
await asyncio.sleep(delay)
raise last_exc
关键设计:
is_retryable必须区分错误类型——401/403/404 不应该重试,只有 5xx/429/TimeoutError/ConnectionError 才重试max_delay防止极端长延迟(如2^10 = 1024 秒)jitter避免多 Agent 实例同时重试Retry-Afterheader 优先于计算值(尊重服务端限流指令)
容错模式 3:熔断器(Circuit Breaker)
当某个工具的失败率超过阈值,应该主动熔断,避免"压垮一个已经倒下的服务":
from datetime import datetime, timedelta
from enum import Enum
class CircuitState(Enum):
CLOSED = "closed" # 正常调用
OPEN = "open" # 熔断中,直接抛错
HALF_OPEN = "half_open" # 试探性放行一次
class CircuitBreaker:
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: float = 30.0,
success_threshold: int = 2,
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.success_threshold = success_threshold
self.state = CircuitState.CLOSED
self.failure_count = 0
self.success_count = 0
self.opened_at: datetime | None = None
def allow_request(self) -> bool:
if self.state == CircuitState.CLOSED:
return True
if self.state == CircuitState.OPEN:
if datetime.now() - self.opened_at > timedelta(seconds=self.recovery_timeout):
self.state = CircuitState.HALF_OPEN
self.success_count = 0
return True
return False
return True # HALF_OPEN 放行一次
def record_success(self):
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.success_threshold:
self.state = CircuitState.CLOSED
self.failure_count = 0
else:
self.failure_count = 0
def record_failure(self):
self.failure_count += 1
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
self.opened_at = datetime.now()
circuit_breakers: dict[str, CircuitBreaker] = {}
def get_breaker(tool_name: str) -> CircuitBreaker:
if tool_name not in circuit_breakers:
circuit_breakers[tool_name] = CircuitBreaker()
return circuit_breakers[tool_name]
async def call_with_circuit_breaker(tool_name, *args, **kwargs):
breaker = get_breaker(tool_name)
if not breaker.allow_request():
raise CircuitOpenError(f"Circuit open for {tool_name}")
try:
result = await tool_registry[tool_name](*args, **kwargs)
breaker.record_success()
return result
except Exception as e:
breaker.record_failure()
raise
熔断策略:
- 5 次连续失败 → 熔断 30 秒
- HALF_OPEN 状态试探性放行一次,成功 2 次后关闭
- 熔断时直接抛
CircuitOpenError,由上层降级到兜底逻辑
容错模式 4:降级到备用 Provider(Fallback Chain)
关键工具应该有备用方案。常见模式是同一类工具的 fallback chain:
class SearchProvider(Enum):
GOOGLE = "google"
BING = "bing"
DUCKDUCKGO = "duckduckgo"
LOCAL_INDEX = "local_index" # 最终兜底
async def search_with_fallback(query: str, max_results: int = 10):
providers = [
(SearchProvider.GOOGLE, _search_google, ToolCriticality.BLOCKING),
(SearchProvider.BING, _search_bing, ToolCriticality.BLOCKING),
(SearchProvider.DUCKDUCKGO, _search_duckduckgo, ToolCriticality.ENHANCING),
(SearchProvider.LOCAL_INDEX, _search_local_index, ToolCriticality.OPTIONAL),
]
last_error = None
for provider, fn, criticality in providers:
try:
result = await call_with_timeout(provider.value, criticality, fn, query, max_results)
return {"provider": provider.value, "results": result}
except Exception as e:
last_error = e
# 记录降级链
logger.warning(f"Provider {provider.value} failed: {e}, trying next")
continue
# 全部失败时,Agent 必须优雅降级——告知用户"暂时无法搜索"
raise AllProvidersFailedError(f"All search providers failed: {last_error}")
降级原则:
- 主备 Provider 应该是不同厂商,避免共享故障域
- 降级链按"质量/成本"排序:高质量高成本 → 低质量低成本 → 本地兜底
- 最终兜底应该是 100% 可用的(如本地索引、缓存、预置回答)
容错模式 5:可恢复错误分类
把所有错误视为"重试一次"是常见的反模式。正确的做法是分类错误,决定策略:
class ErrorCategory(Enum):
TRANSIENT = "transient" # 临时性,重试
RATE_LIMIT = "rate_limit" # 限流,遵守 Retry-After
PERMANENT = "permanent" # 永久性,不重试
CLIENT_ERROR = "client_error" # 4xx,请求有问题
SCHEMA_ERROR = "schema_error" # 响应结构变化,需要人工修复
TIMEOUT = "timeout" # 超时,可能降级到更简单的实现
def classify_error(exc: Exception, status_code: int | None = None) -> ErrorCategory:
if isinstance(exc, asyncio.TimeoutError):
return ErrorCategory.TIMEOUT
if isinstance(exc, (ConnectionError, OSError)):
return ErrorCategory.TRANSIENT
if status_code is not None:
if status_code == 429:
return ErrorCategory.RATE_LIMIT
if 400 <= status_code < 500 and status_code != 408:
return ErrorCategory.CLIENT_ERROR
if 500 <= status_code < 600:
return ErrorCategory.TRANSIENT
if isinstance(exc, (KeyError, ValueError, json.JSONDecodeError)):
return ErrorCategory.SCHEMA_ERROR
return ErrorCategory.PERMANENT
def should_retry(category: ErrorCategory) -> bool:
return category in {
ErrorCategory.TRANSIENT,
ErrorCategory.RATE_LIMIT,
ErrorCategory.TIMEOUT,
}
关键分类:
- TRANSIENT / RATE_LIMIT / TIMEOUT:可重试,配合退避
- CLIENT_ERROR(400/401/403/404):不重试,调用方问题
- SCHEMA_ERROR:不重试,触发告警 + 自动 snapshot
- PERMANENT:不重试,切换到备用方案
容错模式 6:结构化输出校验
Agent 的工具调用最脆弱的环节是 LLM 生成调用参数。结构化校验能防止"参数正确但语义错误":
from pydantic import BaseModel, Field, field_validator
class SearchQuery(BaseModel):
query: str = Field(..., min_length=1, max_length=500)
max_results: int = Field(default=10, ge=1, le=100)
language: str = Field(default="en")
time_filter: str | None = None
@field_validator("query")
@classmethod
def query_must_be_meaningful(cls, v: str) -> str:
if not v.strip() or len(v.split()) < 2:
raise ValueError("Query must be at least 2 words")
return v
@field_validator("time_filter")
@classmethod
def time_filter_must_be_valid(cls, v: str | None) -> str | None:
if v is None:
return v
valid = {"day", "week", "month", "year"}
if v not in valid:
raise ValueError(f"time_filter must be one of {valid}")
return v
async def search_tool(params: dict) -> list[dict]:
try:
query = SearchQuery(**params)
except ValidationError as e:
# 校验失败:可能是 LLM 生成错误或参数被注入
logger.error(f"Invalid search params: {e}")
raise SchemaError(f"Invalid parameters: {e.errors()}")
return await _search_google(query.query, query.max_results, query.language, query.time_filter)
校验策略:
- 用 Pydantic 定义严格的参数 schema
- 校验失败时,记录原始参数和错误信息,触发告警
- 严重时回退到"安全默认值",而不是直接失败
容错模式 7:幂等性设计
重试带来的副作用是重复调用。对于非幂等操作(如下单、扣款),必须实现幂等键:
import hashlib
from datetime import datetime
class IdempotencyKey:
def __init__(self, namespace: str, params: dict, ttl_seconds: int = 86400):
self.namespace = namespace
self.params = params
self.created_at = datetime.now()
self.ttl = ttl_seconds
@property
def key(self) -> str:
payload = json.dumps(self.params, sort_keys=True)
return f"{self.namespace}:{hashlib.sha256(payload.encode()).hexdigest()[:16]}"
def is_expired(self) -> bool:
return (datetime.now() - self.created_at).total_seconds() > self.ttl
idempotency_cache: dict[str, dict] = {}
async def idempotent_call(tool_name, params: dict, func, *args, **kwargs):
idem = IdempotencyKey(tool_name, params)
if idem.key in idempotency_cache and not idempotency_cache[idem.key]["expired"]:
return idempotency_cache[idem.key]["result"]
result = await func(*args, **kwargs)
idempotency_cache[idem.key] = {
"result": result,
"expired": idem.is_expired(),
}
return result
幂等原则:
- 所有"创建"类操作必须接受
idempotency_key参数 - 缓存最近 24 小时的响应,避免真正的服务端调用
- 重试时使用相同的 key,服务端自动去重
工具调用可靠性 Checklist
| 项 | 必做 | 选做 |
|---|---|---|
| 设置总超时 | ✅ | |
| 按关键性分级超时 | ✅ | |
| 区分可重试与不可重试错误 | ✅ | |
| 指数退避 + 抖动 | ✅ | |
| 尊重 Retry-After | ✅ | |
| 熔断器 | ✅ | |
| 备用 Provider | ✅ | |
| 响应结构校验 | ✅ | |
| 参数 schema 校验 | ✅ | |
| 幂等键 | ✅(非幂等操作) | |
| 失败告警 | ✅ | |
| 死信队列(DLQ) | ✅ | |
| 完整 trace | ✅ |
实施路径
第 1 阶段:审计所有工具调用,添加超时和错误分类。 第 2 阶段:为核心工具实现重试 + 熔断 + 降级。 第 3 阶段:建立备用 Provider 链,覆盖 80% 的关键工具。 第 4 阶段:实施 Pydantic 参数校验和响应结构校验。 第 5 阶段:对非幂等操作加幂等键保护。 第 6 阶段:把失败模式接入告警和死信队列。 第 7 阶段:定期演练故障注入(chaos engineering),验证容错能力。
总结
Agent 工具调用的可靠性不是"加一个 try-catch"那么简单,而是一套分层容错架构:超时分级控制等待时间、退避抖动避免压垮服务、熔断器快速失败、备用 Provider 保证兜底、错误分类决定策略、结构化校验防止 LLM 错误、幂等键让重试安全。
把容错做扎实,Agent 才能从"能 demo"变成"能上生产"。
参考工具:Pydantic AI(强校验的 Agent 框架)、OpenAI Agents SDK(内置工具容错)、Strands Agents(AWS 维护的 Agent SDK)、LangChain(成熟的工具调用抽象)和 CrewAI(多 Agent 工具协作)都为工具容错提供了基础实现。
本文涉及的项目
PydanticAI
18.1k ⭐PydanticAI 基于类型系统构建 Agent,强调可验证的数据结构、工具调用与生产级可靠性。
OpenAI Agents SDK
15.0k ⭐OpenAI Agents SDK 是 OpenAI 官方提供的 Agent 开发工具包,支持构建多步骤工作流的 AI Agent,提供工具调用、状态管理等核心功能。
Strands Agents SDK
6.4k ⭐Strands Agents SDK 是 AWS 开源的 Agent 框架,采用模型驱动的方法构建 AI Agent,内置工具使用、对话记忆和多 Agent 协作能力。
LangChain
140.6k ⭐LangChain 是面向 Agent 工程化的开源框架与编排平台,提供模型接入、工具调用、RAG、记忆与可观测性的统一抽象。
CrewAI
54.6k ⭐CrewAI 是一个用于编排角色扮演、自主协作 AI Agent 的多智能体框架,可让多个 Agent 像团队一样分工完成复杂任务。