Agent 工具调用容错:超时、重试、熔断、降级与幂等

系统梳理 Agent 工具调用的 7 大容错模式:超时分级、指数退避 + 抖动、熔断器、备用 Provider 链、可恢复错误分类、结构化校验、幂等键设计,让 Agent 在不稳定的真实环境中保持稳定输出。

AgentList · 2026年6月29日
容错工具调用重试熔断可靠性

Agent 系统的可靠性瓶颈几乎都集中在工具调用上:HTTP 超时、429 限流、5xx 错误、Schema 解析失败、部分成功——这些边缘情况决定了 Agent 是"勉强能用"还是"生产可用"。本文从工程实战角度系统梳理工具调用的容错模式:超时分级、重试退避、熔断降级、幂等设计、可恢复错误分类,让 Agent 在不稳定的真实环境中保持稳定输出。

为什么工具调用是 Agent 的最大可靠性瓶颈

LLM 本身是无状态的推理服务,可用性通常 99.9% 以上。但 Agent 调用的工具——搜索 API、数据库、邮件服务、第三方 SaaS——每多一个依赖,失败率就指数级叠加。一次涉及 10 个工具调用的 Agent 任务,即使每个工具的可用性是 99.5%,端到端的成功率也只有 95.1%。

更糟糕的是工具失败的"长尾"现象:

  • 偶发超时:5% 的请求 5xx,但 P99 延迟 30 秒
  • 间歇限流:每天 9 点到 10 点集中 429,其他时段正常
  • 半成功:订单创建成功但扣款失败,状态不一致
  • Schema 漂移:第三方 API 升级后字段类型变化,Agent 解析崩溃
  • 网络分区:DNS 故障、TLS 握手失败、CDN 节点宕机

这些错误模式不能简单靠"重试一次"解决。需要建立一套分层容错架构

容错模式 1:超时分级(Timeout Hierarchy)

最常见的错误是"没有设置超时"或者"超时设置得过长"。Agent 工具调用必须分级设置超时:

from dataclasses import dataclass
from enum import Enum

class ToolCriticality(Enum):
    BLOCKING = "blocking"      # 必须等待,直接影响主流程
    ENHANCING = "enhancing"     # 增强体验,失败可降级
    OPTIONAL = "optional"       # 完全可选,失败忽略

TIMEOUT_CONFIG = {
    ToolCriticality.BLOCKING: {
        "connect_timeout": 2.0,    # TCP 连接建立
        "read_timeout": 10.0,     # 首次响应
        "total_timeout": 30.0,    # 整个调用
    },
    ToolCriticality.ENHANCING: {
        "connect_timeout": 1.0,
        "read_timeout": 5.0,
        "total_timeout": 15.0,
    },
    ToolCriticality.OPTIONAL: {
        "connect_timeout": 1.0,
        "read_timeout": 3.0,
        "total_timeout": 5.0,
    },
}

async def call_with_timeout(tool_name, criticality, *args, **kwargs):
    cfg = TIMEOUT_CONFIG[criticality]
    return await asyncio.wait_for(
        tool_registry[tool_name](*args, **kwargs),
        timeout=cfg["total_timeout"]
    )

分级原则

  • BLOCKING(如订单查询、支付、核心业务 API):宽裕超时,重试 + 降级到备用 Provider
  • ENHANCING(如推荐、个性化、上下文增强):中等超时,失败用兜底数据
  • OPTIONAL(如分析埋点、用户行为追踪):极短超时,失败直接吞掉

容错模式 2:指数退避 + 抖动(Exponential Backoff with Jitter)

重试不能简单循环,需要指数退避和随机抖动来避免"thundering herd":

import random
import asyncio
from typing import Callable, Awaitable, TypeVar

T = TypeVar("T")

class RetryConfig:
    def __init__(
        self,
        max_attempts: int = 3,
        initial_delay: float = 0.5,
        max_delay: float = 8.0,
        exponential_base: float = 2.0,
        jitter: float = 0.1,
    ):
        self.max_attempts = max_attempts
        self.initial_delay = initial_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter

RETRYABLE_EXCEPTIONS = (
    asyncio.TimeoutError,
    ConnectionError,
)

async def retry_with_backoff(
    func: Callable[..., Awaitable[T]],
    *args,
    config: RetryConfig = RetryConfig(),
    is_retryable: Callable[[Exception], bool] = lambda e: isinstance(e, RETRYABLE_EXCEPTIONS),
    **kwargs,
) -> T:
    last_exc = None
    for attempt in range(1, config.max_attempts + 1):
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            last_exc = e
            if attempt == config.max_attempts or not is_retryable(e):
                raise
            
            delay = min(
                config.initial_delay * (config.exponential_base ** (attempt - 1)),
                config.max_delay,
            )
            delay = delay * (1 + random.uniform(-config.jitter, config.jitter))
            
            # 尊重 Retry-After header
            if hasattr(e, "retry_after") and e.retry_after is not None:
                delay = max(delay, e.retry_after)
            
            await asyncio.sleep(delay)
    raise last_exc

关键设计

  • is_retryable 必须区分错误类型——401/403/404 不应该重试,只有 5xx/429/TimeoutError/ConnectionError 才重试
  • max_delay 防止极端长延迟(如 2^10 = 1024 秒
  • jitter 避免多 Agent 实例同时重试
  • Retry-After header 优先于计算值(尊重服务端限流指令)

容错模式 3:熔断器(Circuit Breaker)

当某个工具的失败率超过阈值,应该主动熔断,避免"压垮一个已经倒下的服务":

from datetime import datetime, timedelta
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"      # 正常调用
    OPEN = "open"          # 熔断中,直接抛错
    HALF_OPEN = "half_open"  # 试探性放行一次

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        success_threshold: int = 2,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.opened_at: datetime | None = None
    
    def allow_request(self) -> bool:
        if self.state == CircuitState.CLOSED:
            return True
        if self.state == CircuitState.OPEN:
            if datetime.now() - self.opened_at > timedelta(seconds=self.recovery_timeout):
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0
                return True
            return False
        return True  # HALF_OPEN 放行一次
    
    def record_success(self):
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
        else:
            self.failure_count = 0
    
    def record_failure(self):
        self.failure_count += 1
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            self.opened_at = datetime.now()

circuit_breakers: dict[str, CircuitBreaker] = {}

def get_breaker(tool_name: str) -> CircuitBreaker:
    if tool_name not in circuit_breakers:
        circuit_breakers[tool_name] = CircuitBreaker()
    return circuit_breakers[tool_name]

async def call_with_circuit_breaker(tool_name, *args, **kwargs):
    breaker = get_breaker(tool_name)
    if not breaker.allow_request():
        raise CircuitOpenError(f"Circuit open for {tool_name}")
    try:
        result = await tool_registry[tool_name](*args, **kwargs)
        breaker.record_success()
        return result
    except Exception as e:
        breaker.record_failure()
        raise

熔断策略

  • 5 次连续失败 → 熔断 30 秒
  • HALF_OPEN 状态试探性放行一次,成功 2 次后关闭
  • 熔断时直接抛 CircuitOpenError,由上层降级到兜底逻辑

容错模式 4:降级到备用 Provider(Fallback Chain)

关键工具应该有备用方案。常见模式是同一类工具的 fallback chain:

class SearchProvider(Enum):
    GOOGLE = "google"
    BING = "bing"
    DUCKDUCKGO = "duckduckgo"
    LOCAL_INDEX = "local_index"  # 最终兜底

async def search_with_fallback(query: str, max_results: int = 10):
    providers = [
        (SearchProvider.GOOGLE, _search_google, ToolCriticality.BLOCKING),
        (SearchProvider.BING, _search_bing, ToolCriticality.BLOCKING),
        (SearchProvider.DUCKDUCKGO, _search_duckduckgo, ToolCriticality.ENHANCING),
        (SearchProvider.LOCAL_INDEX, _search_local_index, ToolCriticality.OPTIONAL),
    ]
    
    last_error = None
    for provider, fn, criticality in providers:
        try:
            result = await call_with_timeout(provider.value, criticality, fn, query, max_results)
            return {"provider": provider.value, "results": result}
        except Exception as e:
            last_error = e
            # 记录降级链
            logger.warning(f"Provider {provider.value} failed: {e}, trying next")
            continue
    
    # 全部失败时,Agent 必须优雅降级——告知用户"暂时无法搜索"
    raise AllProvidersFailedError(f"All search providers failed: {last_error}")

降级原则

  • 主备 Provider 应该是不同厂商,避免共享故障域
  • 降级链按"质量/成本"排序:高质量高成本 → 低质量低成本 → 本地兜底
  • 最终兜底应该是 100% 可用的(如本地索引、缓存、预置回答)

容错模式 5:可恢复错误分类

把所有错误视为"重试一次"是常见的反模式。正确的做法是分类错误,决定策略

class ErrorCategory(Enum):
    TRANSIENT = "transient"        # 临时性,重试
    RATE_LIMIT = "rate_limit"      # 限流,遵守 Retry-After
    PERMANENT = "permanent"        # 永久性,不重试
    CLIENT_ERROR = "client_error"   # 4xx,请求有问题
    SCHEMA_ERROR = "schema_error"  # 响应结构变化,需要人工修复
    TIMEOUT = "timeout"            # 超时,可能降级到更简单的实现

def classify_error(exc: Exception, status_code: int | None = None) -> ErrorCategory:
    if isinstance(exc, asyncio.TimeoutError):
        return ErrorCategory.TIMEOUT
    if isinstance(exc, (ConnectionError, OSError)):
        return ErrorCategory.TRANSIENT
    if status_code is not None:
        if status_code == 429:
            return ErrorCategory.RATE_LIMIT
        if 400 <= status_code < 500 and status_code != 408:
            return ErrorCategory.CLIENT_ERROR
        if 500 <= status_code < 600:
            return ErrorCategory.TRANSIENT
    if isinstance(exc, (KeyError, ValueError, json.JSONDecodeError)):
        return ErrorCategory.SCHEMA_ERROR
    return ErrorCategory.PERMANENT

def should_retry(category: ErrorCategory) -> bool:
    return category in {
        ErrorCategory.TRANSIENT,
        ErrorCategory.RATE_LIMIT,
        ErrorCategory.TIMEOUT,
    }

关键分类

  • TRANSIENT / RATE_LIMIT / TIMEOUT:可重试,配合退避
  • CLIENT_ERROR(400/401/403/404):不重试,调用方问题
  • SCHEMA_ERROR:不重试,触发告警 + 自动 snapshot
  • PERMANENT:不重试,切换到备用方案

容错模式 6:结构化输出校验

Agent 的工具调用最脆弱的环节是 LLM 生成调用参数。结构化校验能防止"参数正确但语义错误":

from pydantic import BaseModel, Field, field_validator

class SearchQuery(BaseModel):
    query: str = Field(..., min_length=1, max_length=500)
    max_results: int = Field(default=10, ge=1, le=100)
    language: str = Field(default="en")
    time_filter: str | None = None
    
    @field_validator("query")
    @classmethod
    def query_must_be_meaningful(cls, v: str) -> str:
        if not v.strip() or len(v.split()) < 2:
            raise ValueError("Query must be at least 2 words")
        return v
    
    @field_validator("time_filter")
    @classmethod
    def time_filter_must_be_valid(cls, v: str | None) -> str | None:
        if v is None:
            return v
        valid = {"day", "week", "month", "year"}
        if v not in valid:
            raise ValueError(f"time_filter must be one of {valid}")
        return v

async def search_tool(params: dict) -> list[dict]:
    try:
        query = SearchQuery(**params)
    except ValidationError as e:
        # 校验失败:可能是 LLM 生成错误或参数被注入
        logger.error(f"Invalid search params: {e}")
        raise SchemaError(f"Invalid parameters: {e.errors()}")
    return await _search_google(query.query, query.max_results, query.language, query.time_filter)

校验策略

  • 用 Pydantic 定义严格的参数 schema
  • 校验失败时,记录原始参数和错误信息,触发告警
  • 严重时回退到"安全默认值",而不是直接失败

容错模式 7:幂等性设计

重试带来的副作用是重复调用。对于非幂等操作(如下单、扣款),必须实现幂等键:

import hashlib
from datetime import datetime

class IdempotencyKey:
    def __init__(self, namespace: str, params: dict, ttl_seconds: int = 86400):
        self.namespace = namespace
        self.params = params
        self.created_at = datetime.now()
        self.ttl = ttl_seconds
    
    @property
    def key(self) -> str:
        payload = json.dumps(self.params, sort_keys=True)
        return f"{self.namespace}:{hashlib.sha256(payload.encode()).hexdigest()[:16]}"
    
    def is_expired(self) -> bool:
        return (datetime.now() - self.created_at).total_seconds() > self.ttl

idempotency_cache: dict[str, dict] = {}

async def idempotent_call(tool_name, params: dict, func, *args, **kwargs):
    idem = IdempotencyKey(tool_name, params)
    if idem.key in idempotency_cache and not idempotency_cache[idem.key]["expired"]:
        return idempotency_cache[idem.key]["result"]
    
    result = await func(*args, **kwargs)
    idempotency_cache[idem.key] = {
        "result": result,
        "expired": idem.is_expired(),
    }
    return result

幂等原则

  • 所有"创建"类操作必须接受 idempotency_key 参数
  • 缓存最近 24 小时的响应,避免真正的服务端调用
  • 重试时使用相同的 key,服务端自动去重

工具调用可靠性 Checklist

必做 选做
设置总超时
按关键性分级超时
区分可重试与不可重试错误
指数退避 + 抖动
尊重 Retry-After
熔断器
备用 Provider
响应结构校验
参数 schema 校验
幂等键 ✅(非幂等操作)
失败告警
死信队列(DLQ)
完整 trace

实施路径

第 1 阶段:审计所有工具调用,添加超时和错误分类。 第 2 阶段:为核心工具实现重试 + 熔断 + 降级。 第 3 阶段:建立备用 Provider 链,覆盖 80% 的关键工具。 第 4 阶段:实施 Pydantic 参数校验和响应结构校验。 第 5 阶段:对非幂等操作加幂等键保护。 第 6 阶段:把失败模式接入告警和死信队列。 第 7 阶段:定期演练故障注入(chaos engineering),验证容错能力。

总结

Agent 工具调用的可靠性不是"加一个 try-catch"那么简单,而是一套分层容错架构:超时分级控制等待时间、退避抖动避免压垮服务、熔断器快速失败、备用 Provider 保证兜底、错误分类决定策略、结构化校验防止 LLM 错误、幂等键让重试安全。

把容错做扎实,Agent 才能从"能 demo"变成"能上生产"。

参考工具:Pydantic AI(强校验的 Agent 框架)、OpenAI Agents SDK(内置工具容错)、Strands Agents(AWS 维护的 Agent SDK)、LangChain(成熟的工具调用抽象)和 CrewAI(多 Agent 工具协作)都为工具容错提供了基础实现。