Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/mcp-send-outside-agent-context.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"agents": patch
---

Fix `Agent was not found in send` when an `McpAgent` issues server-initiated MCP requests (`elicitInput`, `createMessage`, `listRoots`) from code with no agent context on its call stack — e.g. a host-side callback invoked via RPC from a Worker Loader child isolate (sandboxed tool execution / codemode), a service binding, or a queue consumer. `StreamableHTTPServerTransport` now captures its owning agent at construction instead of recovering it from `AsyncLocalStorage` at send time, so server-initiated sends work regardless of how the calling code was reached. (#1490)

For other context-dependent APIs in such callbacks, route the callback through a public method on your agent class — custom methods are automatically wrapped and re-enter the agent's context (see `docs/get-current-agent.md`).
46 changes: 46 additions & 0 deletions docs/get-current-agent.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,52 @@ export class MyAgent extends AIChatAgent {
}
```

## When Context Is Lost

The agent context only propagates along the call tree of the original
invocation. Code reached **outside** that call tree starts with an empty
context, so `getCurrentAgent()` returns `undefined` there. Common cases:

- A host-side callback invoked via **RPC from a Worker Loader child isolate**
(e.g. sandboxed tool execution / codemode)
- Calls arriving over a **service binding** or **Durable Object RPC**
- **Queue consumers** and other entrypoints that hold an agent reference

The fix is to route such callbacks through a **public method on your agent
class** — custom methods are automatically wrapped, so calling
`agent.someMethod()` re-enters the agent's context no matter how the caller
was reached:

```typescript
import { RpcTarget } from "cloudflare:workers";

class HostCallbackBridge extends RpcTarget {
constructor(private agent: MyMcpAgent) {
super();
}

// Invoked via RPC from a Worker Loader child isolate — no context
// ancestry. Calling a public agent method restores it automatically.
async invoke() {
return this.agent.handleSandboxCallback();
}
}

export class MyMcpAgent extends McpAgent {
async handleSandboxCallback() {
const { agent } = getCurrentAgent<MyMcpAgent>();
// ✅ agent is available again
}
}
```

Context restored this way has `connection`, `request`, and `email` unset: it
is not tied to any live client I/O.

> Note: server-initiated MCP requests (`elicitInput`, `createMessage`,
> `listRoots`) on `McpAgent` do **not** require any of this — the MCP
> transport resolves its agent independently of the calling context.

## API Reference

The agents package exports one main function for context management:
Expand Down
32 changes: 25 additions & 7 deletions packages/agents/src/mcp/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,15 @@ export class StreamableHTTPServerTransport implements Transport {
private _started = false;
private _eventStore?: EventStore | ClearableEventStore;

// Captured at construction (always inside agent context, see constructor)
// so that `send()` works from call sites with no ALS ancestry in the
// original invocation — e.g. server-initiated requests (elicitation,
// sampling) issued from a host-side callback invoked via Worker Loader
// child→host RPC, a service binding, or a queue consumer. The transport
// is owned by its agent (same DO, same lifetime), so the reference
// cannot go stale. See cloudflare/agents#1490.
private _agent: McpAgent;

// This tracks which messages on each POST stream have been answered.
// It is fine that we do not persist this since it only supports backwards
// compatibility for clients batching requests, which the spec discourages.
Expand Down Expand Up @@ -157,6 +166,7 @@ export class StreamableHTTPServerTransport implements Transport {

// Initialization is handled in `McpAgent.serve()` and agents are addressed by sessionId,
// so we'll always have this available.
this._agent = agent;
this.sessionId = agent.getSessionId();
this._eventStore = options.eventStore;
}
Expand Down Expand Up @@ -423,8 +433,7 @@ export class StreamableHTTPServerTransport implements Transport {

async close(): Promise<void> {
// Close all SSE connections
const { agent } = getCurrentAgent();
if (!agent) throw new Error("Agent was not found in close");
const agent = this._agent;

for (const conn of agent.getConnections()) {
conn.close(1000, "Session closed");
Expand Down Expand Up @@ -519,8 +528,10 @@ export class StreamableHTTPServerTransport implements Transport {
* there is at most one to send on.
*/
private async sendStandalone(message: JSONRPCMessage): Promise<void> {
const { agent } = getCurrentAgent<McpAgent>();
if (!agent) throw new Error("Agent was not found in send");
// Use the captured agent rather than getCurrentAgent(): server-initiated
// sends may originate from code with no agent context on the call stack
// (e.g. a callback reached via cross-isolate RPC). See #1490.
const agent = this._agent;

const eventId = await this._eventStore?.storeEvent(
STANDALONE_STREAM_ID,
Expand Down Expand Up @@ -549,9 +560,16 @@ export class StreamableHTTPServerTransport implements Transport {
message: JSONRPCMessage,
requestId: RequestId
): Promise<void> {
const { agent, connection: originatingConnection } =
getCurrentAgent<McpAgent>();
if (!agent) throw new Error("Agent was not found in send");
const agent = this._agent;
// The originating connection is request-scoped, so it can only come from
// ALS — but it's purely an optimization for disambiguating colliding
// request ids, so its absence (e.g. when sending from a callback reached
// via cross-isolate RPC, see #1490) is fine. Ignore the store entirely
// when it belongs to a different agent (e.g. an agent-to-agent call)
// so a foreign connection can never influence routing.
const context = getCurrentAgent<McpAgent>();
const originatingConnection =
context.agent === agent ? context.connection : undefined;

// Pick the live connection that should receive this message. Normally
// request ids uniquely identify a POST connection. If a client violates
Expand Down
93 changes: 93 additions & 0 deletions packages/agents/src/tests/agents/mcp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
Agent,
callable,
getCurrentAgent,
__DO_NOT_USE_WILL_BREAK__agentContext as agentContext,
type AgentContext
} from "../../index.ts";
import {
Expand Down Expand Up @@ -225,6 +226,98 @@ export class TestMcpAgent extends McpAgent<Cloudflare.Env, unknown, Props> {
}
);

// The next two tools run their body inside `agentContext.exit(...)`,
// i.e. with an empty AsyncLocalStorage store. This simulates host-side
// callbacks reached outside the original invocation's call tree — e.g.
// via RPC from a Worker Loader child isolate or a service binding —
// where `getCurrentAgent()` returns undefined. Regression coverage for
// https://github.com/cloudflare/agents/issues/1490.
this.server.tool(
"elicitNameOutsideContext",
"Elicit user input from outside the agent ALS context (request-scoped send)",
{},
async (_args, extra) => {
const result = await agentContext.exit(() =>
this.server.server.elicitInput(
{
message: "What is your name?",
requestedSchema: {
type: "object",
properties: {
name: {
type: "string",
description: "Your name"
}
},
required: ["name"]
}
},
{ relatedRequestId: extra.requestId }
)
);

if (result.action === "accept" && result.content?.name) {
return {
content: [
{
type: "text",
text: `Outside-context elicit: ${result.content.name}`
}
]
};
}

return {
content: [{ type: "text", text: "Outside-context elicit cancelled" }]
};
}
);

this.server.tool(
"elicitNameOutsideContextStandalone",
"Elicit user input from outside the agent ALS context (standalone GET stream send)",
{},
async () => {
// No relatedRequestId: the elicit request goes out on the
// standalone GET stream via the transport's sendStandalone path.
const result = await agentContext.exit(() =>
this.server.server.elicitInput({
message: "What is your name?",
requestedSchema: {
type: "object",
properties: {
name: {
type: "string",
description: "Your name"
}
},
required: ["name"]
}
})
);

if (result.action === "accept" && result.content?.name) {
return {
content: [
{
type: "text",
text: `Standalone outside-context elicit: ${result.content.name}`
}
]
};
}

return {
content: [
{
type: "text",
text: "Standalone outside-context elicit cancelled"
}
]
};
}
);

// Use `registerTool` so we can later remove it.
// Triggers notifications/tools/list_changed
this.server.registerTool(
Expand Down
150 changes: 150 additions & 0 deletions packages/agents/src/tests/mcp/outside-context-send.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import { createExecutionContext } from "cloudflare:test";
import type {
JSONRPCMessage,
JSONRPCRequest,
JSONRPCResultResponse
} from "@modelcontextprotocol/sdk/types.js";
import type { CallToolResult } from "@modelcontextprotocol/sdk/types.js";
import { describe, expect, it } from "vitest";
import {
initializeStreamableHTTPServer,
openStandaloneSSE,
parseSSEData,
sendPostRequest
} from "../shared/test-utils";

async function readOneFrame(
reader: ReadableStreamDefaultReader<Uint8Array>
): Promise<string> {
const { value } = await reader.read();
return new TextDecoder().decode(value!);
}

/**
* Regression tests for https://github.com/cloudflare/agents/issues/1490.
*
* Server-initiated MCP requests (elicitInput, createMessage, listRoots)
* used to throw "Agent was not found in send" when issued from code with
* no agent AsyncLocalStorage context on its call stack — e.g. a host-side
* callback invoked via RPC from a Worker Loader child isolate. The test
* tools simulate that by running `this.server.server.elicitInput(...)`
* inside `agentContext.exit(...)`, which strips the store exactly like a
* fresh entrypoint invocation does.
*/
describe("server-initiated sends outside the agent ALS context", () => {
const baseUrl = "http://example.com/mcp";

it("routes a request-scoped elicit (relatedRequestId) issued outside the context", async () => {
const ctx = createExecutionContext();
const sessionId = await initializeStreamableHTTPServer(ctx);

const toolCallMsg: JSONRPCMessage = {
id: "outside-ctx-1",
jsonrpc: "2.0",
method: "tools/call",
params: { name: "elicitNameOutsideContext", arguments: {} }
};

const toolResponse = await sendPostRequest(
ctx,
baseUrl,
toolCallMsg,
sessionId
);
expect(toolResponse.status).toBe(200);

const reader = toolResponse.body?.getReader();
if (!reader) throw new Error("No reader available for POST stream");

// Before the fix this frame never arrived: the transport threw
// "Agent was not found in send" and the tool call errored out.
const elicitFrame = await readOneFrame(reader);
const elicitRequest = parseSSEData(elicitFrame) as JSONRPCRequest;
expect(elicitRequest.method).toBe("elicitation/create");

const elicitResponse: JSONRPCMessage = {
jsonrpc: "2.0",
id: elicitRequest.id,
result: {
action: "accept",
content: { name: "Alice" }
}
} as unknown as JSONRPCMessage;

const responsePost = await sendPostRequest(
ctx,
baseUrl,
elicitResponse,
sessionId
);
expect(responsePost.status).toBe(202);

const toolResultFrame = await readOneFrame(reader);
const toolResult = parseSSEData(toolResultFrame) as JSONRPCResultResponse;

expect(toolResult.id).toBe("outside-ctx-1");
const result = toolResult.result as CallToolResult;
expect(result.content).toEqual([
{ type: "text", text: "Outside-context elicit: Alice" }
]);
});

it("delivers a standalone elicit (no relatedRequestId) issued outside the context on the GET stream", async () => {
const ctx = createExecutionContext();
const sessionId = await initializeStreamableHTTPServer(ctx);

// Server-initiated requests without relatedRequestId go out on the
// standalone GET stream (transport sendStandalone path).
const standaloneReader = await openStandaloneSSE(ctx, sessionId, baseUrl);

const toolCallMsg: JSONRPCMessage = {
id: "outside-ctx-standalone-1",
jsonrpc: "2.0",
method: "tools/call",
params: { name: "elicitNameOutsideContextStandalone", arguments: {} }
};

const toolResponse = await sendPostRequest(
ctx,
baseUrl,
toolCallMsg,
sessionId
);
expect(toolResponse.status).toBe(200);

const postReader = toolResponse.body?.getReader();
if (!postReader) throw new Error("No reader available for POST stream");

const elicitFrame = await readOneFrame(standaloneReader);
const elicitRequest = parseSSEData(elicitFrame) as JSONRPCRequest;
expect(elicitRequest.method).toBe("elicitation/create");

const elicitResponse: JSONRPCMessage = {
jsonrpc: "2.0",
id: elicitRequest.id,
result: {
action: "accept",
content: { name: "Bob" }
}
} as unknown as JSONRPCMessage;

const responsePost = await sendPostRequest(
ctx,
baseUrl,
elicitResponse,
sessionId
);
expect(responsePost.status).toBe(202);

const toolResultFrame = await readOneFrame(postReader);
const toolResult = parseSSEData(toolResultFrame) as JSONRPCResultResponse;

expect(toolResult.id).toBe("outside-ctx-standalone-1");
const result = toolResult.result as CallToolResult;
expect(result.content).toEqual([
{ type: "text", text: "Standalone outside-context elicit: Bob" }
]);

await standaloneReader.cancel();
});
});
Loading