Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions src/paimon/format/avro/avro_file_batch_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,18 +142,27 @@ Status AvroFileBatchReader::SetReadSchema(::ArrowSchema* read_schema,
if (selection_bitmap) {
// TODO(menglingda.mld): support bitmap
}
previous_first_row_ = std::numeric_limits<uint64_t>::max();
next_row_to_read_ = std::numeric_limits<uint64_t>::max();
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Schema> arrow_read_schema,
arrow::ImportSchema(read_schema));
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Schema> file_schema,
ArrowUtils::DataTypeToSchema(file_data_type_));
PAIMON_ASSIGN_OR_RAISE(read_fields_projection_,
PAIMON_ASSIGN_OR_RAISE(std::set<size_t> read_fields_projection,
CalculateReadFieldsProjection(file_schema, arrow_read_schema->fields()));
array_builder_->Reset();
std::shared_ptr<::arrow::DataType> read_data_type = arrow::struct_(arrow_read_schema->fields());
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(array_builder_,
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::unique_ptr<arrow::ArrayBuilder> array_builder,
arrow::MakeBuilder(read_data_type, arrow_pool_.get()));
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<::avro::DataFileReaderBase> reader,
CreateDataFileReader(input_stream_, pool_));

if (reader_) {
reader_->close();
}
reader_ = std::move(reader);
read_fields_projection_ = std::move(read_fields_projection);
array_builder_ = std::move(array_builder);
previous_first_row_ = std::numeric_limits<uint64_t>::max();
next_row_to_read_ = std::numeric_limits<uint64_t>::max();
close_ = false;
Comment on lines +154 to +165
return Status::OK();
}

Expand Down
49 changes: 49 additions & 0 deletions src/paimon/format/avro/avro_file_batch_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,55 @@ TEST_F(AvroFileBatchReaderTest, TestGetPreviousBatchFirstRowNumber) {
ASSERT_TRUE(BatchReader::IsEofBatch(batch5));
}

// Checks that calling SetReadSchema() on a reader which has already returned a batch
// starts reading from row 0 again, and that the next batch contains only the projected
// column.
TEST_F(AvroFileBatchReaderTest, TestSetReadSchemaResetsReaderToFirstRow) {
    std::string avro_path = PathUtil::JoinPath(dir_->Str(), "file.avro");

    // Step 1: write a four-row file with two int32 columns (f0, f1).
    arrow::FieldVector file_fields = {
        arrow::field("f0", arrow::int32()),
        arrow::field("f1", arrow::int32()),
    };
    auto file_struct_type = arrow::struct_(file_fields);
    auto written_rows = arrow::ipc::internal::json::ArrayFromJSON(file_struct_type, R"([
[1, 10],
[2, 20],
[3, 30],
[4, 40]
])")
                            .ValueOrDie();
    WriteData(written_rows, avro_path, /*compression=*/"null");

    // Step 2: open the file with a batch size of 2.
    ASSERT_OK_AND_ASSIGN(auto builder, file_format_->CreateReaderBuilder(/*batch_size=*/2));
    ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> input, fs_->Open(avro_path));
    ASSERT_OK_AND_ASSIGN(auto batch_reader, builder->Build(input));

    // Step 3: consume the first batch with the full schema; it must be rows [0, 2).
    ASSERT_OK_AND_ASSIGN(auto full_batch, batch_reader->NextBatch());
    ASSERT_EQ(0, batch_reader->GetPreviousBatchFirstRowNumber().value());
    auto full_array =
        arrow::ImportArray(full_batch.first.get(), full_batch.second.get()).ValueOrDie();
    ASSERT_TRUE(full_array->Equals(written_rows->Slice(0, 2))) << full_array->ToString();

    // Step 4: switch to a schema that keeps only f1. Right after the switch the
    // previous-batch row number must read back as uint64 max.
    auto projected_struct_type = arrow::struct_({arrow::field("f1", arrow::int32())});
    auto projected_schema = arrow::schema({arrow::field("f1", arrow::int32())});
    auto exported_schema = std::make_unique<ArrowSchema>();
    ASSERT_TRUE(arrow::ExportSchema(*projected_schema, exported_schema.get()).ok());
    ASSERT_OK(batch_reader->SetReadSchema(exported_schema.get(), /*predicate=*/nullptr,
                                          /*selection_bitmap=*/std::nullopt));
    const uint64_t unset_row_number = std::numeric_limits<uint64_t>::max();
    ASSERT_EQ(unset_row_number, batch_reader->GetPreviousBatchFirstRowNumber().value());

    // Step 5: the next batch must start at row 0 again and carry only the f1 values.
    ASSERT_OK_AND_ASSIGN(auto f1_batch, batch_reader->NextBatch());
    ASSERT_EQ(0, batch_reader->GetPreviousBatchFirstRowNumber().value());
    auto f1_array = arrow::ImportArray(f1_batch.first.get(), f1_batch.second.get()).ValueOrDie();
    auto expected_f1_array = arrow::ipc::internal::json::ArrayFromJSON(projected_struct_type,
                                                                       R"([
[10],
[20]
])")
                                 .ValueOrDie();
    ASSERT_TRUE(f1_array->Equals(expected_f1_array)) << f1_array->ToString();
}

TEST_F(AvroFileBatchReaderTest, TestGetNumberOfRows) {
std::string file_path = PathUtil::JoinPath(dir_->Str(), "file.avro");

Expand Down