Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
270 changes: 270 additions & 0 deletions src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -873,5 +873,275 @@ TEST_F(PageFilteredRowGroupReaderTest, ComputePageRangesWithDictionaryEncoding)
auto partial_concat = arrow::Concatenate(result_partial->chunks()).ValueOrDie();
ASSERT_TRUE(partial_concat->Equals(expected_struct));
}
/// Helper: build a StructArray with a top-level int32 "id" column and a nested struct column
/// "info" containing two int32 fields: "x" and "y".
/// id[i] = i, info.x[i] = i * 100, info.y[i] = i * 100 + 1, for i in [0, N).
///
/// Arrow schema: { id: int32, info: struct<x: int32, y: int32> }
/// Parquet leaf columns: [id (index 0), info.x (index 1), info.y (index 2)]
static std::shared_ptr<arrow::StructArray> MakeNestedStructData(int32_t num_rows) {
arrow::Int32Builder id_builder, x_builder, y_builder;
EXPECT_TRUE(id_builder.Reserve(num_rows).ok());
EXPECT_TRUE(x_builder.Reserve(num_rows).ok());
EXPECT_TRUE(y_builder.Reserve(num_rows).ok());
for (int32_t i = 0; i < num_rows; ++i) {
id_builder.UnsafeAppend(i);
x_builder.UnsafeAppend(i * 100);
y_builder.UnsafeAppend(i * 100 + 1);
}
auto id_array = id_builder.Finish().ValueOrDie();
auto x_array = x_builder.Finish().ValueOrDie();
auto y_array = y_builder.Finish().ValueOrDie();

auto field_x = arrow::field("x", arrow::int32());
auto field_y = arrow::field("y", arrow::int32());
auto inner_struct =
arrow::StructArray::Make({x_array, y_array}, {field_x, field_y}).ValueOrDie();

auto field_id = arrow::field("id", arrow::int32());
auto field_info = arrow::field("info", arrow::struct_({field_x, field_y}));
return arrow::StructArray::Make({id_array, inner_struct}, {field_id, field_info}).ValueOrDie();
}

/// Test: row-group-level filtering on a file with a nested struct column.
///
/// This test exposes the bug where BuildPageFilteredSchema fails to correctly map
/// Parquet leaf column indices to Arrow fields for nested types, and
/// ReadFilteredRowGroup cannot correctly assemble nested column results.
///
/// Schema: { id: int32, info: struct<x: int32, y: int32> }
/// Parquet leaf columns: [id=0, info.x=1, info.y=2]
/// 100 rows, 10 per page, 2 row groups.
/// Predicate: id >= 70 → row group 0 is skipped, row group 1 is read → 50 rows expected.
/// The read schema requests both the "id" and the "info" column.
TEST_F(PageFilteredRowGroupReaderTest, NestedStructColumnRowGroupFilter) {
    std::string file_name = dir_->Str() + "/nested_struct_filter.parquet";
    auto data = MakeNestedStructData(100);
    WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/50);

    auto field_x = arrow::field("x", arrow::int32());
    auto field_y = arrow::field("y", arrow::int32());
    auto read_schema = arrow::schema({arrow::field("id", arrow::int32()),
                                      arrow::field("info", arrow::struct_({field_x, field_y}))});

    auto predicate = PredicateBuilder::GreaterOrEqual(
        /*field_index=*/0, /*field_name=*/"id", FieldType::INT, Literal(70));

    std::shared_ptr<arrow::ChunkedArray> result;
    ReadWithPredicateImpl(file_name, read_schema, predicate, &result);

    // Should get rows 50-99 = 50 rows
    ASSERT_TRUE(result);
    ASSERT_EQ(50, result->length());

    // Compare against all chunks concatenated, so the check does not depend on how the
    // reader happens to split its output into chunks.
    auto actual = arrow::Concatenate(result->chunks()).ValueOrDie();

    // Build expected result: rows 50-99 from the original data
    auto expected = data->Slice(50, 50);
    ASSERT_TRUE(expected->Equals(actual));
}

/// Test: page-level filtering when only the flat predicate column is projected from a file
/// that also contains a nested struct column.
///
/// This verifies that the presence of a nested column in the file does not disturb reading
/// and filtering a plain column that sits next to it.
///
/// File schema: { id: int32, info: struct<x: int32, y: int32> }
/// Read schema: { id: int32 }
/// 100 rows, 10 per page, 2 row groups.
/// Predicate on "id": id >= 70 → rows 70-99 = 30 rows expected, i.e. fewer than the 50 rows
/// of the one row group that can match.
TEST_F(PageFilteredRowGroupReaderTest, NestedStructColumnOnlyReadIdField) {
    std::string file_name = dir_->Str() + "/nested_struct_only_nested.parquet";
    auto data = MakeNestedStructData(100);
    WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/50);

    // Read the "id" column only; the nested "info" column stays in the file but is not
    // part of the read schema.
    auto field_id = arrow::field("id", arrow::int32());
    auto read_schema = arrow::schema({field_id});

    // Predicate is on "id" (field_index=0 in file schema)
    auto predicate = PredicateBuilder::GreaterOrEqual(
        /*field_index=*/0, /*field_name=*/"id", FieldType::INT, Literal(70));

    std::shared_ptr<arrow::ChunkedArray> result;
    ReadWithPredicateImpl(file_name, read_schema, predicate, &result);

    // Should get rows 70-99 = 30 rows
    ASSERT_TRUE(result);
    ASSERT_EQ(30, result->length());

    // Concatenate all chunks before inspecting the data, so the check does not depend on
    // how the reader happens to split its output into chunks.
    auto actual = arrow::Concatenate(result->chunks()).ValueOrDie();
    auto result_struct = std::dynamic_pointer_cast<arrow::StructArray>(actual);
    ASSERT_TRUE(result_struct);
    ASSERT_TRUE(data->field(0)->Slice(70, 30)->Equals(result_struct->field(0)));
}

/// Helper: build a StructArray with an int32 "id" column and a list<int32> "tags" column.
/// id[i] = i, tags[i] = [i*10, i*10+1], for i in [0, N).
///
/// Arrow schema: { id: int32, tags: list<item: int32> }
/// Parquet leaf columns: [id (index 0), tags.item (index 1)]
static std::shared_ptr<arrow::StructArray> MakeListColumnData(int32_t num_rows) {
arrow::Int32Builder id_builder;
EXPECT_TRUE(id_builder.Reserve(num_rows).ok());
for (int32_t i = 0; i < num_rows; ++i) {
id_builder.UnsafeAppend(i);
}
auto id_array = id_builder.Finish().ValueOrDie();

auto value_builder = std::make_shared<arrow::Int32Builder>();
arrow::ListBuilder list_builder(arrow::default_memory_pool(), value_builder);
for (int32_t i = 0; i < num_rows; ++i) {
EXPECT_TRUE(list_builder.Append().ok());
EXPECT_TRUE(value_builder->Append(i * 10).ok());
EXPECT_TRUE(value_builder->Append(i * 10 + 1).ok());
}
auto list_array = list_builder.Finish().ValueOrDie();

auto field_id = arrow::field("id", arrow::int32());
auto field_tags = arrow::field("tags", arrow::list(arrow::field("item", arrow::int32())));
return arrow::StructArray::Make({id_array, list_array}, {field_id, field_tags}).ValueOrDie();
}

/// Helper: build a StructArray with an int32 "id" column and a map<utf8, int32> "props" column.
/// id[i] = i, props[i] = {"k_i": i * 100}, for i in [0, N).
///
/// Arrow schema: { id: int32, props: map<utf8, int32> }
/// Parquet leaf columns: [id (index 0), props.key (index 1), props.value (index 2)]
static std::shared_ptr<arrow::StructArray> MakeMapColumnData(int32_t num_rows) {
arrow::Int32Builder id_builder;
EXPECT_TRUE(id_builder.Reserve(num_rows).ok());
for (int32_t i = 0; i < num_rows; ++i) {
id_builder.UnsafeAppend(i);
}
auto id_array = id_builder.Finish().ValueOrDie();

auto key_builder = std::make_shared<arrow::StringBuilder>();
auto value_builder = std::make_shared<arrow::Int32Builder>();
arrow::MapBuilder map_builder(arrow::default_memory_pool(), key_builder, value_builder);
for (int32_t i = 0; i < num_rows; ++i) {
EXPECT_TRUE(map_builder.Append().ok());
std::string key = "k_" + std::to_string(i);
EXPECT_TRUE(key_builder->Append(key).ok());
EXPECT_TRUE(value_builder->Append(i * 100).ok());
}
auto map_array = map_builder.Finish().ValueOrDie();

auto field_id = arrow::field("id", arrow::int32());
auto field_props = arrow::field("props", arrow::map(arrow::utf8(), arrow::int32()));
return arrow::StructArray::Make({id_array, map_array}, {field_id, field_props}).ValueOrDie();
}

/// Test: row-group-level filtering on a file with a list column.
///
/// Schema: { id: int32, tags: list<item: int32> }
/// 100 rows, 10 per page, 2 row groups.
/// Predicate: id >= 70 → row group 0 is skipped, row group 1 is read → 50 rows expected.
TEST_F(PageFilteredRowGroupReaderTest, NestedListColumnRowGroupFilter) {
    std::string file_name = dir_->Str() + "/nested_list_filter.parquet";
    auto data = MakeListColumnData(100);
    WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/50);

    auto read_schema =
        arrow::schema({arrow::field("id", arrow::int32()),
                       arrow::field("tags", arrow::list(arrow::field("item", arrow::int32())))});

    auto predicate = PredicateBuilder::GreaterOrEqual(
        /*field_index=*/0, /*field_name=*/"id", FieldType::INT, Literal(70));

    std::shared_ptr<arrow::ChunkedArray> result;
    ReadWithPredicateImpl(file_name, read_schema, predicate, &result);

    // Should get rows 50-99 = 50 rows
    ASSERT_TRUE(result);
    ASSERT_EQ(50, result->length());

    // Compare against all chunks concatenated, so the check does not depend on how the
    // reader happens to split its output into chunks.
    auto actual = arrow::Concatenate(result->chunks()).ValueOrDie();

    // Build expected result: rows 50-99 from the original data
    auto expected = data->Slice(50, 50);
    ASSERT_TRUE(expected->Equals(actual));
}

/// Test: row-group-level filtering on a file with a map column.
///
/// Schema: { id: int32, props: map<utf8, int32> }
/// 100 rows, 10 per page, 2 row groups.
/// Predicate: id >= 70 → row group 0 is skipped, row group 1 is read → 50 rows expected.
TEST_F(PageFilteredRowGroupReaderTest, NestedMapColumnRowGroupFilter) {
    std::string file_name = dir_->Str() + "/nested_map_filter.parquet";
    auto data = MakeMapColumnData(100);
    WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/50);

    auto read_schema =
        arrow::schema({arrow::field("id", arrow::int32()),
                       arrow::field("props", arrow::map(arrow::utf8(), arrow::int32()))});

    auto predicate = PredicateBuilder::GreaterOrEqual(
        /*field_index=*/0, /*field_name=*/"id", FieldType::INT, Literal(70));

    std::shared_ptr<arrow::ChunkedArray> result;
    ReadWithPredicateImpl(file_name, read_schema, predicate, &result);

    // Should get rows 50-99 = 50 rows
    ASSERT_TRUE(result);
    ASSERT_EQ(50, result->length());

    // Compare against all chunks concatenated, so the check does not depend on how the
    // reader happens to split its output into chunks.
    auto actual = arrow::Concatenate(result->chunks()).ValueOrDie();

    // Build expected result: rows 50-99 from the original data
    auto expected = data->Slice(50, 50);
    ASSERT_TRUE(expected->Equals(actual));
}

/// Test: row-group-level filtering with multiple adjacent nested columns (struct + list).
///
/// Schema: { id: int32, info: struct<x: int32, y: int32>, tags: list<item: int32> }
/// Data, for i in [0, 100): id[i] = i, info.x[i] = i * 100, info.y[i] = i * 100 + 1,
/// tags[i] = [i * 10].
/// 100 rows, 10 per page, 2 row groups.
/// This tests the boundary handling when two nested fields are adjacent in the schema.
/// Predicate: id >= 70 → row group 0 is skipped, row group 1 is read → 50 rows expected.
TEST_F(PageFilteredRowGroupReaderTest, MultipleAdjacentNestedColumns) {
    std::string file_name = dir_->Str() + "/multi_nested.parquet";

    // Build data with id, info (struct), tags (list)
    arrow::Int32Builder id_builder, x_builder, y_builder;
    ASSERT_TRUE(id_builder.Reserve(100).ok());
    ASSERT_TRUE(x_builder.Reserve(100).ok());
    ASSERT_TRUE(y_builder.Reserve(100).ok());
    auto value_builder = std::make_shared<arrow::Int32Builder>();
    arrow::ListBuilder list_builder(arrow::default_memory_pool(), value_builder);

    for (int32_t i = 0; i < 100; ++i) {
        // Capacity was reserved above, so the unchecked appends are safe.
        id_builder.UnsafeAppend(i);
        x_builder.UnsafeAppend(i * 100);
        y_builder.UnsafeAppend(i * 100 + 1);
        // Each row gets a single-item list.
        ASSERT_TRUE(list_builder.Append().ok());
        ASSERT_TRUE(value_builder->Append(i * 10).ok());
    }
    auto id_array = id_builder.Finish().ValueOrDie();
    auto x_array = x_builder.Finish().ValueOrDie();
    auto y_array = y_builder.Finish().ValueOrDie();
    auto list_array = list_builder.Finish().ValueOrDie();

    auto field_x = arrow::field("x", arrow::int32());
    auto field_y = arrow::field("y", arrow::int32());
    auto inner_struct =
        arrow::StructArray::Make({x_array, y_array}, {field_x, field_y}).ValueOrDie();

    auto field_id = arrow::field("id", arrow::int32());
    auto field_info = arrow::field("info", arrow::struct_({field_x, field_y}));
    auto field_tags = arrow::field("tags", arrow::list(arrow::field("item", arrow::int32())));
    auto data = arrow::StructArray::Make({id_array, inner_struct, list_array},
                                         {field_id, field_info, field_tags})
                    .ValueOrDie();

    WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/50);

    auto read_schema = arrow::schema({field_id, field_info, field_tags});
    auto predicate = PredicateBuilder::GreaterOrEqual(
        /*field_index=*/0, /*field_name=*/"id", FieldType::INT, Literal(70));

    std::shared_ptr<arrow::ChunkedArray> result;
    ReadWithPredicateImpl(file_name, read_schema, predicate, &result);

    // Should get rows 50-99 = 50 rows
    ASSERT_TRUE(result);
    ASSERT_EQ(50, result->length());

    // Compare against all chunks concatenated, so the check does not depend on how the
    // reader happens to split its output into chunks.
    auto actual = arrow::Concatenate(result->chunks()).ValueOrDie();

    // Build expected result: rows 50-99 from the original data
    auto expected = data->Slice(50, 50);
    ASSERT_TRUE(expected->Equals(actual));
}

} // namespace paimon::parquet::test
12 changes: 11 additions & 1 deletion src/paimon/format/parquet/parquet_file_batch_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "paimon/common/metrics/metrics_impl.h"
#include "paimon/common/utils/arrow/status_utils.h"
#include "paimon/common/utils/options_utils.h"
#include "paimon/core/schema/arrow_schema_validator.h"
#include "paimon/format/parquet/parquet_field_id_converter.h"
#include "paimon/format/parquet/parquet_format_defs.h"
#include "paimon/format/parquet/parquet_timestamp_converter.h"
Expand Down Expand Up @@ -129,6 +130,13 @@ Status ParquetFileBatchReader::SetReadSchema(

PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Schema> file_schema, reader_->GetSchema());
std::unordered_map<std::string, std::vector<int32_t>> field_index_map;
bool has_nested_field = false;
for (const auto& field : read_schema->fields()) {
if (ArrowSchemaValidator::IsNestedType(field->type())) {
has_nested_field = true;
break;
}
}
int32_t i = 0;
for (const auto& field : file_schema->fields()) {
std::vector<int32_t> v;
Expand Down Expand Up @@ -166,7 +174,9 @@ Status ParquetFileBatchReader::SetReadSchema(
bool enable_page_index_filter,
OptionsUtils::GetValueFromMap<bool>(options_, PARQUET_READ_ENABLE_PAGE_INDEX_FILTER,
DEFAULT_PARQUET_READ_ENABLE_PAGE_INDEX_FILTER));
if (enable_page_index_filter) {
// Workaround: the page index filter does not support nested fields yet, so skip it
// when the read schema contains any nested field.
if (enable_page_index_filter && !has_nested_field) {
// Build column name to index map for page-level filtering.
// For leaf columns, indices[0] is the correct leaf column index in Parquet.
// For nested types (struct/list/map), FlattenSchema produces multiple leaf indices,
Expand Down