Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions apisix/plugins/mcp/broker/shared_dict.lua
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ local mt = { __index = _M }


local STORAGE_SUFFIX_QUEUE = ":queue"
local STORAGE_SUFFIX_ALIVE = ":alive"

-- liveness marker TTL in seconds. it is refreshed on each keepalive so it
-- comfortably outlives the 30s ping interval, and self-expires if the worker
-- holding the session goes away without running teardown.
local ALIVE_TTL = 90


function _M.new(opts)
Expand Down Expand Up @@ -57,6 +63,26 @@ function _M.push(self, message)
end


function _M.mark_alive(self)
return shared_dict:set(self.session_id .. STORAGE_SUFFIX_ALIVE, true, ALIVE_TTL)
end


function _M.clear_alive(self)
shared_dict:delete(self.session_id .. STORAGE_SUFFIX_ALIVE)
end


-- whether an SSE session with this id is currently established on any worker.
-- called as a plain function (no instance) from the message endpoint.
function _M.is_alive(session_id)
if not session_id then
return false
end
return shared_dict:get(session_id .. STORAGE_SUFFIX_ALIVE) ~= nil
end


function _M.start(self)
self.thread = thread_spawn(function()
while not worker_exiting() do
Expand Down
20 changes: 19 additions & 1 deletion apisix/plugins/mcp/server.lua
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ local thread_kill = ngx.thread.kill
local worker_exiting = ngx.worker.exiting
local core = require("apisix.core")
local broker_utils = require("apisix.plugins.mcp.broker.utils")
local shared_dict_broker = require("apisix.plugins.mcp.broker.shared_dict")


local _M = {}
Expand All @@ -38,7 +39,7 @@ function _M.new(opts)
local session_id = opts.session_id or core.id.gen_uuid_v4()

-- TODO: configurable broker type
local message_broker = require("apisix.plugins.mcp.broker.shared_dict").new({
local message_broker = shared_dict_broker.new({
session_id = session_id,
})

Expand Down Expand Up @@ -70,6 +71,18 @@ function _M.on(self, event, cb)
end


-- mark this session as established so the message endpoint can find it.
function _M.mark_alive(self)
self.message_broker:mark_alive()
end


-- whether an established SSE session exists for the given id.
function _M.session_exists(session_id)
return shared_dict_broker.is_alive(session_id)
end


function _M.start(self)
self.message_broker:start()

Expand All @@ -80,6 +93,10 @@ function _M.start(self)
break
end

-- refresh liveness so the session stays discoverable by the
-- message endpoint for as long as the SSE connection is open
self.message_broker:mark_alive()

self.next_ping_id = self.next_ping_id + 1
local ok, err = self.transport:send(
'{"jsonrpc": "2.0","method": "ping","id":"ping:' .. self.next_ping_id .. '"}')
Expand All @@ -99,6 +116,7 @@ end

function _M.close(self)
if self.message_broker then
self.message_broker:clear_alive()
self.message_broker:close()
end
end
Expand Down
14 changes: 11 additions & 3 deletions apisix/plugins/mcp/server_wrapper.lua
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ local function sse_handler(conf, ctx, opts)

local server = opts.server

-- mark the session established before advertising the endpoint, so a
-- message posted right after the client learns the id is not rejected
server:mark_alive()

-- send endpoint event to advertise the message endpoint
server.transport:send(conf.base_uri .. "/message?sessionId=" .. server.session_id, "endpoint")

Expand Down Expand Up @@ -93,9 +97,13 @@ function _M.access(conf, ctx, opts)
end

if action == V241105_ENDPOINT_MESSAGE and core.request.get_method() == "POST" then
-- TODO: check ctx.var.arg_sessionId
-- recover server instead of create
opts.server = mcp_server.new({ session_id = ctx.var.arg_sessionId })
local session_id = ctx.var.arg_sessionId
-- only deliver to an already established SSE session; a missing or
-- unknown sessionId must not create a new queue
if not mcp_server.session_exists(session_id) then
return core.response.exit(404)
end
opts.server = mcp_server.new({ session_id = session_id })
return core.response.exit(message_handler(conf, ctx, opts))
end

Expand Down
54 changes: 54 additions & 0 deletions t/plugin/mcp-bridge.t
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,57 @@ property "command" is required
property "command" validation failed: wrong type: expected string, got number
done
property "args" validation failed: wrong type: expected array, got string



=== TEST 2: set up a route with mcp-bridge
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/routes/1',
ngx.HTTP_PUT,
[[{
"uri": "/mcp/*",
"plugins": {
"mcp-bridge": {
"command": "/bin/cat",
"base_uri": "/mcp"
}
},
"upstream": {
"type": "roundrobin",
"nodes": { "127.0.0.1:1": 1 }
}
}]]
)
if code >= 300 then
ngx.status = code
ngx.say(body)
return
end
ngx.say("passed")
}
}
--- response_body
passed



=== TEST 3: message endpoint rejects an unknown sessionId
--- request
POST /mcp/message?sessionId=00000000-0000-4000-8000-000000000000
{"jsonrpc":"2.0","id":1,"method":"tools/list"}
--- more_headers
Content-Type: application/json
--- error_code: 404



=== TEST 4: message endpoint rejects a missing sessionId
--- request
POST /mcp/message
{"jsonrpc":"2.0","id":1,"method":"tools/list"}
--- more_headers
Content-Type: application/json
--- error_code: 404
Loading