From 9c2aee4a660f42d947ad5abacc5a5de15fe97f2f Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Fri, 19 Jun 2026 14:06:10 +0000 Subject: [PATCH] fix(server): streamableHttp stores request-related events when stream is disconnected The standalone-SSE path stores to eventStore first, then writes if connected. The request-related path only stored when the stream was live, so a notification sent after closeSSE() (SEP-1699 polling) was silently dropped instead of being persisted for replay on reconnect. Exposed by the ctx.mcpReq.log request-related change against the new sse-polling example story; the gap pre-exists on main for any request-related notification (progress, ctx.mcpReq.notify) emitted after closeSSE(). --- .../server-streamablehttp-store-first.md | 5 + examples/sse-polling/package.json | 1 - packages/server/src/server/streamableHttp.ts | 138 +++++- .../server/test/server/streamableHttp.test.ts | 413 ++++++++++++++++++ 4 files changed, 538 insertions(+), 19 deletions(-) create mode 100644 .changeset/server-streamablehttp-store-first.md diff --git a/.changeset/server-streamablehttp-store-first.md b/.changeset/server-streamablehttp-store-first.md new file mode 100644 index 0000000000..cae12269c8 --- /dev/null +++ b/.changeset/server-streamablehttp-store-first.md @@ -0,0 +1,5 @@ +--- +'@modelcontextprotocol/server': patch +--- + +`WebStandardStreamableHTTPServerTransport`: request-related events (progress, `ctx.mcpReq.notify`, handler-emitted log) and the final response are now persisted to the configured `eventStore` whenever the request is in flight, regardless of whether a live SSE writer currently exists — mirroring the standalone-SSE path's store-first semantics. This fixes the `closeSSE()` poll-and-replay drop (events emitted after `closeSSE()` were previously silently lost) and aligns with the 2025-11-25 specification ("disconnection SHOULD NOT be interpreted as the client cancelling its request"). When an `eventStore` is configured, a final response sent while no per-request stream is connected is stored for replay and returns cleanly instead of throwing "No connection established"; a `Last-Event-ID` reconnect after the request has been retired replays the stored response and then closes the resumed stream. When no `eventStore` is configured, the same condition is surfaced via `onerror` (the response is undeliverable) and the request id is retired. diff --git a/examples/sse-polling/package.json b/examples/sse-polling/package.json index 838a71483b..b66b6cad33 100644 --- a/examples/sse-polling/package.json +++ b/examples/sse-polling/package.json @@ -25,7 +25,6 @@ "era": "legacy", "path": "/mcp", "timeoutMs": 20000, - "excluded": "replay assertion is timing-sensitive on CI; revisit", "//": "SEP-1699 closeSSE/eventStore/retryInterval live on the sessionful-2025 transport; the client is era-blind so dual would duplicate." } } diff --git a/packages/server/src/server/streamableHttp.ts b/packages/server/src/server/streamableHttp.ts index fb6ffc2b02..4e611f0f2d 100644 --- a/packages/server/src/server/streamableHttp.ts +++ b/packages/server/src/server/streamableHttp.ts @@ -63,6 +63,12 @@ interface StreamMapping { encoder?: InstanceType; /** Promise resolver for JSON response mode */ resolveJson?: (response: Response) => void; + /** + * Event ids already written to this stream by `replayEventsAfter` — lets + * `send()` skip a duplicate write when the resumed stream registered + * during the `storeEvent()` await and replay already delivered the event. + */ + replayedEventIds?: Set; /** Cleanup function to close stream and remove mapping */ cleanup: () => void; } @@ -462,8 +468,12 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { streamController = controller; }, cancel: () => { - // Stream was cancelled by client - this._streamMapping.delete(this._standaloneSseStreamId); + // Stream was cancelled by client. Only drop the mapping when + // it still points at THIS controller — a stale cancel must not + // delete a successor stream registered by a later GET/resume. + if (this._streamMapping.get(this._standaloneSseStreamId)?.controller === streamController) { + this._streamMapping.delete(this._standaloneSseStreamId); + } } }); @@ -536,20 +546,33 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { // Create a ReadableStream with controller for SSE const encoder = new TextEncoder(); let streamController: ReadableStreamDefaultController; + // Captured by the cancel closure below before it's assigned (after + // replayEventsAfter resolves) — must be `let`. + // eslint-disable-next-line prefer-const + let replayedStreamId: string | undefined; const readable = new ReadableStream({ start: controller => { streamController = controller; }, cancel: () => { - // Stream was cancelled by client - // Cleanup will be handled by the mapping + // Stream was cancelled by client — drop the mapping so a + // subsequent reconnect with the same Last-Event-ID is not + // refused with 409 by the conflict check above. Only delete + // when the mapped entry is still THIS closure's controller: + // a stale cancel from an earlier resume must not delete a + // successor resumed stream a re-poll has since registered. + if (replayedStreamId !== undefined && this._streamMapping.get(replayedStreamId)?.controller === streamController) { + this._streamMapping.delete(replayedStreamId); + } } }); // Replay events - returns the streamId for backwards compatibility - const replayedStreamId = await this._eventStore.replayEventsAfter(lastEventId, { + const replayedEventIds = new Set(); + replayedStreamId = await this._eventStore.replayEventsAfter(lastEventId, { send: async (eventId: string, message: JSONRPCMessage) => { + replayedEventIds.add(eventId); const success = this.writeSSEEvent(streamController!, encoder, message, eventId); if (!success) { try { @@ -564,8 +587,9 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { this._streamMapping.set(replayedStreamId, { controller: streamController!, encoder, + replayedEventIds, cleanup: () => { - this._streamMapping.delete(replayedStreamId); + this._streamMapping.delete(replayedStreamId!); try { streamController!.close(); } catch { @@ -574,6 +598,25 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { } }); + // If this is a per-request stream and no in-flight request still + // targets this streamId, the request was already retired by the + // clean-return path while disconnected and the replay above just + // delivered the final response. Per the spec the server SHOULD + // close the SSE stream after the JSON-RPC response — close and + // unregister so a later reconnect isn't refused with 409. The + // standalone GET stream is never request-scoped and stays open. + if (replayedStreamId !== this._standaloneSseStreamId) { + const hasInFlightRequest = [...this._requestToStreamMapping.values()].includes(replayedStreamId); + if (!hasInFlightRequest) { + this._streamMapping.delete(replayedStreamId); + try { + streamController!.close(); + } catch { + // Controller might already be closed + } + } + } + return new Response(readable, { headers }); } catch (error) { this.onerror?.(error as Error); @@ -770,8 +813,14 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { streamController = controller; }, cancel: () => { - // Stream was cancelled by client - this._streamMapping.delete(streamId); + // Stream was cancelled by client. Only drop the mapping + // when it still points at THIS controller — a stale cancel + // (firing after a Last-Event-ID reconnect registered a + // resumed stream under the same streamId) must not delete + // the successor. + if (this._streamMapping.get(streamId)?.controller === streamController) { + this._streamMapping.delete(streamId); + } } }); @@ -987,8 +1036,14 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { return; } - // Send the message to the standalone SSE stream - if (standaloneSse.controller && standaloneSse.encoder) { + // Send the message to the standalone SSE stream — unless the + // resumed stream's replay already delivered this exact eventId + // (identity dedup; mirrors the per-request path below). + if ( + standaloneSse.controller && + standaloneSse.encoder && + (eventId === undefined || !standaloneSse.replayedEventIds?.has(eventId)) + ) { this.writeSSEEvent(standaloneSse.controller, standaloneSse.encoder, message, eventId); } return; @@ -1000,17 +1055,33 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { throw new Error(`No connection established for request ID: ${String(requestId)}`); } - const stream = this._streamMapping.get(streamId); - - if (!this._enableJsonResponse && stream?.controller && stream?.encoder) { - // For SSE responses, generate event ID if event store is provided + let stream = this._streamMapping.get(streamId); + + if (!this._enableJsonResponse) { + // Store FIRST so request-related events emitted while the per-request + // stream is disconnected (e.g. after `closeSSE()` or a transient + // client drop) are replayed on Last-Event-ID reconnect — same + // store-first semantics as the standalone path above. Storage is + // keyed on request-in-flight (`_requestToStreamMapping` resolved + // `streamId` above), not on whether a live SSE writer currently + // exists: `_streamMapping` tracks the delivery target only. Per + // 2025-11-25 transports.mdx, disconnection SHOULD NOT be + // interpreted as the client cancelling its request. let eventId: string | undefined; - if (this._eventStore) { eventId = await this._eventStore.storeEvent(streamId, message); + // Re-read after the await: a Last-Event-ID reconnect during + // storeEvent() may have registered a resumed stream under this + // streamId (mirrors the standalone path's post-await read). + stream = this._streamMapping.get(streamId); + } + // Write the event to the response stream — unless the resumed + // stream's replay already delivered this exact eventId (the store + // committed before replay scanned, so replay wrote it; identity + // dedup only, no ordering assumption). + if (stream?.controller && stream?.encoder && (eventId === undefined || !stream.replayedEventIds?.has(eventId))) { + this.writeSSEEvent(stream.controller, stream.encoder, message, eventId); } - // Write the event to the response stream - this.writeSSEEvent(stream.controller, stream.encoder, message, eventId); } if (isJSONRPCResultResponse(message) || isJSONRPCErrorResponse(message)) { @@ -1022,7 +1093,37 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { if (allResponsesReady) { if (!stream) { - throw new Error(`No connection established for request ID: ${String(requestId)}`); + if (this._enableJsonResponse) { + // JSON-mode requires a resolveJson sink; with no stream entry the + // response is undeliverable. + throw new Error(`No connection established for request ID: ${String(requestId)}`); + } + if (!this._eventStore) { + // SSE-mode with no live writer and no event store: the + // response is undeliverable AND not stored. Surface via + // onerror so the drop is observable (matching pre-PR + // behaviour), then run the bookkeeping cleanup so the + // request id is retired. + this.onerror?.( + new Error( + `Response for request ID ${String(requestId)} is undeliverable: per-request stream is disconnected and no eventStore is configured` + ) + ); + for (const id of relatedIds) { + this._requestResponseMap.delete(id); + this._requestToStreamMapping.delete(id); + } + return; + } + // SSE-mode with no live writer and an event store configured: + // the response was stored above for replay on Last-Event-ID + // reconnect. Return cleanly after running the bookkeeping + // cleanup so the request id is retired. + for (const id of relatedIds) { + this._requestResponseMap.delete(id); + this._requestToStreamMapping.delete(id); + } + return; } if (this._enableJsonResponse && stream.resolveJson) { // All responses ready, send as JSON @@ -1040,6 +1141,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { } else { stream.resolveJson(Response.json(responses, { status: 200, headers })); } + stream.cleanup(); } else { // End the SSE stream stream.cleanup(); diff --git a/packages/server/test/server/streamableHttp.test.ts b/packages/server/test/server/streamableHttp.test.ts index 7a23dd56bb..b95e8bafd1 100644 --- a/packages/server/test/server/streamableHttp.test.ts +++ b/packages/server/test/server/streamableHttp.test.ts @@ -705,6 +705,369 @@ describe('Zod v4', () => { // Should have id: field in the SSE event expect(text).toContain('id:'); }); + + it('should store request-related events emitted after closeSSEStream() and not throw on the final response', async () => { + // The SEP-1699 poll-and-replay flow: handler closes the per-request + // SSE stream, then emits a notification and its final result while + // the client has not yet reconnected. Both must be persisted to the + // eventStore (so they replay on Last-Event-ID reconnect) and the + // final-response send must not surface a spurious error. + mcpServer.registerTool( + 'poll', + { description: 'closeSSE then emit', inputSchema: z.object({}) }, + async (_args, ctx): Promise => { + ctx.http?.closeSSE?.(); + await ctx.mcpReq.notify({ + method: 'notifications/progress', + params: { progressToken: 'poll-1', progress: 75 } + }); + return { content: [{ type: 'text', text: 'done' }] }; + } + ); + + const sendErrors: unknown[] = []; + mcpServer.server.onerror = e => sendErrors.push(e); + + sessionId = await initializeServer(); + storedEvents.clear(); + + const callMessage: JSONRPCMessage = { + jsonrpc: '2.0', + method: 'tools/call', + params: { name: 'poll', arguments: {} }, + id: 'poll-1' + }; + const response = await transport.handleRequest(createRequest('POST', callMessage, { sessionId })); + // closeSSE() in the handler closes the controller; drain the (now + // closed) body so the Response is fully consumed. + await response.text().catch(() => {}); + // Let the async handler chain (notify + final response send) settle. + await new Promise(resolve => setTimeout(resolve, 50)); + + const stored = [...storedEvents.values()].map(e => e.message); + expect( + stored.some(m => 'method' in m && m.method === 'notifications/progress'), + 'progress notification should be stored for replay after closeSSE()' + ).toBe(true); + expect( + stored.some(m => 'id' in m && m.id === 'poll-1' && 'result' in m), + 'final response should be stored for replay after closeSSE()' + ).toBe(true); + expect(sendErrors).toEqual([]); + }); + + it('should store request-related events after a client disconnect while the request is still in flight', async () => { + // Per 2025-11-25 transports.mdx, disconnection SHOULD NOT be + // interpreted as the client cancelling its request — storage is + // keyed on request-in-flight (_requestToStreamMapping), not on + // whether a live SSE writer exists. The final-response send must + // not throw and must clear the request id from the mapping. + let release!: () => void; + const gate = new Promise(resolve => { + release = resolve; + }); + mcpServer.registerTool( + 'disconnect', + { description: 'emit after client disconnect', inputSchema: z.object({}) }, + async (_args, ctx): Promise => { + await gate; + await ctx.mcpReq.notify({ + method: 'notifications/progress', + params: { progressToken: 'disconnect-1', progress: 50 } + }); + return { content: [{ type: 'text', text: 'done' }] }; + } + ); + const sendErrors: unknown[] = []; + mcpServer.server.onerror = e => sendErrors.push(e); + + sessionId = await initializeServer(); + + const callMessage: JSONRPCMessage = { + jsonrpc: '2.0', + method: 'tools/call', + params: { name: 'disconnect', arguments: {} }, + id: 'disconnect-1' + }; + const response = await transport.handleRequest(createRequest('POST', callMessage, { sessionId })); + storedEvents.clear(); + // Client disconnect: cancel the per-request stream (the ReadableStream + // cancel callback deletes the _streamMapping entry — no live writer). + await response.body?.cancel(); + await new Promise(resolve => setTimeout(resolve, 10)); + release(); + await new Promise(resolve => setTimeout(resolve, 50)); + + const stored = [...storedEvents.values()].map(e => e.message); + expect( + stored.some(m => 'method' in m && m.method === 'notifications/progress'), + 'progress notification should be stored while request is in flight (disconnect ≠ cancel)' + ).toBe(true); + expect( + stored.some(m => 'id' in m && m.id === 'disconnect-1' && 'result' in m), + 'final response should be stored for replay when no live writer exists' + ).toBe(true); + expect(sendErrors).toEqual([]); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + expect((transport as any)._requestToStreamMapping.has('disconnect-1')).toBe(false); + }); + + it('should accept Last-Event-ID reconnect after closeSSEStream() and replay stored events', async () => { + // closeSSEStream() removes the _streamMapping entry, so the + // replayEvents() conflict check sees no active connection and the + // reconnect succeeds (200), replaying the stored notification. + let release!: () => void; + const gate = new Promise(resolve => { + release = resolve; + }); + mcpServer.registerTool( + 'reconnect', + { description: 'closeSSE, emit, then wait for reconnect', inputSchema: z.object({}) }, + async (_args, ctx): Promise => { + ctx.http?.closeSSE?.(); + await ctx.mcpReq.notify({ + method: 'notifications/progress', + params: { progressToken: 'reconnect-1', progress: 75 } + }); + await gate; + return { content: [{ type: 'text', text: 'done' }] }; + } + ); + mcpServer.server.onerror = () => {}; + + sessionId = await initializeServer(); + storedEvents.clear(); + + const callMessage: JSONRPCMessage = { + jsonrpc: '2.0', + method: 'tools/call', + params: { name: 'reconnect', arguments: {} }, + id: 'reconnect-1' + }; + const response = await transport.handleRequest(createRequest('POST', callMessage, { sessionId })); + // Read the priming event so we have a Last-Event-ID to reconnect with. + const primingText = await response.text().catch(() => ''); + const primingId = /id:\s*(\S+)/.exec(primingText)?.[1]; + expect(primingId).toBeDefined(); + // Let the closeSSE + notify settle (handler is now gated on `gate`). + await new Promise(resolve => setTimeout(resolve, 30)); + + // Reconnect with Last-Event-ID while the request is still in flight. + const reconnect = await transport.handleRequest( + createRequest('GET', undefined, { sessionId, extraHeaders: { 'Last-Event-ID': primingId! } }) + ); + expect(reconnect.status).toBe(200); + release(); + const replayed = await readSSEEvent(reconnect); + expect(replayed).toContain('notifications/progress'); + }); + + it('should close and unregister the resumed stream when reconnecting after the request was already retired', async () => { + // Retire-then-reconnect: handler closeSSE → emit + result. With no + // live writer the final response is stored and the request id is + // retired by the clean-return path. A subsequent Last-Event-ID + // reconnect must replay the stored response AND close the resumed + // stream (per spec: server SHOULD close the SSE stream after the + // JSON-RPC response) so a second reconnect is not refused with 409. + mcpServer.registerTool( + 'retire', + { description: 'closeSSE then emit then return', inputSchema: z.object({}) }, + async (_args, ctx): Promise => { + ctx.http?.closeSSE?.(); + await ctx.mcpReq.notify({ + method: 'notifications/progress', + params: { progressToken: 'retire-1', progress: 75 } + }); + return { content: [{ type: 'text', text: 'done' }] }; + } + ); + mcpServer.server.onerror = () => {}; + + sessionId = await initializeServer(); + storedEvents.clear(); + + const callMessage: JSONRPCMessage = { + jsonrpc: '2.0', + method: 'tools/call', + params: { name: 'retire', arguments: {} }, + id: 'retire-1' + }; + const response = await transport.handleRequest(createRequest('POST', callMessage, { sessionId })); + // Read the priming event so we have a Last-Event-ID to reconnect with. + const primingText = await response.text().catch(() => ''); + const primingId = /id:\s*(\S+)/.exec(primingText)?.[1]; + expect(primingId).toBeDefined(); + // Let the handler chain (notify + final response send → clean-return) settle. + await new Promise(resolve => setTimeout(resolve, 50)); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + expect((transport as any)._requestToStreamMapping.has('retire-1')).toBe(false); + + // Reconnect after the request was retired. + const reconnect = await transport.handleRequest( + createRequest('GET', undefined, { sessionId, extraHeaders: { 'Last-Event-ID': primingId! } }) + ); + expect(reconnect.status).toBe(200); + const replayed = await reconnect.text(); + expect(replayed).toContain('notifications/progress'); + expect(replayed).toContain('"id":"retire-1"'); + expect(replayed, 'replay should include the stored final response').toContain('"result"'); + + // Resumed stream must have been closed and unregistered: a second + // reconnect with the same Last-Event-ID is accepted (200), not 409. + const reconnect2 = await transport.handleRequest( + createRequest('GET', undefined, { sessionId, extraHeaders: { 'Last-Event-ID': primingId! } }) + ); + expect(reconnect2.status).toBe(200); + await reconnect2.body?.cancel(); + }); + + it('should write to a stream resumed during the storeEvent() await (re-read after await, no TOCTOU)', async () => { + // send() reads _streamMapping[streamId] before `await storeEvent()` + // and decides on that snapshot. A Last-Event-ID reconnect during + // the await registers a resumed stream — send() must re-read after + // the await so the final response is written to it (and the + // all-responses-ready path closes/unregisters it), not silently + // dropped into the clean-return. + let handlerRelease!: () => void; + const handlerGate = new Promise(resolve => { + handlerRelease = resolve; + }); + mcpServer.registerTool( + 'toctou', + { description: 'closeSSE, gate, then return', inputSchema: z.object({}) }, + async (_args, ctx): Promise => { + ctx.http?.closeSSE?.(); + await handlerGate; + return { content: [{ type: 'text', text: 'done' }] }; + } + ); + mcpServer.server.onerror = () => {}; + + sessionId = await initializeServer(); + storedEvents.clear(); + + // Gate storeEvent() only after we flip `gateStores` (the priming + // event must store and flush ungated so we have a Last-Event-ID). + let gateStores = false; + let storeRelease!: () => void; + const storeGate = new Promise(resolve => { + storeRelease = resolve; + }); + const realStoreEvent = eventStore.storeEvent.bind(eventStore); + eventStore.storeEvent = async (sid, msg) => { + const id = await realStoreEvent(sid, msg); + if (gateStores) { + await storeGate; + } + return id; + }; + + const callMessage: JSONRPCMessage = { + jsonrpc: '2.0', + method: 'tools/call', + params: { name: 'toctou', arguments: {} }, + id: 'toctou-1' + }; + const response = await transport.handleRequest(createRequest('POST', callMessage, { sessionId })); + const primingText = await response.text().catch(() => ''); + const primingId = /id:\s*(\S+)/.exec(primingText)?.[1]; + expect(primingId).toBeDefined(); + + // Arm the storeEvent gate, then let the handler return so send() + // for the final response enters `await storeEvent()` and parks. + gateStores = true; + handlerRelease(); + await new Promise(resolve => setTimeout(resolve, 30)); + + // Reconnect while storeEvent() is pending — registers a resumed + // stream under the same streamId. The request is still in flight + // (send() is parked on the await), so replayEvents() leaves it open. + const reconnect = await transport.handleRequest( + createRequest('GET', undefined, { sessionId, extraHeaders: { 'Last-Event-ID': primingId! } }) + ); + expect(reconnect.status).toBe(200); + + // Release storeEvent — send() re-reads _streamMapping, sees the + // resumed stream, writes the result, and the all-responses-ready + // path closes/unregisters it. + storeRelease(); + const body = await reconnect.text(); + expect(body, 'final response must be written to the resumed stream').toContain('"id":"toctou-1"'); + expect(body).toContain('"result"'); + // Exactly-once on the resumed stream: replay may have written it, + // and send() must dedup against replayedEventIds (no double write). + expect(body.match(/"id":"toctou-1"/g)?.length, 'result must be delivered exactly once').toBe(1); + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + expect((transport as any)._requestToStreamMapping.has('toctou-1')).toBe(false); + // Resumed stream was closed/unregistered: second reconnect → 200. + const reconnect2 = await transport.handleRequest( + createRequest('GET', undefined, { sessionId, extraHeaders: { 'Last-Event-ID': primingId! } }) + ); + expect(reconnect2.status).toBe(200); + await reconnect2.body?.cancel(); + }); + + it('should not let a stale per-request cancel delete a successor resumed stream (identity-guarded)', async () => { + // EventStore without getStreamIdForEventId → replayEvents() skips + // the conflict check and registers the resumed stream under the + // SAME streamId, OVERWRITING the original entry. A late cancel of + // the original POST body must identity-check before deleting so + // the successor survives and receives subsequent send()s. + // eslint-disable-next-line @typescript-eslint/no-explicit-any + delete (eventStore as any).getStreamIdForEventId; + + let release!: () => void; + const gate = new Promise(resolve => { + release = resolve; + }); + mcpServer.registerTool( + 'stalecancel', + { description: 'gate then return', inputSchema: z.object({}) }, + async (): Promise => { + await gate; + return { content: [{ type: 'text', text: 'done' }] }; + } + ); + mcpServer.server.onerror = () => {}; + + sessionId = await initializeServer(); + storedEvents.clear(); + + const callMessage: JSONRPCMessage = { + jsonrpc: '2.0', + method: 'tools/call', + params: { name: 'stalecancel', arguments: {} }, + id: 'stalecancel-1' + }; + const original = await transport.handleRequest(createRequest('POST', callMessage, { sessionId })); + const reader = original.body!.getReader(); + const { value } = await reader.read(); + const primingText = new TextDecoder().decode(value); + const primingId = /id:\s*(\S+)/.exec(primingText)?.[1]; + expect(primingId).toBeDefined(); + + // Reconnect while the original is still mapped (no conflict check + // without getStreamIdForEventId) — successor overwrites the entry. + const reconnect = await transport.handleRequest( + createRequest('GET', undefined, { sessionId, extraHeaders: { 'Last-Event-ID': primingId! } }) + ); + expect(reconnect.status).toBe(200); + + // Late cancel of the ORIGINAL per-request stream — its source + // cancel callback fires now. Identity-guard must keep the + // successor's _streamMapping entry intact. + await reader.cancel(); + await new Promise(resolve => setTimeout(resolve, 10)); + + // Handler returns → send() finds the successor and writes to it. + release(); + const body = await reconnect.text(); + expect(body, 'result must reach the successor resumed stream').toContain('"id":"stalecancel-1"'); + expect(body).toContain('"result"'); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + expect((transport as any)._requestToStreamMapping.has('stalecancel-1')).toBe(false); + }); }); describe('HTTPServerTransport - Protocol Version Validation', () => { @@ -794,6 +1157,56 @@ describe('Zod v4', () => { return response.headers.get('mcp-session-id') as string; } + it('should call onerror when the per-request stream disconnects mid-handler with no eventStore configured', async () => { + // Sessionful transport WITHOUT an eventStore: a final response sent + // while no live writer exists is undeliverable AND not stored. The + // drop must be observable via onerror, and the request id retired. + // Fresh server (the suite beforeEach connects before any tool is + // registered, which would trip registerCapabilities). + let release!: () => void; + const gate = new Promise(resolve => { + release = resolve; + }); + const localServer = new McpServer({ name: 'test-server', version: '1.0.0' }, { capabilities: {} }); + localServer.registerTool( + 'disconnect', + { description: 'return after client disconnect', inputSchema: z.object({}) }, + async (): Promise => { + await gate; + return { content: [{ type: 'text', text: 'done' }] }; + } + ); + const localTransport = new WebStandardStreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID() + }); + const localErrors: Error[] = []; + localServer.server.onerror = e => localErrors.push(e as Error); + await localServer.connect(localTransport); + + const initRes = await localTransport.handleRequest(createRequest('POST', TEST_MESSAGES.initialize)); + const sessionId = initRes.headers.get('mcp-session-id') as string; + + const callMessage: JSONRPCMessage = { + jsonrpc: '2.0', + method: 'tools/call', + params: { name: 'disconnect', arguments: {} }, + id: 'disconnect-1' + }; + const response = await localTransport.handleRequest(createRequest('POST', callMessage, { sessionId })); + // Client hard-disconnect: cancel the per-request stream (the + // ReadableStream cancel callback drops the _streamMapping entry). + await response.body?.cancel(); + await new Promise(resolve => setTimeout(resolve, 10)); + release(); + await new Promise(resolve => setTimeout(resolve, 50)); + + expect(localErrors.length, 'onerror should surface the undeliverable response').toBeGreaterThan(0); + expect(localErrors[0]?.message).toMatch(/undeliverable.*no eventStore is configured/); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + expect((localTransport as any)._requestToStreamMapping.has('disconnect-1')).toBe(false); + await localTransport.close(); + }); + it('should call onerror for invalid JSON', async () => { const request = new Request('http://localhost/mcp', { method: 'POST',