diff --git a/LICENSE b/LICENSE index 175f5e7..2665c1e 100644 --- a/LICENSE +++ b/LICENSE @@ -213,6 +213,21 @@ License: https://www.apache.org/licenses/LICENSE-2.0 -------------------------------------------------------------------------------- +This product includes code from Apache Iceberg C++. + +* Avro direct decoder/encoder: + * src/paimon/format/avro/avro_direct_decoder.cpp + * src/paimon/format/avro/avro_direct_decoder.h + * src/paimon/format/avro/avro_direct_encoder.cpp + * src/paimon/format/avro/avro_direct_encoder.h +* Avro input stream in src/paimon/format/avro/avro_direct_decoder.cpp + +Copyright: 2024-2025 The Apache Software Foundation. +Home page: https://iceberg.apache.org/ +License: https://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + This product includes code from RocksDB. * endian utility in src/paimon/common/utils/math.h diff --git a/NOTICE b/NOTICE index 3be42fb..b98bb34 100644 --- a/NOTICE +++ b/NOTICE @@ -10,6 +10,9 @@ Copyright 2014-present Alibaba Inc. Apache Arrow Copyright 2016-2024 The Apache Software Foundation +Apache Iceberg C++ +Copyright 2024-2025 The Apache Software Foundation + Apache ORC Copyright 2013 and onwards The Apache Software Foundation diff --git a/src/paimon/format/avro/avro_direct_decoder.cpp b/src/paimon/format/avro/avro_direct_decoder.cpp new file mode 100644 index 0000000..b5bbd1d --- /dev/null +++ b/src/paimon/format/avro/avro_direct_decoder.cpp @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Adapted from Apache Iceberg C++ +// https://github.com/apache/iceberg-cpp/blob/main/src/iceberg/avro/avro_direct_decoder.cc + +#include "paimon/format/avro/avro_direct_decoder.h" + +#include "arrow/api.h" +#include "arrow/util/checked_cast.h" +#include "avro/Decoder.hh" +#include "avro/Node.hh" +#include "avro/Types.hh" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/date_time_utils.h" +#include "paimon/format/avro/avro_utils.h" + +namespace paimon::avro { + +namespace { + +/// Forward declaration for mutual recursion. +Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, + const std::optional>& projection, + ::avro::Decoder* decoder, arrow::ArrayBuilder* array_builder, + AvroDirectDecoder::DecodeContext* ctx); + +/// \brief Skip an Avro value based on its schema without decoding +Status SkipAvroValue(const ::avro::NodePtr& avro_node, ::avro::Decoder* decoder) { + switch (avro_node->type()) { + case ::avro::AVRO_NULL: + decoder->decodeNull(); + return Status::OK(); + + case ::avro::AVRO_BOOL: + decoder->decodeBool(); + return Status::OK(); + + case ::avro::AVRO_INT: + decoder->decodeInt(); + return Status::OK(); + + case ::avro::AVRO_LONG: + decoder->decodeLong(); + return Status::OK(); + + case ::avro::AVRO_FLOAT: + decoder->decodeFloat(); + return Status::OK(); + + case ::avro::AVRO_DOUBLE: + decoder->decodeDouble(); + return Status::OK(); + + case ::avro::AVRO_STRING: + decoder->skipString(); + return Status::OK(); + + case ::avro::AVRO_BYTES: + decoder->skipBytes(); + return Status::OK(); + + case ::avro::AVRO_RECORD: { + // Skip all fields in order + for (size_t i = 0; i < avro_node->leaves(); ++i) { + PAIMON_RETURN_NOT_OK(SkipAvroValue(avro_node->leafAt(i), decoder)); + } + return Status::OK(); + } + + case ::avro::AVRO_ARRAY: { + const auto& element_node = avro_node->leafAt(0); + // skipArray() returns count like arrayStart(), must handle all blocks + int64_t block_count = decoder->skipArray(); + while (block_count > 0) { + for (int64_t i = 0; i < block_count; ++i) { + PAIMON_RETURN_NOT_OK(SkipAvroValue(element_node, decoder)); + } + block_count = decoder->arrayNext(); + } + return Status::OK(); + } + + case ::avro::AVRO_MAP: { + const auto& value_node = avro_node->leafAt(1); + // skipMap() returns count like mapStart(), must handle all blocks + int64_t block_count = decoder->skipMap(); + while (block_count > 0) { + for (int64_t i = 0; i < block_count; ++i) { + decoder->skipString(); // Skip key (always string in Avro maps) + PAIMON_RETURN_NOT_OK(SkipAvroValue(value_node, decoder)); + } + block_count = decoder->mapNext(); + } + return Status::OK(); + } + + case ::avro::AVRO_UNION: { + const size_t branch_index = decoder->decodeUnionIndex(); + // Validate branch index + const size_t num_branches = avro_node->leaves(); + if (branch_index >= num_branches) { + return Status::Invalid(fmt::format("Union branch index {} out of range [0, {})", + branch_index, num_branches)); + } + return SkipAvroValue(avro_node->leafAt(branch_index), decoder); + } + + default: + return Status::Invalid(fmt::format("Unsupported Avro type for skipping: {}", + AvroUtils::ToString(avro_node))); + } +} + +/// Decode Avro record directly to Arrow struct builder. +Status DecodeStructToBuilder(const ::avro::NodePtr& avro_node, + const std::optional>& projection, + ::avro::Decoder* decoder, arrow::ArrayBuilder* array_builder, + AvroDirectDecoder::DecodeContext* ctx) { + if (avro_node->type() != ::avro::AVRO_RECORD) { + return Status::Invalid( + fmt::format("Expected Avro record, got type: {}", AvroUtils::ToString(avro_node))); + } + + auto* struct_builder = arrow::internal::checked_cast(array_builder); + PAIMON_RETURN_NOT_OK_FROM_ARROW(struct_builder->Append()); + + size_t skipped_fields = 0; + // Read all Avro fields in order (must maintain decoder position) + for (size_t avro_idx = 0; avro_idx < avro_node->leaves(); ++avro_idx) { + if (projection && projection->find(avro_idx) == projection->end()) { + skipped_fields++; + PAIMON_RETURN_NOT_OK(SkipAvroValue(avro_node->leafAt(avro_idx), decoder)); + } else { + // Decode this field + const auto& avro_field_node = avro_node->leafAt(avro_idx); + auto* field_builder = struct_builder->field_builder(avro_idx - skipped_fields); + PAIMON_RETURN_NOT_OK(DecodeFieldToBuilder(avro_field_node, /*projection=*/std::nullopt, + decoder, field_builder, ctx)); + } + } + + return Status::OK(); +} + +/// Decode Avro array directly to Arrow list builder. +Status DecodeListToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder* decoder, + arrow::ArrayBuilder* array_builder, + AvroDirectDecoder::DecodeContext* ctx) { + if (avro_node->type() != ::avro::AVRO_ARRAY) { + return Status::Invalid( + fmt::format("Expected Avro array, got type: {}", AvroUtils::ToString(avro_node))); + } + + auto* list_builder = arrow::internal::checked_cast(array_builder); + PAIMON_RETURN_NOT_OK_FROM_ARROW(list_builder->Append()); + + auto* value_builder = list_builder->value_builder(); + const auto& element_node = avro_node->leafAt(0); + + // Read array block count + int64_t block_count = decoder->arrayStart(); + while (block_count != 0) { + for (int64_t i = 0; i < block_count; ++i) { + PAIMON_RETURN_NOT_OK(DecodeFieldToBuilder(element_node, /*projection=*/std::nullopt, + decoder, value_builder, ctx)); + } + block_count = decoder->arrayNext(); + } + + return Status::OK(); +} + +/// Decode Avro map directly to Arrow map builder. +Status DecodeMapToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder* decoder, + arrow::ArrayBuilder* array_builder, + AvroDirectDecoder::DecodeContext* ctx) { + auto* map_builder = arrow::internal::checked_cast(array_builder); + + if (avro_node->type() == ::avro::AVRO_MAP) { + // Handle regular Avro map: map + const auto& key_node = avro_node->leafAt(0); + const auto& value_node = avro_node->leafAt(1); + + PAIMON_RETURN_NOT_OK_FROM_ARROW(map_builder->Append()); + auto* key_builder = map_builder->key_builder(); + auto* item_builder = map_builder->item_builder(); + + // Read map block count + int64_t block_count = decoder->mapStart(); + while (block_count != 0) { + for (int64_t i = 0; i < block_count; ++i) { + PAIMON_RETURN_NOT_OK(DecodeFieldToBuilder(key_node, /*projection=*/std::nullopt, + decoder, key_builder, ctx)); + PAIMON_RETURN_NOT_OK(DecodeFieldToBuilder(value_node, /*projection=*/std::nullopt, + decoder, item_builder, ctx)); + } + block_count = decoder->mapNext(); + } + return Status::OK(); + } else if (avro_node->type() == ::avro::AVRO_ARRAY && AvroUtils::HasMapLogicalType(avro_node)) { + // Handle array-based map: list> + PAIMON_RETURN_NOT_OK_FROM_ARROW(map_builder->Append()); + auto* key_builder = map_builder->key_builder(); + auto* item_builder = map_builder->item_builder(); + + const auto& record_node = avro_node->leafAt(0); + if (record_node->type() != ::avro::AVRO_RECORD || record_node->leaves() != 2) { + return Status::Invalid( + fmt::format("Array-based map must contain records with exactly 2 fields, got: {}", + AvroUtils::ToString(record_node))); + } + const auto& key_node = record_node->leafAt(0); + const auto& value_node = record_node->leafAt(1); + + // Read array block count + int64_t block_count = decoder->arrayStart(); + while (block_count != 0) { + for (int64_t i = 0; i < block_count; ++i) { + PAIMON_RETURN_NOT_OK(DecodeFieldToBuilder(key_node, /*projection=*/std::nullopt, + decoder, key_builder, ctx)); + PAIMON_RETURN_NOT_OK(DecodeFieldToBuilder(value_node, /*projection=*/std::nullopt, + decoder, item_builder, ctx)); + } + block_count = decoder->arrayNext(); + } + return Status::OK(); + } else { + return Status::Invalid( + fmt::format("Expected Avro map or array with map logical type, got: {}", + AvroUtils::ToString(avro_node))); + } +} + +/// Decode Avro data directly to Arrow array builder. +Status DecodeAvroValueToBuilder(const ::avro::NodePtr& avro_node, + const std::optional>& projection, + ::avro::Decoder* decoder, arrow::ArrayBuilder* array_builder, + AvroDirectDecoder::DecodeContext* ctx) { + auto type = avro_node->type(); + auto logical_type = avro_node->logicalType(); + + switch (type) { + case ::avro::AVRO_BOOL: { + auto* builder = arrow::internal::checked_cast(array_builder); + bool value = decoder->decodeBool(); + PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append(value)); + return Status::OK(); + } + + case ::avro::AVRO_INT: { + int32_t value = decoder->decodeInt(); + auto arrow_type = array_builder->type(); + switch (arrow_type->id()) { + case arrow::Type::INT8: { + auto* builder = + arrow::internal::checked_cast(array_builder); + PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append(value)); + return Status::OK(); + } + case arrow::Type::INT16: { + auto* builder = + arrow::internal::checked_cast(array_builder); + PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append(value)); + return Status::OK(); + } + case arrow::Type::INT32: { + auto* builder = + arrow::internal::checked_cast(array_builder); + PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append(value)); + return Status::OK(); + } + case arrow::Type::DATE32: { + if (logical_type.type() != ::avro::LogicalType::Type::DATE) { + return Status::TypeError( + fmt::format("Unexpected avro type [{}] with arrow type [{}].", type, + arrow_type->ToString())); + } + auto* builder = + arrow::internal::checked_cast(array_builder); + PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append(value)); + return Status::OK(); + } + default: + return Status::TypeError( + fmt::format("Unexpected avro type [{}] with arrow type [{}].", type, + arrow_type->ToString())); + } + } + + case ::avro::AVRO_LONG: { + int64_t value = decoder->decodeLong(); + switch (logical_type.type()) { + case ::avro::LogicalType::Type::NONE: { + auto* builder = + arrow::internal::checked_cast(array_builder); + PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append(value)); + return Status::OK(); + } + case ::avro::LogicalType::Type::TIMESTAMP_MILLIS: + case ::avro::LogicalType::Type::TIMESTAMP_MICROS: + case ::avro::LogicalType::Type::TIMESTAMP_NANOS: + case ::avro::LogicalType::Type::LOCAL_TIMESTAMP_MILLIS: + case ::avro::LogicalType::Type::LOCAL_TIMESTAMP_MICROS: + case ::avro::LogicalType::Type::LOCAL_TIMESTAMP_NANOS: { + auto* builder = + arrow::internal::checked_cast(array_builder); + auto ts_type = + arrow::internal::checked_cast(builder->type().get()); + // for arrow second, we need to convert it from avro millisecond + if (ts_type->unit() == arrow::TimeUnit::type::SECOND) { + value /= DateTimeUtils::CONVERSION_FACTORS[DateTimeUtils::MILLISECOND]; + } + PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append(value)); + return Status::OK(); + } + default: + return Status::TypeError( + fmt::format("Unexpected avro type [{}] with arrow type [{}].", type, + array_builder->type()->ToString())); + } + } + + case ::avro::AVRO_FLOAT: { + auto* builder = arrow::internal::checked_cast(array_builder); + float value = decoder->decodeFloat(); + PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append(value)); + return Status::OK(); + } + case ::avro::AVRO_DOUBLE: { + auto* builder = arrow::internal::checked_cast(array_builder); + double value = decoder->decodeDouble(); + PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append(value)); + return Status::OK(); + } + case ::avro::AVRO_STRING: { + auto* builder = arrow::internal::checked_cast(array_builder); + decoder->decodeString(ctx->string_scratch); + PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append(ctx->string_scratch)); + return Status::OK(); + } + + case ::avro::AVRO_BYTES: { + decoder->decodeBytes(ctx->bytes_scratch); + switch (logical_type.type()) { + case ::avro::LogicalType::Type::NONE: { + auto* builder = + arrow::internal::checked_cast(array_builder); + PAIMON_RETURN_NOT_OK_FROM_ARROW( + builder->Append(ctx->bytes_scratch.data(), + static_cast(ctx->bytes_scratch.size()))); + return Status::OK(); + } + case ::avro::LogicalType::Type::DECIMAL: { + auto* builder = + arrow::internal::checked_cast(array_builder); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( + arrow::Decimal128 decimal, + arrow::Decimal128::FromBigEndian(ctx->bytes_scratch.data(), + ctx->bytes_scratch.size())); + PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append(decimal)); + return Status::OK(); + } + default: + return Status::TypeError( + fmt::format("Unexpected avro type [{}] with arrow type [{}].", type, + array_builder->type()->ToString())); + } + } + + case ::avro::AVRO_RECORD: { + return DecodeStructToBuilder(avro_node, projection, decoder, array_builder, ctx); + } + case ::avro::AVRO_ARRAY: { + if (AvroUtils::HasMapLogicalType(avro_node)) { + return DecodeMapToBuilder(avro_node, decoder, array_builder, ctx); + } else { + return DecodeListToBuilder(avro_node, decoder, array_builder, ctx); + } + } + case ::avro::AVRO_MAP: { + return DecodeMapToBuilder(avro_node, decoder, array_builder, ctx); + } + default: + return Status::Invalid(fmt::format("Unsupported avro type: {}", type)); + } +} + +Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, + const std::optional>& projection, + ::avro::Decoder* decoder, arrow::ArrayBuilder* array_builder, + AvroDirectDecoder::DecodeContext* ctx) { + if (avro_node->type() == ::avro::AVRO_UNION) { + const size_t branch_index = decoder->decodeUnionIndex(); + + // Validate branch index + const size_t num_branches = avro_node->leaves(); + if (branch_index >= num_branches) { + return Status::Invalid(fmt::format("Union branch index {} out of range [0, {})", + branch_index, num_branches)); + } + + const auto& branch_node = avro_node->leafAt(branch_index); + if (branch_node->type() == ::avro::AVRO_NULL) { + decoder->decodeNull(); + PAIMON_RETURN_NOT_OK_FROM_ARROW(array_builder->AppendNull()); + return Status::OK(); + } else { + return DecodeFieldToBuilder(branch_node, projection, decoder, array_builder, ctx); + } + } + + return DecodeAvroValueToBuilder(avro_node, projection, decoder, array_builder, ctx); +} + +} // namespace + +Status AvroDirectDecoder::DecodeAvroToBuilder(const ::avro::NodePtr& avro_node, + const std::optional>& projection, + ::avro::Decoder* decoder, + arrow::ArrayBuilder* array_builder, + DecodeContext* ctx) { + return DecodeFieldToBuilder(avro_node, projection, decoder, array_builder, ctx); +} + +} // namespace paimon::avro diff --git a/src/paimon/format/avro/avro_direct_decoder.h b/src/paimon/format/avro/avro_direct_decoder.h new file mode 100644 index 0000000..a5bb7cd --- /dev/null +++ b/src/paimon/format/avro/avro_direct_decoder.h @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Adapted from Apache Iceberg C++ +// https://github.com/apache/iceberg-cpp/blob/main/src/iceberg/avro/avro_direct_decoder_internal.h + +#pragma once + +#include + +#include "arrow/array/builder_base.h" +#include "avro/Decoder.hh" +#include "avro/Node.hh" +#include "paimon/status.h" + +namespace paimon::avro { + +class AvroDirectDecoder { + public: + /// Context for reusing scratch buffers during Avro decoding + /// + /// Avoids frequent small allocations by reusing temporary buffers across multiple decode + /// operations. This is particularly important for string, binary, and decimal data types. + struct DecodeContext { + // Scratch buffer for string decoding (reused across rows) + std::string string_scratch; + // Scratch buffer for binary/decimal data (reused across rows) + std::vector bytes_scratch; + }; + + /// Directly decode Avro data to Arrow array builders without GenericDatum + /// + /// Eliminates the GenericDatum intermediate layer by directly calling Avro decoder + /// methods and immediately appending to Arrow builders. + /// + /// @param avro_node The Avro schema node for the data being decoded + /// @param decoder The Avro decoder positioned at the data to read + /// @param array_builder The Arrow array builder to append decoded data to + /// @param ctx Decode context for reusing scratch buffers + /// @return Status indicating success, or an error status + static Status DecodeAvroToBuilder(const ::avro::NodePtr& avro_node, + const std::optional>& projection, + ::avro::Decoder* decoder, arrow::ArrayBuilder* array_builder, + DecodeContext* ctx); +}; + +} // namespace paimon::avro diff --git a/src/paimon/format/avro/avro_direct_encoder.cpp b/src/paimon/format/avro/avro_direct_encoder.cpp new file mode 100644 index 0000000..d8ace9f --- /dev/null +++ b/src/paimon/format/avro/avro_direct_encoder.cpp @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Adapted from Apache Iceberg C++ +// https://github.com/apache/iceberg-cpp/blob/main/src/iceberg/avro/avro_direct_encoder.cc + +#include "paimon/format/avro/avro_direct_encoder.h" + +#include +#include + +#include "arrow/api.h" +#include "arrow/type.h" +#include "arrow/util/checked_cast.h" +#include "fmt/format.h" +#include "paimon/common/utils/date_time_utils.h" +#include "paimon/format/avro/avro_utils.h" +#include "paimon/result.h" + +namespace paimon::avro { + +namespace { + +// Utility struct for union branch information +struct UnionBranches { + size_t null_index; + size_t value_index; + ::avro::NodePtr value_node; +}; + +Result ValidateUnion(const ::avro::NodePtr& union_node) { + if (PAIMON_UNLIKELY(union_node->leaves() != 2)) { + return Status::Invalid( + fmt::format("Union must have exactly 2 branches, got {}", union_node->leaves())); + } + + const auto& branch_0 = union_node->leafAt(0); + const auto& branch_1 = union_node->leafAt(1); + + if (branch_0->type() == ::avro::AVRO_NULL && branch_1->type() != ::avro::AVRO_NULL) { + return UnionBranches{.null_index = 0, .value_index = 1, .value_node = branch_1}; + } + if (branch_1->type() == ::avro::AVRO_NULL && branch_0->type() != ::avro::AVRO_NULL) { + return Status::Invalid( + "Unexpected: In paimon, we expect the null branch to be the first branch in a union."); + } + return Status::Invalid("Union must have exactly one null branch"); +} + +} // namespace + +Status AvroDirectEncoder::EncodeArrowToAvro(const ::avro::NodePtr& avro_node, + const arrow::Array& array, int64_t row_index, + ::avro::Encoder* encoder, EncodeContext* ctx) { + if (PAIMON_UNLIKELY(row_index < 0 || row_index >= array.length())) { + return Status::Invalid( + fmt::format("Row index {} out of bounds {}", row_index, array.length())); + } + + const bool is_null = array.IsNull(row_index); + + if (avro_node->type() == ::avro::AVRO_UNION) { + PAIMON_ASSIGN_OR_RAISE(UnionBranches branches, ValidateUnion(avro_node)); + + if (is_null) { + encoder->encodeUnionIndex(branches.null_index); + encoder->encodeNull(); + return Status::OK(); + } + + encoder->encodeUnionIndex(branches.value_index); + return EncodeArrowToAvro(branches.value_node, array, row_index, encoder, ctx); + } + + if (is_null) { + return Status::Invalid("Null value in non-nullable field"); + } + + switch (avro_node->type()) { + case ::avro::AVRO_BOOL: { + const auto& bool_array = + arrow::internal::checked_cast(array); + encoder->encodeBool(bool_array.Value(row_index)); + return Status::OK(); + } + + case ::avro::AVRO_INT: { + // AVRO_INT can represent: int8, int16, int32, date (days since epoch) + switch (array.type()->id()) { + case arrow::Type::INT8: { + const auto& int8_array = + arrow::internal::checked_cast(array); + encoder->encodeInt(int8_array.Value(row_index)); + return Status::OK(); + } + case arrow::Type::INT16: { + const auto& int16_array = + arrow::internal::checked_cast(array); + encoder->encodeInt(int16_array.Value(row_index)); + return Status::OK(); + } + + case arrow::Type::INT32: { + const auto& int32_array = + arrow::internal::checked_cast(array); + encoder->encodeInt(int32_array.Value(row_index)); + return Status::OK(); + } + case arrow::Type::DATE32: { + const auto& date_array = + arrow::internal::checked_cast(array); + encoder->encodeInt(date_array.Value(row_index)); + return Status::OK(); + } + default: + return Status::Invalid( + fmt::format("AVRO_INT expects Int8Array or Int16Array or Int32Array or " + "Date32Array, got {}", + array.type()->ToString())); + } + } + + case ::avro::AVRO_LONG: { + // AVRO_LONG can represent: int64, timestamp + switch (array.type()->id()) { + case arrow::Type::INT64: { + const auto& int64_array = + arrow::internal::checked_cast(array); + encoder->encodeLong(int64_array.Value(row_index)); + return Status::OK(); + } + case arrow::Type::TIMESTAMP: { + const auto& timestamp_array = + arrow::internal::checked_cast(array); + int64_t timestamp = timestamp_array.Value(row_index); + + auto ts_type = + arrow::internal::checked_pointer_cast(array.type()); + arrow::TimeUnit::type unit = ts_type->unit(); + const auto& logical_type = avro_node->logicalType().type(); + + // NOTE: Java Avro only support TIMESTAMP_MILLIS && TIMESTAMP_MICROS + if (((logical_type == ::avro::LogicalType::TIMESTAMP_MILLIS || + logical_type == ::avro::LogicalType::LOCAL_TIMESTAMP_MILLIS) && + unit == arrow::TimeUnit::MILLI) || + ((logical_type == ::avro::LogicalType::TIMESTAMP_MICROS || + logical_type == ::avro::LogicalType::LOCAL_TIMESTAMP_MICROS) && + unit == arrow::TimeUnit::MICRO) || + ((logical_type == ::avro::LogicalType::TIMESTAMP_NANOS || + logical_type == ::avro::LogicalType::LOCAL_TIMESTAMP_NANOS) && + unit == arrow::TimeUnit::NANO)) { + encoder->encodeLong(timestamp); + } else if ((logical_type == ::avro::LogicalType::TIMESTAMP_MILLIS || + logical_type == ::avro::LogicalType::LOCAL_TIMESTAMP_MILLIS) && + unit == arrow::TimeUnit::SECOND) { + // for arrow second, we need to convert it to avro millisecond + encoder->encodeLong( + timestamp * + DateTimeUtils::CONVERSION_FACTORS[DateTimeUtils::MILLISECOND]); + } else { + return Status::Invalid( + fmt::format("Unsupported timestamp type with avro logical type {} and " + "arrow time unit {}.", + AvroUtils::ToString(avro_node->logicalType()), + DateTimeUtils::GetArrowTimeUnitStr(unit))); + } + return Status::OK(); + } + default: + return Status::Invalid( + fmt::format("AVRO_LONG expects Int64Array, or TimestampArray, got {}", + array.type()->ToString())); + } + } + + case ::avro::AVRO_FLOAT: { + const auto& float_array = + arrow::internal::checked_cast(array); + encoder->encodeFloat(float_array.Value(row_index)); + return Status::OK(); + } + + case ::avro::AVRO_DOUBLE: { + const auto& double_array = + arrow::internal::checked_cast(array); + encoder->encodeDouble(double_array.Value(row_index)); + return Status::OK(); + } + + case ::avro::AVRO_STRING: { + const auto& string_array = + arrow::internal::checked_cast(array); + std::string_view value = string_array.GetView(row_index); + encoder->encodeString(std::string(value)); + return Status::OK(); + } + + case ::avro::AVRO_BYTES: { + // Handle DECIMAL + if (avro_node->logicalType().type() == ::avro::LogicalType::DECIMAL) { + const auto& decimal_array = + arrow::internal::checked_cast(array); + std::string_view decimal_value = decimal_array.GetView(row_index); + ctx->assign(decimal_value.begin(), decimal_value.end()); + // Arrow Decimal128 bytes are in little-endian order, Avro requires big-endian + std::reverse(ctx->begin(), ctx->end()); + encoder->encodeBytes(ctx->data(), ctx->size()); + return Status::OK(); + } + + // Handle regular BYTES + const auto& binary_array = + arrow::internal::checked_cast(array); + std::string_view value = binary_array.GetView(row_index); + encoder->encodeBytes(reinterpret_cast(value.data()), value.size()); + return Status::OK(); + } + + case ::avro::AVRO_RECORD: { + if (PAIMON_UNLIKELY(array.type()->id() != arrow::Type::STRUCT)) { + return Status::Invalid(fmt::format("AVRO_RECORD expects StructArray, got {}", + array.type()->ToString())); + } + + const auto& struct_array = + arrow::internal::checked_cast(array); + const size_t num_fields = avro_node->leaves(); + + if (PAIMON_UNLIKELY(struct_array.num_fields() != static_cast(num_fields))) { + return Status::Invalid(fmt::format( + "Field count mismatch: Arrow struct has {} fields, Avro node has {} fields", + struct_array.num_fields(), num_fields)); + } + + for (size_t i = 0; i < num_fields; ++i) { + const auto& field_node = avro_node->leafAt(i); + const auto& field_array = struct_array.field(static_cast(i)); + + PAIMON_RETURN_NOT_OK( + EncodeArrowToAvro(field_node, *field_array, row_index, encoder, ctx)); + } + return Status::OK(); + } + + case ::avro::AVRO_ARRAY: { + const auto& element_node = avro_node->leafAt(0); + + // Handle ListArray + if (array.type()->id() == arrow::Type::LIST) { + const auto& list_array = + arrow::internal::checked_cast(array); + + const auto start = list_array.value_offset(row_index); + const auto end = list_array.value_offset(row_index + 1); + const auto length = end - start; + + encoder->arrayStart(); + if (length > 0) { + encoder->setItemCount(length); + const auto& values = list_array.values(); + + for (int64_t i = start; i < end; ++i) { + encoder->startItem(); + PAIMON_RETURN_NOT_OK( + EncodeArrowToAvro(element_node, *values, i, encoder, ctx)); + } + } + encoder->arrayEnd(); + return Status::OK(); + } else if (array.type()->id() == arrow::Type::MAP && + AvroUtils::HasMapLogicalType(avro_node)) { + // Handle MapArray (for Avro maps with non-string keys) + if (PAIMON_UNLIKELY(element_node->type() != ::avro::AVRO_RECORD || + element_node->leaves() != 2)) { + return Status::Invalid( + fmt::format("Expected AVRO_RECORD for map key-value pair, got {}", + AvroUtils::ToString(element_node))); + } + + const auto& map_array = + arrow::internal::checked_cast(array); + + const auto start = map_array.value_offset(row_index); + const auto end = map_array.value_offset(row_index + 1); + const auto length = end - start; + + encoder->arrayStart(); + if (length > 0) { + encoder->setItemCount(length); + const auto& keys = map_array.keys(); + const auto& values = map_array.items(); + + // The element_node should be a RECORD with "key" and "value" fields + for (int64_t i = start; i < end; ++i) { + const auto& key_node = element_node->leafAt(0); + const auto& value_node = element_node->leafAt(1); + + encoder->startItem(); + PAIMON_RETURN_NOT_OK(EncodeArrowToAvro(key_node, *keys, i, encoder, ctx)); + PAIMON_RETURN_NOT_OK( + EncodeArrowToAvro(value_node, *values, i, encoder, ctx)); + } + } + encoder->arrayEnd(); + return Status::OK(); + } + + return Status::Invalid(fmt::format( + "AVRO_ARRAY must map to ListArray or MapArray, got {}", array.type()->ToString())); + } + + case ::avro::AVRO_MAP: { + if (PAIMON_UNLIKELY(array.type()->id() != arrow::Type::MAP)) { + return Status::Invalid( + fmt::format("AVRO_MAP expects MapArray, got {}", array.type()->ToString())); + } + const auto& map_array = arrow::internal::checked_cast(array); + + const auto start = map_array.value_offset(row_index); + const auto end = map_array.value_offset(row_index + 1); + const auto length = end - start; + + encoder->mapStart(); + if (length > 0) { + encoder->setItemCount(length); + const auto& keys = map_array.keys(); + const auto& values = map_array.items(); + const auto& value_node = avro_node->leafAt(1); + + if (PAIMON_UNLIKELY(keys->type()->id() != arrow::Type::STRING)) { + return Status::Invalid(fmt::format("AVRO_MAP keys must be StringArray, got {}", + keys->type()->ToString())); + } + + for (int64_t i = start; i < end; ++i) { + encoder->startItem(); + const auto& string_array = + arrow::internal::checked_cast(*keys); + std::string_view key_value = string_array.GetView(i); + encoder->encodeString(std::string(key_value)); + + PAIMON_RETURN_NOT_OK(EncodeArrowToAvro(value_node, *values, i, encoder, ctx)); + } + } + encoder->mapEnd(); + return Status::OK(); + } + + case ::avro::AVRO_NULL: + case ::avro::AVRO_UNION: + // Already handled above + return Status::Invalid(fmt::format("Unexpected Avro type handling: {}", + ::avro::toString(avro_node->type()))); + default: + return Status::Invalid( + fmt::format("Unsupported Avro type: {}", ::avro::toString(avro_node->type()))); + } +} + +} // namespace paimon::avro diff --git a/src/paimon/format/avro/avro_direct_encoder.h b/src/paimon/format/avro/avro_direct_encoder.h new file mode 100644 index 0000000..6d0f995 --- /dev/null +++ b/src/paimon/format/avro/avro_direct_encoder.h @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Adapted from Apache Iceberg C++ +// https://github.com/apache/iceberg-cpp/blob/main/src/iceberg/avro/avro_direct_encoder_internal.h + +#pragma once + +#include + +#include "arrow/api.h" +#include "avro/Encoder.hh" +#include "avro/Node.hh" +#include "paimon/status.h" + +namespace paimon::avro { + +class AvroDirectEncoder { + public: + /// Context for reusing scratch buffers during Avro encoding + /// + /// Avoids frequent small allocations by reusing temporary buffers across multiple encode + /// operations. This is particularly important for binary/decimal types (reused across rows). + using EncodeContext = std::vector; + + /// Directly encode Arrow data to Avro without GenericDatum + /// + /// Eliminates the GenericDatum intermediate layer by directly calling Avro encoder + /// methods from Arrow arrays. + /// + /// @param avro_node The Avro schema node for the data being encoded + /// @param array The Arrow array containing the data to encode + /// @param row_index The index of the row to encode within the array + /// @param encoder The Avro encoder to write data to + /// @param ctx Encode context for reusing scratch buffers + /// @return Status indicating success, or an error status + static Status EncodeArrowToAvro(const ::avro::NodePtr& avro_node, const arrow::Array& array, + int64_t row_index, ::avro::Encoder* encoder, + EncodeContext* ctx); +}; + +} // namespace paimon::avro diff --git a/src/paimon/format/avro/avro_direct_encoder_decoder_test.cpp b/src/paimon/format/avro/avro_direct_encoder_decoder_test.cpp new file mode 100644 index 0000000..bb5bd3c --- /dev/null +++ b/src/paimon/format/avro/avro_direct_encoder_decoder_test.cpp @@ -0,0 +1,687 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "arrow/api.h" +#include "arrow/ipc/api.h" +#include "avro/Compiler.hh" +#include "avro/Decoder.hh" +#include "avro/Encoder.hh" +#include "avro/Stream.hh" +#include "avro/ValidSchema.hh" +#include "gtest/gtest.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/format/avro/avro_direct_decoder.h" +#include "paimon/format/avro/avro_direct_encoder.h" +#include "paimon/format/avro/avro_schema_converter.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::avro::test { + +class AvroDirectEncoderDecoderTest : public ::testing::Test { + public: + void SetUp() override {} + void TearDown() override {} + + Result> EncodeData( + const ::avro::NodePtr& avro_node, const std::shared_ptr& input_array) { + auto output_stream = ::avro::memoryOutputStream(); + auto encoder = ::avro::binaryEncoder(); + encoder->init(*output_stream); + + for (int64_t i = 0; i < input_array->length(); ++i) { + PAIMON_RETURN_NOT_OK(AvroDirectEncoder::EncodeArrowToAvro(avro_node, *input_array, i, + encoder.get(), &encode_ctx_)); + } + return output_stream; + } + + Result> DecodeWithEncodedData( + const ::avro::NodePtr& avro_node, std::unique_ptr<::avro::OutputStream>&& encoded_data, + const std::optional>& projection, int32_t expected_count, + arrow::ArrayBuilder* builder) { + auto input_stream = ::avro::memoryInputStream(*encoded_data); + auto decoder = ::avro::binaryDecoder(); + decoder->init(*input_stream); + + for (int32_t i = 0; i < expected_count; ++i) { + PAIMON_RETURN_NOT_OK(AvroDirectDecoder::DecodeAvroToBuilder( + avro_node, projection, decoder.get(), builder, &decode_ctx_)); + } + + std::shared_ptr decoded_array; + PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Finish(&decoded_array)); + EXPECT_EQ(decoded_array->length(), expected_count); + return decoded_array; + } + + void CheckResult(const std::string& schema_json, + const std::shared_ptr& input_array, + arrow::ArrayBuilder* builder) { + auto avro_schema = ::avro::compileJsonSchemaFromString(schema_json); + + ASSERT_OK_AND_ASSIGN(auto encoded_data, EncodeData(avro_schema.root(), input_array)); + ASSERT_OK_AND_ASSIGN( + auto decoded_array, + DecodeWithEncodedData(avro_schema.root(), std::move(encoded_data), + /*projection=*/std::nullopt, input_array->length(), builder)); + ASSERT_TRUE(decoded_array->Equals(*input_array)); + } + + Result> GetProjectedArray( + const std::shared_ptr& input_array, + const std::set& projection) { + auto struct_type = input_array->struct_type(); + arrow::FieldVector projected_fields; + projected_fields.reserve(projection.size()); + arrow::ArrayVector projected_field_arrays; + projected_field_arrays.reserve(projection.size()); + for (size_t index : projection) { + if (index >= static_cast(struct_type->num_fields())) { + return Status::Invalid( + fmt::format("Projection index {} out of range for struct with {} fields", index, + struct_type->num_fields())); + } + projected_fields.push_back(struct_type->field(index)); + projected_field_arrays.push_back(input_array->field(index)); + } + auto projected_struct_type = arrow::struct_(projected_fields); + + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( + auto projected_array, + arrow::StructArray::Make(projected_field_arrays, projected_fields)); + return projected_array; + } + + void CheckResultWithProjection(const std::shared_ptr& src_array, + const std::set& projection) { + auto src_struct_array = std::dynamic_pointer_cast(src_array); + ASSERT_OK_AND_ASSIGN(auto avro_schema, + AvroSchemaConverter::ArrowSchemaToAvroSchema( + arrow::schema(src_struct_array->struct_type()->fields()))); + ASSERT_OK_AND_ASSIGN(auto encoded_data, EncodeData(avro_schema.root(), src_array)); + + ASSERT_OK_AND_ASSIGN(auto projected_array, GetProjectedArray(src_struct_array, projection)); + auto decoded_array_builder = arrow::MakeBuilder(projected_array->type()).ValueOrDie(); + ASSERT_OK_AND_ASSIGN( + auto decoded_array, + DecodeWithEncodedData(avro_schema.root(), std::move(encoded_data), projection, + src_array->length(), decoded_array_builder.get())); + ASSERT_TRUE(decoded_array->Equals(*projected_array)); + } + + protected: + AvroDirectEncoder::EncodeContext encode_ctx_; + AvroDirectDecoder::DecodeContext decode_ctx_; +}; + +TEST_F(AvroDirectEncoderDecoderTest, TestBooleanType) { + std::string schema_json = R"({"type": "boolean"})"; + arrow::BooleanBuilder builder; + ASSERT_TRUE(builder.Append(true).ok()); + ASSERT_TRUE(builder.Append(false).ok()); + ASSERT_TRUE(builder.Append(true).ok()); + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &builder); +} + +TEST_F(AvroDirectEncoderDecoderTest, TestIntegerTypes) { + // Test INT8 + { + std::string schema_json = R"({"type": "int"})"; + arrow::Int8Builder builder; + ASSERT_TRUE(builder.Append(1).ok()); + ASSERT_TRUE(builder.Append(-128).ok()); + ASSERT_TRUE(builder.Append(127).ok()); + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &builder); + } + + // Test INT32 + { + std::string schema_json = R"({"type": "int"})"; + arrow::Int32Builder builder; + ASSERT_TRUE(builder.Append(42).ok()); + ASSERT_TRUE(builder.Append(-2147483648).ok()); + ASSERT_TRUE(builder.Append(2147483647).ok()); + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &builder); + } + + // Test INT64 + { + std::string schema_json = R"({"type": "long"})"; + arrow::Int64Builder builder; + ASSERT_TRUE(builder.Append(123456789L).ok()); + ASSERT_TRUE(builder.Append(-9223372036854775807L).ok()); + ASSERT_TRUE(builder.Append(9223372036854775807L).ok()); + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &builder); + } +} + +TEST_F(AvroDirectEncoderDecoderTest, TestFloatingPointTypes) { + // Test FLOAT + { + std::string schema_json = R"({"type": "float"})"; + arrow::FloatBuilder builder; + ASSERT_TRUE(builder.Append(3.14f).ok()); + ASSERT_TRUE(builder.Append(-2.71f).ok()); + ASSERT_TRUE(builder.Append(0.0f).ok()); + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &builder); + } + + // Test DOUBLE + { + std::string schema_json = R"({"type": "double"})"; + arrow::DoubleBuilder builder; + ASSERT_TRUE(builder.Append(3.141592653589793).ok()); + ASSERT_TRUE(builder.Append(-2.718281828459045).ok()); + ASSERT_TRUE(builder.Append(0.0).ok()); + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &builder); + } +} + +TEST_F(AvroDirectEncoderDecoderTest, TestStringType) { + std::string schema_json = R"({"type": "string"})"; + arrow::StringBuilder builder; + ASSERT_TRUE(builder.Append("hello").ok()); + ASSERT_TRUE(builder.Append("world").ok()); + ASSERT_TRUE(builder.Append("").ok()); + ASSERT_TRUE(builder.Append("测试中文").ok()); + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &builder); +} + +TEST_F(AvroDirectEncoderDecoderTest, TestBytesType) { + std::string schema_json = R"({"type": "bytes"})"; + arrow::BinaryBuilder builder; + ASSERT_TRUE(builder.Append("binary_data").ok()); + ASSERT_TRUE(builder.Append(std::string("\x00\x01\x02\x03", 4)).ok()); + ASSERT_TRUE(builder.Append("").ok()); + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &builder); +} + +TEST_F(AvroDirectEncoderDecoderTest, TestDate32Type) { + std::string schema_json = R"({"type": "int", "logicalType": "date"})"; + arrow::Date32Builder builder; + ASSERT_TRUE(builder.Append(18628).ok()); // 2021-01-01 + ASSERT_TRUE(builder.Append(0).ok()); // 1970-01-01 + ASSERT_TRUE(builder.Append(-1).ok()); // 1969-12-31 + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &builder); +} + +TEST_F(AvroDirectEncoderDecoderTest, TestTimestampType) { + // Test timestamp-millis + { + std::string schema_json = R"({"type": "long", "logicalType": "timestamp-millis"})"; + arrow::TimestampBuilder builder(arrow::timestamp(arrow::TimeUnit::MILLI), + arrow::default_memory_pool()); + ASSERT_TRUE(builder.Append(1609459200123L).ok()); // 2021-01-01 00:00:00.123 + ASSERT_TRUE(builder.Append(0L).ok()); // 1970-01-01 00:00:00 + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &builder); + } + + // Test timestamp-micros + { + std::string schema_json = R"({"type": "long", "logicalType": "timestamp-micros"})"; + arrow::TimestampBuilder builder(arrow::timestamp(arrow::TimeUnit::MICRO), + arrow::default_memory_pool()); + ASSERT_TRUE(builder.Append(1609459200123123L).ok()); // 2021-01-01 00:00:00.123123 + ASSERT_TRUE(builder.Append(0L).ok()); // 1970-01-01 00:00:00 + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &builder); + } +} + +TEST_F(AvroDirectEncoderDecoderTest, TestInvalidTimestampType) { + std::string schema_json = R"({"type": "long", "logicalType": "timestamp-millis"})"; + arrow::TimestampBuilder builder(arrow::timestamp(arrow::TimeUnit::NANO), + arrow::default_memory_pool()); + ASSERT_TRUE(builder.Append(1609459200123L).ok()); // 2021-01-01 00:00:00.123 + ASSERT_TRUE(builder.Append(0L).ok()); // 1970-01-01 00:00:00 + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + + auto avro_schema = ::avro::compileJsonSchemaFromString(schema_json); + ASSERT_NOK_WITH_MSG(EncodeData(avro_schema.root(), input_array), + "Unsupported timestamp type with avro logical type \"logicalType\": " + "\"timestamp-millis\" and arrow time unit NANOSECOND."); +} + +TEST_F(AvroDirectEncoderDecoderTest, TestUnionType) { + // Test nullable int (union of null and int) + std::string schema_json = R"(["null", "int"])"; + arrow::Int32Builder builder; + ASSERT_TRUE(builder.Append(42).ok()); + ASSERT_TRUE(builder.AppendNull().ok()); + ASSERT_TRUE(builder.Append(100).ok()); + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &builder); +} + +TEST_F(AvroDirectEncoderDecoderTest, TestRecordType) { + std::string schema_json = R"({ + "type": "record", + "name": "TestRecord", + "fields": [ + {"name": "id", "type": "int"}, + {"name": "name", "type": "string"}, + {"name": "active", "type": "boolean"} + ] + })"; + + // Create struct array + auto int_field = arrow::field("id", arrow::int32()); + auto string_field = arrow::field("name", arrow::utf8()); + auto bool_field = arrow::field("active", arrow::boolean()); + auto struct_type = arrow::struct_({int_field, string_field, bool_field}); + + arrow::StructBuilder struct_builder( + struct_type, arrow::default_memory_pool(), + {std::make_shared(), std::make_shared(), + std::make_shared()}); + + auto int_builder = static_cast(struct_builder.field_builder(0)); + auto string_builder = static_cast(struct_builder.field_builder(1)); + auto bool_builder = static_cast(struct_builder.field_builder(2)); + + // Add first record + ASSERT_TRUE(struct_builder.Append().ok()); + ASSERT_TRUE(int_builder->Append(1).ok()); + ASSERT_TRUE(string_builder->Append("Alice").ok()); + ASSERT_TRUE(bool_builder->Append(true).ok()); + + // Add second record + ASSERT_TRUE(struct_builder.Append().ok()); + ASSERT_TRUE(int_builder->Append(2).ok()); + ASSERT_TRUE(string_builder->Append("Bob").ok()); + ASSERT_TRUE(bool_builder->Append(false).ok()); + + std::shared_ptr input_array; + ASSERT_TRUE(struct_builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &struct_builder); +} + +TEST_F(AvroDirectEncoderDecoderTest, TestDecodeWithProjection) { + arrow::FieldVector fields = { + arrow::field("f0", arrow::boolean()), + arrow::field("f1", arrow::int8()), + arrow::field("f2", arrow::int16()), + arrow::field("f3", arrow::int32()), + arrow::field("f4", arrow::int64()), + arrow::field("f5", arrow::float32()), + arrow::field("f6", arrow::float64()), + arrow::field("f7", arrow::utf8()), + arrow::field("f8", arrow::binary()), + arrow::field("f9", arrow::map(arrow::float64(), arrow::float64())), + arrow::field("f10", arrow::map(arrow::utf8(), arrow::utf8())), + arrow::field("f11", arrow::list(arrow::float32())), + arrow::field("f12", arrow::struct_({arrow::field("f0", arrow::boolean()), + arrow::field("f1", arrow::int64())})), + arrow::field("f13", arrow::timestamp(arrow::TimeUnit::MICRO)), + arrow::field("f14", arrow::date32()), + arrow::field("f15", arrow::decimal128(2, 2)), + arrow::field("f16", arrow::decimal128(10, 10)), + arrow::field("f17", arrow::decimal128(19, 19))}; + + std::shared_ptr src_array = + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ + [true, 127, 32767, 2147483647, 9999999999999, 1234.56, 1234567890.0987654321, "aa", "qq", [[1.1,10.1],[2.2,20.2]], [["key1","val1"],["key2","val2"]], [0.1, 0.2], [true, null], "1970-01-01 00:02:03.123123", 2456, "0.22", "0.1234567890", "0.1234567890987654321"], + [false, -128, -32768, -2147483648, -9999999999999, -1234.56, -1234567890.0987654321, null, "ww", [[1.11,10.11],[2.22,20.22]], [["key11","val11"],["key22","val22"]], [-0.1, -0.2, null, 0.3, 0.4], [null, 2], "1970-01-01 00:16:39.999999", null, "-0.22", "-0.1234567890", null], + [null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null] + ])") + .ValueOrDie(); + + // no skip + CheckResultWithProjection(src_array, + {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}); + // skip bool + CheckResultWithProjection(src_array, + {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}); + // skip int + CheckResultWithProjection(src_array, {0, 1, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}); + // skip long + CheckResultWithProjection(src_array, + {0, 1, 2, 3, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}); + // skip float + CheckResultWithProjection(src_array, + {0, 1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}); + // skip double + CheckResultWithProjection(src_array, + {0, 1, 2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}); + // skip string + CheckResultWithProjection(src_array, + {0, 1, 2, 3, 4, 5, 6, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}); + // skip binary + CheckResultWithProjection(src_array, + {0, 1, 2, 3, 4, 5, 6, 7, 9, 10, 11, 12, 13, 14, 15, 16, 17}); + // skip map + CheckResultWithProjection(src_array, + {0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12, 13, 14, 15, 16, 17}); + // skip array-based map + CheckResultWithProjection(src_array, + {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13, 14, 15, 16, 17}); + // skip list + CheckResultWithProjection(src_array, + {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 13, 14, 15, 16, 17}); + // skip struct + CheckResultWithProjection(src_array, + {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 13, 14, 15, 16, 17}); + // skip others + CheckResultWithProjection(src_array, {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}); + // skip null and union is already tested in above test cases +} + +TEST_F(AvroDirectEncoderDecoderTest, TestArrayType) { + std::string schema_json = R"({ + "type": "array", + "items": "int" + })"; + + // Create list array + arrow::ListBuilder list_builder(arrow::default_memory_pool(), + std::make_shared()); + auto int_builder = static_cast(list_builder.value_builder()); + + // First list: [1, 2, 3] + ASSERT_TRUE(list_builder.Append().ok()); + ASSERT_TRUE(int_builder->Append(1).ok()); + ASSERT_TRUE(int_builder->Append(2).ok()); + ASSERT_TRUE(int_builder->Append(3).ok()); + + // Second list: [] + ASSERT_TRUE(list_builder.Append().ok()); + + // Third list: [42] + ASSERT_TRUE(list_builder.Append().ok()); + ASSERT_TRUE(int_builder->Append(42).ok()); + + std::shared_ptr input_array; + ASSERT_TRUE(list_builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &list_builder); +} + +TEST_F(AvroDirectEncoderDecoderTest, TestMapType) { + std::string schema_json = R"({ + "type": "map", + "values": "string" + })"; + + // Create map array + arrow::MapBuilder map_builder(arrow::default_memory_pool(), + std::make_shared(), + std::make_shared()); + auto key_builder = static_cast(map_builder.key_builder()); + auto value_builder = static_cast(map_builder.item_builder()); + + // First map: {"key1": "value1", "key2": "value2"} + ASSERT_TRUE(map_builder.Append().ok()); + ASSERT_TRUE(key_builder->Append("key1").ok()); + ASSERT_TRUE(value_builder->Append("value1").ok()); + ASSERT_TRUE(key_builder->Append("key2").ok()); + ASSERT_TRUE(value_builder->Append("value2").ok()); + + // Second map: {} + ASSERT_TRUE(map_builder.Append().ok()); + + // Third map: {"single": "entry"} + ASSERT_TRUE(map_builder.Append().ok()); + ASSERT_TRUE(key_builder->Append("single").ok()); + ASSERT_TRUE(value_builder->Append("entry").ok()); + + std::shared_ptr input_array; + ASSERT_TRUE(map_builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &map_builder); +} + +TEST_F(AvroDirectEncoderDecoderTest, TestArrayBasedMapType) { + std::string schema_json = R"({ + "type" : "array", + "items" : { + "type" : "record", + "name" : "record_f1", + "fields" : [ { + "name" : "key", + "type" : "int" + }, { + "name" : "value", + "type" : "string" + } ] + }, + "logicalType" : "map" + })"; + + // Create map array + arrow::MapBuilder map_builder(arrow::default_memory_pool(), + std::make_shared(), + std::make_shared()); + auto key_builder = static_cast(map_builder.key_builder()); + auto value_builder = static_cast(map_builder.item_builder()); + + // First map: {111: "value1", 222: "value2"} + ASSERT_TRUE(map_builder.Append().ok()); + ASSERT_TRUE(key_builder->Append(111).ok()); + ASSERT_TRUE(value_builder->Append("value1").ok()); + ASSERT_TRUE(key_builder->Append(222).ok()); + ASSERT_TRUE(value_builder->Append("value2").ok()); + + // Second map: {} + ASSERT_TRUE(map_builder.Append().ok()); + + // Third map: {333: "entry"} + ASSERT_TRUE(map_builder.Append().ok()); + ASSERT_TRUE(key_builder->Append(333).ok()); + ASSERT_TRUE(value_builder->Append("entry").ok()); + + std::shared_ptr input_array; + ASSERT_TRUE(map_builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &map_builder); +} + +TEST_F(AvroDirectEncoderDecoderTest, TestDecimalType) { + std::string schema_json = R"({ + "type": "bytes", + "logicalType": "decimal", + "precision": 10, + "scale": 2 + })"; + + // Create decimal array + auto decimal_type = arrow::decimal128(10, 2); + arrow::Decimal128Builder builder(decimal_type); + + ASSERT_TRUE(builder.Append(arrow::Decimal128("123.45")).ok()); + ASSERT_TRUE(builder.Append(arrow::Decimal128("-67.89")).ok()); + ASSERT_TRUE(builder.Append(arrow::Decimal128("0.00")).ok()); + + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + CheckResult(schema_json, input_array, &builder); +} + +TEST_F(AvroDirectEncoderDecoderTest, TestEncoderErrorCases) { + std::string schema_json = R"({"type": "int"})"; + auto avro_schema = ::avro::compileJsonSchemaFromString(schema_json); + auto output_stream = ::avro::memoryOutputStream(); + auto encoder = ::avro::binaryEncoder(); + encoder->init(*output_stream); + + { + // Test out of bounds row index + arrow::Int32Builder builder; + ASSERT_TRUE(builder.Append(42).ok()); + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + + ASSERT_NOK_WITH_MSG(AvroDirectEncoder::EncodeArrowToAvro(avro_schema.root(), *input_array, + -1, encoder.get(), &encode_ctx_), + "Row index -1 out of bounds 1"); + ASSERT_NOK_WITH_MSG(AvroDirectEncoder::EncodeArrowToAvro(avro_schema.root(), *input_array, + 1, encoder.get(), &encode_ctx_), + "Row index 1 out of bounds 1"); + } + { + // Test null value in non-nullable field + arrow::Int32Builder nullable_builder; + ASSERT_TRUE(nullable_builder.AppendNull().ok()); + std::shared_ptr nullable_array; + ASSERT_TRUE(nullable_builder.Finish(&nullable_array).ok()); + + ASSERT_NOK_WITH_MSG( + AvroDirectEncoder::EncodeArrowToAvro(avro_schema.root(), *nullable_array, 0, + encoder.get(), &encode_ctx_), + "Null value in non-nullable field"); + } +} + +TEST_F(AvroDirectEncoderDecoderTest, TestDecoderErrorCases) { + std::string schema_json = R"(["null", "int"])"; + auto avro_schema = ::avro::compileJsonSchemaFromString(schema_json); + + // Test with invalid union branch index, branch index 2, but union only has 2 branches (0,1) + std::vector invalid_data = {0x04}; + auto input_stream = ::avro::memoryInputStream(invalid_data.data(), invalid_data.size()); + auto decoder = ::avro::binaryDecoder(); + decoder->init(*input_stream); + + arrow::Int32Builder builder; + ASSERT_NOK_WITH_MSG( + AvroDirectDecoder::DecodeAvroToBuilder(avro_schema.root(), std::nullopt, decoder.get(), + &builder, &decode_ctx_), + "Union branch index 2 out of range [0, 2)"); +} + +TEST_F(AvroDirectEncoderDecoderTest, TestInvalidUnionType) { + auto run = [&](const std::string& schema_json, const std::string& error_msg) { + auto avro_schema = ::avro::compileJsonSchemaFromString(schema_json); + auto output_stream = ::avro::memoryOutputStream(); + auto encoder = ::avro::binaryEncoder(); + encoder->init(*output_stream); + + arrow::Int32Builder builder; + ASSERT_TRUE(builder.Append(42).ok()); + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + + ASSERT_NOK_WITH_MSG(AvroDirectEncoder::EncodeArrowToAvro(avro_schema.root(), *input_array, + 0, encoder.get(), &encode_ctx_), + error_msg); + }; + // Test union with more than 2 branches + run(R"(["null", "int", "string"])", "Union must have exactly 2 branches, got 3"); + // Test union with null branch not first + run(R"(["int", "null"])", + "Unexpected: In paimon, we expect the null branch to be the first branch in a union."); +} + +TEST_F(AvroDirectEncoderDecoderTest, TestInvalidMapType) { + std::string schema_json = R"({ + "type": "map", + "values": "string" + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(schema_json); + + arrow::MapBuilder map_builder(arrow::default_memory_pool(), + std::make_shared(), + std::make_shared()); + auto key_builder = static_cast(map_builder.key_builder()); + auto value_builder = static_cast(map_builder.item_builder()); + ASSERT_TRUE(map_builder.Append().ok()); + ASSERT_TRUE(key_builder->Append(1).ok()); + ASSERT_TRUE(value_builder->Append("value1").ok()); + std::shared_ptr input_array; + ASSERT_TRUE(map_builder.Finish(&input_array).ok()); + + ASSERT_NOK_WITH_MSG(EncodeData(avro_schema.root(), input_array), + "AVRO_MAP keys must be StringArray, got int32"); +} + +TEST_F(AvroDirectEncoderDecoderTest, TestInvalidArrayBasedMapType) { + std::string schema_json = R"({ + "type" : "array", + "items" : { + "type" : "record", + "name" : "record_f1", + "fields" : [ { + "name" : "key", + "type" : "int" + }, { + "name" : "value", + "type" : "string" + }, { + "name" : "metadata", + "type" : "string" + } ] + }, + "logicalType" : "map" + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(schema_json); + + arrow::MapBuilder map_builder(arrow::default_memory_pool(), + std::make_shared(), + std::make_shared()); + ASSERT_TRUE(map_builder.Append().ok()); + std::shared_ptr input_array; + ASSERT_TRUE(map_builder.Finish(&input_array).ok()); + + ASSERT_NOK_WITH_MSG(EncodeData(avro_schema.root(), input_array), + "Expected AVRO_RECORD for map key-value pair"); +} + +#ifndef NDEBUG +TEST_F(AvroDirectEncoderDecoderTest, TestTypeMismatch) { + // Test string schema with int array (The type-mismatch issue should not occur, so we only + // perform type conversion checks in debug mode.) + std::string schema_json = R"({"type": "string"})"; + auto avro_schema = ::avro::compileJsonSchemaFromString(schema_json); + auto output_stream = ::avro::memoryOutputStream(); + auto encoder = ::avro::binaryEncoder(); + encoder->init(*output_stream); + + arrow::Int32Builder builder; + ASSERT_TRUE(builder.Append(42).ok()); + std::shared_ptr input_array; + ASSERT_TRUE(builder.Finish(&input_array).ok()); + + ASSERT_THROW(auto status = AvroDirectEncoder::EncodeArrowToAvro( + avro_schema.root(), *input_array, 0, encoder.get(), &encode_ctx_), + std::bad_cast); +} +#endif + +} // namespace paimon::avro::test