Structured Concurrency for AI Pipelines: Why asyncio.gather() Isn't Enough

· 9 min read
Tian Pan
Software Engineer

When an LLM returns three tool calls in a single response, the obvious thing is to run them in parallel. You reach for asyncio.gather(), fan the calls out, collect the results, return them to the model. The code works in testing. It works in staging. Six weeks into production, you start noticing your application holding open HTTP connections it should have released. Token quota is draining faster than usage metrics suggest. Occasionally, a tool that sends an email fires twice.

The underlying issue is not the LLM or the tool — it's the concurrency primitive. asyncio.gather() was not designed for the failure modes that multi-step agent pipelines produce, and using it as the backbone of parallel tool execution creates problems that are invisible until they compound.

The Three Ways gather() Fails You

asyncio.gather() has two modes. With return_exceptions=False, the first exception propagates to the caller immediately, but the sibling tasks are not cancelled and nothing waits for them. They keep running until they finish or raise their own exceptions, holding connections, consuming token budget, and potentially committing side effects. With return_exceptions=True, exceptions are folded into the returned list. Unless your caller explicitly checks every item with isinstance(result, Exception), failures are silently swallowed.
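A minimal sketch makes the first failure mode concrete. The tool names here are hypothetical stand-ins; the append to log plays the role of an irreversible side effect (sending an email):

```python
import asyncio

async def send_email(log: list) -> str:
    # Stand-in for a side-effecting tool; the append is the "side effect".
    await asyncio.sleep(0.05)
    log.append("email sent")
    return "ok"

async def failing_tool() -> str:
    await asyncio.sleep(0.01)
    raise RuntimeError("tool failed")

async def main() -> list:
    log: list = []
    try:
        # return_exceptions=False: the first exception propagates immediately,
        # but the sibling coroutine is NOT cancelled and nothing waits for it.
        await asyncio.gather(send_email(log), failing_tool())
    except RuntimeError:
        assert log == []          # sibling still in flight when we regain control
    await asyncio.sleep(0.1)      # simulate the caller moving on with other work
    return log                    # the orphaned side effect fired anyway

print(asyncio.run(main()))        # ['email sent']
```

By the time the caller has handled the exception and moved on, the "email" tool has still completed in the background, with no one observing its result.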

Neither mode is right for agent pipelines, but the deeper problem is structural: tasks created with asyncio.create_task() outside a scope boundary have no parent. If the calling coroutine is cancelled — because a request timed out, because the user aborted, because the workflow moved on — the spawned tasks continue running unobserved. They consume LLM API quota, hold HTTP connections, and may execute irreversible tool calls, all with no signal back to anything that could act on the results.

This is the orphaned task problem. In a system running 20 concurrent agents, each spawning 3–5 parallel tool calls per turn, orphaned tasks accumulate quickly. Production data from multi-agent deployments shows error rate increases from under 0.5% (single agent) to 3% and above (multi-agent without proper concurrency control) driven primarily by this category of coordination failure. The bugs are timing-dependent, environment-specific, and show up as vague resource exhaustion rather than clean error signals.

What Structured Concurrency Actually Gives You

Python 3.11 added asyncio.TaskGroup — a scope-based primitive that gives every concurrent task an explicit lifetime. The key invariant: the async with block does not exit until every task spawned inside it has either completed or been cancelled and cleaned up. No orphans.

async with asyncio.TaskGroup() as tg:
    task_a = tg.create_task(call_search_api(query))
    task_b = tg.create_task(call_lookup_tool(entity))
# Both guaranteed complete or both cancelled before this line

When any task raises, TaskGroup immediately cancels all siblings, waits for their cancellation to propagate fully, then raises an ExceptionGroup containing every non-cancellation exception. The caller can filter by exception type:

try:
    async with asyncio.TaskGroup() as tg:
        for tc in tool_calls:
            tg.create_task(execute_tool(tc))
except* RateLimitError as eg:
    handle_rate_limits(eg.exceptions)
except* ToolTimeoutError as eg:
    log_partial_results(eg.exceptions)

This is structurally different from gather(). The scope guarantees cleanup even if the calling coroutine is cancelled from outside. There is no window where tasks outlive their parent.

For teams already using anyio (which the OpenAI, Anthropic, and Google Gemini Python SDKs all use internally), anyio.create_task_group() provides the same semantics with an additional primitive worth knowing: move_on_after(). Unlike fail_after(), which raises on timeout, move_on_after() exits the scope cleanly, leaving completed tasks accessible. This enables a pattern critical for production search fan-outs: "take whatever results arrived within 3 seconds, proceed with those."

with anyio.move_on_after(3.0):
    async with anyio.create_task_group() as tg:
        for query in queries:
            tg.start_soon(fetch_search_result, query, results)
# results contains whatever completed — no exception raised on partial timeout

Parallel Tool Calls: The Production Pattern

When an LLM response contains multiple tool call entries, the correct fan-out pattern is to dispatch all of them inside a single TaskGroup, then feed all results back to the model in one batch. Sequential execution — one tool call, wait, next tool call, wait — is functionally equivalent to not having parallel tool calling at all.

The practical shape:

tool_results = []

async with asyncio.TaskGroup() as tg:
    tasks = [
        tg.create_task(execute_tool(tc))
        for tc in response.tool_calls
    ]

# All tasks complete or all are cancelled; no partial commits
for task, tc in zip(tasks, response.tool_calls):
    tool_results.append({
        "tool_call_id": tc.id,
        "content": task.result()
    })

The assumption baked into this pattern — that the tool calls are independent — is load-bearing. If Tool B actually depends on Tool A's output but the planner thought they were independent, both execute with stale inputs and the model proceeds confidently with wrong data. IBM Research documented this "silent gray error" pattern as prevalent across multi-agent deployments: no exception is raised, downstream steps compound the error, the final output is wrong. Mitigation requires explicit dependency tracking at the planning layer, which is the core idea behind LLMCompiler (ICML 2024) — generating a DAG of tool calls with dependency annotations before dispatch, enabling up to 3.7x latency improvement over sequential ReAct while preventing the silent dependency violation.

Layering Timeouts Correctly

Single-timeout approaches are wrong for agent pipelines. A 30-second global timeout covers some scenarios but misses others entirely. The correct model is four distinct timeout layers:

Per-tool timeout (5–15 seconds depending on tool type) catches individual slow tools without aborting the whole fan-out group.

Per-fan-out-group timeout covers the entire parallel dispatch — set to the expected p95 latency plus a margin. If you have 5 search tools with p95 of 4 seconds each running in parallel, your group timeout might be 8 seconds (not 20).

Per-turn timeout covers one full LLM reasoning + tool execution cycle. This prevents a single turn from consuming unbounded time in recursive or self-expanding tool calls.

Workflow-level timeout bounds the entire multi-step task. Non-negotiable for any workflow running against external APIs.

With anyio cancel scopes, these nest cleanly without asyncio.timeout() calls tangled across your stack:

async def run_tool(tc):
    with anyio.fail_after(tool_timeout):              # Layer 1
        await execute_tool(tc)

with anyio.fail_after(workflow_timeout):              # Layer 4
    with anyio.fail_after(turn_timeout):              # Layer 3
        with anyio.move_on_after(group_timeout):      # Layer 2
            async with anyio.create_task_group() as tg:
                for tc in tool_calls:
                    tg.start_soon(run_tool, tc)

A note on cancellation cost: when you cancel an in-flight LLM API call, the provider typically continues generating tokens for several hundred milliseconds before the TCP close propagates. Those tokens count toward billing. At high concurrency, aggressive timeout policies can produce a non-trivial volume of "tokens generated but never received." Set timeouts conservatively relative to p99 latencies, not p50.

The Two-Layer Rate Limiter

A single semaphore capped at concurrent request count is insufficient. LLM APIs enforce two independent limits simultaneously: requests per minute (RPM) and tokens per minute (TPM). A naive semaphore with 50 slots can have all 50 active slots sending large prompts that collectively exceed your TPM limit before any individual request finishes, producing cascading 429s.

The production approach uses a two-layer control:

  • Request semaphore limits concurrent in-flight requests to your RPM headroom
  • Token bucket tracks rolling TPM consumption and blocks dispatch when approaching the limit
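A minimal sketch of the two-layer control, assuming the caller can estimate token usage per request. The class name, defaults, and refill policy are illustrative, not any provider's API:

```python
import asyncio
import time

class TwoLayerLimiter:
    """Sketch: request semaphore (layer 1) plus rolling token bucket (layer 2)."""

    def __init__(self, max_concurrent: int, tpm_limit: int):
        self.semaphore = asyncio.Semaphore(max_concurrent)  # RPM headroom
        self.tpm_limit = tpm_limit                          # tokens per minute
        self.tokens_used = 0.0
        self.last_refill = time.monotonic()

    def _refill(self):
        # Rolling window: forgive spent tokens at tpm_limit per 60 seconds.
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens_used = max(0.0, self.tokens_used - self.tpm_limit * elapsed / 60.0)
        self.last_refill = now

    async def acquire(self, estimated_tokens: int):
        await self.semaphore.acquire()      # layer 1: in-flight request cap
        self._refill()
        while self.tokens_used + estimated_tokens > self.tpm_limit:
            await asyncio.sleep(0.05)       # layer 2: block dispatch near the TPM limit
            self._refill()
        self.tokens_used += estimated_tokens

    def release(self):
        self.semaphore.release()

async def main():
    limiter = TwoLayerLimiter(max_concurrent=2, tpm_limit=10_000)
    await limiter.acquire(estimated_tokens=1_500)
    try:
        pass  # the actual LLM request would be dispatched here
    finally:
        limiter.release()
    return limiter.tokens_used

print(asyncio.run(main()))  # 1500.0
```

The semaphore alone would admit this request instantly; the token bucket is what blocks dispatch when the rolling TPM budget is nearly spent, even with free semaphore slots.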

Beyond raw limiting, the critical detail is jitter behavior on 429 responses. If 50 concurrent requests all receive 429s at approximately the same time, exponential backoff with equal jitter causes them all to retry in roughly the same window, triggering another 429 wave. Full jitter — random between 0 and the entire backoff window — spreads retries across the window and prevents the thundering herd. Equal jitter prevents synchronized retries from different starting points but does not prevent synchronization from the same starting point.
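The two jitter strategies differ in one line. A sketch, with illustrative base and cap values:

```python
import random

def full_jitter_delay(attempt: int, base: float = 0.5, cap: float = 30.0) -> float:
    """Full jitter: sample uniformly from [0, min(cap, base * 2**attempt)].
    A synchronized 429 wave is spread across the entire backoff window."""
    window = min(cap, base * (2 ** attempt))
    return random.uniform(0.0, window)

def equal_jitter_delay(attempt: int, base: float = 0.5, cap: float = 30.0) -> float:
    """Equal jitter: half the window fixed, half randomized. Clients hit by the
    same 429 wave all wait at least window/2, so they stay partially bunched."""
    window = min(cap, base * (2 ** attempt))
    return window / 2 + random.uniform(0.0, window / 2)
```

With 50 clients retrying after attempt 3 (window 4 s), full jitter scatters them across 0 to 4 s, while equal jitter packs them all into the 2 to 4 s half, keeping the herd partly intact.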

Effective semaphore values from production deployments as of 2025: OpenAI at 50 concurrent, Anthropic at 20, Google Gemini at 60 — each combined with token-bucket TPM tracking. These numbers shift by tier and model; treat them as starting points for calibration, not targets.

Where Frameworks Currently Fall Short

Framework-level concurrency abstractions have their own failure modes worth knowing before you rely on them.

LangGraph expresses parallel execution as multiple edges from a single node converging at a join node. Internally, ToolNode uses asyncio.gather() to dispatch parallel tools — which means it inherits the partial-cleanup behavior described above. A December 2025 bug (issues #6624, #6626) makes this more acute: when tools in the same ToolNode call interrupt() in parallel, they produce identical interrupt IDs (generated by hashing the checkpoint namespace, which is shared). Human-in-the-loop workflows that need to resume after parallel interrupts cannot distinguish which interrupt is which. The practical implication: avoid parallel tool calls through ToolNode in any workflow that uses human approval gates until this is patched.

LangChain provides no native structured concurrency. The async_execution=True flag enables non-blocking dispatch but provides no automatic cancellation propagation. You are responsible for wrapping chain invocations in a TaskGroup if you want cleanup guarantees.

Temporal solves the problem at a different level. Rather than managing concurrency primitives, Temporal records every LLM call, tool invocation, and API call in a durable event log. Crashes and restarts replay deterministically without re-executing completed steps. The OpenAI Agents SDK and several production coding agents run on Temporal specifically because it eliminates the category of failure where an in-flight call succeeds but the response is lost before it can be recorded. The September 2025 integration between Temporal and the OpenAI Agents SDK made this pattern significantly easier to adopt without custom infrastructure.

The Deeper Lesson

The root issue is that most agent frameworks were built by abstracting LLM calls, not by designing correct concurrency semantics first. asyncio.gather() was the obvious choice because it works for simple use cases — but agent pipelines are not simple. They involve side-effecting tools, rate-limited external APIs, partially resumable state, and timeout budgets that span multiple layers of abstraction. Unstructured concurrency produces bugs that are invisible during development and expensive in production.

The path forward is to treat the concurrency model as part of the architecture, not an implementation detail. Use TaskGroup or anyio task groups as the only permitted mechanism for fan-out. Layer timeouts explicitly at each scope level. Build the two-layer rate limiter before you hit production 429s. And if you are building a workflow that needs durable, resumable execution across multi-step parallel branches, evaluate whether Temporal or a comparable workflow engine belongs in your stack — because structured concurrency within a single process only solves half the problem.

Parallel tool calls are one of the genuinely useful things LLMs can do. Getting the concurrency model right is what makes them useful in production rather than a source of subtle, expensive failures.
