diff --git a/Cargo.lock b/Cargo.lock index c1f92b8..73c4d89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -79,6 +79,27 @@ dependencies = [ "syn", ] +[[package]] +name = "agent-client-protocol-polyfill" +version = "0.11.1" +dependencies = [ + "agent-client-protocol", + "agent-client-protocol-schema", + "anyhow", + "async-stream", + "axum", + "futures", + "futures-concurrency", + "rustc-hash", + "serde", + "serde_json", + "thiserror", + "tokio", + "tokio-util", + "tracing", + "uuid", +] + [[package]] name = "agent-client-protocol-rmcp" version = "0.11.1" diff --git a/Cargo.toml b/Cargo.toml index 52e5811..24d9c23 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "src/agent-client-protocol-conductor", "src/agent-client-protocol-cookbook", "src/agent-client-protocol-derive", + "src/agent-client-protocol-polyfill", "src/agent-client-protocol-rmcp", "src/agent-client-protocol-test", "src/agent-client-protocol-trace-viewer", diff --git a/md/conductor.md b/md/conductor.md index a82d60d..7fe9531 100644 --- a/md/conductor.md +++ b/md/conductor.md @@ -93,12 +93,12 @@ See [Proxy Mode](#proxy-mode) below for hierarchical chain details. When components provide MCP servers with ACP transport (`"url": "acp:$UUID"`): -**If agent has `mcp_acp_transport` capability:** +**If agent has `mcpCapabilities.acp` capability:** - Pass through MCP server declarations unchanged - Agent handles `_mcp/*` messages natively -**If agent lacks `mcp_acp_transport` capability:** +**If agent lacks `mcpCapabilities.acp` capability:** - Bind TCP port for each ACP-transport MCP server - Transform MCP server spec to use `conductor mcp $port` diff --git a/md/mcp-bridge.md b/md/mcp-bridge.md index bdb353b..32b4342 100644 --- a/md/mcp-bridge.md +++ b/md/mcp-bridge.md @@ -53,7 +53,7 @@ sequenceDiagram Proxy->>Conductor: session/new {
mcp_servers: [{
name: "research-tools",
url: "acp:uuid-123"
}]
} - Note over Conductor: Detects acp: transport
Agent lacks mcp_acp_transport capability + Note over Conductor: Detects acp: transport
Agent lacks mcpCapabilities.acp Conductor->>Conductor: Bind TCP listener on port 54321 diff --git a/md/protocol.md b/md/protocol.md index 7746c57..c904e7d 100644 --- a/md/protocol.md +++ b/md/protocol.md @@ -286,21 +286,23 @@ Send an MCP notification over the ACP connection. Bidirectional like `_mcp/reque } ``` -### Agent Capability: `mcp_acp_transport` +### Agent Capability: `mcpCapabilities.acp` Agents that natively support MCP-over-ACP declare this capability: ```json { - "_meta": { - "mcp_acp_transport": true + "agentCapabilities": { + "mcpCapabilities": { + "acp": true + } } } ``` **Conductor behavior:** -- If the agent has `mcp_acp_transport: true`, conductor passes MCP server declarations through unchanged +- If the agent has `mcpCapabilities.acp: true`, conductor passes MCP server declarations through unchanged - If the agent lacks this capability, conductor performs **bridging adaptation**: 1. Binds a TCP port (e.g., `localhost:54321`) 2. Transforms MCP server to use `conductor mcp PORT` command with stdio transport diff --git a/md/proxying-acp.md b/md/proxying-acp.md index 020bbea..4271cf3 100644 --- a/md/proxying-acp.md +++ b/md/proxying-acp.md @@ -108,7 +108,7 @@ P/ACP's orchestrator is called the **Conductor** (binary name: `conductor`). The **Key adaptation: MCP Bridge** -- If the agent supports `mcp_acp_transport`, conductor passes MCP servers with ACP transport through unchanged +- If the agent supports `mcpCapabilities.acp`, conductor passes MCP servers with ACP transport through unchanged - If not, conductor spawns `conductor mcp $port` processes to bridge between stdio (MCP) and ACP messages - Components can provide MCP servers without requiring agent modifications - See "MCP Bridge" section in Implementation Details for full protocol @@ -216,9 +216,7 @@ An P/ACP-aware editor provides the following capability during ACP initializatio /// supports symposium proxy initialization. "_meta": { "symposium": { - "version": "1.0", - "html_panel": true, // or false, if this is the ToEditor proxy - "file_comment": true, // or false, if this is the ToEditor proxy + "version": "1.0" } } ``` @@ -358,26 +356,28 @@ Components declare MCP servers with ACP transport by using the HTTP MCP server f The `acp:$UUID` URL signals ACP transport. The component generates the UUID to identify which component handles calls to this MCP server. -#### Agent Capability: `mcp_acp_transport` +#### Agent Capability: `mcpCapabilities.acp` Agents that natively support MCP-over-ACP declare this capability: ```json { - "_meta": { - "mcp_acp_transport": true + "agentCapabilities": { + "mcpCapabilities": { + "acp": true + } } } ``` **Conductor behavior:** -- If the final agent has `mcp_acp_transport: true`, conductor passes MCP server declarations through unchanged +- If the final agent has `mcpCapabilities.acp: true`, conductor passes MCP server declarations through unchanged - If the final agent lacks this capability, conductor performs **bridging adaptation**: 1. Binds a fresh TCP port (e.g., `localhost:54321`) 2. Transforms the MCP server declaration to use `conductor mcp $port` as the command 3. Spawns `conductor mcp $port` which connects back via TCP and bridges to ACP messages - 4. Always advertises `mcp_acp_transport: true` to intermediate components + 4. Always advertises `mcpCapabilities.acp: true` to intermediate components #### Bridging Transformation Example @@ -393,7 +393,7 @@ Agents that natively support MCP-over-ACP declare this capability: } ``` -**Transformed spec (passed to agent without `mcp_acp_transport`):** +**Transformed spec (passed to agent without `mcpCapabilities.acp`):** ```json { @@ -588,12 +588,12 @@ These extensions are beyond the scope of this initial RFD and will be defined as **Phase 2: Conductor Agent Mode - MCP Detection & Bridging** - [ ] Detect `"transport": "http", "url": "acp:$UUID"` MCP servers in initialization -- [ ] Check final agent for `mcp_acp_transport` capability +- [ ] Check final agent for `mcpCapabilities.acp` capability - [ ] Bind ephemeral TCP ports when bridging needed - [ ] Transform MCP server specs to use `conductor mcp $port` - [ ] Spawn `conductor mcp $port` subprocess per ACP-transport MCP server - [ ] Store mapping: `UUID → TCP port → bridge process` -- [ ] Always advertise `mcp_acp_transport: true` to intermediate components +- [ ] Always advertise `mcpCapabilities.acp: true` to intermediate components - [ ] Integration test: full chain with MCP bridging **Phase 3: `_mcp/*` Message Routing** diff --git a/src/agent-client-protocol-conductor/CHANGELOG.md b/src/agent-client-protocol-conductor/CHANGELOG.md index 98aa90f..dff6406 100644 --- a/src/agent-client-protocol-conductor/CHANGELOG.md +++ b/src/agent-client-protocol-conductor/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Breaking Changes + +- **Removed `McpBridgeMode`** and the `mcp_bridge_mode` parameter from `ConductorImpl::new`, `new_agent`, and `new_proxy`. MCP-over-ACP bridging is no longer built into the conductor. Use `agent-client-protocol-polyfill::mcp_over_acp::McpOverAcpPolyfill` as a proxy in the chain instead. +- **Removed `conductor mcp $port` CLI subcommand.** The stdio↔TCP bridge subprocess is no longer needed. + ## [0.11.1](https://github.com/agentclientprotocol/rust-sdk/compare/agent-client-protocol-conductor-v0.11.0...agent-client-protocol-conductor-v0.11.1) - 2026-04-21 ### Other diff --git a/src/agent-client-protocol-conductor/Cargo.toml b/src/agent-client-protocol-conductor/Cargo.toml index 320dbbc..a584148 100644 --- a/src/agent-client-protocol-conductor/Cargo.toml +++ b/src/agent-client-protocol-conductor/Cargo.toml @@ -21,8 +21,6 @@ test-support = [] agent-client-protocol.workspace = true agent-client-protocol-trace-viewer.workspace = true anyhow.workspace = true -async-stream.workspace = true -axum.workspace = true chrono.workspace = true clap.workspace = true futures.workspace = true @@ -31,7 +29,6 @@ rustc-hash.workspace = true serde.workspace = true serde_json.workspace = true strip-ansi-escapes.workspace = true -thiserror.workspace = true tokio.workspace = true tokio-util.workspace = true tracing.workspace = true @@ -39,12 +36,14 @@ tracing-subscriber.workspace = true uuid.workspace = true [dev-dependencies] +agent-client-protocol = { workspace = true, features = ["unstable_mcp_over_acp"] } agent-client-protocol-test.workspace = true yopo.workspace = true expect-test.workspace = true regex.workspace = true rmcp = { workspace = true, features = ["client", "server", "transport-io", "transport-child-process"] } schemars.workspace = true +agent-client-protocol-polyfill = { path = "../agent-client-protocol-polyfill" } [lints] workspace = true diff --git a/src/agent-client-protocol-conductor/src/conductor.rs b/src/agent-client-protocol-conductor/src/conductor.rs index 25ae367..a12ed7f 100644 --- a/src/agent-client-protocol-conductor/src/conductor.rs +++ b/src/agent-client-protocol-conductor/src/conductor.rs @@ -110,39 +110,27 @@ //! - Modified `InitializeRequest` to forward downstream //! - `Vec` of spawned components -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; use agent_client_protocol::{ Agent, BoxFuture, Client, Conductor, ConnectTo, Dispatch, DynConnectTo, Error, JsonRpcMessage, Proxy, Role, RunWithConnectionTo, role::HasPeer, util::MatchDispatch, }; use agent_client_protocol::{ - Builder, ConnectionTo, JsonRpcNotification, JsonRpcRequest, SentRequest, UntypedMessage, + Builder, ConnectionTo, JsonRpcNotification, JsonRpcRequest, SentRequest, }; use agent_client_protocol::{ HandleDispatchFrom, - schema::{InitializeProxyRequest, InitializeRequest, NewSessionRequest}, + schema::{InitializeProxyRequest, InitializeRequest}, util::MatchDispatchFrom, }; -use agent_client_protocol::{ - Handled, - schema::{ - McpConnectRequest, McpConnectResponse, McpDisconnectNotification, McpOverAcpMessage, - SuccessorMessage, - }, -}; +use agent_client_protocol::{Handled, schema::SuccessorMessage}; use futures::{ SinkExt, StreamExt, channel::mpsc::{self}, }; use tracing::{debug, info}; -use crate::conductor::mcp_bridge::{ - McpBridgeConnection, McpBridgeConnectionActor, McpBridgeListeners, -}; - -mod mcp_bridge; - /// The conductor manages the proxy chain lifecycle and message routing. /// /// It maintains connections to all components in the chain and routes messages @@ -153,22 +141,15 @@ pub struct ConductorImpl { host: Host, name: String, instantiator: Host::Instantiator, - mcp_bridge_mode: crate::McpBridgeMode, trace_writer: Option, } impl ConductorImpl { - pub fn new( - host: Host, - name: impl ToString, - instantiator: Host::Instantiator, - mcp_bridge_mode: crate::McpBridgeMode, - ) -> Self { + pub fn new(host: Host, name: impl ToString, instantiator: Host::Instantiator) -> Self { ConductorImpl { name: name.to_string(), host, instantiator, - mcp_bridge_mode, trace_writer: None, } } @@ -179,20 +160,15 @@ impl ConductorImpl { pub fn new_agent( name: impl ToString, instantiator: impl InstantiateProxiesAndAgent + 'static, - mcp_bridge_mode: crate::McpBridgeMode, ) -> Self { - ConductorImpl::new(Agent, name, Box::new(instantiator), mcp_bridge_mode) + ConductorImpl::new(Agent, name, Box::new(instantiator)) } } impl ConductorImpl { /// Create a conductor in proxy mode (forwards to another conductor). - pub fn new_proxy( - name: impl ToString, - instantiator: impl InstantiateProxies + 'static, - mcp_bridge_mode: crate::McpBridgeMode, - ) -> Self { - ConductorImpl::new(Proxy, name, Box::new(instantiator), mcp_bridge_mode) + pub fn new_proxy(name: impl ToString, instantiator: impl InstantiateProxies + 'static) -> Self { + ConductorImpl::new(Proxy, name, Box::new(instantiator)) } } @@ -244,9 +220,6 @@ impl ConductorImpl { conductor_rx, conductor_tx: conductor_tx.clone(), instantiator: Some(self.instantiator), - bridge_listeners: McpBridgeListeners::default(), - bridge_connections: HashMap::default(), - mcp_bridge_mode: self.mcp_bridge_mode, proxies: Vec::default(), successor: Arc::new(agent_client_protocol::util::internal_error( "successor not initialized", @@ -341,12 +314,6 @@ where conductor_tx: mpsc::Sender, - /// Manages the TCP listeners for MCP connections that will be proxied over ACP. - bridge_listeners: McpBridgeListeners, - - /// Manages active connections to MCP clients. - bridge_connections: HashMap, - /// The instantiator for lazy initialization. /// Set to None after components are instantiated. instantiator: Option, @@ -361,9 +328,6 @@ where /// Populated lazily when the first Initialize request is received; the initial value just returns errors. successor: Arc>, - /// Mode for the MCP bridge (determines how to spawn bridge processes). - mcp_bridge_mode: crate::McpBridgeMode, - /// Optional trace handle for sequence diagram visualization. trace_handle: Option, @@ -379,10 +343,7 @@ where f.debug_struct("ConductorResponder") .field("conductor_rx", &self.conductor_rx) .field("conductor_tx", &self.conductor_tx) - .field("bridge_listeners", &self.bridge_listeners) - .field("bridge_connections", &self.bridge_connections) .field("proxies", &self.proxies) - .field("mcp_bridge_mode", &self.mcp_bridge_mode) .field("trace_handle", &self.trace_handle) .field("host", &self.host) .finish_non_exhaustive() @@ -470,99 +431,6 @@ where ); self.send_message_to_predecessor_of(client, source_component_index, message) } - - // New MCP connection request. Send it back along the chain to get a connection id. - // When the connection id arrives, send a message back into this conductor loop with - // the connection id and the (as yet unspawned) actor. - ConductorMessage::McpConnectionReceived { - acp_url, - connection, - actor, - } => { - // MCP connection requests always come from the agent - // (we must be in agent mode, in fact), so send the MCP request - // to the final proxy. - self.send_request_to_predecessor_of( - client, - self.proxies.len(), - McpConnectRequest { - acp_url, - meta: None, - }, - ) - .on_receiving_result({ - let mut conductor_tx = self.conductor_tx.clone(); - async move |result| { - match result { - Ok(response) => conductor_tx - .send(ConductorMessage::McpConnectionEstablished { - response, - actor, - connection, - }) - .await - .map_err(|_| agent_client_protocol::Error::internal_error()), - Err(_) => { - // Error occurred, just drop the connection. - Ok(()) - } - } - } - }) - } - - // MCP connection successfully established. Spawn the actor - // and insert the connection into our map for future reference. - ConductorMessage::McpConnectionEstablished { - response: McpConnectResponse { connection_id, .. }, - actor, - connection, - } => { - self.bridge_connections - .insert(connection_id.clone(), connection); - client.spawn(actor.run(connection_id)) - } - - // Message meant for the MCP client received. Forward it to the appropriate actor's mailbox. - ConductorMessage::McpClientToMcpServer { - connection_id, - message, - } => { - let wrapped = message.map( - |request, responder| { - ( - McpOverAcpMessage { - connection_id: connection_id.clone(), - message: request, - meta: None, - }, - responder, - ) - }, - |notification| McpOverAcpMessage { - connection_id: connection_id.clone(), - message: notification, - meta: None, - }, - ); - - // We only get MCP-over-ACP requests when we are in bridging MCP for the final agent, - // so send them to the final proxy. - self.send_message_to_predecessor_of( - client, - SourceComponentIndex::Successor, - wrapped, - ) - } - - // MCP client disconnected. Remove it from our map and send the - // notification backwards along the chain. - ConductorMessage::McpConnectionDisconnected { notification } => { - // We only get MCP-over-ACP requests when we are in bridging MCP for the final agent. - - self.bridge_connections.remove(¬ification.connection_id); - self.send_notification_to_predecessor_of(client, self.proxies.len(), notification) - } } } @@ -916,7 +784,7 @@ where /// running as a proxy). async fn forward_message_to_agent( &mut self, - client_connection: ConnectionTo, + _client_connection: ConnectionTo, message: Dispatch, agent_connection: ConnectionTo, ) -> Result<(), Error> { @@ -928,63 +796,8 @@ where ) }) .await - .if_request(async |mut request: NewSessionRequest, responder| { - // When forwarding "session/new" to the agent, - // we adjust MCP servers to manage "acp:" URLs. - for mcp_server in &mut request.mcp_servers { - self.bridge_listeners - .transform_mcp_server( - client_connection.clone(), - mcp_server, - &self.conductor_tx, - &self.mcp_bridge_mode, - ) - .await?; - } - - agent_connection - .send_request(request) - .forward_response_to(responder) - }) - .await - .if_request( - async |request: McpOverAcpMessage, responder| { - let McpOverAcpMessage { - connection_id, - message: mcp_request, - .. - } = request; - self.bridge_connections - .get_mut(&connection_id) - .ok_or_else(|| { - agent_client_protocol::util::internal_error(format!( - "unknown connection id: {connection_id}" - )) - })? - .send(Dispatch::Request(mcp_request, responder)) - .await - }, - ) - .await - .if_notification(async |notification: McpOverAcpMessage| { - let McpOverAcpMessage { - connection_id, - message: mcp_notification, - .. - } = notification; - self.bridge_connections - .get_mut(&connection_id) - .ok_or_else(|| { - agent_client_protocol::util::internal_error(format!( - "unknown connection id: {connection_id}" - )) - })? - .send(Dispatch::Notification(mcp_notification)) - .await - }) - .await .otherwise(async |message| { - // Otherwise, just send the message along "as is". + // Forward all other messages to the agent as-is. agent_connection.send_proxied_message_to(Agent, message) }) .await @@ -1279,47 +1092,6 @@ pub enum ConductorMessage { source_component_index: SourceComponentIndex, message: Dispatch, }, - - /// A pending MCP bridge connection request request. - /// The request must be sent back over ACP to receive the connection-id. - /// Once the connection-id is received, the actor must be spawned. - McpConnectionReceived { - /// The acp:$UUID URL identifying this bridge - acp_url: String, - - /// The actor that should be spawned once the connection-id is available. - actor: McpBridgeConnectionActor, - - /// The connection to the bridge - connection: McpBridgeConnection, - }, - - /// A pending MCP bridge connection request request. - /// The request must be sent back over ACP to receive the connection-id. - /// Once the connection-id is received, the actor must be spawned. - McpConnectionEstablished { - response: McpConnectResponse, - - /// The actor that should be spawned once the connection-id is available. - actor: McpBridgeConnectionActor, - - /// The connection to the bridge - connection: McpBridgeConnection, - }, - - /// MCP message (request or notification) received from a bridge that needs to be routed to the final proxy. - /// - /// Sent when the bridge receives an MCP tool call from the agent and forwards it - /// to the conductor via TCP. The conductor routes this to the appropriate proxy component. - McpClientToMcpServer { - connection_id: String, - message: Dispatch, - }, - - /// Message sent when MCP client disconnects - McpConnectionDisconnected { - notification: McpDisconnectNotification, - }, } /// Trait implemented for the two links the conductor can use: diff --git a/src/agent-client-protocol-conductor/src/conductor/mcp_bridge.rs b/src/agent-client-protocol-conductor/src/conductor/mcp_bridge.rs deleted file mode 100644 index ed4b07a..0000000 --- a/src/agent-client-protocol-conductor/src/conductor/mcp_bridge.rs +++ /dev/null @@ -1,170 +0,0 @@ -pub mod actor; -pub mod http; -pub mod stdio; - -use std::collections::HashMap; -use std::path::PathBuf; - -use agent_client_protocol::schema::{McpServer, McpServerHttp, McpServerStdio}; -use agent_client_protocol::{ConnectionTo, Dispatch, Role}; -use futures::{SinkExt, channel::mpsc}; -use tokio::net::TcpListener; -use tracing::info; - -pub use self::actor::McpBridgeConnectionActor; -use crate::conductor::ConductorMessage; - -/// Maintains bridges for MCP message routing. -#[derive(Default, Debug)] -pub struct McpBridgeListeners { - /// Mapping of acp:$UUID URLs to TCP bridge information for MCP message routing - listeners: HashMap, -} - -/// Information about an MCP bridge that is listening for connections from MCP clients. -#[derive(Clone, Debug)] -pub(super) struct McpBridgeListener { - /// The replacement MCP server - pub server: McpServer, -} - -/// Connection handle for sending messages to an MCP client. -#[derive(Clone, Debug)] -pub struct McpBridgeConnection { - /// Channel to send messages from MCP server (ACP proxy) to the MCP client (ACP agent). - to_mcp_client_tx: mpsc::Sender, -} - -impl McpBridgeConnection { - pub fn new(to_mcp_client_tx: mpsc::Sender) -> Self { - Self { to_mcp_client_tx } - } - - pub async fn send(&mut self, message: Dispatch) -> Result<(), agent_client_protocol::Error> { - self.to_mcp_client_tx - .send(message) - .await - .map_err(|_| agent_client_protocol::Error::internal_error()) - } -} - -impl McpBridgeListeners { - /// Transforms MCP servers with `acp:$UUID` URLs for agents that need bridging. - /// - /// For each MCP server with an `acp:` URL: - /// 1. Spawns a TCP listener on an ephemeral port - /// 2. Stores the mapping for message routing - /// 3. Transforms the server to use either stdio or HTTP transport depending on bridge mode - /// - /// Other MCP servers are left unchanged. - pub async fn transform_mcp_server( - &mut self, - connection: ConnectionTo, - mcp_server: &mut McpServer, - conductor_tx: &mpsc::Sender, - mcp_bridge_mode: &crate::McpBridgeMode, - ) -> Result<(), agent_client_protocol::Error> { - use agent_client_protocol::schema::McpServer; - - let McpServer::Http(http) = mcp_server else { - return Ok(()); - }; - - if !http.url.starts_with("acp:") { - return Ok(()); - } - - if !http.headers.is_empty() { - return Err(agent_client_protocol::Error::internal_error()); - } - - let name = &http.name; - let url = &http.url; - - info!( - server_name = name, - acp_url = url, - "Detected MCP server with ACP transport, spawning TCP bridge" - ); - - // Create oneshot channel for session_id delivery - let transformed = self - .spawn_bridge(connection, name, url, conductor_tx, mcp_bridge_mode) - .await?; - *mcp_server = transformed; - Ok(()) - } - - /// Spawn a bridge listener (HTTP or stdio) for an MCP server with ACP transport - async fn spawn_bridge( - &mut self, - connection: ConnectionTo, - server_name: &str, - acp_url: &str, - conductor_tx: &mpsc::Sender, - mcp_bridge_mode: &crate::McpBridgeMode, - ) -> anyhow::Result { - // If there is already a listener for the ACP URL, return its server - if let Some(listener) = self.listeners.get(acp_url) { - return Ok(listener.server.clone()); - } - - // Bind to ephemeral port - let tcp_listener = TcpListener::bind("127.0.0.1:0").await?; - let tcp_port = tcp_listener.local_addr()?.port(); - - info!(acp_url = acp_url, tcp_port, "Bound listener for MCP bridge"); - - let new_server = match mcp_bridge_mode { - crate::McpBridgeMode::Stdio { conductor_command } => McpServer::Stdio( - McpServerStdio::new( - server_name.to_string(), - PathBuf::from(&conductor_command[0]), - ) - .args( - conductor_command[1..] - .iter() - .cloned() - .chain(vec!["mcp".to_string(), format!("{tcp_port}")]) - .collect::>(), - ), - ), - - crate::McpBridgeMode::Http => McpServer::Http(McpServerHttp::new( - server_name.to_string(), - format!("http://localhost:{tcp_port}"), - )), - }; - - // remember for later - self.listeners.insert( - acp_url.to_string(), - McpBridgeListener { - server: new_server.clone(), - }, - ); - - connection.spawn({ - let acp_url = acp_url.to_string(); - let conductor_tx = conductor_tx.clone(); - let mcp_bridge_mode = mcp_bridge_mode.clone(); - async move { - info!( - acp_url = acp_url, - tcp_port, "now accepting bridge connections" - ); - - match mcp_bridge_mode { - crate::McpBridgeMode::Stdio { - conductor_command: _, - } => stdio::run_tcp_listener(tcp_listener, acp_url, conductor_tx).await, - crate::McpBridgeMode::Http => { - http::run_http_listener(tcp_listener, acp_url, conductor_tx).await - } - } - } - })?; - - Ok(new_server) - } -} diff --git a/src/agent-client-protocol-conductor/src/conductor/mcp_bridge/stdio.rs b/src/agent-client-protocol-conductor/src/conductor/mcp_bridge/stdio.rs deleted file mode 100644 index fd85ee4..0000000 --- a/src/agent-client-protocol-conductor/src/conductor/mcp_bridge/stdio.rs +++ /dev/null @@ -1,54 +0,0 @@ -use agent_client_protocol::Dispatch; -use futures::{SinkExt, channel::mpsc}; -use tokio::net::{TcpListener, TcpStream}; -use tokio_util::compat::{TokioAsyncReadCompatExt as _, TokioAsyncWriteCompatExt as _}; - -use crate::conductor::ConductorMessage; - -use super::{McpBridgeConnection, McpBridgeConnectionActor}; - -/// Runs the stdio bridge TCP listener, accepting connections and creating bridge actors for each. -/// -/// Loops indefinitely, accepting incoming TCP connections and spawning an MCP bridge actor -/// for each connection to handle bidirectional message forwarding between the MCP client -/// and the conductor. -pub async fn run_tcp_listener( - tcp_listener: TcpListener, - acp_url: String, - mut conductor_tx: mpsc::Sender, -) -> Result<(), agent_client_protocol::Error> { - // Accept connections - loop { - let (stream, _addr) = tcp_listener - .accept() - .await - .map_err(agent_client_protocol::Error::into_internal_error)?; - - let (to_mcp_client_tx, to_mcp_client_rx) = mpsc::channel(128); - - conductor_tx - .send(ConductorMessage::McpConnectionReceived { - acp_url: acp_url.clone(), - actor: make_stdio_actor(stream, conductor_tx.clone(), to_mcp_client_rx), - connection: McpBridgeConnection::new(to_mcp_client_tx), - }) - .await - .map_err(|_| agent_client_protocol::Error::internal_error())?; - } -} - -fn make_stdio_actor( - stream: TcpStream, - conductor_tx: mpsc::Sender, - to_mcp_client_rx: mpsc::Receiver, -) -> McpBridgeConnectionActor { - let (read_half, write_half) = stream.into_split(); - - // Establish bidirectional JSON-RPC connection - // The bridge will send MCP requests (tools/call, etc.) to the conductor - // The conductor can also send responses back - let transport = - agent_client_protocol::ByteStreams::new(write_half.compat_write(), read_half.compat()); - - McpBridgeConnectionActor::new(transport, conductor_tx, to_mcp_client_rx) -} diff --git a/src/agent-client-protocol-conductor/src/lib.rs b/src/agent-client-protocol-conductor/src/lib.rs index d82b63a..b886f40 100644 --- a/src/agent-client-protocol-conductor/src/lib.rs +++ b/src/agent-client-protocol-conductor/src/lib.rs @@ -28,17 +28,6 @@ //! 3. Presents as a single agent on stdin/stdout //! 4. Manages the lifecycle of all processes //! -//! ### MCP Bridge Mode -//! -//! Connect stdio to a TCP-based MCP server: -//! -//! ```bash -//! # Bridge stdio to MCP server on localhost:8080 -//! agent-client-protocol-conductor mcp 8080 -//! ``` -//! -//! This allows stdio-based tools to communicate with TCP MCP servers. -//! //! ## How It Works //! //! **Component Communication:** @@ -81,8 +70,6 @@ use std::str::FromStr; mod conductor; /// Debug logging for conductor mod debug_logger; -/// MCP bridge functionality for TCP-based MCP servers -mod mcp_bridge; mod snoop; /// Trace event types for sequence diagram viewer pub mod trace; @@ -168,21 +155,6 @@ impl trace::WriteEvent for TraceHandleWriter { } } -/// Mode for the MCP bridge. -#[derive(Debug, Clone, Default)] -pub enum McpBridgeMode { - /// Use stdio-based MCP bridge with a conductor subprocess. - Stdio { - /// Command and args to spawn conductor MCP bridge processes. - /// E.g., vec!["conductor"] or vec!["cargo", "run", "-p", "conductor", "--"] - conductor_command: Vec, - }, - - /// Use HTTP-based MCP bridge - #[default] - Http, -} - #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] pub struct ConductorArgs { @@ -234,12 +206,6 @@ pub enum ConductorCommand { /// List of proxy commands to chain together proxies: Vec, }, - - /// Run as MCP bridge connecting stdio to TCP - Mcp { - /// TCP port to connect to on localhost - port: u16, - }, } impl ConductorArgs { @@ -255,7 +221,6 @@ impl ConductorArgs { let components = match &self.command { ConductorCommand::Agent { components, .. } => components.clone(), ConductorCommand::Proxy { proxies, .. } => proxies.clone(), - ConductorCommand::Mcp { .. } => Vec::new(), }; // Create debug logger @@ -348,7 +313,6 @@ impl ConductorArgs { ) .await } - ConductorCommand::Mcp { port } => mcp_bridge::run_mcp_bridge(port).await, } } } @@ -358,11 +322,7 @@ async fn initialize_conductor( trace_writer: Option, name: String, components: Vec, - new_conductor: impl FnOnce( - String, - CommandLineComponents, - crate::McpBridgeMode, - ) -> ConductorImpl, + new_conductor: impl FnOnce(String, CommandLineComponents) -> ConductorImpl, ) -> Result<(), agent_client_protocol::Error> { // Parse agents and optionally wrap with debug callbacks let providers: Vec = components @@ -385,11 +345,7 @@ async fn initialize_conductor( }; // Create conductor with optional trace writer - let mut conductor = new_conductor( - name, - CommandLineComponents(providers), - McpBridgeMode::default(), - ); + let mut conductor = new_conductor(name, CommandLineComponents(providers)); if let Some(writer) = trace_writer { conductor = conductor.with_trace_writer(writer); } diff --git a/src/agent-client-protocol-conductor/src/mcp_bridge.rs b/src/agent-client-protocol-conductor/src/mcp_bridge.rs deleted file mode 100644 index 788beda..0000000 --- a/src/agent-client-protocol-conductor/src/mcp_bridge.rs +++ /dev/null @@ -1,125 +0,0 @@ -//! MCP Bridge: Bridges MCP JSON-RPC over stdio to TCP connection -//! -//! This module implements `conductor mcp $port` mode, which acts as an MCP server -//! over stdio but forwards all messages to/from a TCP connection on localhost:$port. -//! -//! The main conductor (in agent mode) listens on the TCP port and translates between -//! TCP (raw JSON-RPC) and ACP `_mcp/*` extension messages. - -use anyhow::Context; -use serde_json::Value; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; -use tokio::net::TcpStream; - -/// Run the MCP bridge: stdio ↔ TCP -/// -/// Reads MCP JSON-RPC messages from stdin, forwards to TCP connection. -/// Reads responses from TCP, writes to stdout. -pub async fn run_mcp_bridge(port: u16) -> Result<(), agent_client_protocol::Error> { - tracing::info!("MCP bridge starting, connecting to localhost:{}", port); - - // Connect to the main conductor via TCP - let stream = connect_with_retry(port).await?; - let (tcp_read, mut tcp_write) = stream.into_split(); - - // Set up stdio - let stdin = tokio::io::stdin(); - let stdout = tokio::io::stdout(); - let mut stdin_reader = BufReader::new(stdin); - let mut stdout_writer = stdout; - let mut tcp_reader = BufReader::new(tcp_read); - - // Prepare line buffers - let mut stdin_line = String::new(); - let mut tcp_line = String::new(); - - tracing::info!("MCP bridge connected, starting message loop"); - - loop { - tokio::select! { - // Read from stdin → send to TCP - result = stdin_reader.read_line(&mut stdin_line) => { - let n = result.context("Failed to read from stdin")?; - - if n == 0 { - tracing::info!("Stdin closed, shutting down bridge"); - break; - } - - // Parse to validate JSON - drop(serde_json::from_str::(stdin_line.trim()) - .context("Invalid JSON from stdin")?); - - tracing::debug!("Bridge: stdin → TCP: {}", stdin_line.trim()); - - // Forward to TCP - tcp_write.write_all(stdin_line.as_bytes()).await - .context("Failed to write to TCP")?; - tcp_write.flush().await - .context("Failed to flush TCP")?; - - stdin_line.clear(); - } - - // Read from TCP → send to stdout - result = tcp_reader.read_line(&mut tcp_line) => { - let n = result.context("Failed to read from TCP")?; - - if n == 0 { - tracing::info!("TCP connection closed, shutting down bridge"); - break; - } - - // Parse to validate JSON - drop(serde_json::from_str::(tcp_line.trim()) - .context("Invalid JSON from TCP")?); - - tracing::debug!("Bridge: TCP → stdout: {}", tcp_line.trim()); - - // Forward to stdout - stdout_writer.write_all(tcp_line.as_bytes()).await - .context("Failed to write to stdout")?; - stdout_writer.flush().await - .context("Failed to flush stdout")?; - - tcp_line.clear(); - } - } - } - - tracing::info!("MCP bridge shutting down"); - Ok(()) -} - -/// Connect to TCP port with retry logic -async fn connect_with_retry(port: u16) -> Result { - let max_retries = 10; - let mut retry_delay_ms = 50; - let mut last_error = None; - - for attempt in 1..=max_retries { - match TcpStream::connect(format!("127.0.0.1:{port}")).await { - Ok(stream) => { - tracing::info!("Connected to localhost:{} on attempt {}", port, attempt); - return Ok(stream); - } - Err(e) => { - tracing::debug!( - "Connection attempt {} failed: {}, retrying in {}ms", - attempt, - e, - retry_delay_ms - ); - last_error = Some(e); - if attempt < max_retries { - tokio::time::sleep(tokio::time::Duration::from_millis(retry_delay_ms)).await; - retry_delay_ms = (retry_delay_ms * 2).min(1000); - } - } - } - } - - Err(agent_client_protocol::Error::into_internal_error( - last_error.expect("loop ran at least once"), - )) -} diff --git a/src/agent-client-protocol-conductor/tests/arrow_proxy_eliza.rs b/src/agent-client-protocol-conductor/tests/arrow_proxy_eliza.rs index c39266b..e64f9e4 100644 --- a/src/agent-client-protocol-conductor/tests/arrow_proxy_eliza.rs +++ b/src/agent-client-protocol-conductor/tests/arrow_proxy_eliza.rs @@ -8,7 +8,7 @@ //! Run `just prep-tests` before running this test. use agent_client_protocol::AcpAgent; -use agent_client_protocol_conductor::{ConductorImpl, McpBridgeMode, ProxiesAndAgent}; +use agent_client_protocol_conductor::{ConductorImpl, ProxiesAndAgent}; use agent_client_protocol_test::test_binaries::{arrow_proxy_example, testy}; use agent_client_protocol_test::testy::TestyCommand; use tokio::io::duplex; @@ -32,7 +32,6 @@ async fn test_conductor_with_arrow_proxy_and_test_agent() -> Result<(), agent_cl ConductorImpl::new_agent( "conductor".to_string(), ProxiesAndAgent::new(test_agent).proxy(arrow_proxy_agent), - McpBridgeMode::default(), ) .run(agent_client_protocol::ByteStreams::new( conductor_write.compat_write(), diff --git a/src/agent-client-protocol-conductor/tests/empty_conductor_eliza.rs b/src/agent-client-protocol-conductor/tests/empty_conductor_eliza.rs index c1f87a8..87faa5a 100644 --- a/src/agent-client-protocol-conductor/tests/empty_conductor_eliza.rs +++ b/src/agent-client-protocol-conductor/tests/empty_conductor_eliza.rs @@ -7,7 +7,7 @@ //! 4. The full chain works end-to-end use agent_client_protocol::{Conductor, ConnectTo, Proxy}; -use agent_client_protocol_conductor::{ConductorImpl, McpBridgeMode, ProxiesAndAgent}; +use agent_client_protocol_conductor::{ConductorImpl, ProxiesAndAgent}; use agent_client_protocol_test::testy::{Testy, TestyCommand}; use tokio::io::duplex; use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; @@ -24,11 +24,7 @@ impl ConnectTo for MockEmptyConductor { // Create an empty conductor with no components - it should act as a passthrough let empty_components: Vec> = vec![]; ConnectTo::::connect_to( - ConductorImpl::new_proxy( - "empty-conductor".to_string(), - empty_components, - McpBridgeMode::default(), - ), + ConductorImpl::new_proxy("empty-conductor".to_string(), empty_components), client, ) .await @@ -57,7 +53,6 @@ async fn test_conductor_with_empty_conductor_and_test_agent() ConductorImpl::new_agent( "outer-conductor".to_string(), ProxiesAndAgent::new(Testy::new()).proxy(MockEmptyConductor), - McpBridgeMode::default(), ) .run(agent_client_protocol::ByteStreams::new( conductor_write.compat_write(), diff --git a/src/agent-client-protocol-conductor/tests/initialization_sequence.rs b/src/agent-client-protocol-conductor/tests/initialization_sequence.rs index dcf0a5a..03e9b34 100644 --- a/src/agent-client-protocol-conductor/tests/initialization_sequence.rs +++ b/src/agent-client-protocol-conductor/tests/initialization_sequence.rs @@ -10,7 +10,7 @@ use agent_client_protocol::schema::{ ProtocolVersion, }; use agent_client_protocol::{Agent, Client, Conductor, ConnectTo, DynConnectTo, Proxy}; -use agent_client_protocol_conductor::{ConductorImpl, McpBridgeMode, ProxiesAndAgent}; +use agent_client_protocol_conductor::{ConductorImpl, ProxiesAndAgent}; use agent_client_protocol_test::testy::Testy; use std::sync::Arc; use std::sync::Mutex; @@ -137,7 +137,6 @@ async fn run_test_with_components( ConductorImpl::new_agent( "conductor".to_string(), ProxiesAndAgent::new(Testy::new()).proxies(proxies), - McpBridgeMode::default(), ) .run(agent_client_protocol::ByteStreams::new( conductor_out.compat_write(), @@ -305,7 +304,6 @@ async fn run_bad_proxy_test( ConductorImpl::new_agent( "conductor".to_string(), ProxiesAndAgent::new(agent).proxies(proxies), - McpBridgeMode::default(), ) .run(agent_client_protocol::ByteStreams::new( conductor_out.compat_write(), diff --git a/src/agent-client-protocol-conductor/tests/mcp-integration.rs b/src/agent-client-protocol-conductor/tests/mcp-integration.rs index 174f0b9..1464c22 100644 --- a/src/agent-client-protocol-conductor/tests/mcp-integration.rs +++ b/src/agent-client-protocol-conductor/tests/mcp-integration.rs @@ -12,8 +12,8 @@ use agent_client_protocol::schema::{ ContentBlock, InitializeRequest, NewSessionRequest, PromptRequest, ProtocolVersion, SessionNotification, TextContent, }; -use agent_client_protocol_conductor::{ConductorImpl, McpBridgeMode, ProxiesAndAgent}; -use agent_client_protocol_test::test_binaries; +use agent_client_protocol_conductor::{ConductorImpl, ProxiesAndAgent}; +use agent_client_protocol_polyfill::mcp_over_acp::McpOverAcpPolyfill; use agent_client_protocol_test::testy::{Testy, TestyCommand}; use futures::{SinkExt, StreamExt, channel::mpsc}; @@ -33,13 +33,7 @@ async fn recv( .map_err(|_| agent_client_protocol::Error::internal_error())? } -fn conductor_command() -> Vec { - let binary_path = test_binaries::conductor_binary(); - vec![binary_path.to_string_lossy().to_string()] -} - async fn run_test_with_mode( - mode: McpBridgeMode, components: ProxiesAndAgent, editor_task: impl AsyncFnOnce( agent_client_protocol::ConnectionTo, @@ -64,7 +58,7 @@ async fn run_test_with_mode( .builder() .name("editor-to-connector") .with_spawned(|_cx| async move { - ConductorImpl::new_agent("conductor".to_string(), components, mode) + ConductorImpl::new_agent("conductor".to_string(), components) .run(agent_client_protocol::ByteStreams::new( conductor_out.compat_write(), conductor_in.compat(), @@ -79,10 +73,9 @@ async fn run_test_with_mode( #[tokio::test] async fn test_proxy_provides_mcp_tools_stdio() -> Result<(), agent_client_protocol::Error> { run_test_with_mode( - McpBridgeMode::Stdio { - conductor_command: conductor_command(), - }, - ProxiesAndAgent::new(Testy::new()).proxy(mcp_integration::proxy::ProxyComponent), + ProxiesAndAgent::new(Testy::new()) + .proxy(mcp_integration::proxy::ProxyComponent) + .proxy(McpOverAcpPolyfill::http()), async |connection_to_editor| { // Send initialization request let init_response = recv( @@ -123,8 +116,9 @@ async fn test_proxy_provides_mcp_tools_stdio() -> Result<(), agent_client_protoc #[tokio::test] async fn test_proxy_provides_mcp_tools_http() -> Result<(), agent_client_protocol::Error> { run_test_with_mode( - McpBridgeMode::Http, - ProxiesAndAgent::new(Testy::new()).proxy(mcp_integration::proxy::ProxyComponent), + ProxiesAndAgent::new(Testy::new()) + .proxy(mcp_integration::proxy::ProxyComponent) + .proxy(McpOverAcpPolyfill::http()), async |connection_to_editor| { // Send initialization request let init_response = recv( @@ -182,8 +176,9 @@ async fn test_agent_handles_prompt() -> Result<(), agent_client_protocol::Error> let conductor_handle = tokio::spawn(async move { ConductorImpl::new_agent( "mcp-integration-conductor".to_string(), - ProxiesAndAgent::new(Testy::new()).proxy(mcp_integration::proxy::ProxyComponent), - McpBridgeMode::default(), + ProxiesAndAgent::new(Testy::new()) + .proxy(mcp_integration::proxy::ProxyComponent) + .proxy(McpOverAcpPolyfill::http()), ) .run(agent_client_protocol::ByteStreams::new( conductor_write.compat_write(), diff --git a/src/agent-client-protocol-conductor/tests/mcp_server_handler_chain.rs b/src/agent-client-protocol-conductor/tests/mcp_server_handler_chain.rs index c2b1f9c..a55b404 100644 --- a/src/agent-client-protocol-conductor/tests/mcp_server_handler_chain.rs +++ b/src/agent-client-protocol-conductor/tests/mcp_server_handler_chain.rs @@ -11,7 +11,7 @@ use agent_client_protocol::schema::{ NewSessionResponse, ProtocolVersion, SessionId, }; use agent_client_protocol::{Agent, Client, Conductor, ConnectTo, DynConnectTo, Proxy}; -use agent_client_protocol_conductor::{ConductorImpl, McpBridgeMode, ProxiesAndAgent}; +use agent_client_protocol_conductor::{ConductorImpl, ProxiesAndAgent}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::path::PathBuf; @@ -172,7 +172,6 @@ async fn run_test( ConductorImpl::new_agent( "conductor".to_string(), ProxiesAndAgent::new(agent).proxies(proxies), - McpBridgeMode::default(), ) .run(agent_client_protocol::ByteStreams::new( conductor_out.compat_write(), diff --git a/src/agent-client-protocol-conductor/tests/nested_arrow_proxy.rs b/src/agent-client-protocol-conductor/tests/nested_arrow_proxy.rs index 78fc58f..6539226 100644 --- a/src/agent-client-protocol-conductor/tests/nested_arrow_proxy.rs +++ b/src/agent-client-protocol-conductor/tests/nested_arrow_proxy.rs @@ -15,7 +15,7 @@ //! Run `just prep-tests` before running this test. use agent_client_protocol::AcpAgent; -use agent_client_protocol_conductor::{ConductorImpl, McpBridgeMode, ProxiesAndAgent}; +use agent_client_protocol_conductor::{ConductorImpl, ProxiesAndAgent}; use agent_client_protocol_test::test_binaries::{arrow_proxy_example, testy}; use agent_client_protocol_test::testy::TestyCommand; use tokio::io::duplex; @@ -41,7 +41,6 @@ async fn test_conductor_with_two_external_arrow_proxies() -> Result<(), agent_cl ProxiesAndAgent::new(agent) .proxy(arrow_proxy1) .proxy(arrow_proxy2), - McpBridgeMode::default(), ) .run(agent_client_protocol::ByteStreams::new( conductor_write.compat_write(), diff --git a/src/agent-client-protocol-conductor/tests/nested_conductor.rs b/src/agent-client-protocol-conductor/tests/nested_conductor.rs index db53d96..c6ee377 100644 --- a/src/agent-client-protocol-conductor/tests/nested_conductor.rs +++ b/src/agent-client-protocol-conductor/tests/nested_conductor.rs @@ -21,7 +21,7 @@ use agent_client_protocol::AcpAgent; use agent_client_protocol::{Conductor, ConnectTo, DynConnectTo}; -use agent_client_protocol_conductor::{ConductorImpl, McpBridgeMode, ProxiesAndAgent}; +use agent_client_protocol_conductor::{ConductorImpl, ProxiesAndAgent}; use agent_client_protocol_test::arrow_proxy::run_arrow_proxy; use agent_client_protocol_test::test_binaries::{arrow_proxy_example, conductor_binary, testy}; use agent_client_protocol_test::testy::{Testy, TestyCommand}; @@ -70,7 +70,6 @@ impl ConnectTo for MockInnerConductor { agent_client_protocol_conductor::ConductorImpl::new_proxy( "inner-conductor".to_string(), components, - McpBridgeMode::default(), ), client, ) @@ -93,7 +92,6 @@ async fn test_nested_conductor_with_arrow_proxies() -> Result<(), agent_client_p ConductorImpl::new_agent( "outer-conductor".to_string(), ProxiesAndAgent::new(Testy::new()).proxy(MockInnerConductor::new(2)), - McpBridgeMode::default(), ) .run(agent_client_protocol::ByteStreams::new( conductor_write.compat_write(), @@ -162,7 +160,6 @@ async fn test_nested_conductor_with_external_arrow_proxies() ConductorImpl::new_agent( "outer-conductor".to_string(), ProxiesAndAgent::new(agent).proxy(inner_conductor), - McpBridgeMode::default(), ) .run(agent_client_protocol::ByteStreams::new( conductor_write.compat_write(), diff --git a/src/agent-client-protocol-conductor/tests/scoped_mcp_server.rs b/src/agent-client-protocol-conductor/tests/scoped_mcp_server.rs index adb1f07..549beca 100644 --- a/src/agent-client-protocol-conductor/tests/scoped_mcp_server.rs +++ b/src/agent-client-protocol-conductor/tests/scoped_mcp_server.rs @@ -6,7 +6,8 @@ use agent_client_protocol::mcp_server::McpServer; use agent_client_protocol::{Agent, Conductor, ConnectTo, Proxy, Role, RunWithConnectionTo}; -use agent_client_protocol_conductor::{ConductorImpl, McpBridgeMode, ProxiesAndAgent}; +use agent_client_protocol_conductor::{ConductorImpl, ProxiesAndAgent}; +use agent_client_protocol_polyfill::mcp_over_acp::McpOverAcpPolyfill; use agent_client_protocol_test::testy::{Testy, TestyCommand}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -20,8 +21,9 @@ use std::sync::Mutex; async fn test_scoped_mcp_server_through_proxy() -> Result<(), agent_client_protocol::Error> { let conductor = ConductorImpl::new_agent( "conductor".to_string(), - ProxiesAndAgent::new(Testy::new()).proxy(ScopedProxy), - McpBridgeMode::default(), + ProxiesAndAgent::new(Testy::new()) + .proxy(ScopedProxy) + .proxy(McpOverAcpPolyfill::http()), ); let result = yopo::prompt( @@ -54,8 +56,7 @@ async fn test_scoped_mcp_server_through_session() -> Result<(), agent_client_pro .connect_with( ConductorImpl::new_agent( "conductor".to_string(), - ProxiesAndAgent::new(Testy::new()), - McpBridgeMode::default(), + ProxiesAndAgent::new(Testy::new()).proxy(McpOverAcpPolyfill::http()), ), async |cx| { // Initialize first diff --git a/src/agent-client-protocol-conductor/tests/test_mcp_tool_output_types.rs b/src/agent-client-protocol-conductor/tests/test_mcp_tool_output_types.rs index 5b65118..791f069 100644 --- a/src/agent-client-protocol-conductor/tests/test_mcp_tool_output_types.rs +++ b/src/agent-client-protocol-conductor/tests/test_mcp_tool_output_types.rs @@ -5,7 +5,8 @@ use agent_client_protocol::mcp_server::McpServer; use agent_client_protocol::{Conductor, ConnectTo, DynConnectTo, Proxy, RunWithConnectionTo}; -use agent_client_protocol_conductor::{ConductorImpl, McpBridgeMode, ProxiesAndAgent}; +use agent_client_protocol_conductor::{ConductorImpl, ProxiesAndAgent}; +use agent_client_protocol_polyfill::mcp_over_acp::McpOverAcpPolyfill; use agent_client_protocol_test::testy::{Testy, TestyCommand}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -60,8 +61,9 @@ async fn test_tool_returning_string() -> Result<(), agent_client_protocol::Error let result = yopo::prompt( ConductorImpl::new_agent( "test-conductor".to_string(), - ProxiesAndAgent::new(Testy::new()).proxy(create_test_proxy()), - McpBridgeMode::default(), + ProxiesAndAgent::new(Testy::new()) + .proxy(create_test_proxy()) + .proxy(McpOverAcpPolyfill::http()), ), TestyCommand::CallTool { server: "test_server".to_string(), @@ -86,8 +88,9 @@ async fn test_tool_returning_integer() -> Result<(), agent_client_protocol::Erro let result = yopo::prompt( ConductorImpl::new_agent( "test-conductor".to_string(), - ProxiesAndAgent::new(Testy::new()).proxy(create_test_proxy()), - McpBridgeMode::default(), + ProxiesAndAgent::new(Testy::new()) + .proxy(create_test_proxy()) + .proxy(McpOverAcpPolyfill::http()), ), TestyCommand::CallTool { server: "test_server".to_string(), diff --git a/src/agent-client-protocol-conductor/tests/test_session_id_in_mcp_tools.rs b/src/agent-client-protocol-conductor/tests/test_session_id_in_mcp_tools.rs index 27682fa..5d6a5f5 100644 --- a/src/agent-client-protocol-conductor/tests/test_session_id_in_mcp_tools.rs +++ b/src/agent-client-protocol-conductor/tests/test_session_id_in_mcp_tools.rs @@ -11,7 +11,8 @@ use agent_client_protocol::RunWithConnectionTo; use agent_client_protocol::mcp_server::McpServer; use agent_client_protocol::{Conductor, ConnectTo, DynConnectTo, Proxy}; -use agent_client_protocol_conductor::{ConductorImpl, McpBridgeMode, ProxiesAndAgent}; +use agent_client_protocol_conductor::{ConductorImpl, ProxiesAndAgent}; +use agent_client_protocol_polyfill::mcp_over_acp::McpOverAcpPolyfill; use agent_client_protocol_test::testy::{Testy, TestyCommand}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -23,7 +24,7 @@ struct EchoInput {} /// Output from the echo tool containing the session_id #[derive(Debug, Serialize, Deserialize, JsonSchema)] struct EchoOutput { - acp_url: String, + acp_id: String, } /// Create a proxy that provides an MCP server with a session_id echo tool @@ -36,7 +37,7 @@ fn create_echo_proxy() -> DynConnectTo { "Returns the current session_id", async |_input: EchoInput, context| { Ok(EchoOutput { - acp_url: context.acp_url(), + acp_id: context.acp_id(), }) }, agent_client_protocol::tool_fn_mut!(), @@ -74,8 +75,9 @@ async fn test_list_tools_from_mcp_server() -> Result<(), agent_client_protocol:: let result = yopo::prompt( ConductorImpl::new_agent( "test-conductor".to_string(), - ProxiesAndAgent::new(Testy::new()).proxy(create_echo_proxy()), - McpBridgeMode::default(), + ProxiesAndAgent::new(Testy::new()) + .proxy(create_echo_proxy()) + .proxy(McpOverAcpPolyfill::http()), ), TestyCommand::ListTools { server: "echo_server".to_string(), @@ -98,8 +100,9 @@ async fn test_session_id_delivered_to_mcp_tools() -> Result<(), agent_client_pro let result = yopo::prompt( ConductorImpl::new_agent( "test-conductor".to_string(), - ProxiesAndAgent::new(Testy::new()).proxy(create_echo_proxy()), - McpBridgeMode::default(), + ProxiesAndAgent::new(Testy::new()) + .proxy(create_echo_proxy()) + .proxy(McpOverAcpPolyfill::http()), ), TestyCommand::CallTool { server: "echo_server".to_string(), @@ -110,7 +113,7 @@ async fn test_session_id_delivered_to_mcp_tools() -> Result<(), agent_client_pro ) .await?; - let pattern = regex::Regex::new(r#""acp_url":\s*String\("acp:[0-9a-f-]+"\)"#).unwrap(); + let pattern = regex::Regex::new(r#""acp_id":\s*String\("acp:[0-9a-f-]+"\)"#).unwrap(); assert!(pattern.is_match(&result), "unexpected result: {result}"); Ok(()) diff --git a/src/agent-client-protocol-conductor/tests/test_tool_enable_disable.rs b/src/agent-client-protocol-conductor/tests/test_tool_enable_disable.rs index cb1b865..6c68b7b 100644 --- a/src/agent-client-protocol-conductor/tests/test_tool_enable_disable.rs +++ b/src/agent-client-protocol-conductor/tests/test_tool_enable_disable.rs @@ -5,7 +5,8 @@ use agent_client_protocol::mcp_server::McpServer; use agent_client_protocol::{Conductor, ConnectTo, DynConnectTo, Proxy, RunWithConnectionTo}; -use agent_client_protocol_conductor::{ConductorImpl, McpBridgeMode, ProxiesAndAgent}; +use agent_client_protocol_conductor::{ConductorImpl, ProxiesAndAgent}; +use agent_client_protocol_polyfill::mcp_over_acp::McpOverAcpPolyfill; use agent_client_protocol_test::testy::{Testy, TestyCommand}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -111,8 +112,9 @@ async fn test_list_tools_excludes_disabled() -> Result<(), agent_client_protocol let result = yopo::prompt( ConductorImpl::new_agent( "test-conductor".to_string(), - ProxiesAndAgent::new(Testy::new()).proxy(create_proxy_with_disabled_tool()?), - McpBridgeMode::default(), + ProxiesAndAgent::new(Testy::new()) + .proxy(create_proxy_with_disabled_tool()?) + .proxy(McpOverAcpPolyfill::http()), ), TestyCommand::ListTools { server: "test_server".to_string(), @@ -137,8 +139,9 @@ async fn test_enabled_tool_can_be_called() -> Result<(), agent_client_protocol:: let result = yopo::prompt( ConductorImpl::new_agent( "test-conductor".to_string(), - ProxiesAndAgent::new(Testy::new()).proxy(create_proxy_with_disabled_tool()?), - McpBridgeMode::default(), + ProxiesAndAgent::new(Testy::new()) + .proxy(create_proxy_with_disabled_tool()?) + .proxy(McpOverAcpPolyfill::http()), ), TestyCommand::CallTool { server: "test_server".to_string(), @@ -162,8 +165,9 @@ async fn test_disabled_tool_returns_not_found() -> Result<(), agent_client_proto let result = yopo::prompt( ConductorImpl::new_agent( "test-conductor".to_string(), - ProxiesAndAgent::new(Testy::new()).proxy(create_proxy_with_disabled_tool()?), - McpBridgeMode::default(), + ProxiesAndAgent::new(Testy::new()) + .proxy(create_proxy_with_disabled_tool()?) + .proxy(McpOverAcpPolyfill::http()), ), TestyCommand::CallTool { server: "test_server".to_string(), @@ -192,8 +196,9 @@ async fn test_allowlist_only_shows_enabled_tools() -> Result<(), agent_client_pr let result = yopo::prompt( ConductorImpl::new_agent( "test-conductor".to_string(), - ProxiesAndAgent::new(Testy::new()).proxy(create_proxy_with_allowlist()?), - McpBridgeMode::default(), + ProxiesAndAgent::new(Testy::new()) + .proxy(create_proxy_with_allowlist()?) + .proxy(McpOverAcpPolyfill::http()), ), TestyCommand::ListTools { server: "allowlist_server".to_string(), @@ -221,8 +226,9 @@ async fn test_allowlist_enabled_tool_works() -> Result<(), agent_client_protocol let result = yopo::prompt( ConductorImpl::new_agent( "test-conductor".to_string(), - ProxiesAndAgent::new(Testy::new()).proxy(create_proxy_with_allowlist()?), - McpBridgeMode::default(), + ProxiesAndAgent::new(Testy::new()) + .proxy(create_proxy_with_allowlist()?) + .proxy(McpOverAcpPolyfill::http()), ), TestyCommand::CallTool { server: "allowlist_server".to_string(), @@ -247,8 +253,9 @@ async fn test_allowlist_non_enabled_tool_returns_not_found() let result = yopo::prompt( ConductorImpl::new_agent( "test-conductor".to_string(), - ProxiesAndAgent::new(Testy::new()).proxy(create_proxy_with_allowlist()?), - McpBridgeMode::default(), + ProxiesAndAgent::new(Testy::new()) + .proxy(create_proxy_with_allowlist()?) + .proxy(McpOverAcpPolyfill::http()), ), TestyCommand::CallTool { server: "allowlist_server".to_string(), diff --git a/src/agent-client-protocol-conductor/tests/test_tool_fn.rs b/src/agent-client-protocol-conductor/tests/test_tool_fn.rs index b0fbb43..b912847 100644 --- a/src/agent-client-protocol-conductor/tests/test_tool_fn.rs +++ b/src/agent-client-protocol-conductor/tests/test_tool_fn.rs @@ -5,7 +5,8 @@ use agent_client_protocol::mcp_server::McpServer; use agent_client_protocol::{Conductor, ConnectTo, DynConnectTo, Proxy, RunWithConnectionTo}; -use agent_client_protocol_conductor::{ConductorImpl, McpBridgeMode, ProxiesAndAgent}; +use agent_client_protocol_conductor::{ConductorImpl, ProxiesAndAgent}; +use agent_client_protocol_polyfill::mcp_over_acp::McpOverAcpPolyfill; use agent_client_protocol_test::testy::{Testy, TestyCommand}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -58,8 +59,9 @@ async fn test_tool_fn_greet() -> Result<(), agent_client_protocol::Error> { let result = yopo::prompt( ConductorImpl::new_agent( "test-conductor".to_string(), - ProxiesAndAgent::new(Testy::new()).proxy(create_greet_proxy()), - McpBridgeMode::default(), + ProxiesAndAgent::new(Testy::new()) + .proxy(create_greet_proxy()) + .proxy(McpOverAcpPolyfill::http()), ), TestyCommand::CallTool { server: "greet_server".to_string(), diff --git a/src/agent-client-protocol-conductor/tests/trace_client_mcp_server.rs b/src/agent-client-protocol-conductor/tests/trace_client_mcp_server.rs index dd2e7f8..1bf93c4 100644 --- a/src/agent-client-protocol-conductor/tests/trace_client_mcp_server.rs +++ b/src/agent-client-protocol-conductor/tests/trace_client_mcp_server.rs @@ -13,7 +13,8 @@ use agent_client_protocol::mcp_server::McpServer; use agent_client_protocol::schema::{InitializeRequest, ProtocolVersion}; use agent_client_protocol::{Client, Role, RunWithConnectionTo}; use agent_client_protocol_conductor::trace::TraceEvent; -use agent_client_protocol_conductor::{ConductorImpl, McpBridgeMode, ProxiesAndAgent}; +use agent_client_protocol_conductor::{ConductorImpl, ProxiesAndAgent}; +use agent_client_protocol_polyfill::mcp_over_acp::McpOverAcpPolyfill; use agent_client_protocol_test::testy::{Testy, TestyCommand}; use expect_test::expect; use futures::StreamExt; @@ -123,7 +124,7 @@ impl EventNormalizer { } else { self.normalize_json(v) } - } else if k == "url" || k == "acp_url" { + } else if k == "url" || k == "acp_id" { if let serde_json::Value::String(s) = &v { if s.starts_with("acp:") || s.starts_with("http://localhost:") { serde_json::Value::String(self.normalize_acp_url(s)) @@ -229,8 +230,7 @@ async fn test_trace_client_mcp_server() -> Result<(), agent_client_protocol::Err let conductor_handle = tokio::spawn(async move { ConductorImpl::new_agent( "conductor".to_string(), - ProxiesAndAgent::new(Testy::new()), - McpBridgeMode::default(), + ProxiesAndAgent::new(Testy::new()).proxy(McpOverAcpPolyfill::http()), ) .trace_to(trace_tx) .run(agent_client_protocol::ByteStreams::new( @@ -310,15 +310,12 @@ async fn test_trace_client_mcp_server() -> Result<(), agent_client_protocol::Err ts: 0.0, protocol: Acp, from: "Client", - to: "Agent", + to: "Proxy(0)", id: String("id:0"), - method: "initialize", + method: "_proxy/initialize", session: None, params: Object { "clientCapabilities": Object { - "auth": Object { - "terminal": Bool(false), - }, "fs": Object { "readTextFile": Bool(false), "writeTextFile": Bool(false), @@ -332,15 +329,15 @@ async fn test_trace_client_mcp_server() -> Result<(), agent_client_protocol::Err Response( ResponseEvent { ts: 0.0, - from: "Agent", + from: "Proxy(0)", to: "Client", id: String("id:0"), is_error: false, payload: Object { "agentCapabilities": Object { - "auth": Object {}, "loadSession": Bool(false), "mcpCapabilities": Object { + "acp": Bool(true), "http": Bool(false), "sse": Bool(false), }, @@ -361,7 +358,7 @@ async fn test_trace_client_mcp_server() -> Result<(), agent_client_protocol::Err ts: 0.0, protocol: Acp, from: "Client", - to: "Agent", + to: "Proxy(0)", id: String("id:1"), method: "session/new", session: None, @@ -378,24 +375,10 @@ async fn test_trace_client_mcp_server() -> Result<(), agent_client_protocol::Err }, }, ), - Request( - RequestEvent { - ts: 0.0, - protocol: Acp, - from: "Agent", - to: "Client", - id: String("id:2"), - method: "_mcp/connect", - session: None, - params: Object { - "acp_url": String("acp:url:0"), - }, - }, - ), Response( ResponseEvent { ts: 0.0, - from: "Agent", + from: "Proxy(0)", to: "Client", id: String("id:1"), is_error: false, @@ -409,8 +392,8 @@ async fn test_trace_client_mcp_server() -> Result<(), agent_client_protocol::Err ts: 0.0, protocol: Acp, from: "Client", - to: "Agent", - id: String("id:3"), + to: "Proxy(0)", + id: String("id:2"), method: "session/prompt", session: None, params: Object { @@ -424,116 +407,12 @@ async fn test_trace_client_mcp_server() -> Result<(), agent_client_protocol::Err }, }, ), - Response( - ResponseEvent { - ts: 0.0, - from: "Client", - to: "Agent", - id: String("id:2"), - is_error: false, - payload: Object { - "connection_id": String("connection:0"), - }, - }, - ), - Request( - RequestEvent { - ts: 0.0, - protocol: Mcp, - from: "Agent", - to: "Client", - id: String("id:4"), - method: "initialize", - session: None, - params: Object { - "capabilities": Object {}, - "clientInfo": Object { - "name": String("rmcp"), - "version": String("1.5.0"), - }, - "protocolVersion": String("2025-11-25"), - }, - }, - ), - Response( - ResponseEvent { - ts: 0.0, - from: "Client", - to: "Agent", - id: String("id:4"), - is_error: false, - payload: Object { - "capabilities": Object { - "tools": Object {}, - }, - "instructions": String("A test MCP server hosted by the client"), - "protocolVersion": String("2025-11-25"), - "serverInfo": Object { - "name": String("rmcp"), - "version": String("1.5.0"), - }, - }, - }, - ), - Notification( - NotificationEvent { - ts: 0.0, - protocol: Mcp, - from: "Agent", - to: "Client", - method: "notifications/initialized", - session: None, - params: Null, - }, - ), - Request( - RequestEvent { - ts: 0.0, - protocol: Mcp, - from: "Agent", - to: "Client", - id: String("id:5"), - method: "tools/call", - session: None, - params: Object { - "_meta": Object { - "progressToken": Number(0), - }, - "arguments": Object { - "message": String("Hello from client test!"), - }, - "name": String("echo"), - }, - }, - ), - Response( - ResponseEvent { - ts: 0.0, - from: "Client", - to: "Agent", - id: String("id:5"), - is_error: false, - payload: Object { - "content": Array [ - Object { - "text": String("{\"call_number\":1,\"echoed\":\"Client echoes: Hello from client test!\"}"), - "type": String("text"), - }, - ], - "isError": Bool(false), - "structuredContent": Object { - "call_number": Number(1), - "echoed": String("Client echoes: Hello from client test!"), - }, - }, - }, - ), Notification( NotificationEvent { ts: 0.0, protocol: Acp, - from: "Agent", - to: "Client", + from: "Proxy(1)", + to: "Proxy(0)", method: "session/update", session: None, params: Object { @@ -551,9 +430,9 @@ async fn test_trace_client_mcp_server() -> Result<(), agent_client_protocol::Err Response( ResponseEvent { ts: 0.0, - from: "Agent", + from: "Proxy(0)", to: "Client", - id: String("id:3"), + id: String("id:2"), is_error: false, payload: Object { "stopReason": String("end_turn"), diff --git a/src/agent-client-protocol-conductor/tests/trace_generation.rs b/src/agent-client-protocol-conductor/tests/trace_generation.rs index 4bf5338..074bf54 100644 --- a/src/agent-client-protocol-conductor/tests/trace_generation.rs +++ b/src/agent-client-protocol-conductor/tests/trace_generation.rs @@ -8,7 +8,7 @@ //! Run `just prep-tests` before running this test. use agent_client_protocol::AcpAgent; -use agent_client_protocol_conductor::{ConductorImpl, McpBridgeMode, ProxiesAndAgent}; +use agent_client_protocol_conductor::{ConductorImpl, ProxiesAndAgent}; use agent_client_protocol_test::test_binaries::{arrow_proxy_example, testy}; use agent_client_protocol_test::testy::TestyCommand; use tokio::io::duplex; @@ -43,7 +43,6 @@ async fn test_trace_generation() -> Result<(), agent_client_protocol::Error> { ConductorImpl::new_agent( "conductor".to_string(), ProxiesAndAgent::new(eliza_agent).proxy(arrow_proxy_agent), - McpBridgeMode::default(), ) .trace_to_path(&trace_path_clone) .expect("Failed to create trace writer") diff --git a/src/agent-client-protocol-conductor/tests/trace_mcp_tool_call.rs b/src/agent-client-protocol-conductor/tests/trace_mcp_tool_call.rs index bdab7f5..552fd28 100644 --- a/src/agent-client-protocol-conductor/tests/trace_mcp_tool_call.rs +++ b/src/agent-client-protocol-conductor/tests/trace_mcp_tool_call.rs @@ -15,7 +15,8 @@ use agent_client_protocol::schema::{ SessionNotification, TextContent, }; use agent_client_protocol_conductor::trace::TraceEvent; -use agent_client_protocol_conductor::{ConductorImpl, McpBridgeMode, ProxiesAndAgent}; +use agent_client_protocol_conductor::{ConductorImpl, ProxiesAndAgent}; +use agent_client_protocol_polyfill::mcp_over_acp::McpOverAcpPolyfill; use agent_client_protocol_test::testy::{Testy, TestyCommand}; use expect_test::expect; use futures::channel::mpsc; @@ -122,7 +123,7 @@ impl EventNormalizer { } else { self.normalize_json(v) } - } else if k == "url" || k == "acp_url" { + } else if k == "url" || k == "acp_id" { if let serde_json::Value::String(s) = &v { if s.starts_with("acp:") || s.starts_with("http://localhost:") { serde_json::Value::String(self.normalize_acp_url(s)) @@ -214,8 +215,9 @@ async fn test_trace_mcp_tool_call() -> Result<(), agent_client_protocol::Error> let conductor_handle = tokio::spawn(async move { ConductorImpl::new_agent( "conductor".to_string(), - ProxiesAndAgent::new(Testy::new()).proxy(mcp_integration::proxy::ProxyComponent), - McpBridgeMode::default(), + ProxiesAndAgent::new(Testy::new()) + .proxy(mcp_integration::proxy::ProxyComponent) + .proxy(McpOverAcpPolyfill::http()), ) .trace_to(trace_tx) .run(agent_client_protocol::ByteStreams::new( @@ -307,9 +309,27 @@ async fn test_trace_mcp_tool_call() -> Result<(), agent_client_protocol::Error> session: None, params: Object { "clientCapabilities": Object { - "auth": Object { - "terminal": Bool(false), + "fs": Object { + "readTextFile": Bool(false), + "writeTextFile": Bool(false), }, + "terminal": Bool(false), + }, + "protocolVersion": Number(1), + }, + }, + ), + Request( + RequestEvent { + ts: 0.0, + protocol: Acp, + from: "Proxy(0)", + to: "Proxy(1)", + id: String("id:1"), + method: "_proxy/initialize", + session: None, + params: Object { + "clientCapabilities": Object { "fs": Object { "readTextFile": Bool(false), "writeTextFile": Bool(false), @@ -320,6 +340,33 @@ async fn test_trace_mcp_tool_call() -> Result<(), agent_client_protocol::Error> }, }, ), + Response( + ResponseEvent { + ts: 0.0, + from: "Proxy(1)", + to: "Proxy(0)", + id: String("id:1"), + is_error: false, + payload: Object { + "agentCapabilities": Object { + "loadSession": Bool(false), + "mcpCapabilities": Object { + "acp": Bool(true), + "http": Bool(false), + "sse": Bool(false), + }, + "promptCapabilities": Object { + "audio": Bool(false), + "embeddedContext": Bool(false), + "image": Bool(false), + }, + "sessionCapabilities": Object {}, + }, + "authMethods": Array [], + "protocolVersion": Number(1), + }, + }, + ), Response( ResponseEvent { ts: 0.0, @@ -329,9 +376,9 @@ async fn test_trace_mcp_tool_call() -> Result<(), agent_client_protocol::Error> is_error: false, payload: Object { "agentCapabilities": Object { - "auth": Object {}, "loadSession": Bool(false), "mcpCapabilities": Object { + "acp": Bool(true), "http": Bool(false), "sse": Bool(false), }, @@ -353,7 +400,7 @@ async fn test_trace_mcp_tool_call() -> Result<(), agent_client_protocol::Error> protocol: Acp, from: "Client", to: "Proxy(0)", - id: String("id:1"), + id: String("id:2"), method: "session/new", session: None, params: Object { @@ -362,17 +409,39 @@ async fn test_trace_mcp_tool_call() -> Result<(), agent_client_protocol::Error> }, }, ), + Request( + RequestEvent { + ts: 0.0, + protocol: Acp, + from: "Proxy(0)", + to: "Proxy(1)", + id: String("id:3"), + method: "session/new", + session: None, + params: Object { + "cwd": String("/"), + "mcpServers": Array [ + Object { + "headers": Array [], + "name": String("test"), + "type": String("http"), + "url": String("acp:url:0"), + }, + ], + }, + }, + ), Request( RequestEvent { ts: 0.0, protocol: Acp, from: "Proxy(1)", to: "Proxy(0)", - id: String("id:2"), + id: String("id:4"), method: "_mcp/connect", session: None, params: Object { - "acp_url": String("acp:url:0"), + "acp_id": String("acp:url:0"), }, }, ), @@ -381,19 +450,31 @@ async fn test_trace_mcp_tool_call() -> Result<(), agent_client_protocol::Error> ts: 0.0, from: "Proxy(0)", to: "Proxy(1)", - id: String("id:2"), + id: String("id:4"), is_error: false, payload: Object { "connection_id": String("connection:0"), }, }, ), + Response( + ResponseEvent { + ts: 0.0, + from: "Proxy(1)", + to: "Proxy(0)", + id: String("id:3"), + is_error: false, + payload: Object { + "sessionId": String("session:0"), + }, + }, + ), Response( ResponseEvent { ts: 0.0, from: "Proxy(0)", to: "Client", - id: String("id:1"), + id: String("id:2"), is_error: false, payload: Object { "sessionId": String("session:0"), @@ -406,7 +487,27 @@ async fn test_trace_mcp_tool_call() -> Result<(), agent_client_protocol::Error> protocol: Acp, from: "Client", to: "Proxy(0)", - id: String("id:3"), + id: String("id:5"), + method: "session/prompt", + session: None, + params: Object { + "prompt": Array [ + Object { + "text": String("{\"command\":\"call_tool\",\"server\":\"test\",\"tool\":\"echo\",\"params\":{\"message\":\"Hello from trace test!\"}}"), + "type": String("text"), + }, + ], + "sessionId": String("session:0"), + }, + }, + ), + Request( + RequestEvent { + ts: 0.0, + protocol: Acp, + from: "Proxy(0)", + to: "Proxy(1)", + id: String("id:6"), method: "session/prompt", session: None, params: Object { @@ -426,7 +527,7 @@ async fn test_trace_mcp_tool_call() -> Result<(), agent_client_protocol::Error> protocol: Mcp, from: "Proxy(1)", to: "Proxy(0)", - id: String("id:4"), + id: String("id:7"), method: "initialize", session: None, params: Object { @@ -444,7 +545,7 @@ async fn test_trace_mcp_tool_call() -> Result<(), agent_client_protocol::Error> ts: 0.0, from: "Proxy(0)", to: "Proxy(1)", - id: String("id:4"), + id: String("id:7"), is_error: false, payload: Object { "capabilities": Object { @@ -476,7 +577,7 @@ async fn test_trace_mcp_tool_call() -> Result<(), agent_client_protocol::Error> protocol: Mcp, from: "Proxy(1)", to: "Proxy(0)", - id: String("id:5"), + id: String("id:8"), method: "tools/call", session: None, params: Object { @@ -495,7 +596,7 @@ async fn test_trace_mcp_tool_call() -> Result<(), agent_client_protocol::Error> ts: 0.0, from: "Proxy(0)", to: "Proxy(1)", - id: String("id:5"), + id: String("id:8"), is_error: false, payload: Object { "content": Array [ @@ -511,6 +612,38 @@ async fn test_trace_mcp_tool_call() -> Result<(), agent_client_protocol::Error> }, }, ), + Notification( + NotificationEvent { + ts: 0.0, + protocol: Acp, + from: "Proxy(2)", + to: "Proxy(1)", + method: "session/update", + session: None, + params: Object { + "sessionId": String("session:0"), + "update": Object { + "content": Object { + "text": String("OK: CallToolResult { content: [Annotated { raw: Text(RawTextContent { text: \"{\\\"result\\\":\\\"Echo: Hello from trace test!\\\"}\", meta: None }), annotations: None }], structured_content: Some(Object {\"result\": String(\"Echo: Hello from trace test!\")}), is_error: Some(false), meta: None }"), + "type": String("text"), + }, + "sessionUpdate": String("agent_message_chunk"), + }, + }, + }, + ), + Response( + ResponseEvent { + ts: 0.0, + from: "Proxy(1)", + to: "Proxy(0)", + id: String("id:6"), + is_error: false, + payload: Object { + "stopReason": String("end_turn"), + }, + }, + ), Notification( NotificationEvent { ts: 0.0, @@ -536,7 +669,7 @@ async fn test_trace_mcp_tool_call() -> Result<(), agent_client_protocol::Error> ts: 0.0, from: "Proxy(0)", to: "Client", - id: String("id:3"), + id: String("id:5"), is_error: false, payload: Object { "stopReason": String("end_turn"), diff --git a/src/agent-client-protocol-conductor/tests/trace_snapshot.rs b/src/agent-client-protocol-conductor/tests/trace_snapshot.rs index 2a78857..a096c98 100644 --- a/src/agent-client-protocol-conductor/tests/trace_snapshot.rs +++ b/src/agent-client-protocol-conductor/tests/trace_snapshot.rs @@ -7,7 +7,7 @@ use agent_client_protocol::AcpAgent; use agent_client_protocol_conductor::trace::TraceEvent; -use agent_client_protocol_conductor::{ConductorImpl, McpBridgeMode, ProxiesAndAgent}; +use agent_client_protocol_conductor::{ConductorImpl, ProxiesAndAgent}; use agent_client_protocol_test::test_binaries::{arrow_proxy_example, testy}; use agent_client_protocol_test::testy::TestyCommand; use expect_test::expect; @@ -147,7 +147,6 @@ async fn test_trace_snapshot() -> Result<(), agent_client_protocol::Error> { ConductorImpl::new_agent( "conductor".to_string(), ProxiesAndAgent::new(eliza_agent).proxy(arrow_proxy_agent), - McpBridgeMode::default(), ) .trace_to(tx) .run(agent_client_protocol::ByteStreams::new( @@ -192,9 +191,6 @@ async fn test_trace_snapshot() -> Result<(), agent_client_protocol::Error> { session: None, params: Object { "clientCapabilities": Object { - "auth": Object { - "terminal": Bool(false), - }, "fs": Object { "readTextFile": Bool(false), "writeTextFile": Bool(false), @@ -214,9 +210,9 @@ async fn test_trace_snapshot() -> Result<(), agent_client_protocol::Error> { is_error: false, payload: Object { "agentCapabilities": Object { - "auth": Object {}, "loadSession": Bool(false), "mcpCapabilities": Object { + "acp": Bool(false), "http": Bool(false), "sse": Bool(false), }, diff --git a/src/agent-client-protocol-polyfill/Cargo.toml b/src/agent-client-protocol-polyfill/Cargo.toml new file mode 100644 index 0000000..ebde699 --- /dev/null +++ b/src/agent-client-protocol-polyfill/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "agent-client-protocol-polyfill" +version = "0.11.1" +edition.workspace = true +authors.workspace = true +license.workspace = true +repository.workspace = true +homepage.workspace = true +description = "Polyfill proxies for Agent Client Protocol backward compatibility" +keywords = ["acp", "agent", "mcp", "polyfill"] +categories = ["development-tools"] + +[dependencies] +agent-client-protocol = { workspace = true, features = ["unstable_mcp_over_acp"] } +anyhow.workspace = true +async-stream.workspace = true +axum.workspace = true +futures.workspace = true +futures-concurrency.workspace = true +rustc-hash.workspace = true +serde_json.workspace = true +thiserror = "2.0" +tokio.workspace = true +tokio-util.workspace = true +tracing.workspace = true +uuid.workspace = true + +[lints] +workspace = true diff --git a/src/agent-client-protocol-polyfill/src/lib.rs b/src/agent-client-protocol-polyfill/src/lib.rs new file mode 100644 index 0000000..5c62955 --- /dev/null +++ b/src/agent-client-protocol-polyfill/src/lib.rs @@ -0,0 +1,13 @@ +//! # agent-client-protocol-polyfill +//! +//! Polyfill proxies for backward compatibility with agents that don't support +//! newer ACP features natively. +//! +//! ## MCP-over-ACP Polyfill +//! +//! The [`mcp_over_acp`] module provides a proxy that bridges MCP-over-ACP transport +//! for agents that don't support `mcpCapabilities.acp`. It intercepts `NewSessionRequest` +//! to transform `McpServer::Http` entries with `acp:` URLs into localhost TCP bridges, +//! and handles `_mcp/*` messages by routing them through those bridges. + +pub mod mcp_over_acp; diff --git a/src/agent-client-protocol-conductor/src/conductor/mcp_bridge/actor.rs b/src/agent-client-protocol-polyfill/src/mcp_over_acp/actor.rs similarity index 62% rename from src/agent-client-protocol-conductor/src/conductor/mcp_bridge/actor.rs rename to src/agent-client-protocol-polyfill/src/mcp_over_acp/actor.rs index 557e08c..65f8208 100644 --- a/src/agent-client-protocol-conductor/src/conductor/mcp_bridge/actor.rs +++ b/src/agent-client-protocol-polyfill/src/mcp_over_acp/actor.rs @@ -4,32 +4,31 @@ use agent_client_protocol::{ use futures::{SinkExt as _, StreamExt as _, channel::mpsc}; use tracing::info; -use crate::conductor::ConductorMessage; +use super::BridgeMessage; -/// Trait for actors that handle MCP bridge connections. -/// -/// Implementations bridge between MCP clients and the conductor's ACP message flow. +/// Actor that bridges a single MCP connection between a local MCP client +/// and the ACP proxy chain. #[derive(Debug)] -pub struct McpBridgeConnectionActor { - /// How to connect to the MCP server +pub(crate) struct BridgeConnectionActor { + /// How to connect to the MCP server (e.g., stdio or HTTP transport). transport: DynConnectTo, - /// Sender for messages to the conductor - conductor_tx: mpsc::Sender, + /// Sender for messages back to the polyfill's bridge responder loop. + bridge_tx: mpsc::Sender, - /// Receiver for messages from the conductor to the MCP client + /// Receiver for messages from the polyfill to forward to the MCP client. to_mcp_client_rx: mpsc::Receiver, } -impl McpBridgeConnectionActor { +impl BridgeConnectionActor { pub fn new( component: impl ConnectTo, - conductor_tx: mpsc::Sender, + bridge_tx: mpsc::Sender, to_mcp_client_rx: mpsc::Receiver, ) -> Self { Self { transport: DynConnectTo::new(component), - conductor_tx, + bridge_tx, to_mcp_client_rx, } } @@ -37,23 +36,22 @@ impl McpBridgeConnectionActor { pub async fn run(self, connection_id: String) -> Result<(), agent_client_protocol::Error> { info!(connection_id, "MCP bridge connected"); - let McpBridgeConnectionActor { + let Self { transport, - mut conductor_tx, + mut bridge_tx, to_mcp_client_rx, } = self; let result = mcp::Client .builder() - .name(format!("mpc-client-to-conductor({connection_id})")) - // When we receive a message from the MCP client, forward it to the conductor + .name(format!("mcp-client-to-polyfill({connection_id})")) .on_receive_dispatch( { - let mut conductor_tx = conductor_tx.clone(); + let mut bridge_tx = bridge_tx.clone(); let connection_id = connection_id.clone(); - async move |message: agent_client_protocol::Dispatch, _cx| { - conductor_tx - .send(ConductorMessage::McpClientToMcpServer { + async move |message: Dispatch, _cx| { + bridge_tx + .send(BridgeMessage::ClientToServer { connection_id: connection_id.clone(), message, }) @@ -63,7 +61,6 @@ impl McpBridgeConnectionActor { }, agent_client_protocol::on_receive_dispatch!(), ) - // When we receive messages from the conductor, forward them to the MCP client .connect_with(transport, async move |mcp_connection_to_client| { let mut to_mcp_client_rx = to_mcp_client_rx; while let Some(message) = to_mcp_client_rx.next().await { @@ -73,8 +70,8 @@ impl McpBridgeConnectionActor { }) .await; - conductor_tx - .send(ConductorMessage::McpConnectionDisconnected { + bridge_tx + .send(BridgeMessage::Disconnected { notification: McpDisconnectNotification { connection_id, meta: None, diff --git a/src/agent-client-protocol-conductor/src/conductor/mcp_bridge/http.rs b/src/agent-client-protocol-polyfill/src/mcp_over_acp/http.rs similarity index 67% rename from src/agent-client-protocol-conductor/src/conductor/mcp_bridge/http.rs rename to src/agent-client-protocol-polyfill/src/mcp_over_acp/http.rs index 9ea579d..fc01e40 100644 --- a/src/agent-client-protocol-conductor/src/conductor/mcp_bridge/http.rs +++ b/src/agent-client-protocol-polyfill/src/mcp_over_acp/http.rs @@ -1,3 +1,5 @@ +//! HTTP-based MCP bridge transport. + use agent_client_protocol::{BoxFuture, Channel, ConnectTo, jsonrpcmsg::Message, role::mcp}; use axum::{ Router, @@ -17,32 +19,25 @@ use std::{ }; use tokio::net::TcpListener; -use crate::conductor::{ - ConductorMessage, - mcp_bridge::{McpBridgeConnection, McpBridgeConnectionActor}, -}; +use super::{BridgeConnection, BridgeMessage, actor::BridgeConnectionActor}; -/// Runs an HTTP listener for MCP bridge connections +/// Runs an HTTP listener for MCP bridge connections. pub async fn run_http_listener( tcp_listener: TcpListener, - acp_url: String, - mut conductor_tx: mpsc::Sender, + acp_id: String, + mut bridge_tx: mpsc::Sender, ) -> Result<(), agent_client_protocol::Error> { let (to_mcp_client_tx, to_mcp_client_rx) = mpsc::channel(128); - // When we send this message to the conductor, - // it is going to go through a step or two and eventually - // spawn the McpBridgeConnectionActor, which will ferry MCP requests - // back and forth. - conductor_tx - .send(ConductorMessage::McpConnectionReceived { - acp_url, - actor: McpBridgeConnectionActor::new( + bridge_tx + .send(BridgeMessage::ConnectionReceived { + acp_id, + actor: BridgeConnectionActor::new( HttpMcpBridge::new(tcp_listener), - conductor_tx.clone(), + bridge_tx.clone(), to_mcp_client_rx, ), - connection: McpBridgeConnection::new(to_mcp_client_tx), + connection: BridgeConnection::new(to_mcp_client_tx), }) .await .map_err(|_| agent_client_protocol::Error::internal_error())?; @@ -50,7 +45,8 @@ pub async fn run_http_listener( Ok(()) } -/// A component that receives HTTP requests/responses using the HTTP transport defined by the MCP protocol. +/// A component that receives HTTP requests/responses using the HTTP transport +/// defined by the MCP protocol. struct HttpMcpBridge { listener: tokio::net::TcpListener, } @@ -87,7 +83,7 @@ impl ConnectTo for HttpMcpBridge { } } -/// Error type that we use to respond to malformed HTTP requests. +/// Error type for responding to malformed HTTP requests. #[derive(Debug, thiserror::Error)] #[error(transparent)] struct HttpError(#[from] agent_client_protocol::Error); @@ -109,7 +105,6 @@ impl IntoResponse for HttpError { /// and communicating those requests over `channel` to the JSON-RPC server. async fn run(listener: TcpListener, channel: Channel) -> Result<(), agent_client_protocol::Error> { let (registration_tx, registration_rx) = mpsc::unbounded(); - let state = BridgeState { registration_tx }; // The way that the MCP protocol works is a bit "special". @@ -120,11 +115,11 @@ async fn run(listener: TcpListener, channel: Channel) -> Result<(), agent_client // // Clients can also issue a *GET* request. This will result in a stream of messages. // - // Non-reply messages can be send to any open stream (POST, GET, etc) but must be sent to + // Non-reply messages can be sent to any open stream (POST, GET, etc) but must be sent to // exactly one. // - // There are provisions for "resuming" from a blocked point by tagging each message in the SSE stream - // with an id, but we are not implementing that because I am lazy. + // There are provisions for "resuming" from a blocked point by tagging each message in the SSE + // stream with an id, but we are not implementing that because I am lazy. async { let app = Router::new() .route("/", post(handle_post).get(handle_get)) @@ -146,38 +141,39 @@ struct BridgeState { /// Messages from HTTP handlers to the bridge server. #[derive(Debug)] +#[allow(dead_code)] enum HttpMessage { - /// A JSON-RPC request (has an id, expects a response via the channel) + /// A JSON-RPC request (has an id, expects a response via the channel). Request { http_request_id: uuid::Uuid, request: agent_client_protocol::jsonrpcmsg::Request, response_tx: mpsc::UnboundedSender, }, - /// A JSON-RPC notification (no id, no response expected) + /// A JSON-RPC notification (no id, no response expected). Notification { http_request_id: uuid::Uuid, request: agent_client_protocol::jsonrpcmsg::Request, }, - /// A JSON-RPC response from the client + /// A JSON-RPC response from the client. Response { http_request_id: uuid::Uuid, response: agent_client_protocol::jsonrpcmsg::Response, }, - /// A GET request to open an SSE stream for server-initiated messages + /// A GET request to open an SSE stream for server-initiated messages. Get { http_request_id: uuid::Uuid, response_tx: mpsc::UnboundedSender, }, } -/// Clone of `agent_client_protocol::jsonrpcmsg::Id` since for unfathomable reasons that does not impl Hash +/// Clone of `agent_client_protocol::jsonrpcmsg::Id` since it does not impl `Hash`. #[derive(Eq, PartialEq, PartialOrd, Ord, Hash, Debug, Clone)] enum JsonRpcId { - /// String identifier + /// String identifier. String(String), - /// Numeric identifier + /// Numeric identifier. Number(u64), - /// Null identifier (for notifications) + /// Null identifier (for notifications). Null, } @@ -207,11 +203,6 @@ impl RunningServer { } /// The main loop: listen for incoming HTTP messages and outgoing JSON-RPC messages. - /// - /// # Parameters - /// - /// * `channel`: The channel to use for sending/receiving JSON-RPC messages. - /// * `http_rx`: The receiver for messages from HTTP handlers. async fn run( mut self, mut channel: Channel, @@ -236,7 +227,6 @@ impl RunningServer { MultiplexMessage::FromHttpToChannel(http_message) => { self.handle_http_message(http_message, &mut channel.tx)?; } - MultiplexMessage::FromChannelToHttp(message) => { let message = message.unwrap_or_else(|err| { agent_client_protocol::jsonrpcmsg::Message::Response( @@ -246,11 +236,6 @@ impl RunningServer { ), ) }); - tracing::debug!( - queue_len = self.message_deque.len() + 1, - ?message, - "enqueuing outgoing message" - ); self.message_deque.push_back(message); } } @@ -258,8 +243,6 @@ impl RunningServer { self.drain_jsonrpc_messages(); } - tracing::trace!("http connection terminating"); - Ok(()) } @@ -279,175 +262,92 @@ impl RunningServer { } => { tracing::debug!(%http_request_id, ?request, "handling request"); let request_id = request.id.clone().map(JsonRpcId::from); - - // Send to the JSON-RPC server channel_tx .unbounded_send(Ok(Message::Request(request))) .map_err(agent_client_protocol::util::internal_error)?; - - // Register to receive the response let session = RegisteredSession::new(response_tx); if let Some(id) = request_id { - tracing::debug!(%http_request_id, session_id = %session.id, ?id, "registering waiting session"); self.waiting_sessions.insert(id, session); } else { - // Request without id - treat like a general session - tracing::debug!(%http_request_id, session_id = %session.id, "registering general session (request without id)"); self.general_sessions.push(session); } } - HttpMessage::Notification { - http_request_id, + http_request_id: _, request, } => { - tracing::debug!(%http_request_id, ?request, "handling notification"); - // Just forward to the server, no response tracking needed channel_tx .unbounded_send(Ok(Message::Request(request))) .map_err(agent_client_protocol::util::internal_error)?; } - HttpMessage::Response { - http_request_id, + http_request_id: _, response, } => { - tracing::debug!(%http_request_id, ?response, "handling response"); - // Forward to the server channel_tx .unbounded_send(Ok(Message::Response(response))) .map_err(agent_client_protocol::util::internal_error)?; } - HttpMessage::Get { - http_request_id, + http_request_id: _, response_tx, } => { - let session = RegisteredSession::new(response_tx); - tracing::debug!( - %http_request_id, - session_id = %session.id, - queued_messages = self.message_deque.len(), - "handling GET (opening SSE stream)" - ); - // Register as a general session to receive server-initiated messages - self.general_sessions.push(session); + self.general_sessions + .push(RegisteredSession::new(response_tx)); } } - - // Purge closed sessions for good hygiene self.purge_closed_sessions(); - Ok(()) } - /// Remove messages from the queue and send them. - /// Stop if we cannot find places to send them. fn drain_jsonrpc_messages(&mut self) { - if !self.message_deque.is_empty() { - tracing::debug!( - queue_len = self.message_deque.len(), - general_sessions = self.general_sessions.len(), - waiting_sessions = self.waiting_sessions.len(), - "draining message queue" - ); - } - while let Some(message) = self.message_deque.pop_front() { - match self.try_dispatch_jsonrpc_message(message) { - None => { - tracing::debug!( - remaining = self.message_deque.len(), - "message dispatched successfully" - ); - } - - Some(message) => { - tracing::debug!( - remaining = self.message_deque.len() + 1, - "no available session, re-enqueuing message" - ); - self.message_deque.push_front(message); - break; - } + if let Some(message) = self.try_dispatch_jsonrpc_message(message) { + self.message_deque.push_front(message); + break; } } } - /// Invoked when there is an outgoing JSON-RPC message to send. - /// Tries to find a suitable place to send it. - /// If it succeeds, returns `None`. - /// If there is no place to send it, returns `Some(message)`. fn try_dispatch_jsonrpc_message( &mut self, mut message: agent_client_protocol::jsonrpcmsg::Message, ) -> Option { - // Extract the id of the message we are replying to, if any let message_id = match &message { Message::Response(response) => response.id.as_ref().map(|v| v.clone().into()), Message::Request(_) => None, }; - tracing::debug!(?message_id, "attempting to dispatch JSON-RPC message"); - - // If there is a specific id, try to send the message to that sender. - // This also removes them from the list of waiting sessions. if let Some(ref message_id) = message_id && let Some(session) = self.waiting_sessions.remove(message_id) { - tracing::debug!(session_id = %session.id, "found waiting session, attempting send"); - match session.outgoing_tx.unbounded_send(message) { - // Successfully sent the message, return - Ok(()) => { - tracing::debug!(session_id = %session.id, "sent to waiting session"); - return None; - } - - // If the sender died, just recover the message and send it to anyone. + Ok(()) => return None, Err(m) => { - tracing::debug!(session_id = %session.id, "waiting session disconnected"); - // If that sender is dead, remove them from the list - // and recover the message. assert!(m.is_disconnected()); message = m.into_inner(); } } } - // Try to find *somewhere* to send the message self.purge_closed_sessions(); - tracing::debug!( - general_sessions = self.general_sessions.len(), - waiting_sessions = self.waiting_sessions.len(), - "trying to find any active session" - ); let all_sessions = self .general_sessions .iter_mut() .chain(self.waiting_sessions.values_mut()); for session in all_sessions { - tracing::trace!(session_id = %session.id, "trying session"); match session.outgoing_tx.unbounded_send(message) { - Ok(()) => { - tracing::debug!(session_id = %session.id, "sent to session"); - return None; - } - + Ok(()) => return None, Err(m) => { - tracing::debug!(session_id = %session.id, "session disconnected, trying next"); assert!(m.is_disconnected()); message = m.into_inner(); } } } - // If we don't find anywhere to send the message, return it. Some(message) } - /// Purge sessions from the bridge state where the receiver is closed. - /// This happens when the HTTP client disconnects. fn purge_closed_sessions(&mut self) { self.general_sessions .retain(|session| !session.outgoing_tx.is_closed()); @@ -457,6 +357,7 @@ impl RunningServer { } struct RegisteredSession { + #[allow(dead_code)] id: uuid::Uuid, outgoing_tx: mpsc::UnboundedSender, } @@ -478,15 +379,11 @@ async fn handle_post( body: String, ) -> Result { let http_request_id = uuid::Uuid::new_v4(); - - // Parse incoming JSON-RPC message let message: agent_client_protocol::jsonrpcmsg::Message = serde_json::from_str(&body).map_err(agent_client_protocol::util::parse_error)?; match message { Message::Request(request) if request.id.is_some() => { - tracing::debug!(%http_request_id, method = %request.method, "POST request received"); - // Request with id - return SSE stream for response let (tx, mut rx) = mpsc::unbounded(); state .registration_tx @@ -499,20 +396,15 @@ async fn handle_post( let stream = async_stream::stream! { while let Some(message) = rx.next().await { - tracing::debug!(%http_request_id, "sending SSE event"); match axum::response::sse::Event::default().json_data(message) { Ok(v) => yield Ok(v), Err(e) => yield Err(HttpError::from(e)), } } - tracing::debug!(%http_request_id, "SSE stream completed"); }; Ok(Sse::new(stream).into_response()) } - Message::Request(request) => { - tracing::debug!(%http_request_id, method = %request.method, "POST notification received"); - // Request without id is a notification state .registration_tx .unbounded_send(HttpMessage::Notification { @@ -522,10 +414,7 @@ async fn handle_post( .map_err(agent_client_protocol::util::internal_error)?; Ok(StatusCode::ACCEPTED.into_response()) } - Message::Response(response) => { - tracing::debug!(%http_request_id, "POST response received"); - // Response from client (rare, but possible in MCP) state .registration_tx .unbounded_send(HttpMessage::Response { @@ -544,8 +433,6 @@ async fn handle_get( State(state): State>, ) -> Result>>, HttpError> { let http_request_id = uuid::Uuid::new_v4(); - tracing::debug!(%http_request_id, "GET request received"); - let (tx, mut rx) = mpsc::unbounded(); state .registration_tx @@ -557,13 +444,11 @@ async fn handle_get( let stream = async_stream::stream! { while let Some(message) = rx.next().await { - tracing::debug!(%http_request_id, "sending SSE event"); match axum::response::sse::Event::default().json_data(message) { Ok(v) => yield Ok(v), Err(e) => yield Err(HttpError::from(e)), } } - tracing::debug!(%http_request_id, "SSE stream completed"); }; Ok(Sse::new(stream)) diff --git a/src/agent-client-protocol-polyfill/src/mcp_over_acp/mod.rs b/src/agent-client-protocol-polyfill/src/mcp_over_acp/mod.rs new file mode 100644 index 0000000..308c81c --- /dev/null +++ b/src/agent-client-protocol-polyfill/src/mcp_over_acp/mod.rs @@ -0,0 +1,403 @@ +//! MCP-over-ACP polyfill proxy. +//! +//! This proxy bridges MCP-over-ACP transport for agents that don't support +//! `mcpCapabilities.acp` natively. It sits in the proxy chain and: +//! +//! - Intercepts `NewSessionRequest` to transform `McpServer::Http` entries with `acp:` URLs +//! into localhost TCP bridges +//! - Handles `_mcp/connect`, `_mcp/message`, `_mcp/disconnect` by routing through those bridges +//! +//! # Usage +//! +//! ```rust,ignore +//! use agent_client_protocol_polyfill::mcp_over_acp::McpOverAcpPolyfill; +//! +//! // Add to a conductor proxy chain +//! let conductor = ConductorImpl::new_agent( +//! "conductor", +//! ProxiesAndAgent::new(my_agent).proxy(McpOverAcpPolyfill::http()), +//! McpBridgeMode::default(), +//! ); +//! ``` + +mod actor; +pub(crate) mod http; +pub(crate) mod stdio; + +use std::collections::HashMap; +use std::path::PathBuf; + +use agent_client_protocol::schema::{ + InitializeProxyRequest, McpConnectRequest, McpConnectResponse, McpDisconnectNotification, + McpOverAcpMessage, McpServer, McpServerHttp, McpServerStdio, NewSessionRequest, +}; +use agent_client_protocol::{ + Agent, Client, Conductor, ConnectTo, ConnectionTo, Dispatch, Proxy, Role, +}; +use futures::{SinkExt, channel::mpsc}; +use tokio::net::TcpListener; +use tracing::info; + +use self::actor::BridgeConnectionActor; + +/// Internal messages for the polyfill's bridge management. +#[derive(Debug)] +pub(crate) enum BridgeMessage { + /// A new TCP connection was accepted and needs an ACP connection ID. + ConnectionReceived { + acp_id: String, + actor: BridgeConnectionActor, + connection: BridgeConnection, + }, + + /// ACP connection ID received — spawn the actor and store the connection. + ConnectionEstablished { + response: McpConnectResponse, + actor: BridgeConnectionActor, + connection: BridgeConnection, + }, + + /// MCP message from a bridge client that needs to be forwarded over ACP. + ClientToServer { + connection_id: String, + message: Dispatch, + }, + + /// Bridge client disconnected. + Disconnected { + notification: McpDisconnectNotification, + }, +} + +/// Connection handle for sending messages to an MCP client via a bridge. +#[derive(Clone, Debug)] +#[allow(dead_code)] +pub(crate) struct BridgeConnection { + to_mcp_client_tx: mpsc::Sender, +} + +impl BridgeConnection { + pub fn new(to_mcp_client_tx: mpsc::Sender) -> Self { + Self { to_mcp_client_tx } + } + + #[allow(dead_code)] + pub async fn send(&mut self, message: Dispatch) -> Result<(), agent_client_protocol::Error> { + self.to_mcp_client_tx + .send(message) + .await + .map_err(|_| agent_client_protocol::Error::internal_error()) + } +} + +/// Mode for the MCP bridge transport. +#[derive(Debug, Clone)] +pub enum BridgeMode { + /// Use stdio-based MCP bridge with a subprocess. + Stdio { + /// Command and args to spawn bridge processes. + conductor_command: Vec, + }, + + /// Use HTTP-based MCP bridge (default). + Http, +} + +impl Default for BridgeMode { + fn default() -> Self { + BridgeMode::Http + } +} + +/// MCP-over-ACP polyfill proxy. +/// +/// Bridges MCP-over-ACP transport for agents that don't support `mcpCapabilities.acp`. +#[derive(Debug)] +pub struct McpOverAcpPolyfill { + mode: BridgeMode, +} + +impl McpOverAcpPolyfill { + /// Create a polyfill using HTTP bridge mode. + pub fn http() -> Self { + Self { + mode: BridgeMode::Http, + } + } + + /// Create a polyfill using stdio bridge mode. + pub fn stdio(conductor_command: Vec) -> Self { + Self { + mode: BridgeMode::Stdio { conductor_command }, + } + } +} + +impl ConnectTo for McpOverAcpPolyfill { + async fn connect_to( + self, + client: impl ConnectTo, + ) -> Result<(), agent_client_protocol::Error> { + let (bridge_tx, bridge_rx) = mpsc::channel(128); + let mode = self.mode; + + Proxy + .builder() + .name("mcp-over-acp-polyfill") + .with_responder(BridgeResponder { + bridge_tx: bridge_tx.clone(), + bridge_rx, + bridge_connections: HashMap::new(), + }) + .on_receive_request_from( + Client, + async move |request: InitializeProxyRequest, + responder, + cx: ConnectionTo| { + // Forward initialize to successor, then set mcpCapabilities.acp = true + // in the response to advertise that we handle MCP-over-ACP. + cx.send_request_to(Agent, request.initialize) + .on_receiving_result(async move |result| { + responder.respond_with_result(result.map(|mut response| { + response.agent_capabilities.mcp_capabilities.acp = true; + response + })) + }) + }, + agent_client_protocol::on_receive_request!(), + ) + .on_receive_request_from( + Client, + { + let bridge_tx = bridge_tx.clone(); + async move |mut request: NewSessionRequest, + responder, + cx: ConnectionTo| { + // Transform acp: URLs in MCP servers + let mut listeners = BridgeListeners::default(); + for mcp_server in &mut request.mcp_servers { + listeners + .transform_mcp_server(cx.clone(), mcp_server, &bridge_tx, &mode) + .await?; + } + // Forward modified request to successor + cx.send_request_to(Agent, request) + .forward_response_to(responder) + } + }, + agent_client_protocol::on_receive_request!(), + ) + .connect_to(client) + .await + } +} + +/// Manages active bridge listeners (TCP listeners for acp: URLs). +#[derive(Default, Debug)] +struct BridgeListeners { + listeners: HashMap, +} + +#[derive(Clone, Debug)] +struct BridgeListener { + server: McpServer, +} + +impl BridgeListeners { + /// Transform an MCP server with `acp:` URL into a bridged localhost server. + async fn transform_mcp_server( + &mut self, + connection: ConnectionTo, + mcp_server: &mut McpServer, + bridge_tx: &mpsc::Sender, + mode: &BridgeMode, + ) -> Result<(), agent_client_protocol::Error> { + let McpServer::Http(http) = mcp_server else { + return Ok(()); + }; + + if !http.url.starts_with("acp:") { + return Ok(()); + } + + if !http.headers.is_empty() { + return Err(agent_client_protocol::Error::internal_error()); + } + + let name = http.name.clone(); + let url = http.url.clone(); + + info!( + server_name = %name, + acp_id = %url, + "Detected MCP server with ACP transport, spawning TCP bridge" + ); + + let transformed = self + .spawn_bridge(connection, &name, &url, bridge_tx, mode) + .await?; + *mcp_server = transformed; + Ok(()) + } + + async fn spawn_bridge( + &mut self, + connection: ConnectionTo, + server_name: &str, + acp_id: &str, + bridge_tx: &mpsc::Sender, + mode: &BridgeMode, + ) -> anyhow::Result { + if let Some(listener) = self.listeners.get(acp_id) { + return Ok(listener.server.clone()); + } + + let tcp_listener = TcpListener::bind("127.0.0.1:0").await?; + let tcp_port = tcp_listener.local_addr()?.port(); + + info!(acp_id = acp_id, tcp_port, "Bound listener for MCP bridge"); + + let new_server = match mode { + BridgeMode::Stdio { conductor_command } => McpServer::Stdio( + McpServerStdio::new( + server_name.to_string(), + PathBuf::from(&conductor_command[0]), + ) + .args( + conductor_command[1..] + .iter() + .cloned() + .chain(vec!["mcp".to_string(), format!("{tcp_port}")]) + .collect::>(), + ), + ), + + BridgeMode::Http => McpServer::Http(McpServerHttp::new( + server_name.to_string(), + format!("http://localhost:{tcp_port}"), + )), + }; + + self.listeners.insert( + acp_id.to_string(), + BridgeListener { + server: new_server.clone(), + }, + ); + + connection.spawn({ + let acp_id = acp_id.to_string(); + let bridge_tx = bridge_tx.clone(); + let mode = mode.clone(); + async move { + info!( + acp_id = acp_id, + tcp_port, "now accepting bridge connections" + ); + match mode { + BridgeMode::Stdio { + conductor_command: _, + } => stdio::run_tcp_listener(tcp_listener, acp_id, bridge_tx).await, + BridgeMode::Http => { + http::run_http_listener(tcp_listener, acp_id, bridge_tx).await + } + } + } + })?; + + Ok(new_server) + } +} + +/// Responder that runs alongside the proxy, managing bridge state. +struct BridgeResponder { + bridge_tx: mpsc::Sender, + bridge_rx: mpsc::Receiver, + bridge_connections: HashMap, +} + +impl std::fmt::Debug for BridgeResponder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BridgeResponder") + .field("bridge_connections", &self.bridge_connections.len()) + .finish_non_exhaustive() + } +} + +impl agent_client_protocol::RunWithConnectionTo for BridgeResponder { + async fn run_with_connection_to( + mut self, + connection: ConnectionTo, + ) -> Result<(), agent_client_protocol::Error> { + use futures::StreamExt; + + while let Some(message) = self.bridge_rx.next().await { + match message { + BridgeMessage::ConnectionReceived { + acp_id, + actor, + connection: bridge_conn, + } => { + // Send _mcp/connect request back through the chain. + // When the response arrives, send ConnectionEstablished back to ourselves. + connection + .send_request_to(Client, McpConnectRequest { acp_id, meta: None }) + .on_receiving_result({ + let mut bridge_tx = self.bridge_tx.clone(); + async move |result| match result { + Ok(response) => bridge_tx + .send(BridgeMessage::ConnectionEstablished { + response, + actor, + connection: bridge_conn, + }) + .await + .map_err(|_| agent_client_protocol::Error::internal_error()), + Err(_) => Ok(()), + } + })?; + } + + BridgeMessage::ConnectionEstablished { + response: McpConnectResponse { connection_id, .. }, + actor, + connection: bridge_conn, + } => { + self.bridge_connections + .insert(connection_id.clone(), bridge_conn); + connection.spawn(actor.run(connection_id))?; + } + + BridgeMessage::ClientToServer { + connection_id, + message, + } => { + let wrapped = message.map( + |request, responder| { + ( + McpOverAcpMessage { + connection_id: connection_id.clone(), + message: request, + meta: None, + }, + responder, + ) + }, + |notification| McpOverAcpMessage { + connection_id: connection_id.clone(), + message: notification, + meta: None, + }, + ); + connection.send_proxied_message_to(Client, wrapped)?; + } + + BridgeMessage::Disconnected { notification } => { + self.bridge_connections.remove(¬ification.connection_id); + connection.send_notification_to(Client, notification)?; + } + } + } + Ok(()) + } +} diff --git a/src/agent-client-protocol-polyfill/src/mcp_over_acp/stdio.rs b/src/agent-client-protocol-polyfill/src/mcp_over_acp/stdio.rs new file mode 100644 index 0000000..30556c7 --- /dev/null +++ b/src/agent-client-protocol-polyfill/src/mcp_over_acp/stdio.rs @@ -0,0 +1,44 @@ +//! Stdio-based MCP bridge transport. + +use agent_client_protocol::Dispatch; +use futures::{SinkExt, channel::mpsc}; +use tokio::net::{TcpListener, TcpStream}; +use tokio_util::compat::{TokioAsyncReadCompatExt as _, TokioAsyncWriteCompatExt as _}; + +use super::{BridgeConnection, BridgeMessage, actor::BridgeConnectionActor}; + +/// Runs the stdio bridge TCP listener, accepting connections and creating bridge actors. +pub async fn run_tcp_listener( + tcp_listener: TcpListener, + acp_id: String, + mut bridge_tx: mpsc::Sender, +) -> Result<(), agent_client_protocol::Error> { + loop { + let (stream, _addr) = tcp_listener + .accept() + .await + .map_err(agent_client_protocol::Error::into_internal_error)?; + + let (to_mcp_client_tx, to_mcp_client_rx) = mpsc::channel(128); + + bridge_tx + .send(BridgeMessage::ConnectionReceived { + acp_id: acp_id.clone(), + actor: make_stdio_actor(stream, bridge_tx.clone(), to_mcp_client_rx), + connection: BridgeConnection::new(to_mcp_client_tx), + }) + .await + .map_err(|_| agent_client_protocol::Error::internal_error())?; + } +} + +fn make_stdio_actor( + stream: TcpStream, + bridge_tx: mpsc::Sender, + to_mcp_client_rx: mpsc::Receiver, +) -> BridgeConnectionActor { + let (read_half, write_half) = stream.into_split(); + let transport = + agent_client_protocol::ByteStreams::new(write_half.compat_write(), read_half.compat()); + BridgeConnectionActor::new(transport, bridge_tx, to_mcp_client_rx) +} diff --git a/src/agent-client-protocol/CHANGELOG.md b/src/agent-client-protocol/CHANGELOG.md index 6c0b977..bfdd37a 100644 --- a/src/agent-client-protocol/CHANGELOG.md +++ b/src/agent-client-protocol/CHANGELOG.md @@ -1,5 +1,20 @@ # Changelog +## [Unreleased] + +### Breaking Changes + +- **Removed `McpAcpTransport`** struct and its `MetaCapability` impl. MCP-over-ACP support is now advertised via `mcpCapabilities.acp` in `InitializeResponse`, not `_meta.symposium.mcp_acp_transport`. +- **Renamed `McpConnectRequest.acp_url` to `acp_id`** to match `McpServerAcp.id` and the MCP-over-ACP RFD. + +### Added + +- `McpConnectionTo::acp_id()` method. + +### Deprecated + +- `McpConnectionTo::acp_url()` — use `acp_id()` instead. + ## [0.11.1](https://github.com/agentclientprotocol/rust-sdk/compare/v0.11.0...v0.11.1) - 2026-04-21 ### Fixed diff --git a/src/agent-client-protocol/Cargo.toml b/src/agent-client-protocol/Cargo.toml index 14731dc..1b2db4d 100644 --- a/src/agent-client-protocol/Cargo.toml +++ b/src/agent-client-protocol/Cargo.toml @@ -19,6 +19,7 @@ unstable = [ "unstable_auth_methods", "unstable_boolean_config", "unstable_logout", + "unstable_mcp_over_acp", "unstable_message_id", "unstable_session_additional_directories", "unstable_session_fork", @@ -28,6 +29,7 @@ unstable = [ unstable_auth_methods = ["agent-client-protocol-schema/unstable_auth_methods"] unstable_boolean_config = ["agent-client-protocol-schema/unstable_boolean_config"] unstable_logout = ["agent-client-protocol-schema/unstable_logout"] +unstable_mcp_over_acp = ["agent-client-protocol-schema/unstable_mcp_over_acp"] unstable_message_id = ["agent-client-protocol-schema/unstable_message_id"] unstable_session_additional_directories = ["agent-client-protocol-schema/unstable_session_additional_directories"] unstable_session_fork = ["agent-client-protocol-schema/unstable_session_fork"] diff --git a/src/agent-client-protocol/src/capabilities.rs b/src/agent-client-protocol/src/capabilities.rs index 73a2b5c..42d096d 100644 --- a/src/agent-client-protocol/src/capabilities.rs +++ b/src/agent-client-protocol/src/capabilities.rs @@ -6,13 +6,18 @@ //! # Example //! //! ```rust,no_run -//! use agent_client_protocol::{MetaCapabilityExt, McpAcpTransport}; +//! use agent_client_protocol::{MetaCapability, MetaCapabilityExt}; //! # use agent_client_protocol::schema::InitializeResponse; //! # let init_response: InitializeResponse = unimplemented!(); //! -//! let response = init_response.add_meta_capability(McpAcpTransport); -//! if response.has_meta_capability(McpAcpTransport) { -//! // Agent supports MCP-over-ACP bridging +//! struct Proxy; +//! impl MetaCapability for Proxy { +//! fn key(&self) -> &'static str { "proxy" } +//! } +//! +//! let response = init_response.add_meta_capability(Proxy); +//! if response.has_meta_capability(Proxy) { +//! // Agent has the proxy capability //! } //! ``` @@ -33,19 +38,6 @@ pub trait MetaCapability { } } -/// The mcp_acp_transport capability - indicates support for MCP-over-ACP bridging. -/// -/// When present in `_meta.symposium.mcp_acp_transport`, signals that the agent -/// supports having MCP servers with `acp:UUID` transport proxied through the conductor. -#[derive(Debug)] -pub struct McpAcpTransport; - -impl MetaCapability for McpAcpTransport { - fn key(&self) -> &'static str { - "mcp_acp_transport" - } -} - /// Extension trait for checking and modifying capabilities in `InitializeRequest`. pub trait MetaCapabilityExt { /// Check if a capability is present in `_meta.symposium` @@ -140,15 +132,22 @@ mod tests { use crate::schema::{ClientCapabilities, ProtocolVersion}; use serde_json::json; + struct TestCapability; + impl MetaCapability for TestCapability { + fn key(&self) -> &'static str { + "test_cap" + } + } + #[test] fn test_add_capability_to_request() { let request = InitializeRequest::new(ProtocolVersion::LATEST); - let request = request.add_meta_capability(McpAcpTransport); + let request = request.add_meta_capability(TestCapability); - assert!(request.has_meta_capability(McpAcpTransport)); + assert!(request.has_meta_capability(TestCapability)); assert_eq!( - request.client_capabilities.meta.as_ref().unwrap()["symposium"]["mcp_acp_transport"], + request.client_capabilities.meta.as_ref().unwrap()["symposium"]["test_cap"], json!(true) ); } @@ -160,7 +159,7 @@ mod tests { "symposium".to_string(), json!({ "version": "1.0", - "mcp_acp_transport": true + "test_cap": true }), ); let client_capabilities = ClientCapabilities::new().meta(meta); @@ -168,20 +167,20 @@ mod tests { let request = InitializeRequest::new(ProtocolVersion::LATEST) .client_capabilities(client_capabilities); - let request = request.remove_meta_capability(McpAcpTransport); + let request = request.remove_meta_capability(TestCapability); - assert!(!request.has_meta_capability(McpAcpTransport)); + assert!(!request.has_meta_capability(TestCapability)); } #[test] fn test_add_capability_to_response() { let response = InitializeResponse::new(ProtocolVersion::LATEST); - let response = response.add_meta_capability(McpAcpTransport); + let response = response.add_meta_capability(TestCapability); - assert!(response.has_meta_capability(McpAcpTransport)); + assert!(response.has_meta_capability(TestCapability)); assert_eq!( - response.agent_capabilities.meta.as_ref().unwrap()["symposium"]["mcp_acp_transport"], + response.agent_capabilities.meta.as_ref().unwrap()["symposium"]["test_cap"], json!(true) ); } diff --git a/src/agent-client-protocol/src/mcp_server/active_session.rs b/src/agent-client-protocol/src/mcp_server/active_session.rs index 7bb1c5c..03004f7 100644 --- a/src/agent-client-protocol/src/mcp_server/active_session.rs +++ b/src/agent-client-protocol/src/mcp_server/active_session.rs @@ -20,8 +20,8 @@ use std::sync::Arc; /// (see [`ConnectionTo::add_dynamic_handler`]) and handles MCP-over-ACP messages /// with the appropriate ACP url. pub(super) struct McpActiveSession { - /// The ACP URL created for this session - acp_url: String, + /// The ACP identifier created for this session + acp_id: String, /// The MCP server we are managing mcp_connect: Arc>, @@ -34,9 +34,9 @@ impl McpActiveSession where Counterpart: HasPeer, { - pub fn new(acp_url: String, mcp_connect: Arc>) -> Self { + pub fn new(acp_id: String, mcp_connect: Arc>) -> Self { Self { - acp_url, + acp_id, mcp_connect, connections: FxHashMap::default(), } @@ -51,7 +51,7 @@ where acp_connection: &ConnectionTo, ) -> Result)>, crate::Error> { // Check that this is for our MCP server - if request.acp_url != self.acp_url { + if request.acp_id != self.acp_id { return Ok(Handled::No { message: (request, responder), retry: false, @@ -110,7 +110,7 @@ where // Get the MCP server component let spawned_server = self.mcp_connect.connect(McpConnectionTo { - acp_url: request.acp_url.clone(), + acp_id: request.acp_id.clone(), connection: acp_connection.clone(), }); diff --git a/src/agent-client-protocol/src/mcp_server/context.rs b/src/agent-client-protocol/src/mcp_server/context.rs index 7905f20..c2ab2b4 100644 --- a/src/agent-client-protocol/src/mcp_server/context.rs +++ b/src/agent-client-protocol/src/mcp_server/context.rs @@ -3,14 +3,20 @@ use crate::{ConnectionTo, role::Role}; /// Context about the ACP and MCP connection available to an MCP server. #[derive(Clone, Debug)] pub struct McpConnectionTo { - pub(super) acp_url: String, + pub(super) acp_id: String, pub(super) connection: ConnectionTo, } impl McpConnectionTo { + /// The ACP identifier for this MCP server (e.g., `"acp:UUID"`). + pub fn acp_id(&self) -> String { + self.acp_id.clone() + } + /// The `acp:UUID` that was given. + #[deprecated(since = "0.12.0", note = "renamed to `acp_id()`")] pub fn acp_url(&self) -> String { - self.acp_url.clone() + self.acp_id() } /// The host connection context. diff --git a/src/agent-client-protocol/src/mcp_server/server.rs b/src/agent-client-protocol/src/mcp_server/server.rs index 5d5fd3f..f746ba4 100644 --- a/src/agent-client-protocol/src/mcp_server/server.rs +++ b/src/agent-client-protocol/src/mcp_server/server.rs @@ -48,8 +48,8 @@ pub struct McpServer { /// The host role that is serving up this MCP server phantom: PhantomData, - /// The ACP URL we assigned for this mcp server; always unique - acp_url: String, + /// The ACP identifier we assigned for this mcp server; always unique + acp_id: String, /// The "connect" instance connect: Arc>, @@ -69,7 +69,7 @@ impl std::fmt::Debug fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("McpServer") .field("phantom", &self.phantom) - .field("acp_url", &self.acp_url) + .field("acp_id", &self.acp_id) .field("responder", &self.responder) .finish_non_exhaustive() } @@ -94,7 +94,7 @@ where pub fn new(c: impl McpServerConnect, responder: Run) -> Self { McpServer { phantom: PhantomData, - acp_url: format!("acp:{}", Uuid::new_v4()), + acp_id: format!("acp:{}", Uuid::new_v4()), connect: Arc::new(c), responder, } @@ -107,11 +107,11 @@ where { let Self { phantom: _, - acp_url, + acp_id, connect, responder, } = self; - (McpNewSessionHandler::new(acp_url, connect), responder) + (McpNewSessionHandler::new(acp_id, connect), responder) } } @@ -120,7 +120,7 @@ pub(crate) struct McpNewSessionHandler where Counterpart: HasPeer, { - acp_url: String, + acp_id: String, connect: Arc>, active_session: McpActiveSession, } @@ -129,10 +129,10 @@ impl McpNewSessionHandler where Counterpart: HasPeer, { - pub fn new(acp_url: String, connect: Arc>) -> Self { + pub fn new(acp_id: String, connect: Arc>) -> Self { Self { - active_session: McpActiveSession::new(acp_url.clone(), connect.clone()), - acp_url, + active_session: McpActiveSession::new(acp_id.clone(), connect.clone()), + acp_id, connect, } } @@ -140,7 +140,7 @@ where /// Modify the new session request to include this MCP server. fn modify_new_session_request(&self, request: &mut NewSessionRequest) { request.mcp_servers.push(crate::schema::McpServer::Http( - crate::schema::McpServerHttp::new(self.connect.name(), self.acp_url.clone()), + crate::schema::McpServerHttp::new(self.connect.name(), self.acp_id.clone()), )); } } @@ -208,7 +208,7 @@ where client: impl ConnectTo, ) -> Result<(), crate::Error> { let Self { - acp_url, + acp_id, connect, responder, phantom: _, @@ -229,7 +229,7 @@ where .with_spawned(async move |connection_to_client| { let spawned_server: DynConnectTo = connect.connect(McpConnectionTo { - acp_url, + acp_id, connection: connection_to_client.clone(), }); diff --git a/src/agent-client-protocol/src/schema/proxy_protocol.rs b/src/agent-client-protocol/src/schema/proxy_protocol.rs index aa35dd1..da18da4 100644 --- a/src/agent-client-protocol/src/schema/proxy_protocol.rs +++ b/src/agent-client-protocol/src/schema/proxy_protocol.rs @@ -79,8 +79,8 @@ pub const METHOD_MCP_CONNECT_REQUEST: &str = "_mcp/connect"; #[derive(Debug, Clone, Serialize, Deserialize, crate::JsonRpcRequest)] #[request(method = "_mcp/connect", response = McpConnectResponse, crate = crate)] pub struct McpConnectRequest { - /// The ACP URL to connect to (e.g., "acp:uuid") - pub acp_url: String, + /// The ACP identifier for the server (e.g., "acp:uuid"), matching `McpServerAcp.id` + pub acp_id: String, /// Optional metadata #[serde(skip_serializing_if = "Option::is_none")]