5 changes: 5 additions & 0 deletions .changeset/progress-race-abort-reason.md
@@ -0,0 +1,5 @@
---
'@modelcontextprotocol/core-internal': patch
---

Fix a client-side race where `notifications/progress` emitted immediately before a response could surface as a spurious "unknown progress token" `onerror`: notification handlers now dispatch synchronously in arrival order (matching responses), and progress for a request that has already settled is dropped silently — never-issued tokens still error. A new `ProtocolOptions.onLateProgress` hook lets callers observe the dropped late notifications. Also: aborts (pre-dispatch and in-flight) now carry the original `signal.reason` at `SdkError.data.reason` in addition to the stringified message, so callers can recover the abort reason structurally.
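
To make the abort-reason part of this changeset concrete, here is a minimal sketch of how a caller might recover the original reason structurally. It is not the SDK's code: the `SdkError` class below is a hypothetical stand-in whose shape is assumed from the diff, the `'RequestTimeout'` string stands in for `SdkErrorCode.RequestTimeout`, and `wrapAbortReason` and `originalReason` are illustrative helper names.

```typescript
// Hypothetical stand-in for the SDK's SdkError; the (code, message, data)
// shape is an assumption inferred from the diff, not the real class.
class SdkError extends Error {
    readonly code: string;
    readonly data?: { reason?: unknown };

    constructor(code: string, message: string, data?: { reason?: unknown }) {
        super(message);
        this.name = 'SdkError';
        this.code = code;
        this.data = data;
    }
}

// Same wrapping rule as the diff: pass an SdkError through untouched,
// otherwise wrap it and keep the original value at data.reason.
function wrapAbortReason(reason: unknown): SdkError {
    return reason instanceof SdkError ? reason : new SdkError('RequestTimeout', String(reason), { reason });
}

// Caller-side recovery: read the structured reason instead of parsing the message.
function originalReason(error: unknown): unknown {
    return error instanceof SdkError ? error.data?.reason : undefined;
}

const cause = new Error('operator abort');
const wrapped = wrapAbortReason(cause);
console.log(originalReason(wrapped) === cause); // true
```

The identity check matters because `String(reason)` flattens an `Error` into text, while `data.reason` keeps the object itself, so a caller can still branch on its type or fields.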
100 changes: 87 additions & 13 deletions packages/core-internal/src/shared/protocol.ts
@@ -87,8 +87,29 @@
* e.g., `['notifications/tools/list_changed']`
*/
debouncedNotificationMethods?: string[];

/**
* Optional hook invoked when a `notifications/progress` arrives for a
* progress token whose request has already settled (response received,
* cancelled, timed out, or send-failed). This is a benign race — the
* peer emitted progress immediately before the response and the two were
* delivered out of order or in the same chunk — so by default such late
* progress is dropped silently rather than surfaced via `onerror`.
* Provide this hook to observe the dropped notifications. Progress for a
* token that was never issued at all still surfaces via `onerror`.
*/
onLateProgress?: (notification: ProgressNotification) => void;

Check failure on line 101 in packages/core-internal/src/shared/protocol.ts

Claude / Claude Code Review

onLateProgress: speculative public API with no concrete consumer and no prose docs

Comment on lines +91 to +101

🔴 The new public `ProtocolOptions.onLateProgress` hook has no concrete consumer in this PR (only the new tests use it), and the core fix (tombstoning retired tokens and silently dropping late progress) works without it. Per the repo's minimalism conventions, consider shipping the silent-drop fix without the hook, or justify the hook with a real consumer. If the hook and the new silent-drop semantics are kept, the prose docs should be updated as well: the "Tracking progress" section of `docs/client.md` mentions neither, and the PR adds only JSDoc and a changeset.

Extended reasoning...

What this is. The PR adds a new public option, ProtocolOptions.onLateProgress, alongside the actual fix for the progress-notification race. The fix itself has two independent parts: (1) notification handlers now dispatch synchronously in arrival order, and (2) progress tokens retired by a response/cancel/timeout/send-failure are tombstoned so a genuinely-late notifications/progress is dropped instead of surfacing as a spurious unknown-token onerror. Neither part requires the hook — it is purely an observability escape hatch for the dropped notifications.
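
The ordering half of the fix can be reproduced in isolation. The sketch below is a simplified model, not the SDK's dispatch code: `Message`, `simulate`, and the `deferNotifications` flag are hypothetical names. It delivers one progress notification and one result in the same tick, once with the notification handler deferred by a microtask and once dispatched synchronously.

```typescript
type Message =
    | { kind: 'progress'; token: number; value: number }
    | { kind: 'result'; token: number };

async function simulate(deferNotifications: boolean): Promise<string[]> {
    const log: string[] = [];
    const progressHandlers = new Map<number, (value: number) => void>();
    progressHandlers.set(0, value => {
        log.push(`progress ${value}`);
    });

    const onProgress = (token: number, value: number): void => {
        const handler = progressHandlers.get(token);
        if (handler) handler(value);
        else log.push(`unknown token ${token}`);
    };

    const onMessage = (message: Message): void => {
        if (message.kind === 'result') {
            // Responses are handled synchronously and retire the progress handler.
            progressHandlers.delete(message.token);
            log.push('result');
            return;
        }
        const { token, value } = message;
        if (deferNotifications) {
            // Old shape: the handler runs one microtask after arrival.
            void Promise.resolve().then(() => onProgress(token, value));
        } else {
            // New shape: the handler runs in arrival order.
            onProgress(token, value);
        }
    };

    // Same-chunk delivery: progress first, then the result, with no await between.
    onMessage({ kind: 'progress', token: 0, value: 1 });
    onMessage({ kind: 'result', token: 0 });

    // Let any queued microtasks drain before reading the log.
    await new Promise<void>(resolve => setTimeout(resolve, 0));
    return log;
}

simulate(true).then(log => console.log('deferred:', log)); // deferred: [ 'result', 'unknown token 0' ]
simulate(false).then(log => console.log('sync:', log)); // sync: [ 'progress 1', 'result' ]
```

In the deferred run the result retires the handler before the queued progress callback executes, which is the spurious unknown-token case; the synchronous run delivers the progress first.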

No concrete consumer. A grep across the repo shows the only callers of onLateProgress are the new tests in protocol.test.ts; there is no production callsite, example, or cookbook usage anywhere in the diff. The repository's review conventions are explicit on this: burden of proof is on addition, every new export is intentional, and new abstractions have at least one concrete callsite in the PR. The hook fails all three as it stands.

Why this matters more than a typical option. ProtocolOptions is not internal-only surface: it is re-exported from packages/core-internal/src/exports/public/index.ts and both ClientOptions (packages/client/src/client/client.ts:160) and ServerOptions (packages/server/src/server/server.ts:72) extend it. So onLateProgress becomes user-facing public API on the Client and Server constructors the moment this merges, and removing it later is a breaking change. That asymmetry — easy to add now, hard to remove later — is exactly what the minimalism convention is guarding against.

The strongest argument for keeping it, and why it doesn't hold up on its own. Before this PR, late progress surfaced via `onerror`, so the hook arguably preserves observability that the silent-drop change removes. But the PR's own framing (and the changeset) treats that `onerror` as spurious noise, which is the bug being fixed, and no real consumer asking to observe these dropped notifications is cited. A caller receiving the bare `ProgressNotification` couldn't act on it meaningfully anyway, because the request has already settled. If a maintainer or downstream user has a concrete need, that justification belongs in the PR; absent one, the simpler shape is to ship the tombstone + silent drop without the hook.

Concrete walk-through. A user constructs new Client({...}, { onLateProgress: ... }) — this works today only because ClientOptions extends ProtocolOptions. Once published in a release, any later decision that the hook was unnecessary requires a deprecation cycle on the public client/server option surface, even though nothing in the SDK or its examples ever exercised it. Compare with shipping just the fix: the only externally visible change is that the spurious onerror stops firing for tombstoned tokens, which is the behavior the PR description advertises.

Documentation gap (if the hook stays). docs/client.md's "Tracking progress" section (~line 424) documents onprogress, resetTimeoutOnProgress, and maxTotalTimeout, and docs/server.md has a "Progress" section — neither mentions the new late-progress drop semantics or onLateProgress, and the PR adds only JSDoc and a changeset. The repo conventions ask for prose documentation (not just JSDoc) for new user-visible features. To be fair, no existing prose described the old unknown-token onerror behavior, so nothing in the docs is now incorrect — this is a missing-new-docs point, and it is contingent on the hook surviving the design question above. If the hook is dropped, a one-line note about the silent-drop behavior change is optional.

Suggested resolution. Either remove onLateProgress and ship the tombstone/silent-drop fix on its own (preferred per the conventions), or keep it with (a) a concrete consumer or stated real-world need and (b) a paragraph in the "Tracking progress" docs covering the new late-progress semantics and the hook.
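
For reference, the tombstone mechanism under discussion can be sketched on its own. This is an illustrative model under stated assumptions, not the PR's implementation: `ProgressRouter`, `Outcome`, and the optional `onLate` callback are hypothetical names, and the cap is a constructor argument here rather than the fixed `RETIRED_PROGRESS_TOKEN_CAP`. It shows that classifying late progress does not depend on the hook being supplied.

```typescript
type Outcome = 'delivered' | 'late' | 'unknown';

class ProgressRouter {
    private readonly active = new Map<number, (progress: number) => void>();
    private readonly retired = new Set<number>();
    private readonly cap: number;
    private readonly onLate?: (token: number) => void;

    constructor(cap: number, onLate?: (token: number) => void) {
        this.cap = cap;
        this.onLate = onLate;
    }

    register(token: number, handler: (progress: number) => void): void {
        this.active.set(token, handler);
    }

    retire(token: number): void {
        // Only tokens that were actually issued get a tombstone.
        if (!this.active.delete(token)) return;
        this.retired.add(token);
        if (this.retired.size > this.cap) {
            // Set iterates in insertion order, so the first value is the oldest.
            const oldest = this.retired.values().next().value;
            if (oldest !== undefined) this.retired.delete(oldest);
        }
    }

    route(token: number, progress: number): Outcome {
        const handler = this.active.get(token);
        if (handler) {
            handler(progress);
            return 'delivered';
        }
        if (this.retired.has(token)) {
            // Benign race: drop, optionally notifying an observer.
            this.onLate?.(token);
            return 'late';
        }
        // Never issued, or evicted from the tombstone set.
        return 'unknown';
    }
}

const lateTokens: number[] = [];
const router = new ProgressRouter(2, token => {
    lateTokens.push(token);
});
for (const token of [1, 2, 3]) {
    router.register(token, () => {});
    router.retire(token);
}
console.log(router.route(3, 1)); // late
console.log(router.route(1, 1)); // unknown (token 1 was evicted at cap 2)
console.log(router.route(99, 1)); // unknown (never issued)
```

Constructing the router without the second argument gives the hook-free variant: late progress is still classified as `'late'` and dropped, and only the observer call disappears.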

};

/**
* Bound on the recently-retired progress-token tombstone set. FIFO-evicted at
* this cap, so a long-running connection cannot accumulate unbounded state. 64
* comfortably covers the realistic in-flight-requests-with-progress window; a
* late progress for a token evicted from the set degrades to the unknown-token
* `onerror` it would have produced before the tombstone set existed.
*/
const RETIRED_PROGRESS_TOKEN_CAP = 64;

/**
* The default request timeout, in milliseconds.
*/
@@ -522,6 +543,14 @@
private _notificationHandlers: Map<string, (notification: JSONRPCNotification, codec: WireCodec) => Promise<void>> = new Map();
private _responseHandlers: Map<number, (response: JSONRPCResultResponse | Error) => void> = new Map();
private _progressHandlers: Map<number, ProgressCallback> = new Map();
/**
* Progress tokens whose handler has been retired (response, cancel,
* max-timeout, or send-failure). Late `notifications/progress` for these
* tokens is a benign delivery race, not an unknown-token protocol error;
* dropped silently (or surfaced via {@linkcode ProtocolOptions.onLateProgress}).
* Bounded at {@linkcode RETIRED_PROGRESS_TOKEN_CAP} with FIFO eviction.
*/
private _retiredProgressTokens: Set<number> = new Set();
private _timeoutInfo: Map<number, TimeoutInfo> = new Map();
private _pendingDebouncedNotifications = new Set<string>();

@@ -734,6 +763,24 @@
}
}

/**
* Retire a progress handler and record its token in the bounded tombstone
* set so late `notifications/progress` for this request is recognized as a
* benign race (dropped or routed to `onLateProgress`) instead of surfaced
* as an unknown-token `onerror`. Only tombstones tokens that were actually
* issued (`onprogress` was supplied for the request); a request that never
* carried a `progressToken` stays a real unknown-token case.
*/
private _retireProgressHandler(messageId: number): void {
if (!this._progressHandlers.delete(messageId)) return;
this._retiredProgressTokens.add(messageId);
if (this._retiredProgressTokens.size > RETIRED_PROGRESS_TOKEN_CAP) {
// FIFO evict — Set iteration order is insertion order.
const oldest = this._retiredProgressTokens.values().next().value;
if (oldest !== undefined) this._retiredProgressTokens.delete(oldest);
}
}

/**
* Attaches to the given transport, starts it, and starts listening for messages.
*
@@ -785,6 +832,7 @@
const responseHandlers = this._responseHandlers;
this._responseHandlers = new Map();
this._progressHandlers.clear();
this._retiredProgressTokens.clear();
this._pendingDebouncedNotifications.clear();

for (const info of this._timeoutInfo.values()) {
@@ -877,10 +925,22 @@
return;
}

- // Starting with Promise.resolve() puts any synchronous errors into the monad as well.
- Promise.resolve()
- .then(() => (handler === undefined ? fallback!(notification) : handler(notification, codec)))
- .catch(error => this._onerror(new Error(`Uncaught error in notification handler: ${error}`)));
// Dispatch SYNCHRONOUSLY in arrival order. Responses are dispatched
// synchronously (`_onresponse`), so deferring notification handlers
// by a microtask (the previous `Promise.resolve().then(handler)`
// shape) lets a same-chunk [progress…, result] pair retire the
// progress handler before the trailing progress runs — surfacing as
// a spurious unknown-token `onerror` (#166). The defer existed only
// to capture synchronous throws; an explicit try/catch keeps that
// guarantee while restoring arrival order. Asynchronous rejections
// from the handler still surface via `onerror`.
try {
Promise.resolve(handler === undefined ? fallback!(notification) : handler(notification, codec)).catch(error =>
this._onerror(new Error(`Uncaught error in notification handler: ${error}`))
);
} catch (error) {
this._onerror(new Error(`Uncaught error in notification handler: ${error}`));
}
}

private _onrequest(rawRequest: JSONRPCRequest, extra?: MessageExtraInfo): void {
@@ -1120,6 +1180,15 @@

const handler = this._progressHandlers.get(messageId);
if (!handler) {
// Late progress for a request that has already settled is a
// benign delivery race (the peer emitted progress immediately
// before the response): drop silently, or surface via the
// optional `onLateProgress` hook. A token that was never issued
// — never in the tombstone set — stays a real protocol error.
if (this._retiredProgressTokens.has(messageId)) {
this._options?.onLateProgress?.(notification);
return;
}
this._onerror(new Error(`Received a progress notification for an unknown token: ${JSON.stringify(notification)}`));
return;
}
@@ -1133,7 +1202,7 @@
} catch (error) {
// Clean up if maxTotalTimeout was exceeded
this._responseHandlers.delete(messageId);
- this._progressHandlers.delete(messageId);
this._retireProgressHandler(messageId);
this._cleanupTimeout(messageId);
responseHandler(error as Error);
return;
@@ -1160,7 +1229,7 @@

this._responseHandlers.delete(messageId);
this._cleanupTimeout(messageId);
- this._progressHandlers.delete(messageId);
this._retireProgressHandler(messageId);

if (isJSONRPCResultResponse(response)) {
handler(response);
@@ -1352,11 +1421,14 @@
// in-flight abort does (`SdkError(RequestTimeout, reason)` via
// `cancel()` below). Bare `throwIfAborted()` would propagate the
// raw `signal.reason` instead, so callers that introduce an async
- // hop before `request()` (e.g. a cache freshness check) would see
- // a different rejection type depending on where the abort lands.
// hop before `request()` (v2's `callTool()` cache/mirroring
// preamble) would see a different rejection type depending on
// where the abort lands. The original reason is carried at
// `.data.reason` so callers can recover it structurally rather
// than parsing the stringified message (#159).
if (options?.signal?.aborted) {
const reason = options.signal.reason;
- throw reason instanceof SdkError ? reason : new SdkError(SdkErrorCode.RequestTimeout, String(reason));
throw reason instanceof SdkError ? reason : new SdkError(SdkErrorCode.RequestTimeout, String(reason), { reason });
}

// Spec basic/patterns/cancellation §Transport-Specific (2026-07-28):
@@ -1403,7 +1475,7 @@
if (responseReceived) {
return;
}
- this._progressHandlers.delete(messageId);
this._retireProgressHandler(messageId);

if (requestAbort === undefined) {
this._transport
@@ -1428,8 +1500,10 @@
requestAbort.abort();
}

- // Wrap the reason in an SdkError if it isn't already
- const error = reason instanceof SdkError ? reason : new SdkError(SdkErrorCode.RequestTimeout, String(reason));
// Wrap the reason in an SdkError if it isn't already; carry
// the original at `.data.reason` so it is structurally
// recoverable (parity with the pre-aborted-signal path).
const error = reason instanceof SdkError ? reason : new SdkError(SdkErrorCode.RequestTimeout, String(reason), { reason });
reject(error);
};

@@ -1514,7 +1588,7 @@
this._transport
.send(outbound, { relatedRequestId, resumptionToken, onresumptiontoken, headers, requestSignal: requestAbort?.signal })
.catch(error => {
- this._progressHandlers.delete(messageId);
this._retireProgressHandler(messageId);
reject(error);
});
}).finally(() => {
122 changes: 122 additions & 0 deletions packages/core-internal/test/shared/protocol.test.ts
@@ -912,6 +912,128 @@ describe('protocol tests', () => {
expect(cancelledSent(tx.sent)).toHaveLength(0);
});
});

// #166 — same-chunk progress+result must not surface as an unknown-token
// onerror; #159 — pre-dispatch aborts must carry the caller's reason.
describe('progress-notification race + abort-reason preservation', () => {
test('tight progress loop + immediate result delivers all progress in arrival order, no onerror', async () => {
await protocol.connect(transport);
const onerror = vi.fn();
protocol.onerror = onerror;
const onprogress = vi.fn();

const pending = testRequest(protocol, { method: 'example', params: {} }, z.object({ ok: z.boolean() }), { onprogress });

// Reproduces the chat-cli race: server emits N progress
// notifications and the result back-to-back in the same tick.
for (let i = 1; i <= 5; i++) {
transport.onmessage?.({
jsonrpc: '2.0',
method: 'notifications/progress',
params: { progressToken: 0, progress: i, total: 5 }
});
}
transport.onmessage?.({ jsonrpc: '2.0', id: 0, result: { ok: true } });

await expect(pending).resolves.toEqual({ ok: true });
expect(onprogress).toHaveBeenCalledTimes(5);
expect(onprogress).toHaveBeenNthCalledWith(1, { progress: 1, total: 5 });
expect(onprogress).toHaveBeenNthCalledWith(5, { progress: 5, total: 5 });
expect(onerror).not.toHaveBeenCalled();
});

test('late progress after a settled request is dropped silently (tombstoned), never-issued tokens still error', async () => {
const onLateProgress = vi.fn();
const proto = new TestProtocolImpl({ onLateProgress });
const tx = new MockTransport();
await proto.connect(tx);
const onerror = vi.fn();
proto.onerror = onerror;

const pending = testRequest(proto, { method: 'example', params: {} }, z.object({ ok: z.boolean() }), { onprogress: vi.fn() });
tx.onmessage?.({ jsonrpc: '2.0', id: 0, result: { ok: true } });
await expect(pending).resolves.toEqual({ ok: true });

// Late progress for the now-settled request: tombstoned → onLateProgress, no onerror.
tx.onmessage?.({
jsonrpc: '2.0',
method: 'notifications/progress',
params: { progressToken: 0, progress: 1, total: 1 }
});
expect(onerror).not.toHaveBeenCalled();
expect(onLateProgress).toHaveBeenCalledTimes(1);
expect(onLateProgress).toHaveBeenCalledWith(expect.objectContaining({ params: expect.objectContaining({ progressToken: 0 }) }));

// A token that was never issued still surfaces as the unknown-token onerror.
tx.onmessage?.({
jsonrpc: '2.0',
method: 'notifications/progress',
params: { progressToken: 999, progress: 1, total: 1 }
});
expect(onerror).toHaveBeenCalledTimes(1);
expect(onerror).toHaveBeenCalledWith(expect.objectContaining({ message: expect.stringContaining('unknown token') }));
});

test('late progress after caller-signal cancel is tombstoned (no onerror)', async () => {
await protocol.connect(transport);
const onerror = vi.fn();
protocol.onerror = onerror;

const ac = new AbortController();
const pending = testRequest(protocol, { method: 'example', params: {} }, z.object({}), {
onprogress: vi.fn(),
signal: ac.signal
});
ac.abort('user cancel');
await expect(pending).rejects.toThrow();

// The chat-cli Ctrl-C reproduction: server keeps emitting progress
// after the client has cancelled. Tombstoned → no onerror noise.
transport.onmessage?.({
jsonrpc: '2.0',
method: 'notifications/progress',
params: { progressToken: 0, progress: 1, total: 3 }
});
// The cancel path also sends a notifications/cancelled on the
// wire — onerror is reserved here for unknown-token noise only.
const unknownTokenErrors = onerror.mock.calls.filter(c => String(c[0]).includes('unknown token'));
expect(unknownTokenErrors).toHaveLength(0);
});

test('pre-aborted signal rejects with SdkError(RequestTimeout) carrying the abort reason (#159)', async () => {
await protocol.connect(transport);
const ac = new AbortController();
const reason = new Error('operator abort');
ac.abort(reason);

const err = await testRequest(protocol, { method: 'example', params: {} }, z.object({}), { signal: ac.signal }).catch(
(e: unknown) => e
);

expect(err).toBeInstanceOf(SdkError);
const sdkErr = err as SdkError;
expect(sdkErr.code).toBe(SdkErrorCode.RequestTimeout);
// Reason preserved both stringified in the message AND structurally at .data.reason.
expect(sdkErr.message).toContain('operator abort');
expect((sdkErr.data as { reason?: unknown }).reason).toBe(reason);
// Nothing reached the wire on a pre-dispatch abort.
expect(sendSpy).not.toHaveBeenCalled();
});

test('in-flight abort carries the reason at .data.reason (parity with the pre-dispatch path)', async () => {
await protocol.connect(transport);
const ac = new AbortController();
const pending = testRequest(protocol, { method: 'example', params: {} }, z.object({}), { signal: ac.signal });
ac.abort('user cancel');

const err = await pending.catch((e: unknown) => e);
expect(err).toBeInstanceOf(SdkError);
const sdkErr = err as SdkError;
expect(sdkErr.code).toBe(SdkErrorCode.RequestTimeout);
expect(sdkErr.message).toBe('user cancel');
expect((sdkErr.data as { reason?: unknown }).reason).toBe('user cancel');
});
});
});

// (2025-11 experimental test suites removed under SEP-2663; see git history.)