fix(transport): prevent SSE replay storms and silent message loss#396
fix(transport): prevent SSE replay storms and silent message loss#396dimakis wants to merge 1 commit into
Conversation
Three fixes for the SSE transport regression causing 8+ minute message
delivery delays and silent message drops:
1. Periodic sync replay storm prevention (connection-registry):
- Replace isSessionActive with shouldSync that checks session STATE
(ACTIVE/STARTING/CREATED only), not just the isActive boolean.
SUSPENDED/DETACHED/ENDED sessions no longer trigger sync.
- Add MAX_REPLAY_GAP (200 events) cap — when cursor is far behind
head (e.g. after server restart), skip ahead instead of replaying
thousands of events at 50/5s. Client lazy-loads history via API.
- Add getHeadSeq to EventStore for gap detection.
2. POST retry for failed message sends (sse-connection):
- Re-queue send POSTs that get HTTP errors (e.g. 404 from stale
connectionId after reconnect) or network errors. Messages flush
on next successful reconnect instead of being silently dropped.
- Only send endpoints retry — stop/interrupt are fire-and-forget.
3. Guard ProcessTransport.interrupt() crash (chat.ts):
- Wrap queryInstance.interrupt() in try/catch to prevent unhandled
errors from killing the server process. This was the trigger that
caused server restart → in-memory state loss → replay storms.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
dimakis
left a comment
There was a problem hiding this comment.
Centaur Review
Found 5 issue(s) (2 warning).
packages/client/src/sse-connection.ts
Solid fix for a real production issue — replay storm prevention and retry-on-failure are well-designed. Main concern is the missing MAX_PENDING_SENDS cap on the re-queue path in doPost, and EventStore.getHeadSeq lacks direct unit tests against SQLite.
- 🟡 bugs (L340): Re-queue in doPost bypasses the MAX_PENDING_SENDS cap enforced in send() (line 97). If multiple flushPendingSends rounds fail repeatedly, pendingSends can grow beyond 100 without the shift() eviction. Consider applying the same cap here.
[fixable]
packages/protocol/src/event-store.ts
Solid fix for a real production issue — replay storm prevention and retry-on-failure are well-designed. Main concern is the missing MAX_PENDING_SENDS cap on the re-queue path in doPost, and EventStore.getHeadSeq lacks direct unit tests against SQLite.
- 🟡 missing_tests (L617): EventStore.getHeadSeq() has no unit tests. It's exercised indirectly via connection-registry tests (which mock it), but the actual SQLite query (MAX(seq)) is never tested against a real database — e.g., no events returns 0, single event returns its seq, multiple events returns the max.
[fixable]
server/index.ts
Solid fix for a real production issue — replay storm prevention and retry-on-failure are well-designed. Main concern is the missing MAX_PENDING_SENDS cap on the re-queue path in doPost, and EventStore.getHeadSeq lacks direct unit tests against SQLite.
- 🔵 unsafe_assumptions (L105): shouldSync returns false for null state (unknown session ID), which is correct but implicit. If a session is watched but hasn't been registered in the EventStore yet (race at startup), periodic sync silently skips it. This is probably fine since events wouldn't exist yet either, but worth noting.
server/chat.ts
Solid fix for a real production issue — replay storm prevention and retry-on-failure are well-designed. Main concern is the missing MAX_PENDING_SENDS cap on the re-queue path in doPost, and EventStore.getHeadSeq lacks direct unit tests against SQLite.
- 🔵 missing_tests (L1269): The new try/catch around queryInstance.interrupt() has no test verifying that the server survives a ProcessTransport crash. The existing interruptChat tests in send-to-chat.test.ts don't cover the case where interrupt() throws.
[fixable]
packages/client/src/__tests__/sse-connection.test.ts
Solid fix for a real production issue — replay storm prevention and retry-on-failure are well-designed. Main concern is the missing MAX_PENDING_SENDS cap on the re-queue path in doPost, and EventStore.getHeadSeq lacks direct unit tests against SQLite.
- 🔵 style (L876): The callCount tracking variable in the first two retry tests shadows mockFetch.mock.calls.length. Using mockFetch.mockImplementationOnce() for the first call and a default for subsequent calls would be cleaner and avoid the manual counter.
[fixable]
| // Message delivery failed (e.g. 404 from stale connectionId). | ||
| // Re-queue so it flushes on the next successful reconnect. | ||
| console.warn(`[SseConnection] POST /${endpoint} returned ${res.status}, re-queuing`); | ||
| this.pendingSends.push({ endpoint, body }); |
There was a problem hiding this comment.
🟡 bugs: Re-queue in doPost bypasses the MAX_PENDING_SENDS cap enforced in send() (line 97). If multiple flushPendingSends rounds fail repeatedly, pendingSends can grow beyond 100 without the shift() eviction. Consider applying the same cap here. [fixable]
| return (row?.state as SessionState) ?? null; | ||
| } | ||
|
|
||
| /** Return the highest seq number for a session, or 0 if no events exist. */ |
There was a problem hiding this comment.
🟡 missing_tests: EventStore.getHeadSeq() has no unit tests. It's exercised indirectly via connection-registry tests (which mock it), but the actual SQLite query (MAX(seq)) is never tested against a real database — e.g., no events returns 0, single event returns its seq, multiple events returns the max. [fixable]
| // shouldSync skips ENDED/SUSPENDED/DETACHED sessions to prevent replay storms. | ||
| // getHeadSeq caps replay depth so post-crash reconnects don't flood clients. | ||
| connRegistry.setEventStore({ | ||
| getEventsAfter: (sessionId, afterSeq, limit) => |
There was a problem hiding this comment.
🔵 unsafe_assumptions: shouldSync returns false for null state (unknown session ID), which is correct but implicit. If a session is watched but hasn't been registered in the EventStore yet (race at startup), periodic sync silently skips it. This is probably fine since events wouldn't exist yet either, but worth noting.
| await Promise.allSettled(stops); | ||
| } | ||
| await session.queryInstance.interrupt(); | ||
| try { |
There was a problem hiding this comment.
🔵 missing_tests: The new try/catch around queryInstance.interrupt() has no test verifying that the server survives a ProcessTransport crash. The existing interruptChat tests in send-to-chat.test.ts don't cover the case where interrupt() throws. [fixable]
| expect(mockFetch).not.toHaveBeenCalled(); | ||
| }); | ||
|
|
||
| // ─── POST retry on failure ─────────────────────────────────────────────── |
There was a problem hiding this comment.
🔵 style: The callCount tracking variable in the first two retry tests shadows mockFetch.mock.calls.length. Using mockFetch.mockImplementationOnce() for the first call and a default for subsequent calls would be cleaner and avoid the manual counter. [fixable]
Summary
Fixes the SSE transport regression causing 8+ minute message delivery delays and silent message drops after the SSE migration (#381).
Root cause chain:
ProcessTransport.interrupt()throws unhandled → server crashes → loses all in-memory stateisSessionActiveonly checked a boolean, not session STATE → SUSPENDED/DETACHED sessions replayed indefinitelysendPOSTs (404 from stale connectionId) were silently dropped → user messages lostThree fixes:
Periodic sync replay storm prevention (
connection-registry.ts):isSessionActivewithshouldSync— checks session STATE (only ACTIVE/STARTING/CREATED sync)MAX_REPLAY_GAP(200 events) — skip ahead when cursor is far behind head instead of replaying 6K+ eventsgetHeadSeqto EventStore for gap detectionPOST retry for failed message sends (
sse-connection.ts):sendPOSTs that get HTTP errors or network errorsGuard ProcessTransport.interrupt() crash (
chat.ts):queryInstance.interrupt()in try/catch to prevent server process deathTest plan
🤖 Generated with Claude Code