跳到主要内容

主动型 Agent:后台 AI 的事件驱动与定时自动化

· 阅读需 12 分钟
Tian Pan
Software Engineer

几乎所有关于构建 AI Agent 的教程都以同样的方式开场:用户输入消息,Agent 进行推理,Agent 返回响应。这个模型对聊天机器人和副驾驶(Copilot)来说运行良好,却无法描述各组织正在大规模部署的大多数生产 AI 工作。

在企业环境中默默发挥最大价值的 Agent,并不等待消息。它们在数据库行发生变更时唤醒,在队列深度超过阈值时唤醒,在凌晨 3 点的定时任务触发时唤醒,或在监控检测到指标漂移超出范围时唤醒。它们在没有用户在场的情况下行动。一旦失败,没有人会察觉,直到损失已经累积到难以挽回。

构建这类主动型 Agent 需要一套与构建被动式助手截然不同的设计语汇。适用于对话型 AI 的会话(Session)思维模型,在 Agent 循环运行、在后台重试、没有人类兜底的场景下会彻底失效。

触发层:为什么不能只给提示词套一层 Cron

主动型 Agent 最简单的实现看起来是这样的:0 9 * * * python run_agent.py。这行得通,直到它失效为止。生成每日摘要的 Agent 足够简单,Cron 任务是合理的。但一旦 Agent 开始向外部系统写入数据,这种简陋的 Cron 包装就会产生一类静默复合的故障。

第一种故障模式是执行重叠。Cron 任务不会检查上一次运行是否还在进行中。如果你早上 9 点的 Agent 因上游 API 缓慢而耗时 90 分钟,10 点的任务照样会启动。此时,两个 Agent 实例同时读取相同的状态、各自独立地做决策,并向相同的目标写入数据。结果可能是重复的发票、重复发送的通知,或相互矛盾的数据库更新——具体取决于你的 Agent 写入了什么。

解决方案不是拉长 Cron 间隔,而是把触发层和执行层视为两个独立的关注点。触发器负责可靠地触发,执行层负责:

  1. 在开始工作前获取分布式锁
  2. 检查预期工作是否已完成(幂等性检查)
  3. 在释放锁之前记录已完成状态

这并非新的智慧——这是将事务性发件箱(Transactional Outbox)模式应用于 Agent 执行。在同一个数据库事务中,写入"我正在为输入 Y 启动运行 X"的记录和实际的工作结果。在下一次触发时,先检查发件箱再继续。如果该逻辑工作单元已存在完成记录,则跳过。

对于使用 Temporal 的团队,持久化执行模型已部分处理了这个问题——中途失败的工作流会从最后一个检查点恢复,而不是重新开始。对于在无服务器基础设施或普通 Cron 上运行 Agent 的团队,发件箱模式是最可靠的替代方案。

事件驱动触发:以推送替代轮询

轮询是无法推送时的兜底方案。大多数做定时 Agent 工作的团队,实际上在实现一种效率更低的事件驱动架构:每 N 分钟检查一次变更,而不是在变更发生时立即响应。

变更数据捕获(CDC)是运维上更成熟的替代方案。Kafka 配合 Debezium 连接器、AWS DMS 或数据库原生复制流,允许你订阅每一次已提交的数据库行变更。你的 Agent 在真正发生变更时被调用,而不是按照一个可能与变更节奏完全不匹配的调度表运行。

架构层面的影响是显著的:与基于轮询的方案相比,事件驱动 Agent 可将系统延迟降低 70–90%,且在空闲时零计算成本。如果变更不频繁,一个每 5 分钟轮询一次的 Agent 有 99% 以上的时间都在浪费资源。

按来源分类的触发模式:

  • 数据库变更:通过 Kafka/Debezium 的 CDC,或数据库触发器写入队列
  • API 事件:Webhook 投递到端点,经消息队列缓冲
  • 基于时间:标准 Cron 配合分布式锁和幂等性保障
  • 状态漂移:监控系统检测到指标偏差后发出事件,而不是按调度触发

这些方案之间的选择,主要取决于数据源能发出什么信号。CDC 延迟最低、保真度最高。Webhook 对第三方数据源最易实现。Cron 是无法推送事件的数据源的最低保真兜底方案。

幂等性不是可选项,它就是架构本身

当 Agent 从消息队列或 Webhook 基础设施接收事件时,它在"至少一次"(at-least-once)投递语义下运行。队列保证你的 Agent 至少会处理一次消息,但不承诺恰好一次。

网络分区、Agent 崩溃和超时重试,都会导致消息被多次投递。你的 Agent 代码必须从一开始就设计为能处理这种情况,而不是等第一次重复事件发生后再补救。

处理模式:

  1. 每个传入事件携带一个由生产者生成的全局唯一事件 ID。
  2. 在处理前,查询已处理事件表:这个事件 ID 是否已被处理?
  3. 如果是,立即返回成功——不要重新执行。
  4. 如果否,处理事件,并在同一个数据库事务中将事件 ID 记录为已处理,与工作本身一起提交。

第 4 步的原子性是大多数实现出错的地方。在工作完成后单独调用来记录事件 ID,会产生一个时间窗口:崩溃可能导致事件已处理但没有记录,从而在重试时被再次处理;或者有记录但事件未被处理,导致永远不会被处理。事件 ID 记录和工作输出必须一起提交。

加载中…
Let's stay in touch and Follow me for more thoughts and updates