Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions eval_protocol/mcp/client/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,10 +544,10 @@ async def close_session(self, session: MCPSession) -> None:
await session._exit_stack.aclose()
except asyncio.CancelledError:
# Handle cancellation gracefully (especially important for Python 3.12)
logger.debug(f"Session {session.session_id} close was cancelled")
logger.error(f"Session {session.session_id} close was cancelled")
except Exception as e:
# Hitting this error, probably because of use of threads: "Attempted to exit cancel scope in a different task than it was entered in"
logger.debug(f"Error closing session {session.session_id}: {e}")
logger.error(f"Error closing session {session.session_id}: {e}")
finally:
session._exit_stack = None
session._mcp_session = None
2 changes: 1 addition & 1 deletion eval_protocol/mcp/execution/base_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ async def _generate_live_tool_calls(
return mcp_tool_calls, usage_stats
else:
# No tool calls in response - this is normal when episode ends or LLM provides only text
logger.info(f"No tool calls in response for env {env_index}, message content: {message.get('content')}")
logger.debug(f"No tool calls in response for env {env_index}, message content: {message.get('content')}")
return [
MCPToolCall(
tool_name="_no_tool_call",
Expand Down
37 changes: 27 additions & 10 deletions eval_protocol/mcp/execution/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,12 @@ async def execute_rollouts(

async def _execute_with_semaphore(idx):
async with semaphore:
return await self._execute_rollout(
result = await self._execute_rollout(
envs, policy, idx, steps, openai_logger, recording_mode, playback_mode, start_time
)

return result

tasks = [_execute_with_semaphore(i) for i in range(envs.n)]
# exceptions will be try catched inside single _execute_rollout
trajectories = await asyncio.gather(*tasks)
Expand All @@ -113,9 +115,6 @@ async def _execute_with_semaphore(idx):

shared_tool_schema = envs.tool_schemas

# Clean up
await envs.close()

# Enhanced reporting with control plane info
successful = sum(1 for traj in trajectories if traj.total_reward > 0)
terminated_by_control_plane = sum(
Expand Down Expand Up @@ -176,8 +175,11 @@ async def _execute_with_semaphore(idx):
TerminationReason.USER_STOP,
}:
evaluation_rows[idx].rollout_status.status = "finished"
elif trajectory.termination_reason == TerminationReason.MAX_STEPS:
elif trajectory.termination_reason in {TerminationReason.MAX_STEPS, TerminationReason.INTERRUPTED}:
evaluation_rows[idx].rollout_status.status = "stopped"
evaluation_rows[idx].rollout_status.error_message = trajectory.control_plane_summary.get(
"termination_reason", trajectory.termination_reason
)
else:
evaluation_rows[idx].rollout_status.status = "error"
evaluation_rows[idx].rollout_status.error_message = trajectory.control_plane_summary.get(
Expand Down Expand Up @@ -227,6 +229,7 @@ async def _execute_rollout(
"total_tokens": 0,
},
)
failure_reason = None
try:
current_observation, tool_schema = await envs.reset(session)
system_prompt = dataset_row.system_prompt
Expand Down Expand Up @@ -312,8 +315,7 @@ async def _execute_rollout(
# If there's no user simulator, no tool call means policy failed and we should terminate the rollout
elif tool_calls[0].tool_name in ["_playback_terminate", "_no_tool_call"]:
trajectory.terminated = True
trajectory.termination_reason = TerminationReason.ERROR
trajectory.control_plane_summary.update({"error_message": "No expected tool call"})
trajectory.termination_reason = TerminationReason.INTERRUPTED
break

# Execute each tool call sequentially
Expand Down Expand Up @@ -467,11 +469,26 @@ async def _execute_rollout(
logger.info(
f"✅ Rollout {rollout_idx} completed: {trajectory.steps} steps, reward: {trajectory.total_reward:.2f}, termination: {trajectory.termination_reason}, in thread {threading.current_thread().name}"
)

except asyncio.CancelledError:
logger.error(f"🚨 AsyncIO Cancel Error in roll out {rollout_idx}", exc_info=True)
failure_reason = "asyncio context cancelled"
except Exception as e:
logger.error(f"🚨 Error in rollout {rollout_idx}: {e}", exc_info=True)
trajectory.terminated = True
trajectory.termination_reason = TerminationReason.ERROR
trajectory.control_plane_summary.update({"error_message": str(e)})
failure_reason = str(e)
finally:
if failure_reason:
trajectory.terminated = True
trajectory.termination_reason = TerminationReason.ERROR
trajectory.control_plane_summary.update({"error_message": f"{failure_reason}"})
try:
await envs.connection_manager.reset_session(session)
except:
logger.error(f"Error resetting session {session.session_id}")
try:
await envs.connection_manager.close_session(session)
except:
logger.error(f"Error closing session {session.session_id}")
return trajectory

async def _get_control_plane_status(self, session) -> Optional[Dict[str, Any]]:
Expand Down
1 change: 1 addition & 0 deletions eval_protocol/mcp/session/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ async def reset(self, session: MCPSession) -> Tuple[Any, List[Dict]]:

This is thread-safe and can be called from worker threads.
"""
await self.connection_manager.initialize_session(session)
# Get available tools from MCP server
tool_schemas = await self.connection_manager.discover_tools(session)

Expand Down
20 changes: 6 additions & 14 deletions eval_protocol/mcp_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
policy = ep.FireworksPolicy(model_id="accounts/fireworks/models/qwen3-235b-a22b")

# Create environments with evaluation_rows configuration
envs = await ep.make("http://localhost:8000/mcp", evaluation_rows=evaluation_rows)
envs = ep.make("http://localhost:8000/mcp", evaluation_rows=evaluation_rows)

# Execute tool-calling rollouts
evaluation_rows = await ep.rollout(envs, policy=policy, steps=512)
Expand Down Expand Up @@ -86,18 +86,17 @@ async def reset_mcp_sessions(envs: GeneralMCPVectorEnv):
Reset mcp server sessions
"""
tasks = [envs.connection_manager.reset_session(session) for session in envs.sessions]
await asyncio.gather(*tasks)
await asyncio.gather(*tasks, return_exceptions=True)


async def make(
def make(
env_spec: str,
evaluation_rows: Optional[List[EvaluationRow]] = None,
dataset: Optional[List[Dict]] = None,
n: Optional[int] = None,
seeds: Optional[List[int]] = None,
model_id: str = "unknown",
user_prompt_formatter: Optional[Callable] = None,
reset_sessions: bool = False,
) -> GeneralMCPVectorEnv:
"""
Create general MCP environments driven by evaluation_rows configuration.
Expand All @@ -110,20 +109,19 @@ async def make(
seeds: List of seeds (for backward compatibility)
model_id: Model identifier
user_prompt_formatter: Optional callback for formatting user prompts
reset_sessions: Whether to reset sessions before returning the environment

Returns:
General MCP environment that works with any MCP server

Example:
# EvaluationRow approach (preferred)
envs = await ep.make("http://localhost:8000/mcp", evaluation_rows=evaluation_rows)
envs = ep.make("http://localhost:8000/mcp", evaluation_rows=evaluation_rows)

# Dataset approach (backward compatibility)
envs = await ep.make("http://localhost:8000/mcp", dataset=dataset)
envs = ep.make("http://localhost:8000/mcp", dataset=dataset)

# Legacy approach (backward compatibility)
envs = await ep.make("http://localhost:8000/mcp", n=10, seeds=seeds)
envs = ep.make("http://localhost:8000/mcp", n=10, seeds=seeds)
"""
# Parse environment specification - make sure URL format is correct
base_url = env_spec
Expand Down Expand Up @@ -236,12 +234,6 @@ async def make(
sessions.append(session)

mcp_envs = GeneralMCPVectorEnv(sessions, dataset_rows, user_prompt_formatter)
tasks = [mcp_envs.connection_manager.initialize_session(session) for session in sessions]
await asyncio.gather(*tasks)

if reset_sessions:
await reset_mcp_sessions(mcp_envs)

return mcp_envs


Expand Down
2 changes: 1 addition & 1 deletion eval_protocol/pytest/default_mcp_gym_rollout_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ async def default_mcp_gym_rollout_processor(
)

# Create MCP environments directly from evaluation_rows
envs = await ep.make(
envs = ep.make(
"http://localhost:9700/mcp/",
evaluation_rows=rows,
model_id=policy.model_id,
Expand Down
3 changes: 3 additions & 0 deletions eval_protocol/types/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@ class TerminationReason(str, Enum):
MAX_STEPS: Trajectory ends because we hit the step limit
CONTROL_PLANE_SIGNAL: Trajectory ends because the control plane signals termination (e.g. env goal reached or failure condition)
USER_STOP: Trajectory ends because the simulated user signals to stop
INTERRUPTED: Trajectory ends unexpectedly, for example, expecting tool call but there is no tool call
ERROR: Trajectory ends because of an error
"""

MAX_STEPS = "max_steps"
CONTROL_PLANE_SIGNAL = "control_plane_signal"
USER_STOP = "user_stop"
INTERRUPTED = "interrupted"
ERROR = "error"


Expand Down
16 changes: 8 additions & 8 deletions examples/blackjack_mcp/tests/test_record_and_replay_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ async def test_production_server_record_and_replay(production_server, blackjack_
assert playback_policy.is_playback_mode(), "Should be in playback mode in CI"

# Create environments for playback
playback_envs = await ep.make(
playback_envs = ep.make(
"http://localhost:9500/mcp/",
dataset=blackjack_dataset,
model_id=playback_policy.model_id,
Expand Down Expand Up @@ -250,7 +250,7 @@ async def test_production_server_record_and_replay(production_server, blackjack_
assert not policy.is_playback_mode(), "Should be in recording mode initially"

# Create environments
envs = await ep.make(
envs = ep.make(
"http://localhost:9500/mcp/",
dataset=blackjack_dataset,
model_id=policy.model_id,
Expand Down Expand Up @@ -310,7 +310,7 @@ async def test_production_server_record_and_replay(production_server, blackjack_
assert playback_policy.is_playback_mode(), "Should be in playback mode"

# Create new environments for playback
playback_envs = await ep.make(
playback_envs = ep.make(
"http://localhost:9500/mcp/",
dataset=blackjack_dataset,
model_id=playback_policy.model_id,
Expand Down Expand Up @@ -462,7 +462,7 @@ async def test_blackjack_step_by_step(conda_isolation_recording_file):
]

# Create environment pointing to conda-isolated server
envs = await ep.make(
envs = ep.make(
f"http://localhost:{port}/mcp/",
dataset=test_dataset,
model_id=policy.model_id,
Expand Down Expand Up @@ -570,7 +570,7 @@ async def test_multi_environment_sessions(multi_env_dataset, multi_env_recording
policy = create_blackjack_static_policy(action_sequence=["HIT", "HIT", "STICK"])

# Create multiple environments
envs = await ep.make(
envs = ep.make(
f"http://localhost:{server.port}/mcp/",
dataset=multi_env_dataset,
model_id=policy.model_id,
Expand Down Expand Up @@ -992,7 +992,7 @@ async def test_fireworks_multi_environment_sessions(multi_env_dataset, fireworks
assert playback_policy.is_playback_mode(), "Should be in playback mode in CI"

# Create environments for playback
playback_envs = await ep.make(
playback_envs = ep.make(
"http://localhost:9500/mcp/",
dataset=multi_env_dataset,
model_id=playback_policy.model_id,
Expand Down Expand Up @@ -1033,7 +1033,7 @@ async def test_fireworks_multi_environment_sessions(multi_env_dataset, fireworks
assert not policy.is_playback_mode(), "Should be in recording mode initially"

# Create multiple environments
envs = await ep.make(
envs = ep.make(
f"http://localhost:{server.port}/mcp/",
dataset=multi_env_dataset,
model_id=policy.model_id,
Expand Down Expand Up @@ -1149,7 +1149,7 @@ async def test_control_plane_state_querying(multi_env_dataset):
policy = create_blackjack_static_policy(action_sequence=["HIT", "STAND"])

# Create environments
envs = await ep.make(
envs = ep.make(
f"http://localhost:{server.port}/mcp/",
dataset=multi_env_dataset[:2], # Use only 2 environments for faster testing
model_id=policy.model_id,
Expand Down
16 changes: 8 additions & 8 deletions examples/cliff_walking_mcp/tests/test_cliff_walking_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ async def test_production_server_record_and_replay(
assert playback_policy.is_playback_mode(), "Should be in playback mode in CI"

# Create environments for playback
playback_envs = await ep.make(
playback_envs = ep.make(
"http://localhost:9500/mcp/",
dataset=cliff_walking_dataset,
model_id=playback_policy.model_id,
Expand Down Expand Up @@ -259,7 +259,7 @@ async def test_production_server_record_and_replay(
assert not policy.is_playback_mode(), "Should be in recording mode initially"

# Create environments
envs = await ep.make(
envs = ep.make(
"http://localhost:9500/mcp/",
dataset=cliff_walking_dataset,
model_id=policy.model_id,
Expand Down Expand Up @@ -318,7 +318,7 @@ async def test_production_server_record_and_replay(
assert playback_policy.is_playback_mode(), "Should be in playback mode"

# Create new environments for playback
playback_envs = await ep.make(
playback_envs = ep.make(
"http://localhost:9500/mcp/",
dataset=cliff_walking_dataset,
model_id=playback_policy.model_id,
Expand Down Expand Up @@ -471,7 +471,7 @@ async def test_cliff_walking_step_by_step(conda_isolation_recording_file):
]

# Create environment pointing to conda-isolated server
envs = await ep.make(
envs = ep.make(
f"http://localhost:{port}/mcp/",
dataset=test_dataset,
model_id=policy.model_id,
Expand Down Expand Up @@ -589,7 +589,7 @@ async def test_multi_environment_sessions(multi_env_dataset, multi_env_recording
)

# Create multiple environments
envs = await ep.make(
envs = ep.make(
f"http://localhost:{server.port}/mcp/",
dataset=multi_env_dataset,
model_id=policy.model_id,
Expand Down Expand Up @@ -1018,7 +1018,7 @@ async def test_fireworks_multi_environment_sessions(multi_env_dataset, fireworks
assert playback_policy.is_playback_mode(), "Should be in playback mode in CI"

# Create environments for playback
playback_envs = await ep.make(
playback_envs = ep.make(
"http://localhost:9500/mcp/",
dataset=multi_env_dataset,
model_id=playback_policy.model_id,
Expand Down Expand Up @@ -1059,7 +1059,7 @@ async def test_fireworks_multi_environment_sessions(multi_env_dataset, fireworks
assert not policy.is_playback_mode(), "Should be in recording mode initially"

# Create multiple environments
envs = await ep.make(
envs = ep.make(
f"http://localhost:{server.port}/mcp/",
dataset=multi_env_dataset,
model_id=policy.model_id,
Expand Down Expand Up @@ -1178,7 +1178,7 @@ async def test_control_plane_state_querying(multi_env_dataset):
policy = create_cliff_walking_static_policy(action_sequence=["UP", "UP"])

# Create environments
envs = await ep.make(
envs = ep.make(
f"http://localhost:{server.port}/mcp/",
dataset=multi_env_dataset[:2], # Use only 2 environments for faster testing
model_id=policy.model_id,
Expand Down
2 changes: 1 addition & 1 deletion examples/frozen_lake_mcp/test_basic_functionality.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async def test_basic_server_functionality():
policy = ep.FireworksPolicy(model_id="accounts/fireworks/models/qwen3-235b-a22b", temperature=0.2)

# Create environment pointing to local server
envs = await ep.make("http://localhost:8000/mcp/", dataset=test_dataset, model_id=policy.model_id)
envs = ep.make("http://localhost:8000/mcp/", dataset=test_dataset, model_id=policy.model_id)
print("✅ Successfully connected to MCP server")

# Test 2: Try to make tool calls (we'll simulate this for now)
Expand Down
2 changes: 1 addition & 1 deletion examples/frozen_lake_mcp/test_multi_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ async def test_multi_session():

try:
# Create environments (assumes server is running on localhost:8000)
envs = await ep.make(
envs = ep.make(
"http://localhost:8000/mcp/",
dataset=test_dataset,
model_id=policy.model_id,
Expand Down
2 changes: 1 addition & 1 deletion examples/frozen_lake_mcp/test_seed_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async def test_seed_logging():
try:
# Create environment pointing to our server
print("🔌 Connecting to server...")
envs = await ep.make("http://localhost:9600/mcp/", dataset=dataset, model_id="test")
envs = ep.make("http://localhost:9600/mcp/", dataset=dataset, model_id="test")
print(f"✅ Created envs: {len(envs.sessions)} sessions")

# Reset environments to trigger session creation
Expand Down
Loading
Loading