当你的 AI Agent 从 Kafka 消费数据时:那些失效的设计假设
AI Agent 的标准心智模型通常假设采用 HTTP:客户端发送请求,Agent 进行处理,最后返回响应。这种模式清晰、同步、且易于推理。当一个基于 LLM 的函数执行失败时,你会收到一个错误代码;当它成功时,你就可以继续下一步。
一旦你将 HTTP 接口换成 Kafka 主题或 SQS 队列,上述每一个假设都会开始动摇。队列保证的是“至少一次交付”(at-least-once delivery),而你的 Agent 具有随机性。这种组合产生了一些在确定性系统中并不存在的故障模式——而且修复方法也与传统微服务所采用的方法不同。
本文将探讨当 AI Agent 消费消息队列时实际发生的变化:幂等性、顺序性、背压、死信处理,以及一种特定的故障模式——即重播的消息在第二次触发时会导致 Agent 产生不同的行为。
“至少一次”交付与随机消费者的问题
在经典的分布式系统中,“至少一次”交付是可控的,因为确定性函数是幂等的。如果你的函数计算的是 hash(message),那么对同一条消息运行两次会产生相同的输出。重复运行是无害的。
LLM 并不是确定性的。即使设置 temperature=0,由于硬件配置的浮点数非确定性、批处理效应以及供应商侧的实现细节,重复调用同一个提示词(prompt)也可能产生不同的输出。在非零温度下,这种差异性更是一项设计特性,而非 bug。
这打破了队列系统赖以生存的幂等性假设。考虑一个具体的场景:
- 你的 Agent 消费了一条消息,调用了 LLM,并将“摘要 A”写入数据库。
- 在提交偏移量(offset)之前,消费者崩溃了。
- 代理(broker)重新交付了该消息。
- Agent 再次调用 LLM,并生成了“摘要 B”。
- 现在你针对同一来源的同一条消息拥有了两份摘要,且无法得知哪一份才是权威的。
如果是确定性函数,两次运行都会写入相同的值,重复写入是无害的。但对于 LLM,你面临的是数据一致性问题。
解决方法是将幂等性移至模型层之上。在调用 LLM 之前,先检查一个以消息幂等标识符(SQS 原生提供 MessageId;对于 Kafka,你可以通过“主题 + 分区 + 偏移量”构建一个)为键的去重存储。如果结果已存在,则返回缓存值。只有在没有存储结果时才调用模型,并在提交消费者偏移量之前原子化地持久化输出结果。
这种模式——检查、调用、持久化、提交——对于任何会产生副作用的 LLM 消费者来说都是不可逾越的准则。
顺序模糊性比你想象的更糟糕
Kafka 保证分区内的顺序。但在分区之间,这种保证就不复存在了。对于确定性消费者来说,这是可控的:你通过实体键(例如 user_id)进行分区,给定用户的所有事件都会进入同一个分区并按顺序处理。
对于 AI Agent 来说,分区键的选择变成了一个语义决策,而不仅仅是为了性能。如果你的 Agent 需要维护对话状态——追踪用户在消息 3 中说了什么以辅助处理消息 7——那么同一场对话的消息必须进入同一个分区。请使用稳定的对话或会话标识符作为分区键。
如果你在这里搞错了,故障模式会非常隐蔽。假设一个 Agent 正在生成一个多步骤的工作流计划。任务事件按顺序到达:步骤 1、步骤 2、步骤 3。如果这些步骤落在了不同的分区,步骤 3 可能会在步骤 2 之前被处理。Agent 会根据不完整的上下文生成计划。系统不会报错,流水线也会顺利完成,但输出是错误的。
这也是为什么重播(replay)语义值得进行显式测试的原因。大多数 Kafka 问题出现在重播期间,而不是在稳态运行期间。如果你为了修复某个 bug 而从较早的偏移量开始重播,你的 Agent 将重新处理旧事件。任何在任务层面不具备幂等性的 Agent 都会在重播期间产生重复的副作用——重复的工具调用、重复的写入、重复发送的电子邮件。
在预发布环境(staging)中测试你的重播路径,不要等它在生产环境中变成事故响应场景。
背压是消费者延迟,而非 HTTP 429 错误
在基于 HTTP 的系统中,背压(backpressure)来自服务器:当达到负载上限时,返回 429 错误。客户端看到状态码后会退避。
在事件驱动系统中,背压通过消费者延迟(consumer lag)发挥作用。代理(broker)不知道也不关心你的 Agent 处理消息的速度。它只是不断追加事件。消费者按照自己的节奏拉取消息,并在完成后提交偏移量。最新偏移量与消费者已提交偏移量之间的差值就是延迟(lag)。
对于 LLM 工作负载,延迟是主要的健康信号。当你的 Agent 在 SLA 范围内处理任务时,延迟会保持在界限内。当 LLM API 变慢——供应商延迟激增、触发限流、冷启动——消费者就会落后,延迟随之增加。上游生产者不需要知道这一切正在发生;他们只需继续发布消息。队列会自然地缓冲这些突发流量。
这实际上是相对于 HTTP 的架构优势。你不需要跨服务协调背压信号。队列提供了天然的缓冲。
问题在于 Token 成本不会自动降低。如果消费者处理消息的速度超过了你的 LLM 速率限制,会产生供应商返回的 429 错误,并耗费时间进行指数退避——但队列仍在持续填满。你需要在消费者层面实现“Token 感知”的速率限制,而不仅仅是请求层面的限流。
具体来说:在调用 LLM 之前,检查一个追踪每分钟消耗 Token 数的滑动窗口计数器。如果你接近供应商的 TPM(每分钟 Token 数)限制,请暂停消费并等待。仅对完全处理完毕的消息提交偏移量。宁可让延迟暂时增加,也不要将 Token 浪费在无效的重试上。
针对智能体工作线程 (Agent Workers) 的消费者组设计
Kafka 的消费者组模型动态地将分区分配给消费者。当新的消费者加入组时,Broker 会进行再均衡 (rebalance),重新分配分区。这为你提供了水平扩展性:添加更多智能体工作线程,Broker 就会自动分配负载。
对于 LLM 消费者来说,再均衡有一个陷阱。如果你的智能体在触发再均衡时正处于推理中期——例如一个耗时 15 秒的慢速 LLM 调用,而 Broker 的会话超时时间是 10 秒——该消费者就会被从组中剔除。分区会被重新分配给另一个工作线程,后者从上一个提交的偏移量 (offset) 开始重新获取并处理消息,于是你又回到了随机重复 (stochastic-duplicate) 的问题上。
有两种缓解方案:
首先,将 max.poll.interval.ms 设置为一个能够容纳 LLM 最坏情况延迟的值。如果你的第 99 百分位 LLM 调用耗时 30 秒,请预留余量并将间隔设置为 60 秒。
其次,每次拉取 (poll) 时仅获取小批量的消息。与其一次拉取 500 条消息并串行处理,不如拉取 5–10 条,处理完并提交后再拉取更多。这可以保持较高的提交频率,并缩短再均衡期间的暴露窗口。
SQS 中的对应概念是可见性超时 (visibility timeout)。如果你的智能体处理一条消息需要 45 秒,但可见性超时设为 30 秒,那么在处理完成前该消息就会再次变得可见,导致另一个消费者接收它。请将可见性超时设置为至少两倍于预期的最大处理时间,并针对长时 间运行的任务通过编程方式延长它。
针对 AI 失败的死信队列
传统的死信队列 (DLQ) 处理是二元的:一条消息失败 N 次后,进入死信队列,由运维团队介入调查。
AI 智能体的失败则更加微妙。LLM 可能会:
- 因为供应商响应慢而超时(瞬时故障——重试是安全的)
- 返回一个无法通过 Schema 验证的格式错误 JSON 响应(可能可以通过调整 Temperature 安全地重试)
- 使用不存在的幻觉参数调用工具(数据问题——需要人工审核)
- 预订机票,收到超时确认,然后重试并再次预订了另一张机票(不可逆操作——需要补偿逻辑)
如果对所有这些情况都采用统一的“重试后进 DLQ”逻辑,会产生糟糕的结果。实践中正在兴起的是一种两阶段方法:在路由到 DLQ 之前先对失败进行分类。
一些团队现在使用 LLM 本身来对 DLQ 消息进行分拣——提供原始消息、错误信息和智能体的输出,并要求做出路由决策:REPROCESS(重新处理,针对瞬时故障)、HUMAN_REVIEW(人工审核,针对逻辑失败)、COMPENSATE(执行补偿,针对已发生的不可逆操作)。分类结果会与原始消息一起写入审计日志。
这在架构上是合理的,但引入了新的依赖:你的失败处理路径现在也要调用 LLM,而它也可能会失败。确保你的 DLQ 分类器设置了硬超时,并且在分类器本身失效时有一个确定性的备选方案(默认路由到 HUMAN_REVIEW)。
