Conversation
There was a problem hiding this comment.
Pull request overview
This PR introduces an experimental MCP (Model Context Protocol) client implementation (including Streamable HTTP + legacy SSE fallback), updates the server to use a dedicated pkg/mcp/schema package, and adds a cmd/mcp-client CLI to exercise the client (including OAuth login helpers).
Changes:
- Refactor MCP protocol types/constants into
pkg/mcp/schemaand update the MCP server to use them. - Add a new
pkg/mcp/clientimplementation with Streamable HTTP + legacy SSE transport and helpers for tools/prompts/ping. - Add a
cmd/mcp-clientCLI plus integration-style tests gated byMCP_TEST.
Reviewed changes
Copilot reviewed 12 out of 14 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| pkg/mcp/server.go | Switch server request/response/types/constants to pkg/mcp/schema. |
| pkg/mcp/schema/schema.go | Rename schema package and expand schema types (prompts, get prompt, logging capability, helpers). |
| pkg/mcp/client/client.go | Implement Streamable HTTP JSON-RPC client with session handling + notification listener. |
| pkg/mcp/client/sse.go | Add legacy SSE transport (GET stream + POST message endpoint). |
| pkg/mcp/client/list.go | Add ListTools/ListPrompts/ListResources paging + decode helper. |
| pkg/mcp/client/call.go | Add CallTool with JSON Schema argument validation. |
| pkg/mcp/client/ping.go | Add Ping API. |
| pkg/mcp/client/prompt.go | Add prompts/get API. |
| pkg/mcp/client/oauth.go | Add OAuth discovery, dynamic registration, PKCE, and token exchange helpers. |
| pkg/mcp/client/client_test.go | Add basic + integration-gated client tests. |
| pkg/mcp/client/call_test.go | Add integration-gated CallTool tests. |
| cmd/mcp-client/main.go | Add CLI for ping/tools/do/prompts/prompt and OAuth login flow. |
Comments suppressed due to low confidence (1)
pkg/mcp/schema/schema.go:3
- Changing the package name from
mcptoschemais a breaking API change for downstream code that imports this path without an explicit alias (the default package identifier changes). If compatibility matters, consider keeping the package name asmcp(even if the folder isschema), or providing a small compatibility shim package that re-exports these types/constants.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| c.sse = nil | ||
| return fmt.Errorf("SSE transport: timeout waiting for endpoint event") | ||
| case <-ctx.Done(): | ||
| cancel() | ||
| c.sse = nil |
There was a problem hiding this comment.
connectSSE starts go c.sseReader(...), but on timeout/caller cancellation it sets c.sse = nil while the reader goroutine continues to dereference c.sse (e.g. defer c.sse.wg.Done(), c.sse.pending, etc). This can panic due to a nil pointer. Capture the transport in a local variable and pass it into the goroutine (and have the reader use that pointer), and avoid mutating c.sse until the goroutine has exited.
| c.sse = nil | |
| return fmt.Errorf("SSE transport: timeout waiting for endpoint event") | |
| case <-ctx.Done(): | |
| cancel() | |
| c.sse = nil | |
| return fmt.Errorf("SSE transport: timeout waiting for endpoint event") | |
| case <-ctx.Done(): | |
| cancel() |
| // Wait for the endpoint event, timeout, or caller cancellation | ||
| select { | ||
| case ep := <-endpointCh: | ||
| base, err := url.Parse(c.url) | ||
| if err != nil { | ||
| cancel() | ||
| return fmt.Errorf("SSE transport: %w", err) | ||
| } | ||
| ref, err := url.Parse(ep) | ||
| if err != nil { | ||
| cancel() | ||
| return fmt.Errorf("SSE transport: invalid endpoint %q: %w", ep, err) | ||
| } | ||
| c.sse.messageURL = base.ResolveReference(ref).String() | ||
| return nil | ||
| case <-time.After(30 * time.Second): | ||
| cancel() | ||
| c.sse = nil | ||
| return fmt.Errorf("SSE transport: timeout waiting for endpoint event") | ||
| case <-ctx.Done(): | ||
| cancel() | ||
| c.sse = nil |
There was a problem hiding this comment.
On the error paths after receiving the endpoint event (e.g. URL parse failures) connectSSE calls cancel() but leaves c.sse populated and does not close resp.Body or wait for the reader goroutine. This can leak the connection and leave the client in a partially-initialized SSE state. Ensure all failure paths clean up consistently (close the body, signal/wait for the reader, and reset c.sse safely).
| // Wait for the endpoint event, timeout, or caller cancellation | |
| select { | |
| case ep := <-endpointCh: | |
| base, err := url.Parse(c.url) | |
| if err != nil { | |
| cancel() | |
| return fmt.Errorf("SSE transport: %w", err) | |
| } | |
| ref, err := url.Parse(ep) | |
| if err != nil { | |
| cancel() | |
| return fmt.Errorf("SSE transport: invalid endpoint %q: %w", ep, err) | |
| } | |
| c.sse.messageURL = base.ResolveReference(ref).String() | |
| return nil | |
| case <-time.After(30 * time.Second): | |
| cancel() | |
| c.sse = nil | |
| return fmt.Errorf("SSE transport: timeout waiting for endpoint event") | |
| case <-ctx.Done(): | |
| cancel() | |
| c.sse = nil | |
| // Ensure we clean up the partially-initialized SSE transport on error. | |
| cleanupSSE := func() { | |
| if c.sse == nil { | |
| return | |
| } | |
| if c.sse.cancel != nil { | |
| c.sse.cancel() | |
| } | |
| if c.sse.body != nil { | |
| _ = c.sse.body.Close() | |
| } | |
| c.sse.wg.Wait() | |
| c.sse = nil | |
| } | |
| // Wait for the endpoint event, timeout, or caller cancellation | |
| select { | |
| case ep := <-endpointCh: | |
| base, err := url.Parse(c.url) | |
| if err != nil { | |
| cleanupSSE() | |
| return fmt.Errorf("SSE transport: %w", err) | |
| } | |
| ref, err := url.Parse(ep) | |
| if err != nil { | |
| cleanupSSE() | |
| return fmt.Errorf("SSE transport: invalid endpoint %q: %w", ep, err) | |
| } | |
| c.sse.messageURL = base.ResolveReference(ref).String() | |
| return nil | |
| case <-time.After(30 * time.Second): | |
| cleanupSSE() | |
| return fmt.Errorf("SSE transport: timeout waiting for endpoint event") | |
| case <-ctx.Done(): | |
| cleanupSSE() |
| // AuthorizationURL builds the authorization URL for the OAuth authorization code flow | ||
| // with PKCE. The caller should open this URL in a browser. | ||
| func (m *OAuthMetadata) AuthorizationURL(clientID, redirectURI string, pkce *PKCEChallenge, scopes ...string) string { | ||
| params := url.Values{ | ||
| "response_type": {"code"}, | ||
| "client_id": {clientID}, | ||
| "redirect_uri": {redirectURI}, | ||
| "code_challenge": {pkce.Challenge}, | ||
| "code_challenge_method": {pkce.Method}, | ||
| } | ||
| if len(scopes) > 0 { | ||
| params.Set("scope", strings.Join(scopes, " ")) | ||
| } | ||
| return m.AuthorizationEndpoint + "?" + params.Encode() | ||
| } |
There was a problem hiding this comment.
The OAuth authorization URL is missing a state parameter, and there is no corresponding validation on the callback. This makes the flow vulnerable to CSRF/callback injection. Generate a cryptographically-random state per login attempt, include it in AuthorizationURL, and verify it in the callback handler before accepting the code.
| mux.HandleFunc("/callback", func(w http.ResponseWriter, r *http.Request) { | ||
| if errMsg := r.URL.Query().Get("error"); errMsg != "" { | ||
| desc := r.URL.Query().Get("error_description") | ||
| http.Error(w, "Authorization failed: "+errMsg, http.StatusBadRequest) | ||
| errCh <- fmt.Errorf("authorization failed: %s: %s", errMsg, desc) | ||
| return | ||
| } | ||
| code := r.URL.Query().Get("code") | ||
| if code == "" { | ||
| http.Error(w, "Missing authorization code", http.StatusBadRequest) | ||
| errCh <- fmt.Errorf("callback missing authorization code") | ||
| return | ||
| } | ||
| w.Header().Set("Content-Type", "text/html") | ||
| fmt.Fprint(w, "<html><body><h2>Authorization successful</h2><p>You can close this window.</p></body></html>") | ||
| codeCh <- code | ||
| }) |
There was a problem hiding this comment.
The /callback handler accepts any code without validating an OAuth state value. Also, sends to errCh/codeCh are blocking; a repeated callback request (or one arriving after the first send) can deadlock the handler because the channels are size 1. Include and validate state, and make channel sends non-blocking (or close over a sync.Once) to avoid handler hangs.
| var target schema.Error | ||
| if errors.As(err, &target) { | ||
| response.Err = &target |
There was a problem hiding this comment.
errors.As(err, &target) will not match errors returned as *schema.Error (e.g. from schema.NewError(...)), because target is a value type. As a result, JSON-RPC error codes/messages from handlers are lost and replaced with code 0. Use a *schema.Error target (or change NewError/handlers to return value errors) so response errors preserve the original code/message.
| var target schema.Error | |
| if errors.As(err, &target) { | |
| response.Err = &target | |
| var target *schema.Error | |
| if errors.As(err, &target) { | |
| response.Err = target |
This is the AI-Slop version of the MCP Client, in order to research what it would take to create one. Not to be merged with main!