Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
89cea04
feat: concurrent message dispatch across all transports
ghostflyby May 29, 2026
0274fab
chore: revert a indent
ghostflyby May 30, 2026
2147ce3
fix: BaseTransportTest assertion order problem for readability
ghostflyby May 30, 2026
c3a337e
fix: error handling in StreamableHttpServerTransport
ghostflyby May 30, 2026
2267a15
review: address Copilot PR comments
ghostflyby May 30, 2026
320f9ab
review: remove StdioServerTransport scope.cancel, fix ChannelTranspor…
ghostflyby May 30, 2026
bed090e
review: remove unnecessary withTimeout from ChannelTransportTest
ghostflyby May 30, 2026
39dee98
Merge branch 'main' into concurrent
ghostflyby May 30, 2026
33ba1e3
review: fix handlerJobs thread-safety and memory leak, remove unused …
ghostflyby May 30, 2026
074cc32
review: fix exceptions in message handler test for concurrent dispatch
ghostflyby May 30, 2026
52477eb
review: address remaining Copilot comments - KDoc, NonCancellable, sp…
ghostflyby May 30, 2026
4f74f9e
fix: use Channel instead of mutableListOf for errors in ChannelTransp…
ghostflyby May 30, 2026
0d8999b
review: report call.reject failure via _onError instead of silent swa…
ghostflyby May 30, 2026
57e0cd2
feat: concurrent message dispatch on top of upstream/main lifecycle h…
ghostflyby Jun 1, 2026
39c1fcf
Merge branch 'concurrent-rebased' into concurrent
ghostflyby Jun 1, 2026
6c33d94
fix: use CompletableDeferred for error capture in StdioServerTranspor…
ghostflyby Jun 1, 2026
e43dc77
style: apply ktlint formatting
ghostflyby Jun 1, 2026
8b3a51e
fix: address Copilot review - Channel capacity and closeResources chi…
ghostflyby Jun 1, 2026
f11c623
refactor: separate notification fire-and-forget from request await in…
ghostflyby Jun 1, 2026
78b7f21
fix: move handlerScope.cancel after stream cleanup; strengthen test a…
ghostflyby Jun 1, 2026
1783558
fix: revert ChannelTransport.closeResources to scope.cancel()+join() …
ghostflyby Jun 1, 2026
e7c671b
fix: split ChannelTransport into eventLoop scope and handlerScope to …
ghostflyby Jun 1, 2026
9cd31ad
Fix ChannelTransport startup message race
ghostflyby Jun 2, 2026
8e22f9d
fix: hardened channel close order on mulitplatforms
ghostflyby Jun 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package io.modelcontextprotocol.kotlin.sdk.shared

import io.kotest.assertions.nondeterministic.eventually
import io.kotest.matchers.collections.shouldContainExactlyInAnyOrder
import io.kotest.matchers.shouldBe
import io.modelcontextprotocol.kotlin.sdk.types.InitializedNotification
import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCMessage
import io.modelcontextprotocol.kotlin.sdk.types.PingRequest
import io.modelcontextprotocol.kotlin.sdk.types.toJSON
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.channels.Channel
import kotlin.test.fail
import kotlin.time.Duration.Companion.seconds

Expand Down Expand Up @@ -46,14 +47,10 @@ abstract class BaseTransportTest {
InitializedNotification().toJSON(),
)

val readMessages = mutableListOf<JSONRPCMessage>()
val finished = CompletableDeferred<Unit>()
val chan = Channel<JSONRPCMessage>(messages.size)

transport.onMessage { message ->
readMessages.add(message)
if (message == messages.last()) {
finished.complete(Unit)
}
chan.send(message)
}
Comment thread
ghostflyby marked this conversation as resolved.

transport.start()
Expand All @@ -62,9 +59,13 @@ abstract class BaseTransportTest {
transport.send(message)
}

finished.await()
val readMessages = buildList {
repeat(messages.size) {
add(chan.receive())
}
}

messages shouldBe readMessages
readMessages shouldContainExactlyInAnyOrder messages

transport.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withContext
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import java.util.concurrent.CopyOnWriteArrayList
Expand Down Expand Up @@ -215,7 +216,7 @@ abstract class AbstractResourceIntegrationTest : KotlinTestBase() {
val invalidUri = "test://nonexistent.txt"

val exception = assertThrows<McpException> {
runBlocking {
withContext(Dispatchers.Default) {
client.readResource(ReadResourceRequest(ReadResourceRequestParams(uri = invalidUri)))
}
Comment on lines +219 to 221
}
Comment thread
ghostflyby marked this conversation as resolved.
Comment thread
ghostflyby marked this conversation as resolved.
Comment thread
ghostflyby marked this conversation as resolved.
Comment thread
ghostflyby marked this conversation as resolved.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public class SseClientTransport(
reconnectionTime = reconnectionTime,
block = requestBuilder,
)
scope = CoroutineScope(session.coroutineContext + SupervisorJob())
scope = CoroutineScope(session.coroutineContext + SupervisorJob(session.coroutineContext[Job]))

job = scope.launch(CoroutineName("SseMcpClientTransport.connect#${hashCode()}")) {
collectMessages()
Expand Down Expand Up @@ -163,17 +163,31 @@ public class SseClientTransport(
}
}

private suspend fun handleMessage(data: String) {
private fun handleMessage(data: String) {
try {
val message = McpJson.decodeFromString<JSONRPCMessage>(data)
_onMessage(message)
launchMessageHandler(message)
} catch (e: SerializationException) {
_onError(e)
}
}

private fun launchMessageHandler(message: JSONRPCMessage) {
scope.launch(CoroutineName("SseMcpClientTransport.message#${hashCode()}")) {
try {
_onMessage(message)
} catch (e: CancellationException) {
throw e
} catch (e: Throwable) {
logger.error(e) { "Error processing message" }
_onError(e)
}
}
}

override suspend fun closeResources() {
withContext(NonCancellable) {
invokeOnCloseCallback()
job?.cancel()
try {
if (::session.isInitialized) session.cancel()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public class StdioClientTransport @JvmOverloads public constructor(
.collect { event ->
when (event) {
is Event.JsonRpc -> {
handleJSONRPCMessage(event.message)
launchMessageHandler(event.message)
}

is Event.StderrEvent -> {
Expand Down Expand Up @@ -264,14 +264,16 @@ public class StdioClientTransport @JvmOverloads public constructor(
}
}

private suspend fun handleJSONRPCMessage(msg: JSONRPCMessage) {
try {
_onMessage.invoke(msg)
} catch (e: CancellationException) {
throw e
} catch (e: Throwable) {
logger.error(e) { "Error processing message." }
runCatching { _onError.invoke(e) }
private fun launchMessageHandler(message: JSONRPCMessage) {
scope.launch(CoroutineName("StdioClientTransport.message#${hashCode()}")) {
try {
_onMessage.invoke(message)
} catch (e: CancellationException) {
throw e
} catch (e: Throwable) {
logger.error(e) { "Error processing message." }
runCatching { _onError.invoke(e) }
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import kotlin.math.pow
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
Expand Down Expand Up @@ -168,7 +169,7 @@ public class StreamableHttpClientTransport(
when (response.contentType()?.withoutParameters()) {
ContentType.Application.Json -> response.bodyAsText().takeIf { it.isNotEmpty() }?.let { json ->
runCatching { McpJson.decodeFromString<JSONRPCMessage>(json) }
.onSuccess { _onMessage(it) }
.onSuccess { launchMessageHandler(it) }
.onFailure {
_onError(it)
throw it
Expand Down Expand Up @@ -218,8 +219,11 @@ public class StreamableHttpClientTransport(

override suspend fun closeResources() {
logger.debug { "Client transport closing." }
sseJob?.cancelAndJoin()
scope.cancel()
withContext(NonCancellable) {
invokeOnCloseCallback()
sseJob?.cancel()
scope.coroutineContext[Job]?.cancelAndJoin()
}
}
Comment thread
ghostflyby marked this conversation as resolved.

/**
Expand Down Expand Up @@ -392,9 +396,9 @@ public class StreamableHttpClientTransport(
.onSuccess { msg ->
if (msg is JSONRPCResponse) receivedResponse = true
if (replayMessageId != null && msg is JSONRPCResponse) {
_onMessage(msg.copy(id = replayMessageId))
launchMessageHandler(msg.copy(id = replayMessageId))
} else {
_onMessage(msg)
launchMessageHandler(msg)
}
}
.onFailure(_onError)
Expand Down Expand Up @@ -427,7 +431,7 @@ public class StreamableHttpClientTransport(
var id: String? = null
var eventName: String? = null

suspend fun dispatch(id: String?, eventName: String?, data: String) {
fun dispatch(id: String?, eventName: String?, data: String) {
id?.let {
localLastEventId = it
hasPrimingEvent = true
Expand All @@ -441,9 +445,9 @@ public class StreamableHttpClientTransport(
.onSuccess { msg ->
if (msg is JSONRPCResponse) receivedResponse = true
if (replayMessageId != null && msg is JSONRPCResponse) {
_onMessage(msg.copy(id = replayMessageId))
launchMessageHandler(msg.copy(id = replayMessageId))
} else {
_onMessage(msg)
launchMessageHandler(msg)
}
}
.onFailure {
Expand Down Expand Up @@ -481,4 +485,16 @@ public class StreamableHttpClientTransport(
}
return SseStreamResult(hasPrimingEvent, receivedResponse, localLastEventId, localServerRetryDelay)
}
private fun launchMessageHandler(message: JSONRPCMessage) {
Comment on lines 487 to +488
scope.launch(CoroutineName("StreamableHttpTransport.message#${hashCode()}")) {
try {
_onMessage(message)
} catch (e: CancellationException) {
throw e
} catch (e: Throwable) {
logger.error(e) { "Error processing message" }
_onError(e)
}
}
}
Comment on lines +488 to +499
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.modelcontextprotocol.kotlin.sdk.client.streamable.http

import io.kotest.assertions.fail
import io.kotest.assertions.nondeterministic.eventually
import io.kotest.matchers.collections.shouldContainExactlyInAnyOrder
import io.kotest.matchers.collections.shouldHaveSize
import io.kotest.matchers.shouldBe
import io.kotest.matchers.types.shouldBeInstanceOf
Expand Down Expand Up @@ -611,11 +612,10 @@ class StreamableHttpClientTransportTest {

receivedMessages shouldHaveSize 2

val firstNotification = receivedMessages[0] as JSONRPCNotification
firstNotification.method shouldBe "notifications/progress"

val secondNotification = receivedMessages[1] as JSONRPCNotification
secondNotification.method shouldBe "notifications/tools/list_changed"
receivedMessages
.filterIsInstance<JSONRPCNotification>()
.map { it.method }
.shouldContainExactlyInAnyOrder("notifications/progress", "notifications/tools/list_changed")

transport.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@ import io.modelcontextprotocol.kotlin.sdk.types.McpJson
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.job
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import kotlin.concurrent.atomics.AtomicBoolean
import kotlin.concurrent.atomics.ExperimentalAtomicApi
import kotlin.coroutines.cancellation.CancellationException
Expand All @@ -30,7 +34,7 @@ private val logger = KotlinLogging.logger {}
@OptIn(ExperimentalAtomicApi::class)
public abstract class WebSocketMcpTransport : AbstractTransport() {
private val scope by lazy {
CoroutineScope(session.coroutineContext + SupervisorJob())
CoroutineScope(session.coroutineContext + SupervisorJob(session.coroutineContext.job))
}

private val initialized: AtomicBoolean = AtomicBoolean(false)
Expand Down Expand Up @@ -73,8 +77,8 @@ public abstract class WebSocketMcpTransport : AbstractTransport() {
}

try {
val message = McpJson.decodeFromString<JSONRPCMessage>(message.readText())
_onMessage.invoke(message)
val parsedMessage = McpJson.decodeFromString<JSONRPCMessage>(message.readText())
launchMessageHandler(parsedMessage)
} catch (e: CancellationException) {
throw e
} catch (e: Exception) {
Expand All @@ -86,6 +90,11 @@ public abstract class WebSocketMcpTransport : AbstractTransport() {

@OptIn(InternalCoroutinesApi::class)
session.coroutineContext.job.invokeOnCompletion {
Comment thread
ghostflyby marked this conversation as resolved.
// Cancel the scope when the session completes. For normal session
// completion the SupervisorJob parent does not auto-cancel children;
// for error/cancellation the propagation already cancels the scope
// job, making this cancel a no-op.
scope.cancel()
if (it != null) {
_onError.invoke(it)
} else {
Expand All @@ -94,6 +103,19 @@ public abstract class WebSocketMcpTransport : AbstractTransport() {
}
}

private fun launchMessageHandler(message: JSONRPCMessage) {
scope.launch(CoroutineName("WebSocketMcpTransport.message#${hashCode()}")) {
try {
_onMessage.invoke(message)
} catch (e: CancellationException) {
throw e
} catch (e: Throwable) {
logger.error(e) { "Error processing message" }
_onError.invoke(e)
}
}
}

override suspend fun send(message: JSONRPCMessage, options: TransportSendOptions?) {
logger.debug { "Sending message" }
if (!initialized.load()) {
Expand All @@ -109,7 +131,10 @@ public abstract class WebSocketMcpTransport : AbstractTransport() {
}

logger.debug { "Closing websocket session" }
session.close()
session.coroutineContext.job.join()
withContext(NonCancellable) {
invokeOnCloseCallback()
session.close()
scope.coroutineContext.job.cancelAndJoin()
}
Comment thread
ghostflyby marked this conversation as resolved.
}
}
Loading