From 1a2d4c4452ae079e4e38bbf36b40bb0e8aa33ec0 Mon Sep 17 00:00:00 2001 From: Mukunda Rao Katta Date: Sun, 26 Apr 2026 14:46:58 -0700 Subject: [PATCH 1/2] feat(client): honor Retry-After on HTTP 429 responses MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit StreamableHTTPClientTransport now treats 429 Too Many Requests as a retryable signal. When the server returns 429 the transport parses the Retry-After header (delta-seconds or HTTP-date per RFC 7231 §7.1.3), waits for the indicated duration (clamped to a configurable ceiling), and retries the original POST/GET. After a configurable number of consecutive 429 responses the transport surfaces a typed SdkErrorCode.ClientHttpRateLimited so callers can react. Behaviour is configurable via the new rateLimitOptions transport option. Setting maxRetries to 0 disables automatic retries entirely for applications that want to handle rate limiting themselves. Closes #1892. --- packages/client/src/client/streamableHttp.ts | 190 ++++++++++++++++- .../client/test/client/streamableHttp.test.ts | 191 ++++++++++++++++++ packages/core/src/errors/sdkErrors.ts | 4 +- 3 files changed, 377 insertions(+), 8 deletions(-) diff --git a/packages/client/src/client/streamableHttp.ts b/packages/client/src/client/streamableHttp.ts index cd643c96d..71dd33c83 100644 --- a/packages/client/src/client/streamableHttp.ts +++ b/packages/client/src/client/streamableHttp.ts @@ -25,6 +25,17 @@ const DEFAULT_STREAMABLE_HTTP_RECONNECTION_OPTIONS: StreamableHTTPReconnectionOp maxRetries: 2 }; +// Default behaviour when the server responds with HTTP 429 Too Many Requests. +// `maxRetries` keeps total wait time bounded; `maxRetryAfterMs` caps any +// individual `Retry-After` value so a malicious or misconfigured server cannot +// make the client sleep for hours. `defaultRetryAfterMs` is used when the 429 +// response omits `Retry-After` entirely. 
+const DEFAULT_STREAMABLE_HTTP_RATE_LIMIT_OPTIONS: StreamableHTTPRateLimitOptions = { + maxRetries: 3, + defaultRetryAfterMs: 1_000, + maxRetryAfterMs: 60_000 +}; + /** * Options for starting or authenticating an SSE connection */ @@ -79,6 +90,42 @@ export interface StreamableHTTPReconnectionOptions { maxRetries: number; } +/** + * Configuration options controlling how the {@linkcode StreamableHTTPClientTransport} + * reacts to HTTP `429 Too Many Requests` responses. + * + * On 429 the transport waits for the duration indicated by the response's + * `Retry-After` header (delta-seconds or HTTP-date per RFC 7231 §7.1.3) and + * then retries the original request. If the header is missing or malformed, + * the transport falls back to {@linkcode defaultRetryAfterMs}; delays above + * {@linkcode maxRetryAfterMs} are clamped to it. After {@linkcode maxRetries} + * retries have also been answered with 429 the transport throws + * {@linkcode SdkErrorCode.ClientHttpRateLimited}. + * + * Set {@linkcode maxRetries} to `0` to disable automatic 429 retries entirely (useful + * if the application has its own rate-limit handling). + */ +export interface StreamableHTTPRateLimitOptions { + /** + * Maximum number of automatic retries after consecutive 429 responses. + * Set to 0 to disable retrying. Default is 3. + */ + maxRetries: number; + + /** + * Delay in milliseconds to use when the 429 response omits or has an + * unparsable `Retry-After` header. Default is 1000 (1 second). + */ + defaultRetryAfterMs: number; + + /** + * Upper bound, in milliseconds, on any single retry delay. If the server + * provides a larger `Retry-After` value, it is clamped to this. Default + * is 60000 (60 seconds). + */ + maxRetryAfterMs: number; +} + /** * Custom scheduler for SSE stream reconnection attempts. 
* @@ -142,6 +189,13 @@ export type StreamableHTTPClientTransportOptions = { */ reconnectionOptions?: StreamableHTTPReconnectionOptions; + /** + * Options controlling how the transport reacts to HTTP `429 Too Many + * Requests` responses (Retry-After parsing, retry caps). + * See {@linkcode StreamableHTTPRateLimitOptions}. + */ + rateLimitOptions?: StreamableHTTPRateLimitOptions; + /** * Custom scheduler for reconnection attempts. If not provided, `setTimeout` is used. * See {@linkcode ReconnectionScheduler}. @@ -179,6 +233,7 @@ export class StreamableHTTPClientTransport implements Transport { private _fetchWithInit: FetchLike; private _sessionId?: string; private _reconnectionOptions: StreamableHTTPReconnectionOptions; + private _rateLimitOptions: StreamableHTTPRateLimitOptions; private _protocolVersion?: string; private _lastUpscopingHeader?: string; // Track last upscoping header to prevent infinite upscoping. private _serverRetryMs?: number; // Server-provided retry delay from SSE retry field @@ -205,6 +260,7 @@ export class StreamableHTTPClientTransport implements Transport { this._sessionId = opts?.sessionId; this._protocolVersion = opts?.protocolVersion; this._reconnectionOptions = opts?.reconnectionOptions ?? DEFAULT_STREAMABLE_HTTP_RECONNECTION_OPTIONS; + this._rateLimitOptions = opts?.rateLimitOptions ?? DEFAULT_STREAMABLE_HTTP_RATE_LIMIT_OPTIONS; this._reconnectionScheduler = opts?.reconnectionScheduler; } @@ -230,6 +286,80 @@ export class StreamableHTTPClientTransport implements Transport { }); } + /** + * Parses an HTTP `Retry-After` header value (RFC 7231 §7.1.3). Accepts + * either a non-negative integer of seconds or an HTTP-date. + * + * @returns The retry delay in milliseconds (`0` for an HTTP-date in the + * past), or `undefined` if the value is missing or unparseable. 
+ */ + private static _parseRetryAfter(headerValue: string | null | undefined): number | undefined { + if (headerValue == null) { + return undefined; + } + const trimmed = headerValue.trim(); + if (trimmed.length === 0) { + return undefined; + } + // delta-seconds — the spec only requires non-negative integers, but we + // accept fractional seconds too because some servers emit them. + if (/^\d+(?:\.\d+)?$/.test(trimmed)) { + const seconds = Number(trimmed); + if (Number.isFinite(seconds) && seconds >= 0) { + return Math.round(seconds * 1000); + } + return undefined; + } + // HTTP-date — fall back to Date parsing. `Date.parse` is locale-tolerant + // enough for the IMF-fixdate / RFC 850 / asctime variants required by + // RFC 7231 in practice. + const dateMs = Date.parse(trimmed); + if (!Number.isFinite(dateMs)) { + return undefined; + } + const delta = dateMs - Date.now(); + return delta > 0 ? delta : 0; + } + + /** + * Inspects a 429 response and returns the delay (in ms) the client should + * wait before retrying, applying configured caps and fallbacks. Returns + * `null` if retries are disabled or exhausted. + */ + private _getRateLimitRetryDelay(response: Response, attempt: number): number | null { + const { maxRetries, defaultRetryAfterMs, maxRetryAfterMs } = this._rateLimitOptions; + if (maxRetries <= 0 || attempt >= maxRetries) { + return null; + } + const headerDelay = StreamableHTTPClientTransport._parseRetryAfter(response.headers.get('retry-after')); + const delay = headerDelay ?? defaultRetryAfterMs; + return Math.min(Math.max(delay, 0), maxRetryAfterMs); + } + + /** + * Sleep for `ms` milliseconds, aborting early if the transport's + * AbortController fires. The returned promise rejects with the abort + * reason in that case so the surrounding fetch loop bails out cleanly. 
+ */ + private _sleepWithAbort(ms: number): Promise { + const signal = this._abortController?.signal; + return new Promise((resolve, reject) => { + if (signal?.aborted) { + reject(signal.reason ?? new Error('Aborted')); + return; + } + const timer = setTimeout(() => { + signal?.removeEventListener('abort', onAbort); + resolve(); + }, ms); + const onAbort = () => { + clearTimeout(timer); + reject(signal?.reason ?? new Error('Aborted')); + }; + signal?.addEventListener('abort', onAbort, { once: true }); + }); + } + private async _startOrAuthSse(options: StartSSEOptions, isAuthRetry = false): Promise { const { resumptionToken } = options; @@ -246,12 +376,36 @@ export class StreamableHTTPClientTransport implements Transport { headers.set('last-event-id', resumptionToken); } - const response = await (this._fetch ?? fetch)(this._url, { - ...this._requestInit, - method: 'GET', - headers, - signal: this._abortController?.signal - }); + // Issue the GET, retrying on 429 honouring `Retry-After`. All other + // status handling stays in the existing branches below. + const doGet = () => + (this._fetch ?? fetch)(this._url, { + ...this._requestInit, + method: 'GET', + headers, + signal: this._abortController?.signal + }); + let response = await doGet(); + let rateLimitAttempt = 0; + while (response.status === 429) { + const delay = this._getRateLimitRetryDelay(response, rateLimitAttempt); + if (delay === null) { + // Retries exhausted (or disabled) — surface as a typed error. + await response.text?.().catch(() => {}); + throw new SdkError( + SdkErrorCode.ClientHttpRateLimited, + `Server returned 429 after ${rateLimitAttempt} retr${rateLimitAttempt === 1 ? 
'y' : 'ies'}`, + { + status: 429, + retryAfter: response.headers.get('retry-after') + } + ); + } + await response.text?.().catch(() => {}); + rateLimitAttempt += 1; + await this._sleepWithAbort(delay); + response = await doGet(); + } if (!response.ok) { if (response.status === 401 && this._authProvider) { @@ -552,7 +706,29 @@ export class StreamableHTTPClientTransport implements Transport { signal: this._abortController?.signal }; - const response = await (this._fetch ?? fetch)(this._url, init); + // Retry POST on 429 honouring `Retry-After`; non-429 responses fall + // through to the existing branches below for normal handling. + let response = await (this._fetch ?? fetch)(this._url, init); + let rateLimitAttempt = 0; + while (response.status === 429) { + const delay = this._getRateLimitRetryDelay(response, rateLimitAttempt); + if (delay === null) { + const text = await response.text?.().catch(() => null); + throw new SdkError( + SdkErrorCode.ClientHttpRateLimited, + `Server returned 429 after ${rateLimitAttempt} retr${rateLimitAttempt === 1 ? 'y' : 'ies'}`, + { + status: 429, + retryAfter: response.headers.get('retry-after'), + text + } + ); + } + await response.text?.().catch(() => {}); + rateLimitAttempt += 1; + await this._sleepWithAbort(delay); + response = await (this._fetch ?? 
fetch)(this._url, init); + } // Handle session ID received during initialization const sessionId = response.headers.get('mcp-session-id'); diff --git a/packages/client/test/client/streamableHttp.test.ts b/packages/client/test/client/streamableHttp.test.ts index b2138b3fa..f87faec19 100644 --- a/packages/client/test/client/streamableHttp.test.ts +++ b/packages/client/test/client/streamableHttp.test.ts @@ -2017,4 +2017,195 @@ describe('StreamableHTTPClientTransport', () => { expect(onclose).toHaveBeenCalledTimes(1); }); }); + + describe('HTTP 429 rate-limit handling', () => { + let rateLimitTransport: StreamableHTTPClientTransport; + + // Use fake timers so the transport's Retry-After sleep doesn't block the test. + beforeEach(() => vi.useFakeTimers()); + afterEach(async () => { + vi.useRealTimers(); + await rateLimitTransport?.close().catch(() => {}); + }); + + const message: JSONRPCMessage = { + jsonrpc: '2.0', + method: 'test', + params: {}, + id: 'test-id' + }; + + const okResponse = (overrides: Partial<{ status: number; contentType: string; body: unknown }> = {}) => ({ + ok: true, + status: overrides.status ?? 202, + headers: new Headers(overrides.contentType ? { 'content-type': overrides.contentType } : {}), + text: () => Promise.resolve(''), + json: () => Promise.resolve(overrides.body ?? null) + }); + + const rateLimitedResponse = (retryAfter?: string) => ({ + ok: false, + status: 429, + statusText: 'Too Many Requests', + text: () => Promise.resolve('rate limited'), + headers: new Headers(retryAfter !== undefined ? 
{ 'retry-after': retryAfter } : {}) + }); + + it('honours numeric Retry-After on 429 and retries after the indicated delay', async () => { + rateLimitTransport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { + rateLimitOptions: { maxRetries: 3, defaultRetryAfterMs: 10, maxRetryAfterMs: 60_000 } + }); + + const fetchMock = globalThis.fetch as Mock; + fetchMock.mockResolvedValueOnce(rateLimitedResponse('2')); + fetchMock.mockResolvedValueOnce(okResponse({ status: 202 })); + + const sendPromise = rateLimitTransport.send(message); + // Drain microtasks so the first fetch resolves and the sleep begins. + await vi.advanceTimersByTimeAsync(0); + expect(fetchMock).toHaveBeenCalledTimes(1); + + // Half-way through the wait we still should not have retried. + await vi.advanceTimersByTimeAsync(1_000); + expect(fetchMock).toHaveBeenCalledTimes(1); + + // Once the full Retry-After has elapsed the retry should fire. + await vi.advanceTimersByTimeAsync(1_000); + await sendPromise; + expect(fetchMock).toHaveBeenCalledTimes(2); + }); + + it('parses HTTP-date Retry-After values', async () => { + rateLimitTransport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { + rateLimitOptions: { maxRetries: 3, defaultRetryAfterMs: 10, maxRetryAfterMs: 60_000 } + }); + + const baseNow = new Date('2026-01-01T00:00:00Z').getTime(); + vi.setSystemTime(baseNow); + const httpDate = new Date(baseNow + 3_000).toUTCString(); + + const fetchMock = globalThis.fetch as Mock; + fetchMock.mockResolvedValueOnce(rateLimitedResponse(httpDate)); + fetchMock.mockResolvedValueOnce(okResponse({ status: 202 })); + + const sendPromise = rateLimitTransport.send(message); + await vi.advanceTimersByTimeAsync(0); + expect(fetchMock).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(2_999); + expect(fetchMock).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(2); + await sendPromise; + expect(fetchMock).toHaveBeenCalledTimes(2); + }); + + 
it('falls back to defaultRetryAfterMs when Retry-After is absent', async () => { + rateLimitTransport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { + rateLimitOptions: { maxRetries: 3, defaultRetryAfterMs: 250, maxRetryAfterMs: 60_000 } + }); + + const fetchMock = globalThis.fetch as Mock; + fetchMock.mockResolvedValueOnce(rateLimitedResponse(undefined)); + fetchMock.mockResolvedValueOnce(okResponse({ status: 202 })); + + const sendPromise = rateLimitTransport.send(message); + await vi.advanceTimersByTimeAsync(0); + expect(fetchMock).toHaveBeenCalledTimes(1); + + // Below the fallback delay nothing should retry yet. + await vi.advanceTimersByTimeAsync(100); + expect(fetchMock).toHaveBeenCalledTimes(1); + + // After the fallback delay elapses the retry fires. + await vi.advanceTimersByTimeAsync(200); + await sendPromise; + expect(fetchMock).toHaveBeenCalledTimes(2); + }); + + it('clamps oversize Retry-After values to maxRetryAfterMs', async () => { + rateLimitTransport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { + rateLimitOptions: { maxRetries: 3, defaultRetryAfterMs: 10, maxRetryAfterMs: 50 } + }); + + const fetchMock = globalThis.fetch as Mock; + // Server asks for an hour — we should not actually wait that long. 
+ fetchMock.mockResolvedValueOnce(rateLimitedResponse('3600')); + fetchMock.mockResolvedValueOnce(okResponse({ status: 202 })); + + const sendPromise = rateLimitTransport.send(message); + await vi.advanceTimersByTimeAsync(0); + expect(fetchMock).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(60); + await sendPromise; + expect(fetchMock).toHaveBeenCalledTimes(2); + }); + + it('throws ClientHttpRateLimited after maxRetries consecutive 429 responses', async () => { + rateLimitTransport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { + rateLimitOptions: { maxRetries: 3, defaultRetryAfterMs: 10, maxRetryAfterMs: 60_000 } + }); + + const fetchMock = globalThis.fetch as Mock; + // 4 consecutive 429s: 1 initial + 3 retries, then the throw. + for (let i = 0; i < 4; i += 1) { + fetchMock.mockResolvedValueOnce(rateLimitedResponse('0')); + } + const errorSpy = vi.fn(); + rateLimitTransport.onerror = errorSpy; + + const sendPromise = rateLimitTransport.send(message); + // Drive each Retry-After=0 sleep through the queue. 
+ for (let i = 0; i < 5; i += 1) { + await vi.advanceTimersByTimeAsync(10); + } + + await expect(sendPromise).rejects.toMatchObject({ + code: SdkErrorCode.ClientHttpRateLimited + }); + expect(fetchMock).toHaveBeenCalledTimes(4); + expect(errorSpy).toHaveBeenCalled(); + }); + + it('does not retry when maxRetries is 0', async () => { + rateLimitTransport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { + rateLimitOptions: { maxRetries: 0, defaultRetryAfterMs: 10, maxRetryAfterMs: 60_000 } + }); + + const fetchMock = globalThis.fetch as Mock; + fetchMock.mockResolvedValueOnce(rateLimitedResponse('1')); + + const errorSpy = vi.fn(); + rateLimitTransport.onerror = errorSpy; + + await expect(rateLimitTransport.send(message)).rejects.toMatchObject({ + code: SdkErrorCode.ClientHttpRateLimited + }); + expect(fetchMock).toHaveBeenCalledTimes(1); + }); + + it('parses Retry-After in static helper across formats', () => { + const parse = (StreamableHTTPClientTransport as unknown as Record number | undefined>)[ + '_parseRetryAfter' + ]; + + expect(parse(null)).toBeUndefined(); + expect(parse(undefined)).toBeUndefined(); + expect(parse('')).toBeUndefined(); + expect(parse('not-a-number')).toBeUndefined(); + expect(parse('5')).toBe(5_000); + expect(parse(' 7 ')).toBe(7_000); + expect(parse('0')).toBe(0); + expect(parse('1.5')).toBe(1_500); + + const baseNow = new Date('2026-01-01T00:00:00Z').getTime(); + vi.setSystemTime(baseNow); + const future = new Date(baseNow + 4_000).toUTCString(); + const past = new Date(baseNow - 4_000).toUTCString(); + expect(parse(future)).toBe(4_000); + // Past dates collapse to a zero-delay retry rather than a negative wait. 
+ expect(parse(past)).toBe(0); + }); + }); }); diff --git a/packages/core/src/errors/sdkErrors.ts b/packages/core/src/errors/sdkErrors.ts index f53c07ccf..4ef7eab61 100644 --- a/packages/core/src/errors/sdkErrors.ts +++ b/packages/core/src/errors/sdkErrors.ts @@ -33,7 +33,9 @@ export enum SdkErrorCode { ClientHttpForbidden = 'CLIENT_HTTP_FORBIDDEN', ClientHttpUnexpectedContent = 'CLIENT_HTTP_UNEXPECTED_CONTENT', ClientHttpFailedToOpenStream = 'CLIENT_HTTP_FAILED_TO_OPEN_STREAM', - ClientHttpFailedToTerminateSession = 'CLIENT_HTTP_FAILED_TO_TERMINATE_SESSION' + ClientHttpFailedToTerminateSession = 'CLIENT_HTTP_FAILED_TO_TERMINATE_SESSION', + /** Server returned 429 Too Many Requests and the client exhausted its retries. */ + ClientHttpRateLimited = 'CLIENT_HTTP_RATE_LIMITED' } /** From 0d047ed73da6fbd9bb7dc8465d6f97f450267562 Mon Sep 17 00:00:00 2001 From: Mukunda Rao Katta Date: Sun, 26 Apr 2026 18:31:21 -0700 Subject: [PATCH 2/2] chore: add changeset for Retry-After 429 handling --- .changeset/honor-retry-after-429.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changeset/honor-retry-after-429.md diff --git a/.changeset/honor-retry-after-429.md b/.changeset/honor-retry-after-429.md new file mode 100644 index 000000000..ab681aed4 --- /dev/null +++ b/.changeset/honor-retry-after-429.md @@ -0,0 +1,6 @@ +--- +'@modelcontextprotocol/client': minor +'@modelcontextprotocol/core': patch +--- + +Honor `Retry-After` on HTTP 429 responses in `StreamableHTTPClientTransport`. Both POST and GET paths now parse `Retry-After` (delta-seconds and HTTP-date per RFC 7231 §7.1.3), sleep for the indicated duration, and retry up to a configurable max. New `rateLimitOptions` transport option exposes `maxRetries` (default 3), `defaultRetryAfterMs` (default 1s) for missing/garbage headers, and `maxRetryAfterMs` (default 60s) cap. Sleep honors the existing `AbortController` so `transport.close()` cancels in-flight waits. 
After the cap is hit, the transport throws a new typed `SdkErrorCode.ClientHttpRateLimited` error with the raw `Retry-After` header value from the last 429 response attached to `error.data`.