diff --git a/go/go.mod b/go/go.mod index 611f2461e..cdc8d2081 100644 --- a/go/go.mod +++ b/go/go.mod @@ -16,7 +16,7 @@ require ( github.com/jedib0t/go-pretty/v6 v6.6.8 github.com/kagent-dev/kmcp v0.2.2 github.com/kagent-dev/mockllm v0.0.3 - github.com/mark3labs/mcp-go v0.40.0 + github.com/modelcontextprotocol/go-sdk v1.2.0 github.com/muesli/reflow v0.3.0 github.com/prometheus/client_golang v1.23.2 github.com/spf13/cobra v1.10.1 @@ -43,8 +43,6 @@ require ( github.com/atotto/clipboard v0.1.4 // indirect github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect github.com/aymanbagabas/go-udiff v0.3.1 // indirect - github.com/bahlo/generic-list-go v0.2.0 // indirect - github.com/buger/jsonparser v1.1.1 // indirect github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/charmbracelet/colorprofile v0.3.2 // indirect github.com/charmbracelet/x/ansi v0.10.1 // indirect @@ -53,7 +51,7 @@ require ( github.com/charmbracelet/x/term v0.2.1 // indirect github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f // indirect github.com/evanphx/json-patch v5.9.11+incompatible // indirect - github.com/invopop/jsonschema v0.13.0 // indirect + github.com/google/jsonschema-go v0.3.0 // indirect github.com/lucasb-eyer/go-colorful v1.3.0 // indirect github.com/mattn/go-localereader v0.0.2-0.20220822084749-2491eb6c1c75 // indirect github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6 // indirect @@ -66,7 +64,6 @@ require ( github.com/tidwall/match v1.2.0 // indirect github.com/tidwall/pretty v1.2.1 // indirect github.com/tidwall/sjson v1.2.5 // indirect - github.com/wk8/go-ordered-map/v2 v2.1.9-0.20240815153524-6ea36470d1bd // indirect github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect sigs.k8s.io/structured-merge-diff/v6 v6.3.0 // indirect diff --git a/go/go.sum b/go/go.sum index 8158577f6..e7c83f7f7 100644 --- a/go/go.sum +++ b/go/go.sum @@ -22,16 +22,12 @@ github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiE github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8= github.com/aymanbagabas/go-udiff v0.3.1 h1:LV+qyBQ2pqe0u42ZsUEtPiCaUoqgA9gYRDs3vj1nolY= github.com/aymanbagabas/go-udiff v0.3.1/go.mod h1:G0fsKmG+P6ylD0r6N/KgQD/nWzgfnl8ZBcNLgcbrw8E= -github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= -github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM= github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ= github.com/briandowns/spinner v1.23.2 h1:Zc6ecUnI+YzLmJniCfDNaMbW0Wid1d5+qcTq4L2FW8w= github.com/briandowns/spinner v1.23.2/go.mod h1:LaZeM4wm2Ywy6vO571mvhQNRcWfRUnXOs0RcKV0wYKM= -github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= -github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -128,6 +124,8 @@ github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/jsonschema-go v0.3.0 h1:6AH2TxVNtk3IlvkkhjrtbUc4S8AvO0Xii0DxIygDg+Q= +github.com/google/jsonschema-go v0.3.0/go.mod h1:r5quNTdLOYEz95Ru18zA0ydNbBuYoo9tgaYcxEYhJVE= github.com/google/pprof v0.0.0-20250923004556-9e5a51aed1e8 h1:ZI8gCoCjGzPsum4L21jHdQs8shFBIQih1TM9Rd/c+EQ= github.com/google/pprof v0.0.0-20250923004556-9e5a51aed1e8/go.mod h1:I6V7YzU0XDpsHqbsyrghnFZLO1gwK6NPTNvmetQIk9U= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= @@ -143,8 +141,6 @@ github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+l github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= -github.com/invopop/jsonschema v0.13.0 h1:KvpoAJWEjR3uD9Kbm2HWJmqsEaHt8lBUpd0qHcIi21E= -github.com/invopop/jsonschema v0.13.0/go.mod h1:ffZ5Km5SWWRAIN6wbDXItl95euhFz2uON45H2qjYt+0= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= @@ -193,8 +189,6 @@ github.com/lucasb-eyer/go-colorful v1.3.0 h1:2/yBRLdWBZKrf7gB40FoiKfAWYQ0lqNcbuQ github.com/lucasb-eyer/go-colorful v1.3.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= github.com/mailru/easyjson v0.9.1 h1:LbtsOm5WAswyWbvTEOqhypdPeZzHavpZx96/n553mR8= github.com/mailru/easyjson v0.9.1/go.mod h1:1+xMtQp2MRNVL/V1bOzuP3aP8VNwRW55fQUto+XFtTU= -github.com/mark3labs/mcp-go v0.40.0 h1:M0oqK412OHBKut9JwXSsj4KanSmEKpzoW8TcxoPOkAU= -github.com/mark3labs/mcp-go v0.40.0/go.mod h1:T7tUa2jO6MavG+3P25Oy/jR7iCeJPHImCZHRymCn39g= github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= @@ -206,6 +200,8 @@ github.com/mattn/go-localereader v0.0.2-0.20220822084749-2491eb6c1c75/go.mod h1: github.com/mattn/go-runewidth v0.0.12/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk= github.com/mattn/go-runewidth v0.0.17 h1:78v8ZlW0bP43XfmAfPsdXcoNCelfMHsDmd/pkENfrjQ= github.com/mattn/go-runewidth v0.0.17/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/modelcontextprotocol/go-sdk v1.2.0 h1:Y23co09300CEk8iZ/tMxIX1dVmKZkzoSBZOpJwUnc/s= +github.com/modelcontextprotocol/go-sdk v1.2.0/go.mod h1:6fM3LCm3yV7pAs8isnKLn07oKtB0MP9LHd3DfAcKw10= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -299,8 +295,6 @@ github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= -github.com/wk8/go-ordered-map/v2 v2.1.9-0.20240815153524-6ea36470d1bd h1:dLuIF2kX9c+KknGJUdJi1Il1SDiTSK158/BB9kdgAew= -github.com/wk8/go-ordered-map/v2 v2.1.9-0.20240815153524-6ea36470d1bd/go.mod h1:DbzwytT4g/odXquuOCqroKvtxxldI4nb3nuesHF/Exo= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e h1:JVG44RsyaB9T2KIHavMF/ppJZNG9ZpyihvCd0w101no= diff --git a/go/internal/controller/reconciler/reconciler.go b/go/internal/controller/reconciler/reconciler.go index 258fd62a9..df26103bb 100644 --- a/go/internal/controller/reconciler/reconciler.go +++ b/go/internal/controller/reconciler/reconciler.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "errors" "fmt" + "net/http" "reflect" "slices" "strings" @@ -27,9 +28,7 @@ import ( "github.com/kagent-dev/kagent/go/internal/database" "github.com/kagent-dev/kagent/go/internal/utils" "github.com/kagent-dev/kagent/go/internal/version" - mcp_client "github.com/mark3labs/mcp-go/client" - "github.com/mark3labs/mcp-go/client/transport" - "github.com/mark3labs/mcp-go/mcp" + "github.com/modelcontextprotocol/go-sdk/mcp" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" @@ -665,41 +664,73 @@ func (a *kagentReconciler) upsertToolServerForRemoteMCPServer(ctx context.Contex return tools, nil } -func (a *kagentReconciler) createMcpTransport(ctx context.Context, s *v1alpha2.RemoteMCPServerSpec, namespace string) (transport.Interface, error) { +func (a *kagentReconciler) createMcpTransport(ctx context.Context, s *v1alpha2.RemoteMCPServerSpec, namespace string) (mcp.Transport, error) { headers, err := s.ResolveHeaders(ctx, a.kube, namespace) if err != nil { return nil, err } + httpClient := newHTTPClient(headers) + switch s.Protocol { case v1alpha2.RemoteMCPServerProtocolSse: - return transport.NewSSE(s.URL, transport.WithHeaders(headers)) + return &mcp.SSEClientTransport{ + Endpoint: s.URL, + HTTPClient: httpClient, + }, nil default: - return transport.NewStreamableHTTP(s.URL, transport.WithHTTPHeaders(headers)) + return &mcp.StreamableClientTransport{ + Endpoint: s.URL, + HTTPClient: httpClient, + }, nil } } -func (a *kagentReconciler) listTools(ctx context.Context, tsp transport.Interface, toolServer *database.ToolServer) ([]*v1alpha2.MCPTool, error) { - client := mcp_client.NewClient(tsp) - err := client.Start(ctx) - if err != nil { - return nil, fmt.Errorf("failed to start client for toolServer %s: %v", toolServer.Name, err) - } - defer client.Close() - _, err = client.Initialize(ctx, mcp.InitializeRequest{ - Params: mcp.InitializeParams{ - ProtocolVersion: mcp.LATEST_PROTOCOL_VERSION, - Capabilities: mcp.ClientCapabilities{}, - ClientInfo: mcp.Implementation{ - Name: "kagent-controller", - Version: version.Version, - }, +// go-sdk does not have a WithHeaders option when initializing transport +// so we need to create a custom HTTP client that adds headers to all requests. +func newHTTPClient(headers map[string]string) *http.Client { + if len(headers) == 0 { + return http.DefaultClient + } + return &http.Client{ + Transport: &headerTransport{ + headers: headers, + base: http.DefaultTransport, }, - }) + } +} + +// headerTransport is an http.RoundTripper that adds custom headers to requests. +type headerTransport struct { + headers map[string]string + base http.RoundTripper +} + +func (t *headerTransport) RoundTrip(req *http.Request) (*http.Response, error) { + req = req.Clone(req.Context()) + for k, v := range t.headers { + req.Header.Set(k, v) + } + if t.base == nil { + t.base = http.DefaultTransport + } + return t.base.RoundTrip(req) +} + +func (a *kagentReconciler) listTools(ctx context.Context, tsp mcp.Transport, toolServer *database.ToolServer) ([]*v1alpha2.MCPTool, error) { + impl := &mcp.Implementation{ + Name: "kagent-controller", + Version: version.Version, + } + client := mcp.NewClient(impl, nil) + + session, err := client.Connect(ctx, tsp, nil) if err != nil { - return nil, fmt.Errorf("failed to initialize client for toolServer %s: %v", toolServer.Name, err) + return nil, fmt.Errorf("failed to connect client for toolServer %s: %v", toolServer.Name, err) } - result, err := client.ListTools(ctx, mcp.ListToolsRequest{}) + defer session.Close() + + result, err := session.ListTools(ctx, &mcp.ListToolsParams{}) if err != nil { return nil, fmt.Errorf("failed to list tools for toolServer %s: %v", toolServer.Name, err) } diff --git a/go/internal/httpserver/server.go b/go/internal/httpserver/server.go index 1e7896d57..5d992ec56 100644 --- a/go/internal/httpserver/server.go +++ b/go/internal/httpserver/server.go @@ -9,6 +9,7 @@ import ( "github.com/kagent-dev/kagent/go/internal/a2a" "github.com/kagent-dev/kagent/go/internal/database" "github.com/kagent-dev/kagent/go/internal/httpserver/handlers" + "github.com/kagent-dev/kagent/go/internal/mcp" common "github.com/kagent-dev/kagent/go/internal/utils" "github.com/kagent-dev/kagent/go/internal/version" "github.com/kagent-dev/kagent/go/pkg/auth" @@ -35,6 +36,7 @@ const ( APIPathMemories = "/api/memories" APIPathNamespaces = "/api/namespaces" APIPathA2A = "/api/a2a" + APIPathMCP = "/mcp" APIPathFeedback = "/api/feedback" APIPathLangGraph = "/api/langgraph" APIPathCrewAI = "/api/crewai" @@ -51,6 +53,7 @@ type ServerConfig struct { BindAddr string KubeClient ctrl_client.Client A2AHandler a2a.A2AHandlerMux + MCPHandler *mcp.MCPHandler WatchedNamespaces []string DbClient database.Client Authenticator auth.AuthProvider @@ -225,6 +228,11 @@ func (s *HTTPServer) setupRoutes() { // A2A s.router.PathPrefix(APIPathA2A + "/{namespace}/{name}").Handler(s.config.A2AHandler) + // MCP + if s.config.MCPHandler != nil { + s.router.PathPrefix(APIPathMCP).Handler(s.config.MCPHandler) + } + // Use middleware for common functionality s.router.Use(auth.AuthnMiddleware(s.authenticator)) s.router.Use(contentTypeMiddleware) diff --git a/go/internal/mcp/mcp_handler.go b/go/internal/mcp/mcp_handler.go new file mode 100644 index 000000000..eb98d4280 --- /dev/null +++ b/go/internal/mcp/mcp_handler.go @@ -0,0 +1,317 @@ +package mcp + +import ( + "context" + "fmt" + "net/http" + "strings" + "sync" + "time" + + "github.com/kagent-dev/kagent/go/api/v1alpha2" + "github.com/kagent-dev/kagent/go/internal/a2a" + authimpl "github.com/kagent-dev/kagent/go/internal/httpserver/auth" + "github.com/kagent-dev/kagent/go/internal/version" + "github.com/kagent-dev/kagent/go/pkg/auth" + mcpsdk "github.com/modelcontextprotocol/go-sdk/mcp" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + ctrllog "sigs.k8s.io/controller-runtime/pkg/log" + a2aclient "trpc.group/trpc-go/trpc-a2a-go/client" + "trpc.group/trpc-go/trpc-a2a-go/protocol" +) + +// MCPHandler handles MCP requests and bridges them to A2A endpoints +type MCPHandler struct { + kubeClient client.Client + a2aBaseURL string + authenticator auth.AuthProvider + httpHandler *mcpsdk.StreamableHTTPHandler + server *mcpsdk.Server + a2aClients sync.Map +} + +// Input types for MCP tools +type ListAgentsInput struct{} + +type ListAgentsOutput struct { + Agents []AgentSummary `json:"agents"` +} + +type AgentSummary struct { + Ref string `json:"ref"` + Description string `json:"description,omitempty"` +} + +type InvokeAgentInput struct { + Agent string `json:"agent" jsonschema:"Agent reference in format namespace/name"` + Task string `json:"task" jsonschema:"Task to run"` + ContextID string `json:"context_id,omitempty" jsonschema:"Optional A2A context ID to continue a conversation"` +} + +type InvokeAgentOutput struct { + Agent string `json:"agent"` + Text string `json:"text"` + ContextID string `json:"context_id,omitempty"` +} + +// NewMCPHandler creates a new MCP handler +// Wraps the StreamableHTTPHandler and adds A2A bridging and context management. +func NewMCPHandler(kubeClient client.Client, a2aBaseURL string, authenticator auth.AuthProvider) (*MCPHandler, error) { + handler := &MCPHandler{ + kubeClient: kubeClient, + a2aBaseURL: a2aBaseURL, + authenticator: authenticator, + } + + // Create MCP server + impl := &mcpsdk.Implementation{ + Name: "kagent-agents", + Version: version.Version, + } + server := mcpsdk.NewServer(impl, nil) + handler.server = server + + // Add list_agents tool + mcpsdk.AddTool[ListAgentsInput, ListAgentsOutput]( + server, + &mcpsdk.Tool{ + Name: "list_agents", + Description: "List invokable kagent agents (accepted + deploymentReady)", + }, + handler.handleListAgents, + ) + + // Add invoke_agent tool + mcpsdk.AddTool[InvokeAgentInput, InvokeAgentOutput]( + server, + &mcpsdk.Tool{ + Name: "invoke_agent", + Description: "Invoke a kagent agent via A2A", + }, + handler.handleInvokeAgent, + ) + + // Create HTTP handler + handler.httpHandler = mcpsdk.NewStreamableHTTPHandler( + func(*http.Request) *mcpsdk.Server { + return server + }, + nil, + ) + + return handler, nil +} + +// handleListAgents handles the list_agents MCP tool +func (h *MCPHandler) handleListAgents(ctx context.Context, req *mcpsdk.CallToolRequest, input ListAgentsInput) (*mcpsdk.CallToolResult, ListAgentsOutput, error) { + log := ctrllog.FromContext(ctx).WithName("mcp-handler").WithValues("tool", "list_agents") + + agentList := &v1alpha2.AgentList{} + if err := h.kubeClient.List(ctx, agentList); err != nil { + return &mcpsdk.CallToolResult{ + Content: []mcpsdk.Content{ + &mcpsdk.TextContent{Text: fmt.Sprintf("Failed to list agents: %v", err)}, + }, + IsError: true, + }, ListAgentsOutput{}, nil + } + + agents := make([]AgentSummary, 0) + for _, agent := range agentList.Items { + // Check if agent is accepted and deployment ready + deploymentReady := false + accepted := false + for _, condition := range agent.Status.Conditions { + if condition.Type == "Ready" && condition.Reason == "DeploymentReady" && condition.Status == "True" { + deploymentReady = true + } + if condition.Type == "Accepted" && condition.Status == "True" { + accepted = true + } + } + + if !accepted || !deploymentReady { + continue + } + + ref := agent.Namespace + "/" + agent.Name + description := agent.Spec.Description + agents = append(agents, AgentSummary{ + Ref: ref, + Description: description, + }) + } + + log.Info("Listed agents", "count", len(agents)) + + output := ListAgentsOutput{Agents: agents} + + var fallbackText strings.Builder + if len(agents) == 0 { + fallbackText.WriteString("No invokable agents found.") + } else { + for i, agent := range agents { + if i > 0 { + fallbackText.WriteByte('\n') + } + fallbackText.WriteString(agent.Ref) + if agent.Description != "" { + fallbackText.WriteString(" - ") + fallbackText.WriteString(agent.Description) + } + } + } + + return &mcpsdk.CallToolResult{ + Content: []mcpsdk.Content{ + &mcpsdk.TextContent{Text: fallbackText.String()}, + }, + }, output, nil +} + +// handleInvokeAgent handles the invoke_agent MCP tool +func (h *MCPHandler) handleInvokeAgent(ctx context.Context, req *mcpsdk.CallToolRequest, input InvokeAgentInput) (*mcpsdk.CallToolResult, InvokeAgentOutput, error) { + log := ctrllog.FromContext(ctx).WithName("mcp-handler").WithValues("tool", "invoke_agent") + + // Parse agent reference (namespace/name or just name) + agentNS, agentName, ok := strings.Cut(input.Agent, "/") + if !ok { + return &mcpsdk.CallToolResult{ + Content: []mcpsdk.Content{ + &mcpsdk.TextContent{Text: "agent must be in format 'namespace/name'"}, + }, + IsError: true, + }, InvokeAgentOutput{}, nil + } + agentRef := agentNS + "/" + agentName + agentNns := types.NamespacedName{Namespace: agentNS, Name: agentName} + + // Get context ID from client request (stateless mode) + // If not provided, contextIDPtr will be nil and a new conversation will start + var contextIDPtr *string + if input.ContextID != "" { + contextIDPtr = &input.ContextID + log.V(1).Info("Using context_id from client request", "context_id", input.ContextID) + } + + // Get or create cached A2A client for this agent + a2aURL := fmt.Sprintf("%s/%s/", h.a2aBaseURL, agentRef) + var a2aClient *a2aclient.A2AClient + + if cached, ok := h.a2aClients.Load(agentRef); ok { + if client, ok := cached.(*a2aclient.A2AClient); ok { + a2aClient = client + } + } + + // Create new client if not cached + if a2aClient == nil { + // Build A2A client options with authentication propagation + a2aOpts := []a2aclient.Option{ + a2aclient.WithTimeout(30 * time.Second), + a2aclient.WithHTTPReqHandler( + authimpl.A2ARequestHandler( + h.authenticator, + agentNns, + ), + ), + } + + newClient, err := a2aclient.NewA2AClient(a2aURL, a2aOpts...) + if err != nil { + log.Error(err, "Failed to create A2A client", "agent", agentRef) + return &mcpsdk.CallToolResult{ + Content: []mcpsdk.Content{ + &mcpsdk.TextContent{Text: fmt.Sprintf("Failed to create A2A client: %v", err)}, + }, + IsError: true, + }, InvokeAgentOutput{}, nil + } + + // Cache the client + h.a2aClients.Store(agentRef, newClient) + a2aClient = newClient + } + + // Send message via A2A + result, err := a2aClient.SendMessage(ctx, protocol.SendMessageParams{ + Message: protocol.Message{ + Kind: protocol.KindMessage, + Role: protocol.MessageRoleUser, + ContextID: contextIDPtr, + Parts: []protocol.Part{protocol.NewTextPart(input.Task)}, + }, + }) + if err != nil { + log.Error(err, "Failed to send A2A message", "agent", agentRef) + return &mcpsdk.CallToolResult{ + Content: []mcpsdk.Content{ + &mcpsdk.TextContent{Text: fmt.Sprintf("Failed to send A2A message: %v", err)}, + }, + IsError: true, + }, InvokeAgentOutput{}, nil + } + + // Extract response text and context ID + var responseText, newContextID string + switch a2aResult := result.Result.(type) { + case *protocol.Message: + responseText = a2a.ExtractText(*a2aResult) + if a2aResult.ContextID != nil { + newContextID = *a2aResult.ContextID + } + // Kagent A2A only returns Task type for now + case *protocol.Task: + newContextID = a2aResult.ContextID + if a2aResult.Status.Message != nil { + responseText = a2a.ExtractText(*a2aResult.Status.Message) + } + for _, artifact := range a2aResult.Artifacts { + responseText += a2a.ExtractText(protocol.Message{Parts: artifact.Parts}) + } + } + + if responseText == "" { + raw, err := result.MarshalJSON() + if err != nil { + return &mcpsdk.CallToolResult{ + Content: []mcpsdk.Content{ + &mcpsdk.TextContent{Text: fmt.Sprintf("Failed to marshal result: %v", err)}, + }, + IsError: true, + }, InvokeAgentOutput{}, nil + } + responseText = string(raw) + } + + log.Info("Invoked agent", "agent", agentRef, "hasContextID", newContextID != "") + + // Return context_id in response so client can store it for stateless operation + output := InvokeAgentOutput{ + Agent: agentRef, + Text: responseText, + } + if newContextID != "" { + output.ContextID = newContextID + } + + return &mcpsdk.CallToolResult{ + Content: []mcpsdk.Content{ + &mcpsdk.TextContent{Text: responseText}, + }, + }, output, nil +} + +// ServeHTTP implements http.Handler interface +func (h *MCPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // The MCP HTTP handler handles all the routing internally + h.httpHandler.ServeHTTP(w, r) +} + +// Shutdown gracefully shuts down the MCP handler +func (h *MCPHandler) Shutdown(ctx context.Context) error { + // The new SDK doesn't have an explicit Shutdown method on StreamableHTTPHandler + // The server will be shut down when the context is cancelled + return nil +} diff --git a/go/pkg/app/app.go b/go/pkg/app/app.go index 8fa6762ee..813b3b1d3 100644 --- a/go/pkg/app/app.go +++ b/go/pkg/app/app.go @@ -38,6 +38,7 @@ import ( "github.com/kagent-dev/kagent/go/internal/a2a" "github.com/kagent-dev/kagent/go/internal/database" + "github.com/kagent-dev/kagent/go/internal/mcp" versionmetrics "github.com/kagent-dev/kagent/go/internal/metrics" "github.com/kagent-dev/kagent/go/internal/controller/reconciler" @@ -450,6 +451,17 @@ func Start(getExtensionConfig GetExtensionConfig) { os.Exit(1) } + // Create MCP handler that bridges to A2A + mcpHandler, err := mcp.NewMCPHandler( + mgr.GetClient(), + cfg.A2ABaseUrl+httpserver.APIPathA2A, + extensionCfg.Authenticator, + ) + if err != nil { + setupLog.Error(err, "unable to create MCP handler") + os.Exit(1) + } + // +kubebuilder:scaffold:builder if metricsCertWatcher != nil { setupLog.Info("Adding metrics certificate watcher to manager") @@ -486,6 +498,7 @@ func Start(getExtensionConfig GetExtensionConfig) { BindAddr: cfg.HttpServerAddr, KubeClient: mgr.GetClient(), A2AHandler: a2aHandler, + MCPHandler: mcpHandler, WatchedNamespaces: watchNamespacesList, DbClient: dbClient, Authorizer: extensionCfg.Authorizer, diff --git a/go/test/e2e/invoke_mcp_test.go b/go/test/e2e/invoke_mcp_test.go new file mode 100644 index 000000000..3bc5e2ddf --- /dev/null +++ b/go/test/e2e/invoke_mcp_test.go @@ -0,0 +1,187 @@ +package e2e_test + +import ( + "context" + "encoding/json" + "os" + "strings" + "testing" + "time" + + "github.com/modelcontextprotocol/go-sdk/mcp" + "github.com/stretchr/testify/require" +) + +// mcpEndpointURL returns the URL for the MCP endpoint +func mcpEndpointURL() string { + kagentURL := os.Getenv("KAGENT_URL") + if kagentURL == "" { + // if running locally on kind, do "kubectl port-forward -n kagent deployments/kagent-controller 8083" + kagentURL = "http://localhost:8083" + } + return kagentURL + "/mcp" +} + +// setupMCPClient creates and initializes an MCP client for testing +func setupMCPClient(t *testing.T) *mcp.ClientSession { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + url := mcpEndpointURL() + transport := &mcp.StreamableClientTransport{ + Endpoint: url, + } + + impl := &mcp.Implementation{ + Name: "e2e-test", + Version: "0.0.0", + } + client := mcp.NewClient(impl, nil) + + session, err := client.Connect(ctx, transport, nil) + require.NoError(t, err, "Failed to connect MCP client") + + t.Cleanup(func() { + session.Close() + }) + + return session +} + +// TestE2EMCPEndpointListAgents tests the list_agents tool via the controller's MCP endpoint +// These tests use the kebab-agent deployed via push-test-agent in CI. +func TestE2EMCPEndpointListAgents(t *testing.T) { + ctx := context.Background() + session := setupMCPClient(t) + + // List tools + toolsResult, err := session.ListTools(ctx, &mcp.ListToolsParams{}) + require.NoError(t, err, "Should list tools") + + // Verify expected tools exist + toolNames := make([]string, 0, len(toolsResult.Tools)) + for _, tool := range toolsResult.Tools { + toolNames = append(toolNames, tool.Name) + } + require.Contains(t, toolNames, "list_agents", "Should have list_agents tool") + require.Contains(t, toolNames, "invoke_agent", "Should have invoke_agent tool") + + // Call list_agents tool + listAgentsResult, err := session.CallTool(ctx, &mcp.CallToolParams{ + Name: "list_agents", + }) + require.NoError(t, err, "Should call list_agents tool") + require.NotEmpty(t, listAgentsResult.Content, "Should have content in response") + require.False(t, listAgentsResult.IsError, "Should not be an error") + + agentRef := "kagent/kebab-agent" + found := false + + // First check StructuredContent (preferred) + if listAgentsResult.StructuredContent != nil { + structuredBytes, err := json.Marshal(listAgentsResult.StructuredContent) + require.NoError(t, err, "Should marshal structured content") + var structuredData struct { + Agents []struct { + Ref string `json:"ref"` + Description string `json:"description,omitempty"` + } `json:"agents"` + } + if err := json.Unmarshal(structuredBytes, &structuredData); err == nil { + for _, a := range structuredData.Agents { + if a.Ref == agentRef { + found = true + break + } + } + } + } + + // Check text format for fallback + if !found { + for _, content := range listAgentsResult.Content { + if textContent, ok := content.(*mcp.TextContent); ok { + if strings.Contains(textContent.Text, agentRef) { + found = true + break + } + } + } + } + + require.True(t, found, "Should find agent %s in list", agentRef) +} + +// TestE2EMCPEndpointInvokeAgent tests the invoke_agent tool via the controller's MCP endpoint +func TestE2EMCPEndpointInvokeAgent(t *testing.T) { + ctx := context.Background() + session := setupMCPClient(t) + + // Invoke kebab-agent + agentRef := "kagent/kebab-agent" + invokeResult, err := session.CallTool(ctx, &mcp.CallToolParams{ + Name: "invoke_agent", + Arguments: map[string]any{ + "agent": agentRef, + "task": "What can you do?", + }, + }) + require.NoError(t, err, "Should call invoke_agent tool") + require.NotEmpty(t, invokeResult.Content, "Should have content in response") + require.False(t, invokeResult.IsError, "Should not be an error") + + foundText := false + + if invokeResult.StructuredContent != nil { + structuredBytes, err := json.Marshal(invokeResult.StructuredContent) + require.NoError(t, err, "Should marshal structured content") + var structuredData struct { + Agent string `json:"agent"` + Text string `json:"text"` + } + if err := json.Unmarshal(structuredBytes, &structuredData); err == nil { + if strings.Contains(strings.ToLower(structuredData.Text), "kebab") { + foundText = true + } + } + } + + if !foundText { + for _, content := range invokeResult.Content { + if textContent, ok := content.(*mcp.TextContent); ok && textContent.Text != "" { + if strings.Contains(strings.ToLower(textContent.Text), "kebab") { + foundText = true + break + } + } + } + } + + require.True(t, foundText, "Should have text content containing 'kebab' in response") +} + +// TestE2EMCPEndpointErrorHandling tests error handling in the MCP endpoint +func TestE2EMCPEndpointErrorHandling(t *testing.T) { + ctx := context.Background() + session := setupMCPClient(t) + + // Try to invoke a non-existent agent + result, err := session.CallTool(ctx, &mcp.CallToolParams{ + Name: "invoke_agent", + Arguments: map[string]any{ + "agent": "nonexistent/agent", + "task": "test", + }, + }) + require.NoError(t, err, "CallTool should not return protocol error") + require.True(t, result.IsError, "Should return error") + // This content is the error text for the LLM to know what went wrong + require.NotEmpty(t, result.Content, "Should have error content") + + // Try to call a non-existent tool + _, err = session.CallTool(ctx, &mcp.CallToolParams{ + Name: "nonexistent_tool", + }) + // Should return an error + require.Error(t, err, "Should return error for non-existent tool") +}