diff --git a/.changeset/sse-idle-timeout.md b/.changeset/sse-idle-timeout.md new file mode 100644 index 000000000..27b8f2121 --- /dev/null +++ b/.changeset/sse-idle-timeout.md @@ -0,0 +1,12 @@ +--- +'@modelcontextprotocol/client': minor +--- + +Add `idleTimeoutMs` option to `StreamableHTTPClientTransport`. When set, the SSE stream +reader cancels itself if no chunk arrives within the configured window, and the existing +disconnect/reconnect path runs (same as a network drop). The timer resets on every chunk, +so this is a per-chunk inactivity timeout, not a total stream lifetime. + +Useful for half-open TCP connections, stalled servers, and proxies that go silent without +closing the socket — without it, `reader.read()` blocks indefinitely and the agent hangs. +Defaults to undefined (no timeout, same as today). Closes #1883. diff --git a/packages/client/src/client/streamableHttp.ts b/packages/client/src/client/streamableHttp.ts index cd643c96d..5cf4bbcbe 100644 --- a/packages/client/src/client/streamableHttp.ts +++ b/packages/client/src/client/streamableHttp.ts @@ -160,6 +160,22 @@ export type StreamableHTTPClientTransportOptions = { * handshake so the reconnected transport continues sending the required header. */ protocolVersion?: string; + + /** + * Idle timeout for the SSE stream reader, in milliseconds. + * + * If no chunk is received from the server within this window, the reader is cancelled and + * the stream's normal disconnect/reconnect path runs (same as a network drop). The timer + * resets on every chunk arrival, so this is a per-chunk inactivity timeout, not a total + * stream lifetime. + * + * Useful for half-open TCP connections, stalled servers, and proxies that go silent + * without closing the socket. Without this option, `reader.read()` blocks indefinitely + * when the stream stalls. + * + * Defaults to undefined (no idle timeout — preserves existing behavior). + */ + idleTimeoutMs?: number; }; /** @@ -184,6 +200,7 @@ export class StreamableHTTPClientTransport implements Transport { private _serverRetryMs?: number; // Server-provided retry delay from SSE retry field private readonly _reconnectionScheduler?: ReconnectionScheduler; private _cancelReconnection?: () => void; + private readonly _idleTimeoutMs?: number; // Per-chunk inactivity timeout for SSE reader; opt-in. onclose?: () => void; onerror?: (error: Error) => void; @@ -206,6 +223,12 @@ export class StreamableHTTPClientTransport implements Transport { this._protocolVersion = opts?.protocolVersion; this._reconnectionOptions = opts?.reconnectionOptions ?? DEFAULT_STREAMABLE_HTTP_RECONNECTION_OPTIONS; this._reconnectionScheduler = opts?.reconnectionScheduler; + if (opts?.idleTimeoutMs !== undefined) { + if (!Number.isFinite(opts.idleTimeoutMs) || opts.idleTimeoutMs <= 0) { + throw new Error(`idleTimeoutMs must be a positive finite number; got ${opts.idleTimeoutMs}`); + } + this._idleTimeoutMs = opts.idleTimeoutMs; + } } private async _commonHeaders(): Promise { @@ -376,7 +399,11 @@ export class StreamableHTTPClientTransport implements Transport { // Track whether we've received a response - if so, no need to reconnect // Reconnection is for when server disconnects BEFORE sending response let receivedResponse = false; + const idleTimeoutMs = this._idleTimeoutMs; const processStream = async () => { + // Track the idle timer outside the try so we can clear it from the catch and + // the post-loop block. Reassigned on every chunk arrival. + let idleTimer: ReturnType | undefined; // this is the closest we can get to trying to catch network errors // if something happens reader will throw try { @@ -393,8 +420,24 @@ export class StreamableHTTPClientTransport implements Transport { ) .getReader(); + // Per-chunk inactivity timer. Cancelling the reader makes the next + // `await reader.read()` reject, which falls through to the catch + // block below — same path as a network drop. + const armIdleTimer = (): void => { + if (idleTimeoutMs === undefined) return; + if (idleTimer !== undefined) clearTimeout(idleTimer); + idleTimer = setTimeout(() => { + idleTimer = undefined; + // `cancel()` on a ReadableStreamDefaultReader rejects pending reads. + reader.cancel(new Error(`SSE idle timeout after ${idleTimeoutMs}ms`)).catch(() => {}); + }, idleTimeoutMs); + }; + armIdleTimer(); + while (true) { const { value: event, done } = await reader.read(); + // Reset the idle timer on every chunk (data, priming, or close). + armIdleTimer(); if (done) { break; } @@ -470,6 +513,11 @@ export class StreamableHTTPClientTransport implements Transport { this.onerror?.(new Error(`Failed to reconnect: ${error instanceof Error ? error.message : String(error)}`)); } } + } finally { + if (idleTimer !== undefined) { + clearTimeout(idleTimer); + idleTimer = undefined; + } } }; processStream(); diff --git a/packages/client/test/client/streamableHttp.test.ts b/packages/client/test/client/streamableHttp.test.ts index b2138b3fa..8e2ae495c 100644 --- a/packages/client/test/client/streamableHttp.test.ts +++ b/packages/client/test/client/streamableHttp.test.ts @@ -2017,4 +2017,136 @@ describe('StreamableHTTPClientTransport', () => { expect(onclose).toHaveBeenCalledTimes(1); }); }); + + describe('idleTimeoutMs', () => { + it('rejects non-positive or non-finite values', () => { + expect( + () => new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { idleTimeoutMs: 0 }) + ).toThrow(/positive finite/); + expect( + () => new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { idleTimeoutMs: -1 }) + ).toThrow(/positive finite/); + expect( + () => new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { idleTimeoutMs: Infinity }) + ).toThrow(/positive finite/); + expect( + () => new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { idleTimeoutMs: Number.NaN }) + ).toThrow(/positive finite/); + }); + + it('defaults to no idle timeout (preserves existing behavior)', async () => { + // A stream that never enqueues anything should not trigger any error + // when no idleTimeoutMs is set — the reader simply waits. + const stream = new ReadableStream({ + start() { + /* never enqueue, never close */ + } + }); + + (globalThis.fetch as Mock).mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ 'content-type': 'text/event-stream' }), + body: stream + }); + + const onerror = vi.fn(); + transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp')); + transport.onerror = onerror; + + await transport.start(); + await transport['_startOrAuthSse']({}); + + // Wait long enough that any default timeout would have fired. + await new Promise(resolve => setTimeout(resolve, 100)); + expect(onerror).not.toHaveBeenCalled(); + }); + + it('cancels the reader and fires onerror when no chunk arrives within idleTimeoutMs', async () => { + // Mock SSE source that never sends data + const stream = new ReadableStream({ + start() { + /* never enqueue, never close */ + } + }); + + (globalThis.fetch as Mock).mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ 'content-type': 'text/event-stream' }), + body: stream + }); + + const onerror = vi.fn(); + transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { + idleTimeoutMs: 50, + // Avoid kicking off a reconnect loop in this test — we only care about the timeout firing. + reconnectionOptions: { + initialReconnectionDelay: 1000, + maxReconnectionDelay: 1000, + reconnectionDelayGrowFactor: 1, + maxRetries: 0 + } + }); + transport.onerror = onerror; + + await transport.start(); + await transport['_startOrAuthSse']({}); + + // Wait past the idle timeout window + await new Promise(resolve => setTimeout(resolve, 150)); + + expect(onerror).toHaveBeenCalled(); + const errMsg = (onerror.mock.calls[0]?.[0] as Error).message; + expect(errMsg).toMatch(/SSE stream disconnected/); + expect(errMsg).toMatch(/SSE idle timeout after 50ms/); + }); + + it('does not fire when chunks keep arriving within the window', async () => { + // Stream that emits a chunk every 30ms; idle timeout is 100ms — should never fire. + const encoder = new TextEncoder(); + let cancelled = false; + const stream = new ReadableStream({ + start(controller) { + let count = 0; + const tick = (): void => { + if (cancelled || count >= 4) return; + controller.enqueue( + encoder.encode(`event: message\ndata: {"jsonrpc":"2.0","method":"ping","params":{}}\n\n`) + ); + count++; + setTimeout(tick, 30); + }; + tick(); + }, + cancel() { + cancelled = true; + } + }); + + (globalThis.fetch as Mock).mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ 'content-type': 'text/event-stream' }), + body: stream + }); + + const onerror = vi.fn(); + const onmessage = vi.fn(); + transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { + idleTimeoutMs: 100 + }); + transport.onerror = onerror; + transport.onmessage = onmessage; + + await transport.start(); + await transport['_startOrAuthSse']({}); + + // Let all 4 chunks deliver (~120ms) plus a small buffer + await new Promise(resolve => setTimeout(resolve, 200)); + + expect(onmessage).toHaveBeenCalled(); + expect(onerror).not.toHaveBeenCalled(); + }); + }); });