Skip to content

Add RLDS writer#149

Open
Guilherme Penedo (guipenedo) wants to merge 1 commit into
mainfrom
codex/rlds-writer-clean
Open

Add RLDS writer#149
Guilherme Penedo (guipenedo) wants to merge 1 commit into
mainfrom
codex/rlds-writer-clean

Conversation

@guipenedo
Copy link
Copy Markdown
Collaborator

@guipenedo Guilherme Penedo (guipenedo) commented May 21, 2026

Adds a standalone RLDS-style TFRecord writer for robotics episode rows.\n\nWhat changed:\n- add RefinerPipeline.write_rlds(...)\n- add RldsSink that writes one TFRecord Example per RoboticsRow episode\n- add optional tensorflow extra\n- add dependency-free feature mapping test plus TensorFlow integration test when TensorFlow is installed\n\nLocal verification:\n- uv run ruff format --check src/refiner/pipeline/sinks/rlds.py src/refiner/pipeline/sinks/init.py src/refiner/pipeline/pipeline.py tests/pipeline/test_rlds_sink.py\n- uv run ruff check src/refiner/pipeline/sinks/rlds.py src/refiner/pipeline/sinks/init.py src/refiner/pipeline/pipeline.py tests/pipeline/test_rlds_sink.py\n- uv run ty check src/refiner/pipeline/sinks/rlds.py src/refiner/pipeline/sinks/init.py src/refiner/pipeline/pipeline.py tests/pipeline/test_rlds_sink.py\n- uv run pytest tests/pipeline/test_rlds_sink.py tests/robotics/test_robotics_row.py -q -> 24 passed, 1 skipped; skipped test requires TensorFlow

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: efc08fad18

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "Codex (@codex) review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "Codex (@codex) address that feedback".

Comment on lines +60 to +62
writer = tf.io.TFRecordWriter(
self.output.abs_path(self._relpath(shard_id)),
options=options,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Use DataFolder I/O path for TFRecord writes

RldsSink opens TFRecord files with tf.io.TFRecordWriter(self.output.abs_path(...)) instead of going through DataFolder.open(...). This bypasses the filesystem abstraction used by other sinks (e.g., JSONL/Parquet), so write_rlds will fail or ignore configured filesystem backends/credentials for non-local outputs (such as s3://... or custom fs), and it also skips DataFolder's auto-mkdir behavior for missing parent directories. In practice, pipelines that successfully write other formats to remote folders can break when switched to write_rlds.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces the RldsSink and a corresponding write_rlds pipeline method to enable exporting robotics data into RLDS-style TFRecord formats. The implementation includes logic for mapping RoboticsRow attributes to TensorFlow features and adds tensorflow as an optional dependency. Review feedback highlights several areas for improvement: expanding the feature set to include all observations rather than a hardcoded subset, optimizing performance by avoiding expensive tolist() calls on large arrays, and improving the efficiency of PyArrow to NumPy conversions.

Comment on lines +101 to +103
_add_array_features(tf, features, "steps/action", row.actions)
_add_array_features(tf, features, "steps/observation/state", row.states)
_add_array_features(tf, features, "steps/timestamp", row.timestamps)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The current implementation only writes a hardcoded subset of robotics features (action, state, timestamp). It misses all other observations and videos that might be present in the RoboticsRow. To support full RLDS datasets, you should iterate over row.observations() and include all available features in the steps/observation/ namespace.

if values is None:
return
array = _array(values)
features[name] = _float_feature(tf, array.reshape(-1).astype(float).tolist())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This line has several performance and flexibility issues:

  1. tolist() on a large numpy array is slow and memory-intensive. tf.train.FloatList accepts any iterable, so you can pass the numpy array directly.
  2. Forcing all array features to float prevents the use of integer-based observations or actions (e.g., discrete actions or pixel-based states).
  3. astype(float) defaults to float64, while FloatList stores float32. It's better to cast to np.float32 explicitly if floating point is intended.
Suggested change
features[name] = _float_feature(tf, array.reshape(-1).astype(float).tolist())
features[name] = _float_feature(tf, array.reshape(-1).astype(np.float32)) if array.dtype.kind == 'f' else _int_feature(tf, array.reshape(-1).astype(np.int64).tolist())

Comment on lines +158 to +161
def _array(values: Any) -> np.ndarray:
if isinstance(values, pa.ChunkedArray | pa.Array):
return np.asarray(values.to_pylist())
return np.asarray(values)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Converting PyArrow arrays to numpy via to_pylist() is inefficient. You should use to_numpy() where possible for better performance.

Suggested change
def _array(values: Any) -> np.ndarray:
if isinstance(values, pa.ChunkedArray | pa.Array):
return np.asarray(values.to_pylist())
return np.asarray(values)
def _array(values: Any) -> np.ndarray:
if isinstance(values, pa.ChunkedArray | pa.Array):
try:
return values.to_numpy()
except (pa.ArrowInvalid, pa.ArrowNotImplementedError):
return np.asarray(values.to_pylist())
return np.asarray(values)

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 068cdfaba7

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "Codex (@codex) review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "Codex (@codex) address that feedback".

return self._tf

def write_shard_block(self, shard_id: str, block: Block) -> int:
rows = block if not isinstance(block, Tabular) else list(block)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Stream tabular rows when writing TFRecord shards

The new write_shard_block path eagerly converts Tabular blocks via list(block) before writing, which materializes every row view in memory at once. On large shards this can significantly increase peak memory and cause OOMs or slowdowns even though TFRecord emission is sequential. Iterating the Tabular directly (or in batches) avoids the extra allocation and keeps sink memory bounded.

Useful? React with 👍 / 👎.

)
)

def write_rlds(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Update docs for new write_rlds pipeline API

This commit introduces a new user-facing feature (RefinerPipeline.write_rlds) but does not update docs/, which violates the repository rule in AGENTS.md (“Any new feature, execution block, architectural change, or user-visible behavior change must include corresponding doc updates in the same change set”). Without docs, users miss required usage/dependency details for the new sink and the published API surface becomes inconsistent with project documentation.

Useful? React with 👍 / 👎.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant