跳到主要内容

批处理 LLM 流水线的盲点:离线处理与无人提及的队列设计

· 阅读需 14 分钟
Tian Pan
Software Engineer

大多数使用 LLM 构建产品的团队都在针对错误的工作负载进行优化。他们过分痴迷于首个 token 生成时间(time-to-first-token)、流式传输延迟和响应速度——结果却发现,其 LLM API 支出的 60% 或更多实际上流向了无人实时监控的夜间摘要任务、数据扩充流水线和分类运行。适用于聊天应用的“延迟优先”思维模式正在主动破坏这些离线工作负载。

LLM 批处理流水线是生产环境 AI 中那些不起眼但至关重要的“劳模”。它是每晚对 50,000 张工单进行分类的任务,是每周用公司描述丰富 CRM 的流水线,也是每天为新文档生成嵌入(embeddings)的运行任务。这些工作负载的设计约束与实时服务有着本质的不同。如果将它们视为聊天 API 的“慢速版本”,问题就由此产生了。

OLTP/OLAP 的分化也出现在了 LLM 基础设施中

数据库工程师在几十年前就吸取了这个教训:事务性工作负载和分析性工作负载需要不同的架构。你不会针对生产环境的 OLTP 数据库运行夜间报表查询,也不会为了单行插入而优化数据仓库。

同样的差异也存在于 LLM 基础设施中,但大多数团队尚未内化这一点。实时 LLM 服务优化的是延迟——保持 GPU 热启动、维持缓存命中、尽量缩短首个 token 生成时间。而 LLM 批处理优化的是吞吐量和成本——密集地打包请求、容忍更高的单次请求延迟,并利用非高峰时段的计算价格。

当团队将实时思维模式应用于批处理任务时,他们会犯下一些显而易见的错误:

  • 为不需要的延迟过度配置资源。 如果能将账单减半,一个运行 2 小时而非 20 分钟的夜间分类任务是完全可以接受的。
  • 构建同步重试循环而不是持久化队列。 一个带有 try/excepttime.sleepfor 循环并不叫批处理基础设施。
  • 忽视故障粒度。 当 10,000 个项目中的第 847 个失败时,你是重试整个批处理还是只重试该项目?大多数团队直到凌晨 3 点遇到问题时才会有答案。
  • 缺乏单次任务的成本分摊。 每月的 LLM 账单只是一个总数,没人知道摘要流水线到底花了 200 美元还是 2,000 美元。

任务规模(Job Sizing):大多数团队忽略的首个设计决策

批处理任务应该有多大?这个答案决定了下游的一切——故障波及范围、检查点频率、内存压力以及重试成本。

太小(每个任务仅包含一次 LLM 调用)会导致你淹没在队列开销中。任务管理、状态跟踪和结果聚合所耗费的工程精力将超过实际的推理过程。

太大(一个任务包含整个数据集)意味着单次失败就需要重新处理所有内容。你还会失去在不同 worker 或提供商之间进行并行处理的能力。

最佳平衡点取决于你的故障容忍度和提供商的约束。OpenAI 和 Anthropic 的原生批处理 API 分别支持每批多达 50,000 和 10,000 个请求,但这并不意味着你应该直接将这些数值作为任务规模。请考虑以下因素:

  • 检查点频率。 每完成 N 个请求保存一次状态。如果进程崩溃,你最多只会损失 N 个项目的工作量。25–100 个项目的检查点间隔可以在 I/O 成本和恢复速度之间取得平衡。
  • 内存压力。 为 50,000 个具有丰富输出的项目在内存中累积结果最终会导致问题。请在结果完成后将其以流式方式存入存储。
  • 提供商速率限制。 即使是批处理 API 也有吞吐量限制。根据你的速率配额来设定任务规模,可以防止队列堆积。
  • 成本透明度。 一个运行 6 小时且成本未知的任务在运维上是不可见的。将其拆分为子批次并进行单批次成本跟踪,可以使经济账变得清晰。

一个务实的起点:每批 500–2,000 个项目,每完成 25–50 个请求进行一次检查点保存。根据你的单项延迟和失败率进行调整。

真正有效的队列架构

大多数 LLM 代码库中典型的“批处理”实现通常是这样的:一个带有 for 循环的 Python 脚本、轮换 API 密钥、指数退避、一个用于跟踪进度的 CSV 文件,以及祈祷进程在夜间不要崩溃。据报道,团队在构建和维护这些脆弱的脚本上花费了数百个工时。

一个完善 LLM 批处理流水线需要四个组件:

1. 持久化任务队列。 基于 Redis 的队列(BullMQ, Celery, RQ)或云原生方案(SQS, Cloud Tasks)能为你提供持久化、重试语义和并发控制。核心属性是:如果你的 worker 崩溃,不会丢失任何工作。任务保留在队列中,由另一个 worker 接手。

2. 可配置的并发和速率限制。 LLM API 有每分钟 token 和请求数的限制。你的队列 worker 需要在尊重这些限制的同时,最大限度地提高吞吐量。对于大多数 API 来说,以每分钟 40 个请求的限制运行 8 个并发 worker 是一个合理的起点。

3. 死信处理。 在用尽重试次数后,永久失败的项目需要被移动到可供检查的地方。内容政策违规、格式错误的输入和意外的 schema 变更都会产生不可重试的错误。死信队列可以将这些错误与重试即可成功的瞬时故障(如速率限制、超时、服务器错误)区分开来。

4. 具有“精确一次”(Exactly-once)语义的结果聚合。 使用任务的唯一 ID 作为幂等键(idempotency key)。如果同一项目因重试被处理了两次,第二次结果应该覆盖第一次,而不是创建重复项。这对于数据扩充流水线尤为重要,因为重复数据会破坏下游的分析结果。

断点续传:拯救你深夜的模式

最常见的批处理流水线故障模式:你的 10,000 个任务在处理到第 6,847 个项目时,因为一个瞬时 API 错误而崩溃,而你无法从中断的地方恢复。你不得不重新处理那 6,846 个已经付过费的项目。

断点续传(Checkpoint-resume)就是解决方案,而且它比大多数团队想象的要简单。其模式如下:

  1. 在处理开始前为每个项目分配一个唯一 ID。像 batchId:itemIndex 这样的复合键非常有效。
  2. 将已完成的项目 ID 持久化到可靠存储中(数据库行、断点文件或 Redis 集合),在每次完成或每完成 N 个项目后进行记录。
  3. 在启动时加载断点,跳过那些已标记为完成的项目。
  4. 使用原子写入进行断点持久化。先写入临时文件,然后重命名。这可以防止因崩溃期间的局部写入而导致断点文件损坏。

断点文件应跟踪:已完成的项目 ID、累计 Token 使用量、累计成本、按类型统计的错误计数以及时间戳。这既能让你拥有续传能力,又能为该批次任务提供运行成本账本。

对于使用原生批处理 API(如 OpenAI Batch API、Anthropic Message Batches)的团队,供应商会在内部处理断点——你只需提交批次并轮询完成状态。但这样做会失去对单个项目进度的可见性,并且无法针对批次内的单个失败实现自定义重试逻辑。

成本归因:让隐形支出可见

这里有一个让大多数团队感到惊讶的数字:一个每天处理 10,000 个项目的增强流水线,如果通过 Claude Sonnet 以每项约 500 个输入 Token 和 100 个输出 Token 处理,按实时定价计算,每次运行成本约为 22.50 美元,使用批处理 API 则为 11.25 美元。一个月下来,就是 337 至 675 美元。

现在,请将这个数字乘以你组织中的每一个批处理流水线。内容分类任务、文档摘要流水线、向量嵌入生成任务、合成数据增强任务。每一个都有其成本,如果没有针对每个作业的归因,你就像在盲目飞行。

在三个层面实施成本跟踪:

  • 单项目(Per-item): 记录每个请求的输入 Token、输出 Token 和使用的模型。这能让你识别出成本异常高的项目(长文档、复杂的分类)。
  • 单批次(Per-batch): 将项目成本汇总为批次总计。通过对比不同运行批次来发现成本漂移——源数据中的架构更改可能会在没人察觉的情况下让你的平均 Token 数翻倍。
  • 单流水线(Per-pipeline): 按流水线跟踪每周和每月的成本。这些数据将进入你的 FinOps 仪表板和容量规划电子表格。

为每个 LLM 请求附加元数据标签(流水线名称、团队、环境)。Langfuse、Braintrust 和 LangWatch 等可观测性平台可以按任何维度分解支出。相比之下,试图从一张每月的 API 总账单中反推成本是一个随着流水线增加而变得越来越棘手的问题。

改变重试策略的故障分类

并非所有的批处理失败都是平等的,你的重试策略应该反映这一点。LLM 批处理流水线会遇到三类不同的故障:

瞬时故障(Transient failures)(立即重试或使用退避策略):HTTP 429 速率限制、HTTP 5xx 服务器错误、连接超时、网络重置。这些故障在重试时会成功。使用带有抖动(jitter)的指数退避,以避免在供应商恢复时产生惊群效应。

确定性故障(Deterministic failures)(不要重试,路由至死信队列):内容策略违规、永远无法解析的格式错误输入、架构验证错误。重试这些只会浪费金钱和配额。通过错误代码检测它们,并直接路由到你的死信队列。

质量故障(Quality failures)(最危险的类别):API 返回了 200 状态码,但输出是垃圾——幻觉分类、截断的摘要、虽然能解析但包含错误数据的 JSON。标准重试逻辑对这些是不可见的。你需要输出验证:结构化输出的架构检查、自由文本的长度限制、分类的置信度阈值。

质量故障类别是批处理流水线与实时服务区别最大的地方。在聊天应用中,用户可以通过重新提问或表达困惑来提供质量反馈。但在批处理流水线中,糟糕的输出会悄无声息地流入你的数据库并污染下游系统。在 LLM 响应和数据存储之间设置一个验证网关是必选项。

当延迟优先的思维模型失效时

批处理流水线设计中最昂贵的错误是为你并不需要的延迟进行优化。这通常表现为:

为批处理作业使用流式响应。 流式传输增加了实现复杂度,且当输出直接进入数据库时,它没有任何好处。为批处理工作负载使用非流式端点——它们更简单,有时甚至更便宜。

为低频作业保持提示词缓存预热。 当同一个前缀在短时间内被反复使用时,提示词缓存(Prompt caching)才有效。每 24 小时运行一次的夜间作业无法从生存时间(TTL)仅为 5 分钟的缓存中获益。不要围绕你永远无法命中的缓存来设计架构。

在慢速模型足以胜任时使用最快的模型。 一个使用 Haiku 就能完成得很好的分类任务不需要 Sonnet。批处理让你有奢侈的空间在真实数据上对不同模型进行 A/B 测试——利用这一点找到满足你质量标准的成本最低的模型。

为批处理作业进行实时监控。 对于每天只运行一次的作业,你不需要一个每 5 秒更新一次的 Grafana 仪表板。一个包含汇总统计信息(处理项目数、错误、成本、持续时间)的完成通知就足够了。把实时仪表板留给你的服务基础设施。

可扩展的架构

一个生产级的批量 LLM 流水线,无论具体的队列技术如何,其架构模式通常都是一致的:

调度器 (Scheduler) 根据 cron 定时任务或响应事件(如新数据到达、上游流水线完成)触发作业。它创建作业定义 —— 包括输入数据、模型配置、输出目的地 —— 并将它们提交到队列中。

工作线程池 (Worker pool) 从队列中拉取作业,在并发控制和速率限制下调用 LLM API,验证输出,并将结果写入存储。工作线程是无状态的;任何工作线程都可以处理任何作业。这实现了水平扩展和容错能力。

结果存储 (Result store) 以幂等性保证累积输出。无论是数据库、数据湖还是对象存储,写入操作都以项目 ID 作为键,因此不可能出现重复。

监控器 (Monitor) 跟踪批处理进度、成本累积、错误率和持续时间。当错误率超过阈值或成本与历史基线发生显著偏差时,它会发送告警。

调度器和工作线程应该是独立的进程。这允许你从应用程序提交作业,而工作线程独立地处理它们,任何一方都可以进行扩展或重启而不影响另一方。它还能防止常见的故障模式,即工作线程崩溃导致调度器宕机,从而无法创建新作业。

论证投资的必要性

如果你的团队在任何非交互场景下每天处理超过几百个 LLM 请求,那么你就有了一个批处理工作负载。将其视为批处理可以节省资金(原生批量 API 可节省 50%,非高峰时段的自托管推理则更多)、提高可靠性(持久化队列优于重试循环),并使成本可见(按流水线归因优于对每月账单进行“考古”分析)。

这种转变在技术上并不困难。难点在于组织层面:意识到某人六个月前在 Jupyter notebook 中编写的夜间脚本现在已是一个关键的数据流水线,理应拥有队列基础设施、监控和成本跟踪。批量 LLM 流水线是生产级的基础设施。是时候以这种方式对待它了。

References:Let's stay in touch and Follow me for more thoughts and updates