diff --git a/src/paimon/format/parquet/file_reader_wrapper.cpp b/src/paimon/format/parquet/file_reader_wrapper.cpp index e7d6bf606..090da9122 100644 --- a/src/paimon/format/parquet/file_reader_wrapper.cpp +++ b/src/paimon/format/parquet/file_reader_wrapper.cpp @@ -164,14 +164,15 @@ void FileReaderWrapper::AdvanceToNextRowGroup() { current_row_group_idx_++; // Skip row groups excluded by read range. while (current_row_group_idx_ < target_row_groups_.size() && - target_row_groups_[current_row_group_idx_].excluded_by_read_range) { + target_row_groups_[current_row_group_idx_].IsExcludedByReadRange()) { current_row_group_idx_++; } if (current_row_group_idx_ >= target_row_groups_.size()) { next_row_to_read_ = num_rows_; } else { next_row_to_read_ = - all_row_group_ranges_[target_row_groups_[current_row_group_idx_].row_group_index].first; + all_row_group_ranges_[target_row_groups_[current_row_group_idx_].GetRowGroupIndex()] + .first; } } @@ -181,10 +182,10 @@ Status FileReaderWrapper::SeekToRow(uint64_t row_number) { filtered_global_offset_ = 0; for (uint64_t i = 0; i < target_row_groups_.size(); i++) { - if (target_row_groups_[i].excluded_by_read_range) { + if (target_row_groups_[i].IsExcludedByReadRange()) { continue; } - int32_t rg_id = target_row_groups_[i].row_group_index; + int32_t rg_id = target_row_groups_[i].GetRowGroupIndex(); uint64_t rg_start = all_row_group_ranges_[rg_id].first; uint64_t rg_end = all_row_group_ranges_[rg_id].second; if (row_number > rg_start && row_number < rg_end) { @@ -200,9 +201,9 @@ Status FileReaderWrapper::SeekToRow(uint64_t row_number) { // Rebuild batch_reader_ for non-page-filtered RGs at/after seek position. std::vector fully_matched_indices; for (uint64_t j = i; j < target_row_groups_.size(); j++) { - if (!target_row_groups_[j].excluded_by_read_range && - !target_row_groups_[j].is_partially_matched) { - fully_matched_indices.push_back(target_row_groups_[j].row_group_index); + if (!target_row_groups_[j].IsExcludedByReadRange() && + !target_row_groups_[j].IsPartiallyMatched()) { + fully_matched_indices.push_back(target_row_groups_[j].GetRowGroupIndex()); } } if (!fully_matched_indices.empty()) { @@ -222,7 +223,7 @@ Status FileReaderWrapper::SeekToRow(uint64_t row_number) { } Result> FileReaderWrapper::NextPageFiltered() { - int32_t rg_id = target_row_groups_[current_row_group_idx_].row_group_index; + int32_t rg_id = target_row_groups_[current_row_group_idx_].GetRowGroupIndex(); // Construct the per-RG streaming reader on demand. if (!current_page_filtered_reader_) { @@ -237,7 +238,7 @@ Result> FileReaderWrapper::NextPageFiltered( file_reader_->parquet_reader(), target_rg, target_column_indices_, page_filtered_read_schema_, file_reader_->properties().cache_options(), pre_buffered, page_ranges, max_chunksize, pool_)); - current_filtered_row_ranges_ = target_rg.row_ranges; + current_filtered_row_ranges_ = target_rg.GetRowRanges(); current_filtered_rg_start_ = all_row_group_ranges_[rg_id].first; filtered_global_offset_ = 0; } @@ -273,7 +274,7 @@ Result> FileReaderWrapper::NextFullyMatched( return std::shared_ptr(); } - int32_t rg_id = target_row_groups_[current_row_group_idx_].row_group_index; + int32_t rg_id = target_row_groups_[current_row_group_idx_].GetRowGroupIndex(); uint64_t rg_end = all_row_group_ranges_[rg_id].second; int64_t num_rows = record_batch->num_rows(); @@ -298,7 +299,7 @@ Result> FileReaderWrapper::Next() { while (current_row_group_idx_ < target_row_groups_.size()) { bool is_partially_matched = - target_row_groups_[current_row_group_idx_].is_partially_matched; + target_row_groups_[current_row_group_idx_].IsPartiallyMatched(); PAIMON_ASSIGN_OR_RAISE(std::shared_ptr batch, is_partially_matched ? NextPageFiltered() : NextFullyMatched()); if (batch) { @@ -368,9 +369,9 @@ std::vector<::arrow::io::ReadRange> FileReaderWrapper::CollectPreBufferRanges( auto file_metadata = file_reader_->parquet_reader()->metadata(); for (const auto& trg : target_row_groups_) { - if (trg.excluded_by_read_range) continue; + if (trg.IsExcludedByReadRange()) continue; - if (trg.is_partially_matched) { + if (trg.IsPartiallyMatched()) { // Page-filtered RGs: only matching page byte ranges. auto page_ranges = PageFilteredRowGroupReader::ComputePageRanges( file_reader_->parquet_reader(), trg, column_indices); @@ -378,7 +379,7 @@ std::vector<::arrow::io::ReadRange> FileReaderWrapper::CollectPreBufferRanges( std::make_move_iterator(page_ranges.end())); } else { // Fully-matched RGs: entire column chunk ranges. - auto rg_metadata = file_metadata->RowGroup(trg.row_group_index); + auto rg_metadata = file_metadata->RowGroup(trg.GetRowGroupIndex()); for (int32_t col_idx : column_indices) { auto col_chunk = rg_metadata->ColumnChunk(col_idx); int64_t offset = col_chunk->data_page_offset(); @@ -416,12 +417,12 @@ Status FileReaderWrapper::PrepareForReading(const std::vector& t std::vector fully_matched_row_groups; uint64_t active_count = 0; for (const auto& trg : target_row_groups_) { - if (trg.excluded_by_read_range) { + if (trg.IsExcludedByReadRange()) { continue; } active_count++; - if (!trg.is_partially_matched) { - fully_matched_row_groups.push_back(trg.row_group_index); + if (!trg.IsPartiallyMatched()) { + fully_matched_row_groups.push_back(trg.GetRowGroupIndex()); } } @@ -455,14 +456,15 @@ Status FileReaderWrapper::PrepareForReading(const std::vector& t // Reset read state. Find the first non-excluded row group. uint64_t first_active_idx = 0; while (first_active_idx < target_row_groups_.size() && - target_row_groups_[first_active_idx].excluded_by_read_range) { + target_row_groups_[first_active_idx].IsExcludedByReadRange()) { first_active_idx++; } if (first_active_idx >= target_row_groups_.size()) { next_row_to_read_ = num_rows_; } else { next_row_to_read_ = - all_row_group_ranges_[target_row_groups_[first_active_idx].row_group_index].first; + all_row_group_ranges_[target_row_groups_[first_active_idx].GetRowGroupIndex()] + .first; } previous_first_row_ = std::numeric_limits::max(); current_row_group_idx_ = first_active_idx; @@ -476,7 +478,7 @@ Status FileReaderWrapper::ApplyReadRanges( const std::vector>& read_ranges) { if (read_ranges.empty()) { for (auto& trg : target_row_groups_) { - trg.excluded_by_read_range = true; + trg.SetExcludedByReadRange(true); } reader_initialized_ = false; return Status::OK(); @@ -492,7 +494,7 @@ Status FileReaderWrapper::ApplyReadRanges( } // Mark each target row group as excluded or not based on the matching set. for (auto& trg : target_row_groups_) { - trg.excluded_by_read_range = matching_rg_indices.count(trg.row_group_index) == 0; + trg.SetExcludedByReadRange(matching_rg_indices.count(trg.GetRowGroupIndex()) == 0); } reader_initialized_ = false; return Status::OK(); diff --git a/src/paimon/format/parquet/file_reader_wrapper.h b/src/paimon/format/parquet/file_reader_wrapper.h index 748d4052f..29ecb2bb8 100644 --- a/src/paimon/format/parquet/file_reader_wrapper.h +++ b/src/paimon/format/parquet/file_reader_wrapper.h @@ -33,6 +33,7 @@ #include "arrow/type_fwd.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/format/parquet/row_ranges.h" +#include "paimon/format/parquet/target_row_group.h" #include "paimon/result.h" #include "paimon/status.h" #include "parquet/arrow/reader.h" diff --git a/src/paimon/format/parquet/page_filtered_row_group_reader.cpp b/src/paimon/format/parquet/page_filtered_row_group_reader.cpp index 9c87438b8..20c5efb97 100644 --- a/src/paimon/format/parquet/page_filtered_row_group_reader.cpp +++ b/src/paimon/format/parquet/page_filtered_row_group_reader.cpp @@ -234,8 +234,8 @@ Result> PageFilteredRowGroupReader::Re const ::arrow::io::CacheOptions& cache_options, bool pre_buffered, const std::vector<::arrow::io::ReadRange>& page_ranges, int64_t max_chunksize, std::shared_ptr<::arrow::MemoryPool> pool) { - const auto& row_ranges = target_row_group.row_ranges; - int32_t row_group_index = target_row_group.row_group_index; + const auto& row_ranges = target_row_group.GetRowRanges(); + int32_t row_group_index = target_row_group.GetRowGroupIndex(); if (row_ranges.IsEmpty()) { PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr empty_table, @@ -289,8 +289,8 @@ Result> PageFilteredRowGroupReader::Re std::vector<::arrow::io::ReadRange> PageFilteredRowGroupReader::ComputePageRanges( ::parquet::ParquetFileReader* parquet_reader, const TargetRowGroup& target_row_group, const std::vector& column_indices) { - int32_t row_group_index = target_row_group.row_group_index; - const auto& row_ranges = target_row_group.row_ranges; + int32_t row_group_index = target_row_group.GetRowGroupIndex(); + const auto& row_ranges = target_row_group.GetRowRanges(); std::vector<::arrow::io::ReadRange> ranges; auto file_metadata = parquet_reader->metadata(); diff --git a/src/paimon/format/parquet/page_filtered_row_group_reader.h b/src/paimon/format/parquet/page_filtered_row_group_reader.h index 5092bb5ca..c7376512f 100644 --- a/src/paimon/format/parquet/page_filtered_row_group_reader.h +++ b/src/paimon/format/parquet/page_filtered_row_group_reader.h @@ -27,6 +27,7 @@ #include "arrow/record_batch.h" #include "arrow/type.h" #include "paimon/format/parquet/row_ranges.h" +#include "paimon/format/parquet/target_row_group.h" #include "paimon/result.h" #include "parquet/column_reader.h" #include "parquet/file_reader.h" diff --git a/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp b/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp index a963efaba..16008f105 100644 --- a/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp +++ b/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp @@ -44,6 +44,7 @@ #include "paimon/status.h" #include "paimon/testing/utils/read_result_collector.h" #include "paimon/testing/utils/testharness.h" +#include "paimon/utils/roaring_bitmap32.h" #include "parquet/arrow/reader.h" #include "parquet/file_reader.h" #include "parquet/properties.h" @@ -129,6 +130,29 @@ class PageFilteredRowGroupReaderTest : public ::testing::Test { paimon::test::ReadResultCollector::CollectResult(batch_reader.get())); } + /// Read back a Parquet file with a predicate, a bitmap, and page index filter enabled. + void ReadWithPredicateAndBitmapImpl(const std::string& file_name, + const std::shared_ptr& read_schema, + const std::shared_ptr& predicate, + const RoaringBitmap32& bitmap, + std::shared_ptr* out, + int32_t batch_size = 1024) { + ASSERT_OK_AND_ASSIGN(std::shared_ptr in, fs_->Open(file_name)); + ASSERT_OK_AND_ASSIGN(int64_t length, in->Length()); + auto in_stream = std::make_shared(in, arrow_pool_, length); + + std::map options; + options[PARQUET_READ_ENABLE_PAGE_INDEX_FILTER] = "true"; + ASSERT_OK_AND_ASSIGN( + auto batch_reader, + ParquetFileBatchReader::Create(std::move(in_stream), arrow_pool_, options, batch_size)); + auto c_schema = std::make_unique(); + ASSERT_TRUE(arrow::ExportSchema(*read_schema, c_schema.get()).ok()); + ASSERT_OK(batch_reader->SetReadSchema(c_schema.get(), predicate, bitmap)); + ASSERT_OK_AND_ASSIGN(*out, + paimon::test::ReadResultCollector::CollectResult(batch_reader.get())); + } + protected: std::shared_ptr arrow_pool_; std::shared_ptr pool_; @@ -873,5 +897,562 @@ TEST_F(PageFilteredRowGroupReaderTest, ComputePageRangesWithDictionaryEncoding) auto partial_concat = arrow::Concatenate(result_partial->chunks()).ValueOrDie(); ASSERT_TRUE(partial_concat->Equals(expected_struct)); } +/// Helper: build a StructArray with a top-level int32 "id" column and a nested struct column +/// "info" containing two int32 fields: "x" and "y". +/// id[i] = i, info.x[i] = i * 100, info.y[i] = i * 100 + 1, for i in [0, N). +/// +/// Arrow schema: { id: int32, info: struct } +/// Parquet leaf columns: [id (index 0), info.x (index 1), info.y (index 2)] +static std::shared_ptr MakeNestedStructData(int32_t num_rows) { + arrow::Int32Builder id_builder, x_builder, y_builder; + EXPECT_TRUE(id_builder.Reserve(num_rows).ok()); + EXPECT_TRUE(x_builder.Reserve(num_rows).ok()); + EXPECT_TRUE(y_builder.Reserve(num_rows).ok()); + for (int32_t i = 0; i < num_rows; ++i) { + id_builder.UnsafeAppend(i); + x_builder.UnsafeAppend(i * 100); + y_builder.UnsafeAppend(i * 100 + 1); + } + auto id_array = id_builder.Finish().ValueOrDie(); + auto x_array = x_builder.Finish().ValueOrDie(); + auto y_array = y_builder.Finish().ValueOrDie(); + + auto field_x = arrow::field("x", arrow::int32()); + auto field_y = arrow::field("y", arrow::int32()); + auto inner_struct = + arrow::StructArray::Make({x_array, y_array}, {field_x, field_y}).ValueOrDie(); + + auto field_id = arrow::field("id", arrow::int32()); + auto field_info = arrow::field("info", arrow::struct_({field_x, field_y})); + return arrow::StructArray::Make({id_array, inner_struct}, {field_id, field_info}).ValueOrDie(); +} + +/// Test: rowgroup-level filtering on a file with nested struct columns. +/// +/// This test exposes the bug where BuildPageFilteredSchema fails to correctly map +/// Parquet leaf column indices to Arrow fields for nested types, and +/// ReadFilteredRowGroup cannot correctly assemble nested column results. +/// +/// Schema: { id: int32, info: struct } +/// Parquet leaf columns: [id=0, info.x=1, info.y=2] +/// 100 rows, 10 per page, 1 row group. +/// Predicate: id >= 70 → row groups 0 skipped, row groups 1 read → 50 rows expected. +/// The read schema requests both "id" and "info" columns. +TEST_F(PageFilteredRowGroupReaderTest, NestedStructColumnPageFilter) { + std::string file_name = dir_->Str() + "/nested_struct_filter.parquet"; + auto data = MakeNestedStructData(100); + WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/50); + + auto field_x = arrow::field("x", arrow::int32()); + auto field_y = arrow::field("y", arrow::int32()); + auto read_schema = arrow::schema({arrow::field("id", arrow::int32()), + arrow::field("info", arrow::struct_({field_x, field_y}))}); + + auto predicate = PredicateBuilder::GreaterOrEqual( + /*field_index=*/0, /*field_name=*/"id", FieldType::INT, Literal(70)); + + std::shared_ptr result; + ReadWithPredicateImpl(file_name, read_schema, predicate, &result); + + // Should get rows 50-99 = 50 rows + ASSERT_TRUE(result); + ASSERT_EQ(50, result->length()); + + // Build expected result: rows 50-99 from the original data + auto expected = data->Slice(50, 50); + ASSERT_TRUE(expected->Equals(result->chunk(0))); +} + +/// Test: rowgroup-level filtering reading the nested struct column along with the predicate column. +/// +/// This verifies that when reading a subset of columns that includes a nested column +/// and the predicate column, the schema mapping and column assembly work correctly. +/// +/// Schema: { id: int32, info: struct } +/// Read schema: { id: int32, info: struct } +/// Predicate on "id": id >= 50. +TEST_F(PageFilteredRowGroupReaderTest, NestedStructColumnOnlyReadNestedField) { + std::string file_name = dir_->Str() + "/nested_struct_only_nested.parquet"; + auto data = MakeNestedStructData(100); + WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/50); + + auto field_id = arrow::field("id", arrow::int32()); + auto field_x = arrow::field("x", arrow::int32()); + auto field_y = arrow::field("y", arrow::int32()); + auto field_info = arrow::field("info", arrow::struct_({field_x, field_y})); + // Read both "id" (needed for predicate evaluation) and "info" columns + auto read_schema = arrow::schema({field_id, field_info}); + + // Predicate is on "id" (field_index=0 in file schema) + auto predicate = PredicateBuilder::GreaterOrEqual( + /*field_index=*/0, /*field_name=*/"id", FieldType::INT, Literal(70)); + + std::shared_ptr result; + ReadWithPredicateImpl(file_name, read_schema, predicate, &result); + + // Should get rows 50-99 = 50 rows + ASSERT_TRUE(result); + ASSERT_EQ(50, result->length()); + + // Build expected: "id" and "info" fields from rows 50-99 + auto sliced = std::dynamic_pointer_cast(data->Slice(50, 50)); + ASSERT_TRUE(sliced); + auto expected = + arrow::StructArray::Make({sliced->field(0), sliced->field(1)}, {field_id, field_info}) + .ValueOrDie(); + ASSERT_TRUE(expected->Equals(result->chunk(0))); +} + +/// Helper: build a StructArray with an int32 "id" column and a list "tags" column. +/// id[i] = i, tags[i] = [i*10, i*10+1], for i in [0, N). +/// +/// Arrow schema: { id: int32, tags: list } +/// Parquet leaf columns: [id (index 0), tags.item (index 1)] +static std::shared_ptr MakeListColumnData(int32_t num_rows) { + arrow::Int32Builder id_builder; + EXPECT_TRUE(id_builder.Reserve(num_rows).ok()); + for (int32_t i = 0; i < num_rows; ++i) { + id_builder.UnsafeAppend(i); + } + auto id_array = id_builder.Finish().ValueOrDie(); + + auto value_builder = std::make_shared(); + arrow::ListBuilder list_builder(arrow::default_memory_pool(), value_builder); + for (int32_t i = 0; i < num_rows; ++i) { + EXPECT_TRUE(list_builder.Append().ok()); + EXPECT_TRUE(value_builder->Append(i * 10).ok()); + EXPECT_TRUE(value_builder->Append(i * 10 + 1).ok()); + } + auto list_array = list_builder.Finish().ValueOrDie(); + + auto field_id = arrow::field("id", arrow::int32()); + auto field_tags = arrow::field("tags", arrow::list(arrow::field("item", arrow::int32()))); + return arrow::StructArray::Make({id_array, list_array}, {field_id, field_tags}).ValueOrDie(); +} + +/// Helper: build a StructArray with an int32 "id" column and a map "props" column. +/// id[i] = i, props[i] = {"k_i": i * 100}, for i in [0, N). +/// +/// Arrow schema: { id: int32, props: map } +/// Parquet leaf columns: [id (index 0), props.key (index 1), props.value (index 2)] +static std::shared_ptr MakeMapColumnData(int32_t num_rows) { + arrow::Int32Builder id_builder; + EXPECT_TRUE(id_builder.Reserve(num_rows).ok()); + for (int32_t i = 0; i < num_rows; ++i) { + id_builder.UnsafeAppend(i); + } + auto id_array = id_builder.Finish().ValueOrDie(); + + auto key_builder = std::make_shared(); + auto value_builder = std::make_shared(); + arrow::MapBuilder map_builder(arrow::default_memory_pool(), key_builder, value_builder); + for (int32_t i = 0; i < num_rows; ++i) { + EXPECT_TRUE(map_builder.Append().ok()); + std::string key = "k_" + std::to_string(i); + EXPECT_TRUE(key_builder->Append(key).ok()); + EXPECT_TRUE(value_builder->Append(i * 100).ok()); + } + auto map_array = map_builder.Finish().ValueOrDie(); + + auto field_id = arrow::field("id", arrow::int32()); + auto field_props = arrow::field("props", arrow::map(arrow::utf8(), arrow::int32())); + return arrow::StructArray::Make({id_array, map_array}, {field_id, field_props}).ValueOrDie(); +} + +/// Test: rowgroup-level filtering on a file with a list column. +/// +/// Schema: { id: int32, tags: list } +/// 100 rows, 10 per page, 1 row group. +/// Predicate: id >= 50 → row groups 0 skipped, row groups 1 read → 50 rows expected. +TEST_F(PageFilteredRowGroupReaderTest, NestedListColumnPageFilter) { + std::string file_name = dir_->Str() + "/nested_list_filter.parquet"; + auto data = MakeListColumnData(100); + WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/50); + + auto read_schema = + arrow::schema({arrow::field("id", arrow::int32()), + arrow::field("tags", arrow::list(arrow::field("item", arrow::int32())))}); + + auto predicate = PredicateBuilder::GreaterOrEqual( + /*field_index=*/0, /*field_name=*/"id", FieldType::INT, Literal(70)); + + std::shared_ptr result; + ReadWithPredicateImpl(file_name, read_schema, predicate, &result); + + ASSERT_TRUE(result); + ASSERT_EQ(50, result->length()); + + // Build expected result: rows 50-99 from the original data + auto expected = data->Slice(50, 50); + ASSERT_TRUE(expected->Equals(result->chunk(0))); +} + +/// Test: rowgroup filtering on a file with a map column. +/// +/// Schema: { id: int32, props: map } +/// 100 rows, 10 per page, 1 row group. +/// Predicate: id >= 70 → row groups 0 skipped, row groups 1 read → 50 rows expected. +TEST_F(PageFilteredRowGroupReaderTest, NestedMapColumnPageFilter) { + std::string file_name = dir_->Str() + "/nested_map_filter.parquet"; + auto data = MakeMapColumnData(100); + WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/50); + + auto read_schema = + arrow::schema({arrow::field("id", arrow::int32()), + arrow::field("props", arrow::map(arrow::utf8(), arrow::int32()))}); + + auto predicate = PredicateBuilder::GreaterOrEqual( + /*field_index=*/0, /*field_name=*/"id", FieldType::INT, Literal(70)); + + std::shared_ptr result; + ReadWithPredicateImpl(file_name, read_schema, predicate, &result); + + ASSERT_TRUE(result); + ASSERT_EQ(50, result->length()); + + // Build expected result: rows 50-99 from the original data + auto expected = data->Slice(50, 50); + ASSERT_TRUE(expected->Equals(result->chunk(0))); +} + +/// Test: nested map projection falls back to row-group-level filtering when page index filter is +/// unavailable for nested read schemas. +/// +/// Schema: { id: int32, props: map } +/// 100 rows, 10 per page, 1 row group. +/// Predicate: id >= 30 would be a partial-row-group match at first 50-row group. +/// Because nested schema disables page-level filtering, the entire first row group (0..49) is read, +/// so rows [0, 99] should all be returned. +TEST_F(PageFilteredRowGroupReaderTest, NestedMapBitmapFallback) { + std::string file_name = dir_->Str() + "/nested_map_projection_fallback.parquet"; + auto data = MakeMapColumnData(100); + WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/50); + + auto field_props = arrow::field("props", arrow::map(arrow::utf8(), arrow::int32())); + auto read_schema = arrow::schema({arrow::field("id", arrow::int32()), field_props}); + + RoaringBitmap32 bitmap; + bitmap.AddRange(70, 100); + + std::shared_ptr result; + ReadWithPredicateAndBitmapImpl(file_name, read_schema, nullptr, bitmap, &result); + + ASSERT_TRUE(result); + // Because page-level filtering is skipped for nested schemas, we read full row groups. + ASSERT_EQ(50, result->length()); + + auto expected = data->Slice(50, 50); + ASSERT_TRUE(expected->Equals(result->chunk(0))); +} + +/// Test: nested list projection falls back to row-group-level filtering when page index filter is +/// unavailable for nested read schemas. +/// +/// Schema: { id: int32, tags: list } +/// Predicate: id >= 30 would be a partial-row-group match at first 50-row group. +/// Because nested schema disables page-level filtering, the entire first row group (0..49) is read. +TEST_F(PageFilteredRowGroupReaderTest, NestedListBitmapFallback) { + std::string file_name = dir_->Str() + "/nested_list_projection_fallback.parquet"; + auto data = MakeListColumnData(100); + WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/50); + + auto field_tags = arrow::field("tags", arrow::list(arrow::field("item", arrow::int32()))); + auto read_schema = arrow::schema({arrow::field("id", arrow::int32()), field_tags}); + + RoaringBitmap32 bitmap; + bitmap.AddRange(70, 100); + + std::shared_ptr result; + ReadWithPredicateAndBitmapImpl(file_name, read_schema, nullptr, bitmap, &result); + + ASSERT_TRUE(result); + ASSERT_EQ(50, result->length()); + + auto expected = data->Slice(50, 50); + ASSERT_TRUE(expected->Equals(result->chunk(0))); +} + +/// Test: nested struct projection falls back to row-group-level filtering when page index filter is +/// unavailable for nested read schemas. +/// +/// Schema: { id: int32, info: struct } +/// Predicate: id >= 30 would be a partial-row-group match at first 50-row group. +/// Because nested schema disables page-level filtering, the entire first row group (0..49) is read. +TEST_F(PageFilteredRowGroupReaderTest, NestedStructBitmapFallback) { + std::string file_name = dir_->Str() + "/nested_struct_projection_fallback.parquet"; + auto data = MakeNestedStructData(100); + WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/50); + + auto field_x = arrow::field("x", arrow::int32()); + auto field_y = arrow::field("y", arrow::int32()); + auto field_info = arrow::field("info", arrow::struct_({field_x, field_y})); + auto read_schema = arrow::schema({arrow::field("id", arrow::int32()), field_info}); + + RoaringBitmap32 bitmap; + bitmap.AddRange(70, 100); + + std::shared_ptr result; + ReadWithPredicateAndBitmapImpl(file_name, read_schema, nullptr, bitmap, &result); + + ASSERT_TRUE(result); + ASSERT_EQ(50, result->length()); + + auto expected = data->Slice(50, 50); + ASSERT_TRUE(expected->Equals(result->chunk(0))); +} + +/// Test: rowgroup-level filtering with multiple adjacent nested columns (struct + list). +/// +/// Schema: { id: int32, info: struct, tags: list } +/// This tests the boundary handling when two nested fields are adjacent in the schema. +/// Predicate: id >= 70 → row groups 0 skipped, row groups 1 read → 50 rows expected. +TEST_F(PageFilteredRowGroupReaderTest, MultipleAdjacentNestedColumns) { + std::string file_name = dir_->Str() + "/multi_nested.parquet"; + + // Build data with id, info (struct), tags (list) + arrow::Int32Builder id_builder, x_builder, y_builder; + ASSERT_TRUE(id_builder.Reserve(100).ok()); + ASSERT_TRUE(x_builder.Reserve(100).ok()); + ASSERT_TRUE(y_builder.Reserve(100).ok()); + auto value_builder = std::make_shared(); + arrow::ListBuilder list_builder(arrow::default_memory_pool(), value_builder); + + for (int32_t i = 0; i < 100; ++i) { + id_builder.UnsafeAppend(i); + x_builder.UnsafeAppend(i * 100); + y_builder.UnsafeAppend(i * 100 + 1); + ASSERT_TRUE(list_builder.Append().ok()); + ASSERT_TRUE(value_builder->Append(i * 10).ok()); + } + auto id_array = id_builder.Finish().ValueOrDie(); + auto x_array = x_builder.Finish().ValueOrDie(); + auto y_array = y_builder.Finish().ValueOrDie(); + auto list_array = list_builder.Finish().ValueOrDie(); + + auto field_x = arrow::field("x", arrow::int32()); + auto field_y = arrow::field("y", arrow::int32()); + auto inner_struct = + arrow::StructArray::Make({x_array, y_array}, {field_x, field_y}).ValueOrDie(); + + auto field_id = arrow::field("id", arrow::int32()); + auto field_info = arrow::field("info", arrow::struct_({field_x, field_y})); + auto field_tags = arrow::field("tags", arrow::list(arrow::field("item", arrow::int32()))); + auto data = arrow::StructArray::Make({id_array, inner_struct, list_array}, + {field_id, field_info, field_tags}) + .ValueOrDie(); + + WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/50); + + auto read_schema = arrow::schema({field_id, field_info, field_tags}); + auto predicate = PredicateBuilder::GreaterOrEqual( + /*field_index=*/0, /*field_name=*/"id", FieldType::INT, Literal(70)); + + std::shared_ptr result; + ReadWithPredicateImpl(file_name, read_schema, predicate, &result); + + ASSERT_TRUE(result); + ASSERT_EQ(50, result->length()); + + // Build expected result: rows 50-99 from the original data + auto expected = data->Slice(50, 50); + ASSERT_TRUE(expected->Equals(result->chunk(0))); +} +/// Test: bitmap hits all pages of a subset of row groups (no predicate). +/// +/// 200 rows, 10 rows per page, 100 rows per row group → 2 row groups. +/// RG0: rows 0-99, RG1: rows 100-199. +/// Bitmap: {0..99} hits all pages of RG0, RG1 is excluded entirely. +/// Expected: 100 rows (0-99). +TEST_F(PageFilteredRowGroupReaderTest, BitmapAllPagesSomeRowGroups) { + std::string file_name = dir_->Str() + "/bitmap_all_pages_rg.parquet"; + auto data = MakeSequentialIntData(200); + WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/100); + + RoaringBitmap32 bitmap; + bitmap.AddRange(0, 100); // hits all of RG0 + + auto read_schema = arrow::schema({arrow::field("val", arrow::int32())}); + std::shared_ptr result; + ReadWithPredicateAndBitmapImpl(file_name, read_schema, /*predicate=*/nullptr, bitmap, &result); + ASSERT_TRUE(result); + ASSERT_EQ(100, result->length()); + + auto flat = arrow::Concatenate(result->chunks()).ValueOrDie(); + auto struct_arr = std::dynamic_pointer_cast(flat); + ASSERT_TRUE(struct_arr); + auto val_arr = std::dynamic_pointer_cast(struct_arr->field(0)); + for (int32_t i = 0; i < 100; ++i) { + ASSERT_EQ(i, val_arr->Value(i)); + } +} + +/// Test: bitmap hits partial pages of a row group (no predicate). +/// +/// 200 rows, 10 rows per page, 100 rows per row group → 2 row groups. +/// Bitmap: {30..59} hits pages 3-5 of RG0 (rows 30-59), RG1 excluded. +/// Expected: 30 rows (30-59). +TEST_F(PageFilteredRowGroupReaderTest, BitmapPartialPagesSingleRowGroup) { + std::string file_name = dir_->Str() + "/bitmap_partial_pages_rg.parquet"; + auto data = MakeSequentialIntData(200); + WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/100); + + RoaringBitmap32 bitmap; + bitmap.AddRange(90, 110); // hits pages 3-5 of RG0 + + auto read_schema = arrow::schema({arrow::field("val", arrow::int32())}); + std::shared_ptr result; + ReadWithPredicateAndBitmapImpl(file_name, read_schema, /*predicate=*/nullptr, bitmap, &result); + ASSERT_TRUE(result); + ASSERT_EQ(20, result->length()); + + auto flat = arrow::Concatenate(result->chunks()).ValueOrDie(); + auto struct_arr = std::dynamic_pointer_cast(flat); + ASSERT_TRUE(struct_arr); + auto val_arr = std::dynamic_pointer_cast(struct_arr->field(0)); + for (int32_t i = 0; i < 20; ++i) { + ASSERT_EQ(90 + i, val_arr->Value(i)); + } +} + +/// Test: bitmap hits all pages of some row groups and partial pages of others. +/// +/// 200 rows, 10 rows per page, 100 rows per row group → 2 row groups. +/// Bitmap: {0..99} hits all of RG0 + {120..149} hits pages 2-4 of RG1. +/// Expected: 100 (RG0) + 30 (RG1 partial) = 130 rows. +TEST_F(PageFilteredRowGroupReaderTest, BitmapAllAndPartialPagesMixed) { + std::string file_name = dir_->Str() + "/bitmap_all_and_partial.parquet"; + auto data = MakeSequentialIntData(200); + WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/100); + + RoaringBitmap32 bitmap; + bitmap.AddRange(0, 100); // all of RG0 + bitmap.AddRange(120, 150); // pages 2-4 of RG1 + + auto read_schema = arrow::schema({arrow::field("val", arrow::int32())}); + std::shared_ptr result; + ReadWithPredicateAndBitmapImpl(file_name, read_schema, /*predicate=*/nullptr, bitmap, &result); + ASSERT_TRUE(result); + ASSERT_EQ(130, result->length()); + + // Verify: rows 0-99 + 120-149 + auto flat = arrow::Concatenate(result->chunks()).ValueOrDie(); + auto struct_arr = std::dynamic_pointer_cast(flat); + ASSERT_TRUE(struct_arr); + auto val_arr = std::dynamic_pointer_cast(struct_arr->field(0)); + for (int32_t i = 0; i < 100; ++i) { + ASSERT_EQ(i, val_arr->Value(i)); + } + for (int32_t i = 0; i < 30; ++i) { + ASSERT_EQ(120 + i, val_arr->Value(100 + i)); + } +} + +/// Test: bitmap + predicate both applied, bitmap hits all pages of some row groups. +/// +/// 200 rows, 10 rows per page, 100 rows per row group → 2 row groups. +/// Bitmap: {0..99} hits all of RG0. +/// Predicate: val >= 50. Page-level filtering on RG0: pages 5-9. +/// Expected: 50 rows (50-99). +TEST_F(PageFilteredRowGroupReaderTest, BitmapAllPagesWithPredicate) { + std::string file_name = dir_->Str() + "/bitmap_all_predicate.parquet"; + auto data = MakeSequentialIntData(200); + WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/100); + + RoaringBitmap32 bitmap; + bitmap.AddRange(0, 100); // hits all of RG0 + + auto predicate = PredicateBuilder::GreaterOrEqual( + /*field_index=*/0, /*field_name=*/"val", FieldType::INT, Literal(50)); + + auto read_schema = arrow::schema({arrow::field("val", arrow::int32())}); + std::shared_ptr result; + ReadWithPredicateAndBitmapImpl(file_name, read_schema, predicate, bitmap, &result); + ASSERT_TRUE(result); + ASSERT_EQ(50, result->length()); + + auto flat = arrow::Concatenate(result->chunks()).ValueOrDie(); + auto struct_arr = std::dynamic_pointer_cast(flat); + ASSERT_TRUE(struct_arr); + auto val_arr = std::dynamic_pointer_cast(struct_arr->field(0)); + for (int32_t i = 0; i < 50; ++i) { + ASSERT_EQ(50 + i, val_arr->Value(i)); + } +} + +/// Test: bitmap + predicate both applied, bitmap hits partial pages of a row group. +/// +/// 200 rows, 10 rows per page, 100 rows per row group → 2 row groups. +/// Bitmap: {30..59} hits pages 3-5 of RG0 (rows 30-59). +/// Predicate: val >= 40. Page-level filtering further narrows to pages 4-5 (rows 40-59). +/// Expected: 20 rows (40-59). +TEST_F(PageFilteredRowGroupReaderTest, BitmapPartialPagesWithPredicate) { + std::string file_name = dir_->Str() + "/bitmap_partial_predicate.parquet"; + auto data = MakeSequentialIntData(200); + WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/100); + + RoaringBitmap32 bitmap; + bitmap.AddRange(30, 60); // hits pages 3-5 of RG0 + + auto predicate = PredicateBuilder::GreaterOrEqual( + /*field_index=*/0, /*field_name=*/"val", FieldType::INT, Literal(40)); + + auto read_schema = arrow::schema({arrow::field("val", arrow::int32())}); + std::shared_ptr result; + ReadWithPredicateAndBitmapImpl(file_name, read_schema, predicate, bitmap, &result); + ASSERT_TRUE(result); + ASSERT_EQ(20, result->length()); + + auto flat = arrow::Concatenate(result->chunks()).ValueOrDie(); + auto struct_arr = std::dynamic_pointer_cast(flat); + auto val_arr = std::dynamic_pointer_cast(struct_arr->field(0)); + for (int32_t i = 0; i < 20; ++i) { + ASSERT_EQ(40 + i, val_arr->Value(i)); + } +} + +/// Test: bitmap + predicate both applied, bitmap hits all pages of some RG and +/// partial pages of another. +/// +/// 200 rows, 10 rows per page, 100 rows per row group → 2 row groups. +/// Bitmap: {0..99} (all of RG0) + {120..149} (pages 2-4 of RG1). +/// Predicate: val >= 50 AND val < 160. +/// RG0: all pages → page-filtered to val>=50 → rows 50-99 (50 rows) +/// RG1: pages 2-4 (120-149) → page-filtered to val>=50 AND val<160 → all match (30 rows) +/// Expected: 80 rows (50-99 + 120-149). +TEST_F(PageFilteredRowGroupReaderTest, BitmapMixedWithPredicate) { + std::string file_name = dir_->Str() + "/bitmap_mixed_predicate.parquet"; + auto data = MakeSequentialIntData(200); + WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/100); + + RoaringBitmap32 bitmap; + bitmap.AddRange(0, 100); // all of RG0 + bitmap.AddRange(120, 150); // pages 2-4 of RG1 + + ASSERT_OK_AND_ASSIGN( + auto predicate, + PredicateBuilder::And( + {PredicateBuilder::GreaterOrEqual(/*field_index=*/0, /*field_name=*/"val", + FieldType::INT, Literal(50)), + PredicateBuilder::LessThan(/*field_index=*/0, /*field_name=*/"val", FieldType::INT, + Literal(160))})); + + auto read_schema = arrow::schema({arrow::field("val", arrow::int32())}); + std::shared_ptr result; + ReadWithPredicateAndBitmapImpl(file_name, read_schema, predicate, bitmap, &result); + ASSERT_TRUE(result); + ASSERT_EQ(80, result->length()); + + // Verify: rows 50-99 + 120-149 + auto flat = arrow::Concatenate(result->chunks()).ValueOrDie(); + auto struct_arr = std::dynamic_pointer_cast(flat); + ASSERT_TRUE(struct_arr); + auto val_arr = std::dynamic_pointer_cast(struct_arr->field(0)); + for (int32_t i = 0; i < 50; ++i) { + ASSERT_EQ(50 + i, val_arr->Value(i)); + } + for (int32_t i = 0; i < 30; ++i) { + ASSERT_EQ(120 + i, val_arr->Value(50 + i)); + } +} } // namespace paimon::parquet::test diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.cpp b/src/paimon/format/parquet/parquet_file_batch_reader.cpp index 7533cb99a..8f221bf6d 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader.cpp +++ b/src/paimon/format/parquet/parquet_file_batch_reader.cpp @@ -39,8 +39,10 @@ #include "paimon/common/metrics/metrics_impl.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/common/utils/options_utils.h" +#include "paimon/core/schema/arrow_schema_validator.h" #include "paimon/format/parquet/parquet_field_id_converter.h" #include "paimon/format/parquet/parquet_format_defs.h" +#include "paimon/format/parquet/parquet_schema_util.h" #include "paimon/format/parquet/parquet_timestamp_converter.h" #include "paimon/format/parquet/predicate_converter.h" #include "paimon/reader/batch_reader.h" @@ -129,6 +131,13 @@ Status ParquetFileBatchReader::SetReadSchema( PAIMON_ASSIGN_OR_RAISE(std::shared_ptr file_schema, reader_->GetSchema()); std::unordered_map> field_index_map; + bool has_nested_field = false; + for (const auto& field : read_schema->fields()) { + if (ArrowSchemaValidator::IsNestedType(field->type())) { + has_nested_field = true; + break; + } + } int32_t i = 0; for (const auto& field : file_schema->fields()) { std::vector v; @@ -147,26 +156,32 @@ Status ParquetFileBatchReader::SetReadSchema( } } - std::vector row_groups = arrow::internal::Iota(reader_->GetNumberOfRowGroups()); + TargetRowGroups target_row_groups = + TargetRowGroup::MakeSerialRowGroups(reader_->GetNumberOfRowGroups()); if (predicate) { - PAIMON_ASSIGN_OR_RAISE(row_groups, - FilterRowGroupsByPredicate(predicate, file_schema, row_groups)); + PAIMON_ASSIGN_OR_RAISE( + target_row_groups, + FilterRowGroupsByPredicate(predicate, file_schema, target_row_groups)); } if (selection_bitmap) { - PAIMON_ASSIGN_OR_RAISE(row_groups, - FilterRowGroupsByBitmap(selection_bitmap.value(), row_groups)); + // walkaround: page index filter does not support nested fields for now, skip page index + // bitmap pushdown if there is any nested field in the schema + PAIMON_ASSIGN_OR_RAISE(target_row_groups, + FilterRowGroupsByBitmap(selection_bitmap.value(), + target_row_groups, has_nested_field)); } // Apply page-level filtering after bitmap pruning so we don't read page index // pages for row groups that the bitmap already excluded. // If no predicate is provided, skip page-level filtering, row_group_row_ranges will be // empty - std::map row_group_row_ranges; - if (predicate && !row_groups.empty()) { + if (predicate && !target_row_groups.empty()) { PAIMON_ASSIGN_OR_RAISE( bool enable_page_index_filter, OptionsUtils::GetValueFromMap(options_, PARQUET_READ_ENABLE_PAGE_INDEX_FILTER, DEFAULT_PARQUET_READ_ENABLE_PAGE_INDEX_FILTER)); - if (enable_page_index_filter) { + // walkaround: page index filter does not support nested fields for now, skip page index + // filter if there is any nested field in the schema + if (enable_page_index_filter && !has_nested_field) { // Build column name to index map for page-level filtering. // For leaf columns, indices[0] is the correct leaf column index in Parquet. // For nested types (struct/list/map), FlattenSchema produces multiple leaf indices, @@ -178,13 +193,9 @@ Status ParquetFileBatchReader::SetReadSchema( column_name_to_index[name] = indices[0]; } } - - std::pair, std::map> page_filter_result; PAIMON_ASSIGN_OR_RAISE( - page_filter_result, - FilterRowGroupsByPageIndex(predicate, column_name_to_index, row_groups)); - row_groups = std::move(page_filter_result.first); - row_group_row_ranges = std::move(page_filter_result.second); + target_row_groups, + FilterRowGroupsByPageIndex(predicate, column_name_to_index, target_row_groups)); } } @@ -192,30 +203,17 @@ Status ParquetFileBatchReader::SetReadSchema( metrics_->SetCounter(ParquetMetrics::READ_ROW_GROUPS_TOTAL, reader_->GetNumberOfRowGroups()); - metrics_->SetCounter(ParquetMetrics::READ_ROW_GROUPS_AFTER_FILTER, row_groups.size()); - - // Build TargetRowGroup list with page-filter info in one shot. - std::vector target_row_groups; - for (int32_t rg_id : row_groups) { - auto it = row_group_row_ranges.find(rg_id); - if (it != row_group_row_ranges.end()) { - target_row_groups.emplace_back(/*rg_index=*/rg_id, /*is_partially_matched=*/true, - /*ranges=*/it->second); - } else { - target_row_groups.emplace_back(/*rg_index=*/rg_id, - /*is_partially_matched=*/false, - /*ranges=*/RowRanges()); - } - } + metrics_->SetCounter(ParquetMetrics::READ_ROW_GROUPS_AFTER_FILTER, + target_row_groups.size()); PAIMON_RETURN_NOT_OK(reader_->PrepareForReadingLazy(target_row_groups, column_indices)); } PAIMON_PARQUET_CATCH_AND_RETURN_STATUS("ParquetFileBatchReader::SetReadSchema") return Status::OK(); } -Result> ParquetFileBatchReader::FilterRowGroupsByPredicate( +Result ParquetFileBatchReader::FilterRowGroupsByPredicate( const std::shared_ptr& predicate, const std::shared_ptr file_schema, - const std::vector& src_row_groups) const { + const TargetRowGroups& src_row_groups) const { if (!predicate) { return Status::Invalid("cannot pushdown an empty predicate"); } @@ -238,58 +236,129 @@ Result> ParquetFileBatchReader::FilterRowGroupsByPredicate( std::shared_ptr file_fragment, parquet_file_format->MakeFragment( file_source, /*partition_expression=*/PredicateConverter::AlwaysTrue(), - /*physical_schema=*/nullptr, /*row_groups=*/src_row_groups)); + /*physical_schema=*/nullptr, + /*row_groups=*/TargetRowGroup::GetRowGroupIndices(src_row_groups))); PAIMON_RETURN_NOT_OK_FROM_ARROW( file_fragment->EnsureCompleteMetadata(reader_->GetFileReader())); PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(arrow::dataset::FragmentVector target_fragments, file_fragment->SplitByRowGroup(bind_expr)); - std::vector target_row_groups; + TargetRowGroups target_row_groups; target_row_groups.reserve(src_row_groups.size()); for (const auto& fragment : target_fragments) { auto parquet_fragment = dynamic_cast(fragment.get()); if (!parquet_fragment) { return Status::Invalid("cannot cast to ParquetFileFragment in ParquetFileBatchReader"); } - target_row_groups.insert(target_row_groups.end(), parquet_fragment->row_groups().begin(), - parquet_fragment->row_groups().end()); + for (auto rg_index : parquet_fragment->row_groups()) { + target_row_groups.emplace_back(rg_index); + } } return target_row_groups; } -Result> ParquetFileBatchReader::FilterRowGroupsByBitmap( - const RoaringBitmap32& bitmap, const std::vector& src_row_groups) const { +Result ParquetFileBatchReader::FilterRowGroupsByBitmap( + const RoaringBitmap32& bitmap, const TargetRowGroups& src_row_groups, + bool has_nested_column) const { if (bitmap.IsEmpty()) { return Status::Invalid("cannot push down an empty bitmap to ParquetFileBatchReader"); } + + auto meta_data = reader_->GetFileReader()->parquet_reader()->metadata(); const auto& all_row_group_ranges = reader_->GetAllRowGroupRanges(); - // filter row groups by row range - std::vector target_row_groups; - for (const auto& row_group_idx : src_row_groups) { + + TargetRowGroups target_row_groups; + for (const auto& row_group : src_row_groups) { + int32_t row_group_idx = row_group.GetRowGroupIndex(); if (static_cast(row_group_idx) >= all_row_group_ranges.size()) { return Status::Invalid( fmt::format("src row group {} not in row group meta", row_group_idx)); } const auto& [start_row_idx, end_row_idx] = all_row_group_ranges[row_group_idx]; - if (bitmap.ContainsAny(start_row_idx, end_row_idx)) { - target_row_groups.push_back(row_group_idx); + if (!bitmap.ContainsAny(start_row_idx, end_row_idx)) { + continue; + } + + int64_t rg_row_count = meta_data->RowGroup(row_group_idx)->num_rows(); + if (has_nested_column) { + // For nested schema, we cannot apply page-level filtering, so we directly add the whole + // row group if bitmap matches. + target_row_groups.emplace_back(row_group_idx); + continue; + } + auto page_ranges = FilterPagesByBitmap(bitmap, row_group_idx, start_row_idx, rg_row_count); + if (page_ranges.has_value()) { + target_row_groups.emplace_back(/*row_group_idx=*/row_group_idx, + /*is_partially_matched=*/true, + /*row_ranges=*/page_ranges.value()); + } else { + target_row_groups.emplace_back(row_group_idx); } } return target_row_groups; } +std::optional ParquetFileBatchReader::FilterPagesByBitmap(const RoaringBitmap32& bitmap, + int32_t row_group_idx, + uint64_t rg_start_row, + int64_t rg_row_count) const { + int32_t column_with_offset_index = FindColumnWithOffsetIndex(row_group_idx); + if (column_with_offset_index < 0) { + return std::nullopt; + } + + auto page_index_reader = reader_->GetPageIndexReader(); + if (!page_index_reader) { + return std::nullopt; + } + + auto rg_page_index_reader = page_index_reader->RowGroup(row_group_idx); + if (!rg_page_index_reader) { + return std::nullopt; + } + + auto offset_index = rg_page_index_reader->GetOffsetIndex(column_with_offset_index); + if (!offset_index) { + return std::nullopt; + } + + const auto& pages = offset_index->page_locations(); + auto num_pages = static_cast(pages.size()); + RowRanges filtered_row_ranges; + for (int64_t i = 0; i < num_pages; ++i) { + int64_t page_start_row = pages[i].first_row_index; + // The bitmap is [from, to) while page row range is [from, to] + int64_t page_end_row = + (i + 1 < num_pages) ? pages[i + 1].first_row_index - 1 : rg_row_count - 1; + if (bitmap.ContainsAny(rg_start_row + page_start_row, rg_start_row + page_end_row + 1)) { + filtered_row_ranges.Add(RowRanges::Range(page_start_row, page_end_row)); + } + } + return filtered_row_ranges; +} + +int32_t ParquetFileBatchReader::FindColumnWithOffsetIndex(int32_t row_group_idx) const { + auto rg_meta = reader_->GetFileReader()->parquet_reader()->metadata()->RowGroup(row_group_idx); + if (!rg_meta) { + return -1; + } + for (int col = 0; col < rg_meta->num_columns(); ++col) { + if (rg_meta->ColumnChunk(col)->GetOffsetIndexLocation().has_value()) { + return col; + } + } + return -1; +} + // Uses page-level column index statistics to filter row groups and store per-row-group // RowRanges for true page-level skipping. A row group is excluded if ALL its pages are // determined to not match the predicate. For partially matched row groups, RowRanges // are stored for page-level filtering during reading. -Result, std::map>> -ParquetFileBatchReader::FilterRowGroupsByPageIndex( +Result ParquetFileBatchReader::FilterRowGroupsByPageIndex( const std::shared_ptr& predicate, const std::map& column_name_to_index, - const std::vector& src_row_groups) { - std::map rg_row_ranges; - + const TargetRowGroups& src_row_groups) const { if (!predicate) { - return std::make_pair(src_row_groups, rg_row_ranges); + return src_row_groups; } auto page_index_reader = reader_->GetPageIndexReader(); @@ -297,35 +366,41 @@ ParquetFileBatchReader::FilterRowGroupsByPageIndex( PAIMON_LOG_DEBUG(logger_, "Page index not available in file, skipping page-level filtering (%s)", PARQUET_WRITE_ENABLE_PAGE_INDEX); - return std::make_pair(src_row_groups, rg_row_ranges); + return src_row_groups; } auto file_metadata = reader_->GetFileReader()->parquet_reader()->metadata(); - std::vector target_row_groups; - target_row_groups.reserve(src_row_groups.size()); + TargetRowGroups target_row_groups; - for (int32_t row_group_idx : src_row_groups) { + for (const auto& row_group : src_row_groups) { + int32_t row_group_idx = row_group.GetRowGroupIndex(); auto result = reader_->CalculateFilteredRowRanges(row_group_idx, predicate, column_name_to_index); if (!result.ok()) { - target_row_groups.push_back(row_group_idx); + target_row_groups.emplace_back(row_group); continue; } const auto& row_ranges = result.value(); if (!row_ranges.IsEmpty()) { - target_row_groups.push_back(row_group_idx); - int64_t rg_row_count = file_metadata->RowGroup(row_group_idx)->num_rows(); - if (row_ranges.RowCount() < rg_row_count) { - rg_row_ranges[row_group_idx] = row_ranges; + auto intersection = row_group.IsPartiallyMatched() + ? RowRanges::Intersection(row_group.GetRowRanges(), row_ranges) + : row_ranges; + if (intersection.IsEmpty()) { + continue; + } + if (intersection.RowCount() < rg_row_count) { + target_row_groups.emplace_back(row_group_idx, true, intersection); + } else { + target_row_groups.emplace_back(row_group_idx); } } } - return std::make_pair(std::move(target_row_groups), std::move(rg_row_ranges)); + return target_row_groups; } Result ParquetFileBatchReader::NextBatch() { diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.h b/src/paimon/format/parquet/parquet_file_batch_reader.h index 8dc412c30..035d85b43 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader.h +++ b/src/paimon/format/parquet/parquet_file_batch_reader.h @@ -37,6 +37,7 @@ #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/format/parquet/file_reader_wrapper.h" #include "paimon/format/parquet/row_ranges.h" +#include "paimon/format/parquet/target_row_group.h" #include "paimon/logging.h" #include "paimon/reader/prefetch_file_batch_reader.h" #include "paimon/result.h" @@ -119,7 +120,7 @@ class ParquetFileBatchReader : public PrefetchFileBatchReader { } bool SupportPreciseBitmapSelection() const override { - return false; + return true; } private: @@ -149,22 +150,28 @@ class ParquetFileBatchReader : public PrefetchFileBatchReader { index_vector->push_back((*index)++); } } + int32_t FindColumnWithOffsetIndex(int32_t row_group_idx) const; + + std::optional FilterPagesByBitmap(const RoaringBitmap32& bitmap, + int32_t row_group_idx, uint64_t rg_start_row, + int64_t rg_row_count) const; // precondition: predicate supposed not be empty - Result> FilterRowGroupsByPredicate( + Result FilterRowGroupsByPredicate( const std::shared_ptr& predicate, const std::shared_ptr file_schema, - const std::vector& src_row_groups) const; + const TargetRowGroups& src_row_groups) const; - Result> FilterRowGroupsByBitmap( - const RoaringBitmap32& bitmap, const std::vector& src_row_groups) const; + Result FilterRowGroupsByBitmap(const RoaringBitmap32& bitmap, + const TargetRowGroups& src_row_groups, + bool has_nested_column) const; // Apply page-level filtering using column index. // Returns (filtered row groups, per-row-group RowRanges for partial matches). - Result, std::map>> - FilterRowGroupsByPageIndex(const std::shared_ptr& predicate, - const std::map& column_name_to_index, - const std::vector& src_row_groups); + Result FilterRowGroupsByPageIndex( + const std::shared_ptr& predicate, + const std::map& column_name_to_index, + const TargetRowGroups& src_row_groups) const; private: std::map options_; diff --git a/src/paimon/format/parquet/row_ranges.h b/src/paimon/format/parquet/row_ranges.h index 46c3f4d21..b2b8338db 100644 --- a/src/paimon/format/parquet/row_ranges.h +++ b/src/paimon/format/parquet/row_ranges.h @@ -105,21 +105,4 @@ class RowRanges { private: std::vector ranges_; }; - -struct TargetRowGroup { - int32_t row_group_index{-1}; - bool is_partially_matched{false}; - // page-filtered row ranges, only valid if is_partially_matched is true. - RowRanges row_ranges; - // Whether this row group has been excluded by ApplyReadRanges. - // When true, this row group is logically skipped during iteration - // but retained so that a subsequent wider ApplyReadRanges can restore it. - bool excluded_by_read_range{false}; - - TargetRowGroup() = default; - TargetRowGroup(int32_t rg_index, bool is_partially_matched, RowRanges ranges) - : row_group_index(rg_index), - is_partially_matched(is_partially_matched), - row_ranges(std::move(ranges)) {} -}; } // namespace paimon::parquet diff --git a/src/paimon/format/parquet/target_row_group.h b/src/paimon/format/parquet/target_row_group.h new file mode 100644 index 000000000..d0efb7e6b --- /dev/null +++ b/src/paimon/format/parquet/target_row_group.h @@ -0,0 +1,89 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "paimon/format/parquet/row_ranges.h" + +namespace paimon::parquet { +class TargetRowGroup; +using TargetRowGroups = std::vector; +class TargetRowGroup { + public: + explicit TargetRowGroup(int32_t rg_index) : row_group_index(rg_index) {} + TargetRowGroup(int32_t rg_index, bool is_partially_matched, RowRanges ranges) + : row_group_index(rg_index), + is_partially_matched(is_partially_matched), + row_ranges(std::move(ranges)) {} + + TargetRowGroup(const TargetRowGroup& other) = default; + + bool IsExcludedByReadRange() const { + return excluded_by_read_range; + } + + void SetExcludedByReadRange(bool excluded) { + excluded_by_read_range = excluded; + } + + int32_t GetRowGroupIndex() const { + return row_group_index; + } + + bool IsPartiallyMatched() const { + return is_partially_matched; + } + + const RowRanges& GetRowRanges() const { + return row_ranges; + } + + static TargetRowGroups MakeSerialRowGroups(int32_t num_row_groups) { + TargetRowGroups target_row_groups; + target_row_groups.reserve(num_row_groups); + for (int32_t i = 0; i < num_row_groups; ++i) { + target_row_groups.emplace_back(i); + } + return target_row_groups; + } + + static std::vector GetRowGroupIndices(const TargetRowGroups& target_row_groups) { + std::vector indices; + indices.reserve(target_row_groups.size()); + for (const auto& rg : target_row_groups) { + indices.push_back(rg.GetRowGroupIndex()); + } + return indices; + } + + private: + int32_t row_group_index{-1}; + bool is_partially_matched{false}; + // page-filtered row ranges, only valid if is_partially_matched is true. + RowRanges row_ranges; + // Whether this row group has been excluded by ApplyReadRanges. + // When true, this row group is logically skipped during iteration + // but retained so that a subsequent wider ApplyReadRanges can restore it. + bool excluded_by_read_range{false}; +}; + +} // namespace paimon::parquet