refactor: unify Hudi read APIs around ReadOptions and add Python bindings #587
Open
xushiyan wants to merge 12 commits into apache:main
Conversation
Expose streaming read APIs and remaining gap accessors to Python:
- HudiReadOptions: filters, projection, batch_size, as_of_timestamp
- HudiRecordBatchStream: single-use Python iterator over RecordBatch
- HudiTable: read_snapshot_stream, read_file_slice_stream, base_url, get_file_slices_splits_between, compute_table_stats
- HudiFileGroupReader: read_file_slice_stream, read_file_slice_from_paths_stream

Also bump check-code Python from 3.9 to 3.10 (matches the project's requires-python; mypy 1.20+ no longer supports 3.9) and remove a now-unused `type: ignore` on the pyarrow import.
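For orientation, a minimal usage sketch of the streaming surface described above. The table path is an example and the no-argument call form is an assumption; the single-use iterator semantics are the ones documented for HudiRecordBatchStream.

```python
# Hedged sketch: consume a snapshot stream from Python. Path is an example.
from hudi import HudiTable

table = HudiTable("/tmp/trips_table")   # example path
stream = table.read_snapshot_stream()   # returns a HudiRecordBatchStream

total_rows = 0
for batch in stream:                    # each batch is a pyarrow.RecordBatch
    total_rows += batch.num_rows
print(total_rows)
# The stream is single-use: iterating it again yields no further batches.
```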
Codecov Report
❌ Patch coverage is

Additional details and impacted files:

@@            Coverage Diff             @@
##             main     #587      +/-   ##
==========================================
- Coverage   82.10%   81.81%    -0.29%
==========================================
  Files          77       77
  Lines        5213     5317      +104
==========================================
+ Hits         4280     4350       +70
- Misses        933      967       +34
Add an optional partition filters parameter to the incremental query APIs so callers (e.g. Ray Data's Hudi datasource) can push partition pruning down on incremental reads, matching the snapshot APIs.

Rust:
- Table::get_file_slices_between(start, end, filters)
- Table::get_file_slices_splits_between(num_splits, start, end, filters)
- Table::read_incremental_records(start, end, filters)
- get_file_slices_between_internal now applies a PartitionPruner.

Python (filters as optional last kwarg, source-compatible):
- HudiTable.get_file_slices_between(filters=...)
- HudiTable.get_file_slices_splits_between(filters=...)
- HudiTable.read_incremental_records(filters=...)
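A hedged sketch of the Python shape at this commit (later commits in this PR fold these arguments into ReadOptions). The table URI, timestamps, and filter value are illustrative:

```python
from hudi import HudiTable

table = HudiTable("/tmp/trips_table")  # example path

# Incremental records between two instant timestamps, pruned to one
# partition via the new trailing kwarg. Timestamps and values are examples.
batches = table.read_incremental_records(
    "20240101000000000",                       # start timestamp
    "20240201000000000",                       # end timestamp
    filters=[("city", "=", "san_francisco")],  # optional partition filter
)
```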
Pull request overview
Adds Python bindings for streaming read APIs and additional table accessors, and extends incremental query APIs (Rust + Python) with optional partition filters to support partition pruning.
Changes:
- Expose HudiReadOptions and HudiRecordBatchStream to Python and add streaming read methods on HudiTable / HudiFileGroupReader.
- Add optional filters support to incremental-related APIs (get_file_slices_between, get_file_slices_splits_between, read_incremental_records) and update Rust + Python tests/docs.
- Bump the GitHub Actions lint job Python version to 3.10 and update Python stubs/exports.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| python/tests/test_table_misc.py | Adds tests for new table accessors (base_url, splits-between, table stats). |
| python/tests/test_table_incremental_read.py | Adds tests validating partition-filter pruning for incremental reads and file-slice APIs. |
| python/tests/test_streaming_read.py | Adds coverage for streaming snapshot/file-slice reads and HudiReadOptions. |
| python/src/lib.rs | Registers new Python-exposed classes (HudiReadOptions, HudiRecordBatchStream). |
| python/src/internal.rs | Implements Python bindings for streaming reads, iterator wrapper, new accessors, and filter plumbing. |
| python/hudi/_internal.pyi | Adds type stubs/docstrings for the new bindings and updated signatures. |
| python/hudi/__init__.py | Re-exports the new Python API surface (HudiReadOptions, HudiRecordBatchStream). |
| crates/core/tests/table_read_tests.rs | Updates call sites for new Rust API signatures requiring filters. |
| crates/core/src/table/mod.rs | Adds partition-filter pruning to incremental file-slice APIs and updates incremental read signature. |
| README.md | Documents incremental partition-filter usage and updates Rust examples for new signatures. |
| .github/workflows/code.yml | Bumps lint job Python from 3.9 to 3.10. |
Replace per-method filter / timestamp arguments with a single ReadOptions struct shared by every snapshot, time-travel, incremental, eager, and streaming read API. Adds start_timestamp / end_timestamp fields to ReadOptions and renames partition_filters to filters.

Removed methods (semantics absorbed by ReadOptions.as_of_timestamp):
- Table::get_file_slices_as_of
- Table::get_file_slices_splits_as_of
- Table::read_snapshot_as_of
- HudiTable.get_file_slices_as_of
- HudiTable.get_file_slices_splits_as_of
- HudiTable.read_snapshot_as_of

New shape (Rust + Python):
- get_file_slices(&options)
- get_file_slices_splits(num_splits, &options)
- get_file_slices_between(&options)
- get_file_slices_splits_between(num_splits, &options)
- read_snapshot(&options)
- read_incremental_records(&options)
- read_snapshot_stream(&options) and read_file_slice_stream already took ReadOptions; unchanged.
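A sketch of the unified shape from the Python side, assuming HudiReadOptions takes its documented fields as keyword arguments (the constructor form is an assumption; the field names are the ones listed above):

```python
from hudi import HudiTable, HudiReadOptions

table = HudiTable("/tmp/trips_table")  # example path

opts = HudiReadOptions(
    filters=[("city", "=", "san_francisco")],  # example column filter
    projection=["id", "city", "fare"],         # example projection
    batch_size=1024,
    as_of_timestamp="20240101000000000",       # replaces read_snapshot_as_of
)

# One options value now drives both listing and reading.
file_slices = table.get_file_slices(opts)
records = table.read_snapshot(opts)
```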
…eadOptions
Two changes that together close the filter-semantics gap and complete the
ReadOptions standardization across all read APIs.
1. ReadOptions.filters now applies at row level, not just file pruning.
Previously, filters=[("non_partition_col", "=", "x")] would prune files
via stats but return all rows in surviving files unchanged. Now filters
are also evaluated as a row mask after reading each batch, so callers
get the rows they asked for (see the sketch after this list).
row_predicate remains the escape hatch for anything not expressible
as (field, op, value).
- Add expr::filter::filters_to_row_mask helper using existing
SchemableFilter::apply_comparison.
- Apply mask in FileGroupReader streaming and eager paths.
- Stop stripping filters in Table::read_snapshot_stream.
2. FileGroupReader eager methods now take &ReadOptions for parity with
the streaming methods and the Table API. read_file_slice,
read_file_slice_by_base_file_path, and read_file_slice_from_paths all
honor filters / row_predicate / projection.
- Internal read_base_file_eager helper avoids double-applying options
when merging base + log files.
- C++ binding passes ReadOptions::default() (no per-call options
surface yet).
- Python binding accepts optional HudiReadOptions on each method.
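To make change 1 concrete, an illustrative pyarrow analogue of the row-mask step. The real helper (expr::filter::filters_to_row_mask) is Rust-side; the operator set and the skip-missing-column behavior below follow the commit text, not the actual implementation:

```python
import pyarrow as pa
import pyarrow.compute as pc

OPS = {"=": pc.equal, "!=": pc.not_equal, "<": pc.less,
       "<=": pc.less_equal, ">": pc.greater, ">=": pc.greater_equal}

def filters_to_row_mask(batch: pa.RecordBatch, filters):
    """Fold (field, op, value) filters into one boolean mask; None = keep all."""
    mask = None
    for field, op, value in filters:
        if field not in batch.schema.names:
            continue  # missing column: filter skipped (pre-validation behavior)
        column = batch.column(field)
        scalar = pa.scalar(value).cast(column.type)  # compare in the column's type
        cond = OPS[op](column, scalar)
        mask = cond if mask is None else pc.and_(mask, cond)
    return mask

# Applied after reading each batch, in both streaming and eager paths:
#     mask = filters_to_row_mask(batch, opts.filters)
#     batch = batch.filter(mask) if mask is not None else batch
```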
YAGNI cleanup: row_predicate had no production callers; only its own
tests exercised it. Now that filters apply at row level too, keeping a
parallel closure-based predicate for cases filters already covers is
redundant.
If a future engine integration needs predicates beyond what (field, op,
value) tuples express (cross-column OR, function calls, regex), the
escape hatch can be re-added with a concrete consumer to design against.
- Remove ReadOptions::row_predicate field, with_row_predicate builder,
RowPredicate type alias.
- Remove apply_row_predicate helper and its application in streaming +
eager read paths.
- Convert two tests that exercised row_predicate (boolean filter on
isActive) to use filters=[("isActive", "=", "true")] — same behavior,
through the kept API.
…ilters

ReadOptions.filters has never been partition-specific; it's a list of column filters where the column can be a partition or data column:
- filter on a partition column → PartitionPruner skips partitions
- filter on a data column with stats → FilePruner skips files
- any filter → applied as a row mask after reading

The PartitionFilter alias on (String, String, String) misnamed this. Drop the alias, inline the tuple type, and clarify the field docstring to reflect what filters actually do. Also fix a stale doc comment in hudi-datafusion that said PartitionFilter.
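As an illustration, one filters list can exercise all three layers; column names and values are examples, and the call shapes follow the unified API above:

```python
from hudi import HudiTable, HudiReadOptions

table = HudiTable("/tmp/trips_table")  # example path
opts = HudiReadOptions(filters=[
    ("city", "=", "san_francisco"),  # partition column -> PartitionPruner skips partitions
    ("fare", ">", "20.0"),           # data column with stats -> FilePruner skips files
])                                   # every filter also applies as a row mask

file_slices = table.get_file_slices(opts)  # pruned listing
records = table.read_snapshot(opts)        # pruned and row-filtered read
```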
Four issues from external review (codex):

1. read_snapshot_stream ignored ReadOptions.as_of_timestamp and always used the latest commit. Streaming time-travel queries silently returned latest data instead. Now uses resolve_snapshot_timestamp, matching the eager read_snapshot path.

2. Streaming projection could disable row filters: parquet-level projection pushed before the filter mask, so filters on columns not in the projection were silently skipped (filters_to_row_mask drops filters whose column is missing from the batch). Fix: when projection is set, augment the read projection with filter columns, then project down to the user's requested columns after filtering. Eager already filtered before projecting; both paths are now consistent.

3. resolve_incremental_range silently returned None on bad timestamp strings (format_timestamp(...).ok()?), causing callers to return an empty Vec instead of a parse error. Changed return type to Result<Option<...>> so parse errors propagate; None remains the "no commits / no end_timestamp" sentinel.

4. Stale doc strings: removed the reference to options.row_predicate in _internal.pyi; updated the README file-group Rust example to pass ReadOptions::new() to the now-required parameter.

Tests added:
- test_read_snapshot_stream_with_as_of_timestamp: cross-validates streaming time-travel against eager read_snapshot to prove as_of_timestamp is honored.
- test_read_snapshot_stream_filter_column_not_in_projection: projects ["id"] but filters on "isActive"; verifies rows are dropped.
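A sketch of the fix for issue 2 in pyarrow terms (the real change lives in the Rust streaming path; filters_to_row_mask is the helper sketched earlier, and the explicit narrowing step is illustrative):

```python
import pyarrow as pa

def widened_projection(projection, filters):
    # Physical read projection: the user's columns plus any filter
    # columns not already in it, so the mask can see every filter field.
    extra = [f[0] for f in filters if f[0] not in projection]
    return projection + extra

def narrow(batch: pa.RecordBatch, projection):
    # Project back down to what the caller actually asked for.
    return pa.RecordBatch.from_arrays(
        [batch.column(name) for name in projection], names=projection)

# Per streamed batch (read with the widened projection):
#     mask = filters_to_row_mask(batch, filters)
#     batch = batch.filter(mask) if mask is not None else batch
#     yield narrow(batch, projection)
```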
Codex:
- Validate filter field names against table + partition schemas upfront (previously a typo in a filter column silently no-oped via the filters_to_row_mask "skip missing column" branch). New helper validate_filter_fields, called from get_file_slices_internal and get_file_slices_between_internal. Test: test_filter_on_unknown_column_errors covers snapshot and incremental.
- Update stale doc references in README, _internal.pyi, and read_options.rs that still called filters "partition filters" or projection "streaming only".

Copilot:
- Short-circuit pruner construction in get_file_slices_between_internal when filters is empty (skip schema fetch + per-file-group should_include).
- Skip partition columns when widening the read projection in read_base_file_stream. With hoodie.datasource.write.drop.partition.columns, partition columns aren't in parquet at all; widening would cause InvalidColumn. Partition pruning already filtered those files upstream.
- Update the with_filters doc, HudiReadOptions docstring, and README API table to describe filters as general column filters and projection as applicable to both eager and streaming reads.
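A sketch of the upfront validation from the first Codex item; the real helper is Rust-side, so the signature and error type here are illustrative:

```python
def validate_filter_fields(filters, table_fields, partition_fields):
    # Fail fast on typos instead of letting filters_to_row_mask silently
    # skip a filter whose column does not exist anywhere.
    known = set(table_fields) | set(partition_fields)
    unknown = [f[0] for f in filters if f[0] not in known]
    if unknown:
        raise ValueError(f"filters reference unknown columns: {unknown}")
```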
Three issues from external review:

1. Streaming dropped row filters on timestamp-keygen partition fields. read_base_file_stream excluded every column listed in `hoodie.table.partition.fields` from the widened parquet projection. For timestamp-based keygen the configured partition field is also the source data column (e.g. `ts_epoch`); excluding it meant a filter on that field was silently skipped by `filters_to_row_mask` because the column was missing from the read batch. Fix: only exclude partition columns from widening when `hoodie.datasource.write.drop.partition.columns` is enabled, the case where the column genuinely isn't in parquet.

2. FileGroupReader-direct reads silently ignored unknown filter fields. Public file-group reads (read_file_slice_by_base_file_path, read_file_slice, read_file_slice_from_paths, and their streaming counterparts) bypassed the table-level validation added previously, so a typoed filter became an all-true mask. Fix: `validate_filter_fields_against_batch` runs in apply_eager_options and once on the first batch in read_base_file_stream. Filters targeting columns that are neither in the batch schema nor in the configured partition columns now error.

3. Table::read_file_slice_stream did not normalize as_of_timestamp. It used `options.as_of_timestamp` directly, whereas read_snapshot and read_snapshot_stream both run `format_timestamp` via `resolve_snapshot_timestamp`. ISO/epoch values worked for the eager path but could fall through to file-group commit/log filtering as raw strings. Fix: thread through `resolve_snapshot_timestamp`, matching the rest of the snapshot family.

Tests:
- test_read_file_slice_stream_normalizes_as_of_timestamp: cross-validates streaming time-travel against eager read_snapshot row counts.
- test_file_group_reader_filter_on_unknown_column_errors: covers both eager and streaming FileGroupReader paths.
1. FileGroupReader filter validation is now strict.
Previously `validate_filter_fields_against_batch` allowed any
configured partition column even when absent from the read batch.
That's correct for table-level callers (where partition pruning has
already used the filter) but wrong for direct FGR callers — a
partition filter on a table with `drop.partition.columns=true` would
silently no-op because the column isn't in parquet.
Now FGR validates filters strictly against the read batch schema.
Direct callers get a schema error instead of a silent no-op.
Table-level paths absorb the now-required filtering: a new
`Table::options_for_file_group` helper strips filters whose target
is a dropped partition column when constructing options for FGR.
Pruning has already used those filters at the Table level, so
stripping them before FGR is correct.
2. reader-spec.md updated:
- The API matrix splits Table::read_file_slice_stream from the
FileGroupReader streaming methods. Table-level honors
`as_of_timestamp` (configures FileGroupEndTimestamp via
resolve_snapshot_timestamp); FGR-direct still ignores it.
- The filter-validation section reflects the strict FGR behavior
and the Table-level strip pattern.
ReadOptions now derives Clone (was Default+Debug only) so the strip
helper can construct a per-call modified copy.
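A sketch of the resulting two-sided contract, with names mirroring the commit text (the real code is Rust-side; the per-call copy relies on the Clone derive noted just above):

```python
import copy

def validate_filter_fields_against_batch(filters, batch_schema_names):
    # Strict FGR-side check: every filter column must be in the read batch,
    # so direct callers get an error instead of a silent no-op.
    missing = [f[0] for f in filters if f[0] not in batch_schema_names]
    if missing:
        raise ValueError(f"filters reference columns absent from batch: {missing}")

def options_for_file_group(options, dropped_partition_columns):
    # Table-side strip: pruning already consumed filters on partition
    # columns dropped from parquet, so remove them before handing options
    # to the FileGroupReader.
    per_call = copy.copy(options)
    per_call.filters = [
        f for f in options.filters if f[0] not in dropped_partition_columns
    ]
    return per_call
```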
Description
Unifies every Hudi read API — snapshot, time-travel, incremental, eager, and streaming — around a single `ReadOptions` struct, and adds full Python bindings for the streaming and incremental paths.
Single shape across all reads (Rust + Python):
`ReadOptions` fields: `filters`, `projection`, `batch_size`, `as_of_timestamp`, `start_timestamp`, `end_timestamp`. `filters` are general column filters (any field) and drive partition pruning, file-level stats pruning, and row-level filtering.
Removed (subsumed by `as_of_timestamp`): `get_file_slices_as_of`, `get_file_slices_splits_as_of`, `read_snapshot_as_of` (Rust + Python). Also removed the unused `row_predicate` field/builder/type alias and the misleading `PartitionFilter` type alias.
New Python types: `HudiReadOptions`, `HudiRecordBatchStream` (single-use iterator over PyArrow `RecordBatch`).
Other adds: `HudiTable.base_url`, `compute_table_stats()`. Drive-bys: bump CI lint job's Python from 3.9 to 3.10 (required by mypy 1.20+); remove a now-unused `# type: ignore` on the pyarrow import.
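A quick sketch of those accessors; whether `base_url` is a method or a property, and the shape of the stats result, are assumptions:

```python
from hudi import HudiTable

table = HudiTable("/tmp/trips_table")  # example path
print(table.base_url())                # table's base URL (call form assumed)
stats = table.compute_table_stats()    # result shape assumed
```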
How are the changes test-covered