Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
75f66cf
Add Zarr reader and writer
guipenedo May 21, 2026
a6d303d
Keep Zarr PR reader-only
guipenedo May 23, 2026
fb1161a
Add Zarr writer
guipenedo May 23, 2026
4c12e60
Stream Zarr writer outputs
guipenedo May 23, 2026
cf2e0cc
Harden Zarr writer outputs
guipenedo May 23, 2026
e25d100
Batch and clean Zarr writer stores
guipenedo May 23, 2026
917b496
Stabilize default Zarr writer arrays
guipenedo May 23, 2026
0457fc1
Validate Zarr writer schemas
guipenedo May 23, 2026
7836a3f
Stream Zarr writer appends
guipenedo May 23, 2026
c03209c
Document Zarr writer
guipenedo May 23, 2026
e5226fc
Stream video arrays in Zarr writer
guipenedo May 23, 2026
75d8a52
Harden Zarr writer streaming
guipenedo May 23, 2026
db2c453
Optimize Zarr writer merge IO
guipenedo May 24, 2026
d4ade58
Expose Zarr reducer batch sizing
guipenedo May 24, 2026
6f17c04
Harden Zarr writer reduction
guipenedo May 24, 2026
7d5bb18
Harden Zarr writer retry semantics
guipenedo May 24, 2026
71431cd
Tighten Zarr no-overwrite checks
guipenedo May 24, 2026
a3fa167
Fix Zarr reducer edge cases
guipenedo May 24, 2026
942392b
Guard Zarr reducer reserved paths
guipenedo May 24, 2026
8a98fb4
Harden Zarr writer retries
guipenedo May 24, 2026
1124099
Close Zarr writer retry gaps
guipenedo May 24, 2026
45cf8e9
Harden Zarr overwrite cleanup
guipenedo May 24, 2026
d58b5a5
Default Zarr writes to single store
guipenedo May 24, 2026
a745da4
Inline Zarr IO helpers
guipenedo May 24, 2026
af4fcfe
Remove Zarr overwrite mode
guipenedo May 24, 2026
f36cc86
Simplify Zarr write cleanup
guipenedo May 24, 2026
eba19e0
Remove recursive cleanup mode
guipenedo May 24, 2026
0114d8f
Simplify file cleanup reducer
guipenedo May 24, 2026
f28e3f7
Prune file cleanup listing
guipenedo May 24, 2026
8476546
Split Zarr reducer sinks
guipenedo May 24, 2026
d80c8bf
Unify Zarr reducer sink
guipenedo May 24, 2026
a84e93b
Simplify reducer cleanup code
guipenedo May 24, 2026
47f121e
Tighten Zarr reducer implementation
guipenedo May 24, 2026
538800d
Simplify Zarr writer batching
guipenedo May 24, 2026
2e79710
Consolidate Zarr writer validation
guipenedo May 24, 2026
e15c7d3
Align Zarr reducer default
guipenedo May 24, 2026
5c06898
Preserve Zarr writer attrs
guipenedo May 25, 2026
f7981f2
Simplify Zarr video writes
guipenedo May 25, 2026
11f9851
Address Zarr writer review comments
guipenedo May 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions docs/reading-and-writing.md
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ Built-in sinks:
| `.write_jsonl(output, ...)` | JSON Lines files | one output file per worker/shard according to the filename template |
| `.write_parquet(output, ...)` | Parquet files | columnar output with optional compression |
| `.write_lerobot(output, ...)` | LeRobot-compatible robotics datasets | materializes frame/video assets and dataset metadata |
| `.write_zarr(output, ...)` | Zarr stores | one store per shard/worker according to the store template, merged into a single store at `output` by default |

Example:

Expand Down Expand Up @@ -488,6 +489,43 @@ pipeline = pipeline.map(
pipeline = pipeline.cast(video=mdr.datatype.video_path())
```

Use `write_zarr(...)` when you want chunked array output, usually for robotics
episode rows or replay-buffer style data:

```python
import refiner as mdr

(
mdr.read_lerobot("hf://datasets/user/robot-data")
.write_zarr(
"s3://my-bucket/robot-data-zarr/",
arrays={
"data/action": "action",
"data/state": "observation.state",
},
)
)
```

The `arrays` mapping maps each output Zarr array path to the source row key it
is read from. For `RoboticsRow` inputs, omitting `arrays` writes the available
default robotics arrays: actions, states, and timestamps. The default schema is
inferred once, and later rows must expose the same fields. Video sources
selected through `arrays` are decoded as RGB frame arrays and appended in
bounded batches controlled by `video_frame_batch_size`. `array_chunk_bytes`
controls both the target chunk size for new arrays and the reducer read/write
batch size used when shard-local stores are merged into a final store.

By default, `write_zarr(...)` also writes cumulative episode boundaries to
`meta/episode_ends`. Set `episode_ends_path=None` to omit them.

Launched runs write isolated stores per shard/worker using
`store_template="{shard_id}__w{worker_id}.zarr"`. This avoids concurrent workers
mutating the same Zarr group. By default, a reducer stage streams those
shard-local stores into one final Zarr group at the requested output path. Set
`reduce_to_single_store=False` to keep the isolated stores and read them
individually.

When you run a writer through `launch_local(...)` or `launch_cloud(...)`, some
sinks add a reducer stage after the main writer stage. For `write_jsonl(...)`
and `write_parquet(...)`, that reducer removes stale shard/worker files and
Expand Down
49 changes: 48 additions & 1 deletion src/refiner/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
VectorizedSegmentStep,
WithColumnsStep,
)
from refiner.pipeline.sinks import BaseSink, JsonlSink, ParquetSink
from refiner.pipeline.sinks import BaseSink, JsonlSink, ParquetSink, ZarrSink
from refiner.pipeline.sinks.assets import MissingAssetPolicy
from refiner.pipeline.sources import (
BaseSource,
Expand Down Expand Up @@ -431,6 +431,53 @@ def write_parquet(
)
)

def write_zarr(
    self,
    output: DataFolderLike,
    *,
    arrays: Mapping[str, str] | None = None,
    attrs: Mapping[str, str] | None = None,
    episode_ends_path: str | None = "meta/episode_ends",
    store_template: str = "{shard_id}__w{worker_id}.zarr",
    video_frame_batch_size: int = 8,
    array_chunk_bytes: int = 8 * 1024 * 1024,
    reduce_to_single_store: bool = True,
) -> "RefinerPipeline":
    """Attach a Zarr sink that writes pipeline rows to Zarr array stores.

    Args:
        output: Output folder or URL prefix under which the Zarr store(s)
            are written.
        arrays: Maps each output Zarr array path to the source row key it
            is filled from. When omitted for ``RoboticsRow`` inputs, the
            available default robotics arrays (actions, states, and
            timestamps) are written.
        attrs: Maps each output Zarr root attribute name to a source row
            key. Attribute values must be stable across rows within each
            output store.
        episode_ends_path: Output Zarr path for the cumulative row/episode
            end offsets. Pass None to omit episode boundaries.
        store_template: Store path template for the shard-local stores.
            Must include ``{shard_id}`` and ``{worker_id}``.
        video_frame_batch_size: Maximum number of decoded video frames
            appended per video write batch.
        array_chunk_bytes: Target byte size of chunks created for newly
            written arrays, and of read/write batches when shard stores
            are reduced into a single store.
        reduce_to_single_store: If True (the default), add a reducer stage
            that merges the shard-local stores into one Zarr group at
            ``output``.

    Returns:
        The result of ``self.with_sink(...)`` called with the configured
        ``ZarrSink``.
    """
    # Every option is forwarded unchanged; ZarrSink owns all the behavior.
    zarr_sink = ZarrSink(
        output=output,
        arrays=arrays,
        attrs=attrs,
        episode_ends_path=episode_ends_path,
        store_template=store_template,
        video_frame_batch_size=video_frame_batch_size,
        array_chunk_bytes=array_chunk_bytes,
        reduce_to_single_store=reduce_to_single_store,
    )
    return self.with_sink(zarr_sink)

def __iter__(self) -> Iterator[Row]:
return iter(self.iter_rows())

Expand Down
2 changes: 2 additions & 0 deletions src/refiner/pipeline/sinks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from refiner.pipeline.sinks.jsonl import JsonlSink
from refiner.pipeline.sinks.parquet import ParquetSink
from refiner.pipeline.sinks.reducer import FileCleanupReducerSink, LeRobotMetaReduceSink
from refiner.pipeline.sinks.zarr import ZarrSink

__all__ = [
"BaseSink",
Expand All @@ -10,4 +11,5 @@
"JsonlSink",
"LeRobotMetaReduceSink",
"ParquetSink",
"ZarrSink",
]
4 changes: 4 additions & 0 deletions src/refiner/pipeline/sinks/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ def on_shard_complete(self, shard_id: str) -> None:
"""
del shard_id

def on_shard_finalized(self, shard_id: str) -> None:
    """Run cleanup after the shard has been marked complete.

    This implementation performs no work: it discards ``shard_id`` and
    returns None.

    Args:
        shard_id: Identifier of the shard that was marked complete.
    """
    # Explicitly drop the argument so it is not flagged as unused.
    del shard_id
    return None

def close(self) -> None:
"""Finalize sink resources after all shard work is complete.

Expand Down
2 changes: 2 additions & 0 deletions src/refiner/pipeline/sinks/reducer/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from refiner.pipeline.sinks.reducer.file import FileCleanupReducerSink
from refiner.pipeline.sinks.reducer.lerobot import LeRobotMetaReduceSink
from refiner.pipeline.sinks.reducer.zarr import ZarrReducerSink

__all__ = [
"FileCleanupReducerSink",
"LeRobotMetaReduceSink",
"ZarrReducerSink",
]
145 changes: 80 additions & 65 deletions src/refiner/pipeline/sinks/reducer/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,36 @@
_DEFAULT_FIELD_PATTERN = r"[^/]+"


def _compile_managed_path_pattern(filename_template: str) -> re.Pattern[str]:
parts: list[str] = []
def _compile_output_path_patterns(filename_template: str) -> list[re.Pattern[str]]:
path_parts: list[str] = []
patterns: list[re.Pattern[str]] = []
seen_fields: set[str] = set()

for literal_text, field_name, format_spec, conversion in Formatter().parse(
filename_template
):
parts.append(re.escape(literal_text))
if field_name is None:
continue
if conversion is not None or format_spec:
raise ValueError(
"filename_template reducer matching only supports plain "
"named fields without conversion or format specifiers"
)
if not field_name.isidentifier():
raise ValueError(
"filename_template reducer matching only supports plain named fields"
)
if field_name in seen_fields:
# Repeated fields in the template must resolve to the same path segment.
parts.append(f"(?P={field_name})")
continue
pattern = _FIELD_PATTERNS.get(field_name, _DEFAULT_FIELD_PATTERN)
parts.append(f"(?P<{field_name}>{pattern})")
seen_fields.add(field_name)
for segment in (part for part in filename_template.split("/") if part):
segment_parts: list[str] = []
for literal_text, field_name, format_spec, conversion in Formatter().parse(
segment
):
segment_parts.append(re.escape(literal_text))
if field_name is None:
continue
if conversion is not None or format_spec:
raise ValueError(
"filename_template reducer matching only supports plain "
"named fields without conversion or format specifiers"
)
if not field_name.isidentifier():
raise ValueError(
"filename_template reducer matching only supports plain named fields"
)
if field_name in seen_fields:
segment_parts.append(f"(?P={field_name})")
continue
pattern = _FIELD_PATTERNS.get(field_name, _DEFAULT_FIELD_PATTERN)
segment_parts.append(f"(?P<{field_name}>{pattern})")
seen_fields.add(field_name)
path_parts.append("".join(segment_parts))
patterns.append(re.compile("^" + "/".join(path_parts) + "$"))

missing_fields = sorted(_REQUIRED_TEMPLATE_FIELDS.difference(seen_fields))
if missing_fields:
Expand All @@ -54,7 +58,7 @@ def _compile_managed_path_pattern(filename_template: str) -> re.Pattern[str]:
+ ", ".join(f"{{{field_name}}}" for field_name in missing_fields)
)

return re.compile("^" + "".join(parts) + "$")
return patterns


class FileCleanupReducerSink(BaseSink):
Expand All @@ -72,7 +76,7 @@ def __init__(
self.filename_template = filename_template
self.reducer_name = reducer_name
self.assets_subdir = assets_subdir
self._managed_path_pattern = _compile_managed_path_pattern(filename_template)
self._output_path_patterns = _compile_output_path_patterns(filename_template)
self._cleanup_ran = False

def write_shard_block(self, shard_id, block) -> None:
Expand Down Expand Up @@ -108,56 +112,67 @@ def _run_cleanup(self) -> None:
)

keep_pairs = {
(
row.shard_id,
row.worker_token,
)
(row.shard_id, row.worker_token)
for row in get_finalized_workers(stage_index=stage_index - 1)
}

try:
listed_paths = self.output.find("")
except FileNotFoundError:
listed_paths = []

assets_prefix = (
f"{self.assets_subdir.rstrip('/')}/"
if self.assets_subdir is not None
else None
literal_prefix = ""
for literal_text, field_name, _format_spec, _conversion in Formatter().parse(
self.filename_template
):
literal_prefix += literal_text
if field_name is not None:
break
listing_prefix = (
"" if "/" not in literal_prefix else literal_prefix.rsplit("/", 1)[0]
)
paths = [listing_prefix]
prefix_parts = [part for part in listing_prefix.split("/") if part]
for pattern in self._output_path_patterns[len(prefix_parts) :]:
next_paths: list[str] = []
for path in paths:
try:
next_paths.extend(
item
for item in self.output.ls(path, detail=False)
if pattern.fullmatch(item)
)
except (FileNotFoundError, NotADirectoryError):
continue
paths = next_paths

removed_asset_attempts: set[str] = set()
# Extra template fields are treated as structure only. Authority is decided
# solely from the finalized (shard_id, worker_id) pair extracted from each
# managed path.
for rel_path in listed_paths:
if not isinstance(rel_path, str) or not rel_path or rel_path == ".":
paths_to_delete: set[str] = set()
# Extra template fields are structure only. Authority is decided from
# the finalized (shard_id, worker_id) pair extracted from the path.
for rel_path in paths:
match = self._output_path_patterns[-1].fullmatch(rel_path)
if match is None:
continue
if assets_prefix is not None and (
rel_path == self.assets_subdir or rel_path.startswith(assets_prefix)
):
attempt_dir = rel_path[len(assets_prefix) :].split("/", maxsplit=1)[0]
if (match.group("shard_id"), match.group("worker_id")) not in keep_pairs:
paths_to_delete.add(rel_path)

if self.assets_subdir is not None:
asset_prefix = f"{self.assets_subdir.rstrip('/')}/"
try:
asset_paths = self.output.find(self.assets_subdir)
except FileNotFoundError:
asset_paths = []
for rel_path in asset_paths:
if not rel_path.startswith(asset_prefix):
continue
attempt_dir = rel_path[len(asset_prefix) :].split("/", maxsplit=1)[0]
match = ASSET_ATTEMPT_DIR_RE.fullmatch(attempt_dir)
if match is None:
continue
if (match.group("shard_id"), match.group("worker_id")) in keep_pairs:
continue
if attempt_dir in removed_asset_attempts:
continue
removed_asset_attempts.add(attempt_dir)
try:
self.output.rm(f"{assets_prefix}{attempt_dir}", recursive=True)
except FileNotFoundError:
continue
continue
if (
match.group("shard_id"),
match.group("worker_id"),
) not in keep_pairs:
paths_to_delete.add(f"{asset_prefix}{attempt_dir}")

match = self._managed_path_pattern.fullmatch(rel_path)
if match is None:
continue
if (match.group("shard_id"), match.group("worker_id")) in keep_pairs:
continue
for path in sorted(paths_to_delete):
try:
self.output.rm(rel_path)
self.output.rm(path, recursive=True)
except FileNotFoundError:
continue

Expand Down
Loading
Loading