跳到主要内容

AI 流水线中的结构化并发:为什么 asyncio.gather() 还不够

· 阅读需 11 分钟
Tian Pan
Software Engineer

当 LLM 在单次响应中返回三个工具调用时,最显而易见的方法是并行运行它们。你会想到 asyncio.gather(),分发调用,收集结果,然后将它们返回给模型。代码在测试中运行正常,在预发环境中也表现良好。但在生产环境运行六周后,你开始注意到应用程序持有本应释放的 HTTP 连接。Token 配额的消耗速度快于使用指标所显示的。偶尔,一个发送电子邮件的工具会触发两次。

根本问题不在于 LLM 或工具 —— 而在于并发原语。asyncio.gather() 并非为多步智能体管道产生的故障模式而设计,将其作为并行工具执行的核心,会产生在问题复合之前难以察觉的问题。

gather() 让你失望的三种方式

asyncio.gather() 有两种模式。在 return_exceptions=False 时,第一个异常会取消整个任务组 —— 但它不会等待正在运行的兄弟任务完成清理。这些任务会一直运行,直到它们完成或抛出自己的异常,在此期间持续持有连接、消耗 Token 预算,并可能产生副作用。在 return_exceptions=True 时,异常会被合并到返回值列表中。除非调用方显式检查每个项是否为 isinstance(result, Exception),否则失败将被静默吞没。

这两种模式都不适用于智能体管道,但更深层的问题是结构性的:在作用域边界外使用 asyncio.create_task() 创建的任务没有父级。如果调用的协程被取消 —— 无论是由于请求超时、用户终止,还是因为工作流已经进入下一步 —— 生成的任务仍会在不受监控的情况下继续运行。它们消耗 LLM API 配额,持有 HTTP 连接,并可能执行不可逆的工具调用,而所有这些都不会向任何能处理结果的组件发送信号。

这就是孤儿任务(orphaned task)问题。在一个运行 20 个并发智能体的系统中,如果每个智能体每轮生成 3–5 个并行工具调用,孤儿任务会迅速累积。来自多智能体部署的生产数据显示,主要由于这类协调故障,错误率会从低于 0.5%(单智能体)上升到 3% 及以上(没有适当并发控制的多智能体)。这些 Bug 与时序相关、特定于环境,且通常表现为模糊的资源耗尽,而非清晰的错误信号。

结构化并发到底能给你带来什么

Python 3.11 添加了 asyncio.TaskGroup —— 一个基于作用域的原语,它为每个并发任务提供了明确的生命周期。关键的不变量是:在其中生成的所有任务完成或被取消并清理之前,async with 块不会退出。没有孤儿任务。

async with asyncio.TaskGroup() as tg:
task_a = tg.create_task(call_search_api(query))
task_b = tg.create_task(call_lookup_tool(entity))
# 在这一行之前,保证两者都已完成或都已被取消

当任何任务抛出异常时,TaskGroup 会立即取消所有兄弟任务,等待它们的取消完全传播,然后抛出一个包含所有非取消异常的 ExceptionGroup。调用方可以按异常类型进行过滤:

try:
async with asyncio.TaskGroup() as tg:
for tc in tool_calls:
tg.create_task(execute_tool(tc))
except* RateLimitError as eg:
handle_rate_limits(eg.exceptions)
except* ToolTimeoutError as eg:
log_partial_results(eg.exceptions)

这在结构上与 gather() 不同。即使调用方协程从外部被取消,该作用域也能保证清理。不存在任务寿命超过其父级的窗口期。

对于已经使用 anyio 的团队(OpenAI、Anthropic 和 Google Gemini 的 Python SDK 内部都在使用),anyio.create_task_group() 提供了相同的语义,并附带一个值得了解的额外原语:move_on_after()。与超时抛出异常的 fail_after() 不同,move_on_after() 会干净地退出作用域,保留已完成的任务。这使得一种对生产环境搜索分发(fan-out)至关重要的模式成为可能:“获取 3 秒内到达的任何结果,并以此继续。”

async with anyio.move_on_after(3.0):
async with anyio.create_task_group() as tg:
for query in queries:
tg.start_soon(fetch_search_result, query, results)
# results 包含所有已完成的内容 —— 即使部分超时也不会抛出异常

并行工具调用:生产环境模式

当 LLM 响应包含多个工具调用项时,正确的分发模式是在单个 TaskGroup 中分派所有调用,然后将所有结果批量反馈给模型。顺序执行 —— 调用工具、等待、调用下一个工具、等待 —— 在功能上等同于完全没有并行工具调用。

实际形态如下:

tool_results = []

async with asyncio.TaskGroup() as tg:
tasks = [
tg.create_task(execute_tool(tc))
for tc in response.tool_calls
]

# 所有任务完成或全部被取消;没有部分提交
for task, tc in zip(tasks, response.tool_calls):
tool_results.append({
"tool_call_id": tc.id,
"content": task.result()
})

这种模式所基于的假设 —— 即工具调用是相互独立的 —— 是系统的承重结构。如果工具 B 实际上依赖于工具 A 的输出,但规划器认为它们是独立的,那么两者都会使用陈旧的输入执行,模型会自信地处理错误数据。IBM Research 记录了这种“静默灰色错误”模式在多智能体部署中非常普遍:没有抛出异常,下游步骤使错误复合,最终输出是错误的。缓解方案需要在规划层进行显式的依赖跟踪,这也是 LLMCompiler (ICML 2024) 的核心思想 —— 在分发之前生成带有依赖注释的工具调用 DAG,在防止静默依赖违规的同时,实现比顺序 ReAct 高达 3.7 倍的延迟提升。

正确设置分层超时

对于智能体管道(agent pipelines)而言,单一超时方法是错误的。30 秒的全局超时虽然能覆盖某些场景,但会完全忽略其他场景。正确的模型是四个不同的超时层:

逐个工具超时(根据工具类型设置 5–15 秒)可以在不中止整个扇出组(fan-out group)的情况下,捕获单个运行缓慢的工具。

扇出组(fan-out-group)超时覆盖整个并行调度——设置为预期 p95 延迟加上一定的余量。如果你有 5 个搜索工具在并行运行,每个工具的 p95 为 4 秒,那么你的组超时应该是 8 秒(而不是 20 秒)。

逐轮(per-turn)超时覆盖一个完整的 LLM 推理 + 工具执行周期。这可以防止单轮对话在递归或自我扩展的工具调用中消耗无限的时间。

工作流级超时限制整个多步骤任务。对于任何调用外部 API 的工作流,这都是不可协商的必要条件。

使用 anyio 的取消范围(cancel scopes),这些超时可以干净地嵌套,而不会让 asyncio.timeout() 调用在你的技术栈中缠成一团:

async with anyio.fail_after(workflow_timeout):       # 第 4 层
async with anyio.fail_after(turn_timeout): # 第 3 层
async with anyio.move_on_after(group_timeout): # 第 2 层
async with anyio.create_task_group() as tg:
for tc in tool_calls:
tg.start_soon(
anyio.fail_after(tool_timeout)(execute_tool),
tc
)

关于取消成本的说明:当你取消一个正在进行的 LLM API 调用时,提供商通常会在 TCP 关闭信号传播之前继续生成 Token 数百毫秒。这些 Token 仍会计入账单。在高并发情况下,激进的超时策略可能会产生大量“已生成但从未接收”的 Token。请根据 p99 延迟而非 p50 延迟来保守地设置超时。

双层速率限制器

仅限制并发请求数量的单一信号量是不够的。LLM API 同时强制执行两个独立的限制:每分钟请求数 (RPM) 和每分钟 Token 数 (TPM)。一个拥有 50 个插槽的朴素信号量可能会出现这种情况:所有 50 个活跃插槽都在发送大型 Prompt,这些 Prompt 在各自完成之前,总和就已经超过了你的 TPM 限制,从而导致连锁的 429 错误。

生产环境的方法是使用双层控制:

  • 请求信号量 (Request semaphore):将并发的在途请求限制在你的 RPM 余量内。
  • 令牌桶 (Token bucket):追踪滚动 TPM 消耗,并在接近限制时阻止调度。

除了原始的限制之外,关键细节是处理 429 响应时的抖动(jitter)行为。如果 50 个并发请求几乎同时收到 429 错误,带有相等抖动的指数退避会导致它们都在大致相同的时间窗内重试,从而引发另一波 429 浪潮。全抖动(Full jitter)——在 0 到整个退避窗口之间随机取值——可以将重试分散到整个窗口中,防止惊群效应(thundering herd)。相等抖动可以防止来自不同起点同步重试,但不能防止来自相同起点的同步。

截至 2025 年生产部署中有效的信号量参考值:OpenAI 为 50 个并发,Anthropic 为 20 个,Google Gemini 为 60 个——每个都结合了令牌桶 TPM 追踪。这些数字会根据层级和模型而变化;请将它们视为校准的起点,而非最终目标。

当前框架的不足之处

框架级的并发抽象有其自身的故障模式,在你依赖它们之前值得了解。

LangGraph 将并行执行表达为从单个节点出发并汇聚在合并节点的多个边。在内部,ToolNode 使用 asyncio.gather() 来调度并行工具——这意味着它继承了上文提到的部分清理行为。2025 年 12 月的一个 Bug(issues #6624, #6626)使这一点更加突出:当同一个 ToolNode 中的工具并行调用 interrupt() 时,它们会产生相同的打断 ID(通过哈希检查点命名空间生成,而该空间是共享的)。需要在并行打断后恢复的人机回圈(human-in-the-loop)工作流无法区分哪个打断是哪个。实际影响是:在修复补丁发布之前,在任何使用人工审批门控的工作流中,都要避免通过 ToolNode 进行并行工具调用。

LangChain 没有提供原生的结构化并发。async_execution=True 标志允许非阻塞调度,但没有提供自动的取消传播。如果你想要清理保证,你需要负责将 Chain 调用封装在 TaskGroup 中。

Temporal 在不同的层面上解决了这个问题。Temporal 不管理并发原语,而是将每一次 LLM 调用、工具调用和 API 调用记录在持久的事件日志中。崩溃和重启会确定性地重放,而不会重新执行已完成的步骤。OpenAI Agents SDK 和几个生产级编码智能体专门运行在 Temporal 上,因为它消除了这样一种故障类型:即在途调用成功了,但响应在被记录之前就丢失了。2025 年 9 月 Temporal 与 OpenAI Agents SDK 的集成,使得在没有自定义基础设施的情况下采用这种模式变得容易得多。

更深层的教训

根源问题在于,大多数 Agent 框架是通过抽象 LLM 调用来构建的,而不是优先设计正确的并发语义。asyncio.gather() 是一个显而易见的选择,因为它适用于简单的用例——但 Agent 流水线并不简单。它们涉及带有副作用的工具、受速率限制的外部 API、部分可恢复的状态,以及跨越多个抽象层的超时预算。非结构化并发产生的 bug 在开发期间是隐形的,但在生产环境中却代价高昂。

未来的方向是将并发模型视为架构的一部分,而不是实现细节。使用 TaskGroup 或 anyio 任务组作为扇出的唯一允许机制。在每个作用域级别显式地分层设置超时。在遇到生产环境的 429 错误之前,先构建双层速率限制器。如果你正在构建一个需要在多步并行分支中实现持久、可恢复执行的工作流,请评估 Temporal 或类似的工作流引擎是否适合你的技术栈——因为单进程内的结构化并发只能解决一半的问题。

并行工具调用是 LLM 真正有用的功能之一。正确处理并发模型才能让它们在生产环境中发挥作用,而不是成为隐蔽且代价高昂的故障源。

References:Let's stay in touch and Follow me for more thoughts and updates