Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 54 additions & 3 deletions apisix/plugins/mcp/broker/shared_dict.lua
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,16 @@ local _M = {}
local mt = { __index = _M }


local STORAGE_SUFFIX_QUEUE = ":queue"
local STORAGE_SUFFIX_QUEUE = ":queue"
local STORAGE_SUFFIX_SESSION = ":session"

-- a live SSE session refreshes its marker well within this window, so the
-- marker (and any queue left behind) expires on its own if the session goes
-- away without running its teardown path
local SESSION_TTL = 60
-- upper bound on the number of messages buffered for a single session, so one
-- session cannot by itself consume the whole shared dictionary
local QUEUE_MAX_LENGTH = 1024


function _M.new(opts)
Expand All @@ -45,14 +54,56 @@ function _M.on(self, event, cb)
end


-- record that this session has a live SSE connection on some worker. the
-- message endpoint consults this before queueing, so that a request can only
-- enqueue work for a session that actually exists. refreshed periodically by
-- the owning session and removed on teardown; the TTL is a backstop for an
-- unclean teardown.
function _M.register(self)
local ok, err = shared_dict:set(self.session_id .. STORAGE_SUFFIX_SESSION,
true, SESSION_TTL)
if not ok then
return nil, "failed to register session: " .. err
end
return true
end


function _M.unregister(self)
shared_dict:delete(self.session_id .. STORAGE_SUFFIX_SESSION)
shared_dict:delete(self.session_id .. STORAGE_SUFFIX_QUEUE)
end


-- whether a session currently has a live SSE connection. module-level by
-- design: the message endpoint checks this before creating a server.
function _M.session_exists(session_id)
if not session_id then
return false
end
return shared_dict:get(session_id .. STORAGE_SUFFIX_SESSION) ~= nil
end


function _M.push(self, message)
if not message then
return nil, "message is nil"
end
local ok, err = shared_dict:rpush(self.session_id .. STORAGE_SUFFIX_QUEUE, message)
if not ok then
local key = self.session_id .. STORAGE_SUFFIX_QUEUE

local new_len, err = shared_dict:rpush(key, message)
if not new_len then
return nil, "failed to push message to queue: " .. err
end

if new_len > QUEUE_MAX_LENGTH then
-- keep the queue bounded even under concurrent pushers
shared_dict:rpop(key)
return nil, "queue is full"
end

-- keep the queue from outliving its session if teardown is missed
shared_dict:expire(key, SESSION_TTL)
return true
end

Expand Down
27 changes: 26 additions & 1 deletion apisix/plugins/mcp/server.lua
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ local thread_kill = ngx.thread.kill
local worker_exiting = ngx.worker.exiting
local core = require("apisix.core")
local broker_utils = require("apisix.plugins.mcp.broker.utils")
local shared_dict_broker = require("apisix.plugins.mcp.broker.shared_dict")


local _M = {}
Expand All @@ -38,7 +39,7 @@ function _M.new(opts)
local session_id = opts.session_id or core.id.gen_uuid_v4()

-- TODO: configurable broker type
local message_broker = require("apisix.plugins.mcp.broker.shared_dict").new({
local message_broker = shared_dict_broker.new({
session_id = session_id,
})

Expand Down Expand Up @@ -70,6 +71,20 @@ function _M.on(self, event, cb)
end


-- mark this session as live so the message endpoint will accept messages for
-- it. called once the SSE connection is established and refreshed by the ping
-- loop while the connection stays open.
function _M.register(self)
return self.message_broker:register()
end


-- whether the given session currently has a live SSE connection.
function _M.session_exists(session_id)
return shared_dict_broker.session_exists(session_id)
end


function _M.start(self)
self.message_broker:start()

Expand All @@ -89,6 +104,15 @@ function _M.start(self)
self.need_exit = true
break
end
-- refresh the session marker so it stays alive as long as the
-- connection does, and expires shortly after it goes away
local ok, err = self.message_broker:register()
if not ok then
core.log.error("session ", self.session_id,
" failed to refresh session marker: ", err)
self.need_exit = true
break
end
ngx_sleep(30)
end
end)
Expand All @@ -99,6 +123,7 @@ end

function _M.close(self)
if self.message_broker then
self.message_broker:unregister()
self.message_broker:close()
end
end
Expand Down
14 changes: 11 additions & 3 deletions apisix/plugins/mcp/server_wrapper.lua
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ local function sse_handler(conf, ctx, opts)

local server = opts.server

-- mark the session as live before advertising its endpoint, so a message
-- sent right after the client learns the session id is accepted
server:register()

-- send endpoint event to advertise the message endpoint
Comment on lines +36 to 40
server.transport:send(conf.base_uri .. "/message?sessionId=" .. server.session_id, "endpoint")

Expand Down Expand Up @@ -93,9 +97,13 @@ function _M.access(conf, ctx, opts)
end

if action == V241105_ENDPOINT_MESSAGE and core.request.get_method() == "POST" then
-- TODO: check ctx.var.arg_sessionId
-- recover server instead of create
opts.server = mcp_server.new({ session_id = ctx.var.arg_sessionId })
local session_id = ctx.var.arg_sessionId
-- only accept messages for a session that currently has a live SSE
-- connection, otherwise a request could queue work under any id
if not mcp_server.session_exists(session_id) then
return core.response.exit(404)
end
opts.server = mcp_server.new({ session_id = session_id })
return core.response.exit(message_handler(conf, ctx, opts))
end

Expand Down
54 changes: 54 additions & 0 deletions t/plugin/mcp-bridge.t
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,57 @@ property "command" is required
property "command" validation failed: wrong type: expected string, got number
done
property "args" validation failed: wrong type: expected array, got string



=== TEST 2: message endpoint only accepts sessions with a live SSE connection
--- config
location /t {
content_by_lua_block {
local server = require("apisix.plugins.mcp.server")
local broker = require("apisix.plugins.mcp.broker.shared_dict")

-- a missing or unknown session id is not accepted
ngx.say("nil: ", server.session_exists(nil))
ngx.say("unknown: ", server.session_exists("bogus"))

-- a registered session is visible to the message endpoint
local b = broker.new({ session_id = "sess-a" })
b:register()
ngx.say("registered: ", server.session_exists("sess-a"))

-- teardown removes it again
b:unregister()
ngx.say("after teardown: ", server.session_exists("sess-a"))
}
}
--- response_body
nil: false
unknown: false
registered: true
after teardown: false



=== TEST 3: per-session queue is bounded
--- config
location /t {
content_by_lua_block {
local broker = require("apisix.plugins.mcp.broker.shared_dict")
local b = broker.new({ session_id = "sess-q" })

local ok, err
for _ = 1, 1024 do
ok, err = b:push("m")
end
ngx.say("at_cap ok=", tostring(ok), " err=", tostring(err))

ok, err = b:push("m")
ngx.say("over_cap ok=", tostring(ok), " err=", tostring(err))

b:unregister()
}
}
--- response_body
at_cap ok=true err=nil
over_cap ok=nil err=queue is full
Loading