diff --git a/examples/agentic_demo/openreward_endless_terminals_reinforce_qwen35_2b.yaml b/examples/agentic_demo/openreward_endless_terminals_reinforce_qwen35_2b.yaml new file mode 100644 index 00000000..84db30ba --- /dev/null +++ b/examples/agentic_demo/openreward_endless_terminals_reinforce_qwen35_2b.yaml @@ -0,0 +1,203 @@ +# OpenReward + EndlessTerminals + STEP_REINFORCE (vanilla PG) config +# Simpler baseline without IPA chunk-level loss. +# The model IS the agent — no iflow, no sandbox, no anti_call_llm. +# +# Usage: +# bash examples/agentic_demo/run_openreward_endless_terminals.sh reinforce +# # or directly: +# python examples/start_agentic_pipeline.py \ +# --config_path agentic_demo \ +# --config_name openreward_endless_terminals_reinforce_qwen35_2b + +defaults: + - ../config/deepspeed_zero@_here_ + - ../config/deepspeed_zero2@_here_ + - ../config/deepspeed_zero3@_here_ + - ../config/deepspeed_zero3_cpuoffload@_here_ + +hydra: + run: + dir: . + output_subdir: null + +exp_name: "openreward_endless_terminals_reinforce_qwen35_2b" +seed: 42 + +logging_dir: ./output/logs +output_dir: ./output +model_name: ${exp_name}-${now:%Y%m%d_%H%M%S} +rollout_dump_dir: /home/ubuntu/ALE-latest/ROLL-personal/output/rollout_dump +system_envs: + USE_MODELSCOPE: '1' + +checkpoint_config: + type: file_system + output_dir: /data + +num_gpus_per_node: 8 +rpc_timeout: 72000 + +max_steps: 10 +save_steps: 50 +logging_steps: 1 +eval_steps: 0 +resume_from_checkpoint: false + +async_generation_ratio: 1 +parse_tool_call_parameter_to_dict: true +skip_mock_system_prompt: true + +track_with: wandb +tracker_kwargs: + api_key: ${oc.env:WANDB_API_KEY} + project: roll-agentic + name: ${exp_name} + +rollout_batch_size: 16 +val_batch_size: 1 +sequence_length: 32768 + +max_tokens_per_step: 4096 + +# --- Vanilla STEP_REINFORCE config --- +advantage_clip: 0.2 +ppo_epochs: 1 +adv_estimator: "step_reinforce" +batch_adjust_mode: "random_sample" +step_reward_gamma: 1.0 + +init_kl_coef: 0.0 
+whiten_advantages: true +entropy_loss_coef: 0 +max_grad_norm: 1.0 + +# --- Model configs --- +pretrain: /home/ubuntu/ALE-latest/model-checkpoints/Qwen3.5-2B +reward_pretrain: /home/ubuntu/ALE-latest/model-checkpoints/Qwen3.5-2B +actor_train: + model_args: + flash_attn: sdpa + attn_implementation: sdpa + disable_gradient_checkpointing: false + dtype: bf16 + model_type: ~ + freeze_module_prefix: vision_model + training_args: + learning_rate: 1.0e-6 + weight_decay: 0 + per_device_train_batch_size: 1 + gradient_accumulation_steps: 4 + warmup_steps: 0 + data_args: + template: qwen3_coder + strategy_args: + strategy_name: megatron_train + strategy_config: + tensor_model_parallel_size: 2 + pipeline_model_parallel_size: 1 + expert_model_parallel_size: 1 + context_parallel_size: 2 + sequence_parallel: true + use_distributed_optimizer: true + recompute_granularity: full + device_mapping: list(range(0,4)) + infer_batch_size: 1 +actor_infer: + model_args: + flash_attn: sdpa + attn_implementation: sdpa + disable_gradient_checkpointing: true + dtype: bf16 + generating_args: + max_new_tokens: ${max_tokens_per_step} + top_p: 1.0 + top_k: 50 + num_beams: 1 + temperature: 1.0 + num_return_sequences: 1 + stop_strings: [""] + include_stop_str_in_output: true + data_args: + template: qwen3_coder + strategy_args: + strategy_name: vllm + strategy_config: + gpu_memory_utilization: 0.6 + block_size: 16 + load_format: auto + tensor_parallel_size: 1 + max_model_len: 32768 + device_mapping: list(range(0,8)) + +reference: + model_args: + attn_implementation: sdpa + disable_gradient_checkpointing: true + dtype: bf16 + model_type: ~ + freeze_module_prefix: vision_model + data_args: + template: qwen3_coder + strategy_args: + strategy_name: megatron_infer + strategy_config: + tensor_model_parallel_size: 2 + pipeline_model_parallel_size: 1 + expert_model_parallel_size: 1 + context_parallel_size: 2 + device_mapping: list(range(0,4)) + infer_batch_size: 1 + +reward_normalization: + grouping: 
traj_group_id + method: identity + +# --- Environment config (OpenReward) --- +max_actions_per_traj: 16 +env_manager_cls: roll.pipeline.agentic.env_manager.agent_native_env_manager.AgentNativeStepEnvManager + +train_env_manager: + max_env_num_per_worker: 1 + num_env_groups: 1 + group_size: 1 + tags: [OpenRewardEndlessTerminalsTrain] + num_groups_partition: [1] + +val_env_manager: + max_env_num_per_worker: 1 + num_env_groups: 1 + group_size: 1 + tags: [OpenRewardEndlessTerminalsVal] + num_groups_partition: [1] + +custom_envs: + OpenRewardEndlessTerminalsTrain: + env_type: "openreward_env" + max_steps: ${max_actions_per_traj} + max_tokens_per_step: ${max_tokens_per_step} + env_manager_cls: ${env_manager_cls} + agent_system_template: "unused — system prompt built dynamically from OpenReward tool specs" + agent_template: "unused — observation is full message list from OpenRewardEnv" + env_config: + environment_name: "kanishk/EndlessTerminals" + split: "train" + mode: "train" + max_steps: ${max_actions_per_traj} + reward_reduction: "sum" + nonterminal_reward: 0.0 + retry_max_attempts: 3 + retry_backoff_seconds: 5.0 + OpenRewardEndlessTerminalsVal: + env_type: "openreward_env" + max_steps: ${max_actions_per_traj} + max_tokens_per_step: ${max_tokens_per_step} + env_manager_cls: ${env_manager_cls} + agent_system_template: "unused" + agent_template: "unused" + env_config: + environment_name: "kanishk/EndlessTerminals" + split: "train" + mode: "val" + max_steps: ${max_actions_per_traj} + reward_reduction: "sum" + nonterminal_reward: 0.0 diff --git a/examples/agentic_demo/run_openreward_endless_terminals.sh b/examples/agentic_demo/run_openreward_endless_terminals.sh new file mode 100755 index 00000000..abff7dff --- /dev/null +++ b/examples/agentic_demo/run_openreward_endless_terminals.sh @@ -0,0 +1,25 @@ +#!/bin/bash +# Run OpenReward EndlessTerminals REINFORCE training with Qwen3.5-2B. 
+# +# Prerequisites: +# pip install openreward # inside the docker container +# +# Usage (inside roll_openreward_runner container): +# export OPENREWARD_API_KEY="..." +# export WANDB_API_KEY="..." +# cd /home/ubuntu/ALE-latest/ROLL-personal +# bash examples/agentic_demo/run_openreward_endless_terminals.sh + +set -euo pipefail + +: "${OPENREWARD_API_KEY:?Set OPENREWARD_API_KEY}" +: "${WANDB_API_KEY:?Set WANDB_API_KEY}" + +export NCCL_NET_PLUGIN='' +export NCCL_TUNER_PLUGIN='' +export NCCL_NET=Socket +export PYTHONPATH="${PWD}:${PYTHONPATH:-}" + +python examples/start_agentic_pipeline.py \ + --config_path agentic_demo \ + --config_name openreward_endless_terminals_reinforce_qwen35_2b diff --git a/roll/pipeline/agentic/env/__init__.py b/roll/pipeline/agentic/env/__init__.py index 1cb9620d..0373abca 100644 --- a/roll/pipeline/agentic/env/__init__.py +++ b/roll/pipeline/agentic/env/__init__.py @@ -17,7 +17,10 @@ gem.register("deepeyes", entry_point="roll.pipeline.agentic.env.deepeyes:DeepEyesEnv") gem.register("rock_tb_native_env", entry_point="roll.pipeline.agentic.env.sandbox.rock_tb_native_env:RockTBNativeEnv") - +try: + gem.register("openreward_env", entry_point="roll.pipeline.agentic.env.openreward:OpenRewardEnv") +except Exception as e: + logger.info(f"Failed to register openreward_env: {e}") try: # add webshop-minimal to PYTHONPATH diff --git a/roll/pipeline/agentic/env/openreward/__init__.py b/roll/pipeline/agentic/env/openreward/__init__.py new file mode 100644 index 00000000..ff87e190 --- /dev/null +++ b/roll/pipeline/agentic/env/openreward/__init__.py @@ -0,0 +1,3 @@ +from .openreward_env import OpenRewardEnv + +__all__ = ["OpenRewardEnv"] diff --git a/roll/pipeline/agentic/env/openreward/openreward_env.py b/roll/pipeline/agentic/env/openreward/openreward_env.py new file mode 100644 index 00000000..9caace7d --- /dev/null +++ b/roll/pipeline/agentic/env/openreward/openreward_env.py @@ -0,0 +1,414 @@ +"""OpenReward environment for ROLL agentic training. 
"""OpenReward environment for ROLL agentic training.

Wraps the OpenReward sync SDK to implement the ``gem.Env`` interface expected by
:class:`AgentNativeStepEnvManager`. Each episode opens an OpenReward session for
one task, collects tool-call interactions, and returns the terminal reward.

Usage in YAML config::

    custom_envs:
      MyEnvTag:
        env_type: "openreward_env"
        env_config:
          environment_name: "kanishk/EndlessTerminals"
          split: "train"
          max_steps: 16
"""
import copy
import logging
import os
import time
from typing import Any, Dict, List, Optional, SupportsFloat, Tuple, Union

from gem import Env

from roll.pipeline.agentic.env.openreward.tool_utils import (
    openreward_spec_to_qwen_tool,
    parse_tool_call,
    reduce_rewards,
)
from roll.utils.constants import EpisodeStopReason

logger = logging.getLogger(__name__)


class OpenRewardEnv(Env):
    """ROLL environment backed by the OpenReward SDK.

    The model generates ``<tool_call>`` XML blocks. This class parses them,
    forwards the call to :meth:`session.call_tool`, and wraps the result
    in ``<tool_response>`` for the next model turn.

    Args:
        environment_name: Fully-qualified OpenReward environment name
            (e.g. ``"kanishk/EndlessTerminals"``).
        split: Dataset split to draw tasks from (``"train"`` or ``"test"``).
        mode: ``"train"`` or ``"val"`` — mirrors the ROCK env convention.
        max_steps: Maximum tool-call turns per episode.
        system_prompt_template: Override the default system prompt template.
            Must contain a ``{tools}`` placeholder.
        reward_reduction: How to reduce per-step rewards (``"sum"``, ``"mean"``,
            ``"max"``, ``"min"``).
        nonterminal_reward: Penalty added when the episode truncates without
            reaching a terminal state. ``None`` means no penalty.
        retry_max_attempts: Number of session-creation retries on transient errors.
        retry_backoff_seconds: Base backoff between retries (doubles each attempt).
    """

    def __init__(
        self,
        environment_name: str,
        split: str = "train",
        mode: str = "train",
        max_steps: int = 16,
        system_prompt_template: Optional[str] = None,
        reward_reduction: str = "sum",
        nonterminal_reward: Optional[float] = None,
        retry_max_attempts: int = 3,
        retry_backoff_seconds: float = 5.0,
        **kwargs: Any,
    ) -> None:
        super().__init__()
        self._environment_name = environment_name
        self._split = split
        self._mode = mode
        self._max_steps = max_steps
        self._system_prompt_template = system_prompt_template
        self._reward_reduction = reward_reduction
        self._nonterminal_reward = nonterminal_reward
        self._retry_max_attempts = retry_max_attempts
        self._retry_backoff_seconds = retry_backoff_seconds

        # --- SDK handles (lazy: created once in __init__) ---
        from openreward import OpenReward

        api_key = os.environ.get("OPENREWARD_API_KEY", "")
        self._client = OpenReward(api_key=api_key) if api_key else OpenReward()
        self._or_env = self._client.environments.get(name=environment_name)
        self._num_tasks: int = self._or_env.num_tasks(split)
        logger.info(
            "[OpenRewardEnv] Connected to %s — %d %s tasks",
            environment_name, self._num_tasks, split,
        )

        # --- Episode state (reset each episode) ---
        self._session: Any = None
        self._message_history: List[Dict[str, str]] = []
        self._step_rewards: List[float] = []
        # Qwen-format tool specs; populated in reset() once the session is live.
        self._qwen_tools: List[Dict[str, Any]] = []
        self._task_index: int = -1
        self.current_step: int = 0
        self._num_tool_calls: int = 0
        self._num_failed_tool_calls: int = 0
        self._finished: bool = False

        # --- Flags read by the env manager ---
        self.env_reset_failed: bool = False
        self.env_timeout: bool = False

    # ------------------------------------------------------------------
    # gem.Env interface
    # ------------------------------------------------------------------

    def reset(
        self, seed: Optional[int] = None,
    ) -> Tuple[List[Dict[str, str]], Dict[str, Any]]:
        """Open an OpenReward session and return the initial conversation.

        Args:
            seed: Used to deterministically select a task index.

        Returns:
            ``(observation, info)`` where *observation* is a list of message
            dicts (system + user prompt) and *info* contains ``tools``,
            ``error_msg``, and ``failure_mode`` keys.
        """
        super().reset(seed=seed)
        self._clean_state()

        # Derive a deterministic task index from the seed
        if seed is not None:
            self._task_index = seed % self._num_tasks
        else:
            self._task_index = 0

        # Open session with retry logic
        if not self._open_session():
            # Session creation failed — signal to env manager
            return [], {
                "tools": [],
                "error_msg": "Session creation failed after retries",
                "failure_mode": "session_creation_failed",
            }

        # Fetch tools and prompt from the live session
        try:
            raw_tools = self._session.list_tools()
            prompt_blocks = self._session.get_prompt()
        except Exception as exc:
            logger.error("[OpenRewardEnv] Failed to get tools/prompt: %s", exc)
            self.env_reset_failed = True
            self._close_session()
            return [], {
                "tools": [],
                "error_msg": str(exc),
                "failure_mode": "tools_or_prompt_failed",
            }

        # Convert OpenReward tool specs to Qwen-native dict format.
        # These are passed via info["tools"] → env_manager → tokenizer.apply_chat_template(tools=...)
        # The tokenizer's Jinja2 template builds the system prompt automatically with the correct
        # tool-call format that the model was trained on (<tool_call>...</tool_call>).
        self._qwen_tools = [openreward_spec_to_qwen_tool(t) for t in raw_tools]

        user_text = "".join(
            b.text for b in prompt_blocks if b.type == "text"
        )

        # No manual system prompt needed — the tokenizer builds it from tools=.
        # We only provide the user message with the task prompt.
        self._message_history = [
            {"role": "user", "content": user_text},
        ]

        # --- Observability: log reset ---
        logger.info(
            "[OBSERVE][ENV_RESET] task_index=%d tools=[%s] prompt=%.200s",
            self._task_index,
            ", ".join(t["function"]["name"] for t in self._qwen_tools),
            user_text[:200].replace("\n", "\\n"),
        )

        return copy.deepcopy(self._message_history), {
            "tools": self._qwen_tools,
            "error_msg": "",
            "failure_mode": "",
            "task_name": f"{self._environment_name}:{self._split}:{self._task_index}",
        }

    def step(
        self, action: Union[str, Any],
    ) -> Tuple[List[Dict[str, str]], SupportsFloat, bool, bool, Dict[str, Any]]:
        """Execute one tool-call turn.

        Args:
            action: The model's decoded text output, or an
                :class:`EpisodeStopReason` for forced termination.

        Returns:
            ``(observation, reward, terminated, truncated, info)``
        """
        self.current_step += 1

        # Handle forced-termination actions from the env manager
        if isinstance(action, EpisodeStopReason):
            reward = self._compute_final_reward(reached_terminal=False)
            info = self._build_info(stop_reason=action.value)
            self._close_session()
            return copy.deepcopy(self._message_history), reward, True, True, info

        # Enforce max_steps internally (env manager expects this)
        if self.current_step > self._max_steps:
            reward = self._compute_final_reward(reached_terminal=False)
            info = self._build_info(stop_reason="max_steps")
            self._close_session()
            return copy.deepcopy(self._message_history), reward, True, True, info

        # Clean trailing special tokens from the model output
        clean_action = action.replace("<|im_end|>", "").rstrip()

        # --- Observability: log model action ---
        logger.info(
            "[OBSERVE][ENV_ACTION] step=%d has_tool_call=%s has_close_tag=%s action=%.500s",
            self.current_step,
            "<tool_call>" in clean_action,
            "</tool_call>" in clean_action,
            clean_action.replace("\n", "\\n"),
        )

        # Append the assistant's message to history
        self._message_history.append({"role": "assistant", "content": clean_action})

        # Parse the tool call
        tc = parse_tool_call(clean_action)

        if tc is None:
            # No tool call found — nudge the model
            nudge = (
                "No tool call detected in your response. "
                "Please use the provided tools with <tool_call>...</tool_call> format "
                "to complete the task."
            )
            self._message_history.append({"role": "user", "content": nudge})
            info = self._build_info(stop_reason="no_tool_call")
            return copy.deepcopy(self._message_history), 0.0, False, False, info

        if tc["type"] == "error":
            # Parse error — nudge with the error message
            error_nudge = (
                f"Tool call parse error: {tc['error']}. "
                "Please ensure arguments are valid JSON within "
                "<tool_call>...</tool_call> tags."
            )
            self._message_history.append({"role": "user", "content": error_nudge})
            self._num_failed_tool_calls += 1
            info = self._build_info(stop_reason="parse_error")
            return copy.deepcopy(self._message_history), 0.0, False, False, info

        # Valid tool call — execute it
        self._num_tool_calls += 1

        # --- Observability: log tool call request ---
        logger.info(
            "[OBSERVE][TOOL_CALL] step=%d name=%s arguments=%s",
            self.current_step, tc["name"],
            str(tc["arguments"])[:300].replace("\n", "\\n"),
        )

        tool_text, finished = self._execute_tool_call(tc["name"], tc["arguments"])

        # --- Observability: log tool response ---
        logger.info(
            "[OBSERVE][TOOL_RESPONSE] step=%d name=%s finished=%s reward=%s output=%.200s",
            self.current_step, tc["name"], finished,
            self._step_rewards[-1] if self._step_rewards else None,
            tool_text[:200].replace("\n", "\\n"),
        )

        # Append tool response as user message
        self._message_history.append({
            "role": "user",
            "content": f"<tool_response>\n{tool_text}\n</tool_response>",
        })

        # Determine termination and reward
        terminated = finished
        reward = 0.0
        if terminated:
            reward = self._compute_final_reward(reached_terminal=True)
            self._close_session()

        info = self._build_info(
            stop_reason="finished" if terminated else "continue",
        )
        return copy.deepcopy(self._message_history), reward, terminated, False, info

    def close(self) -> None:
        """Release the OpenReward session if still open."""
        self._close_session()

    @property
    def env_info(self) -> Dict[str, Any]:
        """Task metadata used by ``formulate_rollouts`` for trajectory logging."""
        return {
            "environment_name": self._environment_name,
            "task_index": self._task_index,
            "split": self._split,
            "current_step": self.current_step,
            "num_tool_calls": self._num_tool_calls,
        }

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _clean_state(self) -> None:
        """Reset all per-episode state."""
        self._close_session()
        self._message_history = []
        self._step_rewards = []
        # Clear stale tool specs so a failed reset never exposes the previous
        # episode's tools.
        self._qwen_tools = []
        self._task_index = -1
        self.current_step = 0
        self._num_tool_calls = 0
        self._num_failed_tool_calls = 0
        self._finished = False
        self.env_reset_failed = False
        self.env_timeout = False

    def _open_session(self) -> bool:
        """Open an OpenReward session with retry + exponential backoff.

        Returns:
            ``True`` if a session was opened, ``False`` on failure.
        """
        backoff = self._retry_backoff_seconds
        for attempt in range(self._retry_max_attempts + 1):
            try:
                self._session = self._or_env.session(
                    split=self._split, index=self._task_index,
                )
                self._session.__enter__()
                logger.info(
                    "[OpenRewardEnv] Session opened: %s split=%s index=%d (attempt %d)",
                    self._environment_name, self._split, self._task_index, attempt,
                )
                return True
            except Exception as exc:
                logger.warning(
                    "[OpenRewardEnv] Session creation failed (attempt %d/%d): %s",
                    attempt + 1, self._retry_max_attempts + 1, exc,
                )
                if attempt < self._retry_max_attempts:
                    time.sleep(backoff)
                    backoff *= 2

        self.env_reset_failed = True
        return False

    def _close_session(self) -> None:
        """Safely close the active session."""
        if self._session is not None:
            try:
                self._session.__exit__(None, None, None)
            except Exception as exc:
                logger.debug("[OpenRewardEnv] Error closing session: %s", exc)
            self._session = None

    def _execute_tool_call(
        self, name: str, arguments: Dict[str, Any],
    ) -> Tuple[str, bool]:
        """Call a tool on the OpenReward session.

        Returns:
            ``(tool_output_text, finished)``
        """
        try:
            tool_out = self._session.call_tool(name, arguments)
            tool_text = "".join(
                b.text for b in tool_out.blocks if b.type == "text"
            )
            if tool_out.reward is not None:
                self._step_rewards.append(tool_out.reward)
            return tool_text, tool_out.finished
        except Exception as exc:
            logger.warning("[OpenRewardEnv] Tool call failed (%s): %s", name, exc)
            self._num_failed_tool_calls += 1
            return f"Error executing tool '{name}': {exc}", False

    def _compute_final_reward(self, reached_terminal: bool) -> float:
        """Compute the episode reward from collected step rewards."""
        rewards = list(self._step_rewards)
        if not reached_terminal and self._nonterminal_reward is not None:
            rewards.append(self._nonterminal_reward)
        return reduce_rewards(rewards, self._reward_reduction)

    def _build_info(self, stop_reason: str = "") -> Dict[str, Any]:
        """Build the info dict returned by ``step()``."""
        metrics = {
            "env_timeout": self.env_timeout,
            "env_reset_failed": self.env_reset_failed,
            "success": any(r > 0 for r in self._step_rewards),
            "raw_reward": self._compute_final_reward(reached_terminal=True),
            "current_step": self.current_step,
            "num_tool_calls": self._num_tool_calls,
        }
        metrics_agg_mode = {
            "success": "last",
            "raw_reward": "last",
        }
        return {
            "metrics": metrics,
            "metrics_agg_mode": metrics_agg_mode,
            "failure_mode": "",
            "error_messages": [],
            "stop_reason": stop_reason,
            "test_output": "",
        }
"""Reusable utilities for OpenReward tool call parsing and system prompt building.

Supports Qwen3.5's **native** tool-call format (``<tool_call>...</tool_call>``)
as well as the JSON fallback (``{"name": ..., "arguments": {...}}``).
"""
import json
import re
from typing import Any, Dict, List, Optional


# ---------------------------------------------------------------------------
# Tool-spec conversion: OpenReward spec → Qwen chat-template dict
# ---------------------------------------------------------------------------

def openreward_spec_to_qwen_tool(spec: Any) -> Dict[str, Any]:
    """Convert an OpenReward tool spec to the dict format expected by
    ``tokenizer.apply_chat_template(tools=[...])``.

    The Qwen3.5 chat template expects each tool as::

        {"type": "function", "function": {"name": ..., "description": ..., "parameters": ...}}

    Args:
        spec: OpenReward tool spec with ``.name``, ``.input_schema``, ``.description``.

    Returns:
        A dict compatible with the Qwen tokenizer's ``tools`` parameter.
    """
    return {
        "type": "function",
        "function": {
            "name": spec.name,
            "description": spec.description,
            "parameters": spec.input_schema,
        },
    }


# ---------------------------------------------------------------------------
# Tool-call parsing: Qwen native XML + JSON fallback
# ---------------------------------------------------------------------------

# Regex for Qwen3.5 native format:
#   <function=NAME>\n<parameter=KEY>\nVALUE\n</parameter>\n...</function>
_FUNCTION_RE = re.compile(
    r"<function=(?P<name>[^>]+)>(?P<body>.*?)</function>",
    re.DOTALL,
)
_PARAMETER_RE = re.compile(
    r"<parameter=(?P<key>[^>]+)>\s*(?P<value>.*?)\s*</parameter>",
    re.DOTALL,
)


def parse_tool_call(text: str) -> Optional[Dict[str, Any]]:
    """Parse a tool call from model output.

    Supports two formats:

    1. **Qwen3.5 native** (preferred)::

        <tool_call>
        <function=bash>
        <parameter=command>
        ls
        </parameter>
        <parameter=description>
        list files
        </parameter>
        </function>
        </tool_call>

    2. **JSON fallback** (cookbook style)::

        <tool_call>
        {"name": "bash", "arguments": {"command": "ls", "description": "list files"}}
        </tool_call>

    Args:
        text: Raw model output.

    Returns:
        ``None`` if no ``<tool_call>`` found.
        ``{"type": "success", "name": str, "arguments": dict}`` on success.
        ``{"type": "error", "error": str}`` on parse failure.
    """
    start_tag = "<tool_call>"
    si = text.find(start_tag)
    if si == -1:
        return None

    # Tolerate a missing close tag (e.g. truncated generation): take everything
    # after the opener in that case.
    end_tag = "</tool_call>"
    ei = text.find(end_tag, si)
    inner = text[si + len(start_tag):ei].strip() if ei != -1 else text[si + len(start_tag):].strip()

    if not inner:
        return {"type": "error", "error": "empty tool call block"}

    # --- Try Qwen native XML format first ---
    func_match = _FUNCTION_RE.search(inner)
    if func_match:
        name = func_match.group("name").strip()
        body = func_match.group("body")
        arguments: Dict[str, str] = {}
        for param_match in _PARAMETER_RE.finditer(body):
            key = param_match.group("key").strip()
            value = param_match.group("value").strip()
            arguments[key] = value
        return {"type": "success", "name": name, "arguments": arguments}

    # --- Fallback: JSON format ---
    try:
        data = json.loads(inner)
        if not isinstance(data, dict):
            return {"type": "error", "error": f"parsed value is not a dict: {type(data).__name__}"}
        name = data.get("name")
        if not name:
            return {"type": "error", "error": "missing 'name' field in tool call"}
        args = data.get("arguments", {})
        if not isinstance(args, dict):
            return {"type": "error", "error": f"arguments is not a dict: {type(args).__name__}"}
        return {"type": "success", "name": name, "arguments": args}
    except (json.JSONDecodeError, KeyError) as exc:
        return {"type": "error", "error": str(exc)}


def reduce_rewards(rewards: List[float], method: str) -> float:
    """Reduce a list of per-step rewards to a single scalar.

    Args:
        rewards: Per-step rewards; an empty list reduces to ``0.0``.
        method: One of ``"sum"``, ``"mean"``, ``"max"``, ``"min"``.

    Raises:
        ValueError: If *method* is not a recognized reduction.
    """
    if not rewards:
        return 0.0
    if method == "sum":
        return sum(rewards)
    elif method == "mean":
        return sum(rewards) / len(rewards)
    elif method == "max":
        return max(rewards)
    elif method == "min":
        return min(rewards)
    raise ValueError(f"Unknown reward reduction method: {method!r}")