Extend eval runner: per-env camera recording, metrics file, and episode boundaries#776
Extend eval runner: per-env camera recording, metrics file, and episode boundaries#776aiguldzh-nvidia wants to merge 12 commits into
Conversation
There was a problem hiding this comment.
Code Review: EpisodeRecord with Task Metadata, Metrics Output, All Envs Recording and Episode Boundaries
Summary
This PR adds a well-structured EpisodeRecord schema and wires up episode boundaries, metrics persistence, and multi-env camera recording into the eval runner. The overall architecture is clean — dataclass-based schema with versioning, clear separation between building and writing records, and thoughtful extension points for future phases. A few issues worth addressing before merge.
🔴 Critical Issues
1. MetricsLogger receives non-timestamped path (race with path mutation)
File: eval_runner.py (lines ~205–215)
metrics_logger = MetricsLogger(metrics_file=args_cli.metrics_file or "metrics.json")
# ... later ...
if args_cli.metrics_file is not None:
base, ext = os.path.splitext(args_cli.metrics_file)
args_cli.metrics_file = f"{base}_{run_ts}{ext}"MetricsLogger is initialized before args_cli.metrics_file is mutated to include the timestamp. The logger stores the original path at construction time, so save_metrics_to_file() writes to the non-timestamped path while the [INFO] print references metrics_logger.metrics_file (also non-timestamped). The timestamped path is effectively dead code.
Suggestion: Move MetricsLogger instantiation to after the timestamp mutation, or explicitly update metrics_logger.metrics_file post-mutation.
2. episode_boundaries lost on exception — NameError in caller
File: eval_runner.py (line ~268)
metrics, episode_boundaries = rollout_policy(...)If rollout_policy raises an exception (which re-raises after pbar cleanup), the tuple unpacking never completes, so episode_boundaries is unbound in the except block's scope. While the failure-path build_episode_record(...) call doesn't pass episode_boundaries, any future code that references it in the except block would hit a NameError. The variable should be initialized before the call:
episode_boundaries = []
metrics, episode_boundaries = rollout_policy(...)This also future-proofs the failure record to include partial boundaries collected before the crash.
🟡 Moderate Issues
3. MetricsLogger always receives a metrics_file argument even when user didn't request file output
File: eval_runner.py
metrics_logger = MetricsLogger(metrics_file=args_cli.metrics_file or "metrics.json")When --metrics_file is not passed, args_cli.metrics_file is None, so this falls through to "metrics.json". If MetricsLogger writes on destruction or has side effects, this could produce an unexpected metrics.json file. The original code passed no argument — this changes the default behavior. Consider:
metrics_logger = MetricsLogger(metrics_file=args_cli.metrics_file)and only calling save_metrics_to_file() when the file is set (which you already gate on args_cli.metrics_file is not None).
4. Off-by-one risk in num_episodes mode trailing boundary
File: policy_runner.py (lines ~141–148)
last_step = (num_steps_completed - 1) if num_steps is not None else num_steps_completedIn num_episodes mode, the comment says "break fired before increment." This is correct if the break at the episode-count check fires before num_steps_completed += 1. However, the break can also fire when num_steps is not None inside the inner if num_steps ... branch. Verify that both exit paths are covered by the same formula, especially when num_episodes mode terminates mid-step due to the num_episodes_completed >= num_episodes check happening after num_steps_completed is already incremented (the +=1 appears before the episode count check in the original code).
5. Memory accumulation unbounded — no chunked flush for multi-env recording
File: camera_video.py
The docstring warns about ~3.8 GB for 10 envs / 500 steps / 512×512×3, but there's no runtime guard. For longer runs where video_length is large, the in-memory buffers will grow linearly until _flush(). Consider adding a configurable max_buffer_frames parameter that triggers an intermediate flush-and-stitch, or at minimum a runtime warning when the estimated buffer exceeds a threshold (e.g., 2 GB).
🟢 Suggestions / Nits
6. EpisodeRecord.arena_env_args typed as list[str] but populated with list(job.arena_env_args)
File: episode_record.py (field declaration) vs job_manager.py (arena_env_args is read from a dict)
In Job.from_dict, arena_env_args comes from converting a dict to a CLI list — confirm the runtime type is always list[str] and not list[Any]. If the config dict values are non-string, this will produce a JSON record with mixed types that breaks the schema contract.
7. _find_video_paths only searches one level deep
File: episode_record.py
job_video_dir = os.path.join(video_dir, job_name)Now that video_dir already has the timestamp subdirectory appended (video_dir/run_ts), the video files are at video_dir/run_ts/job_name/*.mp4. But _find_video_paths receives args_cli.video_dir (already timestamped) and appends job_name. This seems correct, but worth a comment to clarify the expected directory layout for future maintainers.
8. CI failures unrelated to this PR
The "Run tests" and "GR00T closed-loop E2E" checks are failing — confirm these are pre-existing failures on main and not regressions introduced here.
✅ What Looks Good
- Clean dataclass schema with forward/backward compatibility (
from_dictfilters toknownfields) - Schema versioning from day one
- Episode boundary tracking is well-reasoned — start/end inclusive, handles both termination modes
- Failure-path episode record writing (silently swallowed exceptions prevent cascading failures)
- Timestamp-isolated output directories prevent stale file accumulation
- Good docstrings explaining the frame-index ↔ step-index correspondence
Update 2 (commit 66dd66a)
The latest commits complete the scope reduction by:
- Deleting
episode_record.pyentirely (335 lines removed) - Removing
--episode_record_dirCLI argument from eval_runner_cli.py - Cleaning up eval_runner.py to remove all episode record building/writing logic
- Adding new test coverage for task metadata extraction in Job.from_dict
Previous findings status:
| Finding | Status |
|---|---|
| 🔴 #1 (MetricsLogger timestamped-path race) | |
| 🔴 #2 (episode_boundaries NameError) | ✅ Moot — boundaries now discarded (_) |
| 🟡 #3 (MetricsLogger default fallback) | |
| 🟡 #4 (Off-by-one in boundaries) | ✅ Moot — boundaries no longer consumed |
| 🟡 #5 (Memory accumulation in camera_video) | |
| 🟢 #6, #7 (EpisodeRecord schema concerns) | ✅ Moot — file removed |
New observations:
- ✅ New tests
test_job_from_dict_task_metadataandtest_job_task_metadata_defaultsare well-structured - ✅ Minor docstring update in job_manager.py removes now-stale EpisodeRecord reference
- The PR now focuses cleanly on task metadata extraction, metrics output, and camera recording — EpisodeRecord is deferred
Remaining actionable items:
- 🔴 Fix MetricsLogger initialization order — move instantiation after timestamp mutation (or update the path post-mutation)
- 🟡 Consider removing the
or "metrics.json"fallback if file output is only intended when explicitly requested - 🟡 camera_video.py memory warning is still relevant for long runs
Greptile SummaryThis PR extends the eval runner with per-env camera recording, timestamped run directories, per-job episode record JSON files, and episode boundary tracking. The
Confidence Score: 3/5Not safe to merge as-is: the timestamped run directory feature silently misfires with the default video_dir path, and failed-job records always have empty video_paths. Two independent correctness problems exist in eval_runner.py. First, os.path.relpath(video_dir, run_base) produces ../.. -prefixed paths whenever the default absolute video_dir=/eval/videos and a relative --metrics_file path are combined, causing the rerouted video_dir to land outside run_dir entirely — the primary timestamped-isolation feature of this PR fails silently in the default invocation. Second, _write_episode_record is called in the except block before finally closes the env; CameraObsVideoRecorder.close() (the flush) only runs in finally, so video_paths will always be empty for failed jobs even when frames were recorded. isaaclab_arena/evaluation/eval_runner.py — path remapping logic (lines 288-298) and the exception-path record write ordering (lines 374-390). Important Files Changed
Flowchart%%{init: {'theme': 'neutral'}}%%
flowchart TD
A[eval_runner main] --> B{any output flag set?}
B -- yes --> C[compute run_ts + run_dir]
C --> D[remap video_dir / metrics_file / episode_record_dir into run_dir]
D --> E[MetricsLogger with timestamped path]
B -- no --> E
E --> F[for each Job]
F --> G[load_env + get_policy]
G --> H{args.video?}
H -- yes --> I[wrap with RecordVideo]
H -- no --> J{args.camera_video?}
I --> J
J -- yes --> K[wrap with CameraObsVideoRecorder]
J -- no --> L[rollout_policy]
K --> L
L --> M[returns metrics + episode_boundaries]
M --> N[metrics_logger.append_job_metrics]
N --> O{episode_record_dir?}
O -- yes --> P[_write_episode_record scans video_dir for mp4s]
P --> Q[write job.name.json]
L -- exception --> R[except: write failed record]
R --> S[finally: _close_job_resources to env.close to CameraObs._flush]
Q --> S
S --> F
F --> T[metrics_logger.save_metrics_to_file]
|
| metrics_logger = MetricsLogger(metrics_file=args_cli.metrics_file or "metrics.json") | ||
|
|
||
| job_manager.print_jobs_info() | ||
|
|
||
| if args_cli.video: | ||
| os.makedirs(args_cli.video_dir, exist_ok=True) | ||
| print(f"[INFO] Video recording enabled. Videos will be saved to: {args_cli.video_dir}") | ||
| if args_cli.video or args_cli.camera_video or args_cli.episode_record_dir or args_cli.metrics_file: | ||
| run_ts = datetime.now().strftime("%Y%m%dT%H%M%S") | ||
| if args_cli.video or args_cli.camera_video: | ||
| args_cli.video_dir = os.path.join(args_cli.video_dir, run_ts) | ||
| os.makedirs(args_cli.video_dir, exist_ok=True) | ||
| print(f"[INFO] Video recording enabled. Videos will be saved to: {args_cli.video_dir}") | ||
| if args_cli.episode_record_dir is not None: | ||
| args_cli.episode_record_dir = os.path.join(args_cli.episode_record_dir, run_ts) | ||
| if args_cli.metrics_file is not None: | ||
| base, ext = os.path.splitext(args_cli.metrics_file) | ||
| args_cli.metrics_file = f"{base}_{run_ts}{ext}" |
There was a problem hiding this comment.
MetricsLogger gets the non-timestamped path — timestamp suffix is never applied
MetricsLogger is constructed at line 208 using args_cli.metrics_file (e.g. "out/metrics.json"). The timestamp is computed and appended to args_cli.metrics_file later at lines 220-222, but that mutation only updates args_cli.metrics_file — it never updates metrics_logger.metrics_file. When save_metrics_to_file() is called at line 326, it uses the original non-timestamped path and overwrites the previous run's output instead of writing a new file named out/metrics_20250610T120000.json.
| job_manager = JobManager(eval_jobs_config["jobs"]) | ||
| metrics_logger = MetricsLogger() | ||
| metrics_logger = MetricsLogger(metrics_file=args_cli.metrics_file or "metrics.json") | ||
|
|
||
| job_manager.print_jobs_info() | ||
|
|
||
| if args_cli.video: | ||
| os.makedirs(args_cli.video_dir, exist_ok=True) | ||
| print(f"[INFO] Video recording enabled. Videos will be saved to: {args_cli.video_dir}") | ||
| if args_cli.video or args_cli.camera_video or args_cli.episode_record_dir or args_cli.metrics_file: | ||
| run_ts = datetime.now().strftime("%Y%m%dT%H%M%S") | ||
| if args_cli.video or args_cli.camera_video: | ||
| args_cli.video_dir = os.path.join(args_cli.video_dir, run_ts) | ||
| os.makedirs(args_cli.video_dir, exist_ok=True) | ||
| print(f"[INFO] Video recording enabled. Videos will be saved to: {args_cli.video_dir}") | ||
| if args_cli.episode_record_dir is not None: | ||
| args_cli.episode_record_dir = os.path.join(args_cli.episode_record_dir, run_ts) | ||
| if args_cli.metrics_file is not None: | ||
| base, ext = os.path.splitext(args_cli.metrics_file) | ||
| args_cli.metrics_file = f"{base}_{run_ts}{ext}" |
There was a problem hiding this comment.
MetricsLogger should be constructed after the timestamp is applied to
args_cli.metrics_file
Moving the MetricsLogger construction to after the timestamp suffix is appended ensures metrics_logger.metrics_file holds the timestamped path, so save_metrics_to_file() actually writes the timestamped file instead of overwriting the bare path on every run.
| job_manager = JobManager(eval_jobs_config["jobs"]) | |
| metrics_logger = MetricsLogger() | |
| metrics_logger = MetricsLogger(metrics_file=args_cli.metrics_file or "metrics.json") | |
| job_manager.print_jobs_info() | |
| if args_cli.video: | |
| os.makedirs(args_cli.video_dir, exist_ok=True) | |
| print(f"[INFO] Video recording enabled. Videos will be saved to: {args_cli.video_dir}") | |
| if args_cli.video or args_cli.camera_video or args_cli.episode_record_dir or args_cli.metrics_file: | |
| run_ts = datetime.now().strftime("%Y%m%dT%H%M%S") | |
| if args_cli.video or args_cli.camera_video: | |
| args_cli.video_dir = os.path.join(args_cli.video_dir, run_ts) | |
| os.makedirs(args_cli.video_dir, exist_ok=True) | |
| print(f"[INFO] Video recording enabled. Videos will be saved to: {args_cli.video_dir}") | |
| if args_cli.episode_record_dir is not None: | |
| args_cli.episode_record_dir = os.path.join(args_cli.episode_record_dir, run_ts) | |
| if args_cli.metrics_file is not None: | |
| base, ext = os.path.splitext(args_cli.metrics_file) | |
| args_cli.metrics_file = f"{base}_{run_ts}{ext}" | |
| job_manager = JobManager(eval_jobs_config["jobs"]) | |
| job_manager.print_jobs_info() | |
| if args_cli.video or args_cli.camera_video or args_cli.episode_record_dir or args_cli.metrics_file: | |
| run_ts = datetime.now().strftime("%Y%m%dT%H%M%S") | |
| if args_cli.video or args_cli.camera_video: | |
| args_cli.video_dir = os.path.join(args_cli.video_dir, run_ts) | |
| os.makedirs(args_cli.video_dir, exist_ok=True) | |
| print(f"[INFO] Video recording enabled. Videos will be saved to: {args_cli.video_dir}") | |
| if args_cli.episode_record_dir is not None: | |
| args_cli.episode_record_dir = os.path.join(args_cli.episode_record_dir, run_ts) | |
| if args_cli.metrics_file is not None: | |
| base, ext = os.path.splitext(args_cli.metrics_file) | |
| args_cli.metrics_file = f"{base}_{run_ts}{ext}" | |
| metrics_logger = MetricsLogger(metrics_file=args_cli.metrics_file or "metrics.json") |
alexmillane
left a comment
There was a problem hiding this comment.
Thanks for adding this!
I have a few comments about the requirements for some of these things. I think we should be able to clean things up and simplify drastically.
| # SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| """Gym wrapper that records one mp4 per camera in ``obs['camera_obs']``. | ||
| """Gym wrapper that records one mp4 per camera per env in ``obs['camera_obs']``. |
There was a problem hiding this comment.
Out of interest, what was the prior behaviour... Perhaps I'll find out if I read on.
There was a problem hiding this comment.
Previously, only environment 0 was recorded, and frames from other parallel environments were silently ignored. Now, all environments are recorded, producing one MP4 file per (env, camera) pair.
| if self.buffers and all( | ||
| len(env_frames) >= self.video_length | ||
| for env_frame_lists in self.buffers.values() | ||
| for env_frames in env_frame_lists | ||
| ): |
There was a problem hiding this comment.
This compound statement is hard to read—suggestion to add a comment (or split it over multiple lines).
Maybe the comment is something like: "flush if all the videos in the buffer exceed the video length.
| for cam, env_frame_lists in self.buffers.items(): | ||
| for env_idx, frames in enumerate(env_frame_lists): | ||
| if not frames: | ||
| continue | ||
| path = os.path.join( | ||
| self.video_folder, | ||
| f"{self.name_prefix}-env{env_idx}-{_sanitize_cam_key(cam)}-step-{self.recording_start_step}.mp4", | ||
| ) | ||
| clip = ImageSequenceClip(list(frames), fps=self.fps) | ||
| clip.write_videofile(path, logger=None, audio=False) | ||
| del clip |
There was a problem hiding this comment.
Am I correct that this could take some time to write? What do you think about adding some logging here to indicate it's started. Maybe just a print at the start of the function?
| metrics_logger.append_job_metrics(job.name, metrics) | ||
|
|
||
| if args_cli.episode_record_dir: | ||
| rec_path = _write_episode_record( |
There was a problem hiding this comment.
Rollout policies above records for multiple episodes, so this function actually writes per-job right?
There was a problem hiding this comment.
Yes, correct, it writes one json per job, not per episode, the name is misleading, I'll rename it
| "job_name": job.name, | ||
| "task_name": job.task_name, | ||
| "embodiment": job.embodiment, | ||
| "env_params": dict(job.env_params), | ||
| "policy_type": job.policy_type, | ||
| "policy_config": dict(job.policy_config_dict), | ||
| "num_envs": job.num_envs, | ||
| "num_steps": job.num_steps, | ||
| "num_episodes": job.num_episodes, |
There was a problem hiding this comment.
All of these job.* fields are present in the config file that is input to the eval_runner.py via the config file. Here, we re-extract the information and reexport it.
Is there a reason that we can't just use the input file (for example see here), rather than reading and then re-exporting the information?
| args_cli.episode_record_dir, | ||
| job, | ||
| {}, | ||
| "failed", |
There was a problem hiding this comment.
Failed is written to the job status 5 lines above here. Consider using Job.status
| if args_cli.metrics_file is not None: | ||
| metrics_logger.save_metrics_to_file() | ||
| print(f"[INFO] Metrics saved to: {metrics_logger.metrics_file}") |
There was a problem hiding this comment.
Thanks for adding that!
| task_name: str = None, | ||
| embodiment: str = None, |
There was a problem hiding this comment.
These two variables are overdetermined. They're contained in the config .json inside the environment.
I guess what you want here is some serialization (i.e. a string) representing the Arena environment? Is that correct?
I think there is a better way to do this. Let's talk about it.
| language_instruction: str = None, | ||
| task_name: str = None, | ||
| embodiment: str = None, | ||
| env_params: dict = None, |
There was a problem hiding this comment.
These are already stored in the class as arena_env_args
| num_episodes: int | None, | ||
| language_instruction: str | None = None, | ||
| ) -> dict[str, Any]: | ||
| ) -> tuple[dict[str, Any] | None, list[dict]]: |
There was a problem hiding this comment.
See comment above. Suggestion to revert this and change our camera recorder to record per-episode videos.
| if args_cli.video or args_cli.camera_video or args_cli.metrics_file or args_cli.episode_record_dir: | ||
| run_ts = datetime.now().strftime("%Y%m%dT%H%M%S") |
There was a problem hiding this comment.
Let's just unconditionally query the date, and then we get rid of this quadruple if statement and de-indent the following if statements by one level.
Summary
Extend eval runner with per-env camera recording, metrics file output, timestamped run directories, and per-job episode record JSON
Detailed description
CameraObsVideoRecordernow records all parallel envs (one file per env per camera) instead of env 0 only; each run gets a timestamped subdirectory under--video_dirto avoid stale files accumulating across runseval_runnergains--camera_video,--metrics_file, and--episode_record_dirflags;save_metrics_to_file()and camera video recording existed but were never wired up from the eval runner--episode_record_dirwrites one JSON file per job after it completes (or fails), capturing:task_name,embodiment,env_params,policy_type,policy_config,language_instruction,hdf5_path,video_paths,metrics,episode_boundaries,wall_time_secondsrollout_policy()now returns(metrics, episode_boundaries)tracking{env_idx, start_step, end_step}per completed episode; frame index inCameraObsVideoRecorderoutput equals step index so boundaries can be used to slice per-env videos intoindividual episode clips
Jobnow carriestask_name,embodiment, andenv_paramsextracted fromarena_env_argsbefore it is converted to a CLI listEAGAINfile-lock conflicts when a previous run crashed without releasing the lock