diff --git a/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp b/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp index a963efab..a3cb9869 100644 --- a/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp +++ b/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp @@ -873,5 +873,275 @@ TEST_F(PageFilteredRowGroupReaderTest, ComputePageRangesWithDictionaryEncoding) auto partial_concat = arrow::Concatenate(result_partial->chunks()).ValueOrDie(); ASSERT_TRUE(partial_concat->Equals(expected_struct)); } +/// Helper: build a StructArray with a top-level int32 "id" column and a nested struct column +/// "info" containing two int32 fields: "x" and "y". +/// id[i] = i, info.x[i] = i * 100, info.y[i] = i * 100 + 1, for i in [0, N). +/// +/// Arrow schema: { id: int32, info: struct } +/// Parquet leaf columns: [id (index 0), info.x (index 1), info.y (index 2)] +static std::shared_ptr MakeNestedStructData(int32_t num_rows) { + arrow::Int32Builder id_builder, x_builder, y_builder; + EXPECT_TRUE(id_builder.Reserve(num_rows).ok()); + EXPECT_TRUE(x_builder.Reserve(num_rows).ok()); + EXPECT_TRUE(y_builder.Reserve(num_rows).ok()); + for (int32_t i = 0; i < num_rows; ++i) { + id_builder.UnsafeAppend(i); + x_builder.UnsafeAppend(i * 100); + y_builder.UnsafeAppend(i * 100 + 1); + } + auto id_array = id_builder.Finish().ValueOrDie(); + auto x_array = x_builder.Finish().ValueOrDie(); + auto y_array = y_builder.Finish().ValueOrDie(); + + auto field_x = arrow::field("x", arrow::int32()); + auto field_y = arrow::field("y", arrow::int32()); + auto inner_struct = + arrow::StructArray::Make({x_array, y_array}, {field_x, field_y}).ValueOrDie(); + + auto field_id = arrow::field("id", arrow::int32()); + auto field_info = arrow::field("info", arrow::struct_({field_x, field_y})); + return arrow::StructArray::Make({id_array, inner_struct}, {field_id, field_info}).ValueOrDie(); +} + +/// Test: rowgroup-level filtering on a file with nested struct columns. +/// +/// This test exposes the bug where BuildPageFilteredSchema fails to correctly map +/// Parquet leaf column indices to Arrow fields for nested types, and +/// ReadFilteredRowGroup cannot correctly assemble nested column results. +/// +/// Schema: { id: int32, info: struct } +/// Parquet leaf columns: [id=0, info.x=1, info.y=2] +/// 100 rows, 10 per page, 2 row groups. +/// Predicate: id >= 70 → row groups 0 skipped, row groups 1 read → 50 rows expected. +/// The read schema requests both "id" and "info" columns. +TEST_F(PageFilteredRowGroupReaderTest, NestedStructColumnRowGroupFilter) { + std::string file_name = dir_->Str() + "/nested_struct_filter.parquet"; + auto data = MakeNestedStructData(100); + WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/50); + + auto field_x = arrow::field("x", arrow::int32()); + auto field_y = arrow::field("y", arrow::int32()); + auto read_schema = arrow::schema({arrow::field("id", arrow::int32()), + arrow::field("info", arrow::struct_({field_x, field_y}))}); + + auto predicate = PredicateBuilder::GreaterOrEqual( + /*field_index=*/0, /*field_name=*/"id", FieldType::INT, Literal(70)); + + std::shared_ptr result; + ReadWithPredicateImpl(file_name, read_schema, predicate, &result); + + // Should get rows 50-99 = 50 rows + ASSERT_TRUE(result); + ASSERT_EQ(50, result->length()); + + // Build expected result: rows 50-99 from the original data + auto expected = data->Slice(50, 50); + ASSERT_TRUE(expected->Equals(result->chunk(0))); +} + +/// Test: Page-level filtering reading the nested struct column along with the predicate column. +/// +/// This verifies that when reading a subset of columns that includes a nested column +/// and the predicate column, the schema mapping and column assembly work correctly. +/// +/// Schema: { id: int32, info: struct } +/// Read schema: { id: int32, info: struct } +/// Predicate on "id": id >= 70. +TEST_F(PageFilteredRowGroupReaderTest, NestedStructColumnOnlyReadIdField) { + std::string file_name = dir_->Str() + "/nested_struct_only_nested.parquet"; + auto data = MakeNestedStructData(100); + WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/50); + + auto field_id = arrow::field("id", arrow::int32()); + auto field_x = arrow::field("x", arrow::int32()); + auto field_y = arrow::field("y", arrow::int32()); + auto field_info = arrow::field("info", arrow::struct_({field_x, field_y})); + // Read "id" column only + auto read_schema = arrow::schema({field_id}); + + // Predicate is on "id" (field_index=0 in file schema) + auto predicate = PredicateBuilder::GreaterOrEqual( + /*field_index=*/0, /*field_name=*/"id", FieldType::INT, Literal(70)); + + std::shared_ptr result; + ReadWithPredicateImpl(file_name, read_schema, predicate, &result); + + // Should get rows 70-99 = 30 rows + ASSERT_TRUE(result); + ASSERT_EQ(30, result->length()); + + auto result_struct = std::dynamic_pointer_cast(result->chunk(0)); + ASSERT_TRUE(result_struct); + ASSERT_TRUE(data->field(0)->Slice(70, 30)->Equals(result_struct->field(0))); +} + +/// Helper: build a StructArray with an int32 "id" column and a list "tags" column. +/// id[i] = i, tags[i] = [i*10, i*10+1], for i in [0, N). +/// +/// Arrow schema: { id: int32, tags: list } +/// Parquet leaf columns: [id (index 0), tags.item (index 1)] +static std::shared_ptr MakeListColumnData(int32_t num_rows) { + arrow::Int32Builder id_builder; + EXPECT_TRUE(id_builder.Reserve(num_rows).ok()); + for (int32_t i = 0; i < num_rows; ++i) { + id_builder.UnsafeAppend(i); + } + auto id_array = id_builder.Finish().ValueOrDie(); + + auto value_builder = std::make_shared(); + arrow::ListBuilder list_builder(arrow::default_memory_pool(), value_builder); + for (int32_t i = 0; i < num_rows; ++i) { + EXPECT_TRUE(list_builder.Append().ok()); + EXPECT_TRUE(value_builder->Append(i * 10).ok()); + EXPECT_TRUE(value_builder->Append(i * 10 + 1).ok()); + } + auto list_array = list_builder.Finish().ValueOrDie(); + + auto field_id = arrow::field("id", arrow::int32()); + auto field_tags = arrow::field("tags", arrow::list(arrow::field("item", arrow::int32()))); + return arrow::StructArray::Make({id_array, list_array}, {field_id, field_tags}).ValueOrDie(); +} + +/// Helper: build a StructArray with an int32 "id" column and a map "props" column. +/// id[i] = i, props[i] = {"k_i": i * 100}, for i in [0, N). +/// +/// Arrow schema: { id: int32, props: map } +/// Parquet leaf columns: [id (index 0), props.key (index 1), props.value (index 2)] +static std::shared_ptr MakeMapColumnData(int32_t num_rows) { + arrow::Int32Builder id_builder; + EXPECT_TRUE(id_builder.Reserve(num_rows).ok()); + for (int32_t i = 0; i < num_rows; ++i) { + id_builder.UnsafeAppend(i); + } + auto id_array = id_builder.Finish().ValueOrDie(); + + auto key_builder = std::make_shared(); + auto value_builder = std::make_shared(); + arrow::MapBuilder map_builder(arrow::default_memory_pool(), key_builder, value_builder); + for (int32_t i = 0; i < num_rows; ++i) { + EXPECT_TRUE(map_builder.Append().ok()); + std::string key = "k_" + std::to_string(i); + EXPECT_TRUE(key_builder->Append(key).ok()); + EXPECT_TRUE(value_builder->Append(i * 100).ok()); + } + auto map_array = map_builder.Finish().ValueOrDie(); + + auto field_id = arrow::field("id", arrow::int32()); + auto field_props = arrow::field("props", arrow::map(arrow::utf8(), arrow::int32())); + return arrow::StructArray::Make({id_array, map_array}, {field_id, field_props}).ValueOrDie(); +} + +/// Test: rowgroup-level filtering on a file with a list column. +/// +/// Schema: { id: int32, tags: list } +/// 100 rows, 10 per page, 2 row groups. +/// Predicate: id >= 70 → row groups 0 skipped, row groups 1 read → 50 rows expected. +TEST_F(PageFilteredRowGroupReaderTest, NestedListColumnRowGroupFilter) { + std::string file_name = dir_->Str() + "/nested_list_filter.parquet"; + auto data = MakeListColumnData(100); + WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/50); + + auto read_schema = + arrow::schema({arrow::field("id", arrow::int32()), + arrow::field("tags", arrow::list(arrow::field("item", arrow::int32())))}); + + auto predicate = PredicateBuilder::GreaterOrEqual( + /*field_index=*/0, /*field_name=*/"id", FieldType::INT, Literal(70)); + + std::shared_ptr result; + ReadWithPredicateImpl(file_name, read_schema, predicate, &result); + + ASSERT_TRUE(result); + ASSERT_EQ(50, result->length()); + + // Build expected result: rows 50-99 from the original data + auto expected = data->Slice(50, 50); + ASSERT_TRUE(expected->Equals(result->chunk(0))); +} + +/// Test: rowgroup filtering on a file with a map column. +/// +/// Schema: { id: int32, props: map } +/// 100 rows, 10 per page, 2 row groups. +/// Predicate: id >= 70 → row groups 0 skipped, row groups 1 read → 50 rows expected. +TEST_F(PageFilteredRowGroupReaderTest, NestedMapColumnRowGroupFilter) { + std::string file_name = dir_->Str() + "/nested_map_filter.parquet"; + auto data = MakeMapColumnData(100); + WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/50); + + auto read_schema = + arrow::schema({arrow::field("id", arrow::int32()), + arrow::field("props", arrow::map(arrow::utf8(), arrow::int32()))}); + + auto predicate = PredicateBuilder::GreaterOrEqual( + /*field_index=*/0, /*field_name=*/"id", FieldType::INT, Literal(70)); + + std::shared_ptr result; + ReadWithPredicateImpl(file_name, read_schema, predicate, &result); + + ASSERT_TRUE(result); + ASSERT_EQ(50, result->length()); + + // Build expected result: rows 50-99 from the original data + auto expected = data->Slice(50, 50); + ASSERT_TRUE(expected->Equals(result->chunk(0))); +} + +/// Test: rowgroup-level filtering with multiple adjacent nested columns (struct + list). +/// +/// Schema: { id: int32, info: struct, tags: list } +/// This tests the boundary handling when two nested fields are adjacent in the schema. +/// Predicate: id >= 70 → row groups 0 skipped, row groups 1 read → 50 rows expected. +TEST_F(PageFilteredRowGroupReaderTest, MultipleAdjacentNestedColumns) { + std::string file_name = dir_->Str() + "/multi_nested.parquet"; + + // Build data with id, info (struct), tags (list) + arrow::Int32Builder id_builder, x_builder, y_builder; + ASSERT_TRUE(id_builder.Reserve(100).ok()); + ASSERT_TRUE(x_builder.Reserve(100).ok()); + ASSERT_TRUE(y_builder.Reserve(100).ok()); + auto value_builder = std::make_shared(); + arrow::ListBuilder list_builder(arrow::default_memory_pool(), value_builder); + + for (int32_t i = 0; i < 100; ++i) { + id_builder.UnsafeAppend(i); + x_builder.UnsafeAppend(i * 100); + y_builder.UnsafeAppend(i * 100 + 1); + ASSERT_TRUE(list_builder.Append().ok()); + ASSERT_TRUE(value_builder->Append(i * 10).ok()); + } + auto id_array = id_builder.Finish().ValueOrDie(); + auto x_array = x_builder.Finish().ValueOrDie(); + auto y_array = y_builder.Finish().ValueOrDie(); + auto list_array = list_builder.Finish().ValueOrDie(); + + auto field_x = arrow::field("x", arrow::int32()); + auto field_y = arrow::field("y", arrow::int32()); + auto inner_struct = + arrow::StructArray::Make({x_array, y_array}, {field_x, field_y}).ValueOrDie(); + + auto field_id = arrow::field("id", arrow::int32()); + auto field_info = arrow::field("info", arrow::struct_({field_x, field_y})); + auto field_tags = arrow::field("tags", arrow::list(arrow::field("item", arrow::int32()))); + auto data = arrow::StructArray::Make({id_array, inner_struct, list_array}, + {field_id, field_info, field_tags}) + .ValueOrDie(); + + WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/50); + + auto read_schema = arrow::schema({field_id, field_info, field_tags}); + auto predicate = PredicateBuilder::GreaterOrEqual( + /*field_index=*/0, /*field_name=*/"id", FieldType::INT, Literal(70)); + + std::shared_ptr result; + ReadWithPredicateImpl(file_name, read_schema, predicate, &result); + + ASSERT_TRUE(result); + ASSERT_EQ(50, result->length()); + + // Build expected result: rows 50-99 from the original data + auto expected = data->Slice(50, 50); + ASSERT_TRUE(expected->Equals(result->chunk(0))); +} } // namespace paimon::parquet::test diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.cpp b/src/paimon/format/parquet/parquet_file_batch_reader.cpp index 7533cb99..121f65d9 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader.cpp +++ b/src/paimon/format/parquet/parquet_file_batch_reader.cpp @@ -39,6 +39,7 @@ #include "paimon/common/metrics/metrics_impl.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/common/utils/options_utils.h" +#include "paimon/core/schema/arrow_schema_validator.h" #include "paimon/format/parquet/parquet_field_id_converter.h" #include "paimon/format/parquet/parquet_format_defs.h" #include "paimon/format/parquet/parquet_timestamp_converter.h" @@ -129,6 +130,13 @@ Status ParquetFileBatchReader::SetReadSchema( PAIMON_ASSIGN_OR_RAISE(std::shared_ptr file_schema, reader_->GetSchema()); std::unordered_map> field_index_map; + bool has_nested_field = false; + for (const auto& field : read_schema->fields()) { + if (ArrowSchemaValidator::IsNestedType(field->type())) { + has_nested_field = true; + break; + } + } int32_t i = 0; for (const auto& field : file_schema->fields()) { std::vector v; @@ -166,7 +174,9 @@ Status ParquetFileBatchReader::SetReadSchema( bool enable_page_index_filter, OptionsUtils::GetValueFromMap(options_, PARQUET_READ_ENABLE_PAGE_INDEX_FILTER, DEFAULT_PARQUET_READ_ENABLE_PAGE_INDEX_FILTER)); - if (enable_page_index_filter) { + // walkaround: page index filter does not support nested fields for now, skip page index + // filter if there is any nested field in the schema + if (enable_page_index_filter && !has_nested_field) { // Build column name to index map for page-level filtering. // For leaf columns, indices[0] is the correct leaf column index in Parquet. // For nested types (struct/list/map), FlattenSchema produces multiple leaf indices,