Skip to content

fix(transport): prevent SSE replay storms and silent message loss#396

Open
dimakis wants to merge 1 commit into
mainfrom
fix/sse-delivery-regression
Open

fix(transport): prevent SSE replay storms and silent message loss#396
dimakis wants to merge 1 commit into
mainfrom
fix/sse-delivery-regression

Conversation

@dimakis

@dimakis dimakis commented Jun 25, 2026

Copy link
Copy Markdown
Owner

Summary

Fixes the SSE transport regression causing 8+ minute message delivery delays and silent message drops after the SSE migration (#381).

Root cause chain:

  1. ProcessTransport.interrupt() throws unhandled → server crashes → loses all in-memory state
  2. Clients reconnect with stale cursors → periodic sync replays thousands of events at 50/5s
  3. isSessionActive only checked a boolean, not session STATE → SUSPENDED/DETACHED sessions replayed indefinitely
  4. Failed send POSTs (404 from stale connectionId) were silently dropped → user messages lost

Three fixes:

  • Periodic sync replay storm prevention (connection-registry.ts):

    • Replace isSessionActive with shouldSync — checks session STATE (only ACTIVE/STARTING/CREATED sync)
    • Add MAX_REPLAY_GAP (200 events) — skip ahead when cursor is far behind head instead of replaying 6K+ events
    • Add getHeadSeq to EventStore for gap detection
  • POST retry for failed message sends (sse-connection.ts):

    • Re-queue send POSTs that get HTTP errors or network errors
    • Messages flush on next successful reconnect instead of being silently dropped
  • Guard ProcessTransport.interrupt() crash (chat.ts):

    • Wrap queryInstance.interrupt() in try/catch to prevent server process death

Test plan

  • 76 tests passing (connection-registry + sse-connection)
  • Type check clean
  • Centaur review (2 rounds)
  • Manual test: send messages during agent tool execution, verify delivery
  • Manual test: restart server, verify no replay storm in logs

🤖 Generated with Claude Code

Three fixes for the SSE transport regression causing 8+ minute message
delivery delays and silent message drops:

1. Periodic sync replay storm prevention (connection-registry):
   - Replace isSessionActive with shouldSync that checks session STATE
     (ACTIVE/STARTING/CREATED only), not just the isActive boolean.
     SUSPENDED/DETACHED/ENDED sessions no longer trigger sync.
   - Add MAX_REPLAY_GAP (200 events) cap — when cursor is far behind
     head (e.g. after server restart), skip ahead instead of replaying
     thousands of events at 50/5s. Client lazy-loads history via API.
   - Add getHeadSeq to EventStore for gap detection.

2. POST retry for failed message sends (sse-connection):
   - Re-queue send POSTs that get HTTP errors (e.g. 404 from stale
     connectionId after reconnect) or network errors. Messages flush
     on next successful reconnect instead of being silently dropped.
   - Only send endpoints retry — stop/interrupt are fire-and-forget.

3. Guard ProcessTransport.interrupt() crash (chat.ts):
   - Wrap queryInstance.interrupt() in try/catch to prevent unhandled
     errors from killing the server process. This was the trigger that
     caused server restart → in-memory state loss → replay storms.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

@dimakis dimakis left a comment

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Centaur Review

Found 5 issue(s) (2 warning).

packages/client/src/sse-connection.ts

Solid fix for a real production issue — replay storm prevention and retry-on-failure are well-designed. Main concern is the missing MAX_PENDING_SENDS cap on the re-queue path in doPost, and EventStore.getHeadSeq lacks direct unit tests against SQLite.

  • 🟡 bugs (L340): Re-queue in doPost bypasses the MAX_PENDING_SENDS cap enforced in send() (line 97). If multiple flushPendingSends rounds fail repeatedly, pendingSends can grow beyond 100 without the shift() eviction. Consider applying the same cap here. [fixable]

packages/protocol/src/event-store.ts

Solid fix for a real production issue — replay storm prevention and retry-on-failure are well-designed. Main concern is the missing MAX_PENDING_SENDS cap on the re-queue path in doPost, and EventStore.getHeadSeq lacks direct unit tests against SQLite.

  • 🟡 missing_tests (L617): EventStore.getHeadSeq() has no unit tests. It's exercised indirectly via connection-registry tests (which mock it), but the actual SQLite query (MAX(seq)) is never tested against a real database — e.g., no events returns 0, single event returns its seq, multiple events returns the max. [fixable]

server/index.ts

Solid fix for a real production issue — replay storm prevention and retry-on-failure are well-designed. Main concern is the missing MAX_PENDING_SENDS cap on the re-queue path in doPost, and EventStore.getHeadSeq lacks direct unit tests against SQLite.

  • 🔵 unsafe_assumptions (L105): shouldSync returns false for null state (unknown session ID), which is correct but implicit. If a session is watched but hasn't been registered in the EventStore yet (race at startup), periodic sync silently skips it. This is probably fine since events wouldn't exist yet either, but worth noting.

server/chat.ts

Solid fix for a real production issue — replay storm prevention and retry-on-failure are well-designed. Main concern is the missing MAX_PENDING_SENDS cap on the re-queue path in doPost, and EventStore.getHeadSeq lacks direct unit tests against SQLite.

  • 🔵 missing_tests (L1269): The new try/catch around queryInstance.interrupt() has no test verifying that the server survives a ProcessTransport crash. The existing interruptChat tests in send-to-chat.test.ts don't cover the case where interrupt() throws. [fixable]

packages/client/src/__tests__/sse-connection.test.ts

Solid fix for a real production issue — replay storm prevention and retry-on-failure are well-designed. Main concern is the missing MAX_PENDING_SENDS cap on the re-queue path in doPost, and EventStore.getHeadSeq lacks direct unit tests against SQLite.

  • 🔵 style (L876): The callCount tracking variable in the first two retry tests shadows mockFetch.mock.calls.length. Using mockFetch.mockImplementationOnce() for the first call and a default for subsequent calls would be cleaner and avoid the manual counter. [fixable]

// Message delivery failed (e.g. 404 from stale connectionId).
// Re-queue so it flushes on the next successful reconnect.
console.warn(`[SseConnection] POST /${endpoint} returned ${res.status}, re-queuing`);
this.pendingSends.push({ endpoint, body });

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 bugs: Re-queue in doPost bypasses the MAX_PENDING_SENDS cap enforced in send() (line 97). If multiple flushPendingSends rounds fail repeatedly, pendingSends can grow beyond 100 without the shift() eviction. Consider applying the same cap here. [fixable]

return (row?.state as SessionState) ?? null;
}

/** Return the highest seq number for a session, or 0 if no events exist. */

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 missing_tests: EventStore.getHeadSeq() has no unit tests. It's exercised indirectly via connection-registry tests (which mock it), but the actual SQLite query (MAX(seq)) is never tested against a real database — e.g., no events returns 0, single event returns its seq, multiple events returns the max. [fixable]

Comment thread server/index.ts
// shouldSync skips ENDED/SUSPENDED/DETACHED sessions to prevent replay storms.
// getHeadSeq caps replay depth so post-crash reconnects don't flood clients.
connRegistry.setEventStore({
getEventsAfter: (sessionId, afterSeq, limit) =>

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 unsafe_assumptions: shouldSync returns false for null state (unknown session ID), which is correct but implicit. If a session is watched but hasn't been registered in the EventStore yet (race at startup), periodic sync silently skips it. This is probably fine since events wouldn't exist yet either, but worth noting.

Comment thread server/chat.ts
await Promise.allSettled(stops);
}
await session.queryInstance.interrupt();
try {

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 missing_tests: The new try/catch around queryInstance.interrupt() has no test verifying that the server survives a ProcessTransport crash. The existing interruptChat tests in send-to-chat.test.ts don't cover the case where interrupt() throws. [fixable]

expect(mockFetch).not.toHaveBeenCalled();
});

// ─── POST retry on failure ───────────────────────────────────────────────

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 style: The callCount tracking variable in the first two retry tests shadows mockFetch.mock.calls.length. Using mockFetch.mockImplementationOnce() for the first call and a default for subsequent calls would be cleaner and avoid the manual counter. [fixable]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant