From f2cc44dd38c33a4acb220f34ff7c65cf3215ae20 Mon Sep 17 00:00:00 2001 From: Abhishek Choudhary Date: Fri, 12 Jun 2026 15:46:17 +0800 Subject: [PATCH 1/2] fix(mcp-bridge): only queue messages for sessions with a live connection --- apisix/plugins/mcp/broker/shared_dict.lua | 51 ++++++++++++++++++++- apisix/plugins/mcp/server.lua | 21 ++++++++- apisix/plugins/mcp/server_wrapper.lua | 14 ++++-- t/plugin/mcp-bridge.t | 54 +++++++++++++++++++++++ 4 files changed, 134 insertions(+), 6 deletions(-) diff --git a/apisix/plugins/mcp/broker/shared_dict.lua b/apisix/plugins/mcp/broker/shared_dict.lua index 83e3d86d7579..69c7d10b0dbc 100644 --- a/apisix/plugins/mcp/broker/shared_dict.lua +++ b/apisix/plugins/mcp/broker/shared_dict.lua @@ -29,7 +29,16 @@ local _M = {} local mt = { __index = _M } -local STORAGE_SUFFIX_QUEUE = ":queue" +local STORAGE_SUFFIX_QUEUE = ":queue" +local STORAGE_SUFFIX_SESSION = ":session" + +-- a live SSE session refreshes its marker well within this window, so the +-- marker (and any queue left behind) expires on its own if the session goes +-- away without running its teardown path +local SESSION_TTL = 60 +-- upper bound on the number of messages buffered for a single session, so one +-- session cannot by itself consume the whole shared dictionary +local QUEUE_MAX_LENGTH = 1024 function _M.new(opts) @@ -45,14 +54,52 @@ function _M.on(self, event, cb) end +-- record that this session has a live SSE connection on some worker. the +-- message endpoint consults this before queueing, so that a request can only +-- enqueue work for a session that actually exists. refreshed periodically by +-- the owning session and removed on teardown; the TTL is a backstop for an +-- unclean teardown. +function _M.register(self) + local ok, err = shared_dict:set(self.session_id .. STORAGE_SUFFIX_SESSION, + true, SESSION_TTL) + if not ok then + return nil, "failed to register session: " .. err + end + return true +end + + +function _M.unregister(self) + shared_dict:delete(self.session_id .. STORAGE_SUFFIX_SESSION) + shared_dict:delete(self.session_id .. STORAGE_SUFFIX_QUEUE) +end + + +-- whether a session currently has a live SSE connection. module-level by +-- design: the message endpoint checks this before creating a server. +function _M.session_exists(session_id) + if not session_id then + return false + end + return shared_dict:get(session_id .. STORAGE_SUFFIX_SESSION) ~= nil +end + + function _M.push(self, message) if not message then return nil, "message is nil" end - local ok, err = shared_dict:rpush(self.session_id .. STORAGE_SUFFIX_QUEUE, message) + local key = self.session_id .. STORAGE_SUFFIX_QUEUE + local len = shared_dict:llen(key) + if len and len >= QUEUE_MAX_LENGTH then + return nil, "queue is full" + end + local ok, err = shared_dict:rpush(key, message) if not ok then return nil, "failed to push message to queue: " .. err end + -- keep the queue from outliving its session if teardown is missed + shared_dict:expire(key, SESSION_TTL) return true end diff --git a/apisix/plugins/mcp/server.lua b/apisix/plugins/mcp/server.lua index 11a41b9de22d..6378074e5e8e 100644 --- a/apisix/plugins/mcp/server.lua +++ b/apisix/plugins/mcp/server.lua @@ -24,6 +24,7 @@ local thread_kill = ngx.thread.kill local worker_exiting = ngx.worker.exiting local core = require("apisix.core") local broker_utils = require("apisix.plugins.mcp.broker.utils") +local shared_dict_broker = require("apisix.plugins.mcp.broker.shared_dict") local _M = {} @@ -38,7 +39,7 @@ function _M.new(opts) local session_id = opts.session_id or core.id.gen_uuid_v4() -- TODO: configurable broker type - local message_broker = require("apisix.plugins.mcp.broker.shared_dict").new({ + local message_broker = shared_dict_broker.new({ session_id = session_id, }) @@ -70,6 +71,20 @@ function _M.on(self, event, cb) end +-- mark this session as live so the message endpoint will accept messages for +-- it. called once the SSE connection is established and refreshed by the ping +-- loop while the connection stays open. +function _M.register(self) + return self.message_broker:register() +end + + +-- whether the given session currently has a live SSE connection. +function _M.session_exists(session_id) + return shared_dict_broker.session_exists(session_id) +end + + function _M.start(self) self.message_broker:start() @@ -89,6 +104,9 @@ function _M.start(self) self.need_exit = true break end + -- refresh the session marker so it stays alive as long as the + -- connection does, and expires shortly after it goes away + self.message_broker:register() ngx_sleep(30) end end) @@ -99,6 +117,7 @@ end function _M.close(self) if self.message_broker then + self.message_broker:unregister() self.message_broker:close() end end diff --git a/apisix/plugins/mcp/server_wrapper.lua b/apisix/plugins/mcp/server_wrapper.lua index 5b0ed8831bd0..718b75846930 100644 --- a/apisix/plugins/mcp/server_wrapper.lua +++ b/apisix/plugins/mcp/server_wrapper.lua @@ -33,6 +33,10 @@ local function sse_handler(conf, ctx, opts) local server = opts.server + -- mark the session as live before advertising its endpoint, so a message + -- sent right after the client learns the session id is accepted + server:register() + -- send endpoint event to advertise the message endpoint server.transport:send(conf.base_uri .. "/message?sessionId=" .. server.session_id, "endpoint") @@ -93,9 +97,13 @@ function _M.access(conf, ctx, opts) end if action == V241105_ENDPOINT_MESSAGE and core.request.get_method() == "POST" then - -- TODO: check ctx.var.arg_sessionId - -- recover server instead of create - opts.server = mcp_server.new({ session_id = ctx.var.arg_sessionId }) + local session_id = ctx.var.arg_sessionId + -- only accept messages for a session that currently has a live SSE + -- connection, otherwise a request could queue work under any id + if not mcp_server.session_exists(session_id) then + return core.response.exit(404) + end + opts.server = mcp_server.new({ session_id = session_id }) return core.response.exit(message_handler(conf, ctx, opts)) end diff --git a/t/plugin/mcp-bridge.t b/t/plugin/mcp-bridge.t index 25369f02f821..2736a67fbf16 100644 --- a/t/plugin/mcp-bridge.t +++ b/t/plugin/mcp-bridge.t @@ -57,3 +57,57 @@ property "command" is required property "command" validation failed: wrong type: expected string, got number done property "args" validation failed: wrong type: expected array, got string + + + +=== TEST 2: message endpoint only accepts sessions with a live SSE connection +--- config + location /t { + content_by_lua_block { + local server = require("apisix.plugins.mcp.server") + local broker = require("apisix.plugins.mcp.broker.shared_dict") + + -- a missing or unknown session id is not accepted + ngx.say("nil: ", server.session_exists(nil)) + ngx.say("unknown: ", server.session_exists("bogus")) + + -- a registered session is visible to the message endpoint + local b = broker.new({ session_id = "sess-a" }) + b:register() + ngx.say("registered: ", server.session_exists("sess-a")) + + -- teardown removes it again + b:unregister() + ngx.say("after teardown: ", server.session_exists("sess-a")) + } + } +--- response_body +nil: false +unknown: false +registered: true +after teardown: false + + + +=== TEST 3: per-session queue is bounded +--- config + location /t { + content_by_lua_block { + local broker = require("apisix.plugins.mcp.broker.shared_dict") + local b = broker.new({ session_id = "sess-q" }) + + local ok, err + for _ = 1, 1024 do + ok, err = b:push("m") + end + ngx.say("at_cap ok=", tostring(ok), " err=", tostring(err)) + + ok, err = b:push("m") + ngx.say("over_cap ok=", tostring(ok), " err=", tostring(err)) + + b:unregister() + } + } +--- response_body +at_cap ok=true err=nil +over_cap ok=nil err=queue is full From 51da766b6644c4e64120842c44d0c00b7cb6b1b0 Mon Sep 17 00:00:00 2001 From: Shreemaan Abhishek Date: Mon, 15 Jun 2026 11:30:51 +0545 Subject: [PATCH 2/2] Apply suggestions from code review Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- apisix/plugins/mcp/broker/shared_dict.lua | 16 ++++++++++------ apisix/plugins/mcp/server.lua | 8 +++++++- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/apisix/plugins/mcp/broker/shared_dict.lua b/apisix/plugins/mcp/broker/shared_dict.lua index 69c7d10b0dbc..fe9ad416bcdd 100644 --- a/apisix/plugins/mcp/broker/shared_dict.lua +++ b/apisix/plugins/mcp/broker/shared_dict.lua @@ -90,14 +90,18 @@ function _M.push(self, message) return nil, "message is nil" end local key = self.session_id .. STORAGE_SUFFIX_QUEUE - local len = shared_dict:llen(key) - if len and len >= QUEUE_MAX_LENGTH then - return nil, "queue is full" - end - local ok, err = shared_dict:rpush(key, message) - if not ok then + + local new_len, err = shared_dict:rpush(key, message) + if not new_len then return nil, "failed to push message to queue: " .. err end + + if new_len > QUEUE_MAX_LENGTH then + -- keep the queue bounded even under concurrent pushers + shared_dict:rpop(key) + return nil, "queue is full" + end + -- keep the queue from outliving its session if teardown is missed shared_dict:expire(key, SESSION_TTL) return true diff --git a/apisix/plugins/mcp/server.lua b/apisix/plugins/mcp/server.lua index 6378074e5e8e..2cbf721f1366 100644 --- a/apisix/plugins/mcp/server.lua +++ b/apisix/plugins/mcp/server.lua @@ -106,7 +106,13 @@ function _M.start(self) end -- refresh the session marker so it stays alive as long as the -- connection does, and expires shortly after it goes away - self.message_broker:register() + local ok, err = self.message_broker:register() + if not ok then + core.log.error("session ", self.session_id, + " failed to refresh session marker: ", err) + self.need_exit = true + break + end ngx_sleep(30) end end)