diff --git a/src/paimon/format/avro/avro_file_batch_reader.cpp b/src/paimon/format/avro/avro_file_batch_reader.cpp index 13833f97..28dde43f 100644 --- a/src/paimon/format/avro/avro_file_batch_reader.cpp +++ b/src/paimon/format/avro/avro_file_batch_reader.cpp @@ -142,18 +142,27 @@ Status AvroFileBatchReader::SetReadSchema(::ArrowSchema* read_schema, if (selection_bitmap) { // TODO(menglingda.mld): support bitmap } - previous_first_row_ = std::numeric_limits::max(); - next_row_to_read_ = std::numeric_limits::max(); PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr arrow_read_schema, arrow::ImportSchema(read_schema)); PAIMON_ASSIGN_OR_RAISE(std::shared_ptr file_schema, ArrowUtils::DataTypeToSchema(file_data_type_)); - PAIMON_ASSIGN_OR_RAISE(read_fields_projection_, + PAIMON_ASSIGN_OR_RAISE(std::set read_fields_projection, CalculateReadFieldsProjection(file_schema, arrow_read_schema->fields())); - array_builder_->Reset(); std::shared_ptr<::arrow::DataType> read_data_type = arrow::struct_(arrow_read_schema->fields()); - PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(array_builder_, + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::unique_ptr array_builder, arrow::MakeBuilder(read_data_type, arrow_pool_.get())); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<::avro::DataFileReaderBase> reader, + CreateDataFileReader(input_stream_, pool_)); + + if (reader_) { + reader_->close(); + } + reader_ = std::move(reader); + read_fields_projection_ = std::move(read_fields_projection); + array_builder_ = std::move(array_builder); + previous_first_row_ = std::numeric_limits::max(); + next_row_to_read_ = std::numeric_limits::max(); + close_ = false; return Status::OK(); } diff --git a/src/paimon/format/avro/avro_file_batch_reader_test.cpp b/src/paimon/format/avro/avro_file_batch_reader_test.cpp index a8ed3bb6..30d30382 100644 --- a/src/paimon/format/avro/avro_file_batch_reader_test.cpp +++ b/src/paimon/format/avro/avro_file_batch_reader_test.cpp @@ -343,6 +343,55 @@ TEST_F(AvroFileBatchReaderTest, TestGetPreviousBatchFirstRowNumber) { ASSERT_TRUE(BatchReader::IsEofBatch(batch5)); } +TEST_F(AvroFileBatchReaderTest, TestSetReadSchemaResetsReaderToFirstRow) { + std::string file_path = PathUtil::JoinPath(dir_->Str(), "file.avro"); + + arrow::FieldVector fields = { + arrow::field("f0", arrow::int32()), + arrow::field("f1", arrow::int32()), + }; + auto file_data_type = arrow::struct_(fields); + auto src_array = arrow::ipc::internal::json::ArrayFromJSON(file_data_type, R"([ + [1, 10], + [2, 20], + [3, 30], + [4, 40] + ])") + .ValueOrDie(); + WriteData(src_array, file_path, /*compression=*/"null"); + + ASSERT_OK_AND_ASSIGN(auto reader_builder, file_format_->CreateReaderBuilder(/*batch_size=*/2)); + ASSERT_OK_AND_ASSIGN(std::shared_ptr in, fs_->Open(file_path)); + ASSERT_OK_AND_ASSIGN(auto reader, reader_builder->Build(in)); + + ASSERT_OK_AND_ASSIGN(auto first_batch, reader->NextBatch()); + ASSERT_EQ(0, reader->GetPreviousBatchFirstRowNumber().value()); + auto first_array = + arrow::ImportArray(first_batch.first.get(), first_batch.second.get()).ValueOrDie(); + ASSERT_TRUE(first_array->Equals(src_array->Slice(0, 2))) << first_array->ToString(); + + auto read_schema = arrow::schema({arrow::field("f1", arrow::int32())}); + std::unique_ptr c_schema = std::make_unique(); + ASSERT_TRUE(arrow::ExportSchema(*read_schema, c_schema.get()).ok()); + ASSERT_OK(reader->SetReadSchema(c_schema.get(), /*predicate=*/nullptr, + /*selection_bitmap=*/std::nullopt)); + ASSERT_EQ(std::numeric_limits::max(), + reader->GetPreviousBatchFirstRowNumber().value()); + + ASSERT_OK_AND_ASSIGN(auto projected_batch, reader->NextBatch()); + ASSERT_EQ(0, reader->GetPreviousBatchFirstRowNumber().value()); + auto projected_array = + arrow::ImportArray(projected_batch.first.get(), projected_batch.second.get()).ValueOrDie(); + auto expected_projected_array = arrow::ipc::internal::json::ArrayFromJSON( + arrow::struct_({arrow::field("f1", arrow::int32())}), + R"([ + [10], + [20] + ])") + .ValueOrDie(); + ASSERT_TRUE(projected_array->Equals(expected_projected_array)) << projected_array->ToString(); +} + TEST_F(AvroFileBatchReaderTest, TestGetNumberOfRows) { std::string file_path = PathUtil::JoinPath(dir_->Str(), "file.avro");