feat(agent-runtime): mirror a run's transcript to thread storage incrementally#456
Conversation
…ementally Add `flushGranularity: 'turn' | 'message'` to AgentRuntime. In 'message' mode each message is appended to thread storage as soon as the executor session commits, so an OSS-polling observer can watch a running agent's conversation grow in real time instead of only seeing it after the turn finishes. 'turn' (default) keeps the existing single end-of-turn append, so this is a zero-behaviour-change default. The three run paths (sync/async/stream) now share one commit-gated, idempotent message flusher: a cursor advances only after a successful append, giving at-most-once semantics across the success and catch/abort paths; stream_event deltas stay filtered; both modes converge on an identical final transcript. Add `OSSAgentStore.indexThrottleMs` (default 0 = unchanged) to coalesce the activity-index sidecar writes that per-message appends would otherwise multiply. The throttle bookkeeping map is only touched when throttling is enabled and is FIFO-capped so it cannot grow without bound. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
📝 WalkthroughWalkthroughThis PR replaces AgentRuntime's previous "persist-at-most-once" flow with a centralized ChangesIncremental thread mirroring and activity-index throttling
Sequence DiagramsequenceDiagram
participant Executor
participant syncRun
participant MessageFlusher
participant ThreadStore
Executor->>syncRun: start run with streaming executor
syncRun->>MessageFlusher: createMessageFlusher(userInput)
loop executor yields committed messages
Executor->>syncRun: committed message
syncRun->>MessageFlusher: flushQuietly()
MessageFlusher->>ThreadStore: append from cursor (idempotent)
ThreadStore-->>syncRun: partial transcript visible to readers
end
alt abort in-loop
syncRun->>MessageFlusher: flushQuietly() before finalize
syncRun-->>Executor: cancel signal
else success
syncRun->>MessageFlusher: flush(true) bypass commit gate
MessageFlusher->>ThreadStore: final forced append
syncRun->>syncRun: mark completed
end
Estimated Code Review Effort🎯 4 (Complex) | ⏱️ ~50 minutes Possibly Related PRs
Suggested Reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request introduces a new flushGranularity option ('turn' or 'message') to AgentRuntime to support incremental mirroring of running turns to thread storage. It refactors the message persistence logic using a new createMessageFlusher helper. Additionally, it adds an indexThrottleMs option to OSSAgentStore to throttle and coalesce activity-index sidecar writes during rapid message appends, preventing excessive writes. Unit tests have been added to verify both the incremental flushing behavior and the index throttling mechanism. I have no feedback to provide as there are no review comments to evaluate.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
…etion The incremental-flush refactor routed the success path through the commit-gated flush(), so a run that finished normally but never flipped task.committed (a system-only run, or a conservative custom isSessionCommitted) silently persisted nothing while still being marked Completed — losing the transcript and corrupting the next run's isResume. The success-path flush now forces past the commit gate (flush(true)), restoring the pre-incremental behaviour where a normally-finished run always appends its full transcript. The gate still constrains the in-loop incremental writes and the abort/failure cleanup paths, so abort-before-commit still leaves the thread untouched. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
A late abort (external AbortSignal or destroy()) can land while the final in-loop flush / markCommitted is awaiting. If the executor then ignores the signal and ends naturally, the for-await loop exits without a top-of-loop re-check, and the post-loop only inspected the store status — which an external abort never writes — so the run was marked Completed instead of Cancelled. Add a post-loop `abortController.signal.aborted` re-check in all three run paths (sync/async/stream), before the success-completion path, mirroring the in-loop abort handling. The window pre-dated the incremental flush (the last awaited loop-body op was previously markCommitted); the per-message flush only widened it. cancelRun is unaffected — it still writes a terminal status caught by the existing TOCTOU check. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Codex 对抗式 review(3 轮收敛)用 Round 1 — codex 提 2 个 P2:
Round 2 — codex 提 1 个新 P2: Round 3 — codex:「未发现明确且可操作的正确性、安全性或兼容性问题。相关测试均通过。」 测试 184 passing(在原 182 基础上 +2 回归用例); |
…flushGranularity OSS comfortably absorbs the per-message write rate (writes spread across UUID-keyed per-thread objects, no hot partition; peak hundreds–low-thousands ops/s vs OSS's many-thousands-QPS single-bucket budget; per-append latency is negligible vs LLM message cadence), so the opt-in flag is unnecessary friction and a silent-no-op footgun when not wired through. Remove it: every run path now flushes each message as it is produced. To keep that coherent, OSSAgentStore.indexThrottleMs now defaults to 5s (was 0): per-message appends would otherwise emit one activity-index sidecar object per message and multiply the reader's list cost. Pass indexThrottleMs: 0 to restore write-on-every-append. messages.jsonl content is unaffected by throttling. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The activity index is partitioned by UTC date (index/threads-by-updated-date/ <date>/...). Throttling purely on elapsed time meant a thread that appended just before midnight and again within indexThrottleMs after midnight would be suppressed from the NEW day's bucket entirely — a reader listing that day would miss the thread. Include the date bucket in the per-thread throttle state and never throttle across a bucket change, so each day a thread is active always gets at least one index entry. Within a day, throttling is unchanged. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…tle on PUT failure Two issues surfaced once messages are mirrored per-message instead of once per turn: 1. Concurrent runs on the same thread, on a store without atomic `append`, raced through the get-concat-put fallback (read same object, clobber each other, lose lines). Serialize that fallback per thread via a generalized per-key lock (the atomic OSS AppendObject path is already position-safe and stays lock-free). 2. The index throttle recorded its window eagerly, before the fire-and-forget index PUT resolved. A transient PUT failure then suppressed retries for the throttle interval, potentially dropping a thread from a day's activity index. Roll back the throttle entry when the PUT fails (identity-checked so a concurrent newer entry is not clobbered) so the next append retries. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… for incremental mirroring Concurrent runs on one thread are unsupported (they already diverge the per-thread executor session); with per-message appends they would also interleave into an unresumable transcript. Callers serialize runs per thread; the runtime does not enforce it. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The flusher advances its cursor only after a successful append (favouring no-loss). An ambiguous failure — store committed, client errored — therefore re-appends and can duplicate a mirror line. This pre-dated the change (the turn-batch catch-block persist had the same risk, once per turn). Documented as a known at-least-once limitation; de-dup (idempotent ids / length reconciliation) is a follow-up. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
设计调整 + codex 多轮对抗复审设计调整:去掉 flushGranularity 开关,改为永远增量原先做成 codex 复审修掉的真实问题
已知限制(本轮按文档化处理,后续迭代)
测试 187 passing; |
…rollback The PUT-failure rollback re-enables a future append's index write but does not re-index appends that were throttled within the failed window; if no further append arrives that day, the thread is missing from that day's activity bucket until its next activity. messages.jsonl is unaffected. A full fix (success-gated throttle, losing burst coalescing; or tracking+rewriting latest activity) is deferred — best-effort index. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
背景 / 动机
agent 运行在 Chair Sandbox,通过
OSSAgentStore把会话写到 OSS 的threads/<id>/messages.jsonl;一个独立的、只读轮询 OSS 的观测应用按对象读取来展示会话详情。原本messages.jsonl在一个 turn(run)结束后才一次性批量 append,所以运行中的 agent 在 turn 结束前观测端读到的是空的——无法实时看到会话。观测端与运行端唯一契约就是 OSS 对象(够不到本地 SSE/event 文件),因此唯一可行的办法是让消息增量出现在 OSS 的messages.jsonl。改动
AgentRuntime:永远增量镜像。 三条 run 路径(sync/async/stream)在循环里每产出一条消息就 append 到 thread 存储,观测端轮询getThread即可看到运行中会话逐步生长。统一复用一个 commit 门控 + 游标幂等 的 flusher:flush(true)强制写,保证正常结束的 run 一定持久化全量转录(即便从未 commit)。aborted,避免外部 signal/destroy()在最后一次 flush 期间触发时把该 Cancelled 的 run 误标 Completed。OSSAgentStore:indexThrottleMs默认 5s(传 0 关闭):逐条 append 否则会让活动索引index/threads-by-updated-date/每条消息生成一个 sidecar 对象,放大读端 list 成本。节流按 UTC 日期桶作用域(跨午夜强制写新桶,不会让线程从新一天的索引缺失);索引 PUT 失败时回滚节流窗口让下次 append 重试;节流簿记 Map 有 FIFO 上限。append的 client 走 get-concat-put 回退时,按 thread 串行化,避免并发 append 丢更新(真 OSS 的原子 AppendObject 路径本就 position 安全,不加锁)。测试
core/agent-runtime187 passing(tsc:pub、ESLint 通过)。新增覆盖:运行中提前可见、完成无重复、commit 前 abort 留空、成功但从未 commit 仍持久化、late-abort 标 Cancelled、索引默认节流 / 关闭节流 / 跨日期桶不被节流、索引 PUT 失败回滚重试、并发回退 append 不丢更新。兼容性
行为变更:消息现在在 run 执行期间增量落库(原为 turn 末批量);最终转录内容一致。
indexThrottleMs默认从 0 改为 5s(可传 0 恢复每次 append 写索引)。🤖 Generated with Claude Code
Summary by CodeRabbit
0disables) with retry-safe rollback on write failures and correct behavior across UTC day boundaries.