Merged
39 commits
3b553e3
Add Zarr reader and writer
guipenedo May 21, 2026
08ca3c0
Move Zarr row splitting into reader
guipenedo May 23, 2026
9e09717
Keep Zarr PR reader-only
guipenedo May 23, 2026
b1d6a2c
Tighten Zarr reader selections
guipenedo May 23, 2026
28bd3ba
Shard Zarr row-end reads
guipenedo May 23, 2026
841aaed
Restore generic episode-end robot splits
guipenedo May 23, 2026
55b81bb
Validate Zarr reader metadata names
guipenedo May 23, 2026
6f58278
Reject duplicate Zarr metadata columns
guipenedo May 23, 2026
528182c
Batch Zarr row-end reads
guipenedo May 23, 2026
09e8636
Share reader path selection types
guipenedo May 23, 2026
eb86905
Keep row splitting in Zarr reader
guipenedo May 23, 2026
1b00385
Harden Zarr row-end reads
guipenedo May 23, 2026
2127e32
Plan Zarr reads from metadata
guipenedo May 23, 2026
5a6a29c
Clean up Zarr reader planning
guipenedo May 23, 2026
f271cb5
Avoid repeated Zarr shard reads
guipenedo May 23, 2026
d888a36
Tighten Zarr row boundary validation
guipenedo May 23, 2026
ae02651
Emit leading-axis Zarr rows per index
guipenedo May 23, 2026
77018f3
Add Zarr leading axis row size
guipenedo May 23, 2026
8953a2c
Move reader path selection helpers into utils
guipenedo May 23, 2026
464fc68
Avoid duplicate Zarr read validation
guipenedo May 23, 2026
935e720
Simplify Zarr shard planning
guipenedo May 23, 2026
772d2de
Inline tiny Zarr helpers
guipenedo May 23, 2026
4ec551e
Support Zarr 3 runtime
guipenedo May 23, 2026
bb7e78d
Improve Zarr robotics sharding
guipenedo May 23, 2026
0416d9d
Preserve Zarr filesystem handles
guipenedo May 23, 2026
81ab4f6
Clarify Zarr split sharding docs
guipenedo May 23, 2026
08928c2
Use fsspec for zipped Zarr stores
guipenedo May 23, 2026
89a865b
Add Zarr row batch sizing
guipenedo May 23, 2026
a86e86f
Document Zarr robotics reference datasets
guipenedo May 23, 2026
703253d
Disable fsspec cache for remote Zarr zips
guipenedo May 23, 2026
efd6efc
Clean up Zarr reader branching
guipenedo May 23, 2026
cba3ff0
Trim reader helper wrappers
guipenedo May 23, 2026
63e5541
Address Zarr reader review comments
guipenedo May 23, 2026
c091a0e
Tighten Zarr zip input handling
guipenedo May 23, 2026
0103d15
Reject row_ends output selection
guipenedo May 23, 2026
37b78fe
Support Zarr 3 reader installs
guipenedo May 23, 2026
535a74f
Drop Zarr 3 compatibility path
guipenedo May 23, 2026
3c6ead2
Simplify Zarr zip docs
guipenedo May 23, 2026
10627ae
Trim Zarr docs and cover DataFolder URL roots
guipenedo May 23, 2026
98 changes: 98 additions & 0 deletions docs/reading-and-writing.md
Original file line number Diff line number Diff line change
@@ -228,6 +228,104 @@ datasets or attributes default to raising an error. Set
If a selected column can be missing from every group in an input file, pass
`dtypes` for that column so the reader can emit a stable Arrow type.

## Zarr

Zarr support lives behind the optional `macrodata-refiner[zarr]` extra.

```bash
uv add "macrodata-refiner[zarr]"
```

`read_zarr(...)` reads one Zarr group, including directory stores and
`.zarr.zip` stores. By default, the group becomes one output row and selected
arrays are loaded as full array values.

```python
import refiner as mdr

pipeline = mdr.read_zarr(
    "replay_buffer.zarr",
    arrays={
        "action": "data/action",
        "state": "data/state",
    },
    attrs={"task": "task"},
)
```

`arrays` and `attrs` accept the same selection forms as HDF5: a mapping from
output column name to Zarr path, one path string, or a sequence of path strings
with unique final components. Use a mapping when derived names would collide.
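
As a rough illustration of the three selection forms, the sketch below normalizes each one into a column-to-path mapping. `normalize_selection` is a hypothetical helper written for this example, not Refiner's implementation, and it assumes that a single path string is named after its final component in the same way as a sequence entry.

```python
from collections.abc import Mapping


def normalize_selection(selection):
    """Return an {output_column: zarr_path} dict for any accepted selection form."""
    if selection is None:
        return {}
    if isinstance(selection, str):
        # One path string is treated as a one-element sequence (assumption).
        selection = [selection]
    if isinstance(selection, Mapping):
        # A mapping already names every output column explicitly.
        return dict(selection)
    columns = {}
    for path in selection:
        # Derive the column name from the final path component.
        name = path.rstrip("/").split("/")[-1]
        if name in columns:
            raise ValueError(
                f"paths {columns[name]!r} and {path!r} both derive column {name!r}; "
                "use a mapping to name them explicitly"
            )
        columns[name] = path
    return columns


print(normalize_selection("data/action"))
print(normalize_selection(["data/action", "data/state"]))
print(normalize_selection({"observation.state": "data/state"}))
```

Passing `["left/rgb", "right/rgb"]` to this helper raises `ValueError`, which is the collision case where a mapping is needed.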

For robotics-style replay buffers, pass `row_ends` to split concatenated arrays
into logical rows, usually episodes:

```python
episodes = mdr.read_zarr(
    "replay_buffer.zarr",
    arrays={
        "action": "data/action",
        "observation.state": "data/state",
        "frames": "data/rgb",
    },
    attrs={"task": "task"},
    row_ends="meta/episode_ends",
    index_column="episode_id",
    file_path_column=None,
)
```

For a store shaped like:

```text
replay_buffer.zarr
├── data
│   ├── action        # shape [total_steps, action_dim]
│   ├── state         # shape [total_steps, state_dim]
│   └── rgb           # shape [total_steps, height, width, channels]
└── meta
    └── episode_ends  # cumulative end offsets, for example [152, 319, 477]
```

this emits one row per `[start:end]` slice, where each `start` is the previous
row end (0 for the first row). The example offsets above yield the slices
`[0:152]`, `[152:319]`, and `[319:477]`. The selected arrays are sliced along
their leading dimension, while selected attrs are repeated on each row.
`index_column` receives the row/episode index when `row_ends` is set; set it to
`None` to omit that column. The final row end must match the leading dimension
of every selected array.
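
The slicing rule can be sketched in plain Python. `row_slices` is a hypothetical helper for illustration only; in particular, allowing empty rows (non-decreasing rather than strictly increasing offsets) is an assumption, and the real reader may validate boundaries differently.

```python
def row_slices(row_ends, leading_dim):
    """Turn cumulative end offsets into (start, end) pairs, one per logical row."""
    slices = []
    start = 0
    for end in row_ends:
        if end < start:
            raise ValueError("row ends must be non-decreasing")
        slices.append((start, end))
        start = end
    # The last offset has to cover the arrays exactly.
    if start != leading_dim:
        raise ValueError(
            f"final row end {start} does not match leading dimension {leading_dim}"
        )
    return slices


# Offsets from the example store layout, with 477 total steps.
for episode_id, (start, end) in enumerate(row_slices([152, 319, 477], 477)):
    print(episode_id, start, end, end - start)
```

Each `(start, end)` pair would then be applied to every selected array, for example `action[start:end]`.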

If a Zarr store has aligned arrays but no episode boundaries, use
`split_leading_axis=True` to emit fixed-size rows along the leading axis:

```python
rows = mdr.read_zarr(
    "replay_buffer.zarr",
    arrays={
        "action": "data/action",
        "frames": "data/rgb",
    },
    split_leading_axis=True,
    leading_axis_row_size=1,
    target_shard_bytes=128 * 1024**2,
)
```

Shard planning in this mode uses chunk metadata from the selected array with the
largest per-row byte size (`dtype.itemsize * product(shape[1:])`). This keeps
large image/video arrays in control of shard boundaries, so tiny action/state
arrays stored as one huge chunk do not force Refiner to load a much larger image
block than necessary.
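
To make the per-row byte size concrete, here is a minimal sketch of picking the dominant array. The array names, item sizes, and shapes are made-up metadata for illustration; only the formula `dtype.itemsize * product(shape[1:])` comes from the text above.

```python
from math import prod


def per_row_bytes(itemsize, shape):
    """Bytes needed for one index along the leading axis."""
    return itemsize * prod(shape[1:])


# Hypothetical metadata: name -> (dtype itemsize in bytes, array shape).
arrays = {
    "action": (4, (477, 7)),
    "state": (4, (477, 14)),
    "frames": (1, (477, 96, 96, 3)),
}

# The array with the largest per-row byte size drives shard planning.
dominant = max(arrays, key=lambda name: per_row_bytes(*arrays[name]))
print(dominant, per_row_bytes(*arrays[dominant]))
```

Here the image array wins by a wide margin, so its chunk layout, not that of the small action or state arrays, would decide where shards are cut.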

This mode requires all selected arrays to share the same leading dimension, and
that dimension must be divisible by `leading_axis_row_size`. Each output row
contains `leading_axis_row_size` contiguous items from every selected array.
Shard boundaries are planned from array metadata and, where possible, aligned
with the leading-axis chunks of the dominant array (the one with the largest
per-row byte size). Use `num_shards` when you need a target shard count instead
of byte-sized packing.
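
The sketch below shows one way the divisibility check and chunk-aligned packing could work. Both functions are hypothetical and simplified: they assume the chunk length is a multiple of the row size, ignore `num_shards`, and are not Refiner's actual planner.

```python
def count_rows(leading_dim, leading_axis_row_size):
    """Number of fixed-size rows, or ValueError if the axis does not divide evenly."""
    if leading_dim % leading_axis_row_size != 0:
        raise ValueError(
            f"leading dimension {leading_dim} is not divisible by "
            f"leading_axis_row_size={leading_axis_row_size}"
        )
    return leading_dim // leading_axis_row_size


def plan_shards(leading_dim, chunk_len, row_bytes, target_shard_bytes):
    """Pack whole leading-axis chunks into shards of roughly target_shard_bytes."""
    chunk_bytes = chunk_len * row_bytes
    # Take at least one chunk per shard, even when a single chunk exceeds the target.
    chunks_per_shard = max(1, target_shard_bytes // chunk_bytes)
    step = chunks_per_shard * chunk_len
    return [
        (start, min(start + step, leading_dim))
        for start in range(0, leading_dim, step)
    ]


# Hypothetical dominant array: 1000 frames of 96x96x3 uint8, chunked 64 frames at a time.
print(count_rows(1000, 1))
print(plan_shards(1000, chunk_len=64, row_bytes=96 * 96 * 3, target_shard_bytes=8 * 1024**2))
```

With these numbers each shard holds four whole chunks (256 frames), so no chunk of the image array is read by more than one shard.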

By default, split readers load one shard block at a time and slice logical rows
from that block. Set `row_batch_size` to cap how many logical rows are loaded per
block when a shard would otherwise materialize too much data.
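
As a sketch of what capping the block size means, the hypothetical generator below splits one shard of fixed-size rows into load blocks. It only models the `split_leading_axis` case; the name `row_batches` and its signature are assumptions made for this example.

```python
def row_batches(shard_start, shard_end, row_size=1, row_batch_size=None):
    """Yield (start, end) leading-axis blocks that together cover one shard."""
    if row_batch_size is not None and row_batch_size < 1:
        raise ValueError("row_batch_size must be a positive integer")
    rows_in_shard = (shard_end - shard_start) // row_size
    # Without a cap, the whole shard is loaded as a single block.
    batch_rows = row_batch_size or max(rows_in_shard, 1)
    for first_row in range(0, rows_in_shard, batch_rows):
        last_row = min(first_row + batch_rows, rows_in_shard)
        yield shard_start + first_row * row_size, shard_start + last_row * row_size


print(list(row_batches(0, 256)))
print(list(row_batches(0, 256, row_batch_size=100)))
```

The first call loads the shard in one block; the second never holds more than 100 logical rows in memory at once.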

## Common Crawl text readers

[Common Crawl](https://commoncrawl.org/) publishes large public web crawls.
40 changes: 40 additions & 0 deletions docs/robotics_conversion.md
@@ -77,3 +77,43 @@ pipeline = (
    )
)
```

## Zarr Replay Buffers

Some robotics replay buffers are stored as unzipped Zarr directory stores with
frame-aligned arrays under `data/` and cumulative episode boundaries under
`meta/episode_ends`.

Reference datasets:

- RoboCasa MT4 N216:
  `hf://datasets/ahad-j/robocasa_mt4_N216_zarr/mt4_N216.zarr`
  (`https://huggingface.co/datasets/ahad-j/robocasa_mt4_N216_zarr`)
- MetaWorld MT4 N200:
  `hf://datasets/runningkiwi/metaworld_mt4_n200_zarr`
  (`https://huggingface.co/datasets/runningkiwi/metaworld_mt4_n200_zarr`)

```python
import refiner as mdr

pipeline = (
    mdr.read_zarr(
        "hf://datasets/ahad-j/robocasa_mt4_N216_zarr/mt4_N216.zarr",
        arrays={
            "action": "data/action",
            "eef_pos": "data/robot0_eef_pos",
            "joint_pos": "data/robot0_joint_pos",
            "gripper_qpos": "data/robot0_gripper_qpos",
            "wrist": "data/robot0_eye_in_hand_rgb",
        },
        row_ends="meta/episode_ends",
        index_column="episode_id",
    )
    .to_robot_rows(
        episode_id_key="episode_id",
        action_key="action",
        state_key=("eef_pos", "joint_pos", "gripper_qpos"),
        video_keys=("wrist",),
    )
)
```
5 changes: 5 additions & 0 deletions pyproject.toml
@@ -44,13 +44,18 @@ text = [
hdf5 = [
    "h5py",
]
zarr = [
    "zarr>=2.18,<3",
    "numcodecs<0.16",
]
s3 = [
    "s3fs",
]
testing = [
    "macrodata-refiner[huggingface]",
    "macrodata-refiner[hdf5]",
    "macrodata-refiner[robotics]",
    "macrodata-refiner[zarr]",
    "macrodata-refiner[text]",
    "macrodata-refiner[s3]",
    "pytest>=8.0.0",
2 changes: 2 additions & 0 deletions src/refiner/__init__.py
@@ -20,6 +20,7 @@
    read_lerobot,
    read_parquet,
    read_videos,
    read_zarr,
    SUPPORTED_CUDA_VERSIONS,
    SUPPORTED_GPU_TYPES,
    task,
@@ -54,6 +55,7 @@
    "read_lerobot",
    "read_parquet",
    "read_videos",
    "read_zarr",
    "from_items",
    "from_source",
    "task",
12 changes: 8 additions & 4 deletions src/refiner/io/datafolder.py
@@ -40,10 +40,14 @@ def __init__(
             auto_mkdir: if True, when opening a file in write mode its parent directories will be automatically created
             **storage_options: will be passed to a new fsspec filesystem object, when it is created. Ignored if fs is given
         """
-        super().__init__(
-            path=path,
-            fs=fs if fs is not None else url_to_fs(path, **storage_options)[0],
-        )
+        if fs is None:
+            fs, path = url_to_fs(path, **storage_options)
+            path = "/" if path is None else path
+        else:
+            path = fs._strip_protocol(path)
+        if path == "":
+            path = "/"
+        super().__init__(path=path, fs=fs)
         self.auto_mkdir = auto_mkdir
 
     @classmethod
2 changes: 2 additions & 0 deletions src/refiner/pipeline/__init__.py
@@ -13,6 +13,7 @@
    read_lerobot,
    read_parquet,
    read_videos,
    read_zarr,
    task,
)
from refiner.pipeline.resources import (
@@ -41,6 +42,7 @@
    "read_lerobot",
    "read_parquet",
    "read_videos",
    "read_zarr",
    "from_items",
    "from_source",
    "task",
59 changes: 52 additions & 7 deletions src/refiner/pipeline/pipeline.py
@@ -40,9 +40,10 @@
     Hdf5Reader,
     JsonReader,
     ParquetReader,
+    ZarrReader,
 )
+from refiner.pipeline.sources.readers.hdf5 import MissingPolicy
 from refiner.pipeline.sources.readers.lerobot import LeRobotEpisodeReader
-from refiner.pipeline.sources.readers.hdf5 import MissingPolicy, PathSelection
 from refiner.pipeline.sources.items import ItemsSource
 from refiner.pipeline.sources.task import TaskSource
 from refiner.pipeline.data import datatype
@@ -59,7 +60,10 @@
 )
 from refiner.execution.operators.row import ShardDeltaFn
 from refiner.pipeline.sources.base import SourceUnit
-from refiner.pipeline.sources.readers.utils import DEFAULT_TARGET_SHARD_BYTES
+from refiner.pipeline.sources.readers.utils import (
+    DEFAULT_TARGET_SHARD_BYTES,
+    PathSelection,
+)
 import pyarrow as pa
 
 if TYPE_CHECKING:
Expand Down Expand Up @@ -176,7 +180,6 @@ def to_robot_rows(
video_keys: Mapping[str, str] | Iterable[str] | None = None,
stats_key: str | None = "stats",
stats_prefix: str = "stats/",
episode_ends_key: str | None = None,
) -> "RefinerPipeline":
"""Expose rows through the RoboticsRow semantic view.

Expand Down Expand Up @@ -207,11 +210,8 @@ def to_robot_rows(
schema=self.output_schema(),
stats_key=stats_key,
stats_prefix=stats_prefix,
episode_ends_key=episode_ends_key,
)
if episode_ends_key is None:
return self.map(cast(MapFn, converter))
return self.flat_map(cast(FlatMapFn, converter))
return self.map(cast(MapFn, converter))
Comment thread
guipenedo marked this conversation as resolved.

def map_async(
self,
@@ -809,6 +809,51 @@ def read_hdf5(
    )


def read_zarr(
    input: DataFolderLike,
    *,
    arrays: PathSelection | None = None,
    attrs: PathSelection | None = None,
    row_ends: str | None = None,
    split_leading_axis: bool = False,
    leading_axis_row_size: int = 1,
    target_shard_bytes: int = DEFAULT_TARGET_SHARD_BYTES,
    num_shards: int | None = None,
    row_batch_size: int | None = None,
    index_column: str | None = "index",
    file_path_column: str | None = "file_path",
    dtypes: DTypeMapping | None = None,
) -> RefinerPipeline:
    """Create a pipeline with a Zarr reader source.

    The reader has three modes:
    - group mode: one Zarr group becomes one row
    - row_ends mode: cumulative offsets define whole-row source slices
    - split_leading_axis mode: fixed-size leading-axis slices define output rows

    Missing selected arrays or attributes raise immediately. `row_ends` and
    `split_leading_axis` are mutually exclusive. `target_shard_bytes` and
    `num_shards` affect shard planning, not logical row size. `row_batch_size`
    bounds how many logical rows are loaded per array block within each shard.
    """
    return RefinerPipeline(
        source=ZarrReader(
            input,
            arrays=arrays,
            attrs=attrs,
            row_ends=row_ends,
            split_leading_axis=split_leading_axis,
            leading_axis_row_size=leading_axis_row_size,
            target_shard_bytes=target_shard_bytes,
            num_shards=num_shards,
            row_batch_size=row_batch_size,
            index_column=index_column,
            file_path_column=file_path_column,
            dtypes=dtypes,
        )
    )
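
The three modes described in the `read_zarr` docstring can be summarized with a small sketch. `select_mode` is a hypothetical function written for this note; the diff shown here does not include `ZarrReader` itself, so how it actually validates these arguments is not visible.

```python
def select_mode(row_ends=None, split_leading_axis=False):
    """Return which of the three documented reader modes the arguments select."""
    if row_ends is not None and split_leading_axis:
        raise ValueError("row_ends and split_leading_axis are mutually exclusive")
    if row_ends is not None:
        return "row_ends"
    if split_leading_axis:
        return "split_leading_axis"
    # Neither option set: the whole group becomes one row.
    return "group"


print(select_mode())
print(select_mode(row_ends="meta/episode_ends"))
print(select_mode(split_leading_axis=True))
```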


def read_parquet(
    inputs: DataFileSetLike,
    *,
2 changes: 2 additions & 0 deletions src/refiner/pipeline/sources/__init__.py
@@ -8,6 +8,7 @@
    JsonReader,
    LeRobotEpisodeReader,
    ParquetReader,
    ZarrReader,
)

__all__ = [
@@ -20,4 +21,5 @@
    "JsonReader",
    "LeRobotEpisodeReader",
    "ParquetReader",
    "ZarrReader",
]
2 changes: 2 additions & 0 deletions src/refiner/pipeline/sources/readers/__init__.py
@@ -6,6 +6,7 @@
from refiner.pipeline.sources.readers.json import JsonReader
from refiner.pipeline.sources.readers.lerobot import LeRobotEpisodeReader
from refiner.pipeline.sources.readers.parquet import ParquetReader
from refiner.pipeline.sources.readers.zarr import ZarrReader
from refiner.robotics.lerobot_format import LeRobotRow

__all__ = [
@@ -18,4 +19,5 @@
    "LeRobotEpisodeReader",
    "LeRobotRow",
    "ParquetReader",
    "ZarrReader",
]