diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index 90afb3b4c5dd..05a5d459fb7b 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -183,6 +183,7 @@ Guidelines for modifications: * Tsz Ki GAO * Tyler Lum * Victor Khaustov +* Vidur Vij * Virgilio Gómez Lambo * Vladimir Fokow * Wei Yang diff --git a/docs/source/api/lab/isaaclab.actuators.rst b/docs/source/api/lab/isaaclab.actuators.rst index 5ab005de5b3b..a59c3017ac44 100644 --- a/docs/source/api/lab/isaaclab.actuators.rst +++ b/docs/source/api/lab/isaaclab.actuators.rst @@ -23,6 +23,10 @@ ActuatorNetMLPCfg ActuatorNetLSTM ActuatorNetLSTMCfg + ActuatorNetGRU + ActuatorNetGRUCfg + ActuatorNetGRUResidual + ActuatorNetGRUResidualCfg Actuator Base ------------- @@ -133,3 +137,31 @@ LSTM Network Actuator :inherited-members: :show-inheritance: :exclude-members: __init__, class_type + +GRU Network Actuator +-------------------- + +.. autoclass:: ActuatorNetGRU + :members: + :inherited-members: + :show-inheritance: + +.. autoclass:: ActuatorNetGRUCfg + :members: + :inherited-members: + :show-inheritance: + :exclude-members: __init__, class_type + +GRU Residual Network Actuator +----------------------------- + +.. autoclass:: ActuatorNetGRUResidual + :members: + :inherited-members: + :show-inheritance: + +.. autoclass:: ActuatorNetGRUResidualCfg + :members: + :inherited-members: + :show-inheritance: + :exclude-members: __init__, class_type diff --git a/source/isaaclab/changelog.d/vidurv-gru-actuators.minor.rst b/source/isaaclab/changelog.d/vidurv-gru-actuators.minor.rst new file mode 100644 index 000000000000..5ec5ae050512 --- /dev/null +++ b/source/isaaclab/changelog.d/vidurv-gru-actuators.minor.rst @@ -0,0 +1,11 @@ +Added +^^^^^ + +* Added :class:`~isaaclab.actuators.ActuatorNetGRU` and + :class:`~isaaclab.actuators.ActuatorNetGRUCfg`, an explicit actuator whose GRU + network predicts the total joint effort from the joint position, position error, and + velocity, with optional input and output normalization. 
+* Added :class:`~isaaclab.actuators.ActuatorNetGRUResidual` and + :class:`~isaaclab.actuators.ActuatorNetGRUResidualCfg`, an implicit-PD actuator that + adds a GRU-predicted residual feed-forward effort, with optional input and output + normalization. diff --git a/source/isaaclab/isaaclab/actuators/__init__.pyi b/source/isaaclab/isaaclab/actuators/__init__.pyi index 566967cf1100..7c760e2bf92a 100644 --- a/source/isaaclab/isaaclab/actuators/__init__.pyi +++ b/source/isaaclab/isaaclab/actuators/__init__.pyi @@ -6,8 +6,12 @@ __all__ = [ "ActuatorBase", "ActuatorBaseCfg", + "ActuatorNetGRU", + "ActuatorNetGRUResidual", "ActuatorNetLSTM", "ActuatorNetMLP", + "ActuatorNetGRUCfg", + "ActuatorNetGRUResidualCfg", "ActuatorNetLSTMCfg", "ActuatorNetMLPCfg", "DCMotor", @@ -24,8 +28,8 @@ __all__ = [ from .actuator_base import ActuatorBase from .actuator_base_cfg import ActuatorBaseCfg -from .actuator_net import ActuatorNetLSTM, ActuatorNetMLP -from .actuator_net_cfg import ActuatorNetLSTMCfg, ActuatorNetMLPCfg +from .actuator_net import ActuatorNetGRU, ActuatorNetGRUResidual, ActuatorNetLSTM, ActuatorNetMLP +from .actuator_net_cfg import ActuatorNetGRUCfg, ActuatorNetGRUResidualCfg, ActuatorNetLSTMCfg, ActuatorNetMLPCfg from .actuator_pd import ( DCMotor, DelayedPDActuator, diff --git a/source/isaaclab/isaaclab/actuators/actuator_net.py b/source/isaaclab/isaaclab/actuators/actuator_net.py index 2274d1b78db3..081580675ca6 100644 --- a/source/isaaclab/isaaclab/actuators/actuator_net.py +++ b/source/isaaclab/isaaclab/actuators/actuator_net.py @@ -9,11 +9,13 @@ * Multi-Layer Perceptron (MLP) * Long Short-Term Memory (LSTM) +* Gated Recurrent Unit (GRU), both explicit full-torque and implicit-PD residual variants """ from __future__ import annotations +import logging from collections.abc import Sequence from typing import TYPE_CHECKING @@ -22,10 +24,17 @@ from isaaclab.utils.assets import read_file from isaaclab.utils.types import ArticulationActions -from .actuator_pd import 
DCMotor +from .actuator_pd import DCMotor, IdealPDActuator, ImplicitActuator if TYPE_CHECKING: - from .actuator_net_cfg import ActuatorNetLSTMCfg, ActuatorNetMLPCfg + from .actuator_net_cfg import ( + ActuatorNetGRUCfg, + ActuatorNetGRUResidualCfg, + ActuatorNetLSTMCfg, + ActuatorNetMLPCfg, + ) + +logger = logging.getLogger(__name__) class ActuatorNetLSTM(DCMotor): @@ -98,6 +107,242 @@ def compute( return control_action +class _GRUActuatorMixin: + """Shared machinery for the GRU-based actuator models. + + Loads the TorchScript GRU network, allocates the recurrent input and hidden-state buffers, and + runs inference. The network consumes a fixed input of joint position, position error, and + velocity. An optional ``(mean, std)`` normalization may be applied to each input and to the + output (``None`` selects the identity transform). The concrete actuator classes combine this + mixin with an explicit (:class:`IdealPDActuator`) or implicit (:class:`ImplicitActuator`) base + to define their effort semantics. + """ + + # number of fixed network inputs: [position, position_error, velocity] + _NUM_INPUTS = 3 + # standard-deviation floor used when normalizing to avoid division by tiny values + _GRU_STD_FLOOR = 1.0e-8 + + def _init_gru_runtime(self) -> None: + """Load the network and allocate the GRU buffers and normalization statistics. + + Raises: + ValueError: If the TorchScript module does not expose a ``.gru`` submodule, uses a bidirectional + GRU, or has an input dimension other than 3 (joint position, position error, and velocity). 
+ """ + # load the TorchScript network + file_bytes = read_file(self.cfg.network_file) + self.network = torch.jit.load(file_bytes, map_location=self._device).eval() + if not hasattr(self.network, "gru"): + raise ValueError(f"The network file '{self.cfg.network_file}' must expose a TorchScript '.gru' submodule.") + + # infer dimensions from the GRU weights (the input is [position, position_error, velocity]) + gru_state = self.network.gru.state_dict() + if any("reverse" in key for key in gru_state): + raise ValueError( + f"The network file '{self.cfg.network_file}' uses a bidirectional GRU, which is not supported." + ) + input_dim = int(gru_state["weight_ih_l0"].shape[1]) + hidden_dim = int(gru_state["weight_hh_l0"].shape[1]) + num_layers = sum(1 for key in gru_state if key.startswith("weight_ih_l") and "reverse" not in key) + if input_dim != self._NUM_INPUTS: + raise ValueError( + f"The network file '{self.cfg.network_file}' must take {self._NUM_INPUTS} inputs (joint position," + f" position error, and velocity), but its GRU expects {input_dim}." 
+ ) + + # resolve (mean, std) normalization for the inputs and output (identity when unset) + self._position_norm = self._resolve_normalization(self.cfg.position_normalization, "position_normalization") + self._pos_error_norm = self._resolve_normalization(self.cfg.pos_error_normalization, "pos_error_normalization") + self._vel_norm = self._resolve_normalization(self.cfg.vel_normalization, "vel_normalization") + self._output_norm = self._resolve_normalization(self.cfg.output_normalization, "output_normalization") + + # recurrent input and hidden-state buffers + batch = self._num_envs * self.num_joints + self.sea_input = torch.zeros(batch, 1, self._NUM_INPUTS, device=self._device) + self.sea_hidden_state = torch.zeros(num_layers, batch, hidden_dim, device=self._device) + # per-env view for resets (shares storage) + self.sea_hidden_state_per_env = self.sea_hidden_state.view( + num_layers, self._num_envs, self.num_joints, hidden_dim + ) + + def _resolve_normalization(self, stats: tuple[float, float] | None, name: str) -> tuple[float, float]: + """Return the ``(mean, std)`` to apply, defaulting to identity and flooring the std. + + Args: + stats: The ``(mean, std)`` pair, or None for the identity transform. + name: The configuration field name, used in the warning and error messages. + + Returns: + The resolved ``(mean, std)`` with the std floored to avoid division by tiny values. + """ + if stats is None: + return 0.0, 1.0 + mean, std = float(stats[0]), float(stats[1]) + if std < 0.0: + raise ValueError( + f"Actuator '{self.cfg.network_file}' has {name} std={std}; the standard deviation must be" + " non-negative. Check the (mean, std) ordering." + ) + if std < self._GRU_STD_FLOOR: + logger.warning( + "Actuator '%s' has %s std=%s below the floor %s; flooring it, which can amplify the" + " normalized values. 
Set a larger std or leave the field unset for identity.", + self.cfg.network_file, + name, + std, + self._GRU_STD_FLOOR, + ) + return mean, max(std, self._GRU_STD_FLOOR) + + def _reset_gru_state(self, env_ids: Sequence[int]): + """Zero the GRU hidden state for the specified environments. + + Args: + env_ids: The environment indices whose hidden state should be reset. + """ + with torch.no_grad(): + self.sea_hidden_state_per_env[:, env_ids] = 0.0 + + def _predict_gru_effort( + self, control_action: ArticulationActions, joint_pos: torch.Tensor, joint_vel: torch.Tensor + ) -> torch.Tensor: + """Assemble the network input, run inference, and return the denormalized effort. + + Args: + control_action: The joint action instance holding the desired joint positions. + joint_pos: The current joint positions. Shape is (num_envs, num_joints). + joint_vel: The current joint velocities. Shape is (num_envs, num_joints). + + Returns: + The predicted effort [N·m or N, depending on joint type]. Shape is + (num_envs, num_joints). + + Raises: + ValueError: If ``control_action.joint_positions`` is None. 
+ """ + if control_action.joint_positions is None: + raise ValueError("GRU actuator input requires control_action.joint_positions to be set.") + # normalized [position, position_error, velocity] inputs + position = joint_pos.flatten() + pos_error = (control_action.joint_positions - joint_pos).flatten() + velocity = joint_vel.flatten() + self.sea_input[:, 0, 0] = (position - self._position_norm[0]) / self._position_norm[1] + self.sea_input[:, 0, 1] = (pos_error - self._pos_error_norm[0]) / self._pos_error_norm[1] + self.sea_input[:, 0, 2] = (velocity - self._vel_norm[0]) / self._vel_norm[1] + + # run inference, then denormalize and guard against a non-finite output + with torch.inference_mode(): + output, self.sea_hidden_state[:] = self.network(self.sea_input, self.sea_hidden_state) + output = output * self._output_norm[1] + self._output_norm[0] + # a non-finite prediction carries no usable actuation, so command zero effort this step + output = torch.nan_to_num(output, nan=0.0, posinf=0.0, neginf=0.0) + return output.reshape(self._num_envs, self.num_joints) + + +class ActuatorNetGRU(_GRUActuatorMixin, IdealPDActuator): + """Explicit actuator model based on a recurrent neural network (GRU). + + The GRU network predicts the *total* joint effort [N·m or N, depending on joint type] from the + joint position, position error, and velocity. Unlike the analytical models, no PD gains are + applied; the hidden state of the recurrent network captures the actuator history. The predicted + effort is clipped to the actuator's effort limit via :meth:`~isaaclab.actuators.ActuatorBase._clip_effort`. + + This model derives from :class:`IdealPDActuator`, whose simple symmetric ``±effort_limit`` + saturation matches a learned total-torque source without requiring the velocity-dependent + torque-speed parameters of a DC motor. + + Note: + The recurrent hidden state encodes the actuator history and is only cleared by + :meth:`reset`. 
Callers must reset the relevant environments on episode boundaries + (and after any control gap, e.g. a hardware reconnect) so the first post-reset effort is + not computed against stale temporal context. + """ + + cfg: ActuatorNetGRUCfg + """The configuration of the actuator model.""" + + def __init__(self, cfg: ActuatorNetGRUCfg, *args, **kwargs): + super().__init__(cfg, *args, **kwargs) + self._init_gru_runtime() + + """ + Operations. + """ + + def reset(self, env_ids: Sequence[int]): + super().reset(env_ids) + self._reset_gru_state(env_ids) + + def compute( + self, control_action: ArticulationActions, joint_pos: torch.Tensor, joint_vel: torch.Tensor + ) -> ArticulationActions: + self.computed_effort = self._predict_gru_effort(control_action, joint_pos, joint_vel) + # clip the computed effort based on the motor limits + self.applied_effort = self._clip_effort(self.computed_effort) + control_action.joint_efforts = self.applied_effort + control_action.joint_positions = None + control_action.joint_velocities = None + return control_action + + +class ActuatorNetGRUResidual(_GRUActuatorMixin, ImplicitActuator): + """Implicit-PD actuator model with an added recurrent (GRU) residual effort. + + This model behaves like an :class:`ImplicitActuator` -- the physics engine applies the PD + control using the configured stiffness and damping -- but augments the feed-forward effort + term with a *residual* effort [N·m or N, depending on joint type] predicted by a recurrent + (GRU) network. The residual is added to any existing feed-forward effort, and the approximate + total effort is stored for reward computation while the desired joint positions and velocities + are preserved so the engine can compute the PD term. 
+ + Note: + As with any :class:`ImplicitActuator`, the effort actually applied by the engine is the + feed-forward effort plus the engine-side PD term, and it is bounded by the simulation + effort limit (``effort_limit_sim``) rather than by :meth:`~isaaclab.actuators.ActuatorBase._clip_effort` + (which only populates the reported :attr:`applied_effort`). Set ``effort_limit_sim`` to a + finite value to bound the residual feed-forward. The hidden state is cleared only by + :meth:`reset`; reset the relevant environments on episode boundaries (and after any control + gap) to avoid stale recurrent context. + """ + + cfg: ActuatorNetGRUResidualCfg + """The configuration of the actuator model.""" + + def __init__(self, cfg: ActuatorNetGRUResidualCfg, *args, **kwargs): + super().__init__(cfg, *args, **kwargs) + self._init_gru_runtime() + + """ + Operations. + """ + + def reset(self, env_ids: Sequence[int]): + super().reset(env_ids) + self._reset_gru_state(env_ids) + + def compute( + self, control_action: ArticulationActions, joint_pos: torch.Tensor, joint_vel: torch.Tensor + ) -> ArticulationActions: + # add the GRU residual to the feed-forward effort + residual = self._predict_gru_effort(control_action, joint_pos, joint_vel) + if control_action.joint_efforts is None: + control_action.joint_efforts = residual + else: + control_action.joint_efforts = control_action.joint_efforts + residual + + # approximate total effort for reward telemetry (engine applies the PD term) + error_pos = control_action.joint_positions - joint_pos + if control_action.joint_velocities is not None: + error_vel = control_action.joint_velocities - joint_vel + else: + error_vel = -joint_vel + self.computed_effort = self.stiffness * error_pos + self.damping * error_vel + control_action.joint_efforts + self.applied_effort = self._clip_effort(self.computed_effort) + # positions/velocities are preserved so the engine computes the PD term + return control_action + + class ActuatorNetMLP(DCMotor): 
"""Actuator model based on multi-layer perceptron and joint history. diff --git a/source/isaaclab/isaaclab/actuators/actuator_net_cfg.py b/source/isaaclab/isaaclab/actuators/actuator_net_cfg.py index 4a2d9ff30465..be754474a2a2 100644 --- a/source/isaaclab/isaaclab/actuators/actuator_net_cfg.py +++ b/source/isaaclab/isaaclab/actuators/actuator_net_cfg.py @@ -9,10 +9,10 @@ from isaaclab.utils.configclass import configclass -from .actuator_pd_cfg import DCMotorCfg +from .actuator_pd_cfg import DCMotorCfg, IdealPDActuatorCfg, ImplicitActuatorCfg if TYPE_CHECKING: - from .actuator_net import ActuatorNetLSTM, ActuatorNetMLP + from .actuator_net import ActuatorNetGRU, ActuatorNetGRUResidual, ActuatorNetLSTM, ActuatorNetMLP @configclass @@ -64,3 +64,105 @@ class ActuatorNetMLPCfg(DCMotorCfg): The index *0* corresponds to current time-step, while *n* corresponds to n-th time-step in the past. The allocated history length is `max(input_idx) + 1`. """ + + +@configclass +class ActuatorNetGRUCfg(IdealPDActuatorCfg): + """Configuration for explicit full-torque GRU actuator models. + + This configures the :class:`~isaaclab.actuators.ActuatorNetGRU` model, where a recurrent + (GRU) network predicts the *total* joint effort [N·m or N, depending on joint type]. The + network is loaded as a TorchScript module from :attr:`network_file`. Since the network + predicts the total effort directly, no PD gains are used; the computed effort is clipped to + the actuator's effort limit by :meth:`~isaaclab.actuators.ActuatorBase._clip_effort`. + """ + + class_type: type["ActuatorNetGRU"] | str = "{DIR}.actuator_net:ActuatorNetGRU" + # we don't use stiffness and damping since the network predicts the total effort + stiffness = None + damping = None + + network_file: str = MISSING + """Path to the TorchScript file containing the network weights. 
+ + The loaded module must expose a ``.gru`` submodule (used to introspect the hidden and layer + dimensions) and implement ``forward(x, hidden) -> (output, hidden)``, where ``x`` has shape + (batch, 1, 3) carrying the joint position, position error, and velocity, ``hidden`` has shape + (num_layers, batch, hidden_dim), and ``batch = num_envs * num_joints``. The ``output`` reshapes + to (num_envs, num_joints). + """ + + position_normalization: tuple[float, float] | None = None + """``(mean, std)`` applied to the joint position input as ``(x - mean) / std``. + + ``None`` (the default) disables normalization (identity). + """ + + pos_error_normalization: tuple[float, float] | None = None + """``(mean, std)`` applied to the joint position error input as ``(x - mean) / std``. + + ``None`` (the default) disables normalization (identity). + """ + + vel_normalization: tuple[float, float] | None = None + """``(mean, std)`` applied to the joint velocity input as ``(x - mean) / std``. + + ``None`` (the default) disables normalization (identity). + """ + + output_normalization: tuple[float, float] | None = None + """Output denormalization as ``(mean, std)``. + + The raw network output ``y`` is denormalized as ``y * std + mean`` to recover the effort + [N·m or N, depending on joint type]. ``None`` (the default) disables denormalization (identity). + """ + + +@configclass +class ActuatorNetGRUResidualCfg(ImplicitActuatorCfg): + """Configuration for implicit-PD actuators with an added GRU residual effort. + + This configures the :class:`~isaaclab.actuators.ActuatorNetGRUResidual` model, an + implicit-PD actuator whose feed-forward effort term is augmented by a recurrent (GRU) + network predicting a *residual* effort [N·m or N, depending on joint type]. The PD term is + handled by the physics engine using the configured :attr:`stiffness` and :attr:`damping`, + while the network output is injected as the feed-forward effort. 
+ """ + + class_type: type["ActuatorNetGRUResidual"] | str = "{DIR}.actuator_net:ActuatorNetGRUResidual" + + network_file: str = MISSING + """Path to the TorchScript file containing the network weights. + + The loaded module must expose a ``.gru`` submodule (used to introspect the hidden and layer + dimensions) and implement ``forward(x, hidden) -> (output, hidden)``, where ``x`` has shape + (batch, 1, 3) carrying the joint position, position error, and velocity, ``hidden`` has shape + (num_layers, batch, hidden_dim), and ``batch = num_envs * num_joints``. The ``output`` reshapes + to (num_envs, num_joints). + """ + + position_normalization: tuple[float, float] | None = None + """``(mean, std)`` applied to the joint position input as ``(x - mean) / std``. + + ``None`` (the default) disables normalization (identity). + """ + + pos_error_normalization: tuple[float, float] | None = None + """``(mean, std)`` applied to the joint position error input as ``(x - mean) / std``. + + ``None`` (the default) disables normalization (identity). + """ + + vel_normalization: tuple[float, float] | None = None + """``(mean, std)`` applied to the joint velocity input as ``(x - mean) / std``. + + ``None`` (the default) disables normalization (identity). + """ + + output_normalization: tuple[float, float] | None = None + """Residual denormalization as ``(mean, std)``. + + The raw network output ``y`` is denormalized as ``y * std + mean`` to recover the residual + effort [N·m or N, depending on joint type]. ``None`` (the default) disables denormalization + (identity). + """ diff --git a/source/isaaclab/test/actuators/test_actuator_net_gru.py b/source/isaaclab/test/actuators/test_actuator_net_gru.py new file mode 100644 index 000000000000..26d60103b240 --- /dev/null +++ b/source/isaaclab/test/actuators/test_actuator_net_gru.py @@ -0,0 +1,552 @@ +# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). 
+# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +from isaaclab.app import AppLauncher + +HEADLESS = True + +# launch the simulator before importing the rest of the framework +simulation_app = AppLauncher(headless=HEADLESS).app + +"""Rest of imports follows""" + +import pytest +import torch + +from isaaclab.actuators import ActuatorNetGRUCfg, ActuatorNetGRUResidualCfg +from isaaclab.sim import build_simulation_context +from isaaclab.utils.types import ArticulationActions + + +@pytest.fixture +def sim(request): + """Create simulation context with the specified device.""" + device = request.getfixturevalue("device") + with build_simulation_context(device=device) as sim: + sim._app_control_on_stop_handle = None + yield sim + + +""" +Helpers: scriptable GRU modules satisfying the contract ([position, position_error, velocity] -> effort). +""" + + +class _TinyGRUNet(torch.nn.Module): + """GRU + linear-head module matching the actuator's TorchScript export contract. + + Mirrors the runtime GRU produced by the actuator-model exporter: a ``.gru`` submodule + (``torch.nn.GRU``, ``batch_first``) followed by a linear head, with recurrent dropout only when + stacking layers. ``forward(x, hidden)`` consumes ``x`` of shape (batch, 1, 3) -- the joint + position, position error, and velocity -- and ``hidden`` of shape (num_layers, batch, + hidden_dim), and returns ``(output, new_hidden)`` where ``output`` has shape (batch, 1, + output_size). 
+ """ + + def __init__(self, input_dim: int = 3, hidden_dim: int = 4, num_layers: int = 1, dropout: float = 0.0): + super().__init__() + recurrent_dropout = dropout if num_layers > 1 else 0.0 + self.gru = torch.nn.GRU(input_dim, hidden_dim, num_layers, dropout=recurrent_dropout, batch_first=True) + self.head = torch.nn.Linear(hidden_dim, 1) + + def forward(self, x: torch.Tensor, hidden: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]: + out, new_hidden = self.gru(x, hidden) + return self.head(out), new_hidden + + +def _make_network_file(tmp_path, input_dim: int = 3, hidden_dim: int = 4, num_layers: int = 1) -> str: + """Build, script, and save a tiny GRU network, returning the saved file path.""" + torch.manual_seed(0) + module = _TinyGRUNet(input_dim=input_dim, hidden_dim=hidden_dim, num_layers=num_layers) + module.eval() + scripted = torch.jit.script(module) + file_path = str(tmp_path / f"tiny_gru_{input_dim}_{hidden_dim}_{num_layers}.pt") + torch.jit.save(scripted, file_path) + return file_path + + +def _make_bad_network_file(tmp_path) -> str: + """Build and save a scripted module that lacks a ``.gru`` submodule.""" + + class _NoGRU(torch.nn.Module): + def __init__(self): + super().__init__() + self.linear = torch.nn.Linear(2, 1) + + def forward(self, x: torch.Tensor, hidden: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]: + return self.linear(x), hidden + + scripted = torch.jit.script(_NoGRU().eval()) + file_path = str(tmp_path / "no_gru.pt") + torch.jit.save(scripted, file_path) + return file_path + + +def _make_runtime_gru_file(tmp_path, hidden_dim: int = 64, num_layers: int = 2) -> str: + """Build, script, and save a production-sized multi-layer GRU (the real export architecture).""" + torch.manual_seed(0) + module = _TinyGRUNet(input_dim=3, hidden_dim=hidden_dim, num_layers=num_layers, dropout=0.1) + module.eval() + scripted = torch.jit.script(module) + file_path = str(tmp_path / f"runtime_gru_{hidden_dim}_{num_layers}.pt") + torch.jit.save(scripted, 
file_path) + return file_path + + +def _make_nan_network_file(tmp_path) -> str: + """Build and save a GRU network whose head emits non-finite output (poisoned head params).""" + torch.manual_seed(0) + module = _TinyGRUNet(input_dim=3) + with torch.no_grad(): + module.head.weight.fill_(float("nan")) + module.head.bias.fill_(float("nan")) + module.eval() + scripted = torch.jit.script(module) + file_path = str(tmp_path / "nan_gru.pt") + torch.jit.save(scripted, file_path) + return file_path + + +def _reference_effort(network_file, des_pos, joint_pos, joint_vel, hidden_dim=4, num_layers=1): + """Roll the saved network forward by hand for one step (identity normalization).""" + device = joint_pos.device + num_envs, num_joints = joint_pos.shape + net = torch.jit.load(network_file, map_location=device).eval() + batch = num_envs * num_joints + hidden = torch.zeros(num_layers, batch, hidden_dim, device=device) + x = torch.stack([joint_pos.flatten(), (des_pos - joint_pos).flatten(), joint_vel.flatten()], dim=1).reshape( + batch, 1, 3 + ) + with torch.inference_mode(): + out, _ = net(x, hidden) + return out.reshape(num_envs, num_joints) + + +""" +Test ActuatorNetGRU (explicit, full-torque). 
+""" + + +@pytest.mark.parametrize("num_envs", [1, 2]) +@pytest.mark.parametrize("num_joints", [1, 2]) +@pytest.mark.parametrize("device", ["cuda:0", "cpu"]) +def test_actuator_net_gru_compute(sim, num_envs, num_joints, device, tmp_path): + """ActuatorNetGRU.compute returns the network effort (matching a reference forward), nulls pos/vel.""" + joint_names = [f"joint_{d}" for d in range(num_joints)] + joint_ids = list(range(num_joints)) + + network_file = _make_network_file(tmp_path) + + # large effort limit so the applied effort is the un-clipped network output + actuator_cfg = ActuatorNetGRUCfg(joint_names_expr=joint_names, network_file=network_file, effort_limit=1.0e6) + actuator = actuator_cfg.class_type( + actuator_cfg, joint_names=joint_names, joint_ids=joint_ids, num_envs=num_envs, device=device + ) + + joint_pos = torch.rand(num_envs, num_joints, device=device) + joint_vel = torch.rand(num_envs, num_joints, device=device) + des_pos = torch.rand(num_envs, num_joints, device=device) + control_action = ArticulationActions( + joint_positions=des_pos, + joint_velocities=torch.rand(num_envs, num_joints, device=device), + joint_efforts=None, + ) + + # independent reference forward of the same network with identity normalization + reference = _reference_effort(network_file, des_pos, joint_pos, joint_vel) + + out = actuator.compute(control_action, joint_pos, joint_vel) + + # efforts have the expected shape and positions/velocities are nulled + assert out.joint_efforts.shape == (num_envs, num_joints) + assert out.joint_positions is None + assert out.joint_velocities is None + # the returned effort matches the reference forward (catches input-assembly/order bugs) + torch.testing.assert_close(out.joint_efforts, actuator.applied_effort) + torch.testing.assert_close(out.joint_efforts, reference) + + +@pytest.mark.parametrize("num_envs", [1, 2]) +@pytest.mark.parametrize("num_joints", [1, 2]) +@pytest.mark.parametrize("device", ["cuda:0", "cpu"]) +def 
test_actuator_net_gru_effort_clipping(sim, num_envs, num_joints, device, tmp_path): + """A tiny effort limit forces the applied effort to saturate at the limit.""" + joint_names = [f"joint_{d}" for d in range(num_joints)] + joint_ids = list(range(num_joints)) + effort_limit = 0.5 + + network_file = _make_network_file(tmp_path) + + actuator_cfg = ActuatorNetGRUCfg( + joint_names_expr=joint_names, + network_file=network_file, + effort_limit=effort_limit, + # bias the denormalized output well above the effort limit + output_normalization=(100.0, 1.0), + ) + actuator = actuator_cfg.class_type( + actuator_cfg, joint_names=joint_names, joint_ids=joint_ids, num_envs=num_envs, device=device + ) + + joint_pos = torch.rand(num_envs, num_joints, device=device) + joint_vel = torch.rand(num_envs, num_joints, device=device) + control_action = ArticulationActions( + joint_positions=torch.rand(num_envs, num_joints, device=device), + joint_velocities=torch.rand(num_envs, num_joints, device=device), + ) + + actuator.compute(control_action, joint_pos, joint_vel) + torch.testing.assert_close(actuator.applied_effort, effort_limit * torch.ones(num_envs, num_joints, device=device)) + + +@pytest.mark.parametrize("num_envs", [1, 2]) +@pytest.mark.parametrize("num_joints", [1, 2]) +@pytest.mark.parametrize("device", ["cuda:0", "cpu"]) +def test_actuator_net_gru_nan_output_is_sanitized(sim, num_envs, num_joints, device, tmp_path): + """A non-finite network output is sanitized to zero effort before reaching the engine.""" + joint_names = [f"joint_{d}" for d in range(num_joints)] + joint_ids = list(range(num_joints)) + + network_file = _make_nan_network_file(tmp_path) + + actuator_cfg = ActuatorNetGRUCfg(joint_names_expr=joint_names, network_file=network_file, effort_limit=5.0) + actuator = actuator_cfg.class_type( + actuator_cfg, joint_names=joint_names, joint_ids=joint_ids, num_envs=num_envs, device=device + ) + + joint_pos = torch.rand(num_envs, num_joints, device=device) + joint_vel = 
torch.rand(num_envs, num_joints, device=device) + control_action = ArticulationActions( + joint_positions=torch.rand(num_envs, num_joints, device=device), + joint_velocities=torch.rand(num_envs, num_joints, device=device), + ) + + out = actuator.compute(control_action, joint_pos, joint_vel) + + assert torch.all(torch.isfinite(out.joint_efforts)) + torch.testing.assert_close(out.joint_efforts, torch.zeros(num_envs, num_joints, device=device)) + + +@pytest.mark.parametrize("num_envs", [1, 2]) +@pytest.mark.parametrize("num_joints", [1, 2]) +@pytest.mark.parametrize("device", ["cuda:0", "cpu"]) +def test_actuator_net_gru_output_normalization(sim, num_envs, num_joints, device, tmp_path): + """Output denormalization scales the raw effort by std and offsets by mean.""" + joint_names = [f"joint_{d}" for d in range(num_joints)] + joint_ids = list(range(num_joints)) + out_mean, out_std = 2.0, 3.0 + + network_file = _make_network_file(tmp_path) + + def _build(output_normalization): + cfg = ActuatorNetGRUCfg( + joint_names_expr=joint_names, + network_file=network_file, + effort_limit=1.0e6, + output_normalization=output_normalization, + ) + return cfg.class_type(cfg, joint_names=joint_names, joint_ids=joint_ids, num_envs=num_envs, device=device) + + actuator_identity = _build(None) + actuator_scaled = _build((out_mean, out_std)) + + joint_pos = torch.rand(num_envs, num_joints, device=device) + joint_vel = torch.rand(num_envs, num_joints, device=device) + des_pos = torch.rand(num_envs, num_joints, device=device) + + def _ca(): + return ArticulationActions(joint_positions=des_pos.clone(), joint_velocities=joint_vel.clone()) + + eff_identity = actuator_identity.compute(_ca(), joint_pos, joint_vel).joint_efforts.clone() + eff_scaled = actuator_scaled.compute(_ca(), joint_pos, joint_vel).joint_efforts.clone() + + torch.testing.assert_close(eff_scaled, eff_identity * out_std + out_mean) + + +@pytest.mark.parametrize("num_envs", [1, 2]) +@pytest.mark.parametrize("num_joints", [1, 
2]) +@pytest.mark.parametrize("device", ["cuda:0", "cpu"]) +def test_actuator_net_gru_input_normalization(sim, num_envs, num_joints, device, tmp_path): + """Input normalization writes ``(x - mean) / std`` for position, position error, and velocity.""" + joint_names = [f"joint_{d}" for d in range(num_joints)] + joint_ids = list(range(num_joints)) + pos_norm = (0.2, 3.0) + pos_err_norm = (0.5, 2.0) + vel_norm = (-1.0, 4.0) + + network_file = _make_network_file(tmp_path) + + cfg = ActuatorNetGRUCfg( + joint_names_expr=joint_names, + network_file=network_file, + effort_limit=1.0e6, + position_normalization=pos_norm, + pos_error_normalization=pos_err_norm, + vel_normalization=vel_norm, + ) + actuator = cfg.class_type(cfg, joint_names=joint_names, joint_ids=joint_ids, num_envs=num_envs, device=device) + + joint_pos = torch.rand(num_envs, num_joints, device=device) + joint_vel = torch.rand(num_envs, num_joints, device=device) + des_pos = joint_pos + 0.3 + actuator.compute(ArticulationActions(joint_positions=des_pos, joint_velocities=joint_vel), joint_pos, joint_vel) + + pos_error = (des_pos - joint_pos).flatten() + torch.testing.assert_close(actuator.sea_input[:, 0, 0], (joint_pos.flatten() - pos_norm[0]) / pos_norm[1]) + torch.testing.assert_close(actuator.sea_input[:, 0, 1], (pos_error - pos_err_norm[0]) / pos_err_norm[1]) + torch.testing.assert_close(actuator.sea_input[:, 0, 2], (joint_vel.flatten() - vel_norm[0]) / vel_norm[1]) + + +@pytest.mark.parametrize("num_envs", [2]) +@pytest.mark.parametrize("num_joints", [2]) +@pytest.mark.parametrize("device", ["cuda:0", "cpu"]) +def test_actuator_net_gru_reset(sim, num_envs, num_joints, device, tmp_path): + """reset(env_ids) zeros the GRU hidden state only for the given environments.""" + joint_names = [f"joint_{d}" for d in range(num_joints)] + joint_ids = list(range(num_joints)) + + network_file = _make_network_file(tmp_path) + + cfg = ActuatorNetGRUCfg(joint_names_expr=joint_names, network_file=network_file, 
effort_limit=1.0e6) + actuator = cfg.class_type(cfg, joint_names=joint_names, joint_ids=joint_ids, num_envs=num_envs, device=device) + + # advance the hidden state for all envs + joint_pos = torch.rand(num_envs, num_joints, device=device) + joint_vel = torch.rand(num_envs, num_joints, device=device) + actuator.compute( + ArticulationActions( + joint_positions=torch.rand(num_envs, num_joints, device=device), joint_velocities=joint_vel + ), + joint_pos, + joint_vel, + ) + assert torch.any(actuator.sea_hidden_state_per_env[:, 0] != 0.0) + assert torch.any(actuator.sea_hidden_state_per_env[:, 1] != 0.0) + + # reset env 0 only + actuator.reset([0]) + assert torch.all(actuator.sea_hidden_state_per_env[:, 0] == 0.0) + assert torch.any(actuator.sea_hidden_state_per_env[:, 1] != 0.0) + + +""" +Test ActuatorNetGRUResidual (implicit-PD + residual). +""" + + +@pytest.mark.parametrize("num_envs", [1, 2]) +@pytest.mark.parametrize("num_joints", [1, 2]) +@pytest.mark.parametrize("device", ["cuda:0", "cpu"]) +@pytest.mark.parametrize("preset_efforts", [False, True]) +def test_actuator_net_gru_residual_compute(sim, num_envs, num_joints, device, preset_efforts, tmp_path): + """ActuatorNetGRUResidual adds the residual to joint_efforts and preserves pos/vel. + + Covers both a pre-set ``joint_efforts`` (residual added on top) and ``None`` (residual becomes + the feed-forward effort). The approximate ``computed_effort`` follows + ``stiffness * err_pos + damping * err_vel + joint_efforts`` and positions/velocities are + preserved on return so the engine can apply the PD term. 
+ """ + joint_names = [f"joint_{d}" for d in range(num_joints)] + joint_ids = list(range(num_joints)) + stiffness, damping = 40.0, 3.0 + + network_file = _make_network_file(tmp_path) + + cfg = ActuatorNetGRUResidualCfg( + joint_names_expr=joint_names, + network_file=network_file, + stiffness=stiffness, + damping=damping, + effort_limit_sim=1.0e6, + ) + actuator = cfg.class_type( + cfg, + joint_names=joint_names, + joint_ids=joint_ids, + num_envs=num_envs, + device=device, + stiffness=stiffness, + damping=damping, + ) + + joint_pos = torch.rand(num_envs, num_joints, device=device) + joint_vel = torch.rand(num_envs, num_joints, device=device) + des_pos = joint_pos + 0.3 + des_vel = joint_vel + 0.1 + preset = torch.rand(num_envs, num_joints, device=device) if preset_efforts else None + control_action = ArticulationActions( + joint_positions=des_pos, joint_velocities=des_vel, joint_efforts=preset.clone() if preset is not None else None + ) + + # independent reference residual (identity normalization, hidden starts at zero) + residual = _reference_effort(network_file, des_pos, joint_pos, joint_vel) + + out = actuator.compute(control_action, joint_pos, joint_vel) + + # residual is added to the feed-forward effort + expected_ff = residual if preset is None else preset + residual + torch.testing.assert_close(out.joint_efforts, expected_ff) + # approximate total effort follows the implicit-PD-plus-feedforward formula + expected_computed = stiffness * (des_pos - joint_pos) + damping * (des_vel - joint_vel) + expected_ff + torch.testing.assert_close(actuator.computed_effort, expected_computed) + # positions/velocities are preserved so the engine can apply the PD term + assert out.joint_positions is not None + assert out.joint_velocities is not None + + +@pytest.mark.parametrize("num_envs", [1, 2]) +@pytest.mark.parametrize("num_joints", [1, 2]) +@pytest.mark.parametrize("device", ["cuda:0", "cpu"]) +def test_actuator_net_gru_residual_velocities_none(sim, num_envs, num_joints, 
device, tmp_path): + """When joint_velocities is None, the velocity error falls back to ``-joint_vel``.""" + joint_names = [f"joint_{d}" for d in range(num_joints)] + joint_ids = list(range(num_joints)) + stiffness, damping = 40.0, 3.0 + + network_file = _make_network_file(tmp_path) + + cfg = ActuatorNetGRUResidualCfg( + joint_names_expr=joint_names, + network_file=network_file, + stiffness=stiffness, + damping=damping, + effort_limit_sim=1.0e6, + ) + actuator = cfg.class_type( + cfg, + joint_names=joint_names, + joint_ids=joint_ids, + num_envs=num_envs, + device=device, + stiffness=stiffness, + damping=damping, + ) + + joint_pos = torch.rand(num_envs, num_joints, device=device) + joint_vel = torch.rand(num_envs, num_joints, device=device) + des_pos = joint_pos + 0.3 + control_action = ArticulationActions(joint_positions=des_pos, joint_velocities=None, joint_efforts=None) + + residual = _reference_effort(network_file, des_pos, joint_pos, joint_vel) + + out = actuator.compute(control_action, joint_pos, joint_vel) + + # velocity error falls back to -joint_vel when no desired velocity is provided + expected_computed = stiffness * (des_pos - joint_pos) + damping * (-joint_vel) + residual + torch.testing.assert_close(actuator.computed_effort, expected_computed) + assert out.joint_velocities is None + + +""" +Test initialization-time validation errors. 
+""" + + +@pytest.mark.parametrize("device", ["cuda:0", "cpu"]) +def test_actuator_net_gru_missing_gru_submodule_raises(sim, device, tmp_path): + """A network without a ``.gru`` submodule raises ValueError at init.""" + joint_names = ["joint_0"] + bad_file = _make_bad_network_file(tmp_path) + + cfg = ActuatorNetGRUCfg(joint_names_expr=joint_names, network_file=bad_file) + with pytest.raises(ValueError): + cfg.class_type(cfg, joint_names=joint_names, joint_ids=[0], num_envs=1, device=device) + + +@pytest.mark.parametrize("device", ["cuda:0", "cpu"]) +def test_actuator_net_gru_input_dim_mismatch_raises(sim, device, tmp_path): + """A network whose GRU does not take exactly 3 inputs raises ValueError at init.""" + joint_names = ["joint_0"] + network_file = _make_network_file(tmp_path, input_dim=2) + + cfg = ActuatorNetGRUCfg(joint_names_expr=joint_names, network_file=network_file) + with pytest.raises(ValueError): + cfg.class_type(cfg, joint_names=joint_names, joint_ids=[0], num_envs=1, device=device) + + +@pytest.mark.parametrize("device", ["cuda:0", "cpu"]) +def test_actuator_net_gru_negative_std_raises(sim, device, tmp_path): + """A negative normalization std raises ValueError at init (rather than being floored).""" + joint_names = ["joint_0"] + network_file = _make_network_file(tmp_path) + + cfg = ActuatorNetGRUCfg( + joint_names_expr=joint_names, network_file=network_file, pos_error_normalization=(0.0, -2.0) + ) + with pytest.raises(ValueError): + cfg.class_type(cfg, joint_names=joint_names, joint_ids=[0], num_envs=1, device=device) + + +""" +Test the real (production-sized) GRU export architecture. +""" + + +@pytest.mark.parametrize("variant", ["explicit", "residual"]) +@pytest.mark.parametrize("device", ["cuda:0", "cpu"]) +def test_actuator_net_gru_runtime_export_architecture(sim, variant, device, tmp_path): + """A production-sized multi-layer GRU (the real export architecture) loads and steps. 
+ + Exercises a stacked GRU (hidden_dim=64, num_layers=2) -- matching the runtime model the + actuator-model exporter produces -- in both the explicit and residual actuators. Verifies the + multi-layer hidden-state buffer is allocated correctly, the effort is finite and correctly + shaped, and the recurrent hidden state evolves across consecutive steps (the GRU memory is + actually carried, not just zeroed). + """ + num_envs, num_joints = 2, 3 + joint_names = [f"joint_{d}" for d in range(num_joints)] + joint_ids = list(range(num_joints)) + hidden_dim, num_layers = 64, 2 + + network_file = _make_runtime_gru_file(tmp_path, hidden_dim=hidden_dim, num_layers=num_layers) + + if variant == "explicit": + cfg = ActuatorNetGRUCfg(joint_names_expr=joint_names, network_file=network_file, effort_limit=1.0e6) + actuator = cfg.class_type(cfg, joint_names=joint_names, joint_ids=joint_ids, num_envs=num_envs, device=device) + else: + stiffness, damping = 40.0, 3.0 + cfg = ActuatorNetGRUResidualCfg( + joint_names_expr=joint_names, + network_file=network_file, + stiffness=stiffness, + damping=damping, + effort_limit_sim=1.0e6, + ) + actuator = cfg.class_type( + cfg, + joint_names=joint_names, + joint_ids=joint_ids, + num_envs=num_envs, + device=device, + stiffness=stiffness, + damping=damping, + ) + + # the recurrent buffer reflects the stacked-layer network dimensions + assert actuator.sea_hidden_state.shape == (num_layers, num_envs * num_joints, hidden_dim) + + # frozen input across steps; rebuild the action each step (compute may consume it) + des_pos = torch.rand(num_envs, num_joints, device=device) + des_vel = torch.rand(num_envs, num_joints, device=device) + joint_pos = torch.rand(num_envs, num_joints, device=device) + joint_vel = torch.rand(num_envs, num_joints, device=device) + + def _action(): + return ArticulationActions(joint_positions=des_pos.clone(), joint_velocities=des_vel.clone()) + + out = actuator.compute(_action(), joint_pos, joint_vel) + assert 
out.joint_efforts.shape == (num_envs, num_joints) + assert torch.all(torch.isfinite(out.joint_efforts)) + + # after one step the hidden state has advanced away from zero, and a second identical step + # advances it further -- confirming the GRU memory is carried across steps + hidden_after_first = actuator.sea_hidden_state.clone() + assert torch.any(hidden_after_first != 0.0) + actuator.compute(_action(), joint_pos, joint_vel) + assert not torch.allclose(hidden_after_first, actuator.sea_hidden_state) + + +if __name__ == "__main__": + pytest.main([__file__, "-v", "--maxfail=1"])