跳到主要内容

主动型 Agent:后台 AI 的事件驱动与定时自动化

· 阅读需 12 分钟
Tian Pan
Software Engineer

几乎所有关于构建 AI Agent 的教程都以同样的方式开场:用户输入消息,Agent 进行推理,Agent 返回响应。这个模型对聊天机器人和副驾驶(Copilot)来说运行良好,却无法描述各组织正在大规模部署的大多数生产 AI 工作。

在企业环境中默默发挥最大价值的 Agent,并不等待消息。它们在数据库行发生变更时唤醒,在队列深度超过阈值时唤醒,在凌晨 3 点的定时任务触发时唤醒,或在监控检测到指标漂移超出范围时唤醒。它们在没有用户在场的情况下行动。一旦失败,没有人会察觉,直到损失已经累积到难以挽回。

构建这类主动型 Agent 需要一套与构建被动式助手截然不同的设计语汇。适用于对话型 AI 的会话(Session)思维模型,在 Agent 循环运行、在后台重试、没有人类兜底的场景下会彻底失效。

触发层:为什么不能只给提示词套一层 Cron

主动型 Agent 最简单的实现看起来是这样的:0 9 * * * python run_agent.py。这行得通,直到它失效为止。生成每日摘要的 Agent 足够简单,Cron 任务是合理的。但一旦 Agent 开始向外部系统写入数据,这种简陋的 Cron 包装就会产生一类静默复合的故障。

第一种故障模式是执行重叠。Cron 任务不会检查上一次运行是否还在进行中。如果你早上 9 点的 Agent 因上游 API 缓慢而耗时 90 分钟,10 点的任务照样会启动。此时,两个 Agent 实例同时读取相同的状态、各自独立地做决策,并向相同的目标写入数据。结果可能是重复的发票、重复发送的通知,或相互矛盾的数据库更新——具体取决于你的 Agent 写入了什么。

解决方案不是拉长 Cron 间隔,而是把触发层和执行层视为两个独立的关注点。触发器负责可靠地触发,执行层负责:

  1. 在开始工作前获取分布式锁
  2. 检查预期工作是否已完成(幂等性检查)
  3. 在释放锁之前记录已完成状态

这并非新的智慧——这是将事务性发件箱(Transactional Outbox)模式应用于 Agent 执行。在同一个数据库事务中,写入"我正在为输入 Y 启动运行 X"的记录和实际的工作结果。在下一次触发时,先检查发件箱再继续。如果该逻辑工作单元已存在完成记录,则跳过。

对于使用 Temporal 的团队,持久化执行模型已部分处理了这个问题——中途失败的工作流会从最后一个检查点恢复,而不是重新开始。对于在无服务器基础设施或普通 Cron 上运行 Agent 的团队,发件箱模式是最可靠的替代方案。

事件驱动触发:以推送替代轮询

轮询是无法推送时的兜底方案。大多数做定时 Agent 工作的团队,实际上在实现一种效率更低的事件驱动架构:每 N 分钟检查一次变更,而不是在变更发生时立即响应。

变更数据捕获(CDC)是运维上更成熟的替代方案。Kafka 配合 Debezium 连接器、AWS DMS 或数据库原生复制流,允许你订阅每一次已提交的数据库行变更。你的 Agent 在真正发生变更时被调用,而不是按照一个可能与变更节奏完全不匹配的调度表运行。

架构层面的影响是显著的:与基于轮询的方案相比,事件驱动 Agent 可将系统延迟降低 70–90%,且在空闲时零计算成本。如果变更不频繁,一个每 5 分钟轮询一次的 Agent 有 99% 以上的时间都在浪费资源。

按来源分类的触发模式:

  • 数据库变更:通过 Kafka/Debezium 的 CDC,或数据库触发器写入队列
  • API 事件:Webhook 投递到端点,经消息队列缓冲
  • 基于时间:标准 Cron 配合分布式锁和幂等性保障
  • 状态漂移:监控系统检测到指标偏差后发出事件,而不是按调度触发

这些方案之间的选择,主要取决于数据源能发出什么信号。CDC 延迟最低、保真度最高。Webhook 对第三方数据源最易实现。Cron 是无法推送事件的数据源的最低保真兜底方案。

幂等性不是可选项,它就是架构本身

当 Agent 从消息队列或 Webhook 基础设施接收事件时,它在"至少一次"(at-least-once)投递语义下运行。队列保证你的 Agent 至少会处理一次消息,但不承诺恰好一次。

网络分区、Agent 崩溃和超时重试,都会导致消息被多次投递。你的 Agent 代码必须从一开始就设计为能处理这种情况,而不是等第一次重复事件发生后再补救。

处理模式:

  1. 每个传入事件携带一个由生产者生成的全局唯一事件 ID。
  2. 在处理前,查询已处理事件表:这个事件 ID 是否已被处理?
  3. 如果是,立即返回成功——不要重新执行。
  4. 如果否,处理事件,并在同一个数据库事务中将事件 ID 记录为已处理,与工作本身一起提交。

第 4 步的原子性是大多数实现出错的地方。在工作完成后单独调用来记录事件 ID,会产生一个时间窗口:崩溃可能导致事件已处理但没有记录,从而在重试时被再次处理;或者有记录但事件未被处理,导致永远不会被处理。事件 ID 记录和工作输出必须一起提交。

对于拥有写入工具的 Agent——那些发送邮件、更新记录、调用支付 API 的 Agent——幂等性失败意味着现实世界的副作用发生了两次。重复扣款、重复消息和重复状态变更,无法通过再次运行 Agent 来恢复,需要人工介入或补偿性事务。

漂移检测触发:基于变化而非基于调度触发

一种常见的主动型 Agent 模式是检查某件事是否已经退化,并采取纠正措施。数据质量监视器、模型性能看门狗、库存核对 Agent——这些都需要在事情发生变化时触发,而不是在任意时间间隔触发。

最简单的实现是按调度轮询:每小时运行质量检查,与阈值比较,超出阈值则采取行动。问题在于,每小时轮询意味着你可能比实际问题出现晚 59 分钟才做出响应,而问题可能在上次检查后一分钟就已出现。

事件触发的漂移检测将这一逻辑倒置。不是 Agent 醒来去寻找漂移,而是监控层在检测到漂移时发出事件,Agent 在事件到达时处理它。监控层持续运行,成本低廉,以数据支持的任意粒度运行。Agent 仅在有事可做时才运行。

两级告警使这一机制更健壮:警告级用于中等漂移,生成调查事件;严重级用于剧烈漂移,生成行动事件。处理调查事件的 Agent 可能生成诊断报告并排入人工审查队列。处理行动事件的 Agent 可能启动自动修复。

实际实现使用与其余事件驱动管道相同的消息队列基础设施。漂移检测器将事件写入主题,Agent 订阅该主题。这将检测逻辑(属于监控基础设施)与响应逻辑(属于你的 Agent)清晰分离。

静默失败问题

同步 Agent 的失败是显而易见的。用户发送了消息,Agent 出错,UI 显示错误状态,有人提交了 Bug 报告。后台 Agent 默认情况下是静默失败的。本应在凌晨 2 点核对记录的任务,在第 47 行因上游 API 速率限制而失败,没有人知道,直到数据错误到足以在 12 小时后的仪表盘上浮现。

传统监控是为确定性系统设计的。HTTP 200 意味着成功。超时是失败。这些信号对 Agent 来说是不够的——HTTP 200 响应可能包含幻觉或不完整的结果,而 Agent 可能在每一步都产生错误输出的情况下仍然成功完成了执行循环。

有效的后台 Agent 可观测性需要三件事:

结构化执行追踪。 每次 Agent 运行都应该发出一条追踪记录,包括调用了哪些工具、传入了什么参数、每个工具返回了什么、以及 Agent 基于这些返回做出了什么决策。追踪记录应该可以在事后进行查询调查。

结果验证,而不只是执行验证。 一个调用了三个工具并返回答案的 Agent 在执行意义上是"成功"的。答案是否正确是另一个问题。后台 Agent 需要输出验证器——在将结果提交到目标之前,对结果的形状和内容进行断言。

对缺失的告警,而不仅仅是对存在的告警。 一次有噪声的失败——伴随异常、非零退出码、错误日志——可以被标准监控检测到。一次因 Cron 任务未被调度而根本未启动的运行,或一次静默完成却未产生输出的运行,则更难捕捉。最可靠的模式:每次后台 Agent 运行都应写入一条包含时间戳和预期下次运行时间的心跳记录。一个独立的监视器监视逾期的心跳记录。

Temporal 与持久化执行的正确之处

持久化执行模型解决了无状态 Agent 调用的一个根本问题:如果 Agent 在运行中途崩溃,所有进行中的状态都会丢失。从头重试意味着重新执行所有已完成的步骤;不重试则意味着工作丢失。

Temporal 及类似的工作流引擎在每一步之后持久化执行状态。如果 Agent 进程崩溃,它会从中断处恢复——已经运行的工具不会再次运行。这消除了内部 Agent 状态的幂等性问题,同时对外部副作用(工作流引擎外部的 API 和数据库仍然需要幂等性键)仍然需要幂等性保障。

对于不使用工作流引擎的团队,实际的替代方案是检查点:在每个逻辑步骤之后将进度写入持久化存储(数据库行、S3 对象)。在启动时检查是否存在检查点,并从那里恢复,而不是重新开始。这比 Temporal 更手动,但对许多用例来说已经足够。

权衡:持久化执行引擎增加了运维复杂性和新的基础设施依赖。对于步骤多、运行时间长、价值高的 Agent,可靠性收益值得这些开销。对于从头重试成本较低的简单、短暂运行的 Agent,这些开销通常不值得。

后台 Agent 的实践设计原则

主动型 Agent 需要与交互式同类产品不同的默认设置:

让每个触发器都是幂等的。 将所有投递视为至少一次。事件 ID 是你的去重单元,必须与它所保护的工作原子性地检查。

分离触发、执行和输出。 决定何时运行的代码、执行工作的代码和提交结果的代码应该是不同的组件。这使每个组件都可以独立测试,并使故障边界更清晰。

监控缺失,而不仅仅是存在。 当预期的运行没有发生时告警,而不仅仅是当它们产生错误时。缺失的执行通常比失败的执行更危险,因为它更难检测。

在提交之前验证输出。 拥有写入工具的 Agent 在将输出写入外部系统之前应该对其运行断言。通过执行验证但未通过输出验证的输出应标记为人工审查,而不是静默丢弃或静默提交。

记录足够多以重建运行过程。 你需要在凌晨 3 点回答的问题是:"Agent 在损坏这条数据的运行中做了什么?"你的追踪记录需要明确地回答这个问题。

后台 Agent 的前景令人振奋——使用 Agent 协调的物流团队报告将延误减少了高达 40%,使用自动化 Agent 的客户支持组织将通话时间减少了近 25%。但这些成果需要在运维上正确的 Agent,而不仅仅是功能上有能力的 Agent。一个平均能做对事情、但在凌晨 2 点静默失败、在负载下产生重复、或在队列积压期间丢失事件的 Agent,是一种负债而非资产。

对话型 Agent 的设计语汇——会话、轮次、上下文窗口——需要一套配套的后台执行语汇:触发器、幂等性键、发件箱模式、心跳、分布式锁。构建生产 AI 系统的工程师将需要两者兼备。

Let's stay in touch and Follow me for more thoughts and updates