diff --git a/.changeset/progress-race-abort-reason.md b/.changeset/progress-race-abort-reason.md new file mode 100644 index 000000000..117d26712 --- /dev/null +++ b/.changeset/progress-race-abort-reason.md @@ -0,0 +1,5 @@ +--- +'@modelcontextprotocol/core-internal': patch +--- + +Fix a client-side race where `notifications/progress` emitted immediately before a response could surface as a spurious "unknown progress token" `onerror`: notification handlers now dispatch synchronously in arrival order (matching responses), and progress for a request that has already settled is dropped silently — never-issued tokens still error. A new `ProtocolOptions.onLateProgress` hook lets callers observe the dropped late notifications. Also: aborts (pre-dispatch and in-flight) now carry the original `signal.reason` at `SdkError.data.reason` in addition to the stringified message, so callers can recover the abort reason structurally. diff --git a/packages/core-internal/src/shared/protocol.ts b/packages/core-internal/src/shared/protocol.ts index 08cc10047..89cd96ad3 100644 --- a/packages/core-internal/src/shared/protocol.ts +++ b/packages/core-internal/src/shared/protocol.ts @@ -87,8 +87,29 @@ export type ProtocolOptions = { * e.g., `['notifications/tools/list_changed']` */ debouncedNotificationMethods?: string[]; + + /** + * Optional hook invoked when a `notifications/progress` arrives for a + * progress token whose request has already settled (response received, + * cancelled, timed out, or send-failed). This is a benign race — the + * peer emitted progress immediately before the response and the two were + * delivered out of order or in the same chunk — so by default such late + * progress is dropped silently rather than surfaced via `onerror`. + * Provide this hook to observe the dropped notifications. Progress for a + * token that was never issued at all still surfaces via `onerror`. + */ + onLateProgress?: (notification: ProgressNotification) => void; }; +/** + * Bound on the recently-retired progress-token tombstone set. FIFO-evicted at + * this cap, so a long-running connection cannot accumulate unbounded state. 64 + * comfortably covers the realistic in-flight-requests-with-progress window; a + * late progress for a token evicted from the set degrades to the unknown-token + * `onerror` it would have produced before the tombstone set existed. + */ +const RETIRED_PROGRESS_TOKEN_CAP = 64; + /** * The default request timeout, in milliseconds. */ @@ -522,6 +543,14 @@ export abstract class Protocol { private _notificationHandlers: Map Promise> = new Map(); private _responseHandlers: Map void> = new Map(); private _progressHandlers: Map = new Map(); + /** + * Progress tokens whose handler has been retired (response, cancel, + * max-timeout, or send-failure). Late `notifications/progress` for these + * tokens is a benign delivery race, not an unknown-token protocol error; + * dropped silently (or surfaced via {@linkcode ProtocolOptions.onLateProgress}). + * Bounded at {@linkcode RETIRED_PROGRESS_TOKEN_CAP} with FIFO eviction. + */ + private _retiredProgressTokens: Set = new Set(); private _timeoutInfo: Map = new Map(); private _pendingDebouncedNotifications = new Set(); @@ -734,6 +763,24 @@ export abstract class Protocol { } } + /** + * Retire a progress handler and record its token in the bounded tombstone + * set so late `notifications/progress` for this request is recognized as a + * benign race (dropped or routed to `onLateProgress`) instead of surfaced + * as an unknown-token `onerror`. Only tombstones tokens that were actually + * issued (`onprogress` was supplied for the request); a request that never + * carried a `progressToken` stays a real unknown-token case. + */ + private _retireProgressHandler(messageId: number): void { + if (!this._progressHandlers.delete(messageId)) return; + this._retiredProgressTokens.add(messageId); + if (this._retiredProgressTokens.size > RETIRED_PROGRESS_TOKEN_CAP) { + // FIFO evict — Set iteration order is insertion order. + const oldest = this._retiredProgressTokens.values().next().value; + if (oldest !== undefined) this._retiredProgressTokens.delete(oldest); + } + } + /** * Attaches to the given transport, starts it, and starts listening for messages. * @@ -785,6 +832,7 @@ export abstract class Protocol { const responseHandlers = this._responseHandlers; this._responseHandlers = new Map(); this._progressHandlers.clear(); + this._retiredProgressTokens.clear(); this._pendingDebouncedNotifications.clear(); for (const info of this._timeoutInfo.values()) { @@ -877,10 +925,22 @@ export abstract class Protocol { return; } - // Starting with Promise.resolve() puts any synchronous errors into the monad as well. - Promise.resolve() - .then(() => (handler === undefined ? fallback!(notification) : handler(notification, codec))) - .catch(error => this._onerror(new Error(`Uncaught error in notification handler: ${error}`))); + // Dispatch SYNCHRONOUSLY in arrival order. Responses are dispatched + // synchronously (`_onresponse`), so deferring notification handlers + // by a microtask (the previous `Promise.resolve().then(handler)` + // shape) lets a same-chunk [progress…, result] pair retire the + // progress handler before the trailing progress runs — surfacing as + // a spurious unknown-token `onerror` (#166). The defer existed only + // to capture synchronous throws; an explicit try/catch keeps that + // guarantee while restoring arrival order. Asynchronous rejections + // from the handler still surface via `onerror`. + try { + Promise.resolve(handler === undefined ? fallback!(notification) : handler(notification, codec)).catch(error => + this._onerror(new Error(`Uncaught error in notification handler: ${error}`)) + ); + } catch (error) { + this._onerror(new Error(`Uncaught error in notification handler: ${error}`)); + } } private _onrequest(rawRequest: JSONRPCRequest, extra?: MessageExtraInfo): void { @@ -1120,6 +1180,15 @@ export abstract class Protocol { const handler = this._progressHandlers.get(messageId); if (!handler) { + // Late progress for a request that has already settled is a + // benign delivery race (the peer emitted progress immediately + // before the response): drop silently, or surface via the + // optional `onLateProgress` hook. A token that was never issued + // — never in the tombstone set — stays a real protocol error. + if (this._retiredProgressTokens.has(messageId)) { + this._options?.onLateProgress?.(notification); + return; + } this._onerror(new Error(`Received a progress notification for an unknown token: ${JSON.stringify(notification)}`)); return; } @@ -1133,7 +1202,7 @@ export abstract class Protocol { } catch (error) { // Clean up if maxTotalTimeout was exceeded this._responseHandlers.delete(messageId); - this._progressHandlers.delete(messageId); + this._retireProgressHandler(messageId); this._cleanupTimeout(messageId); responseHandler(error as Error); return; @@ -1160,7 +1229,7 @@ export abstract class Protocol { this._responseHandlers.delete(messageId); this._cleanupTimeout(messageId); - this._progressHandlers.delete(messageId); + this._retireProgressHandler(messageId); if (isJSONRPCResultResponse(response)) { handler(response); @@ -1352,11 +1421,14 @@ export abstract class Protocol { // in-flight abort does (`SdkError(RequestTimeout, reason)` via // `cancel()` below). Bare `throwIfAborted()` would propagate the // raw `signal.reason` instead, so callers that introduce an async - // hop before `request()` (e.g. a cache freshness check) would see - // a different rejection type depending on where the abort lands. + // hop before `request()` (v2's `callTool()` cache/mirroring + // preamble) would see a different rejection type depending on + // where the abort lands. The original reason is carried at + // `.data.reason` so callers can recover it structurally rather + // than parsing the stringified message (#159). if (options?.signal?.aborted) { const reason = options.signal.reason; - throw reason instanceof SdkError ? reason : new SdkError(SdkErrorCode.RequestTimeout, String(reason)); + throw reason instanceof SdkError ? reason : new SdkError(SdkErrorCode.RequestTimeout, String(reason), { reason }); } // Spec basic/patterns/cancellation §Transport-Specific (2026-07-28): @@ -1403,7 +1475,7 @@ export abstract class Protocol { if (responseReceived) { return; } - this._progressHandlers.delete(messageId); + this._retireProgressHandler(messageId); if (requestAbort === undefined) { this._transport @@ -1428,8 +1500,10 @@ export abstract class Protocol { requestAbort.abort(); } - // Wrap the reason in an SdkError if it isn't already - const error = reason instanceof SdkError ? reason : new SdkError(SdkErrorCode.RequestTimeout, String(reason)); + // Wrap the reason in an SdkError if it isn't already; carry + // the original at `.data.reason` so it is structurally + // recoverable (parity with the pre-aborted-signal path). + const error = reason instanceof SdkError ? reason : new SdkError(SdkErrorCode.RequestTimeout, String(reason), { reason }); reject(error); }; @@ -1514,7 +1588,7 @@ export abstract class Protocol { this._transport .send(outbound, { relatedRequestId, resumptionToken, onresumptiontoken, headers, requestSignal: requestAbort?.signal }) .catch(error => { - this._progressHandlers.delete(messageId); + this._retireProgressHandler(messageId); reject(error); }); }).finally(() => { diff --git a/packages/core-internal/test/shared/protocol.test.ts b/packages/core-internal/test/shared/protocol.test.ts index 2ecdc40ad..0a17bb723 100644 --- a/packages/core-internal/test/shared/protocol.test.ts +++ b/packages/core-internal/test/shared/protocol.test.ts @@ -912,6 +912,128 @@ describe('protocol tests', () => { expect(cancelledSent(tx.sent)).toHaveLength(0); }); }); + + // #166 — same-chunk progress+result must not surface as an unknown-token + // onerror; #159 — pre-dispatch aborts must carry the caller's reason. + describe('progress-notification race + abort-reason preservation', () => { + test('tight progress loop + immediate result delivers all progress in arrival order, no onerror', async () => { + await protocol.connect(transport); + const onerror = vi.fn(); + protocol.onerror = onerror; + const onprogress = vi.fn(); + + const pending = testRequest(protocol, { method: 'example', params: {} }, z.object({ ok: z.boolean() }), { onprogress }); + + // Reproduces the chat-cli race: server emits N progress + // notifications and the result back-to-back in the same tick. + for (let i = 1; i <= 5; i++) { + transport.onmessage?.({ + jsonrpc: '2.0', + method: 'notifications/progress', + params: { progressToken: 0, progress: i, total: 5 } + }); + } + transport.onmessage?.({ jsonrpc: '2.0', id: 0, result: { ok: true } }); + + await expect(pending).resolves.toEqual({ ok: true }); + expect(onprogress).toHaveBeenCalledTimes(5); + expect(onprogress).toHaveBeenNthCalledWith(1, { progress: 1, total: 5 }); + expect(onprogress).toHaveBeenNthCalledWith(5, { progress: 5, total: 5 }); + expect(onerror).not.toHaveBeenCalled(); + }); + + test('late progress after a settled request is dropped silently (tombstoned), never-issued tokens still error', async () => { + const onLateProgress = vi.fn(); + const proto = new TestProtocolImpl({ onLateProgress }); + const tx = new MockTransport(); + await proto.connect(tx); + const onerror = vi.fn(); + proto.onerror = onerror; + + const pending = testRequest(proto, { method: 'example', params: {} }, z.object({ ok: z.boolean() }), { onprogress: vi.fn() }); + tx.onmessage?.({ jsonrpc: '2.0', id: 0, result: { ok: true } }); + await expect(pending).resolves.toEqual({ ok: true }); + + // Late progress for the now-settled request: tombstoned → onLateProgress, no onerror. + tx.onmessage?.({ + jsonrpc: '2.0', + method: 'notifications/progress', + params: { progressToken: 0, progress: 1, total: 1 } + }); + expect(onerror).not.toHaveBeenCalled(); + expect(onLateProgress).toHaveBeenCalledTimes(1); + expect(onLateProgress).toHaveBeenCalledWith(expect.objectContaining({ params: expect.objectContaining({ progressToken: 0 }) })); + + // A token that was never issued still surfaces as the unknown-token onerror. + tx.onmessage?.({ + jsonrpc: '2.0', + method: 'notifications/progress', + params: { progressToken: 999, progress: 1, total: 1 } + }); + expect(onerror).toHaveBeenCalledTimes(1); + expect(onerror).toHaveBeenCalledWith(expect.objectContaining({ message: expect.stringContaining('unknown token') })); + }); + + test('late progress after caller-signal cancel is tombstoned (no onerror)', async () => { + await protocol.connect(transport); + const onerror = vi.fn(); + protocol.onerror = onerror; + + const ac = new AbortController(); + const pending = testRequest(protocol, { method: 'example', params: {} }, z.object({}), { + onprogress: vi.fn(), + signal: ac.signal + }); + ac.abort('user cancel'); + await expect(pending).rejects.toThrow(); + + // The chat-cli Ctrl-C reproduction: server keeps emitting progress + // after the client has cancelled. Tombstoned → no onerror noise. + transport.onmessage?.({ + jsonrpc: '2.0', + method: 'notifications/progress', + params: { progressToken: 0, progress: 1, total: 3 } + }); + // The cancel path also sends a notifications/cancelled on the + // wire — onerror is reserved here for unknown-token noise only. + const unknownTokenErrors = onerror.mock.calls.filter(c => String(c[0]).includes('unknown token')); + expect(unknownTokenErrors).toHaveLength(0); + }); + + test('pre-aborted signal rejects with SdkError(RequestTimeout) carrying the abort reason (#159)', async () => { + await protocol.connect(transport); + const ac = new AbortController(); + const reason = new Error('operator abort'); + ac.abort(reason); + + const err = await testRequest(protocol, { method: 'example', params: {} }, z.object({}), { signal: ac.signal }).catch( + (e: unknown) => e + ); + + expect(err).toBeInstanceOf(SdkError); + const sdkErr = err as SdkError; + expect(sdkErr.code).toBe(SdkErrorCode.RequestTimeout); + // Reason preserved both stringified in the message AND structurally at .data.reason. + expect(sdkErr.message).toContain('operator abort'); + expect((sdkErr.data as { reason?: unknown }).reason).toBe(reason); + // Nothing reached the wire on a pre-dispatch abort. + expect(sendSpy).not.toHaveBeenCalled(); + }); + + test('in-flight abort carries the reason at .data.reason (parity with the pre-dispatch path)', async () => { + await protocol.connect(transport); + const ac = new AbortController(); + const pending = testRequest(protocol, { method: 'example', params: {} }, z.object({}), { signal: ac.signal }); + ac.abort('user cancel'); + + const err = await pending.catch((e: unknown) => e); + expect(err).toBeInstanceOf(SdkError); + const sdkErr = err as SdkError; + expect(sdkErr.code).toBe(SdkErrorCode.RequestTimeout); + expect(sdkErr.message).toBe('user cancel'); + expect((sdkErr.data as { reason?: unknown }).reason).toBe('user cancel'); + }); + }); }); // (2025-11 experimental test suites removed under SEP-2663; see git history.)