fix(streams): snapshot SSE cursor once on entry to stop drop race#241
Open
smoreinis wants to merge 2 commits into
Conversation
`stream_task_events` initialized `last_id = "$"` and only advanced it when `XREAD` returned entries. When `BLOCK` timed out empty, the outer loop re-issued `XREAD` with the literal `"$"`. Redis re-resolves `"$"` to the current stream tail at call time, so any entry `XADD`ed in the ~100ms gap between the empty return and the next call landed behind the new `"$"` and was unreachable from the consumer. For fast-emitting agents (token-level LLM streaming at multiple Hz), this silently dropped deltas. Slow agents were unaffected, which made the failure mode timing-dependent.

Snapshot the tail once on entry via a new repository helper `get_stream_tail_id` (`XREVRANGE topic + - COUNT 1`, `"0-0"` fallback for empty/missing streams). The existing loop already advances `last_id` correctly from yielded entry IDs, so the snapshot only replaces the initial `"$"`.

Adds an integration regression test that spies on the repository and asserts (a) `get_stream_tail_id` is called once on entry and (b) no `read_messages` call ever receives `"$"` as `last_id`. Deterministic: it does not depend on race-window timing.
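The race is easiest to see in a toy model. The sketch below (hypothetical names, not the PR's code) replays both cursor strategies against an in-memory stream: the `"$"` cursor is re-resolved to the tail on every read, so an entry added between reads vanishes, while a one-time snapshot keeps it reachable.

```python
# Toy in-memory model of the cursor bug. A Redis stream ID sorts as
# (ms, seq); "$" means "the tail at call time".

def parse_id(stream_id):
    ms, seq = stream_id.split("-")
    return (int(ms), int(seq))

class FakeStream:
    def __init__(self):
        self.entries = []  # append-only list of (id, data), like XADD

    def xadd(self, entry_id, data):
        self.entries.append((entry_id, data))

    def tail_id(self):
        # Mirrors XREVRANGE topic + - COUNT 1, with "0-0" for an empty stream.
        return self.entries[-1][0] if self.entries else "0-0"

    def read_after(self, last_id):
        # Mirrors XREAD: return entries strictly greater than the cursor.
        cursor = self.tail_id() if last_id == "$" else last_id
        return [(i, d) for i, d in self.entries if parse_id(i) > parse_id(cursor)]

# Buggy consumer: the first read is empty, so last_id stays the literal "$".
stream = FakeStream()
buggy_cursor = "$"
assert stream.read_after(buggy_cursor) == []       # BLOCK times out empty
stream.xadd("1-0", "delta")                        # XADD lands in the ~100ms gap
# The re-issued read re-resolves "$" to the *new* tail ("1-0"): entry lost.
assert stream.read_after(buggy_cursor) == []

# Fixed consumer: snapshot the tail once on entry.
stream2 = FakeStream()
snapshot = stream2.tail_id()                       # "0-0" for an empty stream
assert stream2.read_after(snapshot) == []
stream2.xadd("1-0", "delta")
assert stream2.read_after(snapshot) == [("1-0", "delta")]
```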
Replace the spy-based contract assertion with a symptom-level test that deterministically reproduces the bug: patch `repo.read_messages` to set an `asyncio.Event` the instant the first `BLOCK` returns empty. asyncio scheduling guarantees the consumer then enters its 100ms inter-cycle sleep before yielding control, so a synchronous `XADD` on the signal lands inside the gap window. Under the buggy `"$"` cursor, the next `XREAD` re-resolves `$` to our entry's ID and waits for strictly greater entries, losing the entry forever. Confirmed the test fails against `last_id = "$"` (`AssertionError: assert []`) and passes against the snapshotted cursor.
Summary
`stream_task_events` in `streams_use_case.py` initialized `last_id = "$"` and only advanced it when `XREAD` returned entries. When `BLOCK` timed out empty, the outer loop re-issued `XREAD` with the literal `"$"`. Redis re-resolves `"$"` to the current stream tail at call time, so any entry `XADD`-ed in the ~100ms gap between the empty return and the next call landed behind the new `"$"` and was unreachable from the consumer.

For fast-emitting agents (token-level LLM streaming at multiple Hz, ~7 Hz bursts in practice), this silently dropped deltas. Slow agents were unaffected, which made the failure mode timing-dependent and easy to miss.
What changed
- Added `RedisStreamRepository.get_stream_tail_id(topic) -> str`, which snapshots the current tail via `XREVRANGE topic + - COUNT 1`, returning `"0-0"` for empty/missing streams. Also added to the `StreamRepository` port so any future implementation must honour the cursor-snapshot contract.
- `stream_task_events` now calls `get_stream_tail_id` once on entry and uses that as the starting `last_id`. The existing loop already advances `last_id` correctly from yielded entry IDs, so this is the only change needed.

Out of scope
These are deliberate non-changes — happy to follow up in a separate PR if there's appetite:
- `XREADGROUP` with per-subscriber consumer groups. At-least-once delivery + PEL recovery + replica fan-out is the principled fix; the cursor snapshot is the minimal correct fix. Consumer-group lifecycle (create, ack, expire, clean up) is a bigger architectural change.
- `Last-Event-ID` / `since=` on the endpoint and SDK. Lets reconnecting SSE clients resume across disconnects without losing the gap. Requires an API change.

Test plan
New integration test
`test_event_xadded_in_inter_cycle_gap_is_delivered`: a deterministic, symptom-level reproduction, not race-window timing. It patches `repo.read_messages` to set an `asyncio.Event` the instant the first `BLOCK` returns empty. asyncio scheduling guarantees the consumer is then entering its 100ms inter-cycle `asyncio.sleep` before yielding back to the test.

- Under the bug: the next `XREAD $` re-resolves `$` to the sentinel's ID and waits for strictly-greater entries. The sentinel is lost forever and the test fails (verified locally: `AssertionError: assert []`).
- Under the fix: the snapshotted cursor stays put across the empty cycle; the sentinel has an ID greater than the snapshot and is delivered.
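The determinism trick above can be sketched in miniature. This is a hedged, self-contained reduction (hypothetical names, not the actual test): a stub `read_messages` sets the event when its first call comes back empty, the producer appends a sentinel while the consumer sits in its inter-cycle sleep, and the snapshotted cursor still delivers it. String comparison of IDs here is a simplification of real Redis ID ordering.

```python
import asyncio

async def demo() -> list[str]:
    gap_reached = asyncio.Event()
    delivered: list[str] = []
    stream: list[tuple[str, str]] = []

    async def read_messages(last_id: str):
        if not stream:
            gap_reached.set()  # signal: the first BLOCK returned empty
            return []
        return [(i, d) for i, d in stream if i > last_id]

    async def consumer():
        last_id = "0-0"        # snapshotted cursor (the fix)
        for _ in range(2):     # two read cycles are enough for the demo
            for msg_id, data in await read_messages(last_id):
                delivered.append(data)
                last_id = msg_id
            await asyncio.sleep(0.01)  # stands in for the inter-cycle sleep

    task = asyncio.create_task(consumer())
    await gap_reached.wait()           # consumer is now inside its sleep
    stream.append(("1-0", "sentinel"))  # the XADD lands in the gap window
    await task
    return delivered

assert asyncio.run(demo()) == ["sentinel"]
```

Swapping `last_id = "0-0"` for a `"$"`-style re-resolve-on-every-read cursor makes the same sequence return `[]`, which mirrors the failing assertion cited above.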
- Existing tests in `test_task_stream.py` still pass with the fix.
- `ruff check` and `ruff format --check` clean on changed files.

Greptile Summary
This PR fixes a silent message-drop race in `stream_task_events` where the Redis `"$"` cursor sentinel was re-resolved to the stream tail on every `XREAD` call, causing entries `XADD`ed during the ~100ms gap between empty-`BLOCK` returns and the next call to be permanently unreachable. The fix snapshots the tail once on entry via a new `XREVRANGE`-backed helper.

- A `get_stream_tail_id` method on `RedisStreamRepository` (and the `StreamRepository` port) snapshots the stream tail using `XREVRANGE topic + - COUNT 1`, returning `"0-0"` for empty/non-existent streams so the first future `XADD` is always captured.
- `stream_task_events` now calls `get_stream_tail_id` once before the read loop; the existing loop already advances `last_id` from yielded entry IDs, so no further changes to the loop body are needed.
- `test_event_xadded_in_inter_cycle_gap_is_delivered` patches `read_messages` to deterministically fire an event at the precise inter-cycle gap, `XADD`s a sentinel there, and asserts delivery; confirmed to fail against the unfixed code.

Confidence Score: 5/5
Safe to merge — targeted one-call change to snapshot the stream cursor, with a deterministic regression test confirming the fix and all existing tests still passing.
The change is minimal and surgical: a single `await` on entry to `stream_task_events` replaces the problematic `"$"` literal. The new `get_stream_tail_id` helper is straightforward (`XREVRANGE` with `COUNT 1`), handles the bytes/str duality from the Redis client, and falls back safely to `"0-0"` for empty streams. The regression test is deterministic and was confirmed to fail against the original code. No existing loop logic is touched.

No files require special attention.
Important Files Changed
- Added `get_stream_tail_id` using `XREVRANGE` with `count=1` to snapshot the current stream tail; handles both bytes and str return types correctly and returns `"0-0"` for empty/non-existent streams.
- Added `get_stream_tail_id` as an abstract method to the `StreamRepository` port, enforcing the cursor-snapshot contract on any future implementations.
- Replaced the `last_id = "$"` assignment with a one-time `get_stream_tail_id` call before the read loop, eliminating the cursor re-resolution race on each empty-`BLOCK` cycle.
- Added `test_event_xadded_in_inter_cycle_gap_is_delivered`, a deterministic regression test that patches `read_messages` to signal after the first empty `BLOCK`, `XADD`s a sentinel in that gap, and asserts delivery.
```mermaid
sequenceDiagram
    participant Client as SSE Client
    participant UC as StreamsUseCase
    participant Repo as RedisStreamRepository
    participant Redis as Redis
    Client->>UC: stream_task_events(task_id)
    UC->>Client: yield connected event
    UC->>Repo: get_stream_tail_id(stream_topic)
    Repo->>Redis: XREVRANGE topic + - COUNT 1
    Redis-->>Repo: [(tail_id, fields)] or []
    Repo-->>UC: tail_id (or "0-0" if empty)
    Note over UC: last_id = tail_id (stable cursor)
    loop Read loop
        UC->>Repo: read_messages(topic, last_id)
        Repo->>Redis: XREAD BLOCK 2000ms STREAMS topic last_id
        alt Messages available
            Redis-->>Repo: [(msg_id, fields), ...]
            Repo-->>UC: yield (msg_id, data)
            UC->>Client: yield SSE event
            Note over UC: last_id = msg_id (advanced)
        else Timeout (empty)
            Redis-->>Repo: null
            Note over UC: last_id unchanged (stable, no re-resolve of "$")
            UC->>Client: yield :ping (if interval elapsed)
        end
    end
```

Reviews (2): Last reviewed commit: "test(streams): make SSE drop test determ..."