From f221c38b4c0f5d34440022f4b683f4f38828c27d Mon Sep 17 00:00:00 2001 From: Olivier Mouren Date: Wed, 24 Jun 2026 14:02:55 +0200 Subject: [PATCH] Add outgoing request and client response events Introduce `OutgoingRequestEvent` and `ClientResponseEvent` to provide full observability over the server's interaction with the client. This includes requests sent by the server (e.g., elicitation, sampling) and the client's replies to those requests. Additionally, update `ResponseEvent` and `ErrorEvent` to be dispatched when a suspended Fiber completes. This ensures that deferred responses from long-running, asynchronous operations are also observable via the existing event mechanisms. --- docs/events.md | 28 +- src/Event/ClientResponseEvent.php | 56 +++ src/Event/OutgoingRequestEvent.php | 50 +++ src/JsonRpc/MessageFactory.php | 10 + src/Server/Protocol.php | 64 ++++ src/Server/Transport/BaseTransport.php | 21 ++ .../Transport/ManagesTransportCallbacks.php | 11 + src/Server/Transport/StdioTransport.php | 2 + .../Transport/StreamableHttpTransport.php | 2 + src/Server/Transport/TransportInterface.php | 9 + tests/Unit/JsonRpc/MessageFactoryTest.php | 12 + tests/Unit/Server/ProtocolTest.php | 347 ++++++++++++++++++ 12 files changed, 609 insertions(+), 3 deletions(-) create mode 100644 src/Event/ClientResponseEvent.php create mode 100644 src/Event/OutgoingRequestEvent.php diff --git a/docs/events.md b/docs/events.md index ebd70ed2..0580db0d 100644 --- a/docs/events.md +++ b/docs/events.md @@ -10,6 +10,8 @@ The MCP SDK provides a PSR-14 compatible event system that allows you to hook in - [ResponseEvent](#responseevent) - [ErrorEvent](#errorevent) - [NotificationEvent](#notificationevent) + - [OutgoingRequestEvent](#outgoingrequestevent) + - [ClientResponseEvent](#clientresponseevent) - [List Change Events](#list-change-events) ## Setup @@ -37,7 +39,7 @@ $server = Server::builder() ## Protocol Events -The SDK dispatches 4 broad event types at the protocol level, allowing you to observe and modify all server operations: +The SDK dispatches 6 broad event types at the protocol level, allowing you to observe and modify all server operations: ### RequestEvent @@ -51,7 +53,7 @@ The SDK dispatches 4 broad event types at the protocol level, allowing you to ob ### ResponseEvent -**Dispatched**: When a successful response is ready to be sent to the client, after handler execution. +**Dispatched**: When a successful response is ready to be sent to the client, after handler execution. Also dispatched when a suspended Fiber completes (e.g. after elicitation or sampling). **Properties**: - `getResponse(): Response` - The response being sent @@ -62,7 +64,7 @@ The SDK dispatches 4 broad event types at the protocol level, allowing you to ob ### ErrorEvent -**Dispatched**: When an error occurs during request processing. +**Dispatched**: When an error occurs during request processing. Also dispatched when a suspended Fiber completes with an error. **Properties**: - `getError(): Error` - The error being sent @@ -81,6 +83,26 @@ The SDK dispatches 4 broad event types at the protocol level, allowing you to ob - `getSession(): SessionInterface` - The current session - `getMethod(): string` - Convenience method to get the notification method +### OutgoingRequestEvent + +**Dispatched**: When the server sends a request to the client (e.g. `elicitation/create`, `sampling/create`). + +**Properties**: +- `getRequest(): Request` - The outgoing request (with server-assigned ID) +- `getSession(): SessionInterface` - The current session +- `getTimeout(): int` - Maximum time to wait for the client response (seconds) +- `getMethod(): string` - Convenience method to get the request method + +### ClientResponseEvent + +**Dispatched**: When the server receives a client response to a prior outgoing request. + +**Properties**: +- `getResponse(): Response|Error` - The client's reply +- `getSession(): SessionInterface` - The current session +- `getId(): string|int` - The JSON-RPC message ID +- `isError(): bool` - Whether the client returned a JSON-RPC error + ## List Change Events These events are dispatched when the lists of available capabilities change: diff --git a/src/Event/ClientResponseEvent.php b/src/Event/ClientResponseEvent.php new file mode 100644 index 00000000..84b34fff --- /dev/null +++ b/src/Event/ClientResponseEvent.php @@ -0,0 +1,56 @@ + + */ +final class ClientResponseEvent +{ + /** + * @param Response|Error $response + */ + public function __construct( + private readonly Response|Error $response, + private readonly SessionInterface $session, + ) { + } + + /** + * @return Response|Error + */ + public function getResponse(): Response|Error + { + return $this->response; + } + + public function getSession(): SessionInterface + { + return $this->session; + } + + public function getId(): string|int + { + return $this->response->getId(); + } + + public function isError(): bool + { + return $this->response instanceof Error; + } +} diff --git a/src/Event/OutgoingRequestEvent.php b/src/Event/OutgoingRequestEvent.php new file mode 100644 index 00000000..5c970bd0 --- /dev/null +++ b/src/Event/OutgoingRequestEvent.php @@ -0,0 +1,50 @@ + + */ +final class OutgoingRequestEvent +{ + public function __construct( + private readonly Request $request, + private readonly int $timeout, + private readonly SessionInterface $session, + ) { + } + + public function getRequest(): Request + { + return $this->request; + } + + public function getSession(): SessionInterface + { + return $this->session; + } + + public function getTimeout(): int + { + return $this->timeout; + } + + public function getMethod(): string + { + return $this->request::getMethod(); + } +} diff --git a/src/JsonRpc/MessageFactory.php b/src/JsonRpc/MessageFactory.php index 1bb82db8..6d247587 100644 --- a/src/JsonRpc/MessageFactory.php +++ b/src/JsonRpc/MessageFactory.php @@ -118,6 +118,16 @@ public function create(string $input): array return $messages; } + /** + * @param array $data + * + * @throws InvalidInputMessageException + */ + public function createFromArray(array $data): MessageInterface + { + return $this->createMessage($data); + } + /** * Creates a single message object from parsed JSON data. * diff --git a/src/Server/Protocol.php b/src/Server/Protocol.php index d9af4e4c..74e0721e 100644 --- a/src/Server/Protocol.php +++ b/src/Server/Protocol.php @@ -11,8 +11,10 @@ namespace Mcp\Server; +use Mcp\Event\ClientResponseEvent; use Mcp\Event\ErrorEvent; use Mcp\Event\NotificationEvent; +use Mcp\Event\OutgoingRequestEvent; use Mcp\Event\RequestEvent; use Mcp\Event\ResponseEvent; use Mcp\Exception\InvalidInputMessageException; @@ -56,6 +58,9 @@ class Protocol /** Session key for outgoing message queue */ private const SESSION_OUTGOING_QUEUE = '_mcp.outgoing_queue'; + /** Session key for the client request that started a suspended Fiber */ + private const SESSION_FIBER_PARENT_REQUEST = '_mcp.fiber_parent_request'; + /** Session key for active request meta */ public const SESSION_ACTIVE_REQUEST_META = '_mcp.active_request_meta'; @@ -96,6 +101,8 @@ public function connect(TransportInterface $transport): void $transport->setFiberYieldHandler($this->handleFiberYield(...)); + $transport->setFiberTerminationHandler($this->handleFiberTermination(...)); + $this->logger->info('Protocol connected to transport', ['transport' => $transport::class]); } @@ -199,6 +206,8 @@ private function handleRequest(TransportInterface $transport, Request $request, $result = $fiber->start(); if ($fiber->isSuspended()) { + $session->set(self::SESSION_FIBER_PARENT_REQUEST, $request->jsonSerialize()); + if (\is_array($result) && isset($result['type'])) { if ('notification' === $result['type']) { $notification = $result['notification']; @@ -262,6 +271,8 @@ private function handleResponse(Response|Error $response, SessionInterface $sess { $this->logger->info('Handling response from client.', ['response' => $response]); + $this->dispatchEvent(new ClientResponseEvent($response, $session)); + $messageId = $response->getId(); $session->set(self::SESSION_RESPONSES.".{$messageId}", $response->jsonSerialize()); @@ -303,6 +314,8 @@ public function sendRequest(Request $request, int $timeout, SessionInterface $se $requestWithId = $request->withId($requestId); + $this->dispatchEvent(new OutgoingRequestEvent($requestWithId, $timeout, $session)); + $this->logger->info('Queueing server request to client', [ 'request_id' => $requestId, 'method' => $request::getMethod(), @@ -540,6 +553,57 @@ public function handleFiberYield(mixed $yieldedValue, ?Uuid $sessionId): void } } + /** + * Handle the final result of a suspended Fiber when it completes. + * + * Dispatches ResponseEvent or ErrorEvent for the original client request that + * started the Fiber, allowing listeners to observe deferred responses. + * + * @phpstan-param Response|Error $finalResult + * + * @phpstan-return Response|Error + */ + public function handleFiberTermination(Response|Error $finalResult, Uuid $sessionId): Response|Error + { + $session = $this->sessionManager->createWithId($sessionId); + $parentRequest = $this->resolveFiberParentRequest( + $session->pull(self::SESSION_FIBER_PARENT_REQUEST) + ); + + if (!$parentRequest) { + $session->save(); + + return $finalResult; + } + + if ($finalResult instanceof Response) { + $responseEvent = $this->dispatchEvent(new ResponseEvent($finalResult, $parentRequest, $session)); + $finalResult = $responseEvent->getResponse(); + } else { + $errorEvent = $this->dispatchEvent(new ErrorEvent($finalResult, $parentRequest, $session, null)); + $finalResult = $errorEvent->getError(); + } + + $session->save(); + + return $finalResult; + } + + private function resolveFiberParentRequest(mixed $data): ?Request + { + if (!\is_array($data)) { + return null; + } + + try { + $message = $this->messageFactory->createFromArray($data); + } catch (\Throwable) { + return null; + } + + return $message instanceof Request ? $message : null; + } + /** * @param array $messages */ diff --git a/src/Server/Transport/BaseTransport.php b/src/Server/Transport/BaseTransport.php index 58172352..07f9486d 100644 --- a/src/Server/Transport/BaseTransport.php +++ b/src/Server/Transport/BaseTransport.php @@ -127,6 +127,27 @@ protected function handleFiberYield(mixed $yielded, ?Uuid $sessionId): void } } + /** + * @phpstan-param FiberReturn $finalResult + * + * @phpstan-return FiberReturn + */ + protected function handleFiberTerminationResult(Response|Error $finalResult): Response|Error + { + if ($this->sessionId && \is_callable($this->fiberTerminationHandler)) { + try { + return ($this->fiberTerminationHandler)($finalResult, $this->sessionId); + } catch (\Throwable $e) { + $this->logger->error('Fiber termination handler failed.', [ + 'exception' => $e, + 'sessionId' => $this->sessionId->toRfc4122(), + ]); + } + } + + return $finalResult; + } + protected function handleMessage(string $payload, ?Uuid $sessionId): void { if (\is_callable($this->messageListener)) { diff --git a/src/Server/Transport/ManagesTransportCallbacks.php b/src/Server/Transport/ManagesTransportCallbacks.php index 072d3f0e..69f934f3 100644 --- a/src/Server/Transport/ManagesTransportCallbacks.php +++ b/src/Server/Transport/ManagesTransportCallbacks.php @@ -44,6 +44,9 @@ trait ManagesTransportCallbacks /** @var callable(FiberSuspend|null, ?Uuid): void */ protected $fiberYieldHandler; + /** @var callable(FiberReturn, Uuid): FiberReturn */ + protected $fiberTerminationHandler; + public function onMessage(callable $listener): void { $this->messageListener = $listener; @@ -79,4 +82,12 @@ public function setFiberYieldHandler(callable $handler): void { $this->fiberYieldHandler = $handler; } + + /** + * @param callable(FiberReturn, Uuid): FiberReturn $handler + */ + public function setFiberTerminationHandler(callable $handler): void + { + $this->fiberTerminationHandler = $handler; + } } diff --git a/src/Server/Transport/StdioTransport.php b/src/Server/Transport/StdioTransport.php index 4da7f2a3..a6dbbf01 100644 --- a/src/Server/Transport/StdioTransport.php +++ b/src/Server/Transport/StdioTransport.php @@ -135,6 +135,8 @@ private function handleFiberTermination(): void $finalResult = $this->sessionFiber->getReturn(); if (null !== $finalResult) { + $finalResult = $this->handleFiberTerminationResult($finalResult); + try { $encoded = json_encode($finalResult, \JSON_THROW_ON_ERROR); $this->writeLine($encoded); diff --git a/src/Server/Transport/StreamableHttpTransport.php b/src/Server/Transport/StreamableHttpTransport.php index ab84b092..a3ad9f04 100644 --- a/src/Server/Transport/StreamableHttpTransport.php +++ b/src/Server/Transport/StreamableHttpTransport.php @@ -233,6 +233,8 @@ protected function handleFiberTermination(): void $finalResult = $this->sessionFiber->getReturn(); if (null !== $finalResult) { + $finalResult = $this->handleFiberTerminationResult($finalResult); + try { $encoded = json_encode($finalResult, \JSON_THROW_ON_ERROR); echo "event: message\n"; diff --git a/src/Server/Transport/TransportInterface.php b/src/Server/Transport/TransportInterface.php index 58d09789..a8636d4a 100644 --- a/src/Server/Transport/TransportInterface.php +++ b/src/Server/Transport/TransportInterface.php @@ -118,6 +118,15 @@ public function setResponseFinder(callable $finder): void; */ public function setFiberYieldHandler(callable $handler): void; + /** + * Set a handler invoked when a suspended Fiber completes. + * + * The transport calls this before sending the Fiber's final result to the client. + * + * @param callable(FiberReturn, Uuid): FiberReturn $handler + */ + public function setFiberTerminationHandler(callable $handler): void; + /** * @param McpFiber $fiber */ diff --git a/tests/Unit/JsonRpc/MessageFactoryTest.php b/tests/Unit/JsonRpc/MessageFactoryTest.php index d38aabeb..83842686 100644 --- a/tests/Unit/JsonRpc/MessageFactoryTest.php +++ b/tests/Unit/JsonRpc/MessageFactoryTest.php @@ -35,6 +35,18 @@ protected function setUp(): void ]); } + public function testCreateFromArrayRequest(): void + { + $message = $this->factory->createFromArray([ + 'jsonrpc' => '2.0', + 'method' => 'ping', + 'id' => 1, + ]); + + $this->assertInstanceOf(PingRequest::class, $message); + $this->assertSame(1, $message->getId()); + } + public function testCreateRequestWithIntegerId(): void { $json = '{"jsonrpc": "2.0", "method": "prompts/get", "params": {"name": "create_story"}, "id": 123}'; diff --git a/tests/Unit/Server/ProtocolTest.php b/tests/Unit/Server/ProtocolTest.php index f1d1c834..c7adf933 100644 --- a/tests/Unit/Server/ProtocolTest.php +++ b/tests/Unit/Server/ProtocolTest.php @@ -11,8 +11,10 @@ namespace Mcp\Tests\Unit\Server; +use Mcp\Event\ClientResponseEvent; use Mcp\Event\ErrorEvent; use Mcp\Event\NotificationEvent; +use Mcp\Event\OutgoingRequestEvent; use Mcp\Event\RequestEvent; use Mcp\Event\ResponseEvent; use Mcp\JsonRpc\MessageFactory; @@ -21,9 +23,12 @@ use Mcp\Schema\JsonRpc\Response; use Mcp\Schema\Notification\LoggingMessageNotification; use Mcp\Schema\Request\CallToolRequest; +use Mcp\Schema\Request\PingRequest; use Mcp\Server\Handler\Notification\NotificationHandlerInterface; use Mcp\Server\Handler\Request\RequestHandlerInterface; use Mcp\Server\Protocol; +use Mcp\Server\Session\InMemorySessionStore; +use Mcp\Server\Session\Session; use Mcp\Server\Session\SessionInterface; use Mcp\Server\Session\SessionManagerInterface; use Mcp\Server\Transport\TransportInterface; @@ -1312,4 +1317,346 @@ public function testNotificationEventWithNullDispatcher(): void $sessionId ); } + + #[TestDox('OutgoingRequestEvent is dispatched when server sends a request to the client')] + public function testOutgoingRequestEventIsDispatched(): void + { + $capturedEvent = null; + + $eventDispatcher = $this->createMock(EventDispatcherInterface::class); + $eventDispatcher + ->expects($this->once()) + ->method('dispatch') + ->with($this->callback(static function ($event) use (&$capturedEvent) { + $capturedEvent = $event; + + return $event instanceof OutgoingRequestEvent; + })) + ->willReturnArgument(0); + + $session = $this->createMock(SessionInterface::class); + $session->method('get')->willReturnCallback(static function ($key, $default = null) { + if ('_mcp.request_id_counter' === $key) { + return 1000; + } + + return $default; + }); + $session->method('getId')->willReturn(Uuid::v4()); + + $protocol = new Protocol( + requestHandlers: [], + notificationHandlers: [], + messageFactory: MessageFactory::make(), + sessionManager: $this->sessionManager, + eventDispatcher: $eventDispatcher, + ); + + $request = PingRequest::fromArray([ + 'jsonrpc' => '2.0', + 'id' => 0, + 'method' => 'ping', + ]); + + $protocol->sendRequest($request, 60, $session); + + $this->assertInstanceOf(OutgoingRequestEvent::class, $capturedEvent); + $this->assertSame($session, $capturedEvent->getSession()); + $this->assertSame(60, $capturedEvent->getTimeout()); + $this->assertSame('ping', $capturedEvent->getMethod()); + $this->assertSame(1000, $capturedEvent->getRequest()->getId()); + } + + #[TestDox('ClientResponseEvent is dispatched when a client response is received')] + public function testClientResponseEventIsDispatched(): void + { + $capturedEvent = null; + + $eventDispatcher = $this->createMock(EventDispatcherInterface::class); + $eventDispatcher + ->expects($this->once()) + ->method('dispatch') + ->with($this->callback(static function ($event) use (&$capturedEvent) { + $capturedEvent = $event; + + return $event instanceof ClientResponseEvent; + })) + ->willReturnArgument(0); + + $session = $this->createMock(SessionInterface::class); + + $this->sessionManager->method('createWithId')->willReturn($session); + $this->sessionManager->method('exists')->willReturn(true); + + $protocol = new Protocol( + requestHandlers: [], + notificationHandlers: [], + messageFactory: MessageFactory::make(), + sessionManager: $this->sessionManager, + eventDispatcher: $eventDispatcher, + ); + + $sessionId = Uuid::v4(); + $protocol->processInput( + $this->transport, + '{"jsonrpc": "2.0", "id": 1000, "result": {"action": "accept"}}', + $sessionId + ); + + $this->assertInstanceOf(ClientResponseEvent::class, $capturedEvent); + $this->assertSame($session, $capturedEvent->getSession()); + $this->assertSame(1000, $capturedEvent->getId()); + $this->assertFalse($capturedEvent->isError()); + } + + #[TestDox('ClientResponseEvent reports errors via isError()')] + public function testClientResponseEventIsError(): void + { + $capturedEvent = null; + + $eventDispatcher = $this->createMock(EventDispatcherInterface::class); + $eventDispatcher + ->method('dispatch') + ->willReturnCallback(static function ($event) use (&$capturedEvent) { + if ($event instanceof ClientResponseEvent) { + $capturedEvent = $event; + } + + return $event; + }); + + $session = $this->createMock(SessionInterface::class); + + $this->sessionManager->method('createWithId')->willReturn($session); + $this->sessionManager->method('exists')->willReturn(true); + + $protocol = new Protocol( + requestHandlers: [], + notificationHandlers: [], + messageFactory: MessageFactory::make(), + sessionManager: $this->sessionManager, + eventDispatcher: $eventDispatcher, + ); + + $sessionId = Uuid::v4(); + $protocol->processInput( + $this->transport, + '{"jsonrpc": "2.0", "id": 1000, "error": {"code": -32603, "message": "Client error"}}', + $sessionId + ); + + $this->assertInstanceOf(ClientResponseEvent::class, $capturedEvent); + $this->assertTrue($capturedEvent->isError()); + } + + #[TestDox('ResponseEvent is dispatched when a suspended Fiber completes')] + public function testResponseEventIsDispatchedOnFiberTermination(): void + { + $capturedEvents = []; + + $eventDispatcher = $this->createMock(EventDispatcherInterface::class); + $eventDispatcher + ->method('dispatch') + ->willReturnCallback(static function ($event) use (&$capturedEvents) { + $capturedEvents[] = $event; + + return $event; + }); + + $sessionId = Uuid::v4(); + $session = $this->createMock(SessionInterface::class); + $session->method('getId')->willReturn($sessionId); + + $parentRequest = PingRequest::fromArray([ + 'jsonrpc' => '2.0', + 'id' => 1, + 'method' => 'ping', + ]); + + $session->method('pull') + ->with('_mcp.fiber_parent_request') + ->willReturn($parentRequest->jsonSerialize()); + + $this->sessionManager->method('createWithId')->willReturn($session); + + $protocol = new Protocol( + requestHandlers: [], + notificationHandlers: [], + messageFactory: MessageFactory::make(), + sessionManager: $this->sessionManager, + eventDispatcher: $eventDispatcher, + ); + + $finalResult = Response::fromArray([ + 'jsonrpc' => '2.0', + 'id' => 1, + 'result' => ['status' => 'ok'], + ]); + $result = $protocol->handleFiberTermination($finalResult, $sessionId); + + $this->assertSame(['status' => 'ok'], $result->result); + $this->assertCount(1, $capturedEvents); + $this->assertInstanceOf(ResponseEvent::class, $capturedEvents[0]); + $this->assertSame('ping', $capturedEvents[0]->getMethod()); + $this->assertSame($session, $capturedEvents[0]->getSession()); + } + + #[TestDox('ErrorEvent is dispatched when a suspended Fiber completes with an error')] + public function testErrorEventIsDispatchedOnFiberTermination(): void + { + $capturedEvents = []; + + $eventDispatcher = $this->createMock(EventDispatcherInterface::class); + $eventDispatcher + ->method('dispatch') + ->willReturnCallback(static function ($event) use (&$capturedEvents) { + $capturedEvents[] = $event; + + return $event; + }); + + $sessionId = Uuid::v4(); + $session = $this->createMock(SessionInterface::class); + $session->method('getId')->willReturn($sessionId); + + $parentRequest = PingRequest::fromArray([ + 'jsonrpc' => '2.0', + 'id' => 1, + 'method' => 'ping', + ]); + + $session->method('pull') + ->with('_mcp.fiber_parent_request') + ->willReturn($parentRequest->jsonSerialize()); + + $this->sessionManager->method('createWithId')->willReturn($session); + + $protocol = new Protocol( + requestHandlers: [], + notificationHandlers: [], + messageFactory: MessageFactory::make(), + sessionManager: $this->sessionManager, + eventDispatcher: $eventDispatcher, + ); + + $finalResult = Error::forInternalError('Fiber failed', 1); + $result = $protocol->handleFiberTermination($finalResult, $sessionId); + + $this->assertInstanceOf(Error::class, $result); + $this->assertCount(1, $capturedEvents); + $this->assertInstanceOf(ErrorEvent::class, $capturedEvents[0]); + $this->assertSame('ping', $capturedEvents[0]->getRequest()::getMethod()); + } + + #[TestDox('Fiber parent request is stored when handler suspends')] + public function testFiberParentRequestIsStoredOnSuspend(): void + { + $storedParentRequest = null; + + $handler = $this->createMock(RequestHandlerInterface::class); + $handler->method('supports')->willReturn(true); + $handler->method('handle')->willReturnCallback(static function () { + \Fiber::suspend([ + 'type' => 'request', + 'request' => PingRequest::fromArray([ + 'jsonrpc' => '2.0', + 'id' => 0, + 'method' => 'ping', + ]), + 'timeout' => 60, + ]); + + return new Response(1, []); + }); + + $session = $this->createMock(SessionInterface::class); + $session->method('getId')->willReturn(Uuid::v4()); + $session->method('get')->willReturnCallback(static function ($key, $default = null) { + if ('_mcp.request_id_counter' === $key) { + return 1000; + } + + return $default; + }); + $session->method('set')->willReturnCallback(static function ($key, $value) use (&$storedParentRequest) { + if ('_mcp.fiber_parent_request' === $key) { + $storedParentRequest = $value; + } + }); + + $this->sessionManager->method('createWithId')->willReturn($session); + $this->sessionManager->method('exists')->willReturn(true); + + $this->transport->expects($this->once())->method('attachFiberToSession'); + + $protocol = new Protocol( + requestHandlers: [$handler], + notificationHandlers: [], + messageFactory: MessageFactory::make(), + sessionManager: $this->sessionManager, + ); + + $sessionId = Uuid::v4(); + $protocol->processInput( + $this->transport, + '{"jsonrpc": "2.0", "id": 1, "method": "ping"}', + $sessionId + ); + + $this->assertIsArray($storedParentRequest); + $this->assertSame('ping', $storedParentRequest['method']); + $this->assertSame(1, $storedParentRequest['id']); + } + + #[TestDox('ResponseEvent is dispatched after session reload when Fiber completes')] + public function testResponseEventIsDispatchedOnFiberTerminationAfterSessionSave(): void + { + $capturedEvents = []; + + $eventDispatcher = $this->createMock(EventDispatcherInterface::class); + $eventDispatcher + ->method('dispatch') + ->willReturnCallback(static function ($event) use (&$capturedEvents) { + $capturedEvents[] = $event; + + return $event; + }); + + $store = new InMemorySessionStore(); + $sessionId = Uuid::v4(); + $session = new Session($store, $sessionId); + + $parentRequest = PingRequest::fromArray([ + 'jsonrpc' => '2.0', + 'id' => 1, + 'method' => 'ping', + ]); + $session->set('_mcp.fiber_parent_request', $parentRequest->jsonSerialize()); + $session->save(); + + $sessionManager = $this->createMock(SessionManagerInterface::class); + $sessionManager->method('createWithId')->willReturnCallback( + static fn (Uuid $id) => new Session($store, $id) + ); + + $protocol = new Protocol( + requestHandlers: [], + notificationHandlers: [], + messageFactory: MessageFactory::make(), + sessionManager: $sessionManager, + eventDispatcher: $eventDispatcher, + ); + + $finalResult = Response::fromArray([ + 'jsonrpc' => '2.0', + 'id' => 1, + 'result' => ['status' => 'ok'], + ]); + $result = $protocol->handleFiberTermination($finalResult, $sessionId); + + $this->assertSame(['status' => 'ok'], $result->result); + $this->assertCount(1, $capturedEvents); + $this->assertInstanceOf(ResponseEvent::class, $capturedEvents[0]); + $this->assertSame('ping', $capturedEvents[0]->getMethod()); + } }