Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/server-streamablehttp-store-first.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@modelcontextprotocol/server': patch
---

`WebStandardStreamableHTTPServerTransport`: request-related events (progress, `ctx.mcpReq.notify`, handler-emitted log) and the final response are now persisted to the configured `eventStore` whenever the request is in flight, regardless of whether a live SSE writer currently exists — mirroring the standalone-SSE path's store-first semantics. This fixes the `closeSSE()` poll-and-replay drop (events emitted after `closeSSE()` were previously silently lost) and aligns with the 2025-11-25 specification ("disconnection SHOULD NOT be interpreted as the client cancelling its request"). When an `eventStore` is configured, a final response sent while no per-request stream is connected is stored for replay and returns cleanly instead of throwing "No connection established"; a `Last-Event-ID` reconnect after the request has been retired replays the stored response and then closes the resumed stream. When no `eventStore` is configured, the same condition is surfaced via `onerror` (the response is undeliverable) and the request id is retired.
1 change: 0 additions & 1 deletion examples/sse-polling/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
"era": "legacy",
"path": "/mcp",
"timeoutMs": 20000,
"excluded": "replay assertion is timing-sensitive on CI; revisit",
"//": "SEP-1699 closeSSE/eventStore/retryInterval live on the sessionful-2025 transport; the client is era-blind so dual would duplicate."
}
}
138 changes: 120 additions & 18 deletions packages/server/src/server/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ interface StreamMapping {
encoder?: InstanceType<typeof TextEncoder>;
/** Promise resolver for JSON response mode */
resolveJson?: (response: Response) => void;
/**
* Event ids already written to this stream by `replayEventsAfter` — lets
* `send()` skip a duplicate write when the resumed stream registered
* during the `storeEvent()` await and replay already delivered the event.
*/
replayedEventIds?: Set<string>;
/** Cleanup function to close stream and remove mapping */
cleanup: () => void;
}
Expand Down Expand Up @@ -462,8 +468,12 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
streamController = controller;
},
cancel: () => {
// Stream was cancelled by client
this._streamMapping.delete(this._standaloneSseStreamId);
// Stream was cancelled by client. Only drop the mapping when
// it still points at THIS controller — a stale cancel must not
// delete a successor stream registered by a later GET/resume.
if (this._streamMapping.get(this._standaloneSseStreamId)?.controller === streamController) {
this._streamMapping.delete(this._standaloneSseStreamId);
}
}
});

Expand Down Expand Up @@ -536,20 +546,33 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
// Create a ReadableStream with controller for SSE
const encoder = new TextEncoder();
let streamController: ReadableStreamDefaultController<Uint8Array>;
// Captured by the cancel closure below before it's assigned (after
// replayEventsAfter resolves) — must be `let`.
// eslint-disable-next-line prefer-const
let replayedStreamId: string | undefined;

const readable = new ReadableStream<Uint8Array>({
start: controller => {
streamController = controller;
},
cancel: () => {
// Stream was cancelled by client
// Cleanup will be handled by the mapping
// Stream was cancelled by client — drop the mapping so a
// subsequent reconnect with the same Last-Event-ID is not
// refused with 409 by the conflict check above. Only delete
// when the mapped entry is still THIS closure's controller:
// a stale cancel from an earlier resume must not delete a
// successor resumed stream a re-poll has since registered.
if (replayedStreamId !== undefined && this._streamMapping.get(replayedStreamId)?.controller === streamController) {
this._streamMapping.delete(replayedStreamId);
}
}
});

// Replay events - returns the streamId for backwards compatibility
const replayedStreamId = await this._eventStore.replayEventsAfter(lastEventId, {
const replayedEventIds = new Set<string>();
replayedStreamId = await this._eventStore.replayEventsAfter(lastEventId, {
send: async (eventId: string, message: JSONRPCMessage) => {
replayedEventIds.add(eventId);
const success = this.writeSSEEvent(streamController!, encoder, message, eventId);
if (!success) {
try {
Expand All @@ -564,8 +587,9 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
this._streamMapping.set(replayedStreamId, {
controller: streamController!,
encoder,
replayedEventIds,
cleanup: () => {
this._streamMapping.delete(replayedStreamId);
this._streamMapping.delete(replayedStreamId!);
try {
streamController!.close();
} catch {
Expand All @@ -574,6 +598,25 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
}
});

// If this is a per-request stream and no in-flight request still
// targets this streamId, the request was already retired by the
// clean-return path while disconnected and the replay above just
// delivered the final response. Per the spec the server SHOULD
// close the SSE stream after the JSON-RPC response — close and
// unregister so a later reconnect isn't refused with 409. The
// standalone GET stream is never request-scoped and stays open.
if (replayedStreamId !== this._standaloneSseStreamId) {
const hasInFlightRequest = [...this._requestToStreamMapping.values()].includes(replayedStreamId);
if (!hasInFlightRequest) {
this._streamMapping.delete(replayedStreamId);
try {
streamController!.close();
} catch {
// Controller might already be closed
}
}
}

return new Response(readable, { headers });
} catch (error) {
this.onerror?.(error as Error);
Expand Down Expand Up @@ -770,8 +813,14 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
streamController = controller;
},
cancel: () => {
// Stream was cancelled by client
this._streamMapping.delete(streamId);
// Stream was cancelled by client. Only drop the mapping
// when it still points at THIS controller — a stale cancel
// (firing after a Last-Event-ID reconnect registered a
// resumed stream under the same streamId) must not delete
// the successor.
if (this._streamMapping.get(streamId)?.controller === streamController) {
this._streamMapping.delete(streamId);
}
}
});

Expand Down Expand Up @@ -987,8 +1036,14 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
return;
}

// Send the message to the standalone SSE stream
if (standaloneSse.controller && standaloneSse.encoder) {
// Send the message to the standalone SSE stream — unless the
// resumed stream's replay already delivered this exact eventId
// (identity dedup; mirrors the per-request path below).
if (
standaloneSse.controller &&
standaloneSse.encoder &&
(eventId === undefined || !standaloneSse.replayedEventIds?.has(eventId))
) {
this.writeSSEEvent(standaloneSse.controller, standaloneSse.encoder, message, eventId);
}
return;
Expand All @@ -1000,17 +1055,33 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
throw new Error(`No connection established for request ID: ${String(requestId)}`);
}

const stream = this._streamMapping.get(streamId);

if (!this._enableJsonResponse && stream?.controller && stream?.encoder) {
// For SSE responses, generate event ID if event store is provided
let stream = this._streamMapping.get(streamId);

if (!this._enableJsonResponse) {
// Store FIRST so request-related events emitted while the per-request
// stream is disconnected (e.g. after `closeSSE()` or a transient
// client drop) are replayed on Last-Event-ID reconnect — same
// store-first semantics as the standalone path above. Storage is
// keyed on request-in-flight (`_requestToStreamMapping` resolved
// `streamId` above), not on whether a live SSE writer currently
// exists: `_streamMapping` tracks the delivery target only. Per
// 2025-11-25 transports.mdx, disconnection SHOULD NOT be
// interpreted as the client cancelling its request.
let eventId: string | undefined;

if (this._eventStore) {
eventId = await this._eventStore.storeEvent(streamId, message);
// Re-read after the await: a Last-Event-ID reconnect during
// storeEvent() may have registered a resumed stream under this
// streamId (mirrors the standalone path's post-await read).
stream = this._streamMapping.get(streamId);
}
// Write the event to the response stream — unless the resumed
// stream's replay already delivered this exact eventId (the store
// committed before replay scanned, so replay wrote it; identity
// dedup only, no ordering assumption).
if (stream?.controller && stream?.encoder && (eventId === undefined || !stream.replayedEventIds?.has(eventId))) {
this.writeSSEEvent(stream.controller, stream.encoder, message, eventId);
}
// Write the event to the response stream
this.writeSSEEvent(stream.controller, stream.encoder, message, eventId);
}

if (isJSONRPCResultResponse(message) || isJSONRPCErrorResponse(message)) {
Expand All @@ -1022,7 +1093,37 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {

if (allResponsesReady) {
if (!stream) {
throw new Error(`No connection established for request ID: ${String(requestId)}`);
if (this._enableJsonResponse) {
// JSON-mode requires a resolveJson sink; with no stream entry the
// response is undeliverable.
throw new Error(`No connection established for request ID: ${String(requestId)}`);
}
if (!this._eventStore) {
// SSE-mode with no live writer and no event store: the
// response is undeliverable AND not stored. Surface via
// onerror so the drop is observable (matching pre-PR
// behaviour), then run the bookkeeping cleanup so the
// request id is retired.
this.onerror?.(
new Error(
`Response for request ID ${String(requestId)} is undeliverable: per-request stream is disconnected and no eventStore is configured`
)
);
for (const id of relatedIds) {
this._requestResponseMap.delete(id);
this._requestToStreamMapping.delete(id);
}
return;
}
// SSE-mode with no live writer and an event store configured:
// the response was stored above for replay on Last-Event-ID
// reconnect. Return cleanly after running the bookkeeping
// cleanup so the request id is retired.
for (const id of relatedIds) {
this._requestResponseMap.delete(id);
this._requestToStreamMapping.delete(id);
}
return;
}
if (this._enableJsonResponse && stream.resolveJson) {
// All responses ready, send as JSON
Expand All @@ -1040,6 +1141,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
} else {
stream.resolveJson(Response.json(responses, { status: 200, headers }));
}
stream.cleanup();
} else {
// End the SSE stream
stream.cleanup();
Expand Down
Loading
Loading