refactor: unify Hudi read APIs around ReadOptions and add Python bindings#587

Open
xushiyan wants to merge 12 commits into apache:main from xushiyan:xushiyan/py-streaming-apis

Conversation


@xushiyan xushiyan commented Apr 30, 2026

Description

Unifies every Hudi read API — snapshot, time-travel, incremental, eager, and streaming — around a single `ReadOptions` struct, and adds full Python bindings for the streaming and incremental paths.

Single shape across all reads (Rust + Python):

  • `get_file_slices(options)` / `get_file_slices_splits(num_splits, options)` — snapshot & time-travel via `as_of_timestamp`
  • `get_file_slices_between(options)` / `get_file_slices_splits_between(num_splits, options)` — incremental via `start_timestamp` / `end_timestamp`
  • `read_snapshot(options)` / `read_incremental_records(options)` — eager records
  • `read_snapshot_stream(options)` / `read_file_slice_stream(slice, options)` — streaming records
  • `FileGroupReader::read_file_slice*(slice, options)` — eager + streaming reads on the file-group reader

`ReadOptions` fields: `filters`, `projection`, `batch_size`, `as_of_timestamp`, `start_timestamp`, `end_timestamp`. `filters` are general column filters (any field) and drive partition pruning, file-level stats pruning, and row-level filtering.
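The unified options shape described above can be sketched as a plain dataclass; field names follow the PR, but this standalone Python stand-in is illustrative, not the actual hudi-rs binding.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple

@dataclass
class ReadOptions:
    # (field, op, value) tuples over any column: drive partition pruning,
    # file-level stats pruning, and row-level filtering
    filters: List[Tuple[str, str, str]] = field(default_factory=list)
    projection: Optional[List[str]] = None
    batch_size: Optional[int] = None
    as_of_timestamp: Optional[str] = None   # snapshot / time-travel reads
    start_timestamp: Optional[str] = None   # incremental reads
    end_timestamp: Optional[str] = None     # incremental reads

# one struct serves every read family
opts = ReadOptions(filters=[("city", "=", "sf")],
                   as_of_timestamp="20240402123035233")
```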

Removed (subsumed by `as_of_timestamp`): `get_file_slices_as_of`, `get_file_slices_splits_as_of`, `read_snapshot_as_of` (Rust + Python). Also removed the unused `row_predicate` field/builder/type alias and the misleading `PartitionFilter` type alias.

New Python types: `HudiReadOptions`, `HudiRecordBatchStream` (single-use iterator over PyArrow `RecordBatch`).

Other adds: `HudiTable.base_url`, `compute_table_stats()`. Drive-bys: bump CI lint job's Python from 3.9 to 3.10 (required by mypy 1.20+); remove a now-unused `# type: ignore` on the pyarrow import.

How are the changes test-covered?

  • Automated tests (unit and/or integration tests)

Expose streaming read APIs and remaining gap accessors to Python:

- HudiReadOptions: filters, projection, batch_size, as_of_timestamp
- HudiRecordBatchStream: single-use Python iterator over RecordBatch
- HudiTable: read_snapshot_stream, read_file_slice_stream, base_url,
  get_file_slices_splits_between, compute_table_stats
- HudiFileGroupReader: read_file_slice_stream,
  read_file_slice_from_paths_stream

Also bump check-code Python from 3.9 to 3.10 (matches project's
requires-python; mypy 1.20+ no longer supports 3.9) and remove a
now-unused type: ignore on the pyarrow import.
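The "single-use iterator" semantics described for `HudiRecordBatchStream` can be mimicked in plain Python; this stand-in is illustrative, not the real PyO3-backed class.

```python
class RecordBatchStream:
    """Single-use iterator: once exhausted, it stays exhausted."""

    def __init__(self, batches):
        self._it = iter(batches)   # underlying stream is consumed once

    def __iter__(self):
        return self                # iterating does not reset the stream

    def __next__(self):
        return next(self._it)

stream = RecordBatchStream(["batch0", "batch1"])
consumed = list(stream)            # drains the stream
leftover = list(stream)            # already exhausted: empty
```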

codecov Bot commented Apr 30, 2026

Codecov Report

❌ Patch coverage is 69.19643% with 69 lines in your changes missing coverage. Please review.
✅ Project coverage is 81.81%. Comparing base (cf81439) to head (2ae3c52).

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| crates/core/src/table/mod.rs | 67.30% | 34 Missing ⚠️ |
| crates/core/src/file_group/reader.rs | 71.26% | 25 Missing ⚠️ |
| python/src/internal.rs | 0.00% | 10 Missing ⚠️ |
Additional details and impacted files
```
@@            Coverage Diff             @@
##             main     #587      +/-   ##
==========================================
- Coverage   82.10%   81.81%   -0.29%     
==========================================
  Files          77       77              
  Lines        5213     5317     +104     
==========================================
+ Hits         4280     4350      +70     
- Misses        933      967      +34     
```


Add an optional partition filters parameter to the incremental query
APIs so callers (e.g. Ray Data's Hudi datasource) can push partition
pruning down on incremental reads, matching the snapshot APIs.

Rust:
- Table::get_file_slices_between(start, end, filters)
- Table::get_file_slices_splits_between(num_splits, start, end, filters)
- Table::read_incremental_records(start, end, filters)
- get_file_slices_between_internal now applies a PartitionPruner.

Python (filters as optional last kwarg, source-compatible):
- HudiTable.get_file_slices_between(filters=...)
- HudiTable.get_file_slices_splits_between(filters=...)
- HudiTable.read_incremental_records(filters=...)

Copilot AI left a comment


Pull request overview

Adds Python bindings for streaming read APIs and additional table accessors, and extends incremental query APIs (Rust + Python) with optional partition filters to support partition pruning.

Changes:

  • Expose HudiReadOptions and HudiRecordBatchStream to Python and add streaming read methods on HudiTable / HudiFileGroupReader.
  • Add optional filters support to incremental-related APIs (get_file_slices_between, get_file_slices_splits_between, read_incremental_records) and update Rust + Python tests/docs.
  • Bump GitHub Actions lint job Python version to 3.10 and update Python stubs/exports.

Reviewed changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.

| File | Description |
| --- | --- |
| python/tests/test_table_misc.py | Adds tests for new table accessors (base_url, splits-between, table stats). |
| python/tests/test_table_incremental_read.py | Adds tests validating partition-filter pruning for incremental reads and file-slice APIs. |
| python/tests/test_streaming_read.py | Adds coverage for streaming snapshot/file-slice reads and HudiReadOptions. |
| python/src/lib.rs | Registers new Python-exposed classes (HudiReadOptions, HudiRecordBatchStream). |
| python/src/internal.rs | Implements Python bindings for streaming reads, iterator wrapper, new accessors, and filter plumbing. |
| python/hudi/_internal.pyi | Adds type stubs/docstrings for the new bindings and updated signatures. |
| python/hudi/__init__.py | Re-exports new Python API surface (HudiReadOptions, HudiRecordBatchStream). |
| crates/core/tests/table_read_tests.rs | Updates call sites for new Rust API signatures requiring filters. |
| crates/core/src/table/mod.rs | Adds partition-filter pruning to incremental file-slice APIs and updates incremental read signature. |
| README.md | Documents incremental partition-filter usage and updates Rust examples for new signatures. |
| .github/workflows/code.yml | Bumps lint job Python from 3.9 to 3.10. |

Replace per-method filter / timestamp arguments with a single
ReadOptions struct shared by every snapshot, time-travel, incremental,
eager, and streaming read API. Adds start_timestamp / end_timestamp
fields to ReadOptions and renames partition_filters to filters.

Removed methods (semantics absorbed by ReadOptions.as_of_timestamp):
- Table::get_file_slices_as_of
- Table::get_file_slices_splits_as_of
- Table::read_snapshot_as_of
- HudiTable.get_file_slices_as_of
- HudiTable.get_file_slices_splits_as_of
- HudiTable.read_snapshot_as_of

New shape (Rust + Python):
- get_file_slices(&options)
- get_file_slices_splits(num_splits, &options)
- get_file_slices_between(&options)
- get_file_slices_splits_between(num_splits, &options)
- read_snapshot(&options)
- read_incremental_records(&options)
- read_snapshot_stream(&options) and read_file_slice_stream already
  took ReadOptions; unchanged.
@xushiyan xushiyan changed the title feat: add python bindings for streaming reads and table accessors refactor: unify Hudi read APIs around ReadOptions and add Python bindings Apr 30, 2026
…eadOptions

Two changes that together close the filter-semantics gap and complete the
ReadOptions standardization across all read APIs.

1. ReadOptions.filters now applies at row level, not just file pruning.
   Previously, filters=[("non_partition_col", "=", "x")] would prune files
   via stats but return all rows in surviving files unchanged. Now filters
   are also evaluated as a row mask after reading each batch, so callers
   get the rows they asked for. row_predicate remains the escape hatch for
   anything not expressible as (field, op, value).

   - Add expr::filter::filters_to_row_mask helper using existing
     SchemableFilter::apply_comparison.
   - Apply mask in FileGroupReader streaming and eager paths.
   - Stop stripping filters in Table::read_snapshot_stream.

2. FileGroupReader eager methods now take &ReadOptions for parity with
   the streaming methods and the Table API. read_file_slice,
   read_file_slice_by_base_file_path, and read_file_slice_from_paths all
   honor filters / row_predicate / projection.

   - Internal read_base_file_eager helper avoids double-applying options
     when merging base + log files.
   - C++ binding passes ReadOptions::default() (no per-call options
     surface yet).
   - Python binding accepts optional HudiReadOptions on each method.
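The row-mask idea in change 1 above can be sketched in plain Python: AND together one mask per filter, skipping filters whose column is missing from the batch. Here `batch` is a dict of column name to value list standing in for an Arrow RecordBatch; the real helper is the PR's `filters_to_row_mask` over typed Arrow arrays, so this logic is illustrative only.

```python
import operator

OPS = {"=": operator.eq, "!=": operator.ne, "<": operator.lt,
       "<=": operator.le, ">": operator.gt, ">=": operator.ge}

def filters_to_row_mask(batch, filters):
    """AND per-filter masks into one boolean row mask."""
    n = len(next(iter(batch.values())))
    mask = [True] * n
    for col, op, value in filters:
        if col not in batch:       # mirrors the "skip missing column" branch
            continue
        cmp = OPS[op]
        mask = [m and cmp(v, value) for m, v in zip(mask, batch[col])]
    return mask
```

Rows where the mask is False are dropped after each batch is read, so `filters=[("isActive", "=", "true")]` now affects row content, not just file pruning.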
YAGNI cleanup: row_predicate had no production callers — only its own
tests exercised it. Now that filters apply at row level too, the
expressive cases that filters covers are no longer redundant with a
parallel closure-based predicate.

If a future engine integration needs predicates beyond what (field, op,
value) tuples express (cross-column OR, function calls, regex), the
escape hatch can be re-added with a concrete consumer to design against.

- Remove ReadOptions::row_predicate field, with_row_predicate builder,
  RowPredicate type alias.
- Remove apply_row_predicate helper and its application in streaming +
  eager read paths.
- Convert two tests that exercised row_predicate (boolean filter on
  isActive) to use filters=[("isActive", "=", "true")] — same behavior,
  through the kept API.
…ilters

ReadOptions.filters has never been partition-specific — it's a list of
column filters where the column can be partition or data:
- filter on partition column → PartitionPruner skips partitions
- filter on data column with stats → FilePruner skips files
- any filter → applied as row mask after reading

The PartitionFilter alias on (String, String, String) misnamed this.
Drop the alias, inline the tuple type, and clarify the field docstring
to reflect what filters actually do.

Also fix a stale doc comment in hudi-datafusion that said PartitionFilter.
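The three-way behavior above hinges only on which column a filter names; a minimal sketch of that split (helper name and signature are hypothetical, not from the codebase):

```python
def classify_filters(filters, partition_columns):
    """Split general column filters: partition-column filters feed the
    PartitionPruner, data-column filters feed stats-based file pruning.
    Every filter is additionally applied as a row mask after reading."""
    partition_filters = [f for f in filters if f[0] in partition_columns]
    data_filters = [f for f in filters if f[0] not in partition_columns]
    return partition_filters, data_filters

parts, data = classify_filters([("city", "=", "sf"), ("age", ">", "30")],
                               partition_columns={"city"})
```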
Four issues from external review (codex):

1. read_snapshot_stream ignored ReadOptions.as_of_timestamp and always
   used the latest commit. Streaming time-travel queries silently
   returned latest data instead. Now uses resolve_snapshot_timestamp,
   matching the eager read_snapshot path.

2. Streaming projection could disable row filters: parquet-level
   projection pushed before the filter mask, so filters on columns not
   in the projection were silently skipped (filters_to_row_mask drops
   filters whose column is missing from the batch). Fix: when projection
   is set, augment the read projection with filter columns, then project
   down to the user's requested columns after filtering. Eager already
   filtered before projecting; both paths are now consistent.

3. resolve_incremental_range silently returned None on bad timestamp
   strings (format_timestamp(...).ok()?), causing callers to return an
   empty Vec instead of a parse error. Changed return type to
   Result<Option<...>> so parse errors propagate; None remains the
   "no commits / no end_timestamp" sentinel.

4. Stale doc strings: removed reference to options.row_predicate in
   _internal.pyi; updated the README file-group Rust example to pass
   ReadOptions::new() to the now-required parameter.

Tests added:
- test_read_snapshot_stream_with_as_of_timestamp: cross-validates
  streaming time-travel against eager read_snapshot to prove
  as_of_timestamp is honored.
- test_read_snapshot_stream_filter_column_not_in_projection: projects
  ["id"] but filters on "isActive"; verifies rows are dropped.
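The projection fix (issue 2 above) amounts to planning two column lists: a widened set to read from parquet so the row mask can see the filter columns, and the user's original set to project back down to afterwards. A minimal sketch, with a hypothetical helper name:

```python
def plan_projection(user_projection, filter_columns):
    """Return (columns to read, columns to keep after filtering)."""
    if user_projection is None:
        return None, None          # read everything; no post-projection needed
    read_cols = list(user_projection)
    for col in filter_columns:
        if col not in read_cols:
            read_cols.append(col)  # widen so the filter can evaluate
    return read_cols, list(user_projection)

# projects ["id"] but filters on "isActive": read both, return only "id"
read_cols, keep_cols = plan_projection(["id"], ["isActive"])
```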

Copilot AI left a comment


Pull request overview

Copilot reviewed 19 out of 19 changed files in this pull request and generated 5 comments.

Codex:
- Validate filter field names against table+partition schemas upfront
  (previously a typo on a filter column silently no-oped via the
  filters_to_row_mask "skip missing column" branch). New helper
  validate_filter_fields, called from get_file_slices_internal and
  get_file_slices_between_internal. Test:
  test_filter_on_unknown_column_errors covers snapshot and incremental.
- Update stale doc references in README, _internal.pyi, and read_options.rs
  that still called filters "partition filters" or projection
  "streaming only".

Copilot:
- Short-circuit pruner construction in get_file_slices_between_internal
  when filters is empty (skip schema fetch + per-file-group should_include).
- Skip partition columns when widening read projection in
  read_base_file_stream. With hoodie.datasource.write.drop.partition.columns,
  partition columns aren't in parquet at all; widening would cause
  InvalidColumn. Partition pruning already filtered files upstream.
- Update with_filters doc, HudiReadOptions docstring, README API table
  to describe filters as general column filters and projection as
  applicable to both eager and streaming reads.
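The upfront validation described above can be sketched as follows; the function name `validate_filter_fields` follows the commit, but this Python signature is illustrative.

```python
def validate_filter_fields(filters, table_fields, partition_fields):
    """Reject filters naming a column in neither the table schema nor the
    partition schema, instead of silently matching nothing via the
    'skip missing column' branch of the row-mask helper."""
    known = set(table_fields) | set(partition_fields)
    for col, _op, _value in filters:
        if col not in known:
            raise ValueError(f"unknown filter column: {col!r}")

# a typo on a filter column now errors up front
validate_filter_fields([("city", "=", "sf")], ["id", "name"], ["city"])
```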
Three issues from external review:

1. Streaming dropped row filters on timestamp-keygen partition fields.
   read_base_file_stream excluded every column listed in
   `hoodie.table.partition.fields` from the widened parquet projection.
   For timestamp-based keygen the configured partition field is also the
   source data column (e.g. `ts_epoch`); excluding it meant a filter on
   that field was silently skipped by `filters_to_row_mask` because the
   column was missing from the read batch. Fix: only exclude partition
   columns from widening when `hoodie.datasource.write.drop.partition.columns`
   is enabled — the case where the column genuinely isn't in parquet.

2. FileGroupReader-direct reads silently ignored unknown filter fields.
   Public file-group reads (read_file_slice_by_base_file_path,
   read_file_slice, read_file_slice_from_paths and their streaming
   counterparts) bypassed the table-level validation added previously.
   A typoed filter became an all-true mask. Fix:
   `validate_filter_fields_against_batch` runs in apply_eager_options
   and once on the first batch in read_base_file_stream. Filters
   targeting columns that are neither in the batch schema nor in the
   configured partition columns now error.

3. Table::read_file_slice_stream did not normalize as_of_timestamp.
   It used `options.as_of_timestamp` directly, where read_snapshot and
   read_snapshot_stream both run `format_timestamp` via
   `resolve_snapshot_timestamp`. ISO/epoch values worked for the eager
   path but could fall through to file-group commit/log filtering as
   raw strings. Fix: thread through `resolve_snapshot_timestamp`,
   matching the rest of the snapshot family.

Tests:
- test_read_file_slice_stream_normalizes_as_of_timestamp: cross-validates
  streaming time-travel against eager read_snapshot row counts.
- test_file_group_reader_filter_on_unknown_column_errors: covers both
  eager and streaming FileGroupReader paths.
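The conditional in fix 1 above can be sketched as: only skip a partition field when widening the parquet projection if partition columns were actually dropped from the files. The helper name and flag parameter here are illustrative.

```python
def widen_projection(projection, filter_columns, partition_fields,
                     drop_partition_columns):
    """Widen the read projection with filter columns, excluding partition
    fields only when hoodie.datasource.write.drop.partition.columns is
    enabled (the column is genuinely absent from parquet). With
    timestamp keygen the partition field is a real data column and must
    stay readable for the row mask."""
    widened = list(projection)
    for col in filter_columns:
        if col in widened:
            continue
        if drop_partition_columns and col in partition_fields:
            continue               # not in parquet; widening would error
        widened.append(col)
    return widened
```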
1. FileGroupReader filter validation is now strict.

   Previously `validate_filter_fields_against_batch` allowed any
   configured partition column even when absent from the read batch.
   That's correct for table-level callers (where partition pruning has
   already used the filter) but wrong for direct FGR callers — a
   partition filter on a table with `drop.partition.columns=true` would
   silently no-op because the column isn't in parquet.

   Now FGR validates filters strictly against the read batch schema.
   Direct callers get a schema error instead of a silent no-op.

   Table-level paths absorb the now-required filtering: a new
   `Table::options_for_file_group` helper strips filters whose target
   is a dropped partition column when constructing options for FGR.
   Pruning has already used those filters at the Table level, so
   stripping them before FGR is correct.

2. reader-spec.md updated:
   - The API matrix splits Table::read_file_slice_stream from the
     FileGroupReader streaming methods. Table-level honors
     `as_of_timestamp` (configures FileGroupEndTimestamp via
     resolve_snapshot_timestamp); FGR-direct still ignores it.
   - The filter-validation section reflects the strict FGR behavior
     and the Table-level strip pattern.

ReadOptions now derives Clone (was Default+Debug only) so the strip
helper can construct a per-call modified copy.
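The strip pattern above, sketched in Python with a minimal stand-in for the Rust struct (the helper name `options_for_file_group` follows the commit; everything else here is illustrative):

```python
from dataclasses import dataclass, field, replace
from typing import List, Tuple

@dataclass
class ReadOptions:                 # minimal stand-in for the Rust struct
    filters: List[Tuple[str, str, str]] = field(default_factory=list)

def options_for_file_group(options, dropped_partition_columns):
    """Clone the per-call options and drop filters targeting dropped
    partition columns: partition pruning already consumed them at the
    Table level, and the strict FGR batch-schema validation would
    otherwise reject them."""
    kept = [f for f in options.filters
            if f[0] not in dropped_partition_columns]
    return replace(options, filters=kept)   # per-call modified copy (Clone)

table_opts = ReadOptions(filters=[("city", "=", "sf"), ("age", ">", "1")])
fgr_opts = options_for_file_group(table_opts, {"city"})
```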