Agent 工作流编排实战:从 DAG 到状态机的生产级模式
多数 Agent 工作流并非败在模型能力,而是败在编排层。对比 DAG、状态机、可视化构建器三种编排范式,给出可复制的生产级错误处理、人工审批和条件分支代码。
Agent 工作流编排实战:从 DAG 到状态机的生产级模式
做过几个 Agent 项目后你会发现一个规律:单次 LLM 调用通常不难,难的是把多次调用串成一个稳定运行的流程。
一个典型的生产级 Agent 工作流往往涉及:多轮对话、外部工具调用、条件分支、错误重试、人工审批、结果持久化。这些环节中,任何一步出问题,整个链路就会中断。而 LLM 的非确定性——同样的输入可能产出不同结果——让问题更难复现和排查。
这篇文章不谈"什么是工作流",而是回答一个更实际的问题:当你需要在生产环境编排 Agent 工作流时,应该选择哪种模式,怎么写才够健壮?
三种核心编排范式
目前主流的 Agent 工作流编排有三条路线:DAG(有向无环图)、State Machine(状态机)、Visual Builder(可视化构建器)。它们各有适用场景,不是"谁更好"的关系。
DAG 编排
DAG 把工作流拆成一组有依赖关系的任务节点。每个节点执行完,输出流向下游节点。
适用场景:数据处理类 pipeline,步骤之间是纯粹的前后依赖关系,没有复杂的循环或条件跳转。比如"文档 → 分块 → 向量化 → 入库"。
优势:天然支持并行执行(无依赖的节点可以同时跑),逻辑清晰可审计,容易做性能优化。
劣势:遇到需要循环、回退、动态分支的场景会很别扭。DAG 的"无环"约束意味着你不能让步骤回到之前的状态。
像 PySpur 就是用 DAG 模式构建 Agent 流程的工具,适合偏数据处理的场景。
状态机编排
状态机用"状态 + 转换条件"来描述工作流。每个节点是一个状态,转换条件决定下一步去哪个状态。
适用场景:需要循环、条件分支、人工介入、错误恢复的复杂工作流。比如客服机器人(可能多轮对话后转人工)、审批流(可能被打回修改)。
优势:天然支持循环和回退,状态可持久化,失败后可以从上次状态恢复。LangGraph 就是状态机范式的代表。
劣势:状态多了之后管理复杂度上升,需要仔细设计状态转换表,否则容易变成"面条式"流程。
Julep 也采用了状态机式的任务编排模型,支持长期运行的 Agent 任务和复杂的转换逻辑。
可视化构建器
用拖拽界面组装工作流,底层仍然是 DAG 或状态机,但用户不需要写代码来定义流程。
适用场景:非开发者参与流程设计、快速原型验证、标准化模板复用。
优势:降低编排门槛,所见即所得,方便团队协作和知识传递。
劣势:复杂逻辑的可视化表达有上限,版本控制和 diff 不如代码方便,调试能力受限于平台提供的能力。
LangChain Open Agent Platform、Refly 和 Vercel Workflow Builder 都在这条路上探索。
生产模式一:指数退避重试
第一个要解决的问题:外部工具调用会失败。API 超时、服务限流、网络抖动……这些都是生产环境的常态。
简单的"重试三次"不够用。你需要的是:可配置的重试策略、退避间隔、可区分的错误类型、重试耗尽后的降级逻辑。
import asyncio
import random
from dataclasses import dataclass, field
from typing import Any, Callable, Awaitable
from enum import Enum
class ErrorKind(Enum):
TRANSIENT = "transient" # 网络超时、限流,可重试
PERMANENT = "permanent" # 参数错误、权限拒绝,不应重试
UNKNOWN = "unknown" # 不确定,保守重试
@dataclass
class RetryPolicy:
max_retries: int = 3
base_delay: float = 1.0 # 秒
max_delay: float = 60.0 # 秒
backoff_factor: float = 2.0
jitter: bool = True
def get_delay(self, attempt: int) -> float:
delay = min(self.base_delay * (self.backoff_factor ** attempt), self.max_delay)
if self.jitter:
delay *= random.uniform(0.5, 1.5)
return delay
@dataclass
class RetryResult:
success: bool
value: Any = None
error: Exception | None = None
attempts: int = 0
def classify_error(exc: Exception) -> ErrorKind:
"""根据异常类型判断是否值得重试"""
if isinstance(exc, (ConnectionError, TimeoutError, asyncio.TimeoutError)):
return ErrorKind.TRANSIENT
if isinstance(exc, (ValueError, TypeError, PermissionError)):
return ErrorKind.PERMANENT
status = getattr(exc, "status_code", None)
if status and int(status) == 429:
return ErrorKind.TRANSIENT
if status and int(status) >= 400 and int(status) < 500:
return ErrorKind.PERMANENT
return ErrorKind.UNKNOWN
async def with_retry(
fn: Callable[..., Awaitable[Any]],
policy: RetryPolicy | None = None,
on_retry: Callable[[int, Exception, float], Awaitable[None]] | None = None,
) -> RetryResult:
"""带指数退避的异步重试封装"""
policy = policy or RetryPolicy()
for attempt in range(policy.max_retries + 1):
try:
result = await fn()
return RetryResult(success=True, value=result, attempts=attempt + 1)
except Exception as exc:
kind = classify_error(exc)
if kind == ErrorKind.PERMANENT:
return RetryResult(success=False, error=exc, attempts=attempt + 1)
if attempt < policy.max_retries:
delay = policy.get_delay(attempt)
if on_retry:
await on_retry(attempt + 1, exc, delay)
await asyncio.sleep(delay)
else:
return RetryResult(success=False, error=exc, attempts=attempt + 1)
return RetryResult(success=False, error=RuntimeError("unreachable"), attempts=0)
# 使用示例
async def call_search_api(query: str) -> dict:
"""模拟一个会偶尔失败的外部 API"""
import httpx
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(f"https://api.example.com/search?q={query}")
resp.raise_for_status()
return resp.json()
async def main():
async def log_retry(attempt: int, exc: Exception, delay: float):
print(f"Retry #{attempt} after {delay:.1f}s: {exc}")
result = await with_retry(
lambda: call_search_api("LangGraph tutorial"),
policy=RetryPolicy(max_retries=3, base_delay=2.0),
on_retry=log_retry,
)
if result.success:
print(f"Success after {result.attempts} attempt(s)")
else:
print(f"Failed after {result.attempts} attempt(s): {result.error}")
if __name__ == "__main__":
asyncio.run(main())
关键设计点:
- 错误分类(
classify_error)避免对永久性错误做无意义重试 - Jitter 抖动防止多个工作流同时重试造成"惊群"
on_retry回调让你可以记录日志、发告警、更新 UI- 返回
RetryResult结构体而不是抛异常,让调用方决定如何处理失败
生产模式二:人工审批门
很多业务场景下,Agent 不能完全自主决策。比如:执行金额超过阈值的操作、对外发送邮件/消息、修改生产环境配置。这些节点需要"暂停,等人工确认后继续"。
import json
import uuid
from datetime import datetime, timezone
from enum import Enum
from dataclasses import dataclass, field
from typing import Any
class StepStatus(Enum):
PENDING = "pending"
WAITING_APPROVAL = "waiting_approval"
APPROVED = "approved"
REJECTED = "rejected"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class WorkflowStep:
step_id: str
step_type: str
status: StepStatus = StepStatus.PENDING
requires_approval: bool = False
approval_reason: str = ""
input_data: dict = field(default_factory=dict)
output_data: dict = field(default_factory=dict)
error: str = ""
@dataclass
class Workflow:
workflow_id: str
name: str
steps: list[WorkflowStep] = field(default_factory=list)
current_step_index: int = 0
created_at: str = ""
@classmethod
def create(cls, name: str, step_defs: list[dict]) -> "Workflow":
steps = []
for sd in step_defs:
steps.append(WorkflowStep(
step_id=str(uuid.uuid4())[:8],
step_type=sd["type"],
requires_approval=sd.get("requires_approval", False),
approval_reason=sd.get("approval_reason", ""),
input_data=sd.get("input_data", {}),
))
return cls(
workflow_id=str(uuid.uuid4())[:12],
name=name,
steps=steps,
created_at=datetime.now(timezone.utc).isoformat(),
)
def next_pending_step(self) -> WorkflowStep | None:
for step in self.steps:
if step.status == StepStatus.PENDING:
return step
return None
def find_approval_step(self, step_id: str) -> WorkflowStep | None:
for step in self.steps:
if step.step_id == step_id and step.status == StepStatus.WAITING_APPROVAL:
return step
return None
class ApprovalService:
"""管理人工审批的挂起和恢复"""
def __init__(self):
self._pending: dict[str, Workflow] = {}
def request_approval(self, workflow: Workflow, step: WorkflowStep) -> str:
step.status = StepStatus.WAITING_APPROVAL
self._pending[step.step_id] = workflow
return step.step_id
def approve(self, step_id: str, reviewer: str, comment: str = "") -> WorkflowStep | None:
workflow = self._pending.pop(step_id, None)
if not workflow:
return None
step = workflow.find_approval_step(step_id)
if not step:
return None
step.status = StepStatus.APPROVED
step.output_data["approved_by"] = reviewer
step.output_data["approval_comment"] = comment
return step
def reject(self, step_id: str, reviewer: str, reason: str = "") -> WorkflowStep | None:
workflow = self._pending.pop(step_id, None)
if not workflow:
return None
step = workflow.find_approval_step(step_id)
if not step:
return None
step.status = StepStatus.REJECTED
step.output_data["rejected_by"] = reviewer
step.output_data["rejection_reason"] = reason
return step
# 使用示例
def build_deployment_workflow():
"""构建一个需要人工审批的部署工作流"""
workflow = Workflow.create("deploy-to-production", [
{"type": "run_tests", "input_data": {"suite": "full"}},
{"type": "build_image", "input_data": {"dockerfile": "Dockerfile.prod"}},
{
"type": "deploy",
"requires_approval": True,
"approval_reason": "Deploying to production environment",
"input_data": {"environment": "prod", "replicas": 3},
},
{"type": "smoke_test", "input_data": {"endpoints": ["/health", "/api/v1/status"]}},
])
approval_service = ApprovalService()
# 模拟执行前两步
workflow.steps[0].status = StepStatus.COMPLETED
workflow.steps[1].status = StepStatus.COMPLETED
# 第三步需要审批
deploy_step = workflow.steps[2]
ticket_id = approval_service.request_approval(workflow, deploy_step)
print(f"Deployment pending approval. Ticket: {ticket_id}")
print(f"Step status: {deploy_step.status.value}")
# 模拟审批通过
result = approval_service.approve(ticket_id, reviewer="alice", comment="LGTM")
if result:
print(f"Approved by {result.output_data['approved_by']}")
print(f"Step status: {result.status.value}")
return workflow
if __name__ == "__main__":
wf = build_deployment_workflow()
关键设计点:
ApprovalService把审批状态管理从工作流逻辑中解耦出来,可以用内存、数据库或外部审批系统作为后端- 工作流可以序列化并持久化,支持长时间等待(分钟级、小时级甚至天级)
reject不是终点——被拒绝的步骤可以修改参数后重新提交
生产模式三:条件分支与动态工具选择
Agent 工作流最灵活的地方在于:根据中间结果动态决定下一步做什么。这比固定 pipeline 强大得多,但也更容易失控。关键是要把分支逻辑显式化,而不是藏在 Prompt 里让模型"自己判断"。
from dataclasses import dataclass
from typing import Any, Literal
from enum import Enum
class ToolCategory(Enum):
SEARCH = "search"
CODE_EXEC = "code_exec"
DATABASE = "database"
FILE_IO = "file_io"
@dataclass
class ToolInfo:
name: str
category: ToolCategory
cost_estimate: float # 预估调用成本 (USD)
avg_latency_ms: int # 平均延迟
reliability: float # 成功率 0.0 ~ 1.0
TOOL_REGISTRY: dict[str, ToolInfo] = {
"web_search": ToolInfo("web_search", ToolCategory.SEARCH, 0.002, 800, 0.98),
"code_interpreter": ToolInfo("code_interpreter", ToolCategory.CODE_EXEC, 0.05, 3000, 0.92),
"sql_query": ToolInfo("sql_query", ToolCategory.DATABASE, 0.001, 200, 0.99),
"file_read": ToolInfo("file_read", ToolCategory.FILE_IO, 0.0001, 50, 0.999),
}
@dataclass
class BranchCondition:
field: str
operator: Literal["eq", "ne", "gt", "lt", "contains", "in"]
value: Any
@dataclass
class BranchRoute:
condition: BranchCondition
target_step: str
tools: list[str]
@dataclass
class StepDef:
name: str
prompt_template: str
routes: list[BranchRoute] = None # type: ignore
default_next: str | None = None
default_tools: list[str] | None = None
def resolve(self, context: dict) -> tuple[str | None, list[str]]:
"""根据运行时上下文决定下一步和可用工具"""
for route in self.routes or []:
if _evaluate_condition(route.condition, context):
return route.target_step, route.tools
return self.default_next, self.default_tools or []
def _evaluate_condition(cond: BranchCondition, context: dict) -> bool:
actual = context.get(cond.field)
match cond.operator:
case "eq": return actual == cond.value
case "ne": return actual != cond.value
case "gt": return actual is not None and actual > cond.value
case "lt": return actual is not None and actual < cond.value
case "contains": return cond.value in actual if actual else False
case "in": return actual in cond.value if actual else False
case _: return False
def build_rag_workflow() -> dict[str, StepDef]:
"""构建一个带条件分支的 RAG 工作流"""
return {
"classify": StepDef(
name="classify",
prompt_template="Classify the user query type: {query}",
routes=[
BranchRoute(
condition=BranchCondition("query_type", "eq", "code"),
target_step="code_search",
tools=["web_search", "code_interpreter"],
),
BranchRoute(
condition=BranchCondition("query_type", "eq", "data"),
target_step="data_query",
tools=["sql_query", "file_read"],
),
BranchRoute(
condition=BranchCondition("query_type", "in", ["general", "factual"]),
target_step="web_search",
tools=["web_search"],
),
],
default_next="web_search",
default_tools=["web_search"],
),
"code_search": StepDef(
name="code_search",
prompt_template="Search for code related to: {query}",
default_next="generate_answer",
default_tools=["web_search"],
),
"data_query": StepDef(
name="data_query",
prompt_template="Query data for: {query}",
default_next="generate_answer",
default_tools=["sql_query"],
),
"web_search": StepDef(
name="web_search",
prompt_template="Search the web for: {query}",
default_next="generate_answer",
default_tools=["web_search"],
),
"generate_answer": StepDef(
name="generate_answer",
prompt_template="Generate a comprehensive answer based on: {context}",
),
}
# 使用示例
def demo():
workflow = build_rag_workflow()
test_contexts = [
{"query": "How to implement binary search in Python?", "query_type": "code"},
{"query": "What were Q1 sales figures?", "query_type": "data"},
{"query": "What is the capital of France?", "query_type": "factual"},
{"query": "Random question", "query_type": "unknown"},
]
for ctx in test_contexts:
classify_step = workflow["classify"]
next_step, tools = classify_step.resolve(ctx)
print(f"Query: {ctx['query'][:50]}...")
print(f" Type: {ctx['query_type']} -> Next: {next_step}, Tools: {tools}")
print()
if __name__ == "__main__":
demo()
关键设计点:
- 分支条件用结构化数据描述(
BranchCondition),而不是藏在 Prompt 文本里。这使得条件可以被测试、审计和版本控制 - 工具选择和路由目标绑定在一起,确保每条路径只加载它需要的工具,降低误调用风险
default_next兜底保证永远不会进入死胡同
编排范式选择决策表
| 维度 | DAG | 状态机 | 可视化构建器 |
|---|---|---|---|
| 循环/回退 | 不支持 | 原生支持 | 取决于平台 |
| 并行执行 | 天然支持 | 需要显式设计 | 取决于平台 |
| 学习曲线 | 低 | 中 | 低 |
| 复杂度上限 | 中 | 高 | 中 |
| 版本控制友好度 | 高(代码即配置) | 高 | 低(JSON/数据库) |
| 调试能力 | 中 | 强(状态可检查) | 受限于平台 |
| 适合团队 | 有工程背景 | 有工程背景 | 跨职能团队 |
| 典型工具 | PySpur、Prefect | LangGraph、Julep | LangChain Open Agent Platform、Refly |
简单决策规则:
- 工作流是线性的或只有简单分支 → DAG
- 工作流有循环、需要持久化状态、需要人工介入 → 状态机
- 团队中有非工程师需要参与流程设计 → 可视化构建器
- 三种范式不互斥——复杂系统经常组合使用
三个常见错误
错误一:把 Prompt 当流程控制器
很多项目试图用 Prompt 指挥 LLM 在不同分支间跳转:"如果用户问的是 A,就调用工具 X;如果问的是 B,就调用工具 Y"。这在 demo 阶段看起来能用,但生产环境下,LLM 的指令遵从率不会是 100%。一次误判就会导致错误的工具被调用、数据被写错地方。
正确做法:用代码控制流程分支,Prompt 只负责"理解意图"和"生成内容"。
错误二:忽视 LLM 调用的延迟分布
LLM API 的响应延迟不是正态分布,而是长尾分布。P50 可能是 1 秒,P99 可能是 30 秒甚至超时。如果你按"平均 2 秒"来设计超时策略,大约每 100 次调用就会有几次超时失败。在串联 5 步的工作流中,整体超时概率会显著叠加。
正确做法:按 P99 设计超时,每步设置独立的超时上限,工作流层面也要有总超时。
错误三:无状态设计导致无法恢复
一个 10 步的工作流,在第 7 步失败了。如果你的工作流是无状态的(不保存中间结果),你就必须从头重跑。更糟的是,前 6 步中可能有发送邮件、扣费等不可逆操作。
正确做法:每步完成后持久化状态,失败后从断点恢复。这也是状态机范式在生产环境中更受欢迎的核心原因。
总结
- 选对范式比选对框架重要。DAG 适合线性 pipeline,状态机适合有循环和审批的复杂流程,可视化构建器适合跨职能团队协作。多数生产系统需要组合使用
- 错误处理是编排层最重要的职责。区分永久性和暂时性错误,用指数退避加重试,重试耗尽后走降级逻辑——这些应该成为工作流的基础设施,而不是每个节点自己实现
- 把流程控制和 LLM 推理分开。Prompt 负责理解和生成,代码负责分支、重试、审批。混在一起会同时降低可靠性和可维护性
- 每一步都要持久化状态。这不是"可以以后优化"的事情,是上线第一天就要做好的事情
- 用结构化数据描述分支条件,而不是让 LLM 自行判断流程走向。可测试、可审计、可回溯
推荐结合 PySpur(DAG 编排)、Julep(状态机编排)、LangChain Open Agent Platform(可视化构建)和 Refly 来对比不同编排范式的实际体验。
本文涉及的项目
PySpur
5.7k ⭐PySpur 是一个可视化 Agent 工作流编辑平台,支持拖拽式构建 AI Agent 管道,内置评估测试和人机协作循环。
Julep
6.6k ⭐Julep 是面向 AI Agent 的无服务器工作流部署平台,可大规模部署 AI 工作流,被称为 AI Agent 的 Firebase。
Open Agent Platform
1.9k ⭐Open Agent Platform 是 LangChain 团队开源的 Agent 部署平台,强调多 Agent 运行、长时任务、可观测性与生产环境编排,适合作为 Agent 服务化落地基础设施。
Refly
7.3k ⭐Refly 是首个开源的智能体技能构建器,通过可视化工作流定义技能,可在 Claude Code、Cursor、Codex 等多个平台上运行,将技能作为基础设施而非简单提示词。
Workflow Builder Template
1.1k ⭐Workflow Builder Template 是 Vercel Labs 提供的 AI 工作流构建器模板,支持可视化编排 AI Agent 工作流。基于 Next.js 和 Vercel AI SDK 构建,提供拖拽式工作流设计体验。