Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .changeset/sse-idle-timeout.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
'@modelcontextprotocol/client': minor
---

Add `idleTimeoutMs` option to `StreamableHTTPClientTransport`. When set, the SSE stream
reader cancels itself if no chunk arrives within the configured window, and the existing
disconnect/reconnect path runs (same as a network drop). The timer resets on every chunk,
so this is a per-chunk inactivity timeout, not a total stream lifetime.

Useful for half-open TCP connections, stalled servers, and proxies that go silent without
closing the socket — without it, `reader.read()` blocks indefinitely and the agent hangs.
Defaults to undefined (no timeout, same as today). Closes #1883.
48 changes: 48 additions & 0 deletions packages/client/src/client/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,22 @@ export type StreamableHTTPClientTransportOptions = {
* handshake so the reconnected transport continues sending the required header.
*/
protocolVersion?: string;

/**
* Idle timeout for the SSE stream reader, in milliseconds.
*
* If no chunk is received from the server within this window, the reader is cancelled and
* the stream's normal disconnect/reconnect path runs (same as a network drop). The timer
* resets on every chunk arrival, so this is a per-chunk inactivity timeout, not a total
* stream lifetime.
*
* Useful for half-open TCP connections, stalled servers, and proxies that go silent
* without closing the socket. Without this option, `reader.read()` blocks indefinitely
* when the stream stalls.
*
* Defaults to undefined (no idle timeout — preserves existing behavior).
*/
idleTimeoutMs?: number;
};

/**
Expand All @@ -184,6 +200,7 @@ export class StreamableHTTPClientTransport implements Transport {
private _serverRetryMs?: number; // Server-provided retry delay from SSE retry field
private readonly _reconnectionScheduler?: ReconnectionScheduler;
private _cancelReconnection?: () => void;
private readonly _idleTimeoutMs?: number; // Per-chunk inactivity timeout for SSE reader; opt-in.

onclose?: () => void;
onerror?: (error: Error) => void;
Expand All @@ -206,6 +223,12 @@ export class StreamableHTTPClientTransport implements Transport {
this._protocolVersion = opts?.protocolVersion;
this._reconnectionOptions = opts?.reconnectionOptions ?? DEFAULT_STREAMABLE_HTTP_RECONNECTION_OPTIONS;
this._reconnectionScheduler = opts?.reconnectionScheduler;
if (opts?.idleTimeoutMs !== undefined) {
if (!Number.isFinite(opts.idleTimeoutMs) || opts.idleTimeoutMs <= 0) {
throw new Error(`idleTimeoutMs must be a positive finite number; got ${opts.idleTimeoutMs}`);
}
this._idleTimeoutMs = opts.idleTimeoutMs;
}
}

private async _commonHeaders(): Promise<Headers> {
Expand Down Expand Up @@ -376,7 +399,11 @@ export class StreamableHTTPClientTransport implements Transport {
// Track whether we've received a response - if so, no need to reconnect
// Reconnection is for when server disconnects BEFORE sending response
let receivedResponse = false;
const idleTimeoutMs = this._idleTimeoutMs;
const processStream = async () => {
// Track the idle timer outside the try so we can clear it from the catch and
// the post-loop block. Reassigned on every chunk arrival.
let idleTimer: ReturnType<typeof setTimeout> | undefined;
// this is the closest we can get to trying to catch network errors
// if something happens reader will throw
try {
Expand All @@ -393,8 +420,24 @@ export class StreamableHTTPClientTransport implements Transport {
)
.getReader();

// Per-chunk inactivity timer. Cancelling the reader makes the next
// `await reader.read()` reject, which falls through to the catch
// block below — same path as a network drop.
const armIdleTimer = (): void => {
if (idleTimeoutMs === undefined) return;
if (idleTimer !== undefined) clearTimeout(idleTimer);
idleTimer = setTimeout(() => {
idleTimer = undefined;
// `cancel()` on a ReadableStreamDefaultReader rejects pending reads.
reader.cancel(new Error(`SSE idle timeout after ${idleTimeoutMs}ms`)).catch(() => {});
}, idleTimeoutMs);
};
armIdleTimer();

while (true) {
const { value: event, done } = await reader.read();
// Reset the idle timer on every chunk (data, priming, or close).
armIdleTimer();
if (done) {
break;
}
Expand Down Expand Up @@ -470,6 +513,11 @@ export class StreamableHTTPClientTransport implements Transport {
this.onerror?.(new Error(`Failed to reconnect: ${error instanceof Error ? error.message : String(error)}`));
}
}
} finally {
if (idleTimer !== undefined) {
clearTimeout(idleTimer);
idleTimer = undefined;
}
}
};
processStream();
Expand Down
132 changes: 132 additions & 0 deletions packages/client/test/client/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2017,4 +2017,136 @@ describe('StreamableHTTPClientTransport', () => {
expect(onclose).toHaveBeenCalledTimes(1);
});
});

describe('idleTimeoutMs', () => {
it('rejects non-positive or non-finite values', () => {
expect(
() => new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { idleTimeoutMs: 0 })
).toThrow(/positive finite/);
expect(
() => new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { idleTimeoutMs: -1 })
).toThrow(/positive finite/);
expect(
() => new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { idleTimeoutMs: Infinity })
).toThrow(/positive finite/);
expect(
() => new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { idleTimeoutMs: Number.NaN })
).toThrow(/positive finite/);
});

it('defaults to no idle timeout (preserves existing behavior)', async () => {
// A stream that never enqueues anything should not trigger any error
// when no idleTimeoutMs is set — the reader simply waits.
const stream = new ReadableStream<Uint8Array>({
start() {
/* never enqueue, never close */
}
});

(globalThis.fetch as Mock).mockResolvedValueOnce({
ok: true,
status: 200,
headers: new Headers({ 'content-type': 'text/event-stream' }),
body: stream
});

const onerror = vi.fn();
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'));
transport.onerror = onerror;

await transport.start();
await transport['_startOrAuthSse']({});

// Wait long enough that any default timeout would have fired.
await new Promise(resolve => setTimeout(resolve, 100));
expect(onerror).not.toHaveBeenCalled();
});

it('cancels the reader and fires onerror when no chunk arrives within idleTimeoutMs', async () => {
// Mock SSE source that never sends data
const stream = new ReadableStream<Uint8Array>({
start() {
/* never enqueue, never close */
}
});

(globalThis.fetch as Mock).mockResolvedValueOnce({
ok: true,
status: 200,
headers: new Headers({ 'content-type': 'text/event-stream' }),
body: stream
});

const onerror = vi.fn();
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
idleTimeoutMs: 50,
// Avoid kicking off a reconnect loop in this test — we only care about the timeout firing.
reconnectionOptions: {
initialReconnectionDelay: 1000,
maxReconnectionDelay: 1000,
reconnectionDelayGrowFactor: 1,
maxRetries: 0
}
});
transport.onerror = onerror;

await transport.start();
await transport['_startOrAuthSse']({});

// Wait past the idle timeout window
await new Promise(resolve => setTimeout(resolve, 150));

expect(onerror).toHaveBeenCalled();
const errMsg = (onerror.mock.calls[0]?.[0] as Error).message;
expect(errMsg).toMatch(/SSE stream disconnected/);
expect(errMsg).toMatch(/SSE idle timeout after 50ms/);
});

it('does not fire when chunks keep arriving within the window', async () => {
// Stream that emits a chunk every 30ms; idle timeout is 100ms — should never fire.
const encoder = new TextEncoder();
let cancelled = false;
const stream = new ReadableStream<Uint8Array>({
start(controller) {
let count = 0;
const tick = (): void => {
if (cancelled || count >= 4) return;
controller.enqueue(
encoder.encode(`event: message\ndata: {"jsonrpc":"2.0","method":"ping","params":{}}\n\n`)
);
count++;
setTimeout(tick, 30);
};
tick();
},
cancel() {
cancelled = true;
}
});

(globalThis.fetch as Mock).mockResolvedValueOnce({
ok: true,
status: 200,
headers: new Headers({ 'content-type': 'text/event-stream' }),
body: stream
});

const onerror = vi.fn();
const onmessage = vi.fn();
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
idleTimeoutMs: 100
});
transport.onerror = onerror;
transport.onmessage = onmessage;

await transport.start();
await transport['_startOrAuthSse']({});

// Let all 4 chunks deliver (~120ms) plus a small buffer
await new Promise(resolve => setTimeout(resolve, 200));

expect(onmessage).toHaveBeenCalled();
expect(onerror).not.toHaveBeenCalled();
});
});
});
Loading