From e1f940c6faa436a4465a7df35b67b89f8a0a45df Mon Sep 17 00:00:00 2001 From: Abhishek Choudhary Date: Fri, 12 Jun 2026 15:32:10 +0800 Subject: [PATCH] fix(mcp-bridge): only accept messages for an established session The message endpoint trusted the supplied sessionId and pushed into a queue for any value, creating a queue for sessions that were never established. Track established SSE sessions with a refreshed liveness marker in the shared dict and return 404 from the message endpoint when the sessionId is missing or does not match a live session. --- apisix/plugins/mcp/broker/shared_dict.lua | 26 +++++++++++ apisix/plugins/mcp/server.lua | 20 ++++++++- apisix/plugins/mcp/server_wrapper.lua | 14 ++++-- t/plugin/mcp-bridge.t | 54 +++++++++++++++++++++++ 4 files changed, 110 insertions(+), 4 deletions(-) diff --git a/apisix/plugins/mcp/broker/shared_dict.lua b/apisix/plugins/mcp/broker/shared_dict.lua index 83e3d86d7579..09e93b09716b 100644 --- a/apisix/plugins/mcp/broker/shared_dict.lua +++ b/apisix/plugins/mcp/broker/shared_dict.lua @@ -30,6 +30,12 @@ local mt = { __index = _M } local STORAGE_SUFFIX_QUEUE = ":queue" +local STORAGE_SUFFIX_ALIVE = ":alive" + +-- liveness marker TTL in seconds. it is refreshed on each keepalive so it +-- comfortably outlives the 30s ping interval, and self-expires if the worker +-- holding the session goes away without running teardown. +local ALIVE_TTL = 90 function _M.new(opts) @@ -57,6 +63,26 @@ function _M.push(self, message) end +function _M.mark_alive(self) + return shared_dict:set(self.session_id .. STORAGE_SUFFIX_ALIVE, true, ALIVE_TTL) +end + + +function _M.clear_alive(self) + shared_dict:delete(self.session_id .. STORAGE_SUFFIX_ALIVE) +end + + +-- whether an SSE session with this id is currently established on any worker. +-- called as a plain function (no instance) from the message endpoint. +function _M.is_alive(session_id) + if not session_id then + return false + end + return shared_dict:get(session_id .. STORAGE_SUFFIX_ALIVE) ~= nil +end + + function _M.start(self) self.thread = thread_spawn(function() while not worker_exiting() do diff --git a/apisix/plugins/mcp/server.lua b/apisix/plugins/mcp/server.lua index 11a41b9de22d..2c6cb468c3a2 100644 --- a/apisix/plugins/mcp/server.lua +++ b/apisix/plugins/mcp/server.lua @@ -24,6 +24,7 @@ local thread_kill = ngx.thread.kill local worker_exiting = ngx.worker.exiting local core = require("apisix.core") local broker_utils = require("apisix.plugins.mcp.broker.utils") +local shared_dict_broker = require("apisix.plugins.mcp.broker.shared_dict") local _M = {} @@ -38,7 +39,7 @@ function _M.new(opts) local session_id = opts.session_id or core.id.gen_uuid_v4() -- TODO: configurable broker type - local message_broker = require("apisix.plugins.mcp.broker.shared_dict").new({ + local message_broker = shared_dict_broker.new({ session_id = session_id, }) @@ -70,6 +71,18 @@ function _M.on(self, event, cb) end +-- mark this session as established so the message endpoint can find it. +function _M.mark_alive(self) + self.message_broker:mark_alive() +end + + +-- whether an established SSE session exists for the given id. +function _M.session_exists(session_id) + return shared_dict_broker.is_alive(session_id) +end + + function _M.start(self) self.message_broker:start() @@ -80,6 +93,10 @@ function _M.start(self) break end + -- refresh liveness so the session stays discoverable by the + -- message endpoint for as long as the SSE connection is open + self.message_broker:mark_alive() + self.next_ping_id = self.next_ping_id + 1 local ok, err = self.transport:send( '{"jsonrpc": "2.0","method": "ping","id":"ping:' .. self.next_ping_id .. '"}') @@ -99,6 +116,7 @@ end function _M.close(self) if self.message_broker then + self.message_broker:clear_alive() self.message_broker:close() end end diff --git a/apisix/plugins/mcp/server_wrapper.lua b/apisix/plugins/mcp/server_wrapper.lua index 5b0ed8831bd0..fb36065db688 100644 --- a/apisix/plugins/mcp/server_wrapper.lua +++ b/apisix/plugins/mcp/server_wrapper.lua @@ -33,6 +33,10 @@ local function sse_handler(conf, ctx, opts) local server = opts.server + -- mark the session established before advertising the endpoint, so a + -- message posted right after the client learns the id is not rejected + server:mark_alive() + -- send endpoint event to advertise the message endpoint server.transport:send(conf.base_uri .. "/message?sessionId=" .. server.session_id, "endpoint") @@ -93,9 +97,13 @@ function _M.access(conf, ctx, opts) end if action == V241105_ENDPOINT_MESSAGE and core.request.get_method() == "POST" then - -- TODO: check ctx.var.arg_sessionId - -- recover server instead of create - opts.server = mcp_server.new({ session_id = ctx.var.arg_sessionId }) + local session_id = ctx.var.arg_sessionId + -- only deliver to an already established SSE session; a missing or + -- unknown sessionId must not create a new queue + if not mcp_server.session_exists(session_id) then + return core.response.exit(404) + end + opts.server = mcp_server.new({ session_id = session_id }) return core.response.exit(message_handler(conf, ctx, opts)) end diff --git a/t/plugin/mcp-bridge.t b/t/plugin/mcp-bridge.t index 25369f02f821..77e1cccb4b6f 100644 --- a/t/plugin/mcp-bridge.t +++ b/t/plugin/mcp-bridge.t @@ -57,3 +57,57 @@ property "command" is required property "command" validation failed: wrong type: expected string, got number done property "args" validation failed: wrong type: expected array, got string + + + +=== TEST 2: set up a route with mcp-bridge +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "uri": "/mcp/*", + "plugins": { + "mcp-bridge": { + "command": "/bin/cat", + "base_uri": "/mcp" + } + }, + "upstream": { + "type": "roundrobin", + "nodes": { "127.0.0.1:1": 1 } + } + }]] + ) + if code >= 300 then + ngx.status = code + ngx.say(body) + return + end + ngx.say("passed") + } + } +--- response_body +passed + + + +=== TEST 3: message endpoint rejects an unknown sessionId +--- request +POST /mcp/message?sessionId=00000000-0000-4000-8000-000000000000 +{"jsonrpc":"2.0","id":1,"method":"tools/list"} +--- more_headers +Content-Type: application/json +--- error_code: 404 + + + +=== TEST 4: message endpoint rejects a missing sessionId +--- request +POST /mcp/message +{"jsonrpc":"2.0","id":1,"method":"tools/list"} +--- more_headers +Content-Type: application/json +--- error_code: 404