diff --git a/src/paimon/format/avro/avro_file_batch_reader.cpp b/src/paimon/format/avro/avro_file_batch_reader.cpp
new file mode 100644
index 0000000..d2d2f53
--- /dev/null
+++ b/src/paimon/format/avro/avro_file_batch_reader.cpp
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/avro/avro_file_batch_reader.h"
+
+#include
+#include
+
+#include "arrow/c/bridge.h"
+#include "fmt/format.h"
+#include "paimon/common/metrics/metrics_impl.h"
+#include "paimon/common/utils/arrow/arrow_utils.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/scope_guard.h"
+#include "paimon/format/avro/avro_input_stream_impl.h"
+#include "paimon/format/avro/avro_schema_converter.h"
+#include "paimon/reader/batch_reader.h"
+
+namespace paimon::avro {
+
+// Takes ownership of the Avro reader, the array builder and the Arrow pool; the input stream,
+// the file data type and the memory pool are stored as shared pointers.
+AvroFileBatchReader::AvroFileBatchReader(const std::shared_ptr& input_stream,
+                                         const std::shared_ptr<::arrow::DataType>& file_data_type,
+                                         std::unique_ptr<::avro::DataFileReaderBase>&& reader,
+                                         std::unique_ptr&& array_builder,
+                                         std::unique_ptr&& arrow_pool,
+                                         int32_t batch_size,
+                                         const std::shared_ptr& pool)
+    : pool_(pool),
+      arrow_pool_(std::move(arrow_pool)),
+      input_stream_(input_stream),
+      file_data_type_(file_data_type),
+      reader_(std::move(reader)),
+      array_builder_(std::move(array_builder)),
+      batch_size_(batch_size),
+      metrics_(std::make_shared()) {}
+
+// Closes the Avro reader unless it has already been closed.
+AvroFileBatchReader::~AvroFileBatchReader() {
+    DoClose();
+}
+
+// Closes the Avro reader on the first call; later calls do nothing.
+void AvroFileBatchReader::DoClose() {
+    if (!close_) {
+        reader_->close();
+        close_ = true;
+    }
+}
+
+// Builds a reader over `input_stream`: validates `batch_size`, opens the Avro data file,
+// converts its Avro schema to an Arrow data type and creates an array builder for that type.
+// Returns Status::Invalid when `batch_size` is not positive.
+Result> AvroFileBatchReader::Create(
+    const std::shared_ptr& input_stream, int32_t batch_size,
+    const std::shared_ptr& pool) {
+    if (batch_size <= 0) {
+        return Status::Invalid(
+            fmt::format("invalid batch size {}, must be larger than 0", batch_size));
+    }
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<::avro::DataFileReaderBase> reader,
+                           CreateDataFileReader(input_stream, pool));
+    const auto& avro_file_schema = reader->dataSchema();
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<::arrow::DataType> file_data_type,
+                           AvroSchemaConverter::AvroSchemaToArrowDataType(avro_file_schema));
+    auto arrow_pool = GetArrowPool(pool);
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::unique_ptr array_builder,
+                                      arrow::MakeBuilder(file_data_type, arrow_pool.get()));
+    return std::unique_ptr(
+        new AvroFileBatchReader(input_stream, file_data_type, std::move(reader),
+                                std::move(array_builder), std::move(arrow_pool), batch_size, pool));
+}
+
+// Seeks `input_stream` to offset 0 and initializes an Avro data file reader on top of it.
+// Any exception thrown while opening is converted into Status::Invalid.
+Result> AvroFileBatchReader::CreateDataFileReader(
+    const std::shared_ptr& input_stream, const std::shared_ptr& pool) {
+    PAIMON_RETURN_NOT_OK(input_stream->Seek(0, SeekOrigin::FS_SEEK_SET));
+    try {
+        PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<::avro::InputStream> in,
+                               AvroInputStreamImpl::Create(input_stream, BUFFER_SIZE, pool));
+        auto reader = std::make_unique<::avro::DataFileReaderBase>(std::move(in));
+        reader->init();
+        return reader;
+    } catch (const ::avro::Exception& e) {
+        return Status::Invalid(fmt::format("build avro reader failed. {}", e.what()));
+    } catch (const std::exception& e) {
+        return Status::Invalid(fmt::format("build avro reader failed. {}", e.what()));
+    } catch (...) {
+        return Status::Invalid("build avro reader failed. unknown error");
+    }
+}
+
+// Decodes records into the array builder until it holds `batch_size_` rows or the file is
+// exhausted, then exports the finished array. Returns the EOF batch when no row was decoded.
+Result AvroFileBatchReader::NextBatch() {
+    // max() means no batch has been read since construction or SetReadSchema().
+    if (next_row_to_read_ == std::numeric_limits::max()) {
+        next_row_to_read_ = 0;
+    }
+    try {
+        while (array_builder_->length() < batch_size_) {
+            if (!reader_->hasMore()) {
+                break;
+            }
+            reader_->decr();
+            PAIMON_RETURN_NOT_OK(AvroDirectDecoder::DecodeAvroToBuilder(
+                reader_->dataSchema().root(), read_fields_projection_, &reader_->decoder(),
+                array_builder_.get(), &decode_context_));
+        }
+        previous_first_row_ = next_row_to_read_;
+        next_row_to_read_ += array_builder_->length();
+        if (array_builder_->length() == 0) {
+            return BatchReader::MakeEofBatch();
+        }
+        PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr array,
+                                          array_builder_->Finish());
+        std::unique_ptr c_array = std::make_unique();
+        std::unique_ptr c_schema = std::make_unique();
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, c_array.get(), c_schema.get()));
+        return make_pair(std::move(c_array), std::move(c_schema));
+    } catch (const ::avro::Exception& e) {
+        return Status::Invalid(fmt::format("avro reader next batch failed. {}", e.what()));
+    } catch (const std::exception& e) {
+        return Status::Invalid(fmt::format("avro reader next batch failed. {}", e.what()));
+    } catch (...) {
+        return Status::Invalid("avro reader next batch failed. unknown error");
+    }
+}
+
+// Switches the reader to `read_schema`: resets the row counters, recomputes the field
+// projection against the file schema and replaces the array builder with one for the read
+// type. `predicate` and `selection_bitmap` are accepted but not applied yet (see the TODOs).
+Status AvroFileBatchReader::SetReadSchema(::ArrowSchema* read_schema,
+                                          const std::shared_ptr& predicate,
+                                          const std::optional& selection_bitmap) {
+    if (!read_schema) {
+        return Status::Invalid("SetReadSchema failed: read schema cannot be nullptr");
+    }
+    // TODO(menglingda.mld): support predicate
+    if (selection_bitmap) {
+        // TODO(menglingda.mld): support bitmap
+    }
+    previous_first_row_ = std::numeric_limits::max();
+    next_row_to_read_ = std::numeric_limits::max();
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr arrow_read_schema,
+                                      arrow::ImportSchema(read_schema));
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr file_schema,
+                           ArrowUtils::DataTypeToSchema(file_data_type_));
+    PAIMON_ASSIGN_OR_RAISE(read_fields_projection_,
+                           CalculateReadFieldsProjection(file_schema, arrow_read_schema->fields()));
+    array_builder_->Reset();
+    std::shared_ptr<::arrow::DataType> read_data_type = arrow::struct_(arrow_read_schema->fields());
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(array_builder_,
+                                      arrow::MakeBuilder(read_data_type, arrow_pool_.get()));
+    return Status::OK();
+}
+
+// Resolves `read_fields` to their indices in `file_schema`. The indices must be strictly
+// increasing, so the read schema has to keep the file's field order and cannot repeat a field.
+Result> AvroFileBatchReader::CalculateReadFieldsProjection(
+    const std::shared_ptr<::arrow::Schema>& file_schema, const arrow::FieldVector& read_fields) {
+    std::set projection_set;
+    PAIMON_ASSIGN_OR_RAISE(std::vector projection,
+                           ArrowUtils::CreateProjection(file_schema, read_fields));
+    int32_t prev_index = -1;
+    for (auto& index : projection) {
+        if (index <= prev_index) {
+            return Status::Invalid(
+                "SetReadSchema failed: read schema fields order is different from file schema");
+        }
+        prev_index = index;
+        projection_set.insert(index);
+    }
+    return projection_set;
+}
+
+// Exports the Arrow data type converted from the file's Avro schema as a C ArrowSchema.
+Result> AvroFileBatchReader::GetFileSchema() const {
+    assert(reader_);
+    auto c_schema = std::make_unique<::ArrowSchema>();
+    PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportType(*file_data_type_, c_schema.get()));
+    return c_schema;
+}
+
+// Returns the number of records in the file. The first successful call counts them with a
+// temporary reader and caches the total; the input stream position is restored afterwards.
+Result AvroFileBatchReader::GetNumberOfRows() const {
+    if (!total_rows_) {
+        PAIMON_ASSIGN_OR_RAISE(int64_t current_pos, input_stream_->GetPos());
+        ScopeGuard stream_guard([this, current_pos]() -> void {
+            // reset input stream position to original position
+            Status status = input_stream_->Seek(current_pos, SeekOrigin::FS_SEEK_SET);
+            (void)status;
+        });
+        PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<::avro::DataFileReaderBase> reader,
+                               CreateDataFileReader(input_stream_, pool_));
+        ScopeGuard reader_guard([&reader]() -> void { reader->close(); });
+        // Count into a local so that a failed scan never caches a partial total and an empty
+        // file yields 0 instead of leaving `total_rows_` unset before it is dereferenced.
+        uint64_t row_count = 0;
+        try {
+            while (reader->hasMore()) {
+                reader->decr();
+                ++row_count;
+            }
+        } catch (const ::avro::Exception& e) {
+            return Status::Invalid(fmt::format("avro reader GetNumberOfRows failed. {}", e.what()));
+        } catch (const std::exception& e) {
+            return Status::Invalid(fmt::format("avro reader GetNumberOfRows failed. {}", e.what()));
+        } catch (...) {
+            return Status::Invalid("avro reader GetNumberOfRows failed. unknown error");
+        }
+        total_rows_ = row_count;
+    }
+    return *total_rows_;
+}
+
+}  // namespace paimon::avro
diff --git a/src/paimon/format/avro/avro_file_batch_reader.h b/src/paimon/format/avro/avro_file_batch_reader.h
new file mode 100644
index 0000000..e54a936
--- /dev/null
+++ b/src/paimon/format/avro/avro_file_batch_reader.h
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include
+#include
+#include
+#include
+
+#include "avro/DataFile.hh"
+#include "paimon/format/avro/avro_direct_decoder.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/metrics.h"
+#include "paimon/reader/file_batch_reader.h"
+#include "paimon/result.h"
+
+namespace paimon::avro {
+
+class AvroFileBatchReader : public FileBatchReader {  // FileBatchReader over one Avro data file
+ public:
+    static Result> Create(  // factory; the constructor is private
+        const std::shared_ptr& input_stream, int32_t batch_size,
+        const std::shared_ptr& pool);
+
+    ~AvroFileBatchReader() override;  // closes the Avro reader if Close() was not called
+
+    Result NextBatch() override;  // reads up to batch_size_ rows per call
+
+    Result> GetFileSchema() const override;  // exported from file_data_type_
+
+    Status SetReadSchema(::ArrowSchema* read_schema, const std::shared_ptr& predicate,
+                         const std::optional& selection_bitmap) override;
+
+    Result GetPreviousBatchFirstRowNumber() const override {  // uint64 max before the first batch
+        return previous_first_row_;
+    }
+
+    Result GetNumberOfRows() const override;  // cached in total_rows_
+
+    std::shared_ptr GetReaderMetrics() const override {
+        return metrics_;
+    }
+
+    void Close() override {  // safe to call more than once, see close_
+        DoClose();
+    }
+
+    bool SupportPreciseBitmapSelection() const override {  // always false for this reader
+        return false;
+    }
+
+ private:
+    void DoClose();  // shared by Close() and the destructor
+
+    static Result> CreateDataFileReader(
+        const std::shared_ptr& input_stream, const std::shared_ptr& pool);
+
+    static Result> CalculateReadFieldsProjection(
+        const std::shared_ptr<::arrow::Schema>& file_schema, const arrow::FieldVector& read_fields);
+
+    AvroFileBatchReader(const std::shared_ptr& input_stream,
+                        const std::shared_ptr<::arrow::DataType>& file_data_type,
+                        std::unique_ptr<::avro::DataFileReaderBase>&& reader,
+                        std::unique_ptr&& array_builder,
+                        std::unique_ptr&& arrow_pool, int32_t batch_size,
+                        const std::shared_ptr& pool);
+
+    static constexpr size_t BUFFER_SIZE = 1024 * 1024;  // 1M, passed to AvroInputStreamImpl::Create
+
+    std::shared_ptr pool_;
+    std::unique_ptr arrow_pool_;  // handed to arrow::MakeBuilder for the array builder
+    std::shared_ptr input_stream_;
+    std::shared_ptr<::arrow::DataType> file_data_type_;  // Arrow type of the file's Avro schema
+    std::unique_ptr<::avro::DataFileReaderBase> reader_;
+    std::unique_ptr array_builder_;  // accumulates the rows of the batch being read
+    std::optional> read_fields_projection_;  // assigned by SetReadSchema()
+    uint64_t previous_first_row_ = std::numeric_limits::max();  // max() = no batch read yet
+    uint64_t next_row_to_read_ = std::numeric_limits::max();  // max() = no batch read yet
+    mutable std::optional total_rows_ = std::nullopt;  // mutable: filled by const GetNumberOfRows()
+    const int32_t batch_size_;
+    bool close_ = false;  // true once DoClose() has closed reader_
+    std::shared_ptr metrics_;
+    // Decode context for reusing scratch buffers
+    AvroDirectDecoder::DecodeContext decode_context_;
+};
+
+}  // namespace paimon::avro
diff --git a/src/paimon/format/avro/avro_file_batch_reader_test.cpp b/src/paimon/format/avro/avro_file_batch_reader_test.cpp
new file mode 100644
index 0000000..fa50679
--- /dev/null
+++ b/src/paimon/format/avro/avro_file_batch_reader_test.cpp
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/avro/avro_file_batch_reader.h"
+
+#include
+#include
+#include
+
+#include "arrow/api.h"
+#include "arrow/c/bridge.h"
+#include "arrow/ipc/api.h"
+#include "gtest/gtest.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/core/manifest/manifest_file.h"
+#include "paimon/core/manifest/manifest_list.h"
+#include "paimon/format/file_format.h"
+#include "paimon/format/file_format_factory.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/testing/utils/read_result_collector.h"
+#include "paimon/testing/utils/testharness.h"
+#include "paimon/testing/utils/timezone_guard.h"
+
+namespace paimon::avro::test {
+
+class AvroFileBatchReaderTest : public ::testing::Test, public ::testing::WithParamInterface {
+ public:
+    void SetUp() override {  // creates the "avro" file format, the file system and a temp dir
+        ASSERT_OK_AND_ASSIGN(file_format_,
+                             FileFormatFactory::Get("avro", {{Options::FILE_FORMAT, "avro"}}));
+        fs_ = std::make_shared();
+        dir_ = ::paimon::test::UniqueTestDirectory::Create();
+        ASSERT_TRUE(dir_);
+        pool_ = GetDefaultPool();
+    }
+    void TearDown() override {}
+
+    void WriteData(const std::shared_ptr& src_array, const std::string& file_path,
+                   const std::string& compression) {  // writes src_array to file_path as one batch
+        arrow::Schema src_schema(src_array->type()->fields());
+        ::ArrowSchema c_schema;
+        ASSERT_TRUE(arrow::ExportSchema(src_schema, &c_schema).ok());
+        ASSERT_OK_AND_ASSIGN(auto writer_builder,
+                             file_format_->CreateWriterBuilder(&c_schema, /*batch_size=*/-1));
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr out,
+                             fs_->Create(file_path, /*overwrite=*/false));
+        ASSERT_OK_AND_ASSIGN(auto writer, writer_builder->Build(out, compression));
+
+        ::ArrowArray arrow_array;
+        ASSERT_TRUE(arrow::ExportArray(*src_array, &arrow_array).ok());
+        ASSERT_OK(writer->AddBatch(&arrow_array));
+        ASSERT_OK(writer->Flush());
+        ASSERT_OK(writer->Finish());
+        ASSERT_OK(out->Flush());
+        ASSERT_OK(out->Close());
+    }
+
+    std::pair, std::shared_ptr> ReadData(
+        const std::string& file_path, int32_t read_batch_size) {  // returns {reader, all rows read}
+        EXPECT_OK_AND_ASSIGN(auto reader_builder,
+                             file_format_->CreateReaderBuilder(read_batch_size));
+        EXPECT_OK_AND_ASSIGN(std::shared_ptr in, fs_->Open(file_path));
+        EXPECT_OK_AND_ASSIGN(auto batch_reader, reader_builder->Build(in));
+        EXPECT_OK_AND_ASSIGN(auto result_array, ::paimon::test::ReadResultCollector::CollectResult(
+                                                    batch_reader.get()));
+        return std::make_pair(std::move(batch_reader), result_array);
+    }
+
+ private:
+    std::shared_ptr pool_;
+    std::shared_ptr file_format_;
+    std::shared_ptr fs_;
+    std::unique_ptr dir_;
+};
+
+TEST_F(AvroFileBatchReaderTest, TestReadDataWithNull) {  // checked-in file containing a null value
+    std::string path = paimon::test::GetDataDir() + "/avro/data/avro_with_null";
+    auto [reader_holder, result_array] = ReadData(path, /*read_batch_size=*/1024);
+
+    arrow::FieldVector fields = {
+        arrow::field("_KEY_f0", arrow::utf8(), /*nullable=*/true),
+        arrow::field("_SEQUENCE_NUMBER", arrow::int64(), /*nullable=*/true),
+        arrow::field("_VALUE_KIND", arrow::int32(), /*nullable=*/true),
+        arrow::field("f0", arrow::utf8(), /*nullable=*/true),
+        arrow::field("f1", arrow::utf8(), /*nullable=*/true),
+        arrow::field("f2", arrow::int32(), /*nullable=*/true),
+        arrow::field("f3", arrow::float64(), /*nullable=*/true)};
+
+    auto arrow_data_type = arrow::struct_(fields);
+
+    std::shared_ptr expected_array;
+    auto array_status = arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
+        ["Alex", 2, 3, "Alex", "20250326", 18, 10.1],
+        ["Bob", 3, 3, "Bob", "20250326", 19, 11.1],
+        ["Evan", 1, 0, "Evan", "20250326", null, 14.1]
+    ])"},
+                                                                         &expected_array);
+    ASSERT_TRUE(array_status.ok()) << array_status.ToString();
+    ASSERT_TRUE(result_array->Equals(expected_array));
+    ASSERT_TRUE(expected_array->Equals(result_array));
+    auto read_metrics = reader_holder->GetReaderMetrics();
+    ASSERT_TRUE(read_metrics);
+}
+
+TEST_F(AvroFileBatchReaderTest, TestReadWithDifferentBatchSize) {  // write once, read per batch size
+    std::string file_path = PathUtil::JoinPath(dir_->Str(), "file.avro");
+
+    arrow::FieldVector fields = {
+        arrow::field("f0", arrow::boolean()), arrow::field("f1", arrow::int32()),
+        arrow::field("f2", arrow::int64()),   arrow::field("f3", arrow::float32()),
+        arrow::field("f4", arrow::float64()), arrow::field("f5", arrow::utf8()),
+        arrow::field("f6", arrow::binary())};
+    auto arrow_data_type = arrow::struct_(fields);
+
+    size_t length = 600;
+    std::string data_str = "[";
+    for (size_t i = 0; i < length; i++) {  // rows cycle through positive, negative and all-null
+        if (i % 3 == 0) {
+            data_str.append(fmt::format(R"([{}, {}, {}, {}, {}, "str_{}", "bin_{}"])", "true", i,
+                                        i * 100000000000L, i * 0.12, i * 123.45678901, i, i));
+        } else if (i % 3 == 1) {
+            data_str.append(fmt::format(R"([{}, -{}, -{}, -{}, -{}, "string_{}", "binary_{}"])",
+                                        "false", i, i * 100000000000L, i * 0.12, i * 123.45678901,
+                                        i, i));
+        } else {
+            data_str.append("[null, null, null, null, null, null, null]");
+        }
+        if (i != length - 1) {
+            data_str.append(",");
+        }
+    }
+    data_str.append("]");
+
+    std::shared_ptr src_array =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow_data_type, data_str).ValueOrDie();
+    ASSERT_TRUE(src_array);
+    WriteData(src_array, file_path, /*compression=*/"zstd");
+
+    for (int32_t batch_size : {1024, 512, 256, 128, 64, 32, 16, 8, 4, 2, 1}) {
+        auto [reader_holder, result_array] = ReadData(file_path, batch_size);
+        std::shared_ptr expected_array;
+        auto array_status = arrow::ipc::internal::json::ChunkedArrayFromJSON(
+            arrow_data_type, {data_str}, &expected_array);
+        ASSERT_TRUE(array_status.ok()) << array_status.ToString();
+        ASSERT_TRUE(result_array->Equals(expected_array));
+        ASSERT_TRUE(expected_array->Equals(result_array));
+    }
+}
+
+TEST_F(AvroFileBatchReaderTest, TestReadAllTypes) {  // checked-in file read with the file schema
+    std::string path = paimon::test::GetDataDir() + "/avro/data/avro_all_types";
+    auto [reader_holder, result_array] = ReadData(path, /*read_batch_size=*/1024);
+
+    arrow::FieldVector fields = {
+        arrow::field("f0", arrow::boolean()),
+        arrow::field("f1", arrow::int32()),
+        arrow::field("f2", arrow::int32()),
+        arrow::field("f3", arrow::int32()),
+        arrow::field("f4", arrow::int64()),
+        arrow::field("f5", arrow::float32()),
+        arrow::field("f6", arrow::float64()),
+        arrow::field("f7", arrow::utf8()),
+        arrow::field("f8", arrow::binary()),
+        arrow::field("f10", arrow::list(arrow::float32())),
+        arrow::field("f11", arrow::struct_({arrow::field("f0", arrow::boolean()),
+                                            arrow::field("f1", arrow::int64())})),
+        arrow::field("f12", arrow::timestamp(arrow::TimeUnit::MICRO)),
+        arrow::field("f13", arrow::date32()),
+        arrow::field("f14", arrow::decimal128(2, 2)),
+        arrow::field("f15", arrow::decimal128(10, 10)),
+        arrow::field("f16", arrow::decimal128(19, 19))};
+
+    auto arrow_data_type = arrow::struct_(fields);
+    std::shared_ptr expected_array;
+    auto array_status = arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
+        [true, 127, 32767, 2147483647, 9999999999999, 1234.56, 1234567890.0987654321, "aa", "qq", [0.1, 0.2], [true, null], "1970-01-01 00:02:03.123123", 2456, "0.22", "0.1234567890", "0.1234567890987654321"],
+        [false, -128, -32768, -2147483648, -9999999999999, -1234.56, -1234567890.0987654321, null, "ww", [-0.1, -0.2, null, 0.3, 0.4], [null, 2], "1970-01-01 00:16:39.999999", null, "-0.22", "-0.1234567890", null],
+        [null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null]
+    ])"},
+                                                                         &expected_array);
+    ASSERT_TRUE(array_status.ok()) << array_status.ToString();
+    ASSERT_TRUE(result_array->Equals(expected_array)) << result_array->ToString();
+    ASSERT_TRUE(expected_array->Equals(result_array)) << result_array->ToString();
+}
+
+TEST_P(AvroFileBatchReaderTest, TestReadTimestampTypes) {  // runs once per timezone parameter
+    auto enable_tz = GetParam();
+    std::string timezone_str = enable_tz ? "Asia/Tokyo" : "Asia/Shanghai";
+    paimon::test::TimezoneGuard tz_guard(timezone_str);
+
+    std::string path = paimon::test::GetDataDir() +
+                       "/avro/append_with_multiple_ts_precision_and_timezone.db/"
+                       "append_with_multiple_ts_precision_and_timezone/bucket-0/"
+                       "data-441e233b-529d-4a8f-a0a4-25c2c84fb965-0.avro";
+
+    ASSERT_OK_AND_ASSIGN(auto reader_builder,
+                         file_format_->CreateReaderBuilder(/*batch_size=*/1024));
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr in, fs_->Open(path));
+    ASSERT_OK_AND_ASSIGN(auto batch_reader, reader_builder->Build(in));
+
+    auto timezone = DateTimeUtils::GetLocalTimezoneName();
+    arrow::FieldVector read_fields = {
+        arrow::field("ts_sec", arrow::timestamp(arrow::TimeUnit::SECOND)),
+        arrow::field("ts_milli", arrow::timestamp(arrow::TimeUnit::MILLI)),
+        arrow::field("ts_micro", arrow::timestamp(arrow::TimeUnit::MICRO)),
+        arrow::field("ts_tz_sec", arrow::timestamp(arrow::TimeUnit::SECOND, timezone)),
+        arrow::field("ts_tz_milli", arrow::timestamp(arrow::TimeUnit::MILLI, timezone)),
+        arrow::field("ts_tz_micro", arrow::timestamp(arrow::TimeUnit::MICRO, timezone)),
+    };
+    auto read_schema = arrow::schema(read_fields);
+    std::unique_ptr c_schema = std::make_unique();
+    ASSERT_TRUE(arrow::ExportSchema(*read_schema, c_schema.get()).ok());
+    EXPECT_OK(batch_reader->SetReadSchema(c_schema.get(), /*predicate=*/nullptr,
+                                          /*selection_bitmap=*/std::nullopt));
+
+    // check array
+    ASSERT_OK_AND_ASSIGN(auto result_array,
+                         ::paimon::test::ReadResultCollector::CollectResult(batch_reader.get()));
+    std::shared_ptr expected_array;
+    auto array_status =
+        arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow::struct_(read_fields), {R"([
+["1970-01-01T00:00:01","1970-01-01T00:00:00.001","1970-01-01T00:00:00.000001","1970-01-01T00:00:02","1970-01-01T00:00:00.002","1970-01-01T00:00:00.000002"],
+[null,"1970-01-01T00:00:00.003",null,null,"1970-01-01T00:00:00.004",null],
+["1970-01-01T00:00:05",null,"1970-01-01T00:00:00.000005","1970-01-01T00:00:06",null,"1970-01-01T00:00:00.000006"]
+    ])"},
+                                                         &expected_array);
+    ASSERT_TRUE(array_status.ok()) << array_status.ToString();
+    ASSERT_TRUE(result_array->Equals(expected_array)) << result_array->ToString();
+    ASSERT_TRUE(expected_array->Equals(result_array));
+}
+
+TEST_F(AvroFileBatchReaderTest, TestReadMapTypes) {  // map columns with scalar and nested values
+    std::string path = paimon::test::GetDataDir() +
+                       "/avro/append_with_multiple_map.db/"
+                       "append_with_multiple_map/bucket-0/"
+                       "data-72442742-e49e-48a4-a736-a2475aac2d2c-0.avro";
+
+    ASSERT_OK_AND_ASSIGN(auto reader_builder,
+                         file_format_->CreateReaderBuilder(/*batch_size=*/1024));
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr in, fs_->Open(path));
+    ASSERT_OK_AND_ASSIGN(auto batch_reader, reader_builder->Build(in));
+
+    arrow::FieldVector read_fields = {
+        arrow::field("f0", arrow::map(arrow::int32(), arrow::int32())),
+        arrow::field("f1", arrow::map(arrow::float64(), arrow::float64())),
+        arrow::field("f2", arrow::map(arrow::utf8(), arrow::utf8())),
+        arrow::field("f3", arrow::map(arrow::utf8(), arrow::binary())),
+        arrow::field("f4", arrow::map(arrow::timestamp(arrow::TimeUnit::MICRO),
+                                      arrow::timestamp(arrow::TimeUnit::MICRO))),
+        arrow::field("f5", arrow::map(arrow::utf8(), arrow::list(arrow::float64()))),
+        arrow::field("f6", arrow::map(arrow::utf8(), arrow::map(arrow::float64(), arrow::utf8()))),
+        arrow::field("f7", arrow::map(arrow::int64(),
+                                      arrow::struct_({field("f0", arrow::int32()),
+                                                      field("f1", arrow::utf8()),
+                                                      field("f2", arrow::decimal128(5, 2))})))};
+    auto read_schema = arrow::schema(read_fields);
+    std::unique_ptr c_schema = std::make_unique();
+    ASSERT_TRUE(arrow::ExportSchema(*read_schema, c_schema.get()).ok());
+    EXPECT_OK(batch_reader->SetReadSchema(c_schema.get(), /*predicate=*/nullptr,
+                                          /*selection_bitmap=*/std::nullopt));
+
+    // check array
+    ASSERT_OK_AND_ASSIGN(auto result_array,
+                         ::paimon::test::ReadResultCollector::CollectResult(batch_reader.get()));
+    std::shared_ptr expected_array;
+    auto array_status =
+        arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow::struct_(read_fields), {R"([
+        [
+            [[1,10],[2,20]],
+            [[1.1,10.1],[2.2,20.2]],
+            [["key1","val1"],["key2","val2"]],
+            [["123456","abcdef"]],
+            [["2023-01-01 12:00:00.123000","2023-01-01 12:00:00.123000"],["2023-01-02 13:30:00.456000","2023-01-02 13:30:00.456000"]],
+            [["arr_key",[1.5, 2.5, 3.5]]],
+            [["outer_key",[[99.9,"nested_val"]]]],
+            [[1000, [42, "row_str", "123.45"]]]
+        ]
+    ])"},
+                                                         &expected_array);
+    ASSERT_TRUE(array_status.ok()) << array_status.ToString();
+    ASSERT_TRUE(result_array->Equals(expected_array)) << result_array->ToString() << std::endl;
+    ASSERT_TRUE(expected_array->Equals(result_array));
+}
+
+TEST_F(AvroFileBatchReaderTest, TestGetPreviousBatchFirstRowNumber) {  // 4-row file, batch size 1
+    std::string path = paimon::test::GetDataDir() +
+                       "/avro/append_simple.db/"
+                       "append_simple/bucket-0/"
+                       "data-d7d1c416-6e34-4834-af87-341d09418f0c-0.avro";
+
+    ASSERT_OK_AND_ASSIGN(auto reader_builder, file_format_->CreateReaderBuilder(/*batch_size=*/1));
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr in, fs_->Open(path));
+    ASSERT_OK_AND_ASSIGN(auto reader, reader_builder->Build(in));
+
+    arrow::FieldVector read_fields = {
+        arrow::field("f0", arrow::int32()), arrow::field("f1", arrow::float64()),
+        arrow::field("f2", arrow::utf8()),
+        arrow::field("f3",
+                     arrow::struct_({arrow::field("f0", arrow::map(arrow::utf8(), arrow::int32())),
+                                     arrow::field("f1", arrow::list(arrow::int32()))}))};
+
+    auto read_schema = arrow::schema(read_fields);
+    std::unique_ptr c_schema = std::make_unique();
+    ASSERT_TRUE(arrow::ExportSchema(*read_schema, c_schema.get()).ok());
+    EXPECT_OK(reader->SetReadSchema(c_schema.get(), /*predicate=*/nullptr,
+                                    /*selection_bitmap=*/std::nullopt));
+
+    ASSERT_OK_AND_ASSIGN(auto num_rows, reader->GetNumberOfRows());
+    ASSERT_EQ(4, num_rows);
+    ASSERT_EQ(std::numeric_limits::max(),  // nothing has been read yet
+              reader->GetPreviousBatchFirstRowNumber().value());
+    ASSERT_OK_AND_ASSIGN(auto batch1, reader->NextBatch());
+    ArrowArrayRelease(batch1.first.get());
+    ArrowSchemaRelease(batch1.second.get());
+    ASSERT_EQ(0, reader->GetPreviousBatchFirstRowNumber().value());
+    ASSERT_OK_AND_ASSIGN(auto batch2, reader->NextBatch());
+    ASSERT_EQ(1, reader->GetPreviousBatchFirstRowNumber().value());
+    ArrowArrayRelease(batch2.first.get());
+    ArrowSchemaRelease(batch2.second.get());
+    ASSERT_OK_AND_ASSIGN(auto batch3, reader->NextBatch());
+    ASSERT_EQ(2, reader->GetPreviousBatchFirstRowNumber().value());
+    ArrowArrayRelease(batch3.first.get());
+    ArrowSchemaRelease(batch3.second.get());
+    ASSERT_OK_AND_ASSIGN(auto batch4, reader->NextBatch());
+    ASSERT_EQ(3, reader->GetPreviousBatchFirstRowNumber().value());
+    ArrowArrayRelease(batch4.first.get());
+    ArrowSchemaRelease(batch4.second.get());
+    ASSERT_OK_AND_ASSIGN(auto batch5, reader->NextBatch());  // fifth read hits end of file
+    ASSERT_EQ(4, reader->GetPreviousBatchFirstRowNumber().value());
+    ASSERT_TRUE(BatchReader::IsEofBatch(batch5));
+}
+
+TEST_F(AvroFileBatchReaderTest, TestGetNumberOfRows) {  // row count requested in mid-read
+    std::string file_path = PathUtil::JoinPath(dir_->Str(), "file.avro");
+
+    arrow::FieldVector fields = {
+        arrow::field("f0", arrow::boolean()), arrow::field("f1", arrow::int32()),
+        arrow::field("f2", arrow::int64()),   arrow::field("f3", arrow::float32()),
+        arrow::field("f4", arrow::float64()), arrow::field("f5", arrow::utf8()),
+        arrow::field("f6", arrow::binary())};
+    auto arrow_data_type = arrow::struct_(fields);
+
+    size_t length = 102400;  // 4 batches of 25600 rows
+    std::string data_str = "[";
+    for (size_t i = 0; i < length; i++) {
+        data_str.append(fmt::format(R"([{}, {}, {}, {}, {}, "str_{}", "bin_{}"])", "true", i,
+                                    i * 100000000000L, i * 0.12, i * 123.45678901, i, i));
+        if (i != length - 1) {
+            data_str.append(",");
+        }
+    }
+    data_str.append("]");
+
+    std::shared_ptr src_array =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow_data_type, data_str).ValueOrDie();
+    ASSERT_TRUE(src_array);
+    WriteData(src_array, file_path, /*compression=*/"null");
+
+    ASSERT_OK_AND_ASSIGN(auto reader_builder, file_format_->CreateReaderBuilder(25600));
+
+    // check GetNumberOfRows can be called at any position, and continue read
+    int32_t expected_batches = 4;
+    for (int32_t pos = 0; pos < expected_batches; pos++) {  // pos = batches read before the call
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr in, fs_->Open(file_path));
+        ASSERT_OK_AND_ASSIGN(auto reader, reader_builder->Build(in));
+
+        arrow::ArrayVector result_array_vector;
+        for (int32_t i = 0; i < pos; i++) {
+            ASSERT_OK_AND_ASSIGN(auto batch, reader->NextBatch());
+            auto result_array =
+                arrow::ImportArray(batch.first.get(), batch.second.get()).ValueOrDie();
+            result_array_vector.push_back(result_array);
+        }
+        // check number of rows, and continue read
+        ASSERT_OK_AND_ASSIGN(auto num_rows, reader->GetNumberOfRows());
+        ASSERT_EQ(length, num_rows);
+        for (int32_t i = pos; i < expected_batches; i++) {
+            ASSERT_OK_AND_ASSIGN(auto batch, reader->NextBatch());
+            auto result_array =
+                arrow::ImportArray(batch.first.get(), batch.second.get()).ValueOrDie();
+            result_array_vector.push_back(result_array);
+        }
+        ASSERT_OK_AND_ASSIGN(auto eof_batch, reader->NextBatch());
+        ASSERT_TRUE(BatchReader::IsEofBatch(eof_batch));
+        ASSERT_OK_AND_ASSIGN(num_rows, reader->GetNumberOfRows());  // second call after EOF
+        ASSERT_EQ(length, num_rows);
+
+        auto result_array = arrow::ChunkedArray(result_array_vector);
+        ASSERT_TRUE(result_array.Equals(arrow::ChunkedArray(src_array)));
+    }
+}
+
+INSTANTIATE_TEST_SUITE_P(TestParam, AvroFileBatchReaderTest, ::testing::Values(false, true));
+
+}  // namespace paimon::avro::test
diff --git a/src/paimon/format/avro/avro_format_writer.cpp b/src/paimon/format/avro/avro_format_writer.cpp
new file mode 100644
index 0000000..e4ad454
--- /dev/null
+++ b/src/paimon/format/avro/avro_format_writer.cpp
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/avro/avro_format_writer.h"
+
+#include
+#include
+#include
+#include
+
+#include "arrow/api.h"
+#include "arrow/c/bridge.h"
+#include "avro/Compiler.hh"  // IWYU pragma: keep
+#include "avro/DataFile.hh"
+#include "avro/Exception.hh"
+#include "avro/Generic.hh"   // IWYU pragma: keep
+#include "avro/Specific.hh"  // IWYU pragma: keep
+#include "avro/ValidSchema.hh"
+#include "fmt/format.h"
+#include "paimon/common/metrics/metrics_impl.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/format/avro/avro_schema_converter.h"
+
+namespace arrow {
+class Array;
+}  // namespace arrow
+struct ArrowArray;
+
+namespace paimon::avro {
+
+AvroFormatWriter::AvroFormatWriter(std::unique_ptr<::avro::DataFileWriterBase>&& file_writer,
+                                   const ::avro::ValidSchema& avro_schema,
+                                   const std::shared_ptr& data_type,
+                                   AvroOutputStreamImpl* avro_output_stream)
+    : writer_(std::move(file_writer)),
+      avro_schema_(avro_schema),
+      data_type_(data_type),
+      metrics_(std::make_shared()),
+      avro_output_stream_(avro_output_stream) {}  // non-owning; Create() moves the stream into the writer
+
+Result> AvroFormatWriter::Create(  // converts the Arrow schema and opens the Avro data file writer
+    std::unique_ptr out, const std::shared_ptr& schema,
+    const ::avro::Codec codec, std::optional compression_level) {
+    try {
+        PAIMON_ASSIGN_OR_RAISE(::avro::ValidSchema avro_schema,
+                               AvroSchemaConverter::ArrowSchemaToAvroSchema(schema));
+        AvroOutputStreamImpl* avro_output_stream = out.get();  // taken before `out` is moved below
+        auto writer = std::make_unique<::avro::DataFileWriterBase>(
+            std::move(out), avro_schema, DEFAULT_SYNC_INTERVAL, codec, ::avro::Metadata(),
+            compression_level);
+        auto data_type = arrow::struct_(schema->fields());
+        return std::unique_ptr(
+            new AvroFormatWriter(std::move(writer), avro_schema, data_type, avro_output_stream));
+    } catch (const ::avro::Exception& e) {
+        return Status::Invalid(fmt::format("avro format writer create failed. {}", e.what()));
+    } catch (const std::exception& e) {
+        return Status::Invalid(fmt::format("avro format writer create failed: {}", e.what()));
+    } catch (...) {
+        return Status::Invalid("avro format writer create failed: unknown exception");
+    }
+}
+
+Status AvroFormatWriter::Flush() {  // flushes the Avro writer; exceptions become Status::Invalid
+    try {
+        writer_->flush();
+    } catch (const ::avro::Exception& e) {
+        return Status::Invalid(fmt::format("avro writer flush failed. {}", e.what()));
+    } catch (const std::exception& e) {
+        return Status::Invalid(fmt::format("avro writer flush failed: {}", e.what()));
+    } catch (...) {
+        return Status::Invalid("avro writer flush failed: unknown exception");
+    }
+    return Status::OK();
+}
+
+Status AvroFormatWriter::Finish() {  // flushes the output stream buffer, then closes the Avro writer
+    try {
+        avro_output_stream_->FlushBuffer();  // we need flush buffer before close writer
+        writer_->close();
+    } catch (const ::avro::Exception& e) {
+        return Status::Invalid(fmt::format("avro writer close failed. {}", e.what()));
+    } catch (const std::exception& e) {
+        return Status::Invalid(fmt::format("avro writer close failed: {}", e.what()));
+    } catch (...) {
+        return Status::Invalid("avro writer close failed: unknown exception");
+    }
+    return Status::OK();
+}
+
+Result AvroFormatWriter::ReachTargetSize(bool suggested_check, int64_t target_size) const {
+    if (suggested_check) {  // without the hint the size is not inspected and false is returned
+        uint64_t current_size = writer_->getCurrentBlockStart();
+        return current_size >= static_cast(target_size);
+    }
+    return false;
+}
+
+Status AvroFormatWriter::AddBatch(ArrowArray* batch) {  // encodes every row of `batch`, then flushes
+    if (batch == nullptr) {  // runtime check: an assert would vanish in release builds
+        return Status::Invalid("avro writer add batch failed: batch cannot be nullptr");
+    }
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr arrow_array,
+                                      arrow::ImportArray(batch, data_type_));
+    try {
+        for (int64_t row_index = 0; row_index < arrow_array->length(); ++row_index) {
+            writer_->syncIfNeeded();
+            PAIMON_RETURN_NOT_OK(AvroDirectEncoder::EncodeArrowToAvro(
+                avro_schema_.root(), *arrow_array, row_index, &writer_->encoder(), &encode_ctx_));
+            writer_->incr();
+        }
+    } catch (const ::avro::Exception& e) {
+        return Status::Invalid(fmt::format("avro writer add batch failed. {}", e.what()));
+    } catch (const std::exception& e) {
+        return Status::Invalid(fmt::format("avro writer add batch failed: {}", e.what()));
+    } catch (...) {
+        return Status::Invalid("avro writer add batch failed: unknown exception");
+    }
+    return Flush();  // flush after every batch and report its status
+}
+
+}  // namespace paimon::avro
diff --git a/src/paimon/format/avro/avro_format_writer.h b/src/paimon/format/avro/avro_format_writer.h
new file mode 100644
index 0000000..64427c3
--- /dev/null
+++ b/src/paimon/format/avro/avro_format_writer.h
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "arrow/api.h" +#include "avro/DataFile.hh" +#include "avro/ValidSchema.hh" +#include "paimon/format/avro/avro_direct_encoder.h" +#include "paimon/format/avro/avro_output_stream_impl.h" +#include "paimon/format/format_writer.h" +#include "paimon/metrics.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace arrow { +class DataType; +class Schema; +} // namespace arrow +namespace avro { +class GenericDatum; +} // namespace avro +namespace paimon { +class Metrics; +} // namespace paimon +struct ArrowArray; + +namespace paimon::avro { + +/// A `FormatWriter` implementation that writes data in Avro format. 
+class AvroFormatWriter : public FormatWriter { + public: + static Result> Create( + std::unique_ptr out, const std::shared_ptr& schema, + const ::avro::Codec codec, std::optional compression_level); + + Status AddBatch(ArrowArray* batch) override; + + Status Flush() override; + + Status Finish() override; + + Result ReachTargetSize(bool suggested_check, int64_t target_size) const override; + + std::shared_ptr GetWriterMetrics() const override { + return metrics_; + } + + private: + static constexpr size_t DEFAULT_SYNC_INTERVAL = 64 * 1024; + + AvroFormatWriter(std::unique_ptr<::avro::DataFileWriterBase>&& file_writer, + const ::avro::ValidSchema& avro_schema, + const std::shared_ptr& data_type, + AvroOutputStreamImpl* avro_output_stream); + + std::unique_ptr<::avro::DataFileWriterBase> writer_; + ::avro::ValidSchema avro_schema_; + std::shared_ptr data_type_; + std::shared_ptr metrics_; + AvroOutputStreamImpl* avro_output_stream_; + // Encode context for reusing scratch buffers + AvroDirectEncoder::EncodeContext encode_ctx_; +}; + +} // namespace paimon::avro diff --git a/src/paimon/format/avro/avro_format_writer_test.cpp b/src/paimon/format/avro/avro_format_writer_test.cpp new file mode 100644 index 0000000..965c183 --- /dev/null +++ b/src/paimon/format/avro/avro_format_writer_test.cpp @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/format/avro/avro_format_writer.h" + +#include +#include +#include +#include + +#include "arrow/api.h" +#include "arrow/c/abi.h" +#include "arrow/c/bridge.h" +#include "arrow/io/file.h" +#include "arrow/memory_pool.h" +#include "gtest/gtest.h" +#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/format/avro/avro_file_batch_reader.h" +#include "paimon/format/file_format.h" +#include "paimon/format/file_format_factory.h" +#include "paimon/fs/file_system.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/metrics.h" +#include "paimon/record_batch.h" +#include "paimon/testing/utils/read_result_collector.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::avro::test { + +class AvroFormatWriterTest : public ::testing::Test { + public: + void SetUp() override { + dir_ = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(dir_); + fs_ = std::make_shared(); + pool_ = GetDefaultPool(); + arrow_pool_ = GetArrowPool(pool_); + } + void TearDown() override {} + + std::pair, std::shared_ptr> PrepareArrowSchema() + const { + auto string_field = arrow::field("col1", arrow::utf8()); + auto int_field = arrow::field("col2", arrow::int32()); + auto bool_field = arrow::field("col3", arrow::boolean()); + auto struct_type = arrow::struct_({string_field, int_field, bool_field}); + return std::make_pair( + arrow::schema(arrow::FieldVector({string_field, int_field, bool_field})), struct_type); + } + + 
std::shared_ptr CreateFormatWriter(const std::shared_ptr& schema, + const std::shared_ptr& out, + int32_t batch_size) { + ::ArrowSchema c_schema; + EXPECT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); + EXPECT_OK_AND_ASSIGN(auto file_format, + FileFormatFactory::Get("avro", {{Options::FILE_FORMAT, "avro"}})); + EXPECT_OK_AND_ASSIGN(auto writer_builder, + file_format->CreateWriterBuilder(&c_schema, batch_size)); + EXPECT_OK_AND_ASSIGN(std::shared_ptr writer, + writer_builder->Build(out, "zstd")); + return writer; + } + + std::shared_ptr PrepareArray(const std::shared_ptr& data_type, + int32_t record_batch_size, + int32_t offset = 0) const { + arrow::StructBuilder struct_builder( + data_type, arrow::default_memory_pool(), + {std::make_shared(), std::make_shared(), + std::make_shared()}); + auto string_builder = static_cast(struct_builder.field_builder(0)); + auto int_builder = static_cast(struct_builder.field_builder(1)); + auto bool_builder = static_cast(struct_builder.field_builder(2)); + for (int32_t i = 0 + offset; i < record_batch_size + offset; ++i) { + EXPECT_TRUE(struct_builder.Append().ok()); + EXPECT_TRUE(string_builder->Append("str_" + std::to_string(i)).ok()); + if (i % 3 == 0) { + // test null + EXPECT_TRUE(int_builder->AppendNull().ok()); + } else { + EXPECT_TRUE(int_builder->Append(i).ok()); + } + EXPECT_TRUE(bool_builder->Append(static_cast(i % 2)).ok()); + } + std::shared_ptr array; + EXPECT_TRUE(struct_builder.Finish(&array).ok()); + return array; + } + + void AddRecordBatchOnce(const std::shared_ptr& format_writer, + const std::shared_ptr& struct_type, + int32_t record_batch_size, int32_t offset) const { + auto array = PrepareArray(struct_type, record_batch_size, offset); + auto arrow_array = std::make_unique(); + ASSERT_TRUE(arrow::ExportArray(*array, arrow_array.get()).ok()); + auto batch = std::make_shared( + /*partition=*/std::map(), /*bucket=*/-1, + /*row_kinds=*/std::vector(), arrow_array.get()); + 
ASSERT_OK(format_writer->AddBatch(batch->GetData())); + } + + void CheckResult(const std::string& file_path, int32_t row_count) const { + ASSERT_OK_AND_ASSIGN(std::shared_ptr input_stream, fs_->Open(file_path)); + ASSERT_OK_AND_ASSIGN(auto file_reader, + AvroFileBatchReader::Create(input_stream, 1024, pool_)); + ASSERT_OK_AND_ASSIGN(uint64_t num_rows, file_reader->GetNumberOfRows()); + ASSERT_EQ(num_rows, row_count); + + ASSERT_OK_AND_ASSIGN(auto result_array, + ::paimon::test::ReadResultCollector::CollectResult(file_reader.get())); + const auto& struct_array = + std::static_pointer_cast(result_array->chunk(0)); + const auto& string_array = + std::static_pointer_cast(struct_array->field(0)); + ASSERT_TRUE(string_array); + const auto& int_array = std::static_pointer_cast(struct_array->field(1)); + ASSERT_TRUE(int_array); + const auto& bool_array = + std::static_pointer_cast(struct_array->field(2)); + ASSERT_TRUE(bool_array); + ASSERT_EQ(string_array->null_count(), 0); + ASSERT_EQ(int_array->null_count(), (row_count - 1) / 3 + 1); + ASSERT_EQ(bool_array->null_count(), 0); + + for (int32_t i = 0; i < row_count; i++) { + ASSERT_EQ("str_" + std::to_string(i), string_array->GetString(i)); + if (i % 3 == 0) { + ASSERT_TRUE(int_array->IsNull(i)); + } else { + ASSERT_FALSE(int_array->IsNull(i)); + ASSERT_EQ(i, int_array->Value(i)); + } + if (i % 2 == 0) { + ASSERT_EQ(false, bool_array->Value(i)); + } else { + ASSERT_EQ(true, bool_array->Value(i)); + } + } + } + + private: + std::unique_ptr dir_; + std::shared_ptr fs_; + std::shared_ptr pool_; + std::shared_ptr arrow_pool_; +}; + +TEST_F(AvroFormatWriterTest, TestWriteWithVariousBatchSize) { + auto schema_pair = PrepareArrowSchema(); + const auto& arrow_schema = schema_pair.first; + const auto& struct_type = schema_pair.second; + std::map options; + for (auto record_batch_size : {1, 2, 3, 5, 20}) { + for (auto batch_capacity : {1, 2, 3, 5, 20}) { + std::string file_name = + std::to_string(record_batch_size) + "_" + 
// Writes four batches of different sizes through one writer and checks that all rows
// come back in order.
TEST_F(AvroFormatWriterTest, TestWriteMultipleTimes) {
    // arrow array length = 6 + 10 + 15 + 6 = 37
    // avro batch capacity = 10
    const auto [arrow_schema, struct_type] = PrepareArrowSchema();

    const std::string file_path = PathUtil::JoinPath(dir_->Str(), "write_multiple_times");
    ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out,
                         fs_->Create(file_path, /*overwrite=*/false));
    auto format_writer = CreateFormatWriter(arrow_schema, out, /*batch_size=*/10);

    // batches of 6, 10, 15 (larger than the batch capacity) and 6 rows; each batch starts
    // where the previous one ended, i.e. at offsets 0, 6, 16 and 31
    int32_t next_offset = 0;
    for (const int32_t rows : {6, 10, 15, 6}) {
        AddRecordBatchOnce(format_writer, struct_type, rows, next_offset);
        next_offset += rows;
    }

    ASSERT_OK(format_writer->Flush());
    ASSERT_OK(format_writer->Finish());
    ASSERT_OK(out->Flush());
    ASSERT_OK(out->Close());
    CheckResult(file_path, /*row_count=*/37);
    auto metrics = format_writer->GetWriterMetrics();
    ASSERT_TRUE(metrics);
}
+TEST_F(AvroFormatWriterTest, TestGetEstimateLength) { + auto schema_pair = PrepareArrowSchema(); + const auto& arrow_schema = schema_pair.first; + const auto& struct_type = schema_pair.second; + + std::string file_path = PathUtil::JoinPath(dir_->Str(), "get_estimate_length"); + ASSERT_OK_AND_ASSIGN(std::shared_ptr out, + fs_->Create(file_path, /*overwrite=*/false)); + auto format_writer = CreateFormatWriter(arrow_schema, out, /*batch_size=*/1024); + + // add batch first time, 1 row + AddRecordBatchOnce(format_writer, struct_type, 1, 0); + ASSERT_OK_AND_ASSIGN(bool reach_target_size, + format_writer->ReachTargetSize(/*suggested_check=*/true, + /*target_size=*/102400)); + ASSERT_FALSE(reach_target_size); + + // add batch second times, 9998 rows + AddRecordBatchOnce(format_writer, struct_type, 9998, 1); + ASSERT_OK_AND_ASSIGN(reach_target_size, format_writer->ReachTargetSize(/*suggested_check=*/true, + /*target_size=*/102400)); + ASSERT_FALSE(reach_target_size); + + AddRecordBatchOnce(format_writer, struct_type, 100000, 9999); + ASSERT_OK_AND_ASSIGN(reach_target_size, format_writer->ReachTargetSize(/*suggested_check=*/true, + /*target_size=*/102400)); + ASSERT_TRUE(reach_target_size); + ASSERT_OK(format_writer->Finish()); +} + +} // namespace paimon::avro::test diff --git a/src/paimon/format/avro/avro_stats_extractor.cpp b/src/paimon/format/avro/avro_stats_extractor.cpp new file mode 100644 index 0000000..104983d --- /dev/null +++ b/src/paimon/format/avro/avro_stats_extractor.cpp @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/format/avro/avro_stats_extractor.h" + +#include +#include +#include + +#include "arrow/api.h" +#include "arrow/c/abi.h" +#include "arrow/c/bridge.h" +#include "arrow/util/checked_cast.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/date_time_utils.h" +#include "paimon/core/core_options.h" +#include "paimon/defs.h" +#include "paimon/format/avro/avro_file_format.h" +#include "paimon/status.h" + +namespace paimon { +class FileSystem; +class MemoryPool; +} // namespace paimon + +namespace paimon::avro { + +Result> +AvroStatsExtractor::ExtractWithFileInfoInternal(const std::shared_ptr& file_system, + const std::string& path, + const std::shared_ptr& pool, + bool with_file_info) const { + PAIMON_ASSIGN_OR_RAISE(CoreOptions core_options, CoreOptions::FromMap(options_)); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr input_stream, file_system->Open(path)); + assert(input_stream); + auto avro_file_format = std::make_unique(options_); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr avro_reader_builder, + avro_file_format->CreateReaderBuilder(core_options.GetReadBatchSize())); + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr avro_reader, + avro_reader_builder->WithMemoryPool(pool)->Build(std::move(input_stream))); + + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<::ArrowSchema> c_schema, avro_reader->GetFileSchema()); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr arrow_schema, + arrow::ImportSchema(c_schema.get())); + ColumnStatsVector result_stats; + result_stats.reserve(arrow_schema->num_fields()); + for (const auto& 
arrow_field : arrow_schema->fields()) { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr stats, + FetchColumnStatistics(arrow_field->type())); + result_stats.push_back(std::move(stats)); + } + if (!with_file_info) { + // Do not return file info if not needed, because GetNumberOfRows in avro reader need I/O + // and performance is poor. + return std::make_pair(result_stats, FileInfo(-1)); + } + PAIMON_ASSIGN_OR_RAISE(uint64_t num_rows, avro_reader->GetNumberOfRows()); + return std::make_pair(result_stats, FileInfo(num_rows)); +} + +Result> AvroStatsExtractor::FetchColumnStatistics( + const std::shared_ptr& type) const { + // TODO(jinli.zjw): support stats in avro + arrow::Type::type kind = type->id(); + switch (kind) { + case arrow::Type::type::BOOL: + return ColumnStats::CreateBooleanColumnStats(std::nullopt, std::nullopt, std::nullopt); + case arrow::Type::type::INT8: + case arrow::Type::type::INT16: + return Status::Invalid( + fmt::format("Unexpected: {} type cannot appear in avro files.", type->ToString())); + case arrow::Type::type::INT32: + return ColumnStats::CreateIntColumnStats(std::nullopt, std::nullopt, std::nullopt); + case arrow::Type::type::INT64: + return ColumnStats::CreateBigIntColumnStats(std::nullopt, std::nullopt, std::nullopt); + case arrow::Type::type::FLOAT: + return ColumnStats::CreateFloatColumnStats(std::nullopt, std::nullopt, std::nullopt); + case arrow::Type::type::DOUBLE: + return ColumnStats::CreateDoubleColumnStats(std::nullopt, std::nullopt, std::nullopt); + case arrow::Type::type::BINARY: + return ColumnStats::CreateStringColumnStats(std::nullopt, std::nullopt, std::nullopt); + case arrow::Type::type::STRING: + return ColumnStats::CreateStringColumnStats(std::nullopt, std::nullopt, std::nullopt); + + case arrow::Type::type::DATE32: + return ColumnStats::CreateDateColumnStats(std::nullopt, std::nullopt, std::nullopt); + case arrow::Type::type::TIMESTAMP: { + auto ts_type = arrow::internal::checked_pointer_cast<::arrow::TimestampType>(type); + 
int32_t precision = DateTimeUtils::GetPrecisionFromType(ts_type); + return ColumnStats::CreateTimestampColumnStats(std::nullopt, std::nullopt, std::nullopt, + precision); + } + case arrow::Type::type::DECIMAL128: { + auto decimal_type = + arrow::internal::checked_pointer_cast<::arrow::Decimal128Type>(type); + int32_t precision = decimal_type->precision(); + int32_t scale = decimal_type->scale(); + return ColumnStats::CreateDecimalColumnStats(std::nullopt, std::nullopt, std::nullopt, + precision, scale); + } + case arrow::Type::type::STRUCT: + return ColumnStats::CreateNestedColumnStats(FieldType::STRUCT, std::nullopt); + case arrow::Type::type::LIST: + return ColumnStats::CreateNestedColumnStats(FieldType::ARRAY, std::nullopt); + case arrow::Type::type::MAP: + return ColumnStats::CreateNestedColumnStats(FieldType::MAP, std::nullopt); + default: + return Status::Invalid("Unknown or unsupported arrow type: ", type->ToString()); + } +} +} // namespace paimon::avro diff --git a/src/paimon/format/avro/avro_stats_extractor.h b/src/paimon/format/avro/avro_stats_extractor.h new file mode 100644 index 0000000..746768f --- /dev/null +++ b/src/paimon/format/avro/avro_stats_extractor.h @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "arrow/api.h" +#include "paimon/format/column_stats.h" +#include "paimon/format/format_stats_extractor.h" +#include "paimon/result.h" +#include "paimon/type_fwd.h" + +namespace arrow { +class DataType; +} // namespace arrow +namespace paimon { +class FileSystem; +class MemoryPool; +} // namespace paimon + +namespace paimon::avro { + +class AvroStatsExtractor : public FormatStatsExtractor { + public: + explicit AvroStatsExtractor(const std::map& options) + : options_(options) {} + + Result Extract(const std::shared_ptr& file_system, + const std::string& path, + const std::shared_ptr& pool) override { + PAIMON_ASSIGN_OR_RAISE(auto result, ExtractWithFileInfoInternal(file_system, path, pool, + /*with_file_info=*/false)); + return result.first; + } + + Result> ExtractWithFileInfo( + const std::shared_ptr& file_system, const std::string& path, + const std::shared_ptr& pool) override { + return ExtractWithFileInfoInternal(file_system, path, pool, /*with_file_info=*/true); + } + + private: + Result> ExtractWithFileInfoInternal( + const std::shared_ptr& file_system, const std::string& path, + const std::shared_ptr& pool, bool with_file_info) const; + + Result> FetchColumnStatistics( + const std::shared_ptr& type) const; + + private: + std::map options_; +}; + +} // namespace paimon::avro diff --git a/src/paimon/format/avro/avro_stats_extractor_test.cpp b/src/paimon/format/avro/avro_stats_extractor_test.cpp new file mode 100644 index 0000000..081c955 --- /dev/null +++ b/src/paimon/format/avro/avro_stats_extractor_test.cpp @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/format/avro/avro_stats_extractor.h" + +#include + +#include "arrow/api.h" +#include "arrow/c/abi.h" +#include "arrow/c/bridge.h" +#include "arrow/ipc/json_simple.h" +#include "gtest/gtest.h" +#include "paimon/common/data/binary_row.h" +#include "paimon/common/utils/date_time_utils.h" +#include "paimon/core/stats/simple_stats.h" +#include "paimon/core/stats/simple_stats_converter.h" +#include "paimon/format/avro/avro_file_format.h" +#include "paimon/format/avro/avro_format_writer.h" +#include "paimon/format/file_format_factory.h" +#include "paimon/fs/file_system.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/status.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::avro::test { + +class AvroStatsExtractorTest : public ::testing::Test { + public: + void SetUp() override {} + void TearDown() override {} + + void WriteAvroFile(const std::string& file_path, + const std::shared_ptr& src_chunk_array, + const std::shared_ptr& schema) const { + ::ArrowSchema c_schema; + ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); + + ASSERT_OK_AND_ASSIGN(std::unique_ptr file_format, + FileFormatFactory::Get("avro", options_)); + ASSERT_OK_AND_ASSIGN(auto writer_builder, + file_format->CreateWriterBuilder(&c_schema, /*batch_size=*/1024)); + + auto fs = std::make_shared(); + 
ASSERT_OK_AND_ASSIGN(std::unique_ptr output_stream, + fs->Create(file_path, true)); + ASSERT_OK_AND_ASSIGN(auto writer, writer_builder->Build(std::move(output_stream), "null")); + + for (const auto& array : src_chunk_array->chunks()) { + ::ArrowArray c_array; + ASSERT_TRUE(arrow::ExportArray(*array, &c_array).ok()); + ASSERT_OK(writer->AddBatch(&c_array)); + } + ASSERT_OK(writer->Flush()); + ASSERT_OK(writer->Finish()); + + ASSERT_OK_AND_ASSIGN(auto file_status, fs->GetFileStatus(file_path)); + ASSERT_GT(file_status->GetLen(), 0); + } + + private: + std::map options_ = {{Options::FILE_FORMAT, "avro"}, + {Options::MANIFEST_FORMAT, "avro"}}; +}; + +TEST_F(AvroStatsExtractorTest, TestPrimitiveStatsExtractor) { + auto timezone = DateTimeUtils::GetLocalTimezoneName(); + arrow::FieldVector fields = { + arrow::field("f1", arrow::int8()), + arrow::field("f2", arrow::int16()), + arrow::field("f3", arrow::int32()), + arrow::field("f4", arrow::int64()), + arrow::field("f5", arrow::float32()), + arrow::field("f6", arrow::float64()), + arrow::field("f7", arrow::utf8()), + arrow::field("f8", arrow::binary()), + arrow::field("f9", arrow::date32()), + arrow::field("f10", arrow::timestamp(arrow::TimeUnit::NANO)), + arrow::field("f11", arrow::decimal128(5, 2)), + arrow::field("f12", arrow::boolean()), + arrow::field("f13", arrow::timestamp(arrow::TimeUnit::SECOND)), + arrow::field("f14", arrow::timestamp(arrow::TimeUnit::MILLI)), + arrow::field("f15", arrow::timestamp(arrow::TimeUnit::MICRO)), + arrow::field("f16", arrow::timestamp(arrow::TimeUnit::NANO)), + arrow::field("f17", arrow::timestamp(arrow::TimeUnit::SECOND, timezone)), + arrow::field("f18", arrow::timestamp(arrow::TimeUnit::MILLI, timezone)), + arrow::field("f19", arrow::timestamp(arrow::TimeUnit::MICRO, timezone)), + arrow::field("f20", arrow::timestamp(arrow::TimeUnit::NANO, timezone)), + }; + auto schema = std::make_shared(fields); + auto array = std::dynamic_pointer_cast( + 
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ + [1, 11, 111, 1111, 1.1, 1.11, "Hello", "你好", 1234, "2033-05-18 03:33:20.0", "1.22", true, "2033-05-18 03:33:20", "2033-05-18 03:33:20.0", "2033-05-18 03:33:20.0", "2033-05-18 03:33:20.0", "2033-05-18 03:33:20", "2033-05-18 03:33:20.0", "2033-05-18 03:33:20.0", "2033-05-18 03:33:20.0"], + [2, 22, 222, 2222, 2.2, 2.22, "World", "世界", -1234, "1899-01-01 00:59:20.001001001", "2.22", false, "1899-01-01 00:59:20", "1899-01-01 00:59:20", "1899-01-01 00:59:20", "1899-01-01 00:59:20.001001001","1899-01-01 00:59:20", "1899-01-01 00:59:20", "1899-01-01 00:59:20", "1899-01-01 00:59:20.001001001"], + [null, null, 0, null, null, 0, null, null, null, null, null, null, null, null, null, null, null, null, null, null] + ])") + .ValueOrDie()); + auto src_chunk_array = std::make_shared(arrow::ArrayVector({array})); + + auto dir = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + std::string file_path = dir->Str() + "/test.avro"; + WriteAvroFile(file_path, src_chunk_array, schema); + + AvroFileFormat format(options_); + ::ArrowSchema arrow_schema; + ASSERT_TRUE(arrow::ExportSchema(*schema, &arrow_schema).ok()); + ASSERT_OK_AND_ASSIGN(auto extractor, format.CreateStatsExtractor(&arrow_schema)); + auto fs = std::make_shared(); + ASSERT_OK_AND_ASSIGN(auto stats_with_info, + extractor->ExtractWithFileInfo(fs, file_path, GetDefaultPool())); + const auto& column_stats = stats_with_info.first; + const auto& file_stats = stats_with_info.second; + + ASSERT_EQ(column_stats.size(), 20); + for (const auto& stats : column_stats) { + ASSERT_EQ(stats->ToString(), "min null, max null, null count null"); + } + ASSERT_EQ(3, file_stats.GetRowCount()); +} + +TEST_F(AvroStatsExtractorTest, TestNestedType) { + arrow::FieldVector fields = { + arrow::field("f0", arrow::list(arrow::float32())), + arrow::field("f1", arrow::struct_({arrow::field("sub_f0", arrow::boolean()), + arrow::field("sub_f1", arrow::int64())}))}; + 
auto schema = arrow::schema(fields); + auto array = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ + [null, [true, 2]], + [[0.1, 0.3], [true, 1]], + [[1.1, 1.2], null] + ])") + .ValueOrDie()); + auto src_chunk_array = std::make_shared(arrow::ArrayVector({array})); + + auto dir = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + std::string file_path = dir->Str() + "/test.avro"; + WriteAvroFile(file_path, src_chunk_array, schema); + + AvroStatsExtractor extractor(options_); + auto fs = std::make_shared(); + ASSERT_OK_AND_ASSIGN(auto results, extractor.Extract(fs, file_path, GetDefaultPool())); + + ASSERT_EQ(results.size(), 2); + for (const auto& stats : results) { + ASSERT_EQ(stats->ToString(), "min null, max null, null count null"); + } +} + +TEST_F(AvroStatsExtractorTest, TestNullForAllType) { + auto timezone = DateTimeUtils::GetLocalTimezoneName(); + arrow::FieldVector fields = { + arrow::field("f0", arrow::boolean()), + arrow::field("f1", arrow::int8()), + arrow::field("f2", arrow::int16()), + arrow::field("f3", arrow::int32()), + arrow::field("f4", arrow::int64()), + arrow::field("f5", arrow::float32()), + arrow::field("f6", arrow::float64()), + arrow::field("f7", arrow::utf8()), + arrow::field("f8", arrow::binary()), + arrow::field("f9", arrow::list(arrow::struct_({arrow::field("key", arrow::int8()), + arrow::field("value", arrow::int16())}))), + arrow::field("f10", arrow::list(arrow::float32())), + arrow::field("f11", arrow::struct_({arrow::field("f0", arrow::boolean()), + arrow::field("f1", arrow::int64())})), + arrow::field("f12", arrow::timestamp(arrow::TimeUnit::NANO)), + arrow::field("f13", arrow::date32()), + arrow::field("f14", arrow::decimal128(2, 2)), + arrow::field("f15", arrow::decimal128(30, 2)), + arrow::field("f16", arrow::timestamp(arrow::TimeUnit::SECOND)), + arrow::field("f17", arrow::timestamp(arrow::TimeUnit::MILLI)), + arrow::field("f18", 
arrow::timestamp(arrow::TimeUnit::MICRO)), + arrow::field("f19", arrow::timestamp(arrow::TimeUnit::NANO)), + arrow::field("f20", arrow::timestamp(arrow::TimeUnit::SECOND, timezone)), + arrow::field("f21", arrow::timestamp(arrow::TimeUnit::MILLI, timezone)), + arrow::field("f22", arrow::timestamp(arrow::TimeUnit::MICRO, timezone)), + arrow::field("f23", arrow::timestamp(arrow::TimeUnit::NANO, timezone)), + }; + auto schema = std::make_shared(fields); + auto src_array = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ + [null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null] + ])") + .ValueOrDie()); + auto src_chunk_array = std::make_shared(arrow::ArrayVector({src_array})); + + auto dir = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + std::string file_path = dir->Str() + "/test.avro"; + WriteAvroFile(file_path, src_chunk_array, schema); + + AvroStatsExtractor extractor(options_); + auto fs = std::make_shared(); + ASSERT_OK_AND_ASSIGN(auto column_stats, extractor.Extract(fs, file_path, GetDefaultPool())); + + ASSERT_OK_AND_ASSIGN(auto stats, + SimpleStatsConverter::ToBinary(column_stats, GetDefaultPool().get())); + ASSERT_EQ(stats.min_values_.HashCode(), 0xf890741a); + ASSERT_EQ(stats.max_values_.HashCode(), 0xf890741a); +} + +} // namespace paimon::avro::test diff --git a/src/paimon/format/avro/avro_writer_builder.h b/src/paimon/format/avro/avro_writer_builder.h new file mode 100644 index 0000000..dd24c3a --- /dev/null +++ b/src/paimon/format/avro/avro_writer_builder.h @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "avro/DataFile.hh" +#include "avro/Stream.hh" +#include "paimon/common/utils/options_utils.h" +#include "paimon/common/utils/string_utils.h" +#include "paimon/core/core_options.h" +#include "paimon/format/avro/avro_format_defs.h" +#include "paimon/format/avro/avro_format_writer.h" +#include "paimon/format/avro/avro_output_stream_impl.h" +#include "paimon/format/writer_builder.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace arrow { +class Schema; +} // namespace arrow +namespace paimon { +class FormatWriter; +class OutputStream; +} // namespace paimon + +namespace paimon::avro { + +class AvroWriterBuilder : public WriterBuilder { + public: + AvroWriterBuilder(const std::shared_ptr& schema, int32_t batch_size, + const std::map& options) + : pool_(GetDefaultPool()), schema_(schema), options_(options) {} + + WriterBuilder* WithMemoryPool(const std::shared_ptr& pool) override { + pool_ = pool; + return this; + } + + Result> Build(const std::shared_ptr& out, + const std::string& compression) override { + auto output_stream = std::make_unique(out, BUFFER_SIZE, pool_); + PAIMON_ASSIGN_OR_RAISE( + std::string file_compression, + OptionsUtils::GetValueFromMap(options_, AVRO_CODEC, compression)); + PAIMON_ASSIGN_OR_RAISE(::avro::Codec codec, + 
ToAvroCompressionKind(StringUtils::ToLowerCase(file_compression))); + PAIMON_ASSIGN_OR_RAISE(std::optional compression_level, + GetAvroCompressionLevel(codec)); + return AvroFormatWriter::Create(std::move(output_stream), schema_, codec, + compression_level); + } + + private: + static constexpr int32_t BUFFER_SIZE = 1024 * 1024; + + static Result<::avro::Codec> ToAvroCompressionKind(const std::string& file_compression) { + if (file_compression == "zstd" || file_compression == "zstandard") { + return ::avro::Codec::ZSTD_CODEC; + } else if (file_compression == "snappy") { + return ::avro::Codec::SNAPPY_CODEC; + } else if (file_compression == "null" || file_compression == "none") { + return ::avro::Codec::NULL_CODEC; + } else if (file_compression == "deflate") { + return ::avro::Codec::DEFLATE_CODEC; + } else { + return Status::Invalid("unknown compression " + file_compression); + } + } + Result> GetAvroCompressionLevel(const ::avro::Codec& codec) { + std::optional compression_level; + if (codec == ::avro::Codec::ZSTD_CODEC) { + PAIMON_ASSIGN_OR_RAISE(CoreOptions core_options, CoreOptions::FromMap(options_)); + compression_level = core_options.GetFileCompressionZstdLevel(); + } + return compression_level; + } + + std::shared_ptr pool_; + std::shared_ptr schema_; + const std::map options_; +}; + +} // namespace paimon::avro diff --git a/src/paimon/format/avro/avro_writer_builder_test.cpp b/src/paimon/format/avro/avro_writer_builder_test.cpp new file mode 100644 index 0000000..5e309e5 --- /dev/null +++ b/src/paimon/format/avro/avro_writer_builder_test.cpp @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/format/avro/avro_writer_builder.h" + +#include "avro/DataFile.hh" +#include "gtest/gtest.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::avro::test { + +TEST(AvroWriterBuilderTest, HandlesValidCompressions) { + ASSERT_OK_AND_ASSIGN(::avro::Codec zstd_codec, + AvroWriterBuilder::ToAvroCompressionKind("zstd")); + ASSERT_EQ(zstd_codec, ::avro::Codec::ZSTD_CODEC); + + ASSERT_OK_AND_ASSIGN(::avro::Codec zstandard_codec, + AvroWriterBuilder::ToAvroCompressionKind("zstandard")); + ASSERT_EQ(zstandard_codec, ::avro::Codec::ZSTD_CODEC); + + ASSERT_OK_AND_ASSIGN(::avro::Codec snappy_codec, + AvroWriterBuilder::ToAvroCompressionKind("snappy")); + ASSERT_EQ(snappy_codec, ::avro::Codec::SNAPPY_CODEC); + + ASSERT_OK_AND_ASSIGN(::avro::Codec null_codec, + AvroWriterBuilder::ToAvroCompressionKind("null")); + ASSERT_EQ(null_codec, ::avro::Codec::NULL_CODEC); + + ASSERT_OK_AND_ASSIGN(::avro::Codec deflate_codec, + AvroWriterBuilder::ToAvroCompressionKind("deflate")); + ASSERT_EQ(deflate_codec, ::avro::Codec::DEFLATE_CODEC); +} + +TEST(AvroWriterBuilderTest, HandlesInvalidCompression) { + ASSERT_NOK(AvroWriterBuilder::ToAvroCompressionKind("unknown_compression")); +} + +TEST(AvroWriterBuilderTest, HandlesEmptyString) { + ASSERT_NOK(AvroWriterBuilder::ToAvroCompressionKind("")); +} + +TEST(AvroWriterBuilderTest, CheckAvroCodec) { + arrow::FieldVector fields = {arrow::field("f0", arrow::int32())}; + auto schema = std::make_shared(fields); + { + AvroWriterBuilder builder(schema, -1, + {{Options::FILE_FORMAT, "avro"}, 
{"avro.codec", "snappy"}}); + ASSERT_OK_AND_ASSIGN(auto file_writer, builder.Build(nullptr, "zstd")); + auto* avro_file_writer = dynamic_cast(file_writer.get()); + ASSERT_EQ(avro_file_writer->writer_->codec_, ::avro::Codec::SNAPPY_CODEC); + ASSERT_EQ(avro_file_writer->writer_->compressionLevel_, std::nullopt); + } + { + AvroWriterBuilder builder(schema, -1, + {{Options::FILE_FORMAT, "avro"}, {"avro.codec", "deflate"}}); + ASSERT_OK_AND_ASSIGN(auto file_writer, builder.Build(nullptr, "zstd")); + auto* avro_file_writer = dynamic_cast(file_writer.get()); + ASSERT_EQ(avro_file_writer->writer_->codec_, ::avro::Codec::DEFLATE_CODEC); + ASSERT_EQ(avro_file_writer->writer_->compressionLevel_, std::nullopt); + } + { + AvroWriterBuilder builder(schema, -1, + {{Options::FILE_FORMAT, "avro"}, {"avro.codec", "zstd"}}); + ASSERT_OK_AND_ASSIGN(auto file_writer, builder.Build(nullptr, "zstd")); + auto* avro_file_writer = dynamic_cast(file_writer.get()); + ASSERT_EQ(avro_file_writer->writer_->codec_, ::avro::Codec::ZSTD_CODEC); + ASSERT_EQ(avro_file_writer->writer_->compressionLevel_, 1); + } + { + AvroWriterBuilder builder(schema, -1, + {{Options::FILE_FORMAT, "avro"}, + {"avro.codec", "zstd"}, + {Options::FILE_COMPRESSION_ZSTD_LEVEL, "3"}}); + ASSERT_OK_AND_ASSIGN(auto file_writer, builder.Build(nullptr, "zstd")); + auto* avro_file_writer = dynamic_cast(file_writer.get()); + ASSERT_EQ(avro_file_writer->writer_->codec_, ::avro::Codec::ZSTD_CODEC); + ASSERT_EQ(avro_file_writer->writer_->compressionLevel_, 3); + } + { + AvroWriterBuilder builder(schema, -1, + {{Options::FILE_FORMAT, "avro"}, + {"avro.codec", "null"}, + {Options::FILE_COMPRESSION_ZSTD_LEVEL, "3"}}); + ASSERT_OK_AND_ASSIGN(auto file_writer, builder.Build(nullptr, "zstd")); + auto* avro_file_writer = dynamic_cast(file_writer.get()); + ASSERT_EQ(avro_file_writer->writer_->codec_, ::avro::Codec::NULL_CODEC); + ASSERT_EQ(avro_file_writer->writer_->compressionLevel_, std::nullopt); + } + { + AvroWriterBuilder 
builder(schema, -1, + {{Options::FILE_FORMAT, "avro"}, + {"avro.codec", "test"}, + {Options::FILE_COMPRESSION_ZSTD_LEVEL, "3"}}); + ASSERT_NOK(builder.Build(nullptr, "zstd")); + } +} + +TEST(AvroWriterBuilderTest, CheckAvroCompressionLevel) { + { + AvroWriterBuilder builder(nullptr, -1, {{Options::FILE_FORMAT, "avro"}}); + ASSERT_OK_AND_ASSIGN(std::optional zstd_level, + builder.GetAvroCompressionLevel(::avro::Codec::ZSTD_CODEC)); + ASSERT_TRUE(zstd_level.has_value()); + ASSERT_EQ(zstd_level.value(), 1); + } + { + AvroWriterBuilder builder(nullptr, -1, {{Options::FILE_FORMAT, "avro"}}); + ASSERT_OK_AND_ASSIGN(std::optional compression_level, + builder.GetAvroCompressionLevel(::avro::Codec::SNAPPY_CODEC)); + ASSERT_FALSE(compression_level.has_value()); + } + { + AvroWriterBuilder builder(nullptr, -1, {{Options::FILE_FORMAT, "avro"}}); + ASSERT_OK_AND_ASSIGN(std::optional compression_level, + builder.GetAvroCompressionLevel(::avro::Codec::DEFLATE_CODEC)); + ASSERT_FALSE(compression_level.has_value()); + } + { + AvroWriterBuilder builder(nullptr, -1, {{Options::FILE_FORMAT, "avro"}}); + ASSERT_OK_AND_ASSIGN(std::optional compression_level, + builder.GetAvroCompressionLevel(::avro::Codec::NULL_CODEC)); + ASSERT_FALSE(compression_level.has_value()); + } + { + AvroWriterBuilder builder( + nullptr, -1, + {{Options::FILE_FORMAT, "avro"}, {Options::FILE_COMPRESSION_ZSTD_LEVEL, "3"}}); + ASSERT_OK_AND_ASSIGN(std::optional zstd_level, + builder.GetAvroCompressionLevel(::avro::Codec::ZSTD_CODEC)); + ASSERT_TRUE(zstd_level.has_value()); + ASSERT_EQ(zstd_level.value(), 3); + } +} + +} // namespace paimon::avro::test