Skip to content

fix(streams): snapshot SSE cursor once on entry to stop drop race#241

Open
smoreinis wants to merge 2 commits into
mainfrom
fix/sse-stream-cursor-race
Open

fix(streams): snapshot SSE cursor once on entry to stop drop race#241
smoreinis wants to merge 2 commits into
mainfrom
fix/sse-stream-cursor-race

Conversation

@smoreinis
Copy link
Copy Markdown
Collaborator

@smoreinis smoreinis commented May 15, 2026

Summary

stream_task_events in streams_use_case.py initialized last_id = "$" and only advanced it when XREAD returned entries. When BLOCK timed out empty, the outer loop re-issued XREAD with the literal "$" — Redis re-resolves "$" to the current stream tail at call time, so any entry XADD-ed in the ~100ms gap between the empty return and the next call landed behind the new "$" and was unreachable from the consumer.

For fast-emitting agents (token-level LLM streaming at multiple Hz, ~7 Hz bursts in practice), this silently dropped deltas. Slow agents were unaffected, which made the failure mode timing-dependent and easy to miss.

What changed

  • New repo helper RedisStreamRepository.get_stream_tail_id(topic) -> str that snapshots the current tail via XREVRANGE topic + - COUNT 1, returning "0-0" for empty/missing streams. Also added to the StreamRepository port so any future implementation must honour the cursor-snapshot contract.
  • stream_task_events now calls get_stream_tail_id once on entry and uses that as the starting last_id. The existing loop already advances last_id correctly from yielded entry IDs, so this is the only change needed.

Out of scope

These are deliberate non-changes — happy to follow up in a separate PR if there's appetite:

  • Switching to XREADGROUP with per-subscriber consumer groups. At-least-once + PEL recovery + replica fan-out is the principled fix; the cursor snapshot is the minimal correct fix. Consumer-group lifecycle (create, ack, expire, clean up) is a bigger architectural change.
  • Exposing Last-Event-ID / since= on the endpoint and SDK. Lets reconnecting SSE clients resume across disconnects without losing the gap. Requires an API change.

Test plan

New integration test test_event_xadded_in_inter_cycle_gap_is_delivereddeterministic symptom-level reproduction, not race-window timing:

  • Patches repo.read_messages to set an asyncio.Event the instant the first BLOCK returns empty. asyncio scheduling guarantees the consumer is then entering its 100ms inter-cycle asyncio.sleep before yielding back to the test.
  • Test XADDs a uniquely-tagged event synchronously on the signal — guaranteed to land inside the gap.
  • Waits 2.5s for the second BLOCK cycle, then asserts the reader received the sentinel.

Under the bug: the next XREAD $ re-resolves $ to the sentinel's ID and waits for strictly-greater entries — the sentinel is lost forever, test fails (verified locally: AssertionError: assert []).

Under the fix: snapshotted cursor stays put across the empty cycle; the sentinel has an ID > snapshot and is delivered.

  • New regression test passes against the fix.
  • New regression test FAILS against the unfixed code (verified by reverting the one-line change and re-running).
  • All 6 existing tests in test_task_stream.py still pass with the fix.
  • ruff check and ruff format --check clean on changed files.

Greptile Summary

This PR fixes a silent message-drop race in stream_task_events where the Redis "$" cursor sentinel was re-resolved to the stream tail on every XREAD call, causing entries XADDed during the ~100ms gap between empty-BLOCK returns and the next call to be permanently unreachable. The fix snapshots the tail once on entry via a new XREVRANGE-backed helper.

  • New get_stream_tail_id method on RedisStreamRepository (and the StreamRepository port) snapshots the stream tail using XREVRANGE topic + - COUNT 1, returning "0-0" for empty/non-existent streams so the first future XADD is always captured.
  • stream_task_events now calls get_stream_tail_id once before the read loop; the existing loop already advances last_id from yielded entry IDs, so no further changes to the loop body are needed.
  • Regression test test_event_xadded_in_inter_cycle_gap_is_delivered patches read_messages to deterministically fire an event at the precise inter-cycle gap, XADDs a sentinel there, and asserts delivery — confirmed to fail against the unfixed code.

Confidence Score: 5/5

Safe to merge — targeted one-call change to snapshot the stream cursor, with a deterministic regression test confirming the fix and all existing tests still passing.

The change is minimal and surgical: a single await on entry to stream_task_events replaces the problematic "$" literal. The new get_stream_tail_id helper is straightforward (XREVRANGE COUNT 1), handles the bytes/str duality from the Redis client, and falls back safely to "0-0" for empty streams. The regression test is deterministic and was confirmed to fail against the original code. No existing loop logic is touched.

No files require special attention.

Important Files Changed

Filename Overview
agentex/src/adapters/streams/adapter_redis.py Adds get_stream_tail_id using XREVRANGE count=1 to snapshot the current stream tail; handles both bytes and str return types correctly and returns "0-0" for empty/non-existent streams.
agentex/src/adapters/streams/port.py Adds get_stream_tail_id as an abstract method to the StreamRepository port, enforcing the cursor-snapshot contract on any future implementations.
agentex/src/domain/use_cases/streams_use_case.py Replaces the last_id = "$" assignment with a one-time get_stream_tail_id call before the read loop, eliminating the cursor re-resolution race on each empty-BLOCK cycle.
agentex/tests/integration/test_task_stream.py Adds test_event_xadded_in_inter_cycle_gap_is_delivered, a deterministic regression test that patches read_messages to signal after the first empty BLOCK, XADDs a sentinel in that gap, and asserts delivery.

Sequence Diagram

sequenceDiagram
    participant Client as SSE Client
    participant UC as StreamsUseCase
    participant Repo as RedisStreamRepository
    participant Redis as Redis

    Client->>UC: stream_task_events(task_id)
    UC->>Client: yield connected event
    UC->>Repo: get_stream_tail_id(stream_topic)
    Repo->>Redis: XREVRANGE topic + - COUNT 1
    Redis-->>Repo: [(tail_id, fields)] or []
    Repo-->>UC: tail_id (or "0-0" if empty)
    Note over UC: last_id = tail_id (stable cursor)

    loop Read loop
        UC->>Repo: read_messages(topic, last_id)
        Repo->>Redis: XREAD BLOCK 2000ms STREAMS topic last_id
        alt Messages available
            Redis-->>Repo: [(msg_id, fields), ...]
            Repo-->>UC: yield (msg_id, data)
            UC->>Client: yield SSE event
            Note over UC: last_id = msg_id (advanced)
        else Timeout (empty)
            Redis-->>Repo: null
            Note over UC: last_id unchanged (stable — no re-resolve of "$")
            UC->>Client: yield :ping (if interval elapsed)
        end
    end
Loading

Reviews (2): Last reviewed commit: "test(streams): make SSE drop test determ..." | Re-trigger Greptile

stream_task_events initialized last_id="$" and only advanced it when
XREAD returned entries. When BLOCK timed out empty, the outer loop
re-issued XREAD with the literal "$" — Redis re-resolves "$" to the
current stream tail at call time, so any XADD'd entry in the ~100ms
gap between the empty return and the next call landed behind the new
"$" and was unreachable from the consumer.

For fast-emitting agents (token-level LLM streaming at multiple Hz),
this silently dropped deltas. Slow agents were unaffected, which made
the failure mode timing-dependent.

Snapshot the tail once on entry via a new repository helper
get_stream_tail_id (XREVRANGE topic + - COUNT 1, "0-0" fallback for
empty/missing streams). The existing loop already advances last_id
correctly from yielded entry IDs, so the snapshot only replaces the
initial "$".

Adds an integration regression test that spies on the repository and
asserts (a) get_stream_tail_id is called once on entry and (b) no
read_messages call ever receives "$" as last_id. Deterministic —
doesn't depend on race-window timing.
@smoreinis smoreinis requested a review from a team as a code owner May 15, 2026 21:45
Replace the spy-based contract assertion with a symptom-level test
that deterministically reproduces the bug:

Patch repo.read_messages to set an asyncio.Event the instant the
first BLOCK returns empty. asyncio scheduling guarantees the consumer
then enters its 100ms inter-cycle sleep before yielding control, so a
synchronous XADD on the signal lands inside the gap window. Under the
buggy "$" cursor, the next xread re-resolves $ to our entry's ID and
waits for strictly greater entries — losing the entry forever.

Confirmed the test fails against last_id="$" (AssertionError: assert
[]) and passes against the snapshotted cursor.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant