Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions docs/events.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ The MCP SDK provides a PSR-14 compatible event system that allows you to hook in
- [ResponseEvent](#responseevent)
- [ErrorEvent](#errorevent)
- [NotificationEvent](#notificationevent)
- [OutgoingRequestEvent](#outgoingrequestevent)
- [ClientResponseEvent](#clientresponseevent)
- [List Change Events](#list-change-events)

## Setup
Expand Down Expand Up @@ -37,7 +39,7 @@ $server = Server::builder()

## Protocol Events

The SDK dispatches 4 broad event types at the protocol level, allowing you to observe and modify all server operations:
The SDK dispatches 6 broad event types at the protocol level, allowing you to observe and modify all server operations:

### RequestEvent

Expand All @@ -51,7 +53,7 @@ The SDK dispatches 4 broad event types at the protocol level, allowing you to ob

### ResponseEvent

**Dispatched**: When a successful response is ready to be sent to the client, after handler execution.
**Dispatched**: When a successful response is ready to be sent to the client, after handler execution. Also dispatched when a suspended Fiber completes (e.g. after elicitation or sampling).

**Properties**:
- `getResponse(): Response` - The response being sent
Expand All @@ -62,7 +64,7 @@ The SDK dispatches 4 broad event types at the protocol level, allowing you to ob

### ErrorEvent

**Dispatched**: When an error occurs during request processing.
**Dispatched**: When an error occurs during request processing. Also dispatched when a suspended Fiber completes with an error.

**Properties**:
- `getError(): Error` - The error being sent
Expand All @@ -81,6 +83,26 @@ The SDK dispatches 4 broad event types at the protocol level, allowing you to ob
- `getSession(): SessionInterface` - The current session
- `getMethod(): string` - Convenience method to get the notification method

### OutgoingRequestEvent

**Dispatched**: When the server sends a request to the client (e.g. `elicitation/create`, `sampling/create`).

**Properties**:
- `getRequest(): Request` - The outgoing request (with server-assigned ID)
- `getSession(): SessionInterface` - The current session
- `getTimeout(): int` - Maximum time to wait for the client response (seconds)
- `getMethod(): string` - Convenience method to get the request method

### ClientResponseEvent

**Dispatched**: When the server receives a client response to a prior outgoing request.

**Properties**:
- `getResponse(): Response|Error` - The client's reply
- `getSession(): SessionInterface` - The current session
- `getId(): string|int` - The JSON-RPC message ID
- `isError(): bool` - Whether the client returned a JSON-RPC error

## List Change Events

These events are dispatched when the lists of available capabilities change:
Expand Down
56 changes: 56 additions & 0 deletions src/Event/ClientResponseEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
<?php

/*
* This file is part of the official PHP MCP SDK.
*
* A collaboration between Symfony and the PHP Foundation.
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Mcp\Event;

use Mcp\Schema\JsonRpc\Error;
use Mcp\Schema\JsonRpc\Response;
use Mcp\Server\Session\SessionInterface;

/**
* Event dispatched when the server receives a client response to a prior outgoing request.
*
* @author Olivier Mouren <mouren.olivier@gmail.com>
*/
final class ClientResponseEvent
{
/**
* @param Response<mixed>|Error $response
*/
public function __construct(
private readonly Response|Error $response,
private readonly SessionInterface $session,
) {
}

/**
* @return Response<mixed>|Error
*/
public function getResponse(): Response|Error
{
return $this->response;
}

public function getSession(): SessionInterface
{
return $this->session;
}

public function getId(): string|int
{
return $this->response->getId();
}

public function isError(): bool
{
return $this->response instanceof Error;
}
}
50 changes: 50 additions & 0 deletions src/Event/OutgoingRequestEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?php

/*
* This file is part of the official PHP MCP SDK.
*
* A collaboration between Symfony and the PHP Foundation.
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Mcp\Event;

use Mcp\Schema\JsonRpc\Request;
use Mcp\Server\Session\SessionInterface;

/**
* Event dispatched when the server sends a request to the client (e.g. elicitation/create, sampling/create).
*
* @author Olivier Mouren <mouren.olivier@gmail.com>
*/
final class OutgoingRequestEvent
{
public function __construct(
private readonly Request $request,
private readonly int $timeout,
private readonly SessionInterface $session,
) {
}

public function getRequest(): Request
{
return $this->request;
}

public function getSession(): SessionInterface
{
return $this->session;
}

public function getTimeout(): int
{
return $this->timeout;
}

public function getMethod(): string
{
return $this->request::getMethod();
}
}
10 changes: 10 additions & 0 deletions src/JsonRpc/MessageFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,16 @@ public function create(string $input): array
return $messages;
}

/**
* @param array<string, mixed> $data
*
* @throws InvalidInputMessageException
*/
public function createFromArray(array $data): MessageInterface
{
return $this->createMessage($data);
}

/**
* Creates a single message object from parsed JSON data.
*
Expand Down
64 changes: 64 additions & 0 deletions src/Server/Protocol.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@

namespace Mcp\Server;

use Mcp\Event\ClientResponseEvent;
use Mcp\Event\ErrorEvent;
use Mcp\Event\NotificationEvent;
use Mcp\Event\OutgoingRequestEvent;
use Mcp\Event\RequestEvent;
use Mcp\Event\ResponseEvent;
use Mcp\Exception\InvalidInputMessageException;
Expand Down Expand Up @@ -56,6 +58,9 @@ class Protocol
/** Session key for outgoing message queue */
private const SESSION_OUTGOING_QUEUE = '_mcp.outgoing_queue';

/** Session key for the client request that started a suspended Fiber */
private const SESSION_FIBER_PARENT_REQUEST = '_mcp.fiber_parent_request';

/** Session key for active request meta */
public const SESSION_ACTIVE_REQUEST_META = '_mcp.active_request_meta';

Expand Down Expand Up @@ -96,6 +101,8 @@ public function connect(TransportInterface $transport): void

$transport->setFiberYieldHandler($this->handleFiberYield(...));

$transport->setFiberTerminationHandler($this->handleFiberTermination(...));

$this->logger->info('Protocol connected to transport', ['transport' => $transport::class]);
}

Expand Down Expand Up @@ -199,6 +206,8 @@ private function handleRequest(TransportInterface $transport, Request $request,
$result = $fiber->start();

if ($fiber->isSuspended()) {
$session->set(self::SESSION_FIBER_PARENT_REQUEST, $request->jsonSerialize());

if (\is_array($result) && isset($result['type'])) {
if ('notification' === $result['type']) {
$notification = $result['notification'];
Expand Down Expand Up @@ -262,6 +271,8 @@ private function handleResponse(Response|Error $response, SessionInterface $sess
{
$this->logger->info('Handling response from client.', ['response' => $response]);

$this->dispatchEvent(new ClientResponseEvent($response, $session));

$messageId = $response->getId();

$session->set(self::SESSION_RESPONSES.".{$messageId}", $response->jsonSerialize());
Expand Down Expand Up @@ -303,6 +314,8 @@ public function sendRequest(Request $request, int $timeout, SessionInterface $se

$requestWithId = $request->withId($requestId);

$this->dispatchEvent(new OutgoingRequestEvent($requestWithId, $timeout, $session));

$this->logger->info('Queueing server request to client', [
'request_id' => $requestId,
'method' => $request::getMethod(),
Expand Down Expand Up @@ -540,6 +553,57 @@ public function handleFiberYield(mixed $yieldedValue, ?Uuid $sessionId): void
}
}

/**
* Handle the final result of a suspended Fiber when it completes.
*
* Dispatches ResponseEvent or ErrorEvent for the original client request that
* started the Fiber, allowing listeners to observe deferred responses.
*
* @phpstan-param Response<mixed>|Error $finalResult
*
* @phpstan-return Response<mixed>|Error
*/
public function handleFiberTermination(Response|Error $finalResult, Uuid $sessionId): Response|Error
{
$session = $this->sessionManager->createWithId($sessionId);
$parentRequest = $this->resolveFiberParentRequest(
$session->pull(self::SESSION_FIBER_PARENT_REQUEST)
);

if (!$parentRequest) {
$session->save();

return $finalResult;
}

if ($finalResult instanceof Response) {
$responseEvent = $this->dispatchEvent(new ResponseEvent($finalResult, $parentRequest, $session));
$finalResult = $responseEvent->getResponse();
} else {
$errorEvent = $this->dispatchEvent(new ErrorEvent($finalResult, $parentRequest, $session, null));
$finalResult = $errorEvent->getError();
}

$session->save();

return $finalResult;
}

private function resolveFiberParentRequest(mixed $data): ?Request
{
if (!\is_array($data)) {
return null;
}

try {
$message = $this->messageFactory->createFromArray($data);
} catch (\Throwable) {
return null;
}

return $message instanceof Request ? $message : null;
}

/**
* @param array<int, mixed> $messages
*/
Expand Down
21 changes: 21 additions & 0 deletions src/Server/Transport/BaseTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,27 @@ protected function handleFiberYield(mixed $yielded, ?Uuid $sessionId): void
}
}

/**
* @phpstan-param FiberReturn $finalResult
*
* @phpstan-return FiberReturn
*/
protected function handleFiberTerminationResult(Response|Error $finalResult): Response|Error
{
if ($this->sessionId && \is_callable($this->fiberTerminationHandler)) {
try {
return ($this->fiberTerminationHandler)($finalResult, $this->sessionId);
} catch (\Throwable $e) {
$this->logger->error('Fiber termination handler failed.', [
'exception' => $e,
'sessionId' => $this->sessionId->toRfc4122(),
]);
}
}

return $finalResult;
}

protected function handleMessage(string $payload, ?Uuid $sessionId): void
{
if (\is_callable($this->messageListener)) {
Expand Down
11 changes: 11 additions & 0 deletions src/Server/Transport/ManagesTransportCallbacks.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ trait ManagesTransportCallbacks
/** @var callable(FiberSuspend|null, ?Uuid): void */
protected $fiberYieldHandler;

/** @var callable(FiberReturn, Uuid): FiberReturn */
protected $fiberTerminationHandler;

public function onMessage(callable $listener): void
{
$this->messageListener = $listener;
Expand Down Expand Up @@ -79,4 +82,12 @@ public function setFiberYieldHandler(callable $handler): void
{
$this->fiberYieldHandler = $handler;
}

/**
* @param callable(FiberReturn, Uuid): FiberReturn $handler
*/
public function setFiberTerminationHandler(callable $handler): void
{
$this->fiberTerminationHandler = $handler;
}
}
2 changes: 2 additions & 0 deletions src/Server/Transport/StdioTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ private function handleFiberTermination(): void
$finalResult = $this->sessionFiber->getReturn();

if (null !== $finalResult) {
$finalResult = $this->handleFiberTerminationResult($finalResult);

try {
$encoded = json_encode($finalResult, \JSON_THROW_ON_ERROR);
$this->writeLine($encoded);
Expand Down
2 changes: 2 additions & 0 deletions src/Server/Transport/StreamableHttpTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ protected function handleFiberTermination(): void
$finalResult = $this->sessionFiber->getReturn();

if (null !== $finalResult) {
$finalResult = $this->handleFiberTerminationResult($finalResult);

try {
$encoded = json_encode($finalResult, \JSON_THROW_ON_ERROR);
echo "event: message\n";
Expand Down
9 changes: 9 additions & 0 deletions src/Server/Transport/TransportInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,15 @@ public function setResponseFinder(callable $finder): void;
*/
public function setFiberYieldHandler(callable $handler): void;

/**
* Set a handler invoked when a suspended Fiber completes.
*
* The transport calls this before sending the Fiber's final result to the client.
*
* @param callable(FiberReturn, Uuid): FiberReturn $handler
*/
public function setFiberTerminationHandler(callable $handler): void;

/**
* @param McpFiber $fiber
*/
Expand Down
Loading