Add RLDS writer #149
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: efc08fad18
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "Codex (@codex) review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "Codex (@codex) address that feedback".
| writer = tf.io.TFRecordWriter( | ||
| self.output.abs_path(self._relpath(shard_id)), | ||
| options=options, |
Use DataFolder I/O path for TFRecord writes
RldsSink opens TFRecord files with tf.io.TFRecordWriter(self.output.abs_path(...)) instead of going through DataFolder.open(...). This bypasses the filesystem abstraction used by other sinks (e.g., JSONL/Parquet), so write_rlds will fail or ignore configured filesystem backends/credentials for non-local outputs (such as s3://... or custom fs), and it also skips DataFolder's auto-mkdir behavior for missing parent directories. In practice, pipelines that successfully write other formats to remote folders can break when switched to write_rlds.
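One possible way to keep a path-only writer while still routing the bytes through the folder abstraction is to stage the shard in a local temporary file and then copy it through a file-like handle. The sketch below is a minimal illustration under assumptions: `write_local` and `open_output` are hypothetical callbacks (the latter standing in for something like a folder-level open call), not the project's actual API.

```python
import os
import shutil
import tempfile
from typing import BinaryIO, Callable


def write_via_staging(
    write_local: Callable[[str], None],
    open_output: Callable[[str], BinaryIO],
    relpath: str,
) -> int:
    """Write a shard to a local temp file, then copy it through open_output.

    write_local: hypothetical callback that writes the shard to a local path
        (for example, by constructing a path-based writer on that path).
    open_output: hypothetical stand-in for a folder-level open(relpath) that
        returns a writable binary file object.
    Returns the number of bytes copied.
    """
    fd, tmp_path = tempfile.mkstemp(suffix=".tfrecord")
    os.close(fd)  # the writer reopens the path itself
    try:
        write_local(tmp_path)
        with open(tmp_path, "rb") as src, open_output(relpath) as dst:
            shutil.copyfileobj(src, dst)  # chunked copy, bounded memory
        return os.path.getsize(tmp_path)
    finally:
        os.remove(tmp_path)
```

The trade-off is extra local disk usage and one additional copy per shard, in exchange for reusing whatever backends, credentials, and directory creation the output handle already provides.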
Code Review
This pull request introduces the RldsSink and a corresponding write_rlds pipeline method to enable exporting robotics data into RLDS-style TFRecord formats. The implementation includes logic for mapping RoboticsRow attributes to TensorFlow features and adds tensorflow as an optional dependency. Review feedback highlights several areas for improvement: expanding the feature set to include all observations rather than a hardcoded subset, optimizing performance by avoiding expensive tolist() calls on large arrays, and improving the efficiency of PyArrow to NumPy conversions.
| _add_array_features(tf, features, "steps/action", row.actions) | ||
| _add_array_features(tf, features, "steps/observation/state", row.states) | ||
| _add_array_features(tf, features, "steps/timestamp", row.timestamps) |
The current implementation only writes a hardcoded subset of robotics features (action, state, timestamp). It misses all other observations and videos that might be present in the RoboticsRow. To support full RLDS datasets, you should iterate over row.observations() and include all available features in the steps/observation/ namespace.
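As a rough sketch of that suggestion, the feature keys could be derived from whatever observations a row exposes rather than being listed by hand. This assumes the observations arrive as a name-to-values mapping; that shape, the function name, and the example names are assumptions for illustration, since the return type of row.observations() is not shown in this thread.

```python
from typing import Any, Dict, Mapping


def observation_features(observations: Mapping[str, Any]) -> Dict[str, Any]:
    """Map each observation name to a key under steps/observation/.

    Entries whose value is None are skipped, mirroring the existing
    `if values is None: return` guard in the sink.
    """
    return {
        f"steps/observation/{name}": values
        for name, values in observations.items()
        if values is not None
    }


# Hypothetical observation names, for illustration only.
keys = observation_features({"state": [0.1, 0.2], "wrist_cam": None, "gripper": [1]})
print(sorted(keys))
```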
| if values is None: | ||
| return | ||
| array = _array(values) | ||
| features[name] = _float_feature(tf, array.reshape(-1).astype(float).tolist()) |
This line has several performance and flexibility issues:
- tolist() on a large numpy array is slow and memory-intensive. tf.train.FloatList accepts any iterable, so you can pass the numpy array directly.
- Forcing all array features to float prevents the use of integer-based observations or actions (e.g., discrete actions or pixel-based states).
- astype(float) defaults to float64, while FloatList stores float32. It's better to cast to np.float32 explicitly if floating point is intended.
| features[name] = _float_feature(tf, array.reshape(-1).astype(float).tolist()) | |
| features[name] = _float_feature(tf, array.reshape(-1).astype(np.float32)) if array.dtype.kind == 'f' else _int_feature(tf, array.reshape(-1).astype(np.int64).tolist()) |
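To illustrate why forcing integers through a float32-backed list can matter, the standard-library sketch below round-trips values through IEEE 754 single precision. It is an illustration of the precision argument only, not part of the sink; `as_float32` is a hypothetical helper name.

```python
import struct


def as_float32(value: float) -> float:
    """Round-trip a Python number through IEEE 754 single precision."""
    return struct.unpack("<f", struct.pack("<f", value))[0]


# Integers above 2**24 are no longer exactly representable in float32,
# so a large integer ID or counter stored as a float silently changes value.
print(as_float32(16777217))    # 16777216.0
print(as_float32(0.1) == 0.1)  # False: float64 0.1 differs from its float32 rounding
```

Keeping integer arrays in an int64-backed feature avoids this loss for discrete actions or index-like values.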
| def _array(values: Any) -> np.ndarray: | ||
| if isinstance(values, pa.ChunkedArray | pa.Array): | ||
| return np.asarray(values.to_pylist()) | ||
| return np.asarray(values) |
Converting PyArrow arrays to numpy via to_pylist() is inefficient. You should use to_numpy() where possible for better performance.
| def _array(values: Any) -> np.ndarray: | |
| if isinstance(values, pa.ChunkedArray | pa.Array): | |
| return np.asarray(values.to_pylist()) | |
| return np.asarray(values) | |
| def _array(values: Any) -> np.ndarray: | |
| if isinstance(values, pa.ChunkedArray | pa.Array): | |
| try: | |
| return values.to_numpy() | |
| except (pa.ArrowInvalid, pa.ArrowNotImplementedError): | |
| return np.asarray(values.to_pylist()) | |
| return np.asarray(values) |
efc08fa to 068cdfa
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 068cdfaba7
| return self._tf | ||
|
|
||
| def write_shard_block(self, shard_id: str, block: Block) -> int: | ||
| rows = block if not isinstance(block, Tabular) else list(block) |
Stream tabular rows when writing TFRecord shards
The new write_shard_block path eagerly converts Tabular blocks via list(block) before writing, which materializes every row view in memory at once. On large shards this can significantly increase peak memory and cause OOMs or slowdowns even though TFRecord emission is sequential. Iterating the Tabular directly (or in batches) avoids the extra allocation and keeps sink memory bounded.
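A minimal sketch of the streaming alternative: consume the block as an iterable and write each row as it is produced, so only one row needs to be held at a time. `fake_block` is a hypothetical stand-in for a Tabular block, and `write` stands in for a record writer's write method; neither is the project's actual API.

```python
from typing import Callable, Iterable, Iterator


def write_rows(rows: Iterable[bytes], write: Callable[[bytes], None]) -> int:
    """Write each serialized row as it is produced and return the count."""
    count = 0
    for row in rows:  # no list(rows): rows are never materialized together
        write(row)
        count += 1
    return count


def fake_block(n: int) -> Iterator[bytes]:
    """Hypothetical stand-in for a Tabular block that yields rows lazily."""
    for i in range(n):
        yield f"episode-{i}".encode()


written = []
print(write_rows(fake_block(3), written.append))  # 3
```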
| ) | ||
| ) | ||
|
|
||
| def write_rlds( |
Update docs for new write_rlds pipeline API
This commit introduces a new user-facing feature (RefinerPipeline.write_rlds) but does not update docs/, which violates the repository rule in AGENTS.md (“Any new feature, execution block, architectural change, or user-visible behavior change must include corresponding doc updates in the same change set”). Without docs, users miss required usage/dependency details for the new sink and the published API surface becomes inconsistent with project documentation.
Adds a standalone RLDS-style TFRecord writer for robotics episode rows.

What changed:
- add RefinerPipeline.write_rlds(...)
- add RldsSink that writes one TFRecord Example per RoboticsRow episode
- add optional tensorflow extra
- add dependency-free feature mapping test plus TensorFlow integration test when TensorFlow is installed

Local verification:
- uv run ruff format --check src/refiner/pipeline/sinks/rlds.py src/refiner/pipeline/sinks/__init__.py src/refiner/pipeline/pipeline.py tests/pipeline/test_rlds_sink.py
- uv run ruff check src/refiner/pipeline/sinks/rlds.py src/refiner/pipeline/sinks/__init__.py src/refiner/pipeline/pipeline.py tests/pipeline/test_rlds_sink.py
- uv run ty check src/refiner/pipeline/sinks/rlds.py src/refiner/pipeline/sinks/__init__.py src/refiner/pipeline/pipeline.py tests/pipeline/test_rlds_sink.py
- uv run pytest tests/pipeline/test_rlds_sink.py tests/robotics/test_robotics_row.py -q -> 24 passed, 1 skipped; skipped test requires TensorFlow