跳到主要内容

当你的 AI Agent 从 Kafka 消费数据时:那些失效的设计假设

· 阅读需 14 分钟
Tian Pan
Software Engineer

AI Agent 的标准心智模型通常假设采用 HTTP:客户端发送请求,Agent 进行处理,最后返回响应。这种模式清晰、同步、且易于推理。当一个基于 LLM 的函数执行失败时,你会收到一个错误代码;当它成功时,你就可以继续下一步。

一旦你将 HTTP 接口换成 Kafka 主题或 SQS 队列,上述每一个假设都会开始动摇。队列保证的是“至少一次交付”(at-least-once delivery),而你的 Agent 具有随机性。这种组合产生了一些在确定性系统中并不存在的故障模式——而且修复方法也与传统微服务所采用的方法不同。

本文将探讨当 AI Agent 消费消息队列时实际发生的变化:幂等性、顺序性、背压、死信处理,以及一种特定的故障模式——即重播的消息在第二次触发时会导致 Agent 产生不同的行为。

“至少一次”交付与随机消费者的问题

在经典的分布式系统中,“至少一次”交付是可控的,因为确定性函数是幂等的。如果你的函数计算的是 hash(message),那么对同一条消息运行两次会产生相同的输出。重复运行是无害的。

LLM 并不是确定性的。即使设置 temperature=0,由于硬件配置的浮点数非确定性、批处理效应以及供应商侧的实现细节,重复调用同一个提示词(prompt)也可能产生不同的输出。在非零温度下,这种差异性更是一项设计特性,而非 bug。

这打破了队列系统赖以生存的幂等性假设。考虑一个具体的场景:

  1. 你的 Agent 消费了一条消息,调用了 LLM,并将“摘要 A”写入数据库。
  2. 在提交偏移量(offset)之前,消费者崩溃了。
  3. 代理(broker)重新交付了该消息。
  4. Agent 再次调用 LLM,并生成了“摘要 B”。
  5. 现在你针对同一来源的同一条消息拥有了两份摘要,且无法得知哪一份才是权威的。

如果是确定性函数,两次运行都会写入相同的值,重复写入是无害的。但对于 LLM,你面临的是数据一致性问题。

解决方法是将幂等性移至模型层之上。在调用 LLM 之前,先检查一个以消息幂等标识符(SQS 原生提供 MessageId;对于 Kafka,你可以通过“主题 + 分区 + 偏移量”构建一个)为键的去重存储。如果结果已存在,则返回缓存值。只有在没有存储结果时才调用模型,并在提交消费者偏移量之前原子化地持久化输出结果。

这种模式——检查、调用、持久化、提交——对于任何会产生副作用的 LLM 消费者来说都是不可逾越的准则。

顺序模糊性比你想象的更糟糕

Kafka 保证分区内的顺序。但在分区之间,这种保证就不复存在了。对于确定性消费者来说,这是可控的:你通过实体键(例如 user_id)进行分区,给定用户的所有事件都会进入同一个分区并按顺序处理。

对于 AI Agent 来说,分区键的选择变成了一个语义决策,而不仅仅是为了性能。如果你的 Agent 需要维护对话状态——追踪用户在消息 3 中说了什么以辅助处理消息 7——那么同一场对话的消息必须进入同一个分区。请使用稳定的对话或会话标识符作为分区键。

如果你在这里搞错了,故障模式会非常隐蔽。假设一个 Agent 正在生成一个多步骤的工作流计划。任务事件按顺序到达:步骤 1、步骤 2、步骤 3。如果这些步骤落在了不同的分区,步骤 3 可能会在步骤 2 之前被处理。Agent 会根据不完整的上下文生成计划。系统不会报错,流水线也会顺利完成,但输出是错误的。

这也是为什么重播(replay)语义值得进行显式测试的原因。大多数 Kafka 问题出现在重播期间,而不是在稳态运行期间。如果你为了修复某个 bug 而从较早的偏移量开始重播,你的 Agent 将重新处理旧事件。任何在任务层面不具备幂等性的 Agent 都会在重播期间产生重复的副作用——重复的工具调用、重复的写入、重复发送的电子邮件。

在预发布环境(staging)中测试你的重播路径,不要等它在生产环境中变成事故响应场景。

背压是消费者延迟,而非 HTTP 429 错误

在基于 HTTP 的系统中,背压(backpressure)来自服务器:当达到负载上限时,返回 429 错误。客户端看到状态码后会退避。

在事件驱动系统中,背压通过消费者延迟(consumer lag)发挥作用。代理(broker)不知道也不关心你的 Agent 处理消息的速度。它只是不断追加事件。消费者按照自己的节奏拉取消息,并在完成后提交偏移量。最新偏移量与消费者已提交偏移量之间的差值就是延迟(lag)。

对于 LLM 工作负载,延迟是主要的健康信号。当你的 Agent 在 SLA 范围内处理任务时,延迟会保持在界限内。当 LLM API 变慢——供应商延迟激增、触发限流、冷启动——消费者就会落后,延迟随之增加。上游生产者不需要知道这一切正在发生;他们只需继续发布消息。队列会自然地缓冲这些突发流量。

这实际上是相对于 HTTP 的架构优势。你不需要跨服务协调背压信号。队列提供了天然的缓冲。

问题在于 Token 成本不会自动降低。如果消费者处理消息的速度超过了你的 LLM 速率限制,会产生供应商返回的 429 错误,并耗费时间进行指数退避——但队列仍在持续填满。你需要在消费者层面实现“Token 感知”的速率限制,而不仅仅是请求层面的限流。

具体来说:在调用 LLM 之前,检查一个追踪每分钟消耗 Token 数的滑动窗口计数器。如果你接近供应商的 TPM(每分钟 Token 数)限制,请暂停消费并等待。仅对完全处理完毕的消息提交偏移量。宁可让延迟暂时增加,也不要将 Token 浪费在无效的重试上。

针对智能体工作线程 (Agent Workers) 的消费者组设计

Kafka 的消费者组模型动态地将分区分配给消费者。当新的消费者加入组时,Broker 会进行再均衡 (rebalance),重新分配分区。这为你提供了水平扩展性:添加更多智能体工作线程,Broker 就会自动分配负载。

对于 LLM 消费者来说,再均衡有一个陷阱。如果你的智能体在触发再均衡时正处于推理中期——例如一个耗时 15 秒的慢速 LLM 调用,而 Broker 的会话超时时间是 10 秒——该消费者就会被从组中剔除。分区会被重新分配给另一个工作线程,后者从上一个提交的偏移量 (offset) 开始重新获取并处理消息,于是你又回到了随机重复 (stochastic-duplicate) 的问题上。

有两种缓解方案:

首先,将 max.poll.interval.ms 设置为一个能够容纳 LLM 最坏情况延迟的值。如果你的第 99 百分位 LLM 调用耗时 30 秒,请预留余量并将间隔设置为 60 秒。

加载中…
References:Let's stay in touch and Follow me for more thoughts and updates