Agent 工作流编排实战:从 DAG 到状态机的生产级模式

多数 Agent 工作流并非败在模型能力,而是败在编排层。对比 DAG、状态机、可视化构建器三种编排范式,给出可复制的生产级错误处理、人工审批和条件分支代码。

AgentList Team · 2026年4月28日
AI Agent工作流编排DAG状态机LangGraph

Agent 工作流编排实战:从 DAG 到状态机的生产级模式

做过几个 Agent 项目后你会发现一个规律:单次 LLM 调用通常不难,难的是把多次调用串成一个稳定运行的流程。

一个典型的生产级 Agent 工作流往往涉及:多轮对话、外部工具调用、条件分支、错误重试、人工审批、结果持久化。这些环节中,任何一步出问题,整个链路就会中断。而 LLM 的非确定性——同样的输入可能产出不同结果——让问题更难复现和排查。

这篇文章不谈"什么是工作流",而是回答一个更实际的问题:当你需要在生产环境编排 Agent 工作流时,应该选择哪种模式,怎么写才够健壮?

三种核心编排范式

目前主流的 Agent 工作流编排有三条路线:DAG(有向无环图)、State Machine(状态机)、Visual Builder(可视化构建器)。它们各有适用场景,不是"谁更好"的关系。

DAG 编排

DAG 把工作流拆成一组有依赖关系的任务节点。每个节点执行完,输出流向下游节点。

适用场景:数据处理类 pipeline,步骤之间是纯粹的前后依赖关系,没有复杂的循环或条件跳转。比如"文档 → 分块 → 向量化 → 入库"。

优势:天然支持并行执行(无依赖的节点可以同时跑),逻辑清晰可审计,容易做性能优化。

劣势:遇到需要循环、回退、动态分支的场景会很别扭。DAG 的"无环"约束意味着你不能让步骤回到之前的状态。

PySpur 就是用 DAG 模式构建 Agent 流程的工具,适合偏数据处理的场景。

状态机编排

状态机用"状态 + 转换条件"来描述工作流。每个节点是一个状态,转换条件决定下一步去哪个状态。

适用场景:需要循环、条件分支、人工介入、错误恢复的复杂工作流。比如客服机器人(可能多轮对话后转人工)、审批流(可能被打回修改)。

优势:天然支持循环和回退,状态可持久化,失败后可以从上次状态恢复。LangGraph 就是状态机范式的代表。

劣势:状态多了之后管理复杂度上升,需要仔细设计状态转换表,否则容易变成"面条式"流程。

Julep 也采用了状态机式的任务编排模型,支持长期运行的 Agent 任务和复杂的转换逻辑。

可视化构建器

用拖拽界面组装工作流,底层仍然是 DAG 或状态机,但用户不需要写代码来定义流程。

适用场景:非开发者参与流程设计、快速原型验证、标准化模板复用。

优势:降低编排门槛,所见即所得,方便团队协作和知识传递。

劣势:复杂逻辑的可视化表达有上限,版本控制和 diff 不如代码方便,调试能力受限于平台提供的能力。

LangChain Open Agent PlatformReflyVercel Workflow Builder 都在这条路上探索。

生产模式一:指数退避重试

第一个要解决的问题:外部工具调用会失败。API 超时、服务限流、网络抖动……这些都是生产环境的常态。

简单的"重试三次"不够用。你需要的是:可配置的重试策略、退避间隔、可区分的错误类型、重试耗尽后的降级逻辑。

import asyncio
import random
from dataclasses import dataclass, field
from typing import Any, Callable, Awaitable
from enum import Enum


class ErrorKind(Enum):
    TRANSIENT = "transient"     # 网络超时、限流,可重试
    PERMANENT = "permanent"     # 参数错误、权限拒绝,不应重试
    UNKNOWN = "unknown"         # 不确定,保守重试


@dataclass
class RetryPolicy:
    max_retries: int = 3
    base_delay: float = 1.0          # 秒
    max_delay: float = 60.0          # 秒
    backoff_factor: float = 2.0
    jitter: bool = True

    def get_delay(self, attempt: int) -> float:
        delay = min(self.base_delay * (self.backoff_factor ** attempt), self.max_delay)
        if self.jitter:
            delay *= random.uniform(0.5, 1.5)
        return delay


@dataclass
class RetryResult:
    success: bool
    value: Any = None
    error: Exception | None = None
    attempts: int = 0


def classify_error(exc: Exception) -> ErrorKind:
    """根据异常类型判断是否值得重试"""
    if isinstance(exc, (ConnectionError, TimeoutError, asyncio.TimeoutError)):
        return ErrorKind.TRANSIENT
    if isinstance(exc, (ValueError, TypeError, PermissionError)):
        return ErrorKind.PERMANENT
    status = getattr(exc, "status_code", None)
    if status and int(status) == 429:
        return ErrorKind.TRANSIENT
    if status and int(status) >= 400 and int(status) < 500:
        return ErrorKind.PERMANENT
    return ErrorKind.UNKNOWN


async def with_retry(
    fn: Callable[..., Awaitable[Any]],
    policy: RetryPolicy | None = None,
    on_retry: Callable[[int, Exception, float], Awaitable[None]] | None = None,
) -> RetryResult:
    """带指数退避的异步重试封装"""
    policy = policy or RetryPolicy()

    for attempt in range(policy.max_retries + 1):
        try:
            result = await fn()
            return RetryResult(success=True, value=result, attempts=attempt + 1)
        except Exception as exc:
            kind = classify_error(exc)
            if kind == ErrorKind.PERMANENT:
                return RetryResult(success=False, error=exc, attempts=attempt + 1)

            if attempt < policy.max_retries:
                delay = policy.get_delay(attempt)
                if on_retry:
                    await on_retry(attempt + 1, exc, delay)
                await asyncio.sleep(delay)
            else:
                return RetryResult(success=False, error=exc, attempts=attempt + 1)

    return RetryResult(success=False, error=RuntimeError("unreachable"), attempts=0)


# 使用示例
async def call_search_api(query: str) -> dict:
    """模拟一个会偶尔失败的外部 API"""
    import httpx
    async with httpx.AsyncClient(timeout=10.0) as client:
        resp = await client.get(f"https://api.example.com/search?q={query}")
        resp.raise_for_status()
        return resp.json()


async def main():
    async def log_retry(attempt: int, exc: Exception, delay: float):
        print(f"Retry #{attempt} after {delay:.1f}s: {exc}")

    result = await with_retry(
        lambda: call_search_api("LangGraph tutorial"),
        policy=RetryPolicy(max_retries=3, base_delay=2.0),
        on_retry=log_retry,
    )
    if result.success:
        print(f"Success after {result.attempts} attempt(s)")
    else:
        print(f"Failed after {result.attempts} attempt(s): {result.error}")


if __name__ == "__main__":
    asyncio.run(main())

关键设计点

  • 错误分类(classify_error)避免对永久性错误做无意义重试
  • Jitter 抖动防止多个工作流同时重试造成"惊群"
  • on_retry 回调让你可以记录日志、发告警、更新 UI
  • 返回 RetryResult 结构体而不是抛异常,让调用方决定如何处理失败

生产模式二:人工审批门

很多业务场景下,Agent 不能完全自主决策。比如:执行金额超过阈值的操作、对外发送邮件/消息、修改生产环境配置。这些节点需要"暂停,等人工确认后继续"。

import json
import uuid
from datetime import datetime, timezone
from enum import Enum
from dataclasses import dataclass, field
from typing import Any


class StepStatus(Enum):
    PENDING = "pending"
    WAITING_APPROVAL = "waiting_approval"
    APPROVED = "approved"
    REJECTED = "rejected"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class WorkflowStep:
    step_id: str
    step_type: str
    status: StepStatus = StepStatus.PENDING
    requires_approval: bool = False
    approval_reason: str = ""
    input_data: dict = field(default_factory=dict)
    output_data: dict = field(default_factory=dict)
    error: str = ""


@dataclass
class Workflow:
    workflow_id: str
    name: str
    steps: list[WorkflowStep] = field(default_factory=list)
    current_step_index: int = 0
    created_at: str = ""

    @classmethod
    def create(cls, name: str, step_defs: list[dict]) -> "Workflow":
        steps = []
        for sd in step_defs:
            steps.append(WorkflowStep(
                step_id=str(uuid.uuid4())[:8],
                step_type=sd["type"],
                requires_approval=sd.get("requires_approval", False),
                approval_reason=sd.get("approval_reason", ""),
                input_data=sd.get("input_data", {}),
            ))
        return cls(
            workflow_id=str(uuid.uuid4())[:12],
            name=name,
            steps=steps,
            created_at=datetime.now(timezone.utc).isoformat(),
        )

    def next_pending_step(self) -> WorkflowStep | None:
        for step in self.steps:
            if step.status == StepStatus.PENDING:
                return step
        return None

    def find_approval_step(self, step_id: str) -> WorkflowStep | None:
        for step in self.steps:
            if step.step_id == step_id and step.status == StepStatus.WAITING_APPROVAL:
                return step
        return None


class ApprovalService:
    """管理人工审批的挂起和恢复"""

    def __init__(self):
        self._pending: dict[str, Workflow] = {}

    def request_approval(self, workflow: Workflow, step: WorkflowStep) -> str:
        step.status = StepStatus.WAITING_APPROVAL
        self._pending[step.step_id] = workflow
        return step.step_id

    def approve(self, step_id: str, reviewer: str, comment: str = "") -> WorkflowStep | None:
        workflow = self._pending.pop(step_id, None)
        if not workflow:
            return None
        step = workflow.find_approval_step(step_id)
        if not step:
            return None
        step.status = StepStatus.APPROVED
        step.output_data["approved_by"] = reviewer
        step.output_data["approval_comment"] = comment
        return step

    def reject(self, step_id: str, reviewer: str, reason: str = "") -> WorkflowStep | None:
        workflow = self._pending.pop(step_id, None)
        if not workflow:
            return None
        step = workflow.find_approval_step(step_id)
        if not step:
            return None
        step.status = StepStatus.REJECTED
        step.output_data["rejected_by"] = reviewer
        step.output_data["rejection_reason"] = reason
        return step


# 使用示例
def build_deployment_workflow():
    """构建一个需要人工审批的部署工作流"""
    workflow = Workflow.create("deploy-to-production", [
        {"type": "run_tests", "input_data": {"suite": "full"}},
        {"type": "build_image", "input_data": {"dockerfile": "Dockerfile.prod"}},
        {
            "type": "deploy",
            "requires_approval": True,
            "approval_reason": "Deploying to production environment",
            "input_data": {"environment": "prod", "replicas": 3},
        },
        {"type": "smoke_test", "input_data": {"endpoints": ["/health", "/api/v1/status"]}},
    ])

    approval_service = ApprovalService()

    # 模拟执行前两步
    workflow.steps[0].status = StepStatus.COMPLETED
    workflow.steps[1].status = StepStatus.COMPLETED

    # 第三步需要审批
    deploy_step = workflow.steps[2]
    ticket_id = approval_service.request_approval(workflow, deploy_step)
    print(f"Deployment pending approval. Ticket: {ticket_id}")
    print(f"Step status: {deploy_step.status.value}")

    # 模拟审批通过
    result = approval_service.approve(ticket_id, reviewer="alice", comment="LGTM")
    if result:
        print(f"Approved by {result.output_data['approved_by']}")
        print(f"Step status: {result.status.value}")

    return workflow


if __name__ == "__main__":
    wf = build_deployment_workflow()

关键设计点

  • ApprovalService 把审批状态管理从工作流逻辑中解耦出来,可以用内存、数据库或外部审批系统作为后端
  • 工作流可以序列化并持久化,支持长时间等待(分钟级、小时级甚至天级)
  • reject 不是终点——被拒绝的步骤可以修改参数后重新提交

生产模式三:条件分支与动态工具选择

Agent 工作流最灵活的地方在于:根据中间结果动态决定下一步做什么。这比固定 pipeline 强大得多,但也更容易失控。关键是要把分支逻辑显式化,而不是藏在 Prompt 里让模型"自己判断"。

from dataclasses import dataclass
from typing import Any, Literal
from enum import Enum


class ToolCategory(Enum):
    SEARCH = "search"
    CODE_EXEC = "code_exec"
    DATABASE = "database"
    FILE_IO = "file_io"


@dataclass
class ToolInfo:
    name: str
    category: ToolCategory
    cost_estimate: float        # 预估调用成本 (USD)
    avg_latency_ms: int         # 平均延迟
    reliability: float          # 成功率 0.0 ~ 1.0


TOOL_REGISTRY: dict[str, ToolInfo] = {
    "web_search": ToolInfo("web_search", ToolCategory.SEARCH, 0.002, 800, 0.98),
    "code_interpreter": ToolInfo("code_interpreter", ToolCategory.CODE_EXEC, 0.05, 3000, 0.92),
    "sql_query": ToolInfo("sql_query", ToolCategory.DATABASE, 0.001, 200, 0.99),
    "file_read": ToolInfo("file_read", ToolCategory.FILE_IO, 0.0001, 50, 0.999),
}


@dataclass
class BranchCondition:
    field: str
    operator: Literal["eq", "ne", "gt", "lt", "contains", "in"]
    value: Any


@dataclass
class BranchRoute:
    condition: BranchCondition
    target_step: str
    tools: list[str]


@dataclass
class StepDef:
    name: str
    prompt_template: str
    routes: list[BranchRoute] = None  # type: ignore
    default_next: str | None = None
    default_tools: list[str] | None = None

    def resolve(self, context: dict) -> tuple[str | None, list[str]]:
        """根据运行时上下文决定下一步和可用工具"""
        for route in self.routes or []:
            if _evaluate_condition(route.condition, context):
                return route.target_step, route.tools
        return self.default_next, self.default_tools or []


def _evaluate_condition(cond: BranchCondition, context: dict) -> bool:
    actual = context.get(cond.field)
    match cond.operator:
        case "eq": return actual == cond.value
        case "ne": return actual != cond.value
        case "gt": return actual is not None and actual > cond.value
        case "lt": return actual is not None and actual < cond.value
        case "contains": return cond.value in actual if actual else False
        case "in": return actual in cond.value if actual else False
        case _: return False


def build_rag_workflow() -> dict[str, StepDef]:
    """构建一个带条件分支的 RAG 工作流"""
    return {
        "classify": StepDef(
            name="classify",
            prompt_template="Classify the user query type: {query}",
            routes=[
                BranchRoute(
                    condition=BranchCondition("query_type", "eq", "code"),
                    target_step="code_search",
                    tools=["web_search", "code_interpreter"],
                ),
                BranchRoute(
                    condition=BranchCondition("query_type", "eq", "data"),
                    target_step="data_query",
                    tools=["sql_query", "file_read"],
                ),
                BranchRoute(
                    condition=BranchCondition("query_type", "in", ["general", "factual"]),
                    target_step="web_search",
                    tools=["web_search"],
                ),
            ],
            default_next="web_search",
            default_tools=["web_search"],
        ),
        "code_search": StepDef(
            name="code_search",
            prompt_template="Search for code related to: {query}",
            default_next="generate_answer",
            default_tools=["web_search"],
        ),
        "data_query": StepDef(
            name="data_query",
            prompt_template="Query data for: {query}",
            default_next="generate_answer",
            default_tools=["sql_query"],
        ),
        "web_search": StepDef(
            name="web_search",
            prompt_template="Search the web for: {query}",
            default_next="generate_answer",
            default_tools=["web_search"],
        ),
        "generate_answer": StepDef(
            name="generate_answer",
            prompt_template="Generate a comprehensive answer based on: {context}",
        ),
    }


# 使用示例
def demo():
    workflow = build_rag_workflow()

    test_contexts = [
        {"query": "How to implement binary search in Python?", "query_type": "code"},
        {"query": "What were Q1 sales figures?", "query_type": "data"},
        {"query": "What is the capital of France?", "query_type": "factual"},
        {"query": "Random question", "query_type": "unknown"},
    ]

    for ctx in test_contexts:
        classify_step = workflow["classify"]
        next_step, tools = classify_step.resolve(ctx)
        print(f"Query: {ctx['query'][:50]}...")
        print(f"  Type: {ctx['query_type']} -> Next: {next_step}, Tools: {tools}")
        print()


if __name__ == "__main__":
    demo()

关键设计点

  • 分支条件用结构化数据描述(BranchCondition),而不是藏在 Prompt 文本里。这使得条件可以被测试、审计和版本控制
  • 工具选择和路由目标绑定在一起,确保每条路径只加载它需要的工具,降低误调用风险
  • default_next 兜底保证永远不会进入死胡同

编排范式选择决策表

维度 DAG 状态机 可视化构建器
循环/回退 不支持 原生支持 取决于平台
并行执行 天然支持 需要显式设计 取决于平台
学习曲线
复杂度上限
版本控制友好度 高(代码即配置) 低(JSON/数据库)
调试能力 强(状态可检查) 受限于平台
适合团队 有工程背景 有工程背景 跨职能团队
典型工具 PySpur、Prefect LangGraph、Julep LangChain Open Agent PlatformRefly

简单决策规则

  • 工作流是线性的或只有简单分支 → DAG
  • 工作流有循环、需要持久化状态、需要人工介入 → 状态机
  • 团队中有非工程师需要参与流程设计 → 可视化构建器
  • 三种范式不互斥——复杂系统经常组合使用

三个常见错误

错误一:把 Prompt 当流程控制器

很多项目试图用 Prompt 指挥 LLM 在不同分支间跳转:"如果用户问的是 A,就调用工具 X;如果问的是 B,就调用工具 Y"。这在 demo 阶段看起来能用,但生产环境下,LLM 的指令遵从率不会是 100%。一次误判就会导致错误的工具被调用、数据被写错地方。

正确做法:用代码控制流程分支,Prompt 只负责"理解意图"和"生成内容"。

错误二:忽视 LLM 调用的延迟分布

LLM API 的响应延迟不是正态分布,而是长尾分布。P50 可能是 1 秒,P99 可能是 30 秒甚至超时。如果你按"平均 2 秒"来设计超时策略,大约每 100 次调用就会有几次超时失败。在串联 5 步的工作流中,整体超时概率会显著叠加。

正确做法:按 P99 设计超时,每步设置独立的超时上限,工作流层面也要有总超时。

错误三:无状态设计导致无法恢复

一个 10 步的工作流,在第 7 步失败了。如果你的工作流是无状态的(不保存中间结果),你就必须从头重跑。更糟的是,前 6 步中可能有发送邮件、扣费等不可逆操作。

正确做法:每步完成后持久化状态,失败后从断点恢复。这也是状态机范式在生产环境中更受欢迎的核心原因。

总结

  • 选对范式比选对框架重要。DAG 适合线性 pipeline,状态机适合有循环和审批的复杂流程,可视化构建器适合跨职能团队协作。多数生产系统需要组合使用
  • 错误处理是编排层最重要的职责。区分永久性和暂时性错误,用指数退避加重试,重试耗尽后走降级逻辑——这些应该成为工作流的基础设施,而不是每个节点自己实现
  • 把流程控制和 LLM 推理分开。Prompt 负责理解和生成,代码负责分支、重试、审批。混在一起会同时降低可靠性和可维护性
  • 每一步都要持久化状态。这不是"可以以后优化"的事情,是上线第一天就要做好的事情
  • 用结构化数据描述分支条件,而不是让 LLM 自行判断流程走向。可测试、可审计、可回溯

推荐结合 PySpur(DAG 编排)、Julep(状态机编排)、LangChain Open Agent Platform(可视化构建)和 Refly 来对比不同编排范式的实际体验。