From 00a12fbdacded4ea7372d7db4cab40e9ca1ff0fd Mon Sep 17 00:00:00 2001 From: Yinghan Ma Date: Mon, 11 Aug 2025 00:05:35 +0000 Subject: [PATCH 1/5] handle streaming httpclient error and closure in same asyncio task and context --- eval_protocol/mcp/client/connection.py | 4 ++-- eval_protocol/mcp/execution/base_policy.py | 2 +- eval_protocol/mcp/execution/manager.py | 23 +++++++++++++++++++--- eval_protocol/mcp/session/manager.py | 1 + eval_protocol/mcp_env.py | 8 +------- 5 files changed, 25 insertions(+), 13 deletions(-) diff --git a/eval_protocol/mcp/client/connection.py b/eval_protocol/mcp/client/connection.py index 97943a23..6d984cd2 100644 --- a/eval_protocol/mcp/client/connection.py +++ b/eval_protocol/mcp/client/connection.py @@ -544,10 +544,10 @@ async def close_session(self, session: MCPSession) -> None: await session._exit_stack.aclose() except asyncio.CancelledError: # Handle cancellation gracefully (especially important for Python 3.12) - logger.debug(f"Session {session.session_id} close was cancelled") + logger.error(f"Session {session.session_id} close was cancelled") except Exception as e: # Hitting this error, probably because of use of threads: "Attempted to exit cancel scope in a different task than it was entered in" - logger.debug(f"Error closing session {session.session_id}: {e}") + logger.error(f"Error closing session {session.session_id}: {e}") finally: session._exit_stack = None session._mcp_session = None diff --git a/eval_protocol/mcp/execution/base_policy.py b/eval_protocol/mcp/execution/base_policy.py index 5b5ef4c8..17f67da3 100644 --- a/eval_protocol/mcp/execution/base_policy.py +++ b/eval_protocol/mcp/execution/base_policy.py @@ -220,7 +220,7 @@ async def _generate_live_tool_calls( return mcp_tool_calls, usage_stats else: # No tool calls in response - this is normal when episode ends or LLM provides only text - logger.info(f"No tool calls in response for env {env_index}, message content: {message.get('content')}") + logger.debug(f"No 
tool calls in response for env {env_index}, message content: {message.get('content')}") return [ MCPToolCall( tool_name="_no_tool_call", diff --git a/eval_protocol/mcp/execution/manager.py b/eval_protocol/mcp/execution/manager.py index 749a8d1f..950ecef6 100644 --- a/eval_protocol/mcp/execution/manager.py +++ b/eval_protocol/mcp/execution/manager.py @@ -98,10 +98,12 @@ async def execute_rollouts( async def _execute_with_semaphore(idx): async with semaphore: - return await self._execute_rollout( + result = await self._execute_rollout( envs, policy, idx, steps, openai_logger, recording_mode, playback_mode, start_time ) + return result + tasks = [_execute_with_semaphore(i) for i in range(envs.n)] # exceptions will be try catched inside single _execute_rollout trajectories = await asyncio.gather(*tasks) @@ -114,7 +116,7 @@ async def _execute_with_semaphore(idx): shared_tool_schema = envs.tool_schemas # Clean up - await envs.close() + # await envs.close() # Enhanced reporting with control plane info successful = sum(1 for traj in trajectories if traj.total_reward > 0) @@ -227,6 +229,7 @@ async def _execute_rollout( "total_tokens": 0, }, ) + failure_reason = None try: current_observation, tool_schema = await envs.reset(session) system_prompt = dataset_row.system_prompt @@ -467,11 +470,25 @@ async def _execute_rollout( logger.info( f"✅ Rollout {rollout_idx} completed: {trajectory.steps} steps, reward: {trajectory.total_reward:.2f}, termination: {trajectory.termination_reason}, in thread {threading.current_thread().name}" ) + + except asyncio.CancelledError: + logger.error(f"🚨 AsyncIO Cancel Error in roll out {rollout_idx}", exc_info=True) + failure_reason = "asyncio context cancelled" except Exception as e: logger.error(f"🚨 Error in rollout {rollout_idx}: {e}", exc_info=True) + failure_reason = str(e) + finally: trajectory.terminated = True trajectory.termination_reason = TerminationReason.ERROR - trajectory.control_plane_summary.update({"error_message": str(e)}) + 
trajectory.control_plane_summary.update({"error_message": f"{failure_reason}"}) + try: + await envs.connection_manager.reset_session(session) + except: + logger.error(f"Error resetting session {session.session_id}") + try: + await envs.connection_manager.close_session(session) + except: + logger.error(f"Error closing session {session.session_id}") return trajectory async def _get_control_plane_status(self, session) -> Optional[Dict[str, Any]]: diff --git a/eval_protocol/mcp/session/manager.py b/eval_protocol/mcp/session/manager.py index 5bd36e5a..71c23af0 100644 --- a/eval_protocol/mcp/session/manager.py +++ b/eval_protocol/mcp/session/manager.py @@ -58,6 +58,7 @@ async def reset(self, session: MCPSession) -> Tuple[Any, List[Dict]]: This is thread-safe and can be called from worker threads. """ + await self.connection_manager.initialize_session(session) # Get available tools from MCP server tool_schemas = await self.connection_manager.discover_tools(session) diff --git a/eval_protocol/mcp_env.py b/eval_protocol/mcp_env.py index 05ea8414..1aab313a 100644 --- a/eval_protocol/mcp_env.py +++ b/eval_protocol/mcp_env.py @@ -86,7 +86,7 @@ async def reset_mcp_sessions(envs: GeneralMCPVectorEnv): Reset mcp server sessions """ tasks = [envs.connection_manager.reset_session(session) for session in envs.sessions] - await asyncio.gather(*tasks) + await asyncio.gather(*tasks, return_exceptions=True) async def make( @@ -236,12 +236,6 @@ async def make( sessions.append(session) mcp_envs = GeneralMCPVectorEnv(sessions, dataset_rows, user_prompt_formatter) - tasks = [mcp_envs.connection_manager.initialize_session(session) for session in sessions] - await asyncio.gather(*tasks) - - if reset_sessions: - await reset_mcp_sessions(mcp_envs) - return mcp_envs From 5ea7e08d1ce6828bdc1d2457c024ca1629c7e860 Mon Sep 17 00:00:00 2001 From: Yinghan Ma Date: Mon, 11 Aug 2025 00:13:02 +0000 Subject: [PATCH 2/5] revert ep.make back --- eval_protocol/mcp/execution/manager.py | 7 ++++--- 
eval_protocol/mcp_env.py | 12 +++++------- .../default_mcp_gym_rollout_processor.py | 2 +- .../tests/test_record_and_replay_e2e.py | 16 ++++++++-------- .../tests/test_cliff_walking_e2e.py | 16 ++++++++-------- .../test_basic_functionality.py | 2 +- examples/frozen_lake_mcp/test_multi_session.py | 2 +- examples/frozen_lake_mcp/test_seed_logging.py | 2 +- .../tests/test_frozen_lake_e2e.py | 18 +++++++++--------- .../test_lunar_lander_conda.py | 2 +- .../tests/test_lunar_lander_e2e.py | 16 ++++++++-------- examples/tau2_mcp/tests/test_tau2_e2e.py | 16 ++++++++-------- .../local_testing/test_north_star.py | 2 +- .../taxi_mcp_complete/tests/test_taxi_e2e.py | 8 ++++---- tests/test_parallel_rollouts.py | 4 ++-- tests/test_url_handling.py | 10 +++++----- 16 files changed, 67 insertions(+), 68 deletions(-) diff --git a/eval_protocol/mcp/execution/manager.py b/eval_protocol/mcp/execution/manager.py index 950ecef6..22b8ee83 100644 --- a/eval_protocol/mcp/execution/manager.py +++ b/eval_protocol/mcp/execution/manager.py @@ -478,9 +478,10 @@ async def _execute_rollout( logger.error(f"🚨 Error in rollout {rollout_idx}: {e}", exc_info=True) failure_reason = str(e) finally: - trajectory.terminated = True - trajectory.termination_reason = TerminationReason.ERROR - trajectory.control_plane_summary.update({"error_message": f"{failure_reason}"}) + if failure_reason: + trajectory.terminated = True + trajectory.termination_reason = TerminationReason.ERROR + trajectory.control_plane_summary.update({"error_message": f"{failure_reason}"}) try: await envs.connection_manager.reset_session(session) except: diff --git a/eval_protocol/mcp_env.py b/eval_protocol/mcp_env.py index 1aab313a..5ec67658 100644 --- a/eval_protocol/mcp_env.py +++ b/eval_protocol/mcp_env.py @@ -17,7 +17,7 @@ policy = ep.FireworksPolicy(model_id="accounts/fireworks/models/qwen3-235b-a22b") # Create environments with evaluation_rows configuration - envs = await ep.make("http://localhost:8000/mcp", 
evaluation_rows=evaluation_rows) + envs = ep.make("http://localhost:8000/mcp", evaluation_rows=evaluation_rows) # Execute tool-calling rollouts evaluation_rows = await ep.rollout(envs, policy=policy, steps=512) @@ -89,7 +89,7 @@ async def reset_mcp_sessions(envs: GeneralMCPVectorEnv): await asyncio.gather(*tasks, return_exceptions=True) -async def make( +def make( env_spec: str, evaluation_rows: Optional[List[EvaluationRow]] = None, dataset: Optional[List[Dict]] = None, @@ -97,7 +97,6 @@ async def make( seeds: Optional[List[int]] = None, model_id: str = "unknown", user_prompt_formatter: Optional[Callable] = None, - reset_sessions: bool = False, ) -> GeneralMCPVectorEnv: """ Create general MCP environments driven by evaluation_rows configuration. @@ -110,20 +109,19 @@ async def make( seeds: List of seeds (for backward compatibility) model_id: Model identifier user_prompt_formatter: Optional callback for formatting user prompts - reset_sessions: Whether to reset sessions before returning the environment Returns: General MCP environment that works with any MCP server Example: # EvaluationRow approach (preferred) - envs = await ep.make("http://localhost:8000/mcp", evaluation_rows=evaluation_rows) + envs = ep.make("http://localhost:8000/mcp", evaluation_rows=evaluation_rows) # Dataset approach (backward compatibility) - envs = await ep.make("http://localhost:8000/mcp", dataset=dataset) + envs = ep.make("http://localhost:8000/mcp", dataset=dataset) # Legacy approach (backward compatibility) - envs = await ep.make("http://localhost:8000/mcp", n=10, seeds=seeds) + envs = ep.make("http://localhost:8000/mcp", n=10, seeds=seeds) """ # Parse environment specification - make sure URL format is correct base_url = env_spec diff --git a/eval_protocol/pytest/default_mcp_gym_rollout_processor.py b/eval_protocol/pytest/default_mcp_gym_rollout_processor.py index d7cba33d..d252dac5 100644 --- a/eval_protocol/pytest/default_mcp_gym_rollout_processor.py +++ 
b/eval_protocol/pytest/default_mcp_gym_rollout_processor.py @@ -213,7 +213,7 @@ async def default_mcp_gym_rollout_processor( ) # Create MCP environments directly from evaluation_rows - envs = await ep.make( + envs = ep.make( "http://localhost:9700/mcp/", evaluation_rows=rows, model_id=policy.model_id, diff --git a/examples/blackjack_mcp/tests/test_record_and_replay_e2e.py b/examples/blackjack_mcp/tests/test_record_and_replay_e2e.py index b77b7daa..69552c53 100644 --- a/examples/blackjack_mcp/tests/test_record_and_replay_e2e.py +++ b/examples/blackjack_mcp/tests/test_record_and_replay_e2e.py @@ -215,7 +215,7 @@ async def test_production_server_record_and_replay(production_server, blackjack_ assert playback_policy.is_playback_mode(), "Should be in playback mode in CI" # Create environments for playback - playback_envs = await ep.make( + playback_envs = ep.make( "http://localhost:9500/mcp/", dataset=blackjack_dataset, model_id=playback_policy.model_id, @@ -250,7 +250,7 @@ async def test_production_server_record_and_replay(production_server, blackjack_ assert not policy.is_playback_mode(), "Should be in recording mode initially" # Create environments - envs = await ep.make( + envs = ep.make( "http://localhost:9500/mcp/", dataset=blackjack_dataset, model_id=policy.model_id, @@ -310,7 +310,7 @@ async def test_production_server_record_and_replay(production_server, blackjack_ assert playback_policy.is_playback_mode(), "Should be in playback mode" # Create new environments for playback - playback_envs = await ep.make( + playback_envs = ep.make( "http://localhost:9500/mcp/", dataset=blackjack_dataset, model_id=playback_policy.model_id, @@ -462,7 +462,7 @@ async def test_blackjack_step_by_step(conda_isolation_recording_file): ] # Create environment pointing to conda-isolated server - envs = await ep.make( + envs = ep.make( f"http://localhost:{port}/mcp/", dataset=test_dataset, model_id=policy.model_id, @@ -570,7 +570,7 @@ async def 
test_multi_environment_sessions(multi_env_dataset, multi_env_recording policy = create_blackjack_static_policy(action_sequence=["HIT", "HIT", "STICK"]) # Create multiple environments - envs = await ep.make( + envs = ep.make( f"http://localhost:{server.port}/mcp/", dataset=multi_env_dataset, model_id=policy.model_id, @@ -992,7 +992,7 @@ async def test_fireworks_multi_environment_sessions(multi_env_dataset, fireworks assert playback_policy.is_playback_mode(), "Should be in playback mode in CI" # Create environments for playback - playback_envs = await ep.make( + playback_envs = ep.make( "http://localhost:9500/mcp/", dataset=multi_env_dataset, model_id=playback_policy.model_id, @@ -1033,7 +1033,7 @@ async def test_fireworks_multi_environment_sessions(multi_env_dataset, fireworks assert not policy.is_playback_mode(), "Should be in recording mode initially" # Create multiple environments - envs = await ep.make( + envs = ep.make( f"http://localhost:{server.port}/mcp/", dataset=multi_env_dataset, model_id=policy.model_id, @@ -1149,7 +1149,7 @@ async def test_control_plane_state_querying(multi_env_dataset): policy = create_blackjack_static_policy(action_sequence=["HIT", "STAND"]) # Create environments - envs = await ep.make( + envs = ep.make( f"http://localhost:{server.port}/mcp/", dataset=multi_env_dataset[:2], # Use only 2 environments for faster testing model_id=policy.model_id, diff --git a/examples/cliff_walking_mcp/tests/test_cliff_walking_e2e.py b/examples/cliff_walking_mcp/tests/test_cliff_walking_e2e.py index 277a3457..fc327f62 100644 --- a/examples/cliff_walking_mcp/tests/test_cliff_walking_e2e.py +++ b/examples/cliff_walking_mcp/tests/test_cliff_walking_e2e.py @@ -224,7 +224,7 @@ async def test_production_server_record_and_replay( assert playback_policy.is_playback_mode(), "Should be in playback mode in CI" # Create environments for playback - playback_envs = await ep.make( + playback_envs = ep.make( "http://localhost:9500/mcp/", dataset=cliff_walking_dataset, 
model_id=playback_policy.model_id, @@ -259,7 +259,7 @@ async def test_production_server_record_and_replay( assert not policy.is_playback_mode(), "Should be in recording mode initially" # Create environments - envs = await ep.make( + envs = ep.make( "http://localhost:9500/mcp/", dataset=cliff_walking_dataset, model_id=policy.model_id, @@ -318,7 +318,7 @@ async def test_production_server_record_and_replay( assert playback_policy.is_playback_mode(), "Should be in playback mode" # Create new environments for playback - playback_envs = await ep.make( + playback_envs = ep.make( "http://localhost:9500/mcp/", dataset=cliff_walking_dataset, model_id=playback_policy.model_id, @@ -471,7 +471,7 @@ async def test_cliff_walking_step_by_step(conda_isolation_recording_file): ] # Create environment pointing to conda-isolated server - envs = await ep.make( + envs = ep.make( f"http://localhost:{port}/mcp/", dataset=test_dataset, model_id=policy.model_id, @@ -589,7 +589,7 @@ async def test_multi_environment_sessions(multi_env_dataset, multi_env_recording ) # Create multiple environments - envs = await ep.make( + envs = ep.make( f"http://localhost:{server.port}/mcp/", dataset=multi_env_dataset, model_id=policy.model_id, @@ -1018,7 +1018,7 @@ async def test_fireworks_multi_environment_sessions(multi_env_dataset, fireworks assert playback_policy.is_playback_mode(), "Should be in playback mode in CI" # Create environments for playback - playback_envs = await ep.make( + playback_envs = ep.make( "http://localhost:9500/mcp/", dataset=multi_env_dataset, model_id=playback_policy.model_id, @@ -1059,7 +1059,7 @@ async def test_fireworks_multi_environment_sessions(multi_env_dataset, fireworks assert not policy.is_playback_mode(), "Should be in recording mode initially" # Create multiple environments - envs = await ep.make( + envs = ep.make( f"http://localhost:{server.port}/mcp/", dataset=multi_env_dataset, model_id=policy.model_id, @@ -1178,7 +1178,7 @@ async def 
test_control_plane_state_querying(multi_env_dataset): policy = create_cliff_walking_static_policy(action_sequence=["UP", "UP"]) # Create environments - envs = await ep.make( + envs = ep.make( f"http://localhost:{server.port}/mcp/", dataset=multi_env_dataset[:2], # Use only 2 environments for faster testing model_id=policy.model_id, diff --git a/examples/frozen_lake_mcp/test_basic_functionality.py b/examples/frozen_lake_mcp/test_basic_functionality.py index a4a310ad..296d611f 100644 --- a/examples/frozen_lake_mcp/test_basic_functionality.py +++ b/examples/frozen_lake_mcp/test_basic_functionality.py @@ -46,7 +46,7 @@ async def test_basic_server_functionality(): policy = ep.FireworksPolicy(model_id="accounts/fireworks/models/qwen3-235b-a22b", temperature=0.2) # Create environment pointing to local server - envs = await ep.make("http://localhost:8000/mcp/", dataset=test_dataset, model_id=policy.model_id) + envs = ep.make("http://localhost:8000/mcp/", dataset=test_dataset, model_id=policy.model_id) print("✅ Successfully connected to MCP server") # Test 2: Try to make tool calls (we'll simulate this for now) diff --git a/examples/frozen_lake_mcp/test_multi_session.py b/examples/frozen_lake_mcp/test_multi_session.py index 529f543f..08140af8 100644 --- a/examples/frozen_lake_mcp/test_multi_session.py +++ b/examples/frozen_lake_mcp/test_multi_session.py @@ -60,7 +60,7 @@ async def test_multi_session(): try: # Create environments (assumes server is running on localhost:8000) - envs = await ep.make( + envs = ep.make( "http://localhost:8000/mcp/", dataset=test_dataset, model_id=policy.model_id, diff --git a/examples/frozen_lake_mcp/test_seed_logging.py b/examples/frozen_lake_mcp/test_seed_logging.py index 248c004b..edb1b272 100644 --- a/examples/frozen_lake_mcp/test_seed_logging.py +++ b/examples/frozen_lake_mcp/test_seed_logging.py @@ -30,7 +30,7 @@ async def test_seed_logging(): try: # Create environment pointing to our server print("🔌 Connecting to server...") - envs = 
await ep.make("http://localhost:9600/mcp/", dataset=dataset, model_id="test") + envs = ep.make("http://localhost:9600/mcp/", dataset=dataset, model_id="test") print(f"✅ Created envs: {len(envs.sessions)} sessions") # Reset environments to trigger session creation diff --git a/examples/frozen_lake_mcp/tests/test_frozen_lake_e2e.py b/examples/frozen_lake_mcp/tests/test_frozen_lake_e2e.py index 3ce71f3e..e2c4c78e 100644 --- a/examples/frozen_lake_mcp/tests/test_frozen_lake_e2e.py +++ b/examples/frozen_lake_mcp/tests/test_frozen_lake_e2e.py @@ -232,7 +232,7 @@ async def test_production_server_record_and_replay(production_server, frozen_lak assert playback_policy.is_playback_mode(), "Should be in playback mode in CI" # Create environments for playback - playback_envs = await ep.make( + playback_envs = ep.make( "http://localhost:9500/mcp/", dataset=frozen_lake_dataset, model_id=playback_policy.model_id, @@ -268,7 +268,7 @@ async def test_production_server_record_and_replay(production_server, frozen_lak assert not policy.is_playback_mode(), "Should be in recording mode initially" # Create environments - envs = await ep.make( + envs = ep.make( "http://localhost:9500/mcp/", dataset=frozen_lake_dataset, model_id=policy.model_id, @@ -335,7 +335,7 @@ async def test_production_server_record_and_replay(production_server, frozen_lak assert playback_policy.is_playback_mode(), "Should be in playback mode" # Create new environments for playback - playback_envs = await ep.make( + playback_envs = ep.make( "http://localhost:9500/mcp/", dataset=frozen_lake_dataset, model_id=playback_policy.model_id, @@ -488,7 +488,7 @@ async def test_frozen_lake_step_by_step(conda_isolation_recording_file): ] # Create environment pointing to conda-isolated server - envs = await ep.make( + envs = ep.make( f"http://localhost:{port}/mcp/", dataset=test_dataset, model_id=policy.model_id, @@ -593,7 +593,7 @@ async def test_multi_environment_sessions(multi_env_dataset, multi_env_recording policy = 
create_frozen_lake_static_policy(action_sequence=["RIGHT", "RIGHT", "RIGHT", "DOWN", "DOWN", "DOWN"]) # Create multiple environments - envs = await ep.make( + envs = ep.make( f"http://localhost:{server.port}/mcp/", dataset=multi_env_dataset, model_id=policy.model_id, @@ -1071,7 +1071,7 @@ async def test_fireworks_multi_environment_sessions(multi_env_dataset, fireworks assert playback_policy.is_playback_mode(), "Should be in playback mode in CI" # Create environments for playback - playback_envs = await ep.make( + playback_envs = ep.make( "http://localhost:9500/mcp/", dataset=multi_env_dataset, model_id=playback_policy.model_id, @@ -1113,7 +1113,7 @@ async def test_fireworks_multi_environment_sessions(multi_env_dataset, fireworks assert not policy.is_playback_mode(), "Should be in recording mode initially" # Create multiple environments - envs = await ep.make( + envs = ep.make( f"http://localhost:{server.port}/mcp/", dataset=multi_env_dataset, model_id=policy.model_id, @@ -1232,7 +1232,7 @@ async def test_control_plane_state_querying(multi_env_dataset): policy = create_frozen_lake_static_policy(action_sequence=["RIGHT", "DOWN"]) # Create environments - envs = await ep.make( + envs = ep.make( f"http://localhost:{server.port}/mcp/", dataset=multi_env_dataset[:2], # Use only 2 environments for faster testing model_id=policy.model_id, @@ -1283,7 +1283,7 @@ async def _run_playback_only(recording_file: str, dataset: List[Dict], server_ur assert playback_policy.is_playback_mode(), "Should be in playback mode in CI" # Create environments for playback - playback_envs = await ep.make( + playback_envs = ep.make( server_url, dataset=dataset, model_id=playback_policy.model_id, diff --git a/examples/lunar_lander_mcp/test_lunar_lander_conda.py b/examples/lunar_lander_mcp/test_lunar_lander_conda.py index 9e88f92b..98d3c491 100644 --- a/examples/lunar_lander_mcp/test_lunar_lander_conda.py +++ b/examples/lunar_lander_mcp/test_lunar_lander_conda.py @@ -119,7 +119,7 @@ async def 
test_lunar_lander_with_conda_isolation(): ] # Configure for MCP environment - envs = await ep.make("http://localhost:9004/mcp", dataset=dataset) + envs = ep.make("http://localhost:9004/mcp", dataset=dataset) # Simple policy that takes random actions class RandomLunarLanderPolicy: diff --git a/examples/lunar_lander_mcp/tests/test_lunar_lander_e2e.py b/examples/lunar_lander_mcp/tests/test_lunar_lander_e2e.py index 7f187cac..723b68bb 100644 --- a/examples/lunar_lander_mcp/tests/test_lunar_lander_e2e.py +++ b/examples/lunar_lander_mcp/tests/test_lunar_lander_e2e.py @@ -235,7 +235,7 @@ async def test_production_server_record_and_replay(production_server, lunar_land assert playback_policy.is_playback_mode(), "Should be in playback mode in CI" # Create environments for playback - playback_envs = await ep.make( + playback_envs = ep.make( "http://localhost:9500/mcp/", dataset=lunar_lander_dataset, model_id=playback_policy.model_id, @@ -271,7 +271,7 @@ async def test_production_server_record_and_replay(production_server, lunar_land assert not policy.is_playback_mode(), "Should be in recording mode initially" # Create environments - envs = await ep.make( + envs = ep.make( "http://localhost:9500/mcp/", dataset=lunar_lander_dataset, model_id=policy.model_id, @@ -332,7 +332,7 @@ async def test_production_server_record_and_replay(production_server, lunar_land assert playback_policy.is_playback_mode(), "Should be in playback mode" # Create new environments for playback - playback_envs = await ep.make( + playback_envs = ep.make( "http://localhost:9500/mcp/", dataset=lunar_lander_dataset, model_id=playback_policy.model_id, @@ -487,7 +487,7 @@ async def test_lunar_lander_step_by_step(conda_isolation_recording_file): ] # Create environment pointing to conda-isolated server - envs = await ep.make( + envs = ep.make( f"http://localhost:{port}/mcp/", dataset=test_dataset, model_id=policy.model_id, @@ -626,7 +626,7 @@ async def test_multi_environment_sessions(multi_env_dataset, 
multi_env_recording policy = create_lunar_lander_static_policy() # Create multiple environments - envs = await ep.make( + envs = ep.make( f"http://localhost:{server.port}/mcp/", dataset=multi_env_dataset, model_id=policy.model_id, @@ -1076,7 +1076,7 @@ async def test_fireworks_multi_environment_sessions(multi_env_dataset, fireworks assert playback_policy.is_playback_mode(), "Should be in playback mode in CI" # Create environments for playback - playback_envs = await ep.make( + playback_envs = ep.make( "http://localhost:9500/mcp/", dataset=multi_env_dataset, model_id=playback_policy.model_id, @@ -1118,7 +1118,7 @@ async def test_fireworks_multi_environment_sessions(multi_env_dataset, fireworks assert not policy.is_playback_mode(), "Should be in recording mode initially" # Create multiple environments - envs = await ep.make( + envs = ep.make( f"http://localhost:{server.port}/mcp/", dataset=multi_env_dataset, model_id=policy.model_id, @@ -1228,7 +1228,7 @@ async def test_control_plane_state_querying(multi_env_dataset): policy = create_lunar_lander_static_policy(action_sequence=["FIRE_MAIN", "FIRE_LEFT"]) # Create environments - envs = await ep.make( + envs = ep.make( f"http://localhost:{server.port}/mcp/", dataset=multi_env_dataset[:2], # Use only 2 environments for faster testing model_id=policy.model_id, diff --git a/examples/tau2_mcp/tests/test_tau2_e2e.py b/examples/tau2_mcp/tests/test_tau2_e2e.py index f31584dd..cb71fab7 100644 --- a/examples/tau2_mcp/tests/test_tau2_e2e.py +++ b/examples/tau2_mcp/tests/test_tau2_e2e.py @@ -886,7 +886,7 @@ async def test_fireworks_multi_airline_environment_sessions( assert playback_policy.is_playback_mode(), "Should be in playback mode in CI" # Create environments for playback - playback_envs = await ep.make( + playback_envs = ep.make( "http://localhost:9500/mcp/", dataset=multi_env_airline_dataset, model_id=playback_policy.model_id, @@ -928,7 +928,7 @@ async def test_fireworks_multi_airline_environment_sessions( assert not 
policy.is_playback_mode(), "Should be in recording mode initially" # Create multiple environments - envs = await ep.make( + envs = ep.make( f"http://localhost:{server.port}/mcp/", dataset=multi_env_airline_dataset, model_id=policy.model_id, @@ -1029,7 +1029,7 @@ async def test_entire_airline_dataset(multi_env_airline_full_dataset, fireworks_ assert playback_policy.is_playback_mode(), "Should be in playback mode in CI" # Create environments for playback - playback_envs = await ep.make( + playback_envs = ep.make( "http://localhost:9500/mcp/", dataset=multi_env_airline_full_dataset, model_id=playback_policy.model_id, @@ -1076,7 +1076,7 @@ async def test_entire_airline_dataset(multi_env_airline_full_dataset, fireworks_ assert not policy.is_playback_mode(), "Should be in recording mode initially" # Create multiple environments - envs = await ep.make( + envs = ep.make( f"http://localhost:{server.port}/mcp/", dataset=multi_env_airline_full_dataset, model_id=policy.model_id, @@ -1425,7 +1425,7 @@ async def test_fireworks_multi_mock_environment_sessions( server = _create_test_server(8021, domain="mock") # Use unique port for mock try: - envs = await ep.make( + envs = ep.make( f"http://localhost:{server.port}/mcp/", dataset=multi_env_mock_dataset, model_id=playback_policy.model_id, @@ -1469,7 +1469,7 @@ async def test_fireworks_multi_mock_environment_sessions( assert not policy.is_playback_mode(), "Should be in recording mode initially" # Create multiple environments - envs = await ep.make( + envs = ep.make( f"http://localhost:{server.port}/mcp/", dataset=multi_env_mock_dataset, model_id=policy.model_id, @@ -1559,7 +1559,7 @@ async def test_fireworks_multi_retail_environment_sessions( server = _create_test_server(8022, domain="retail") # Use unique port for retail try: - envs = await ep.make( + envs = ep.make( f"http://localhost:{server.port}/mcp/", dataset=multi_env_retail_dataset, model_id=playback_policy.model_id, @@ -1603,7 +1603,7 @@ async def 
test_fireworks_multi_retail_environment_sessions( assert not policy.is_playback_mode(), "Should be in recording mode initially" # Create multiple environments - envs = await ep.make( + envs = ep.make( f"http://localhost:{server.port}/mcp/", dataset=multi_env_retail_dataset, model_id=policy.model_id, diff --git a/examples/taxi_mcp_complete/local_testing/test_north_star.py b/examples/taxi_mcp_complete/local_testing/test_north_star.py index b5d84006..3721b4e7 100644 --- a/examples/taxi_mcp_complete/local_testing/test_north_star.py +++ b/examples/taxi_mcp_complete/local_testing/test_north_star.py @@ -57,7 +57,7 @@ async def test_north_star_interface(): print(f"✅ Policy created in {'playback' if policy.is_playback_mode() else 'live'} mode") # Create environments - envs = await ep.make("http://localhost:8000/mcp/", dataset=dataset, model_id=policy.model_id) + envs = ep.make("http://localhost:8000/mcp/", dataset=dataset, model_id=policy.model_id) print("✅ MCP environments created successfully") # Run rollout - same API for both modes! 
diff --git a/examples/taxi_mcp_complete/tests/test_taxi_e2e.py b/examples/taxi_mcp_complete/tests/test_taxi_e2e.py index 337f9fd1..5fd24fcc 100644 --- a/examples/taxi_mcp_complete/tests/test_taxi_e2e.py +++ b/examples/taxi_mcp_complete/tests/test_taxi_e2e.py @@ -165,7 +165,7 @@ async def test_production_server_record_and_replay(production_server, taxi_datas assert not policy.is_playback_mode(), "Should be in recording mode initially" # Create environments - envs = await ep.make("http://localhost:9500/mcp/", dataset=taxi_dataset, model_id=policy.model_id) + envs = ep.make("http://localhost:9500/mcp/", dataset=taxi_dataset, model_id=policy.model_id) # Record evaluation rows (Taxi typically needs more steps) start_time = time.time() @@ -196,7 +196,7 @@ async def test_production_server_record_and_replay(production_server, taxi_datas assert playback_policy.is_playback_mode(), "Should be in playback mode" # Create new environments for playback - playback_envs = await ep.make( + playback_envs = ep.make( "http://localhost:9500/mcp/", dataset=taxi_dataset, model_id=playback_policy.model_id, @@ -242,7 +242,7 @@ async def test_simulation_server_record_and_replay(simulation_server, taxi_datas ) # Create environments pointing to simulation server - envs = await ep.make("http://localhost:9501/mcp/", dataset=taxi_dataset, model_id=policy.model_id) + envs = ep.make("http://localhost:9501/mcp/", dataset=taxi_dataset, model_id=policy.model_id) # Record evaluation rows start_time = time.time() @@ -266,7 +266,7 @@ async def test_simulation_server_record_and_replay(simulation_server, taxi_datas ) # Create new environments for playback - playback_envs = await ep.make( + playback_envs = ep.make( "http://localhost:9501/mcp/", dataset=taxi_dataset, model_id=playback_policy.model_id, diff --git a/tests/test_parallel_rollouts.py b/tests/test_parallel_rollouts.py index ae12a9c0..ef5c83a6 100644 --- a/tests/test_parallel_rollouts.py +++ b/tests/test_parallel_rollouts.py @@ -138,7 +138,7 @@ 
async def _test_seed_handling_and_type_compatibility_impl(): ) # 3. Test that environments are created with proper seed isolation - envs = await ep.make("http://127.0.0.1:8001/mcp/", dataset=dataset) + envs = ep.make("http://127.0.0.1:8001/mcp/", dataset=dataset) # Verify we have the right number of environments assert len(envs.sessions) == len(test_seeds), f"Expected {len(test_seeds)} sessions, got {len(envs.sessions)}" @@ -273,7 +273,7 @@ async def _run_simplified_compatibility_test(): ) # This should work even without a server (just creates session objects) - envs = await ep.make("http://127.0.0.1:8001/mcp/", dataset=dataset) + envs = ep.make("http://127.0.0.1:8001/mcp/", dataset=dataset) assert len(envs.sessions) == len(test_seeds) print("✅ Environment creation works") diff --git a/tests/test_url_handling.py b/tests/test_url_handling.py index fbd71b28..09a8e8ff 100644 --- a/tests/test_url_handling.py +++ b/tests/test_url_handling.py @@ -6,11 +6,11 @@ import eval_protocol as ep -# Sync tests for the await ep.make() function +# Sync tests for the ep.make() function @pytest.mark.asyncio async def test_mcp_env_make_appends_trailing_slash(): """ - Verify that await ep.make() appends a trailing slash to the MCP server URL if it's missing. + Verify that ep.make() appends a trailing slash to the MCP server URL if it's missing. This prevents 307 redirects that can break HTTP clients. """ base_url = "http://localhost:8000/mcp" @@ -22,7 +22,7 @@ async def test_mcp_env_make_appends_trailing_slash(): ) as mock_init: mock_init.return_value = None - envs = await ep.make(base_url, n=1, seeds=[42]) + envs = ep.make(base_url, n=1, seeds=[42]) mock_init.assert_awaited_once() @@ -33,7 +33,7 @@ async def test_mcp_env_make_appends_trailing_slash(): @pytest.mark.asyncio async def test_mcp_env_make_keeps_existing_trailing_slash(): """ - Verify that await ep.make() does not add an extra slash if one is already present. 
+ Verify that ep.make() does not add an extra slash if one is already present. """ base_url = "http://localhost:8000/mcp/" @@ -43,7 +43,7 @@ async def test_mcp_env_make_keeps_existing_trailing_slash(): ) as mock_init: mock_init.return_value = None - envs = await ep.make(base_url, n=1, seeds=[42]) + envs = ep.make(base_url, n=1, seeds=[42]) mock_init.assert_awaited_once() From e26d4b7420039aff6bae800b36e4b6deea11e586 Mon Sep 17 00:00:00 2001 From: Yinghan Ma Date: Mon, 11 Aug 2025 00:28:27 +0000 Subject: [PATCH 3/5] fix unit tests --- tests/test_url_handling.py | 28 +++++----------------------- 1 file changed, 5 insertions(+), 23 deletions(-) diff --git a/tests/test_url_handling.py b/tests/test_url_handling.py index 09a8e8ff..542b0f05 100644 --- a/tests/test_url_handling.py +++ b/tests/test_url_handling.py @@ -1,4 +1,4 @@ -from unittest.mock import AsyncMock, patch +from unittest.mock import AsyncMock, MagicMock, patch import httpx import pytest from werkzeug.wrappers import Response @@ -7,8 +7,7 @@ # Sync tests for the ep.make() function -@pytest.mark.asyncio -async def test_mcp_env_make_appends_trailing_slash(): +def test_mcp_env_make_appends_trailing_slash(): """ Verify that ep.make() appends a trailing slash to the MCP server URL if it's missing. This prevents 307 redirects that can break HTTP clients.
@@ -16,36 +15,19 @@ async def test_mcp_env_make_appends_trailing_slash(): base_url = "http://localhost:8000/mcp" corrected_url = "http://localhost:8000/mcp/" - with patch( - "eval_protocol.mcp.client.connection.MCPConnectionManager.initialize_session", - new_callable=AsyncMock, - ) as mock_init: - mock_init.return_value = None - - envs = ep.make(base_url, n=1, seeds=[42]) - - mock_init.assert_awaited_once() + envs = ep.make(base_url, n=1, seeds=[42]) assert len(envs.sessions) == 1 assert envs.sessions[0].base_url == corrected_url -@pytest.mark.asyncio -async def test_mcp_env_make_keeps_existing_trailing_slash(): +def test_mcp_env_make_keeps_existing_trailing_slash(): """ Verify that ep.make() does not add an extra slash if one is already present. """ base_url = "http://localhost:8000/mcp/" - with patch( - "eval_protocol.mcp.client.connection.MCPConnectionManager.initialize_session", - new_callable=AsyncMock, - ) as mock_init: - mock_init.return_value = None - - envs = ep.make(base_url, n=1, seeds=[42]) - - mock_init.assert_awaited_once() + envs = ep.make(base_url, n=1, seeds=[42]) assert len(envs.sessions) == 1 # The session's base_url should remain unchanged From 3319e4763083a18926cbf7d9565d40d8dc368235 Mon Sep 17 00:00:00 2001 From: Yinghan Ma Date: Mon, 11 Aug 2025 01:25:28 +0000 Subject: [PATCH 4/5] add interrupt termination reason --- eval_protocol/mcp/execution/manager.py | 8 +++++--- eval_protocol/types/types.py | 3 +++ 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/eval_protocol/mcp/execution/manager.py b/eval_protocol/mcp/execution/manager.py index 22b8ee83..e6544757 100644 --- a/eval_protocol/mcp/execution/manager.py +++ b/eval_protocol/mcp/execution/manager.py @@ -178,8 +178,11 @@ async def _execute_with_semaphore(idx): TerminationReason.USER_STOP, }: evaluation_rows[idx].rollout_status.status = "finished" - elif trajectory.termination_reason == TerminationReason.MAX_STEPS: + elif trajectory.termination_reason in 
{TerminationReason.MAX_STEPS, TerminationReason.INTERRUPTED}: evaluation_rows[idx].rollout_status.status = "stopped" + evaluation_rows[idx].rollout_status.error_message = trajectory.control_plane_summary.get( + "termination_reason", trajectory.termination_reason + ) else: evaluation_rows[idx].rollout_status.status = "error" evaluation_rows[idx].rollout_status.error_message = trajectory.control_plane_summary.get( @@ -315,8 +318,7 @@ async def _execute_rollout( # If there's no user simulator, no tool call means policy failed and we should terminate the rollout elif tool_calls[0].tool_name in ["_playback_terminate", "_no_tool_call"]: trajectory.terminated = True - trajectory.termination_reason = TerminationReason.ERROR - trajectory.control_plane_summary.update({"error_message": "No expected tool call"}) + trajectory.termination_reason = TerminationReason.INTERRUPTED break # Execute each tool call sequentially diff --git a/eval_protocol/types/types.py b/eval_protocol/types/types.py index b9d4a19e..85bdf5e9 100644 --- a/eval_protocol/types/types.py +++ b/eval_protocol/types/types.py @@ -11,11 +11,14 @@ class TerminationReason(str, Enum): MAX_STEPS: Trajectory ends because we hit the step limit CONTROL_PLANE_SIGNAL: Trajectory ends because the control plane signals termination (e.g. 
env goal reached or failure condition) USER_STOP: Trajectory ends because the simulated user signals to stop + INTERRUPTED: Trajectory ends unexpectedly, for example, when a tool call was expected but none was made + ERROR: Trajectory ends because of an error """ MAX_STEPS = "max_steps" CONTROL_PLANE_SIGNAL = "control_plane_signal" USER_STOP = "user_stop" + INTERRUPTED = "interrupted" ERROR = "error" From 90f51990feaceb1d8b3594bd0309eb14e5685128 Mon Sep 17 00:00:00 2001 From: Yinghan Ma Date: Mon, 11 Aug 2025 01:41:32 +0000 Subject: [PATCH 5/5] remove comment --- eval_protocol/mcp/execution/manager.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/eval_protocol/mcp/execution/manager.py b/eval_protocol/mcp/execution/manager.py index e6544757..96e2cf6e 100644 --- a/eval_protocol/mcp/execution/manager.py +++ b/eval_protocol/mcp/execution/manager.py @@ -115,9 +115,6 @@ async def _execute_with_semaphore(idx): shared_tool_schema = envs.tool_schemas - # Clean up - # await envs.close() - # Enhanced reporting with control plane info successful = sum(1 for traj in trajectories if traj.total_reward > 0) terminated_by_control_plane = sum(