feat(agent-runtime): mirror a run's transcript to thread storage incrementally #456

Merged: akitaSummer merged 9 commits into master from feat-incremental-message-flush on Jun 16, 2026

@jerryliang64 jerryliang64 commented Jun 11, 2026

Contributor

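Background / Motivation

The agent runs in Chair Sandbox and writes its conversation through OSSAgentStore to threads/<id>/messages.jsonl on OSS. A separate observer application, which only polls OSS read-only, reads that object to display conversation details. Previously messages.jsonl was appended in a single batch only after a turn (run) finished, so while an agent was running the observer read an empty thread until the turn ended and could not show the conversation in real time. The OSS object is the only contract between the observer and the runner (the observer cannot reach the local SSE/event files), so the only workable approach is to make messages appear incrementally in messages.jsonl on OSS.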

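Changes

AgentRuntime: always mirror incrementally. All three run paths (sync/async/stream) append each message to thread storage as soon as the loop produces it, so an observer polling getThread sees the in-flight conversation grow step by step. The paths share one flusher that is commit-gated and cursor-idempotent:

  • Commit gate: nothing is written before the executor session commits (abort-before-commit leaves the thread empty, so resume does not fork). The successful-completion path calls flush(true) to force the write, which guarantees that a normally finished run always persists its full transcript, even if it never committed.
  • at-most-once: the cursor advances only after a successful append, so the success path and the catch/abort backfill neither duplicate nor drop messages.
  • stream_event deltas are never persisted.
  • Late-abort re-check: after the loop ends and before entering the success path, aborted is checked again, so that an external signal/destroy() firing during the last flush does not cause a run that should be Cancelled to be marked Completed.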
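
A minimal sketch of the commit gate and cursor described in the bullets above. Every name and signature here (Message, ThreadStore, the parameters of createMessageFlusher) is an illustrative assumption and not the actual AgentRuntime code; it only shows how a gate plus a cursor that advances after a successful append could fit together.

```typescript
// Illustrative assumptions: these types and signatures are not the real AgentRuntime API.
interface Message { role: string; content: string }

interface ThreadStore {
  appendMessages(threadId: string, messages: Message[]): Promise<void>;
}

function createMessageFlusher(
  store: ThreadStore,
  threadId: string,
  transcript: Message[],        // grows as the run produces messages
  isCommitted: () => boolean,   // has the executor session committed yet?
) {
  let flushedCount = 0;         // cursor: number of messages already appended

  // Callers are assumed to await each flush before starting the next one.
  async function flush(force = false): Promise<void> {
    // Commit gate: before commit, write nothing unless the caller forces it.
    if (!force && !isCommitted()) return;
    const pending = transcript.slice(flushedCount);
    if (pending.length === 0) return;
    await store.appendMessages(threadId, pending);
    // Advance only after the append resolved, so a failed append is retried later.
    flushedCount += pending.length;
  }

  // Variant for in-loop and abort/failure paths: log the error and carry on.
  async function flushQuietly(): Promise<void> {
    try {
      await flush();
    } catch (err) {
      console.warn('incremental flush failed; pending messages stay queued', err);
    }
  }

  return { flush, flushQuietly };
}
```

In this sketch a run loop would call flushQuietly() after each produced message and flush(true) once on the success path, which matches the behaviour described above for a run that finishes without ever committing.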

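Design premise: a thread has at most one active run at a time. The executor model already requires this (one thread = one resumable executor session, and concurrent runs would fork the session anyway). This is stated in code comments; the runtime does not enforce it, and callers serialize runs (the typical case is "one conversation = one sequential thread").

OSSAgentStore:

  • indexThrottleMs defaults to 5s (pass 0 to disable): otherwise per-message appends would make the activity index index/threads-by-updated-date/ produce one sidecar object per message, multiplying the reader's list cost. Throttling is scoped per UTC date bucket (crossing midnight forces a write to the new bucket, so the thread is never missing from the new day's index); when the index PUT fails, the throttle window is rolled back so the next append retries; the throttle bookkeeping Map has a FIFO cap.
  • When a client without atomic append takes the get-concat-put fallback, appends are serialized per thread to avoid lost updates under concurrent appends (the atomic AppendObject path on real OSS is already position-safe and takes no lock).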
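
The throttling rules in the first bullet above can be sketched as a small decision helper. IndexThrottle, ThrottleEntry, utcDateBucket and the cap of 10000 entries are hypothetical, chosen only for illustration; the real OSSAgentStore may structure this differently.

```typescript
// Hypothetical names; a sketch of the throttle decision only, not OSSAgentStore itself.
interface ThrottleEntry { bucket: string; at: number }

function utcDateBucket(ms: number): string {
  return new Date(ms).toISOString().slice(0, 10); // UTC date, formatted YYYY-MM-DD
}

class IndexThrottle {
  private readonly entries = new Map<string, ThrottleEntry>();
  private readonly throttleMs: number;
  private readonly maxEntries: number;

  constructor(throttleMs = 5000, maxEntries = 10000) {
    this.throttleMs = throttleMs;
    this.maxEntries = maxEntries; // assumed cap; the real limit is not stated in the PR
  }

  // Returns the recorded entry when the index should be written, undefined to skip.
  shouldWrite(threadId: string, now: number): ThrottleEntry | undefined {
    const entry: ThrottleEntry = { bucket: utcDateBucket(now), at: now };
    if (this.throttleMs <= 0) return entry; // disabled: always write, map untouched
    const last = this.entries.get(threadId);
    const sameBucket = last !== undefined && last.bucket === entry.bucket;
    if (last !== undefined && sameBucket && now - last.at < this.throttleMs) {
      return undefined; // same UTC day and still inside the window
    }
    this.entries.set(threadId, entry); // an existing key keeps its insertion position
    if (this.entries.size > this.maxEntries) {
      const oldest = this.entries.keys().next().value; // first inserted key (FIFO)
      if (oldest !== undefined) this.entries.delete(oldest);
    }
    return entry;
  }

  // Call when the index PUT failed. The identity check avoids clobbering a newer entry.
  rollback(threadId: string, entry: ThrottleEntry): void {
    if (this.entries.get(threadId) === entry) this.entries.delete(threadId);
  }
}
```

A caller would write the index sidecar only when shouldWrite returns an entry, and pass that same entry to rollback if the PUT rejects, so the next append is free to retry.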

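Tests

core/agent-runtime: 187 passing (tsc:pub and ESLint pass). New coverage: early visibility during a run, no duplicates on completion, empty thread on abort before commit, persistence on success without any commit, late abort marked Cancelled, index throttling by default / throttling disabled / no throttling across date buckets, rollback and retry on index PUT failure, and no lost updates under concurrent fallback appends.

Compatibility

Behavior change: messages are now persisted incrementally while the run executes (previously in one batch at the end of the turn); the final transcript content is identical. The indexThrottleMs default changes from 0 to 5s (pass 0 to restore an index write on every append).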

🤖 Generated with Claude Code

Summary by CodeRabbit

  • Improvements
    • Enhanced agent run transcript persistence so user/assistant turns are mirrored reliably to thread storage, including “never committed” and incremental streaming scenarios, with safer handling when runs are finalized by other actors.
    • Added configurable activity-index throttling (default 5s; 0 disables) with retry-safe rollback on write failures and correct behavior across UTC day boundaries.
  • Testing
    • Added regression and concurrency tests to verify incremental visibility during in-flight runs, correct cancel vs complete outcomes, and no lost updates under concurrent appends.

…ementally

Add `flushGranularity: 'turn' | 'message'` to AgentRuntime. In 'message'
mode each message is appended to thread storage as soon as the executor
session commits, so an OSS-polling observer can watch a running agent's
conversation grow in real time instead of only seeing it after the turn
finishes. 'turn' (default) keeps the existing single end-of-turn append, so
this is a zero-behaviour-change default.

The three run paths (sync/async/stream) now share one commit-gated, idempotent
message flusher: a cursor advances only after a successful append, giving
at-most-once semantics across the success and catch/abort paths; stream_event
deltas stay filtered; both modes converge on an identical final transcript.

Add `OSSAgentStore.indexThrottleMs` (default 0 = unchanged) to coalesce the
activity-index sidecar writes that per-message appends would otherwise
multiply. The throttle bookkeeping map is only touched when throttling is
enabled and is FIFO-capped so it cannot grow without bound.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

coderabbitai Bot commented Jun 11, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
📥 Commits

Reviewing files that changed from the base of the PR and between 2505c90 and 33804db.

📒 Files selected for processing (1)
  • core/agent-runtime/src/OSSAgentStore.ts

Walkthrough

This PR replaces AgentRuntime's previous "persist-at-most-once" flow with a centralized createMessageFlusher helper that enforces commit-gated, idempotent thread mirroring across syncRun, asyncRun, and executeStreamBackground. Each run method now calls flushQuietly() incrementally during streaming and on abort/failure paths, then performs a final forced flush(true) on success to guarantee transcript persistence. OSSAgentStore gains indexThrottleMs configuration for coalescing activity-index writes within UTC date buckets while maintaining per-append message durability. Tests validate incremental visibility, commit-gating semantics, and throttle behavior.

Changes

Incremental thread mirroring and activity-index throttling

| Layer | File(s) | Summary |
| --- | --- | --- |
| Centralized idempotent message flusher helper | core/agent-runtime/src/AgentRuntime.ts | New private createMessageFlusher helper with commit-gate, flushedCount cursor for idempotent retry, and dual-mode flush: flush(force?) respects commit gate by default; flushQuietly() swallows errors while logging. |
| syncRun integration with flusher | core/agent-runtime/src/AgentRuntime.ts | syncRun creates per-run flusher, calls flushQuietly() on in-loop and TOCTOU abort paths before finalization, performs forced flush(true) on success, and invokes flushQuietly() in failure catch before status update. |
| asyncRun integration with flusher | core/agent-runtime/src/AgentRuntime.ts | asyncRun adopts flusher similarly: flushQuietly() on in-loop abort, late-abort re-check, and TOCTOU terminal-status checks; forced flush(true) on normal completion; quiet flush in failure and abort finalization paths. |
| executeStreamBackground integration with flusher | core/agent-runtime/src/AgentRuntime.ts | Background streaming path uses flusher with identical pattern: flushQuietly() on aborts and TOCTOU checks, forced flush(true) after executor completion before emitting done event, and quiet flush in failure/abort paths before SSE emission. |
| OSSAgentStore activity-index throttling configuration and implementation | core/agent-runtime/src/OSSAgentStore.ts | Adds indexThrottleMs?: number (default 5s, disable with 0); tracks per-thread last-index timestamp by UTC date bucket; appendMessages skips index write when within throttle window, records/evicts throttle entries with FIFO cap, and passes entry to writeThreadActivityIndex for failure rollback; serialization refactored to generic runExclusive helper. |
| Incremental mirroring and commit-gating test suite | core/agent-runtime/test/AgentRuntime.test.ts | syncRun regression test with "never committed" override validates full transcript persistence; new incremental mirror section tests in-flight message visibility without duplication, abort-before-commit empty thread, and late external abort producing RunStatus.Cancelled. |
| Activity-index throttling validation tests | core/agent-runtime/test/OSSAgentStore.test.ts | Concurrency test validates fallback append consistency; throttling tests verify index-write frequency with indexThrottleMs: 0, default coalescing within window, UTC boundary respect, and failure rollback recovery; all verify messages remain retrievable with includeAllMessages: true. |

Sequence Diagram

sequenceDiagram
  participant Executor
  participant syncRun
  participant MessageFlusher
  participant ThreadStore
  Executor->>syncRun: start run with streaming executor
  syncRun->>MessageFlusher: createMessageFlusher(userInput)
  loop executor yields committed messages
    Executor->>syncRun: committed message
    syncRun->>MessageFlusher: flushQuietly()
    MessageFlusher->>ThreadStore: append from cursor (idempotent)
    ThreadStore-->>syncRun: partial transcript visible to readers
  end
  alt abort in-loop
    syncRun->>MessageFlusher: flushQuietly() before finalize
    syncRun-->>Executor: cancel signal
  else success
    syncRun->>MessageFlusher: flush(true) bypass commit gate
    MessageFlusher->>ThreadStore: final forced append
    syncRun->>syncRun: mark completed
  end

Estimated Code Review Effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly Related PRs

  • eggjs/tegg#449: Both PRs modify core/agent-runtime/src/AgentRuntime.ts's sync/async/background transcript persistence to ensure user + already-produced assistant messages are written using task.committed-gated logic for non-success/abort/failure end states.
  • eggjs/tegg#441: Both PRs update core/agent-runtime/src/AgentRuntime.ts to enforce task.committed/commit-detection gating for when thread messages (especially on abort/cancel) are persisted, including coordinated changes to post-loop terminal handling and transcript flushing.
  • eggjs/tegg#445: Both PRs modify core/agent-runtime/src/OSSAgentStore.ts around the thread activity-index sidecar logic (writeThreadActivityIndex), including how index keys are written in the background and how failed PUTs are handled.

Suggested Reviewers

  • killagu
  • akitaSummer

Poem

🐰 I hopped through threads both slow and fleet,
Flushed a message at each heartbeat,
Commit gates kept the mirror clean,
No dupes, no gaps—a tidy scene,
Cheers from a rabbit with nimble feet! 🌙

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title accurately and specifically describes the main change: incremental transcript mirroring to thread storage, which is the central feature across all modified files. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces a new flushGranularity option ('turn' or 'message') to AgentRuntime to support incremental mirroring of running turns to thread storage. It refactors the message persistence logic using a new createMessageFlusher helper. Additionally, it adds an indexThrottleMs option to OSSAgentStore to throttle and coalesce activity-index sidecar writes during rapid message appends, preventing excessive writes. Unit tests have been added to verify both the incremental flushing behavior and the index throttling mechanism. I have no feedback to provide as there are no review comments to evaluate.

jerryliang64 and others added 2 commits June 12, 2026 01:01
…etion

The incremental-flush refactor routed the success path through the
commit-gated flush(), so a run that finished normally but never flipped
task.committed (a system-only run, or a conservative custom
isSessionCommitted) silently persisted nothing while still being marked
Completed — losing the transcript and corrupting the next run's isResume.

The success-path flush now forces past the commit gate (flush(true)),
restoring the pre-incremental behaviour where a normally-finished run always
appends its full transcript. The gate still constrains the in-loop incremental
writes and the abort/failure cleanup paths, so abort-before-commit still
leaves the thread untouched.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
A late abort (external AbortSignal or destroy()) can land while the final
in-loop flush / markCommitted is awaiting. If the executor then ignores the
signal and ends naturally, the for-await loop exits without a top-of-loop
re-check, and the post-loop only inspected the store status — which an external
abort never writes — so the run was marked Completed instead of Cancelled.

Add a post-loop `abortController.signal.aborted` re-check in all three run
paths (sync/async/stream), before the success-completion path, mirroring the
in-loop abort handling. The window pre-dated the incremental flush (the last
awaited loop-body op was previously markCommitted); the per-message flush only
widened it. cancelRun is unaffected — it still writes a terminal status caught
by the existing TOCTOU check.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@jerryliang64

Contributor Author

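Codex adversarial review (converged in 3 rounds)

codex review ran an adversarial review of this branch. Each finding was verified and then fixed, and the review converged to 0 issues.

Round 1: codex raised 2 P2 findings:

  1. Success path wrongly blocked by the commit gate (real bug): after the refactor the success path went through the gated flush(), so when the executor finished successfully but never committed (a system-only run, or a custom isSessionCommitted that always returns false), the transcript was lost and isResume was misjudged. Before the change the success path appended unconditionally. → Fix: the success path uses flush(true) to bypass the commit gate, restoring the old semantics; the in-loop/abort/failure paths remain gated (1ceb05a). Regression test added.
  2. ⚖️ Tail lag from index throttling: on verification, the actual lag is bounded (≤ indexThrottleMs, since each elapsed window forces one write), which is exactly the acceptable trade-off already declared in the JSDoc; the consumer de-duplicates by max updatedAt and caches list results for 5 min, so the impact is negligible. After item 1 was fixed, codex did not press this point again.

Round 2: codex raised 1 new P2 finding:
3. ✅ No abort re-check after the loop ends naturally (race): an external AbortSignal/destroy() can fire during the last flush; if the executor then ignores the signal and ends naturally, the loop exits without passing through the top-of-loop check, and the post-loop code only looks at the terminal status in the store (which an external abort does not write), so a run that should be Cancelled is marked Completed. This window existed before the change (the old code had markCommitted at the end of the loop body); the per-message flush only widened it. → Fix: all three paths add an abortController.signal.aborted re-check after the loop and before entering the success path (c505e77). Deterministic regression test added.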
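
The race in item 3 can be illustrated with a reduced run loop. This is a sketch under stated assumptions: runLoop, AbortLike and the string statuses are hypothetical stand-ins, and onMessage represents whatever the loop body awaits last (for example the per-message flush).

```typescript
// Hypothetical, reduced model of a run loop; not the actual syncRun/asyncRun code.
type RunOutcome = 'completed' | 'cancelled';

// Minimal stand-in for AbortSignal so the sketch needs no platform types.
interface AbortLike { readonly aborted: boolean }

async function runLoop(
  messages: AsyncIterable<string>,
  signal: AbortLike,
  onMessage: (m: string) => Promise<void>, // e.g. flush the message to thread storage
): Promise<RunOutcome> {
  for await (const m of messages) {
    if (signal.aborted) return 'cancelled'; // top-of-loop check
    await onMessage(m);                     // an abort can land while this awaits
  }
  // Post-loop re-check: if the abort arrived during the last awaited step and the
  // executor simply ended, no further iteration would have observed it.
  if (signal.aborted) return 'cancelled';
  return 'completed';
}
```

Without the check after the loop, an abort that lands during the final onMessage call would fall through to 'completed', which is the mislabelling this fix addresses.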

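Round 3: codex reported: "No clear, actionable correctness, security, or compatibility issues found. All related tests pass."

Tests: 184 passing (+2 regression cases on top of the original 182); tsc:pub and ESLint both pass.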

jerryliang64 and others added 4 commits June 12, 2026 18:04
…flushGranularity

OSS comfortably absorbs the per-message write rate (writes spread across
UUID-keyed per-thread objects, no hot partition; peak hundreds–low-thousands
ops/s vs OSS's many-thousands-QPS single-bucket budget; per-append latency is
negligible vs LLM message cadence), so the opt-in flag is unnecessary friction
and a silent-no-op footgun when not wired through. Remove it: every run path
now flushes each message as it is produced.

To keep that coherent, OSSAgentStore.indexThrottleMs now defaults to 5s (was
0): per-message appends would otherwise emit one activity-index sidecar object
per message and multiply the reader's list cost. Pass indexThrottleMs: 0 to
restore write-on-every-append. messages.jsonl content is unaffected by
throttling.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The activity index is partitioned by UTC date (index/threads-by-updated-date/
<date>/...). Throttling purely on elapsed time meant a thread that appended
just before midnight and again within indexThrottleMs after midnight would be
suppressed from the NEW day's bucket entirely — a reader listing that day would
miss the thread.

Include the date bucket in the per-thread throttle state and never throttle
across a bucket change, so each day a thread is active always gets at least one
index entry. Within a day, throttling is unchanged.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…tle on PUT failure

Two issues surfaced once messages are mirrored per-message instead of once per
turn:

1. Concurrent runs on the same thread, on a store without atomic `append`,
   raced through the get-concat-put fallback (read same object, clobber each
   other, lose lines). Serialize that fallback per thread via a generalized
   per-key lock (the atomic OSS AppendObject path is already position-safe and
   stays lock-free).

2. The index throttle recorded its window eagerly, before the fire-and-forget
   index PUT resolved. A transient PUT failure then suppressed retries for the
   throttle interval, potentially dropping a thread from a day's activity index.
   Roll back the throttle entry when the PUT fails (identity-checked so a
   concurrent newer entry is not clobbered) so the next append retries.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
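
To illustrate item 1 of the commit message above, here is a minimal sketch of a per-key lock wrapped around a get-concat-put append. The name runExclusive appears in the review walkthrough, but this body, the ObjectClient interface and appendViaFallback are assumptions made for illustration only.

```typescript
// Assumed minimal client shape for a store without atomic append.
interface ObjectClient {
  get(key: string): Promise<string | undefined>;
  put(key: string, body: string): Promise<void>;
}

// One promise chain tail per key; tasks for the same key run strictly in order.
const tails = new Map<string, Promise<void>>();

function runExclusive<T>(key: string, task: () => Promise<T>): Promise<T> {
  const prev = tails.get(key) ?? Promise.resolve();
  const result = prev.then(task); // prev never rejects, see the tail below
  // The stored tail swallows errors so one failed task does not poison the queue.
  const tail: Promise<void> = result.then(() => undefined, () => undefined);
  tails.set(key, tail);
  // Drop the entry once the queue drains so the map does not grow without bound.
  void tail.then(() => {
    if (tails.get(key) === tail) tails.delete(key);
  });
  return result;
}

// Read-modify-write fallback: safe only because it runs under the per-key lock.
async function appendViaFallback(
  client: ObjectClient,
  key: string,
  lines: string,
): Promise<void> {
  await runExclusive(key, async () => {
    const current = (await client.get(key)) ?? '';
    await client.put(key, current + lines);
  });
}
```

Different keys still proceed in parallel, so the lock only costs throughput when two appends target the same thread object.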
… for incremental mirroring

Concurrent runs on one thread are unsupported (they already diverge the
per-thread executor session); with per-message appends they would also
interleave into an unresumable transcript. Callers serialize runs per thread;
the runtime does not enforce it.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@jerryliang64 jerryliang64 changed the title from "feat(agent-runtime): stream a run's transcript to thread storage incrementally" to "feat(agent-runtime): mirror a run's transcript to thread storage incrementally" on Jun 15, 2026
The flusher advances its cursor only after a successful append (favouring
no-loss). An ambiguous failure — store committed, client errored — therefore
re-appends and can duplicate a mirror line. This pre-dated the change (the
turn-batch catch-block persist had the same risk, once per turn). Documented as
a known at-least-once limitation; de-dup (idempotent ids / length
reconciliation) is a follow-up.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@jerryliang64

Contributor Author

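Design adjustment + multi-round codex adversarial re-review

Design adjustment: drop the flushGranularity switch and always mirror incrementally

This was originally a flushGranularity: 'turn' | 'message' switch (default turn). After evaluation, OSS can absorb per-message writes (writes are spread across UUID-keyed per-thread objects with no hot spot; the peak of hundreds to thousands of ops/s is far below the single-bucket budget of over ten thousand QPS; append latency is negligible relative to the interval between LLM messages). In addition, a switch that defaults to off and only takes effect once wired through both the plugin and app layers is a silent-no-op trap. So the switch is removed and messages are always written as they are produced; the OSSAgentStore.indexThrottleMs default changes to 5s accordingly (pass 0 to disable), so that per-message appends do not multiply activity-index sidecar objects.

Real issues fixed through codex re-review

  • Success path wrongly blocked by the commit gate: a run that finished successfully but never committed would lose its transcript → fixed by forcing the write with flush(true).
  • Late-abort race: abort was not re-checked after the loop ended naturally, so an external signal/destroy could mark a Cancelled run as Completed → added a post-loop re-check.
  • Index throttle skipped writes across UTC date buckets: an append just after midnight was throttled, so the thread was missing from the new day's index → the throttle state now includes the date bucket, and a bucket change forces a write.
  • Lost updates under concurrent fallback appends: a client without atomic append takes get-concat-put, and concurrent appends to the same thread could lose lines → serialized per thread (the atomic OSS append path is already position-safe and takes no lock).
  • A failed index PUT still held the throttle window: a failed PUT still recorded the throttle time → the throttle entry is rolled back so the next append retries.

Known limitations (documented in this round, to be addressed in later iterations)

  1. One active run per thread: concurrent runs on the same thread would interleave into a history that cannot be resumed. This is already a hard constraint of the executor session model (concurrent runs fork the session anyway) and is not introduced by this change; it is stated in code comments, callers serialize runs (the typical case is "one conversation = one sequential thread"), and the runtime does not enforce it.
  2. An ambiguous append may duplicate (at-least-once): when the server has already written the append but the client sees an error (timeout / lost response), the cursor does not advance → the next flush resends → a line may be duplicated. The cursor policy prefers "no loss" over "no duplicates"; this risk existed before the change (the turn-batch catch backfill would resend in the same way), only its frequency rises from once per turn to once per message. Full de-duplication requires idempotent line ids or length-based reconciliation, and is left as a follow-up.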
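
As an illustration of the "idempotent line ids" follow-up mentioned in limitation 2, the sketch below de-duplicates on the read side. It is not part of this PR: the id field on each line, the StoredLine shape and parseTranscript are all hypothetical, and the actual messages.jsonl schema may carry no such id.

```typescript
// Hypothetical line shape: assumes each JSONL line carries a stable, unique id.
interface StoredLine { id: string; role: string; content: string }

// Parse a messages.jsonl body and keep only the first occurrence of each id,
// so a line re-appended after an ambiguous failure is ignored by the reader.
function parseTranscript(jsonl: string): StoredLine[] {
  const seen = new Set<string>();
  const out: StoredLine[] = [];
  for (const raw of jsonl.split('\n')) {
    if (raw.trim() === '') continue;           // skip blank lines / trailing newline
    const line = JSON.parse(raw) as StoredLine;
    if (seen.has(line.id)) continue;           // duplicate from a retried append
    seen.add(line.id);
    out.push(line);
  }
  return out;
}
```

Reader-side filtering keeps the writer's "no loss over no duplicates" cursor policy unchanged; the length-based reconciliation alternative would instead fix duplicates at write time.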

Tests: 187 passing; tsc:pub and ESLint pass.

…rollback

The PUT-failure rollback re-enables a future append's index write but does not
re-index appends that were throttled within the failed window; if no further
append arrives that day, the thread is missing from that day's activity bucket
until its next activity. messages.jsonl is unaffected. A full fix (success-gated
throttle, losing burst coalescing; or tracking+rewriting latest activity) is
deferred — best-effort index.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@akitaSummer akitaSummer merged commit bf89468 into master Jun 16, 2026
12 checks passed
@akitaSummer akitaSummer deleted the feat-incremental-message-flush branch June 16, 2026 03:07