diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4014956..387dc37 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -39,7 +39,7 @@ jobs: run: sbt -v scalafmtCheckAll - name: Compile run: sbt -v compile - # TODO bring this step back after first release with new project structure (client + server) +# TODO uncomment after release # - name: Verify that examples compile using Scala CLI # run: sbt -v "project examples" verifyExamplesCompileUsingScalaCli - name: Compile documentation diff --git a/build.sbt b/build.sbt index 85b40f5..05ba85b 100644 --- a/build.sbt +++ b/build.sbt @@ -11,6 +11,7 @@ val sttpClientV = "4.0.25" val zioV = "2.1.26" val zioProcessV = "0.8.0" val zioHttpV = "3.8.0" +val oxV = "1.0.5" val testcontainersScalaV = "0.41.8" lazy val verifyExamplesCompileUsingScalaCli = taskKey[Unit]("Verify that each example compiles using Scala CLI") @@ -36,7 +37,7 @@ val scalaTest = "org.scalatest" %% "scalatest" % scalaTestV % Test lazy val root = (project in file(".")) .settings(commonSettings: _*) .settings(publishArtifact := false, name := "chimp") - .aggregate(core, server, serverZio, client, clientZio, examples, serverConformance, clientConformance) + .aggregate(core, server, serverZio, serverOx, client, clientZio, clientOx, examples, serverConformance, clientConformance) val conformance = inputKey[Unit]("Run the MCP conformance harness via npx, extra args are passed through") @@ -85,6 +86,18 @@ lazy val serverZio: Project = (project in file("server-streaming/server-zio")) ) .dependsOn(server % "compile->compile;test->test", clientZio % "test->compile") +lazy val serverOx: Project = (project in file("server-streaming/server-ox")) + .settings(commonSettings: _*) + .settings( + name := "chimp-server-ox", + libraryDependencies ++= Seq( + scalaTest, + "com.softwaremill.sttp.tapir" %% "tapir-netty-server-sync" % tapirV, + "com.softwaremill.ox" %% "core" % oxV + ) + ) + .dependsOn(server % "compile->compile;test->test", clientOx % "test->compile") + lazy val client: Project = (project in file("client")) .settings(commonSettings: _*) .settings( @@ -113,6 +126,18 @@ lazy val clientZio: Project = (project in file("client-streaming/client-zio")) ) .dependsOn(client % "compile->compile;test->test") +lazy val clientOx: Project = (project in file("client-streaming/client-ox")) + .settings(commonSettings: _*) + .settings( + name := "chimp-client-ox", + libraryDependencies ++= Seq( + scalaTest, + "com.softwaremill.sttp.client4" %% "core" % sttpClientV, + "com.softwaremill.ox" %% "core" % oxV + ) + ) + .dependsOn(client % "compile->compile;test->test") + lazy val examples = (project in file("examples")) .settings(commonSettings: _*) .settings( @@ -126,7 +151,7 @@ lazy val examples = (project in file("examples")) ), verifyExamplesCompileUsingScalaCli := VerifyExamplesCompileUsingScalaCli(sLog.value, sourceDirectory.value) ) - .dependsOn(server, client) + .dependsOn(server, serverOx, client, clientOx) import sbtassembly.AssemblyPlugin.autoImport.* @@ -266,4 +291,4 @@ lazy val docs: Project = (project in file("generated-docs")) publishArtifact := false, name := "docs" ) - .dependsOn(core, server, serverZio, client, clientZio) + .dependsOn(core, server, serverZio, serverOx, client, clientZio, clientOx) diff --git a/client-streaming/client-ox/src/main/scala/chimp/client/transport/ox/OxClientHttpTransport.scala b/client-streaming/client-ox/src/main/scala/chimp/client/transport/ox/OxClientHttpTransport.scala new file mode 100644 index 0000000..2e1d53f --- /dev/null +++ b/client-streaming/client-ox/src/main/scala/chimp/client/transport/ox/OxClientHttpTransport.scala @@ -0,0 +1,249 @@ +package chimp.client.transport.ox + +import chimp.client.internal.SyncPendingRequests +import chimp.client.transport.ClientHttpTransport.HttpOutcome +import chimp.client.transport.{ClientBidirectionalTransport, ClientHttpTransport, ClientTransport} +import chimp.client.{McpProtocolException, McpSessionNotFoundException, McpTransportException} +import chimp.protocol.{JSONRPCErrorCodes, JSONRPCErrorObject, JSONRPCMessage, ProtocolVersion, RequestId} +import org.slf4j.LoggerFactory +import ox.* +import sttp.client4.{asInputStreamUnsafe, basicRequest, Response, SyncBackend} +import sttp.model.sse.ServerSentEvent +import sttp.model.{MediaType, StatusCode, Uri} +import sttp.monad.{IdentityMonad, MonadError} +import sttp.shared.Identity + +import java.io.{BufferedReader, InputStream, InputStreamReader} +import java.nio.charset.StandardCharsets +import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} +import java.util.concurrent.{ConcurrentHashMap, CountDownLatch} +import scala.concurrent.duration.{FiniteDuration, *} + +final class OxClientHttpTransport private ( + backend: SyncBackend, + uri: Uri, + protocolVersion: ProtocolVersion, + timeout: FiniteDuration, + scope: Ox, + sessionId: AtomicReference[Option[String]], + pending: SyncPendingRequests, + incoming: AtomicReference[JSONRPCMessage => Unit], + lastEventId: AtomicReference[Option[String]], + closing: AtomicBoolean, + sessionReady: CountDownLatch, + openStreams: java.util.Set[InputStream] +) extends ClientBidirectionalTransport[Identity]: + + private val log = LoggerFactory.getLogger(classOf[OxClientHttpTransport]) + private given Ox = scope + + given monad: MonadError[Identity] = IdentityMonad + + override def send(msg: JSONRPCMessage): Identity[Option[JSONRPCMessage]] = + if closing.get() then throw McpTransportException("HTTP transport is closed") + msg match + case request: JSONRPCMessage.Request => + val await = pending.register(request.id, timeout) + try sendRequest(request, await) + finally { val _ = pending.complete(request.id, cancelled(request.id)) } + case other => sendNonRequest(other) + + override def onIncoming(handler: JSONRPCMessage => Identity[Unit]): Identity[Unit] = incoming.set(handler) + + override def close(): Identity[Unit] = + if closing.compareAndSet(false, true) then + sessionReady.countDown() + openStreams.forEach(closeQuietly) + sessionId.getAndSet(None) match + case Some(id) => + try drainBody(ClientHttpTransport.baseDeleteRequest(uri, protocolVersion, id).response(asInputStreamUnsafe).send(backend)) + catch case _: Exception => () + case None => () + pending.closeAll("Transport closed") + + private def sendRequest(request: JSONRPCMessage.Request, await: () => JSONRPCMessage): Option[JSONRPCMessage] = + val response = post(request) + captureSession(response) + ClientHttpTransport.resolveResponse(response, sessionId.get()) match + case Left(err: McpSessionNotFoundException) => sessionId.set(None); throw err + case Left(err) => throw err + case Right(HttpOutcome.NoBody) => + drainBody(response) + throw McpProtocolException("Server returned 202 Accepted for a Request") + case Right(HttpOutcome.JsonBody) => + val message = decode(collectBody(response)) + routeMessage(message) + Some(await()) + case Right(HttpOutcome.SseBody) => + response.body match + case Right(stream) => forkSseDrain(stream, Some(request.id)); Some(await()) + case Left(err) => throw McpProtocolException(s"Expected SSE stream, got: $err") + + private def sendNonRequest(msg: JSONRPCMessage): Option[JSONRPCMessage] = + val response = post(msg) + captureSession(response) + ClientHttpTransport.resolveResponse(response, sessionId.get()) match + case Left(err: McpSessionNotFoundException) => sessionId.set(None); throw err + case Left(err) => throw err + case Right(HttpOutcome.NoBody) => drainBody(response); None + case Right(HttpOutcome.JsonBody) => drainBody(response); None + case Right(HttpOutcome.SseBody) => + response.body match + case Right(stream) => forkSseDrain(stream, None); None + case Left(_) => None + + private def post(msg: JSONRPCMessage): Response[Either[String, InputStream]] = + ClientHttpTransport + .basePostRequest(uri, protocolVersion, sessionId.get(), ClientTransport.encode(msg)) + .response(asInputStreamUnsafe) + .send(backend) + + private def captureSession(response: Response[?]): Unit = + response.header("Mcp-Session-Id").foreach(id => sessionId.set(Some(id))) + sessionReady.countDown() + + private def collectBody(response: Response[Either[String, InputStream]]): String = + response.body match + case Right(stream) => + try String(stream.readAllBytes(), StandardCharsets.UTF_8) + finally closeQuietly(stream) + case Left(err) => throw McpProtocolException(s"HTTP 200 with non-stream body: $err") + + private def drainBody(response: Response[Either[String, InputStream]]): Unit = + response.body match + case Right(stream) => + try { val _ = stream.readAllBytes() } + catch case _: Exception => () + finally closeQuietly(stream) + case Left(_) => () + + private def decode(body: String): JSONRPCMessage = + ClientTransport.decode(body) match + case Right(message) => message + case Left(err) => throw McpProtocolException(s"Failed to decode response body: ${err.getMessage}, payload $body") + + private def forkSseDrain(stream: InputStream, requestId: Option[RequestId]): Unit = + track(stream) + forkDiscard: + try drainSse(stream, _ => ()) + catch case e: Exception => if !closing.get() then log.warn(s"SSE drain error: ${e.getMessage}") + finally + requestId.foreach { id => + val _ = pending.complete(id, sseEnded(id)) + } + untrack(stream) + + private def routeMessage(msg: JSONRPCMessage): Unit = msg match + case response: JSONRPCMessage.Response => val _ = pending.complete(response.id, response) + case err: JSONRPCMessage.Error => val _ = pending.complete(err.id, err) + case other => incoming.get()(other) + + private[ox] def startGetListener(): Unit = forkDiscard(getListenerLoop()) + + private def getListenerLoop(): Unit = + sessionReady.await() + var attempt = 0 + var continue = !closing.get() + while continue do + openGetSseStream(lastEventId.get()) match + case None => continue = false + case Some(stream) => + attempt = 0 + try drainSse(stream, id => lastEventId.set(Some(id))) + catch case e: Exception => if !closing.get() then log.warn(s"GET SSE listener error: ${e.getMessage}") + finally untrack(stream) + if closing.get() then continue = false + else + attempt += 1 + sleep(reconnectDelay(attempt)) + + private def openGetSseStream(lastEvent: Option[String]): Option[InputStream] = + val base = basicRequest + .get(uri) + .header("Accept", MediaType.TextEventStream.toString) + .header("MCP-Protocol-Version", protocolVersion.name) + .response(asInputStreamUnsafe) + val withSession = sessionId.get().fold(base)(s => base.header("Mcp-Session-Id", s)) + val withLastEvent = lastEvent.fold(withSession)(id => withSession.header("Last-Event-ID", id)) + val response = withLastEvent.send(backend) + response.code match + case StatusCode.Ok => + response.body match + case Right(stream) => track(stream); Some(stream) + case Left(err) => log.warn(s"GET SSE stream returned non-stream body: $err"); None + case StatusCode.MethodNotAllowed => + drainBody(response) + log.info("Server does not support GET SSE stream") + None + case other => + drainBody(response) + log.warn(s"GET SSE stream returned HTTP ${other.code}; not reconnecting") + None + + private def drainSse(stream: InputStream, onEventId: String => Unit): Unit = + val reader = BufferedReader(InputStreamReader(stream, StandardCharsets.UTF_8)) + var buffered = List.empty[String] + var line = reader.readLine() + while line != null do + if line.isEmpty then + if buffered.nonEmpty then + dispatchEvent(ServerSentEvent.parse(buffered.reverse), onEventId) + buffered = Nil + else buffered = line :: buffered + line = reader.readLine() + if buffered.nonEmpty then dispatchEvent(ServerSentEvent.parse(buffered.reverse), onEventId) + + private def dispatchEvent(event: ServerSentEvent, onEventId: String => Unit): Unit = + event.id.filter(_.nonEmpty).foreach(onEventId) + event.data + .filter(_.nonEmpty) + .foreach: data => + ClientTransport.decode(data) match + case Right(message) => routeMessage(message) + case Left(_) => () + + private def track(stream: InputStream): Unit = { val _ = openStreams.add(stream) } + + private def untrack(stream: InputStream): Unit = + if openStreams.remove(stream) then closeQuietly(stream) + + private def closeQuietly(stream: InputStream): Unit = + try stream.close() + catch case _: Exception => () + + private def reconnectDelay(attempt: Int): FiniteDuration = + math.min(100L * (1L << math.min(attempt - 1, 8)), 30000L).millis + + private def cancelled(id: RequestId): JSONRPCMessage.Error = + JSONRPCMessage.Error(id = id, error = JSONRPCErrorObject(code = JSONRPCErrorCodes.InvocationError.code, message = "Request cancelled")) + + private def sseEnded(id: RequestId): JSONRPCMessage.Error = + JSONRPCMessage.Error( + id = id, + error = JSONRPCErrorObject(code = JSONRPCErrorCodes.InvocationError.code, message = "SSE stream ended before response") + ) + +object OxClientHttpTransport: + + def apply( + backend: SyncBackend, + uri: Uri, + protocolVersion: ProtocolVersion = ProtocolVersion.Latest, + timeout: FiniteDuration = ClientTransport.defaultTimeout + )(using Ox): OxClientHttpTransport = + val transport = new OxClientHttpTransport( + backend, + uri, + protocolVersion, + timeout, + summon[Ox], + AtomicReference[Option[String]](None), + SyncPendingRequests(), + AtomicReference[JSONRPCMessage => Unit](_ => ()), + AtomicReference[Option[String]](None), + AtomicBoolean(false), + CountDownLatch(1), + ConcurrentHashMap.newKeySet[InputStream]() + ) + transport.startGetListener() + transport diff --git a/client-streaming/client-ox/src/main/scala/chimp/client/transport/ox/OxClientStdioTransport.scala b/client-streaming/client-ox/src/main/scala/chimp/client/transport/ox/OxClientStdioTransport.scala new file mode 100644 index 0000000..61939e7 --- /dev/null +++ b/client-streaming/client-ox/src/main/scala/chimp/client/transport/ox/OxClientStdioTransport.scala @@ -0,0 +1,122 @@ +package chimp.client.transport.ox + +import chimp.client.McpTransportException +import chimp.client.internal.SyncPendingRequests +import chimp.client.transport.{ClientStreamingStdioTransport, ClientTransport} +import chimp.protocol.JSONRPCMessage +import org.slf4j.LoggerFactory +import ox.* +import sttp.monad.{IdentityMonad, MonadError} +import sttp.shared.Identity + +import java.io.{BufferedReader, BufferedWriter, File, InputStreamReader, OutputStreamWriter} +import java.nio.charset.StandardCharsets +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} +import scala.concurrent.duration.FiniteDuration +import scala.jdk.CollectionConverters.* + +final class OxClientStdioTransport private ( + command: List[String], + env: Map[String, String], + workDir: Option[File], + timeout: FiniteDuration, + process: Process, + pending: SyncPendingRequests, + incoming: AtomicReference[JSONRPCMessage => Unit], + closed: AtomicBoolean +) extends ClientStreamingStdioTransport[Identity](command, env, workDir, timeout): + + private val log = LoggerFactory.getLogger(classOf[OxClientStdioTransport]) + + given monad: MonadError[Identity] = IdentityMonad + + private val writer = BufferedWriter(OutputStreamWriter(process.getOutputStream, StandardCharsets.UTF_8)) + + override def send(msg: JSONRPCMessage): Identity[Option[JSONRPCMessage]] = + if closed.get() then throw McpTransportException("Stdio transport is closed") + msg match + case request: JSONRPCMessage.Request => + val await = pending.register(request.id, timeout) + writeLine(request) + Some(await()) + case other => + writeLine(other) + None + + override def onIncoming(handler: JSONRPCMessage => Identity[Unit]): Identity[Unit] = incoming.set(handler) + + override def close(): Identity[Unit] = + if closed.compareAndSet(false, true) then + try writer.close() + catch case _: Exception => () + if process.isAlive then + if !process.waitFor(2, TimeUnit.SECONDS) then process.destroy() + if !process.waitFor(2, TimeUnit.SECONDS) then { val _ = process.destroyForcibly() } + pending.closeAll("Transport closed") + + private def writeLine(msg: JSONRPCMessage): Unit = + writer.synchronized: + writer.write(ClientTransport.encode(msg)) + writer.newLine() + writer.flush() + + private def dispatch(msg: JSONRPCMessage): Unit = msg match + case response: JSONRPCMessage.Response => val _ = pending.complete(response.id, response) + case err: JSONRPCMessage.Error => val _ = pending.complete(err.id, err) + case other => incoming.get()(other) + + private def readLoop(reader: BufferedReader): Unit = + try + var line = reader.readLine() + while line != null do + if line.nonEmpty then + ClientTransport.decode(line) match + case Right(msg) => dispatch(msg) + case Left(e) => log.warn(s"Failed to parse JSON-RPC line: ${e.getMessage}; raw: $line") + line = reader.readLine() + catch case e: Exception => if !closed.get() then log.warn(s"Reader loop ended: ${e.getMessage}") + finally pending.closeAll("Transport closed") + + private def drainStderr(errReader: BufferedReader): Unit = + try + var line = errReader.readLine() + while line != null do + log.info(s"stdio-server: $line") + line = errReader.readLine() + catch case _: Exception => () + +object OxClientStdioTransport: + + def apply( + command: List[String], + env: Map[String, String] = Map.empty, + workDir: Option[File] = None, + timeout: FiniteDuration = ClientTransport.defaultTimeout + )(using Ox): OxClientStdioTransport = + val pb = ProcessBuilder(command.asJava) + workDir.foreach(pb.directory) + if env.nonEmpty then + val procEnv = pb.environment() + env.foreach { (k, v) => + val _ = procEnv.put(k, v) + } + pb.redirectErrorStream(false) + val process = pb.start() + + val transport = new OxClientStdioTransport( + command, + env, + workDir, + timeout, + process, + SyncPendingRequests(), + AtomicReference[JSONRPCMessage => Unit](_ => ()), + AtomicBoolean(false) + ) + + val reader = BufferedReader(InputStreamReader(process.getInputStream, StandardCharsets.UTF_8)) + val errReader = BufferedReader(InputStreamReader(process.getErrorStream, StandardCharsets.UTF_8)) + forkDiscard(transport.readLoop(reader)) + forkDiscard(transport.drainStderr(errReader)) + transport diff --git a/client-streaming/client-ox/src/test/scala/chimp/client/transport/ox/OxMcpClientHttpIntegrationSpec.scala b/client-streaming/client-ox/src/test/scala/chimp/client/transport/ox/OxMcpClientHttpIntegrationSpec.scala new file mode 100644 index 0000000..2edca24 --- /dev/null +++ b/client-streaming/client-ox/src/test/scala/chimp/client/transport/ox/OxMcpClientHttpIntegrationSpec.scala @@ -0,0 +1,25 @@ +package chimp.client.transport.ox + +import chimp.client.integration.{McpClientStreamingHttpIntegrationSpec, SyncToFuture} +import chimp.client.transport.ClientBidirectionalTransport +import ox.supervised +import sttp.client4.{DefaultSyncBackend, SyncBackend} +import sttp.model.Uri +import sttp.shared.Identity + +import scala.concurrent.duration.FiniteDuration + +class OxMcpClientHttpIntegrationSpec extends McpClientStreamingHttpIntegrationSpec[Identity, SyncBackend] with SyncToFuture: + + override def usingBackend[A](use: SyncBackend => Identity[A]): Identity[A] = + val backend = DefaultSyncBackend() + try use(backend) + finally backend.close() + + override def usingBidirectionalTransport[A](b: SyncBackend, uri: Uri, timeout: FiniteDuration)( + use: ClientBidirectionalTransport[Identity] => Identity[A] + ): Identity[A] = + supervised: + val transport = OxClientHttpTransport(b, uri, timeout = timeout) + try use(transport) + finally transport.close() diff --git a/client-streaming/client-ox/src/test/scala/chimp/client/transport/ox/OxMcpClientStdioIntegrationSpec.scala b/client-streaming/client-ox/src/test/scala/chimp/client/transport/ox/OxMcpClientStdioIntegrationSpec.scala new file mode 100644 index 0000000..df88ed4 --- /dev/null +++ b/client-streaming/client-ox/src/test/scala/chimp/client/transport/ox/OxMcpClientStdioIntegrationSpec.scala @@ -0,0 +1,18 @@ +package chimp.client.transport.ox + +import chimp.client.integration.{McpClientStdioIntegrationSpec, SyncToFuture} +import chimp.client.transport.ClientBidirectionalTransport +import ox.supervised +import sttp.shared.Identity + +import scala.concurrent.duration.FiniteDuration + +class OxMcpClientStdioIntegrationSpec extends McpClientStdioIntegrationSpec[Identity] with SyncToFuture: + + override def usingTransport[A](command: List[String], timeout: FiniteDuration)( + use: ClientBidirectionalTransport[Identity] => Identity[A] + ): Identity[A] = + supervised: + val transport = OxClientStdioTransport(command, timeout = timeout) + try use(transport) + finally transport.close() diff --git a/docs/client/capabilities.md b/docs/client/capabilities.md index f9e28d3..bede01f 100644 --- a/docs/client/capabilities.md +++ b/docs/client/capabilities.md @@ -9,7 +9,7 @@ Beyond calling tools, an MCP client can advertise capabilities that let the serv - [Notifications](https://modelcontextprotocol.io/specification/2025-11-25/basic/index#notifications) — receiving server-pushed events such as resource updates and list changes. ```{note} -All of these require the server to push messages to the client, so they only work over a **bidirectional, streaming transport** (e.g. `ZioClientHttpTransport`). They are unavailable on the plain `ClientHttpTransport`. +All of these require the server to push messages to the client, so they only work over a **bidirectional, streaming transport** (e.g. `ZioClientHttpTransport` or the direct-style `OxClientHttpTransport`). They are unavailable on the plain `ClientHttpTransport`. ``` Create the client with `McpClient.bidirectional`, providing a handler for each capability you want to enable — only capabilities backed by a handler are advertised to the server: diff --git a/docs/client/examples.md b/docs/client/examples.md index e853e94..71818ef 100644 --- a/docs/client/examples.md +++ b/docs/client/examples.md @@ -78,4 +78,37 @@ object RootsClient extends ZIOAppDefault: } ``` +## Bidirectional client over an Ox streaming transport + +The same bidirectional client in direct style, using `OxClientHttpTransport`. Capabilities such as [roots](https://modelcontextprotocol.io/specification/2025-11-25/client/roots) are advertised by passing a handler that the server invokes during the session. The transport's background SSE listener runs as a fork in the surrounding `supervised` scope, so the transport is created and used inside `supervised`: + +```scala mdoc:compile-only +import chimp.client.* +import chimp.client.notifications.ServerNotification +import chimp.client.transport.ox.OxClientHttpTransport +import chimp.protocol.* +import ox.supervised +import sttp.client4.DefaultSyncBackend +import sttp.model.Uri.UriContext +import sttp.shared.Identity + +object BidirectionalOxClient: + def main(args: Array[String]): Unit = + supervised: + val backend = DefaultSyncBackend() + val transport = OxClientHttpTransport(backend, uri"http://localhost:8080/mcp") + val client = McpClient.bidirectional[Identity]( + transport, + clientInfo = Implementation("my-client", "0.1.0"), + // the server calls back into this handler when it needs the client's roots + rootsHandler = Some(() => ListRootsResult(roots = List(Root("file:///workspace", Some("workspace"))))) + ) + // react to server-pushed notifications delivered over the SSE stream + client.onServerNotification: + case ServerNotification.ResourceUpdated(params) => println(s"resource changed: ${params.uri}") + case _ => () + client.close() + backend.close() +``` + More runnable examples live in [`examples/`](https://github.com/softwaremill/chimp/tree/master/examples/src/main/scala/examples). diff --git a/docs/client/quickstart.md b/docs/client/quickstart.md index da2cf0e..cdca72b 100644 --- a/docs/client/quickstart.md +++ b/docs/client/quickstart.md @@ -34,8 +34,14 @@ object QuickstartClient: backend.close() ``` -For streaming transports (e.g. ZIO), also add: +For streaming transports, also add the dependency for your effect system — ZIO: ```scala libraryDependencies += "com.softwaremill.chimp" %% "chimp-client-zio" % "0.3.0" ``` + +or direct-style Ox: + +```scala +libraryDependencies += "com.softwaremill.chimp" %% "chimp-client-ox" % "0.3.0" +``` diff --git a/docs/client/transport.md b/docs/client/transport.md index c1f8b0c..a04b636 100644 --- a/docs/client/transport.md +++ b/docs/client/transport.md @@ -5,7 +5,7 @@ A transport carries JSON-RPC messages between the client and the server. There a - **Unidirectional** (`ClientTransport[F]`) — the client sends a message and optionally gets a response back. Enough for calling tools, listing resources, etc. - **Bidirectional** (`ClientBidirectionalTransport[F]`) — additionally lets the server push messages to the client (server-initiated requests and notifications). Required for [client capabilities](capabilities.md). -The streaming transports are abstract; their concrete, effect-specific implementations live in separate modules (e.g. ZIO). +The streaming transports are abstract; their concrete, effect-specific implementations live in separate modules (e.g. ZIO and Ox). ```{mermaid} classDiagram @@ -41,6 +41,9 @@ The streaming transports have concrete implementations per effect system, in sep | Integration | Streaming HTTP | STDIO | |---|---|---| | ZIO | `ZioClientHttpTransport` | `ZioClientStdioTransport` | +| Ox (direct style) | `OxClientHttpTransport` | `OxClientStdioTransport` | + +The Ox implementations are direct-style (`F = Identity`). As sttp has no `StreamBackend` for Ox streams, `OxClientHttpTransport` extends `ClientBidirectionalTransport` directly: it runs on a plain `SyncBackend` and consumes Server-Sent Event responses by reading the response body as an `InputStream`, draining it on Ox forks. ## Backends diff --git a/docs/server/examples.md b/docs/server/examples.md index 10c4451..bc5ba45 100644 --- a/docs/server/examples.md +++ b/docs/server/examples.md @@ -1,6 +1,6 @@ # Examples -Each example builds an `McpServer` (or `StreamingMcpServer`) and serves it over a transport. The sync HTTP example uses `chimp-server`; the ZIO examples additionally use `chimp-server-zio`. +Each example builds an `McpServer` (or `StreamingMcpServer`) and serves it over a transport. The sync HTTP example uses `chimp-server`; the ZIO examples additionally use `chimp-server-zio`, and the direct-style Ox examples use `chimp-server-ox`. ## HTTP server @@ -88,4 +88,50 @@ object StdioZioServer extends ZIOAppDefault: override def run = ZioServerStdioTransport().serve(server) ``` +## Streaming HTTP server (Ox) + +The same streaming server in direct style, served with `OxServerHttpTransport` on `tapir-netty-server-sync`: + +```scala mdoc:compile-only +import chimp.server.{StreamingMcpServer, ToolResult, tool} +import chimp.server.ox.OxServerHttpTransport +import chimp.protocol.LoggingLevel +import io.circe.{Codec, Json} +import sttp.shared.Identity +import sttp.tapir.* +import sttp.tapir.server.netty.sync.NettySyncServer + +case class OxProgressInput(steps: Int) derives Codec, Schema + +object StreamingOxServer: + def main(args: Array[String]): Unit = + val work = tool("work").input[OxProgressInput].streamingServerLogic[Identity]: (_, ctx, _) => + ctx.reportProgress(0.5, total = Some(1.0)) + ctx.log(LoggingLevel.Info, Json.fromString("halfway")) + ToolResult.text("done") + val server = StreamingMcpServer[Identity]().withLoggingLevel(_ => ()).addStreamingTool(work) + val endpoint = OxServerHttpTransport(List("mcp")).serve(server) + NettySyncServer().port(8080).addEndpoint(endpoint).startAndWait() +``` + +## STDIO server (Ox) + +A direct-style server exchanging line-delimited JSON-RPC over stdin/stdout, served with `OxServerStdioTransport`: + +```scala mdoc:compile-only +import chimp.server.{StreamingMcpServer, ToolResult, tool} +import chimp.server.ox.OxServerStdioTransport +import io.circe.Codec +import sttp.shared.Identity +import sttp.tapir.* + +case class OxEchoInput(message: String) derives Codec, Schema + +object StdioOxServer: + def main(args: Array[String]): Unit = + val echo = tool("echo").input[OxEchoInput].handle(in => ToolResult.text(in.message)) + val server = StreamingMcpServer[Identity]().addTool(echo) + OxServerStdioTransport().serve(server) +``` + More runnable examples live in [`examples/`](https://github.com/softwaremill/chimp/tree/master/examples/src/main/scala/examples). diff --git a/docs/server/quickstart.md b/docs/server/quickstart.md index c1532da..b58b244 100644 --- a/docs/server/quickstart.md +++ b/docs/server/quickstart.md @@ -36,4 +36,16 @@ case class AdderInput(a: Int, b: Int) derives io.circe.Codec, Schema NettySyncServer().port(8080).addEndpoint(mcpServerEndpoint).startAndWait() ``` +For a streaming server that pushes progress and log notifications over SSE, add the dependency for your effect system — ZIO: + +```scala +libraryDependencies += "com.softwaremill.chimp" %% "chimp-server-zio" % "0.3.0" +``` + +or direct-style Ox: + +```scala +libraryDependencies += "com.softwaremill.chimp" %% "chimp-server-ox" % "0.3.0" +``` + More runnable examples live in [`examples/`](https://github.com/softwaremill/chimp/tree/master/examples/src/main/scala/examples). diff --git a/docs/server/transport.md b/docs/server/transport.md index 0931154..b97317f 100644 --- a/docs/server/transport.md +++ b/docs/server/transport.md @@ -5,7 +5,7 @@ A transport exposes an `McpServer` over a particular medium. `serve(server)` pro - **Unidirectional** (`ServerTransport[F, A]`) — request/response only. Enough for tools, resources, prompts, completion. - **Bidirectional** (`StreamingServerTransport[F, A]`) — additionally lets the server push messages to the client (progress and logging notifications). Required for [streaming server capabilities](capabilities.md). -The streaming transports are abstract; their concrete, effect-specific implementations live in separate modules (e.g. ZIO). +The streaming transports are abstract; their concrete, effect-specific implementations live in separate modules (e.g. ZIO and Ox). ```{mermaid} classDiagram @@ -42,6 +42,9 @@ The streaming transports have concrete implementations per effect system, in sep | Integration | Streaming HTTP | STDIO | |---|---|---| | ZIO | `ZioServerHttpTransport` | `ZioServerStdioTransport` | +| Ox (direct style) | `OxServerHttpTransport` | `OxServerStdioTransport` | + +The Ox implementations are direct-style (`F = Identity`); `OxServerHttpTransport` serves Server-Sent Events with `tapir-netty-server-sync` and its `OxStreams` capability. ## Medium diff --git a/examples/src/main/scala/examples/both/serverAndClient.scala b/examples/src/main/scala/examples/both/serverAndClient.scala new file mode 100644 index 0000000..3d40c8f --- /dev/null +++ b/examples/src/main/scala/examples/both/serverAndClient.scala @@ -0,0 +1,39 @@ +//> using dep com.softwaremill.chimp::chimp-server:0.3.0 +//> using dep com.softwaremill.chimp::chimp-client:0.3.0 +//> using dep com.softwaremill.sttp.tapir::tapir-netty-server-sync:1.13.19 +//> using dep ch.qos.logback:logback-classic:1.5.20 + +package examples.both + +import chimp.client.McpClient +import chimp.client.transport.ClientHttpTransport +import chimp.protocol.{Implementation, ToolContent} +import chimp.server.{tool, McpServer, ToolResult} +import io.circe.{Codec, Json} +import ox.supervised +import sttp.client4.DefaultSyncBackend +import sttp.model.Uri.UriContext +import sttp.shared.Identity +import sttp.tapir.Schema +import sttp.tapir.server.netty.sync.NettySyncServer + +case class AddInput(a: Int, b: Int) derives Codec, Schema + +// Runs a plain (non-streaming) MCP server and a synchronous client in one process: the client lists +// the server's tools and calls one over a simple request/response HTTP exchange. +@main def serverAndClient(): Unit = + supervised: + val adder = tool("adder").description("Adds two numbers").input[AddInput].handle(in => ToolResult.text(s"${in.a + in.b}")) + val binding = NettySyncServer().port(0).addEndpoint(McpServer(tools = List(adder)).endpoint(List("mcp"))).start() + try + val backend = DefaultSyncBackend() + try + val transport = ClientHttpTransport[Identity](backend, uri"http://localhost:${binding.port}/mcp") + val client = McpClient[Identity](transport, Implementation("both-client", "0.1.0")) + val tools = client.listTools() + println(s"tools: ${tools.tools.map(_.name).mkString(", ")}") + val result = client.callTool("adder", Json.obj("a" -> Json.fromInt(2), "b" -> Json.fromInt(3))) + result.content.collect { case ToolContent.Text(_, text) => text }.foreach(text => println(s"2 + 3 = $text")) + client.close() + finally backend.close() + finally binding.stop() diff --git a/examples/src/main/scala/examples/both/streamingServerAndClient.scala b/examples/src/main/scala/examples/both/streamingServerAndClient.scala new file mode 100644 index 0000000..e090bd6 --- /dev/null +++ b/examples/src/main/scala/examples/both/streamingServerAndClient.scala @@ -0,0 +1,49 @@ +//> using dep com.softwaremill.chimp::chimp-server-ox:0.3.0 +//> using dep com.softwaremill.chimp::chimp-client-ox:0.3.0 +//> using dep ch.qos.logback:logback-classic:1.5.20 + +package examples.both + +import chimp.client.McpClient +import chimp.client.notifications.ServerNotification +import chimp.client.transport.ox.OxClientHttpTransport +import chimp.protocol.{Implementation, LoggingLevel, ToolContent} +import chimp.server.ox.OxServerHttpTransport +import chimp.server.{tool, StreamingMcpServer, ToolResult} +import io.circe.{Codec, Json} +import ox.supervised +import sttp.client4.DefaultSyncBackend +import sttp.model.Uri.UriContext +import sttp.shared.Identity +import sttp.tapir.Schema +import sttp.tapir.server.netty.sync.NettySyncServer + +case class NoInput() derives Codec, Schema + +// Runs an Ox streaming MCP server and an Ox bidirectional client in one process: the client calls +// the `noisy` tool and receives the log notifications the tool emits over SSE while it runs. +@main def streamingServerAndClient(): Unit = + supervised: + val noisy = tool("noisy") + .description("Logs three messages, then returns") + .input[NoInput] + .streamingServerLogic[Identity]: (_, ctx, _) => + ctx.log(LoggingLevel.Info, Json.fromString("one")) + ctx.log(LoggingLevel.Info, Json.fromString("two")) + ctx.log(LoggingLevel.Info, Json.fromString("three")) + ToolResult.text("done") + val server = StreamingMcpServer[Identity]().withLoggingLevel(_ => ()).addStreamingTool(noisy) + val binding = NettySyncServer().port(0).addEndpoint(OxServerHttpTransport(List("mcp")).serve(server)).start() + try + val backend = DefaultSyncBackend() + try + val transport = OxClientHttpTransport(backend, uri"http://localhost:${binding.port}/mcp") + val client = McpClient.bidirectional[Identity](transport, Implementation("both-client", "0.1.0")) + client.onServerNotification: + case ServerNotification.LoggingMessage(params) => println(s"notification: ${params.data}") + case _ => () + val result = client.callTool("noisy", Json.obj()) + result.content.collect { case ToolContent.Text(_, text) => text }.foreach(text => println(s"result: $text")) + client.close() + finally backend.close() + finally binding.stop() diff --git a/examples/src/main/scala/examples/client/bidirectionalClientOx.scala b/examples/src/main/scala/examples/client/bidirectionalClientOx.scala new file mode 100644 index 0000000..ea1cd68 --- /dev/null +++ b/examples/src/main/scala/examples/client/bidirectionalClientOx.scala @@ -0,0 +1,26 @@ +//> using dep com.softwaremill.chimp::chimp-client-ox:0.3.0 +//> using dep ch.qos.logback:logback-classic:1.5.20 + +package examples.client + +import chimp.client.* +import chimp.client.notifications.ServerNotification +import chimp.client.transport.ox.OxClientHttpTransport +import chimp.protocol.* +import ox.supervised +import sttp.client4.DefaultSyncBackend +import sttp.model.Uri.UriContext +import sttp.shared.Identity + +@main def bidirectionalOxClientApp(): Unit = + supervised: + val backend = DefaultSyncBackend() + val transport = OxClientHttpTransport(backend, uri"http://localhost:8080/mcp") + val client = McpClient.bidirectional[Identity](transport, Implementation("ox-client", "0.1.0")) + client.onServerNotification: + case ServerNotification.ResourceUpdated(params) => println(s"resource changed: ${params.uri}") + case _ => () + val tools = client.listTools() + println(s"server exposes ${tools.tools.size} tools") + client.close() + backend.close() diff --git a/examples/src/main/scala/examples/client/stdioClientOx.scala b/examples/src/main/scala/examples/client/stdioClientOx.scala new file mode 100644 index 0000000..3f40caa --- /dev/null +++ b/examples/src/main/scala/examples/client/stdioClientOx.scala @@ -0,0 +1,18 @@ +//> using dep com.softwaremill.chimp::chimp-client-ox:0.3.0 +//> using dep ch.qos.logback:logback-classic:1.5.20 + +package examples.client + +import chimp.client.* +import chimp.client.transport.ox.OxClientStdioTransport +import chimp.protocol.* +import ox.supervised +import sttp.shared.Identity + +@main def stdioOxClientApp(): Unit = + supervised: + val transport = OxClientStdioTransport(List("npx", "-y", "@modelcontextprotocol/server-everything")) + val client = McpClient.bidirectional[Identity](transport, Implementation("ox-stdio-client", "0.1.0")) + val tools = client.listTools() + println(s"server exposes ${tools.tools.size} tools") + client.close() diff --git a/examples/src/main/scala/examples/server/stdioMcpOx.scala b/examples/src/main/scala/examples/server/stdioMcpOx.scala new file mode 100644 index 0000000..760b5df --- /dev/null +++ b/examples/src/main/scala/examples/server/stdioMcpOx.scala @@ -0,0 +1,17 @@ +//> using dep com.softwaremill.chimp::chimp-server-ox:0.3.0 +//> using dep ch.qos.logback:logback-classic:1.5.20 + +package examples.server + +import chimp.server.ox.OxServerStdioTransport +import chimp.server.{tool, StreamingMcpServer, ToolResult} +import io.circe.Codec +import sttp.shared.Identity +import sttp.tapir.Schema + +case class StdioEchoInput(message: String) derives Codec, Schema + +@main def mcpStdioOxApp(): Unit = + val echo = tool("echo").description("Echoes the message").input[StdioEchoInput].handle(in => ToolResult.text(in.message)) + val server = StreamingMcpServer[Identity]().addTool(echo) + OxServerStdioTransport().serve(server) diff --git a/examples/src/main/scala/examples/server/streamingMcpOx.scala b/examples/src/main/scala/examples/server/streamingMcpOx.scala new file mode 100644 index 0000000..df479e3 --- /dev/null +++ b/examples/src/main/scala/examples/server/streamingMcpOx.scala @@ -0,0 +1,29 @@ +//> using dep com.softwaremill.chimp::chimp-server-ox:0.3.0 +//> using dep ch.qos.logback:logback-classic:1.5.20 + +package examples.server + +import chimp.protocol.LoggingLevel +import chimp.server.* +import chimp.server.ox.OxServerHttpTransport +import io.circe.{Codec, Json} +import sttp.shared.Identity +import sttp.tapir.* +import sttp.tapir.server.netty.sync.NettySyncServer + +case class WorkInput(steps: Int) derives Codec, Schema + +@main def mcpStreamingOxApp(): Unit = + val work = tool("work") + .description("Reports progress and logs while running") + .input[WorkInput] + .streamingServerLogic[Identity]: (in, ctx, _) => + for step <- 1 to in.steps do + ctx.reportProgress(step.toDouble / in.steps, total = Some(1.0)) + ctx.log(LoggingLevel.Info, Json.fromString(s"step $step of ${in.steps}")) + ToolResult.text("done") + + val server = StreamingMcpServer[Identity]().withLoggingLevel(_ => ()).addStreamingTool(work) + val endpoint = OxServerHttpTransport(List("mcp")).serve(server) + + NettySyncServer().port(8080).addEndpoint(endpoint).startAndWait() diff --git a/examples/src/main/scala/examples/server/weatherMcp.scala b/examples/src/main/scala/examples/server/weatherMcp.scala index dbfab4f..43a3bbe 100644 --- a/examples/src/main/scala/examples/server/weatherMcp.scala +++ b/examples/src/main/scala/examples/server/weatherMcp.scala @@ -8,8 +8,8 @@ package examples.server import chimp.server.* import io.circe.Codec import io.circe.parser.decode -import ox.either -import ox.either.ok +import _root_.ox.either +import _root_.ox.either.ok import sttp.client4.* import sttp.tapir.* import sttp.tapir.server.netty.sync.NettySyncServer diff --git a/generated-docs/out/README.md b/generated-docs/out/README.md index c5d6c44..071d95e 100644 --- a/generated-docs/out/README.md +++ b/generated-docs/out/README.md @@ -34,5 +34,5 @@ Commit both `docs/` (the source) and `generated-docs/` (the mdoc output) — if ## Notes -- `0.3.0+22-cb3e0e1c+20260625-0956-SNAPSHOT` and other mdoc variables are **not** substituted in the local watch mode. For a fully-rendered preview, run `sbt docs/mdoc` from the repo root and serve `generated-docs/out/` instead. +- `0.3.0+24-78bb6a39+20260626-1208-SNAPSHOT` and other mdoc variables are **not** substituted in the local watch mode. For a fully-rendered preview, run `sbt docs/mdoc` from the repo root and serve `generated-docs/out/` instead. - Scala code snippets are verified by `sbt compileDocs` (also runs in CI). diff --git a/generated-docs/out/client/capabilities.md b/generated-docs/out/client/capabilities.md index 2966673..74deb8d 100644 --- a/generated-docs/out/client/capabilities.md +++ b/generated-docs/out/client/capabilities.md @@ -9,7 +9,7 @@ Beyond calling tools, an MCP client can advertise capabilities that let the serv - [Notifications](https://modelcontextprotocol.io/specification/2025-11-25/basic/index#notifications) — receiving server-pushed events such as resource updates and list changes. ```{note} -All of these require the server to push messages to the client, so they only work over a **bidirectional, streaming transport** (e.g. `ZioClientHttpTransport`). They are unavailable on the plain `ClientHttpTransport`. +All of these require the server to push messages to the client, so they only work over a **bidirectional, streaming transport** (e.g. `ZioClientHttpTransport` or the direct-style `OxClientHttpTransport`). They are unavailable on the plain `ClientHttpTransport`. ``` Create the client with `McpClient.bidirectional`, providing a handler for each capability you want to enable — only capabilities backed by a handler are advertised to the server: diff --git a/generated-docs/out/client/examples.md b/generated-docs/out/client/examples.md index b44541b..6a8a21d 100644 --- a/generated-docs/out/client/examples.md +++ b/generated-docs/out/client/examples.md @@ -78,4 +78,37 @@ object RootsClient extends ZIOAppDefault: } ``` +## Bidirectional client over an Ox streaming transport + +The same bidirectional client in direct style, using `OxClientHttpTransport`. Capabilities such as [roots](https://modelcontextprotocol.io/specification/2025-11-25/client/roots) are advertised by passing a handler that the server invokes during the session. The transport's background SSE listener runs as a fork in the surrounding `supervised` scope, so the transport is created and used inside `supervised`: + +```scala +import chimp.client.* +import chimp.client.notifications.ServerNotification +import chimp.client.transport.ox.OxClientHttpTransport +import chimp.protocol.* +import ox.supervised +import sttp.client4.DefaultSyncBackend +import sttp.model.Uri.UriContext +import sttp.shared.Identity + +object BidirectionalOxClient: + def main(args: Array[String]): Unit = + supervised: + val backend = DefaultSyncBackend() + val transport = OxClientHttpTransport(backend, uri"http://localhost:8080/mcp") + val client = McpClient.bidirectional[Identity]( + transport, + clientInfo = Implementation("my-client", "0.1.0"), + // the server calls back into this handler when it needs the client's roots + rootsHandler = Some(() => ListRootsResult(roots = List(Root("file:///workspace", Some("workspace"))))) + ) + // react to server-pushed notifications delivered over the SSE stream + client.onServerNotification: + case ServerNotification.ResourceUpdated(params) => println(s"resource changed: ${params.uri}") + case _ => () + client.close() + backend.close() +``` + More runnable examples live in [`examples/`](https://github.com/softwaremill/chimp/tree/master/examples/src/main/scala/examples). diff --git a/generated-docs/out/client/quickstart.md b/generated-docs/out/client/quickstart.md index 0a82d8a..f28100b 100644 --- a/generated-docs/out/client/quickstart.md +++ b/generated-docs/out/client/quickstart.md @@ -34,8 +34,14 @@ object QuickstartClient: backend.close() ``` -For streaming transports (e.g. ZIO), also add: +For streaming transports, also add the dependency for your effect system — ZIO: ```scala libraryDependencies += "com.softwaremill.chimp" %% "chimp-client-zio" % "0.3.0" ``` + +or direct-style Ox: + +```scala +libraryDependencies += "com.softwaremill.chimp" %% "chimp-client-ox" % "0.3.0" +``` diff --git a/generated-docs/out/client/transport.md b/generated-docs/out/client/transport.md index c1f8b0c..a04b636 100644 --- a/generated-docs/out/client/transport.md +++ b/generated-docs/out/client/transport.md @@ -5,7 +5,7 @@ A transport carries JSON-RPC messages between the client and the server. There a - **Unidirectional** (`ClientTransport[F]`) — the client sends a message and optionally gets a response back. Enough for calling tools, listing resources, etc. - **Bidirectional** (`ClientBidirectionalTransport[F]`) — additionally lets the server push messages to the client (server-initiated requests and notifications). Required for [client capabilities](capabilities.md). -The streaming transports are abstract; their concrete, effect-specific implementations live in separate modules (e.g. ZIO). +The streaming transports are abstract; their concrete, effect-specific implementations live in separate modules (e.g. ZIO and Ox). ```{mermaid} classDiagram @@ -41,6 +41,9 @@ The streaming transports have concrete implementations per effect system, in sep | Integration | Streaming HTTP | STDIO | |---|---|---| | ZIO | `ZioClientHttpTransport` | `ZioClientStdioTransport` | +| Ox (direct style) | `OxClientHttpTransport` | `OxClientStdioTransport` | + +The Ox implementations are direct-style (`F = Identity`). As sttp has no `StreamBackend` for Ox streams, `OxClientHttpTransport` extends `ClientBidirectionalTransport` directly: it runs on a plain `SyncBackend` and consumes Server-Sent Event responses by reading the response body as an `InputStream`, draining it on Ox forks. ## Backends diff --git a/generated-docs/out/conf.py b/generated-docs/out/conf.py index bcdedda..f7ec520 100644 --- a/generated-docs/out/conf.py +++ b/generated-docs/out/conf.py @@ -51,9 +51,9 @@ # built documents. # # The short X.Y version. -version = u'0.1' +version = u'0.3' # The full version, including alpha/beta/rc tags. -release = u'0.1' +release = u'0.3.0' # The language for content autogenerated by Sphinx. language = 'en' diff --git a/generated-docs/out/server/examples.md b/generated-docs/out/server/examples.md index d662bef..28cbba2 100644 --- a/generated-docs/out/server/examples.md +++ b/generated-docs/out/server/examples.md @@ -1,6 +1,6 @@ # Examples -Each example builds an `McpServer` (or `StreamingMcpServer`) and serves it over a transport. The sync HTTP example uses `chimp-server`; the ZIO examples additionally use `chimp-server-zio`. +Each example builds an `McpServer` (or `StreamingMcpServer`) and serves it over a transport. The sync HTTP example uses `chimp-server`; the ZIO examples additionally use `chimp-server-zio`, and the direct-style Ox examples use `chimp-server-ox`. ## HTTP server @@ -88,4 +88,50 @@ object StdioZioServer extends ZIOAppDefault: override def run = ZioServerStdioTransport().serve(server) ``` +## Streaming HTTP server (Ox) + +The same streaming server in direct style, served with `OxServerHttpTransport` on `tapir-netty-server-sync`: + +```scala +import chimp.server.{StreamingMcpServer, ToolResult, tool} +import chimp.server.ox.OxServerHttpTransport +import chimp.protocol.LoggingLevel +import io.circe.{Codec, Json} +import sttp.shared.Identity +import sttp.tapir.* +import sttp.tapir.server.netty.sync.NettySyncServer + +case class OxProgressInput(steps: Int) derives Codec, Schema + +object StreamingOxServer: + def main(args: Array[String]): Unit = + val work = tool("work").input[OxProgressInput].streamingServerLogic[Identity]: (_, ctx, _) => + ctx.reportProgress(0.5, total = Some(1.0)) + ctx.log(LoggingLevel.Info, Json.fromString("halfway")) + ToolResult.text("done") + val server = StreamingMcpServer[Identity]().withLoggingLevel(_ => ()).addStreamingTool(work) + val endpoint = OxServerHttpTransport(List("mcp")).serve(server) + NettySyncServer().port(8080).addEndpoint(endpoint).startAndWait() +``` + +## STDIO server (Ox) + +A direct-style server exchanging line-delimited JSON-RPC over stdin/stdout, served with `OxServerStdioTransport`: + +```scala +import chimp.server.{StreamingMcpServer, ToolResult, tool} +import chimp.server.ox.OxServerStdioTransport +import io.circe.Codec +import sttp.shared.Identity +import sttp.tapir.* + +case class OxEchoInput(message: String) derives Codec, Schema + +object StdioOxServer: + def main(args: Array[String]): Unit = + val echo = tool("echo").input[OxEchoInput].handle(in => ToolResult.text(in.message)) + val server = StreamingMcpServer[Identity]().addTool(echo) + OxServerStdioTransport().serve(server) +``` + More runnable examples live in [`examples/`](https://github.com/softwaremill/chimp/tree/master/examples/src/main/scala/examples). diff --git a/generated-docs/out/server/quickstart.md b/generated-docs/out/server/quickstart.md index c1532da..b58b244 100644 --- a/generated-docs/out/server/quickstart.md +++ b/generated-docs/out/server/quickstart.md @@ -36,4 +36,16 @@ case class AdderInput(a: Int, b: Int) derives io.circe.Codec, Schema NettySyncServer().port(8080).addEndpoint(mcpServerEndpoint).startAndWait() ``` +For a streaming server that pushes progress and log notifications over SSE, add the dependency for your effect system — ZIO: + +```scala +libraryDependencies += "com.softwaremill.chimp" %% "chimp-server-zio" % "0.3.0" +``` + +or direct-style Ox: + +```scala +libraryDependencies += "com.softwaremill.chimp" %% "chimp-server-ox" % "0.3.0" +``` + More runnable examples live in [`examples/`](https://github.com/softwaremill/chimp/tree/master/examples/src/main/scala/examples). diff --git a/generated-docs/out/server/transport.md b/generated-docs/out/server/transport.md index 0931154..b97317f 100644 --- a/generated-docs/out/server/transport.md +++ b/generated-docs/out/server/transport.md @@ -5,7 +5,7 @@ A transport exposes an `McpServer` over a particular medium. `serve(server)` pro - **Unidirectional** (`ServerTransport[F, A]`) — request/response only. Enough for tools, resources, prompts, completion. - **Bidirectional** (`StreamingServerTransport[F, A]`) — additionally lets the server push messages to the client (progress and logging notifications). Required for [streaming server capabilities](capabilities.md). -The streaming transports are abstract; their concrete, effect-specific implementations live in separate modules (e.g. ZIO). +The streaming transports are abstract; their concrete, effect-specific implementations live in separate modules (e.g. ZIO and Ox). ```{mermaid} classDiagram @@ -42,6 +42,9 @@ The streaming transports have concrete implementations per effect system, in sep | Integration | Streaming HTTP | STDIO | |---|---|---| | ZIO | `ZioServerHttpTransport` | `ZioServerStdioTransport` | +| Ox (direct style) | `OxServerHttpTransport` | `OxServerStdioTransport` | + +The Ox implementations are direct-style (`F = Identity`); `OxServerHttpTransport` serves Server-Sent Events with `tapir-netty-server-sync` and its `OxStreams` capability. ## Medium diff --git a/server-streaming/server-ox/src/main/scala/chimp/server/ox/OxServerHttpTransport.scala b/server-streaming/server-ox/src/main/scala/chimp/server/ox/OxServerHttpTransport.scala new file mode 100644 index 0000000..317b082 --- /dev/null +++ b/server-streaming/server-ox/src/main/scala/chimp/server/ox/OxServerHttpTransport.scala @@ -0,0 +1,35 @@ +package chimp.server.ox + +import chimp.protocol.JSONRPCMessage +import chimp.server.OutboundSink +import chimp.server.transport.ServerStreamingHttpTransport +import io.circe.Json +import io.circe.syntax.* +import ox.* +import ox.Chunk +import ox.channels.Channel +import ox.flow.Flow +import sttp.model.sse.ServerSentEvent +import sttp.shared.Identity +import sttp.tapir.StreamBodyIO +import sttp.tapir.server.netty.sync.{serverSentEventsBody, OxStreams} + +final class OxServerHttpTransport(path: List[String]) extends ServerStreamingHttpTransport[Identity, OxStreams](path): + val streams: OxStreams = OxStreams + + type EventStream = Flow[ServerSentEvent] + + val sseBody: StreamBodyIO[Flow[Chunk[Byte]], EventStream, OxStreams] = serverSentEventsBody + + val emptyStream: EventStream = Flow.empty + + def eventStream(handle: OutboundSink[Identity] => Option[Json]): Flow[ServerSentEvent] = + Flow.usingEmit: emit => + supervised: + val outbound = Channel.buffered[Json](64) + val sink = new OutboundSink[Identity]: + def send(message: JSONRPCMessage): Unit = outbound.send(message.asJson.deepDropNullValues) + forkDiscard: + try handle(sink).foreach(outbound.send) + finally outbound.done() + outbound.foreach(json => emit(ServerSentEvent(data = Some(json.noSpaces)))) diff --git a/server-streaming/server-ox/src/main/scala/chimp/server/ox/OxServerStdioTransport.scala b/server-streaming/server-ox/src/main/scala/chimp/server/ox/OxServerStdioTransport.scala new file mode 100644 index 0000000..eae8f61 --- /dev/null +++ b/server-streaming/server-ox/src/main/scala/chimp/server/ox/OxServerStdioTransport.scala @@ -0,0 +1,50 @@ +package chimp.server.ox + +import chimp.protocol.{JSONRPCMessage, ProgressToken} +import chimp.server.transport.ServerStreamingStdioTransport +import chimp.server.{McpHandler, OutboundSink, SinkStreamingServerContext, StreamingMcpServer, StreamingServerContext} +import io.circe.syntax.* +import io.circe.{parser, Json} +import org.slf4j.LoggerFactory +import ox.* +import ox.channels.Channel +import sttp.monad.{IdentityMonad, MonadError} +import sttp.shared.Identity + +import java.io.{BufferedReader, BufferedWriter, InputStream, InputStreamReader, OutputStream, OutputStreamWriter} +import java.nio.charset.StandardCharsets + +final class OxServerStdioTransport(in: InputStream = System.in, out: OutputStream = System.out) + extends ServerStreamingStdioTransport[Identity]: + private val log = LoggerFactory.getLogger(classOf[OxServerStdioTransport]) + + def serve(server: StreamingMcpServer[Identity]): Unit = + given MonadError[Identity] = IdentityMonad + val handler = new McpHandler[Identity, StreamingServerContext[Identity]](server) + val reader = BufferedReader(InputStreamReader(in, StandardCharsets.UTF_8)) + val writer = BufferedWriter(OutputStreamWriter(out, StandardCharsets.UTF_8)) + + def writeLine(json: Json): Unit = + writer.write(json.noSpaces) + writer.newLine() + writer.flush() + + supervised: + val outbound = Channel.buffered[Json](64) + val writerFork = fork(outbound.foreach(writeLine)) + + val sink = new OutboundSink[Identity]: + def send(message: JSONRPCMessage): Unit = outbound.send(message.asJson.deepDropNullValues) + val makeContext: Option[ProgressToken] => StreamingServerContext[Identity] = + token => SinkStreamingServerContext(sink, token) + + var line = reader.readLine() + while line != null do + if line.nonEmpty then + parser.parse(line) match + case Right(json) => handler.handleJsonRpc(json, Nil, makeContext).body.foreach(outbound.send) + case Left(error) => log.warn(s"Failed to parse JSON-RPC line: ${error.getMessage}; raw: $line") + line = reader.readLine() + + outbound.done() + writerFork.join() diff --git a/server-streaming/server-ox/src/test/scala/chimp/server/ox/OxMcpServerHttpSpec.scala b/server-streaming/server-ox/src/test/scala/chimp/server/ox/OxMcpServerHttpSpec.scala new file mode 100644 index 0000000..de51b3b --- /dev/null +++ b/server-streaming/server-ox/src/test/scala/chimp/server/ox/OxMcpServerHttpSpec.scala @@ -0,0 +1,43 @@ +package chimp.server.ox + +import chimp.client.transport.ClientTransport +import chimp.client.transport.ox.OxClientHttpTransport +import chimp.client.{BidirectionalMcpClient, McpClient} +import chimp.protocol.{Implementation, ProtocolVersion} +import chimp.server.{McpServer, McpServerStreamingTests, McpServerTests, StreamingMcpServer, SyncToFuture} +import org.scalatest.Assertion +import ox.supervised +import sttp.client4.DefaultSyncBackend +import sttp.model.Uri.UriContext +import sttp.shared.Identity +import sttp.tapir.server.netty.sync.NettySyncServer + +import scala.concurrent.Future + +class OxMcpServerHttpSpec extends McpServerTests[Identity] with McpServerStreamingTests[Identity] with SyncToFuture: + private val clientInfo = Implementation("chimp-server-test", "0.0.1") + + override protected def withServer(server: McpServer[Identity])(test: McpClient[Identity] => Identity[Assertion]): Future[Assertion] = + withStreamingServer(server.streaming)(test) + + override protected def withStreamingServer( + server: StreamingMcpServer[Identity] + )(test: BidirectionalMcpClient[Identity] => Identity[Assertion]): Future[Assertion] = + toFuture: + supervised: + val endpoint = OxServerHttpTransport(List("mcp")).serve(server) + val binding = NettySyncServer().port(0).addEndpoint(endpoint).start() + try + val backend = DefaultSyncBackend() + try + val transport = + OxClientHttpTransport( + backend, + uri"http://localhost:${binding.port}/mcp", + ProtocolVersion.Latest, + ClientTransport.defaultTimeout + ) + try test(McpClient.bidirectional(transport, clientInfo)) + finally transport.close() + finally backend.close() + finally binding.stop() diff --git a/server-streaming/server-ox/src/test/scala/chimp/server/ox/OxMcpServerStdioSpec.scala b/server-streaming/server-ox/src/test/scala/chimp/server/ox/OxMcpServerStdioSpec.scala new file mode 100644 index 0000000..40471af --- /dev/null +++ b/server-streaming/server-ox/src/test/scala/chimp/server/ox/OxMcpServerStdioSpec.scala @@ -0,0 +1,12 @@ +package chimp.server.ox + +import chimp.server.{ServerStdioTransportTests, StreamingMcpServer, SyncToFuture} +import sttp.shared.Identity + +import java.io.{InputStream, OutputStream} + +class OxMcpServerStdioSpec extends ServerStdioTransportTests[Identity] with SyncToFuture: + override protected def runStdioServer(server: StreamingMcpServer[Identity], in: InputStream, out: OutputStream): Unit = + val thread = Thread(() => OxServerStdioTransport(in, out).serve(server)) + thread.setDaemon(true) + thread.start()