From 994dfc5e8bc12a6e686c2254dd36a9b8cd511901 Mon Sep 17 00:00:00 2001 From: "jinli.zjw" Date: Thu, 18 Jun 2026 11:08:18 +0800 Subject: [PATCH] feat(format): introduce blob file format --- .../format/blob/blob_file_batch_reader.cpp | 334 +++++++++++++ .../format/blob/blob_file_batch_reader.h | 182 ++++++++ .../blob/blob_file_batch_reader_test.cpp | 370 +++++++++++++++ src/paimon/format/blob/blob_file_format.h | 76 +++ .../format/blob/blob_file_format_factory.cpp | 37 ++ .../format/blob/blob_file_format_factory.h | 43 ++ .../blob/blob_file_format_factory_test.cpp | 34 ++ src/paimon/format/blob/blob_format_writer.cpp | 212 +++++++++ src/paimon/format/blob/blob_format_writer.h | 99 ++++ .../format/blob/blob_format_writer_test.cpp | 437 ++++++++++++++++++ src/paimon/format/blob/blob_reader_builder.h | 61 +++ .../format/blob/blob_stats_extractor.cpp | 64 +++ src/paimon/format/blob/blob_stats_extractor.h | 61 +++ .../format/blob/blob_stats_extractor_test.cpp | 111 +++++ src/paimon/format/blob/blob_writer_builder.h | 85 ++++ .../format/blob/blob_writer_builder_test.cpp | 57 +++ 16 files changed, 2263 insertions(+) create mode 100644 src/paimon/format/blob/blob_file_batch_reader.cpp create mode 100644 src/paimon/format/blob/blob_file_batch_reader.h create mode 100644 src/paimon/format/blob/blob_file_batch_reader_test.cpp create mode 100644 src/paimon/format/blob/blob_file_format.h create mode 100644 src/paimon/format/blob/blob_file_format_factory.cpp create mode 100644 src/paimon/format/blob/blob_file_format_factory.h create mode 100644 src/paimon/format/blob/blob_file_format_factory_test.cpp create mode 100644 src/paimon/format/blob/blob_format_writer.cpp create mode 100644 src/paimon/format/blob/blob_format_writer.h create mode 100644 src/paimon/format/blob/blob_format_writer_test.cpp create mode 100644 src/paimon/format/blob/blob_reader_builder.h create mode 100644 src/paimon/format/blob/blob_stats_extractor.cpp create mode 100644 
src/paimon/format/blob/blob_stats_extractor.h create mode 100644 src/paimon/format/blob/blob_stats_extractor_test.cpp create mode 100644 src/paimon/format/blob/blob_writer_builder.h create mode 100644 src/paimon/format/blob/blob_writer_builder_test.cpp diff --git a/src/paimon/format/blob/blob_file_batch_reader.cpp b/src/paimon/format/blob/blob_file_batch_reader.cpp new file mode 100644 index 0000000..52df797 --- /dev/null +++ b/src/paimon/format/blob/blob_file_batch_reader.cpp @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/format/blob/blob_file_batch_reader.h" + +#include +#include +#include + +#include "arrow/api.h" +#include "arrow/array/builder_dict.h" +#include "arrow/array/builder_nested.h" +#include "arrow/c/bridge.h" +#include "arrow/util/bit_util.h" +#include "fmt/format.h" +#include "paimon/common/data/blob_utils.h" +#include "paimon/common/executor/future.h" +#include "paimon/common/io/offset_input_stream.h" +#include "paimon/common/metrics/metrics_impl.h" +#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/delta_varint_compressor.h" +#include "paimon/common/utils/stream_utils.h" +#include "paimon/data/blob.h" + +namespace paimon::blob { + +Result> BlobFileBatchReader::Create( + const std::shared_ptr& input_stream, int32_t batch_size, bool blob_as_descriptor, + const std::shared_ptr& pool) { + if (input_stream == nullptr) { + return Status::Invalid("blob file batch reader create failed: input stream is nullptr"); + } + if (batch_size <= 0) { + return Status::Invalid(fmt::format( + "blob file batch reader create failed: read batch size '{}' should be larger than zero", + batch_size)); + } + + PAIMON_ASSIGN_OR_RAISE(uint64_t file_size, input_stream->Length()); + PAIMON_RETURN_NOT_OK( + input_stream->Seek(file_size - BlobDefs::kBlobFileHeaderLength, FS_SEEK_SET)); + int8_t header[BlobDefs::kBlobFileHeaderLength]; + PAIMON_ASSIGN_OR_RAISE( + int32_t actual_size, + input_stream->Read(reinterpret_cast(header), BlobDefs::kBlobFileHeaderLength)); + if (actual_size != BlobDefs::kBlobFileHeaderLength) { + return Status::Invalid( + fmt::format("actual read size {} not match with expect header length {}", actual_size, + BlobDefs::kBlobFileHeaderLength)); + } + int8_t version = header[4]; + if (version != BlobDefs::kFileVersion) { + return Status::Invalid(fmt::format( + "create blob format reader failed. 
unsupported blob file version: {}", version)); + } + int32_t index_length = GetIndexLength(header, 0); + PAIMON_RETURN_NOT_OK(input_stream->Seek( + file_size - BlobDefs::kBlobFileHeaderLength - index_length, FS_SEEK_SET)); + std::vector index_bytes(index_length, '\0'); + PAIMON_ASSIGN_OR_RAISE(actual_size, input_stream->Read(index_bytes.data(), index_length)); + if (actual_size != index_length) { + return Status::Invalid( + fmt::format("actual read size {} not match with expect index length {}", actual_size, + index_length)); + } + PAIMON_ASSIGN_OR_RAISE(const std::vector blob_lengths, + DeltaVarintCompressor::Decompress(index_bytes)); + + std::vector blob_offsets; + blob_offsets.reserve(blob_lengths.size()); + int64_t offset = 0; + for (const auto& blob_length : blob_lengths) { + blob_offsets.push_back(offset); + // Null blobs (bin_length == -1) don't occupy file space + if (blob_length >= 0) { + offset += blob_length; + } + } + PAIMON_ASSIGN_OR_RAISE(std::string file_path, input_stream->GetUri()); + auto reader = std::unique_ptr(new BlobFileBatchReader( + input_stream, file_path, blob_lengths, blob_offsets, batch_size, blob_as_descriptor, pool)); + return reader; +} + +BlobFileBatchReader::BlobFileBatchReader(const std::shared_ptr& input_stream, + const std::string& file_path, + const std::vector& blob_lengths, + const std::vector& blob_offsets, + int32_t batch_size, bool blob_as_descriptor, + const std::shared_ptr& pool) + : input_stream_(input_stream), + file_path_(file_path), + all_blob_lengths_(blob_lengths), + all_blob_offsets_(blob_offsets), + target_blob_lengths_(blob_lengths), + target_blob_offsets_(blob_offsets), + batch_size_(batch_size), + blob_as_descriptor_(blob_as_descriptor), + pool_(pool), + arrow_pool_(GetArrowPool(pool_)), + metrics_(std::make_shared()) { + target_blob_row_indexes_.resize(target_blob_lengths_.size()); + std::iota(target_blob_row_indexes_.begin(), target_blob_row_indexes_.end(), 0); +} + +Status 
BlobFileBatchReader::SetReadSchema(::ArrowSchema* read_schema, + const std::shared_ptr& predicate, + const std::optional& selection_bitmap) { + if (!read_schema) { + return Status::Invalid("SetReadSchema failed: read schema cannot be nullptr"); + } + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr arrow_schema, + arrow::ImportSchema(read_schema)); + if (arrow_schema->num_fields() != 1) { + return Status::Invalid( + fmt::format("read schema field number {} is not 1", arrow_schema->num_fields())); + } + if (!BlobUtils::IsBlobField(arrow_schema->field(0))) { + return Status::Invalid( + fmt::format("field {} is not BLOB", arrow_schema->field(0)->ToString())); + } + if (selection_bitmap != std::nullopt) { + int32_t cardinality = selection_bitmap->Cardinality(); + std::vector new_lengths(cardinality); + std::vector new_offsets(cardinality); + std::vector new_row_indexes(cardinality); + + PAIMON_ASSIGN_OR_RAISE(uint64_t total_rows, GetNumberOfRows()); + RoaringBitmap32::Iterator iterator(*selection_bitmap); + for (int32_t i = 0; i < cardinality; i++) { + int32_t row_index = *iterator; + if (static_cast(row_index) >= total_rows) { + return Status::Invalid(fmt::format( + "row index {} is out of bound of total row number {}", row_index, total_rows)); + } + ++iterator; + new_lengths[i] = all_blob_lengths_[row_index]; + new_offsets[i] = all_blob_offsets_[row_index]; + new_row_indexes[i] = row_index; + } + target_blob_lengths_ = new_lengths; + target_blob_offsets_ = new_offsets; + target_blob_row_indexes_ = new_row_indexes; + } + target_type_ = arrow::struct_(arrow_schema->fields()); + current_pos_ = 0; + previous_batch_first_row_number_ = std::numeric_limits::max(); + + return Status::OK(); +} + +Result> BlobFileBatchReader::NextBlobOffsets( + int32_t rows_to_read) const { + arrow::TypedBufferBuilder buffer_builder(arrow_pool_.get()); + PAIMON_RETURN_NOT_OK_FROM_ARROW(buffer_builder.Reserve(rows_to_read + 1)); + PAIMON_RETURN_NOT_OK_FROM_ARROW(buffer_builder.Append(0)); + 
int64_t data_length = 0; + for (int32_t k = 0; k < rows_to_read; ++k) { + const size_t i = current_pos_ + k; + // Null blobs contribute zero bytes to content + if (!IsTargetNull(i)) { + data_length += GetTargetContentLength(i); + } + PAIMON_RETURN_NOT_OK_FROM_ARROW(buffer_builder.Append(data_length)); + } + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr offset_buffer, + buffer_builder.Finish()); + return offset_buffer; +} + +Result> BlobFileBatchReader::NextBlobContents( + int32_t rows_to_read) const { + int64_t total_length = 0; + for (int32_t k = 0; k < rows_to_read; ++k) { + const size_t i = current_pos_ + k; + if (!IsTargetNull(i)) { + total_length += GetTargetContentLength(i); + } + } + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr data_buffer, + arrow::AllocateBuffer(total_length, arrow_pool_.get())); + uint8_t* buffer = data_buffer->mutable_data(); + for (int32_t k = 0; k < rows_to_read; ++k) { + const size_t i = current_pos_ + k; + if (IsTargetNull(i)) { + continue; + } + int64_t offset = GetTargetContentOffset(i); + int64_t length = GetTargetContentLength(i); + PAIMON_RETURN_NOT_OK(ReadBlobContentAt(offset, length, buffer)); + buffer += length; + } + return data_buffer; +} + +Result> BlobFileBatchReader::BuildNullBitmap( + int32_t rows_to_read) const { + bool has_null = false; + for (int32_t k = 0; k < rows_to_read; ++k) { + if (IsTargetNull(current_pos_ + k)) { + has_null = true; + break; + } + } + if (!has_null) { + return std::shared_ptr(); + } + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr null_bitmap, + arrow::AllocateBitmap(rows_to_read, arrow_pool_.get())); + // Initialize all bits to 1 (valid), then clear bits for null rows + memset(null_bitmap->mutable_data(), 0xFF, null_bitmap->size()); + for (int32_t k = 0; k < rows_to_read; ++k) { + if (IsTargetNull(current_pos_ + k)) { + arrow::bit_util::ClearBit(null_bitmap->mutable_data(), k); + } + } + return null_bitmap; +} + +Result> BlobFileBatchReader::BuildContentArray( + int32_t 
rows_to_read) const { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr value_offsets, + NextBlobOffsets(rows_to_read)); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr data, NextBlobContents(rows_to_read)); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr child_null_bitmap, + BuildNullBitmap(rows_to_read)); + + auto large_binary_array = std::make_shared(rows_to_read, value_offsets, + data, child_null_bitmap); + std::vector> child_data; + child_data.emplace_back(large_binary_array->data()); + std::shared_ptr struct_array_data = + arrow::ArrayData::Make(target_type_, large_binary_array->length(), {nullptr}, child_data); + return std::make_shared(struct_array_data); +} + +Result> BlobFileBatchReader::BuildTargetArray( + int32_t rows_to_read) const { + std::shared_ptr blob_array; + if (!blob_as_descriptor_) { + return BuildContentArray(rows_to_read); + } + // For descriptor mode, build using StructBuilder to handle nulls properly + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::unique_ptr array_builder, + arrow::MakeBuilder(target_type_, arrow_pool_.get())); + auto builder = dynamic_cast(array_builder.get()); + if (builder == nullptr) { + return Status::Invalid("cast to struct builder failed"); + } + auto field_builder = dynamic_cast(builder->field_builder(0)); + if (field_builder == nullptr) { + return Status::Invalid("cast to large binary builder failed"); + } + for (int32_t k = 0; k < rows_to_read; ++k) { + const size_t i = current_pos_ + k; + PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append()); + if (IsTargetNull(i)) { + PAIMON_RETURN_NOT_OK_FROM_ARROW(field_builder->AppendNull()); + } else { + int64_t offset = GetTargetContentOffset(i); + int64_t length = GetTargetContentLength(i); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr blob, + Blob::FromPath(file_path_, offset, length)); + auto descriptor = blob->ToDescriptor(pool_); + PAIMON_RETURN_NOT_OK_FROM_ARROW( + field_builder->Append(descriptor->data(), descriptor->size())); + } + } + std::shared_ptr array; + 
PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Finish(&array)); + return array; +} + +Result BlobFileBatchReader::NextBatch() { + if (closed_) { + return Status::Invalid("blob file batch reader is closed"); + } + if (target_type_ == nullptr) { + return Status::Invalid("target type is nullptr, call SetReadSchema first"); + } + if (current_pos_ >= target_blob_lengths_.size()) { + PAIMON_ASSIGN_OR_RAISE(previous_batch_first_row_number_, GetNumberOfRows()); + return BatchReader::MakeEofBatch(); + } + int32_t left_rows = target_blob_lengths_.size() - current_pos_; + int32_t rows_to_read = std::min(left_rows, batch_size_); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr blob_array, + BuildTargetArray(rows_to_read)); + std::unique_ptr c_array = std::make_unique(); + std::unique_ptr c_schema = std::make_unique(); + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*blob_array, c_array.get(), c_schema.get())); + previous_batch_first_row_number_ = target_blob_row_indexes_[current_pos_]; + current_pos_ += rows_to_read; + return make_pair(std::move(c_array), std::move(c_schema)); +} + +Status BlobFileBatchReader::ReadBlobContentAt(const int64_t offset, const int64_t length, + uint8_t* content) const { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr offset_input_stream, + OffsetInputStream::Create(input_stream_, length, offset)); + return StreamUtils::ReadAsyncFully(std::move(offset_input_stream), + reinterpret_cast(content)); +} + +int32_t BlobFileBatchReader::GetIndexLength(const int8_t* bytes, int32_t offset) { + return static_cast( + (static_cast(static_cast(bytes[offset + 3])) << 24) | + (static_cast(static_cast(bytes[offset + 2])) << 16) | + (static_cast(static_cast(bytes[offset + 1])) << 8) | + static_cast(static_cast(bytes[offset]))); +} + +// Note: blob file has no self-describing schema, use read schema instead. 
+Result> BlobFileBatchReader::GetFileSchema() const { + return Status::NotImplemented("blob file has no self-describing file schema"); +} + +} // namespace paimon::blob diff --git a/src/paimon/format/blob/blob_file_batch_reader.h b/src/paimon/format/blob/blob_file_batch_reader.h new file mode 100644 index 0000000..159d055 --- /dev/null +++ b/src/paimon/format/blob/blob_file_batch_reader.h @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "arrow/memory_pool.h" +#include "arrow/type.h" +#include "paimon/common/data/blob_defs.h" +#include "paimon/fs/file_system.h" +#include "paimon/memory/bytes.h" +#include "paimon/predicate/predicate.h" +#include "paimon/reader/batch_reader.h" +#include "paimon/reader/file_batch_reader.h" +#include "paimon/result.h" +#include "paimon/utils/roaring_bitmap32.h" + +namespace paimon::blob { + +/// Binary Blob File Layout Specification +/// +/// This file format is designed for the efficient storage of a sequence of 'bins' (data +/// blocks/records) with associated metadata. The structure consists of one or more data bins +/// (bin_0, bin_1, ...), followed by an Index section and a Footer. 
+/// +/// Endianness: +/// - All multi-byte fields (magic number, bin length, crc32, index len, index) use little-endian +/// byte order. +/// +/// ==================================================================== +/// 1. Data Bins Section +/// ==================================================================== +/// The file consists of one or more contiguous 'bins' (bin_0, bin_1, bin_2, ...). +/// The structure of each bin is as follows: +/// +/// | Field Name | Length (bytes) | Description | +/// |-------------------|----------------|---------------------------------------------------------| +/// | magic number | 4 | A fixed number identifying the start of the block. | +/// | blob content | bin len - 16 | The actual data payload. | +/// | bin length | 8 | The total length of the entire bin (including metadata).| +/// | bin CRC32 | 4 | The 32-bit Cyclic Redundancy Checksum for the bin. | +/// +/// Note: +/// - Current magic number is 1481511375. +/// +/// ==================================================================== +/// 2. Index Section +/// ==================================================================== +/// The Index is located after all data bins and is used for quick lookup and management. +/// +/// Purpose: Records the lengths (record lens) of all data bins. +/// +/// Encoding: +/// - Uses Delta Encoding to store differences between successive length values. +/// - Uses Varints (Variable-length Integers) to store long values efficiently. +/// +/// ==================================================================== +/// 3. File Footer +/// ==================================================================== +/// Metadata located at the very end of the file, describing the index and file version. +/// +/// | Field Name | Length (bytes) | Description | +/// |---------------|----------------|---------------------------------------------------| +/// | Index Len | 4 | The byte length of the preceding Index section. 
| +/// | version | 1 | The file format version number. | +/// +/// Note: +/// - Current version is 1. +class BlobFileBatchReader : public FileBatchReader { + public: + static Result> Create( + const std::shared_ptr& input_stream, int32_t batch_size, + bool blob_as_descriptor, const std::shared_ptr& pool); + + Result> GetFileSchema() const override; + + Status SetReadSchema(::ArrowSchema* read_schema, const std::shared_ptr& predicate, + const std::optional& selection_bitmap) override; + + Result NextBatch() override; + + Result GetPreviousBatchFirstRowNumber() const override { + if (all_blob_lengths_.size() != target_blob_lengths_.size()) { + return Status::Invalid( + "Cannot call GetPreviousBatchFirstRowNumber in BlobFileBatchReader because, after " + "bitmap pushdown, rows in the array returned by NextBatch are no longer " + "contiguous."); + } + return previous_batch_first_row_number_; + } + + Result GetNumberOfRows() const override { + return all_blob_lengths_.size(); + } + + std::shared_ptr GetReaderMetrics() const override { + return metrics_; + } + + void Close() override { + closed_ = true; + } + + bool SupportPreciseBitmapSelection() const override { + return true; + } + + private: + static constexpr uint64_t kDefaultReadChunkSize = 1024 * 1024; + + static int32_t GetIndexLength(const int8_t* bytes, int32_t offset); + + BlobFileBatchReader(const std::shared_ptr& input_stream, + const std::string& file_path, const std::vector& blob_lengths, + const std::vector& blob_offsets, int32_t batch_size, + bool blob_as_descriptor, const std::shared_ptr& pool); + + Status ReadBlobContentAt(const int64_t offset, const int64_t length, uint8_t* content) const; + + Result> NextBlobOffsets(int32_t rows_to_read) const; + Result> NextBlobContents(int32_t rows_to_read) const; + /// Builds a null bitmap buffer for the given rows. Returns nullptr if no nulls. 
+ Result> BuildNullBitmap(int32_t rows_to_read) const; + Result> BuildContentArray(int32_t rows_to_read) const; + Result> BuildTargetArray(int32_t rows_to_read) const; + + /// Returns true if the blob at the given index is null (bin_length == kNullBinLength). + bool IsTargetNull(size_t index) const { + return target_blob_lengths_[index] == BlobDefs::kNullBinLength; + } + + int64_t GetTargetContentOffset(size_t index) const { + return target_blob_offsets_[index] + BlobDefs::kContentStartOffset; + } + + int64_t GetTargetContentLength(size_t index) const { + return target_blob_lengths_[index] - BlobDefs::kTotalMetaLength; + } + + std::shared_ptr input_stream_; + const std::string file_path_; + const std::vector all_blob_lengths_; + const std::vector all_blob_offsets_; + + std::vector target_blob_lengths_; + std::vector target_blob_offsets_; + std::vector target_blob_row_indexes_; + + const int32_t batch_size_; + const bool blob_as_descriptor_; + std::shared_ptr pool_; + std::shared_ptr arrow_pool_; + + std::shared_ptr target_type_; + std::shared_ptr metrics_; + + size_t current_pos_ = 0; + uint64_t previous_batch_first_row_number_ = std::numeric_limits::max(); + bool closed_ = false; +}; + +} // namespace paimon::blob diff --git a/src/paimon/format/blob/blob_file_batch_reader_test.cpp b/src/paimon/format/blob/blob_file_batch_reader_test.cpp new file mode 100644 index 0000000..3fc9051 --- /dev/null +++ b/src/paimon/format/blob/blob_file_batch_reader_test.cpp @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/format/blob/blob_file_batch_reader.h" + +#include "arrow/api.h" +#include "arrow/c/helpers.h" +#include "gtest/gtest.h" +#include "paimon/common/data/blob_utils.h" +#include "paimon/data/blob.h" +#include "paimon/format/blob/blob_format_writer.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/read_result_collector.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::blob::test { + +class BlobFileBatchReaderTest : public testing::Test, public ::testing::WithParamInterface { + public: + void SetUp() override { + pool_ = GetDefaultPool(); + } + + void CheckResult(const std::string& table_path, const std::string& paimon_blob_file, + const std::vector& original_blob_files, bool blob_as_descriptor, + const std::optional& selection_bitmap = std::nullopt) { + auto schema = arrow::schema({BlobUtils::ToArrowField(blob_field_name_, false)}); + ::ArrowSchema c_schema; + ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); + std::shared_ptr fs = std::make_shared(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr input_stream, + fs->Open(table_path + "/bucket-0/" + paimon_blob_file)); + ASSERT_OK_AND_ASSIGN(auto reader, + BlobFileBatchReader::Create(input_stream, /*batch_size=*/1024, + blob_as_descriptor, pool_)); + ASSERT_OK(reader->SetReadSchema(&c_schema, nullptr, selection_bitmap)); + ASSERT_OK_AND_ASSIGN(auto chunked_array, + paimon::test::ReadResultCollector::CollectResult(reader.get())); + if (chunked_array == nullptr) { + ASSERT_EQ(0, 
original_blob_files.size()); + return; + } + + std::shared_ptr combined_array = + arrow::Concatenate(chunked_array->chunks()).ValueOrDie(); + if (original_blob_files.size() == 0) { + ASSERT_EQ(0, combined_array->length()); + return; + } + auto struct_array = std::dynamic_pointer_cast(combined_array); + ASSERT_TRUE(struct_array); + auto blob_array = + std::dynamic_pointer_cast(struct_array->field(0)); + ASSERT_EQ(blob_array->length(), original_blob_files.size()); + for (size_t i = 0; i < original_blob_files.size(); i++) { + ASSERT_OK_AND_ASSIGN(auto origin_input_stream, + fs->Open(table_path + "/" + original_blob_files[i])); + ASSERT_OK_AND_ASSIGN(auto origin_length, origin_input_stream->Length()); + auto origin_bytes = Bytes::AllocateBytes(origin_length, pool_.get()); + ASSERT_OK_AND_ASSIGN(auto actual_read_length, + origin_input_stream->Read(origin_bytes->data(), origin_length)); + ASSERT_EQ(actual_read_length, origin_length); + if (blob_as_descriptor) { + auto blob_descriptor = blob_array->GetString(i); + ASSERT_OK_AND_ASSIGN(auto blob, Blob::FromDescriptor(blob_descriptor.data(), + blob_descriptor.size())); + ASSERT_OK_AND_ASSIGN(auto input_stream, blob->NewInputStream(fs)); + ASSERT_OK_AND_ASSIGN(auto pos, input_stream->GetPos()); + ASSERT_EQ(pos, 0); + ASSERT_OK_AND_ASSIGN(auto length, input_stream->Length()); + auto bytes = Bytes::AllocateBytes(length, pool_.get()); + ASSERT_OK_AND_ASSIGN(auto actual_read_length, + input_stream->Read(bytes->data(), length)); + ASSERT_EQ(actual_read_length, length); + ASSERT_EQ(length, origin_length); + ASSERT_EQ(*bytes, *origin_bytes); + } else { + auto blob_data = blob_array->GetString(i); + ASSERT_EQ(blob_data.size(), origin_length); + std::string origin_data(origin_bytes->data(), origin_length); + ASSERT_EQ(blob_data, origin_data); + } + } + } + + private: + std::string blob_field_name_; + std::shared_ptr pool_; +}; + +TEST_P(BlobFileBatchReaderTest, TestSimple) { + std::string test_data_path = paimon::test::GetDataDir() + 
"/db_with_blob.db/table_with_blob/"; + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + bool blob_as_descriptor = GetParam(); + ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path)); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob", + {"blob_0_811d5dab.bin", "blob_1_b81cf9f4.bin", "blob_2_470e1dfe.bin"}, + blob_as_descriptor); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-2.blob", + {"blob_3_07b08c4d.bin", "blob_4_67007c96.bin"}, blob_as_descriptor); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-3.blob", + {"blob_5_f7099dea.bin", "blob_6_6b6706ef.bin", "blob_7_6bcae65e.bin", + "blob_8_5fba0737.bin"}, + blob_as_descriptor); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-4.blob", + {"blob_9_f54d253c.bin"}, blob_as_descriptor); +} + +TEST_P(BlobFileBatchReaderTest, TestPushdownBitmap) { + std::string test_data_path = paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + bool blob_as_descriptor = GetParam(); + ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path)); + RoaringBitmap32 roaring_0; + roaring_0.Add(0); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob", + {"blob_0_811d5dab.bin"}, blob_as_descriptor, roaring_0); + RoaringBitmap32 roaring_1; + roaring_1.Add(1); + + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-2.blob", + {"blob_4_67007c96.bin"}, blob_as_descriptor, roaring_1); + RoaringBitmap32 roaring_2; + roaring_2.Add(0); + roaring_2.Add(1); + roaring_2.Add(3); + CheckResult(table_path, "data-d7816e8e-6c6d-4e28-9137-837cdf706350-3.blob", + {"blob_5_f7099dea.bin", "blob_6_6b6706ef.bin", "blob_8_5fba0737.bin"}, + blob_as_descriptor, roaring_2); + RoaringBitmap32 roaring_3; + CheckResult(table_path, 
"data-d7816e8e-6c6d-4e28-9137-837cdf706350-4.blob", {}, + blob_as_descriptor, roaring_3); +} + +TEST_F(BlobFileBatchReaderTest, TestRowNumbers) { + auto schema = arrow::schema({BlobUtils::ToArrowField("my_blob_field", false)}); + ::ArrowSchema c_schema; + ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); + + std::string test_data_path = paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/"; + auto dir = paimon::test::UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path)); + + std::shared_ptr fs = std::make_shared(); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr input_stream, + fs->Open(table_path + "/bucket-0/data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob")); + ASSERT_OK_AND_ASSIGN(auto reader, BlobFileBatchReader::Create( + input_stream, + /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_)); + + ASSERT_OK(reader->SetReadSchema(&c_schema, nullptr, std::nullopt)); + ASSERT_OK_AND_ASSIGN(auto number_of_rows, reader->GetNumberOfRows()); + ASSERT_EQ(3, number_of_rows); + ASSERT_EQ(std::numeric_limits::max(), + reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_OK_AND_ASSIGN(auto batch1, reader->NextBatch()); + ArrowArrayRelease(batch1.first.get()); + ArrowSchemaRelease(batch1.second.get()); + ASSERT_EQ(0, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_OK_AND_ASSIGN(auto batch2, reader->NextBatch()); + ASSERT_EQ(1, reader->GetPreviousBatchFirstRowNumber().value()); + ArrowArrayRelease(batch2.first.get()); + ArrowSchemaRelease(batch2.second.get()); + ASSERT_OK_AND_ASSIGN(auto batch3, reader->NextBatch()); + ASSERT_EQ(2, reader->GetPreviousBatchFirstRowNumber().value()); + ArrowArrayRelease(batch3.first.get()); + ArrowSchemaRelease(batch3.second.get()); + ASSERT_OK_AND_ASSIGN(auto batch4, reader->NextBatch()); + ASSERT_EQ(3, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_TRUE(BatchReader::IsEofBatch(batch4)); +} + 
+// Error paths of BlobFileBatchReader: invalid Create() arguments, and calls made
+// on a reader that has no read schema yet or that has been closed.
+TEST_F(BlobFileBatchReaderTest, InvalidScenario) {
+    auto dir = paimon::test::UniqueTestDirectory::Create();
+    ASSERT_TRUE(dir);
+    // NOTE(review): file_system is never used below; the test opens the file through fs.
+    auto file_system = std::make_shared();
+    std::string test_data_path = paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/";
+    std::string table_path = dir->Str();
+    ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path));
+
+    std::shared_ptr fs = std::make_shared();
+    ASSERT_OK_AND_ASSIGN(
+        std::shared_ptr input_stream,
+        fs->Open(table_path + "/bucket-0/data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob"));
+    {
+        // batch_size == 0 is rejected.
+        ASSERT_NOK_WITH_MSG(
+            BlobFileBatchReader::Create(input_stream,
+                                        /*batch_size=*/0, /*blob_as_descriptor=*/true, pool_),
+            "blob file batch reader create failed: read batch size '0' should be larger than zero");
+    }
+    {
+        // A null input stream is rejected.
+        ASSERT_NOK_WITH_MSG(
+            BlobFileBatchReader::Create(/*input_stream=*/nullptr,
+                                        /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_),
+            "blob file batch reader create failed: input stream is nullptr");
+    }
+    {
+        ASSERT_OK_AND_ASSIGN(
+            auto reader,
+            BlobFileBatchReader::Create(/*input_stream=*/input_stream,
+                                        /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_));
+        ASSERT_NOK_WITH_MSG(reader->GetFileSchema(),
+                            "blob file has no self-describing file schema");
+        ASSERT_TRUE(reader->GetReaderMetrics());
+        // NextBatch() before SetReadSchema() fails.
+        ASSERT_NOK_WITH_MSG(reader->NextBatch(),
+                            "target type is nullptr, call SetReadSchema first");
+        // NextBatch() after Close() fails with a different message.
+        reader->Close();
+        ASSERT_NOK_WITH_MSG(reader->NextBatch(), "blob file batch reader is closed");
+    }
+}
+
+// Writes a blob file that contains no rows (Flush + Finish only) and checks that
+// the reader reports zero rows and returns an EOF batch on the first read.
+TEST_P(BlobFileBatchReaderTest, EmptyFile) {
+    auto dir = paimon::test::UniqueTestDirectory::Create();
+    ASSERT_TRUE(dir);
+    auto file_system = std::make_shared();
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr output_stream,
+                         file_system->Create(dir->Str() + "/file.blob", /*overwrite=*/true));
+    std::shared_ptr blob_field = BlobUtils::ToArrowField("blob_col");
+    auto struct_type = arrow::struct_({blob_field});
+    // The test parameter only drives the writer; see the note on the reader below.
+    bool blob_as_descriptor = GetParam();
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr writer,
+                         BlobFormatWriter::Create(blob_as_descriptor, output_stream, struct_type,
+                                                  file_system, pool_));
+
+    ASSERT_OK(writer->Flush());
+    ASSERT_OK(writer->Finish());
+    ASSERT_OK(output_stream->Flush());
+    auto schema = arrow::schema({blob_field});
+    ::ArrowSchema c_schema;
+    ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok());
+
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr input_stream,
+                         file_system->Open(dir->Str() + "/file.blob"));
+    // NOTE(review): the reader is always created with blob_as_descriptor = true,
+    // regardless of GetParam().
+    ASSERT_OK_AND_ASSIGN(auto reader, BlobFileBatchReader::Create(
+                                          input_stream,
+                                          /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_));
+
+    ASSERT_OK(reader->SetReadSchema(&c_schema, nullptr, std::nullopt));
+    ASSERT_OK_AND_ASSIGN(auto number_of_rows, reader->GetNumberOfRows());
+    ASSERT_EQ(0, number_of_rows);
+    ASSERT_EQ(std::numeric_limits::max(),
+              reader->GetPreviousBatchFirstRowNumber().value());
+    ASSERT_OK_AND_ASSIGN(auto batch, reader->NextBatch());
+    ASSERT_TRUE(BatchReader::IsEofBatch(batch));
+}
+
+// SetReadSchema() must reject: a null schema, a schema with more than one field,
+// a field that is not a BLOB field, and a selection bitmap with out-of-range rows.
+// Every case opens its own copy of the same 3-row fixture file.
+TEST_F(BlobFileBatchReaderTest, SetReadSchemaWithInvalidInputs) {
+    {
+        // Case 1: read schema is nullptr.
+        std::string test_data_path =
+            paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/";
+        auto dir = paimon::test::UniqueTestDirectory::Create();
+        std::string table_path = dir->Str();
+        ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path));
+
+        std::shared_ptr fs = std::make_shared();
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr input_stream,
+            fs->Open(table_path + "/bucket-0/data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob"));
+        ASSERT_OK_AND_ASSIGN(
+            auto reader,
+            BlobFileBatchReader::Create(input_stream,
+                                        /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_));
+        ASSERT_NOK_WITH_MSG(reader->SetReadSchema(/*read_schema=*/nullptr, /*predicate=*/nullptr,
+                                                  /*selection_bitmap=*/std::nullopt),
+                            "SetReadSchema failed: read schema cannot be nullptr");
+    }
+    {
+        // Case 2: two blob fields in the read schema.
+        auto schema = arrow::schema({BlobUtils::ToArrowField("my_blob_field", false),
+                                     BlobUtils::ToArrowField("my_blob_field_2", false)});
+
+        ::ArrowSchema c_schema;
+        ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok());
+
+        std::string test_data_path =
+            paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/";
+        auto dir = paimon::test::UniqueTestDirectory::Create();
+        std::string table_path = dir->Str();
+        ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path));
+
+        std::shared_ptr fs = std::make_shared();
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr input_stream,
+            fs->Open(table_path + "/bucket-0/data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob"));
+        ASSERT_OK_AND_ASSIGN(
+            auto reader,
+            BlobFileBatchReader::Create(input_stream,
+                                        /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_));
+        ASSERT_NOK_WITH_MSG(reader->SetReadSchema(&c_schema, /*predicate=*/nullptr,
+                                                  /*selection_bitmap=*/std::nullopt),
+                            "read schema field number 2 is not 1");
+    }
+    {
+        // Case 3: a plain large_binary field built with arrow::field() instead of
+        // BlobUtils::ToArrowField().
+        auto blob_field = arrow::field("my_blob_field", arrow::large_binary());
+
+        auto schema = arrow::schema({blob_field});
+        ::ArrowSchema c_schema;
+        ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok());
+
+        std::string test_data_path =
+            paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/";
+        auto dir = paimon::test::UniqueTestDirectory::Create();
+        std::string table_path = dir->Str();
+        ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path));
+
+        std::shared_ptr fs = std::make_shared();
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr input_stream,
+            fs->Open(table_path + "/bucket-0/data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob"));
+        ASSERT_OK_AND_ASSIGN(
+            auto reader,
+            BlobFileBatchReader::Create(input_stream,
+                                        /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_));
+        ASSERT_NOK_WITH_MSG(reader->SetReadSchema(&c_schema, /*predicate=*/nullptr,
+                                                  /*selection_bitmap=*/std::nullopt),
+                            "field my_blob_field: large_binary is not BLOB");
+    }
+    {
+        // Case 4: the selection bitmap holds rows 0..4; the expected error names row 3
+        // against a total of 3 rows.
+        auto schema = arrow::schema({BlobUtils::ToArrowField("my_blob_field", false)});
+        ::ArrowSchema c_schema;
+        ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok());
+
+        std::string test_data_path =
+            paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/";
+        auto dir = paimon::test::UniqueTestDirectory::Create();
+        std::string table_path = dir->Str();
+        ASSERT_TRUE(paimon::test::TestUtil::CopyDirectory(test_data_path, table_path));
+
+        std::shared_ptr fs = std::make_shared();
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr input_stream,
+            fs->Open(table_path + "/bucket-0/data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob"));
+        ASSERT_OK_AND_ASSIGN(
+            auto reader,
+            BlobFileBatchReader::Create(input_stream,
+                                        /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_));
+        RoaringBitmap32 roaring;
+        roaring.Add(0);
+        roaring.Add(1);
+        roaring.Add(2);
+        roaring.Add(3);
+        roaring.Add(4);
+        ASSERT_NOK_WITH_MSG(
+            reader->SetReadSchema(&c_schema, /*predicate=*/nullptr, /*selection_bitmap=*/roaring),
+            "Invalid: row index 3 is out of bound of total row number 3");
+    }
+}
+
+// Runs the TEST_P cases of this fixture once with GetParam() == true and once with false.
+INSTANTIATE_TEST_SUITE_P(BlobAsDescriptor, BlobFileBatchReaderTest, ::testing::Values(true, false));
+
+}  // namespace paimon::blob::test
diff --git a/src/paimon/format/blob/blob_file_format.h b/src/paimon/format/blob/blob_file_format.h
new file mode 100644
index 0000000..486cd04
--- /dev/null
+++ b/src/paimon/format/blob/blob_file_format.h
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include
+#include
+#include
+
+#include "arrow/c/bridge.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/format/blob/blob_reader_builder.h"
+#include "paimon/format/blob/blob_stats_extractor.h"
+#include "paimon/format/blob/blob_writer_builder.h"
+#include "paimon/format/file_format.h"
+
+struct ArrowSchema;
+
+namespace paimon {
+
+class WriterBuilder;
+class ReaderBuilder;
+class FormatStatsExtractor;
+
+namespace blob {
+
+/// FileFormat implementation for the "blob" format. It keeps a copy of the options
+/// map and passes it to the reader/writer builders it creates.
+class BlobFileFormat : public FileFormat {
+ public:
+    /// @param options format options; copied into this object.
+    explicit BlobFileFormat(const std::map& options)
+        : identifier_("blob"), options_(options) {}
+
+    /// @return the format identifier, always "blob".
+    const std::string& Identifier() const override {
+        return identifier_;
+    }
+
+    /// Creates a reader builder from `batch_size` and the stored options.
+    Result> CreateReaderBuilder(int32_t batch_size) const override {
+        return std::make_unique(batch_size, options_);
+    }
+
+    /// Imports `schema` through the Arrow C bridge, wraps its fields in a struct
+    /// type, and creates a writer builder from that type and the stored options.
+    /// `batch_size` is not used by this method. An import failure is returned as
+    /// an error via PAIMON_ASSIGN_OR_RAISE_FROM_ARROW.
+    Result> CreateWriterBuilder(::ArrowSchema* schema,
+                                int32_t batch_size) const override {
+        PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr typed_schema,
+                                          arrow::ImportSchema(schema));
+        auto data_type = arrow::struct_(typed_schema->fields());
+        return std::make_unique(data_type, options_);
+    }
+
+    /// Imports `schema` through the Arrow C bridge and creates a stats extractor
+    /// for the imported schema.
+    Result> CreateStatsExtractor(
+        ::ArrowSchema* schema) const override {
+        PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr typed_schema,
+                                          arrow::ImportSchema(schema));
+        return std::make_unique(typed_schema);
+    }
+
+ private:
+    std::string identifier_;  // set to "blob" by the constructor
+    std::map options_;        // copy of the constructor argument
+};
+
+}  // namespace blob
+}  // namespace paimon
diff --git a/src/paimon/format/blob/blob_file_format_factory.cpp
b/src/paimon/format/blob/blob_file_format_factory.cpp
new file mode 100644
index 0000000..8d0f858
--- /dev/null
+++ b/src/paimon/format/blob/blob_file_format_factory.cpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/blob/blob_file_format_factory.h"
+
+#include
+
+#include "paimon/factories/factory.h"
+#include "paimon/format/blob/blob_file_format.h"
+
+namespace paimon::blob {
+
+// Out-of-class definition of the identifier string returned by Identifier().
+const char BlobFileFormatFactory::IDENTIFIER[] = "blob";
+
+// Builds a BlobFileFormat from `options`; this function has no failure path of its own.
+Result> BlobFileFormatFactory::Create(
+    const std::map& options) const {
+    return std::make_unique(options);
+}
+
+// NOTE(review): presumably makes the factory discoverable by its identifier; the
+// macro is defined outside this file (paimon/factories/factory.h) - confirm there.
+REGISTER_PAIMON_FACTORY(BlobFileFormatFactory);
+
+}  // namespace paimon::blob
diff --git a/src/paimon/format/blob/blob_file_format_factory.h b/src/paimon/format/blob/blob_file_format_factory.h
new file mode 100644
index 0000000..681be5c
--- /dev/null
+++ b/src/paimon/format/blob/blob_file_format_factory.h
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include
+#include
+#include
+
+#include "paimon/format/file_format.h"
+#include "paimon/format/file_format_factory.h"
+#include "paimon/result.h"
+
+namespace paimon::blob {
+
+/// FileFormatFactory for the blob format.
+class BlobFileFormatFactory : public FileFormatFactory {
+ public:
+    /// Identifier string of this factory; defined in the .cpp file.
+    static const char IDENTIFIER[];
+
+    /// @return IDENTIFIER.
+    const char* Identifier() const override {
+        return IDENTIFIER;
+    }
+
+    /// Creates the file format object for `options`; implemented in the .cpp file.
+    Result> Create(
+        const std::map& options) const override;
+};
+
+}  // namespace paimon::blob
diff --git a/src/paimon/format/blob/blob_file_format_factory_test.cpp b/src/paimon/format/blob/blob_file_format_factory_test.cpp
new file mode 100644
index 0000000..0589898
--- /dev/null
+++ b/src/paimon/format/blob/blob_file_format_factory_test.cpp
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/blob/blob_file_format_factory.h"
+
+#include "gtest/gtest.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::blob::test {
+
+// The factory and the file format it creates (from an empty options map) must
+// both report the identifier "blob".
+TEST(BlobFileFormatFactoryTest, TestIdentifier) {
+    BlobFileFormatFactory factory;
+    ASSERT_EQ(std::string(factory.Identifier()), "blob");
+    ASSERT_OK_AND_ASSIGN(auto file_format, factory.Create({}));
+    ASSERT_EQ(file_format->Identifier(), "blob");
+}
+
+}  // namespace paimon::blob::test
diff --git a/src/paimon/format/blob/blob_format_writer.cpp b/src/paimon/format/blob/blob_format_writer.cpp
new file mode 100644
index 0000000..d9e8f4d
--- /dev/null
+++ b/src/paimon/format/blob/blob_format_writer.cpp
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +#include "paimon/format/blob/blob_format_writer.h" + +#include + +#include "arrow/api.h" +#include "arrow/c/bridge.h" +#include "paimon/common/data/blob_defs.h" +#include "paimon/common/data/blob_utils.h" +#include "paimon/common/memory/memory_segment_utils.h" +#include "paimon/common/metrics/metrics_impl.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/delta_varint_compressor.h" +#include "paimon/data/blob.h" +#include "paimon/io/byte_array_input_stream.h" + +namespace paimon::blob { + +BlobFormatWriter::BlobFormatWriter(bool blob_as_descriptor, + const std::shared_ptr& out, + const std::shared_ptr& data_type, + const std::shared_ptr& fs, + const std::shared_ptr& pool) + : blob_as_descriptor_(blob_as_descriptor), + out_(out), + data_type_(data_type), + fs_(fs), + pool_(pool) { + metrics_ = std::make_shared(); + tmp_buffer_ = Bytes::AllocateBytes(kTmpBufferSize, pool_.get()); +} + +Result> BlobFormatWriter::Create( + bool blob_as_descriptor, const std::shared_ptr& out, + const std::shared_ptr& data_type, const std::shared_ptr& fs, + const std::shared_ptr& pool) { + if (out == nullptr) { + return Status::Invalid("blob format writer create failed. out is nullptr"); + } + if (data_type == nullptr) { + return Status::Invalid("blob format writer create failed. data_type is nullptr"); + } + if (pool == nullptr) { + return Status::Invalid("blob format writer create failed. 
pool is nullptr"); + } + if (data_type->num_fields() != 1) { + return Status::Invalid( + fmt::format("blob data type field number {} is not 1", data_type->num_fields())); + } + if (!BlobUtils::IsBlobField(data_type->field(0))) { + return Status::Invalid( + fmt::format("field {} is not BLOB", data_type->field(0)->ToString())); + } + return std::unique_ptr( + new BlobFormatWriter(blob_as_descriptor, out, data_type, fs, pool)); +} + +Status BlobFormatWriter::AddBatch(ArrowArray* batch) { + if (batch == nullptr) { + return Status::Invalid("blob format writer add batch failed. batch is nullptr"); + } + if (batch->length != 1) { + return Status::Invalid("BlobFormatWriter only supports batch with a row count of 1"); + } + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr arrow_array, + arrow::ImportArray(batch, data_type_)); + + assert(arrow_array->num_fields() == 1); + auto struct_array = arrow::internal::checked_pointer_cast(arrow_array); + auto child_array = struct_array->field(0); + + // Struct-level null is not supported (caller should not pass null struct rows) + if (struct_array->IsNull(0)) { + return Status::Invalid("BlobFormatWriter does not support struct-level null."); + } + // Child-level null: record kNullBinLength, skip data writing (aligned with Java) + if (child_array->IsNull(0)) { + bin_lengths_.push_back(BlobDefs::kNullBinLength); + return Status::OK(); + } + + if (child_array->type_id() != arrow::Type::type::LARGE_BINARY) { + return Status::Invalid("BlobFormatWriter only support large binary type."); + } + + const auto& blob_array = + arrow::internal::checked_cast(*child_array); + assert(blob_array.length() == 1); + PAIMON_RETURN_NOT_OK(WriteBlob(blob_array.GetView(0))); + + PAIMON_RETURN_NOT_OK(Flush()); + return Status::OK(); +} + +Status BlobFormatWriter::Flush() { + return out_->Flush(); +} + +Status BlobFormatWriter::Finish() { + // index + const auto& index_bytes = DeltaVarintCompressor::Compress(bin_lengths_); + 
PAIMON_RETURN_NOT_OK(WriteBytes(index_bytes.data(), index_bytes.size())); + // header + PAIMON_UNIQUE_PTR index_length_bytes = + IntegerToLittleEndian(static_cast(index_bytes.size()), pool_); + PAIMON_RETURN_NOT_OK(WriteBytes(index_length_bytes->data(), index_length_bytes->size())); + PAIMON_RETURN_NOT_OK(WriteBytes(reinterpret_cast(&BlobDefs::kFileVersion), + sizeof(BlobDefs::kFileVersion))); + + PAIMON_RETURN_NOT_OK(Flush()); + + tmp_buffer_.reset(); + return Status::OK(); +} + +Status BlobFormatWriter::WriteBlob(std::string_view blob_data) { + crc32_ = 0; + PAIMON_ASSIGN_OR_RAISE(int64_t previous_pos, out_->GetPos()); + + // write magic number + static PAIMON_UNIQUE_PTR kMagicNumberBytes = + IntegerToLittleEndian(BlobDefs::kMagicNumber, pool_); + PAIMON_RETURN_NOT_OK(WriteWithCrc32(kMagicNumberBytes->data(), kMagicNumberBytes->size())); + + // write blob content + std::unique_ptr in; + if (blob_as_descriptor_) { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr blob, + Blob::FromDescriptor(blob_data.data(), blob_data.size())); + PAIMON_ASSIGN_OR_RAISE(in, blob->NewInputStream(fs_)); + } else { + in = std::make_unique(blob_data.data(), blob_data.size()); + } + PAIMON_ASSIGN_OR_RAISE(uint64_t file_length, in->Length()); + uint64_t total_read_length = 0; + auto read_len = static_cast(std::min(file_length, tmp_buffer_->size())); + while (read_len > 0) { + PAIMON_ASSIGN_OR_RAISE(int32_t actual_read_len, in->Read(tmp_buffer_->data(), read_len)); + if (static_cast(actual_read_len) != read_len) { + return Status::Invalid("actual read length {}, not match with expect length {}", + actual_read_len, read_len); + } + PAIMON_RETURN_NOT_OK(WriteWithCrc32(tmp_buffer_->data(), actual_read_len)); + total_read_length += actual_read_len; + read_len = static_cast( + std::min(file_length - total_read_length, tmp_buffer_->size())); + } + + // write bin length + PAIMON_ASSIGN_OR_RAISE(int64_t current_pos, out_->GetPos()); + /// magic number(4) + blob content(bin length - 16) + bin length(8) + 
crc32(4) + /// ↑ ↑ + /// previous_pos current_pos + int64_t bin_length = current_pos - previous_pos + 8 + 4; + bin_lengths_.push_back(bin_length); + PAIMON_UNIQUE_PTR bin_length_bytes = IntegerToLittleEndian(bin_length, pool_); + PAIMON_RETURN_NOT_OK(WriteWithCrc32(bin_length_bytes->data(), bin_length_bytes->size())); + + // write crc32 + PAIMON_UNIQUE_PTR crc32_bytes = IntegerToLittleEndian(crc32_, pool_); + PAIMON_RETURN_NOT_OK(WriteBytes(crc32_bytes->data(), crc32_bytes->size())); + + return Status::OK(); +} + +Status BlobFormatWriter::WriteBytes(const char* data, int32_t length) { + PAIMON_ASSIGN_OR_RAISE(int32_t actual, out_->Write(data, length)); + if (actual != length) { + return Status::Invalid("not suppose actual length {} not match with expect {}", actual, + length); + } + return Status::OK(); +} + +Status BlobFormatWriter::WriteWithCrc32(const char* data, int32_t length) { + crc32_ = arrow::internal::crc32(crc32_, data, length); + return WriteBytes(data, length); +} + +Result BlobFormatWriter::ReachTargetSize(bool suggested_check, int64_t target_size) const { + PAIMON_ASSIGN_OR_RAISE(int64_t current_pos, out_->GetPos()); + return current_pos >= target_size; +} + +template +PAIMON_UNIQUE_PTR BlobFormatWriter::IntegerToLittleEndian( + T value, const std::shared_ptr& pool) { + static_assert(std::is_integral_v, "IntegerToLittleEndian() only supports integral types."); + MemorySegmentOutputStream out(MemorySegmentOutputStream::DEFAULT_SEGMENT_SIZE, pool); + out.SetOrder(ByteOrder::PAIMON_LITTLE_ENDIAN); + out.WriteValue(value); + return MemorySegmentUtils::CopyToBytes(out.Segments(), 0, out.CurrentSize(), pool.get()); +} + +} // namespace paimon::blob diff --git a/src/paimon/format/blob/blob_format_writer.h b/src/paimon/format/blob/blob_format_writer.h new file mode 100644 index 0000000..542fe26 --- /dev/null +++ b/src/paimon/format/blob/blob_format_writer.h @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include
+#include
+#include
+#include
+
+#include "arrow/api.h"
+#include "arrow/util/crc32.h"
+#include "paimon/format/format_writer.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace arrow {
+class DataType;
+}  // namespace arrow
+struct ArrowArray;
+
+namespace paimon {
+class Blob;
+class FileSystem;
+class Metrics;
+class OutputStream;
+}  // namespace paimon
+
+namespace paimon::blob {
+
+// Blob format:
+// https://cwiki.apache.org/confluence/display/PAIMON/PIP-35%3A+Introduce+Blob+to+store+multimodal+data
+//
+// FormatWriter that appends one blob per AddBatch() call to an output stream and
+// writes an index of the per-blob lengths in Finish().
+class BlobFormatWriter : public FormatWriter {
+ public:
+    /// Validates the arguments and creates a writer.
+    /// @param blob_as_descriptor when true, each incoming value is parsed as a blob
+    ///        descriptor and the content is read through `fs`; when false, the value
+    ///        itself is the blob content.
+    /// @param out destination stream; must not be null.
+    /// @param data_type struct type with exactly one BLOB field; must not be null.
+    /// @param fs file system handed to Blob::NewInputStream() in descriptor mode.
+    /// @param pool memory pool for internal buffers; must not be null.
+    static Result> Create(
+        bool blob_as_descriptor, const std::shared_ptr& out,
+        const std::shared_ptr& data_type, const std::shared_ptr& fs,
+        const std::shared_ptr& pool);
+
+    /// Appends one row; `batch` must be non-null and have a length of exactly 1.
+    Status AddBatch(ArrowArray* batch) override;
+
+    /// Flushes the output stream.
+    Status Flush() override;
+
+    /// Writes the index of bin lengths, its size and the file version, then flushes.
+    Status Finish() override;
+
+    /// @return true when the output stream position is >= `target_size`.
+    Result ReachTargetSize(bool suggested_check, int64_t target_size) const override;
+
+    /// @return the metrics object created in the constructor.
+    std::shared_ptr GetWriterMetrics() const override {
+        return metrics_;
+    }
+
+ private:
+    BlobFormatWriter(bool blob_as_descriptor, const std::shared_ptr& out,
+                     const std::shared_ptr& data_type,
+                     const std::shared_ptr& fs,
+                     const std::shared_ptr& pool);
+
+    // Writes magic number, content, bin length and crc32 for a single blob.
+    Status WriteBlob(std::string_view blob_data);
+
+    // Writes raw bytes; a short write is reported as an error.
+    Status WriteBytes(const char* data, int32_t length);
+    // Same as WriteBytes(), but also folds the bytes into crc32_.
+    Status WriteWithCrc32(const char* data, int32_t length);
+
+    // Serializes an integral value in little-endian byte order.
+    template
+    static PAIMON_UNIQUE_PTR IntegerToLittleEndian(T value,
+                                                   const std::shared_ptr& pool);
+
+ public:
+    // Size in bytes of the buffer used to copy blob content in chunks.
+    static constexpr uint32_t kTmpBufferSize = 1024 * 1024;
+
+ private:
+    bool blob_as_descriptor_;
+    uint32_t crc32_ = 0;       // running checksum; reset at the start of every blob
+    std::vector bin_lengths_;  // one entry per row, written as the index by Finish()
+    std::shared_ptr out_;
+    PAIMON_UNIQUE_PTR tmp_buffer_;  // copy buffer; released by Finish()
+    std::shared_ptr data_type_;
+    std::shared_ptr fs_;
+    std::shared_ptr pool_;
+    std::shared_ptr metrics_;
+};
+
+}  // namespace paimon::blob
diff --git a/src/paimon/format/blob/blob_format_writer_test.cpp b/src/paimon/format/blob/blob_format_writer_test.cpp
new file mode 100644
index 0000000..9902225
--- /dev/null
+++ b/src/paimon/format/blob/blob_format_writer_test.cpp
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/blob/blob_format_writer.h"
+
+#include
+#include
+
+#include "arrow/c/bridge.h"
+#include "gtest/gtest.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/stream_utils.h"
+#include "paimon/data/blob.h"
+#include "paimon/format/blob/blob_file_batch_reader.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/testing/utils/read_result_collector.h"
+#include "paimon/testing/utils/test_helper.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::blob::test {
+// Fixture parameterized on blob_as_descriptor. Every test gets a fresh temporary
+// directory and an output stream for "<dir>/file.blob".
+class BlobFormatWriterTest : public ::testing::Test, public ::testing::WithParamInterface {
+ public:
+    void SetUp() override {
+        blob_as_descriptor_ = GetParam();
+        pool_ = GetDefaultPool();
+        dir_ = paimon::test::UniqueTestDirectory::Create();
+        ASSERT_TRUE(dir_);
+        file_system_ = std::make_shared();
+        ASSERT_OK_AND_ASSIGN(output_stream_,
+                             file_system_->Create(dir_->Str() + "/file.blob", /*overwrite=*/true));
+
+        struct_type_ = arrow::struct_({BlobUtils::ToArrowField("blob_col", true)});
+    }
+    void TearDown() override {
+        ASSERT_OK(output_stream_->Flush());
+        ASSERT_OK(output_stream_->Close());
+    }
+
+    // Builds a one-row struct array for `blob`. In descriptor mode the row holds
+    // the serialized descriptor; otherwise it holds the blob content read via ToData().
+    Result> PrepareBlobArray(
+        const std::shared_ptr& blob) const {
+        arrow::StructBuilder struct_builder(struct_type_, arrow::default_memory_pool(),
+                                            {std::make_shared()});
+        auto blob_builder =
+            static_cast(struct_builder.field_builder(0));
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(struct_builder.Append());
+        if (blob_as_descriptor_) {
+            // NOTE(review): ToDescriptor() is called twice here, once for data() and
+            // once for size(); TestAddBatchWithInvalidBatchLength stores the result once.
+            PAIMON_RETURN_NOT_OK_FROM_ARROW(blob_builder->Append(
+                blob->ToDescriptor(pool_)->data(), blob->ToDescriptor(pool_)->size()));
+        } else {
+            PAIMON_ASSIGN_OR_RAISE(auto blob_data, blob->ToData(file_system_, pool_));
+            PAIMON_RETURN_NOT_OK_FROM_ARROW(
+                blob_builder->Append(blob_data->data(), blob_data->size()));
+        }
+        std::shared_ptr array;
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(struct_builder.Finish(&array));
+        return array;
+    }
+
+    // Exports `blob_array` through the Arrow C data interface and passes it to AddBatch().
+    Status AddBatchOnce(const std::shared_ptr& format_writer,
+                        const std::shared_ptr& blob_array) const {
+        auto c_array = std::make_unique();
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*blob_array, c_array.get()));
+        return format_writer->AddBatch(c_array.get());
+    }
+
+ private:
+    bool blob_as_descriptor_;
+    std::shared_ptr pool_;
+    std::unique_ptr dir_;
+    std::shared_ptr output_stream_;
+    std::shared_ptr file_system_;
+    std::shared_ptr struct_type_;
+};
+
+// Runs every TEST_P below once with blob_as_descriptor = false and once with true.
+INSTANTIATE_TEST_SUITE_P(BlobAsDescriptor, BlobFormatWriterTest, ::testing::Values(false, true));
+
+// Round trip: writes three blobs, reads the file back with BlobFileBatchReader and
+// compares the result with what was written.
+TEST_P(BlobFormatWriterTest, TestSimple) {
+    // write
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr writer,
+                         BlobFormatWriter::Create(blob_as_descriptor_, output_stream_, struct_type_,
+                                                  file_system_, pool_));
+
+    std::vector> expected_blobs;
+    std::string file1 = paimon::test::GetDataDir() + "/avro/data/avro_with_null";
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr blob1, Blob::FromPath(file1));
+    expected_blobs.emplace_back(blob1);
+    ASSERT_OK_AND_ASSIGN(auto array1, PrepareBlobArray(blob1));
+    ASSERT_OK(AddBatchOnce(writer, array1));
+    ASSERT_OK(writer->Flush());
+
+    // blob2 and blob3 are two byte ranges of the same file.
+    std::string file2 = paimon::test::GetDataDir() + "/xxhash.data";
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr blob2,
+                         Blob::FromPath(file2, /*offset=*/0, /*length=*/91));
+    expected_blobs.emplace_back(blob2);
+    ASSERT_OK_AND_ASSIGN(auto array2, PrepareBlobArray(blob2));
+    ASSERT_OK(AddBatchOnce(writer, array2));
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr blob3,
+                         Blob::FromPath(file2, /*offset=*/92, /*length=*/85));
+    expected_blobs.emplace_back(blob3);
+    ASSERT_OK_AND_ASSIGN(auto array3, PrepareBlobArray(blob3));
+    ASSERT_OK(AddBatchOnce(writer, array3));
+
+    ASSERT_OK(writer->Flush());
+    ASSERT_OK(writer->Finish());
+
+    // read
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr input_stream,
+                         file_system_->Open(dir_->Str() + "/file.blob"));
+    ASSERT_TRUE(input_stream);
+    ASSERT_OK_AND_ASSIGN(
+        std::unique_ptr reader,
+        BlobFileBatchReader::Create(input_stream, /*batch_size=*/1024, blob_as_descriptor_, pool_));
+    auto schema = arrow::schema(struct_type_->fields());
+    ::ArrowSchema c_schema;
+    ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok());
+    ASSERT_OK(
+        reader->SetReadSchema(&c_schema, /*predicate=*/nullptr, /*selection_bitmap=*/std::nullopt));
+    ASSERT_OK_AND_ASSIGN(auto chunked_array,
+                         paimon::test::ReadResultCollector::CollectResult(reader.get()));
+
+    // check result
+    if (blob_as_descriptor_) {
+        // Descriptor mode: compare the blobs via TestHelper::CheckBlobsEqual.
+        auto concat_array = arrow::Concatenate(chunked_array->chunks()).ValueOrDie();
+        auto struct_array = arrow::internal::checked_pointer_cast(concat_array);
+        ASSERT_TRUE(struct_array);
+        ASSERT_OK_AND_ASSIGN(std::vector> result_blobs,
+                             paimon::test::TestHelper::ToBlobs(struct_array));
+        ASSERT_OK_AND_ASSIGN(bool equal, paimon::test::TestHelper::CheckBlobsEqual(
+                                             result_blobs, expected_blobs, file_system_));
+        ASSERT_TRUE(equal);
+    } else {
+        // Data mode: the arrays that were written must come back unchanged.
+        auto expected_chunk_array =
+            arrow::ChunkedArray::Make({array1, array2, array3}).ValueOrDie();
+        ASSERT_TRUE(expected_chunk_array->Equals(chunked_array))
+            << expected_chunk_array->ToString() << chunked_array->ToString();
+    }
+}
+
+// Create() must reject null arguments, multi-field types and non-BLOB fields.
+TEST_P(BlobFormatWriterTest, TestCreateWithInvalidParameters) {
+    // Test with nullptr output stream
+    ASSERT_NOK_WITH_MSG(
+        BlobFormatWriter::Create(blob_as_descriptor_, nullptr, struct_type_, file_system_, pool_),
+        "blob format writer create failed. out is nullptr");
+
+    // Test with nullptr data type
+    ASSERT_NOK_WITH_MSG(
+        BlobFormatWriter::Create(blob_as_descriptor_, output_stream_, nullptr, file_system_, pool_),
+        "blob format writer create failed. data_type is nullptr");
+
+    // Test with nullptr memory pool
+    ASSERT_NOK_WITH_MSG(BlobFormatWriter::Create(blob_as_descriptor_, output_stream_, struct_type_,
+                                                 file_system_, nullptr),
+                        "blob format writer create failed. pool is nullptr");
+
+    // Test with invalid field count (more than 1 field)
+    auto multi_field_type = arrow::struct_(
+        {arrow::field("blob_col1", arrow::binary()), arrow::field("blob_col2", arrow::binary())});
+    ASSERT_NOK_WITH_MSG(BlobFormatWriter::Create(blob_as_descriptor_, output_stream_,
+                                                 multi_field_type, file_system_, pool_),
+                        "blob data type field number 2 is not 1");
+
+    // Test with non-blob field (missing blob metadata)
+    auto non_blob_field = arrow::field("regular_col", arrow::binary());
+    auto non_blob_type = arrow::struct_({non_blob_field});
+    ASSERT_NOK_WITH_MSG(BlobFormatWriter::Create(blob_as_descriptor_, output_stream_, non_blob_type,
+                                                 file_system_, pool_),
+                        "field regular_col: binary is not BLOB");
+}
+
+// AddBatch() with a null batch, and a blob that points at a file that does not exist.
+TEST_P(BlobFormatWriterTest, TestInvalidCase) {
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr writer,
+                         BlobFormatWriter::Create(blob_as_descriptor_, output_stream_, struct_type_,
+                                                  file_system_, pool_));
+
+    // Test nullptr batch
+    ASSERT_NOK_WITH_MSG(writer->AddBatch(nullptr),
+                        "blob format writer add batch failed. batch is nullptr");
+
+    // Test invalid blob
+    // In descriptor mode the missing file is only noticed by AddBatch(); in data mode
+    // PrepareBlobArray() already fails while reading the content.
+    ASSERT_OK_AND_ASSIGN(auto blob, Blob::FromPath("test_path", 0, 10));
+    if (blob_as_descriptor_) {
+        ASSERT_OK_AND_ASSIGN(auto array, PrepareBlobArray(std::move(blob)));
+        ASSERT_NOK_WITH_MSG(AddBatchOnce(writer, array), "File 'test_path' not exists");
+    } else {
+        ASSERT_NOK_WITH_MSG(PrepareBlobArray(std::move(blob)), "File 'test_path' not exists");
+    }
+}
+
+// AddBatch() must reject a batch that holds two rows.
+TEST_P(BlobFormatWriterTest, TestAddBatchWithInvalidBatchLength) {
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr writer,
+                         BlobFormatWriter::Create(blob_as_descriptor_, output_stream_, struct_type_,
+                                                  file_system_, pool_));
+
+    // Test batch with wrong length (not 1)
+    arrow::StructBuilder struct_builder(struct_type_, arrow::default_memory_pool(),
+                                        {std::make_shared()});
+    auto blob_builder = static_cast(struct_builder.field_builder(0));
+
+    // Add two rows instead of one
+    ASSERT_OK_AND_ASSIGN(auto blob, Blob::FromPath(paimon::test::GetDataDir() + "/xxhash.data"));
+    ASSERT_TRUE(struct_builder.Append().ok());
+    auto blob_descriptor = blob->ToDescriptor(pool_);
+    ASSERT_TRUE(blob_builder->Append(blob_descriptor->data(), blob_descriptor->size()).ok());
+    ASSERT_TRUE(struct_builder.Append().ok());
+    ASSERT_TRUE(blob_builder->Append(blob_descriptor->data(), blob_descriptor->size()).ok());
+
+    std::shared_ptr array;
+    ASSERT_TRUE(struct_builder.Finish(&array).ok());
+    auto c_array = std::make_unique();
+    ASSERT_TRUE(arrow::ExportArray(*array, c_array.get()).ok());
+
+    ASSERT_NOK_WITH_MSG(writer->AddBatch(c_array.get()),
+                        "BlobFormatWriter only supports batch with a row count of 1");
+    // AddBatch() returned before importing the batch, so the test releases it itself.
+    ArrowArrayRelease(c_array.get());
+}
+
+// ReachTargetSize() compares the output position with the given target.
+TEST_P(BlobFormatWriterTest, TestReachTargetSize) {
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr writer,
+                         BlobFormatWriter::Create(blob_as_descriptor_, output_stream_, struct_type_,
+                                                  file_system_, pool_));
+
+    // Initially should not reach target size
+    ASSERT_OK_AND_ASSIGN(bool reached, writer->ReachTargetSize(true, 1000));
+    ASSERT_FALSE(reached);
+
+    // Add some data
+    std::string file = paimon::test::GetDataDir() + "/xxhash.data";
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr blob, Blob::FromPath(file));
+    ASSERT_OK_AND_ASSIGN(auto array, PrepareBlobArray(blob));
+    ASSERT_OK(AddBatchOnce(writer, array));
+    ASSERT_OK(writer->Flush());
+
+    // Check if we reach a small target size
+    ASSERT_OK_AND_ASSIGN(reached, writer->ReachTargetSize(true, 10));
+    ASSERT_TRUE(reached);
+
+    // Check if we don't reach a large target size
+    ASSERT_OK_AND_ASSIGN(reached, writer->ReachTargetSize(true, 100000));
+    ASSERT_FALSE(reached);
+}
+
+// GetWriterMetrics() returns a non-null metrics object.
+TEST_P(BlobFormatWriterTest, TestGetWriterMetrics) {
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr writer,
+                         BlobFormatWriter::Create(blob_as_descriptor_, output_stream_, struct_type_,
+                                                  file_system_, pool_));
+
+    auto metrics = writer->GetWriterMetrics();
+    ASSERT_TRUE(metrics);
+}
+
+// A writer that is finished without any row produces a 5-byte file.
+TEST_P(BlobFormatWriterTest, TestEmptyWriter) {
+    // Test creating a writer and finishing without adding any data
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr writer,
+                         BlobFormatWriter::Create(blob_as_descriptor_, output_stream_, struct_type_,
+                                                  file_system_, pool_));
+
+    ASSERT_OK(writer->Flush());
+    ASSERT_OK(writer->Finish());
+
+    // Verify the file is the same with java
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr input_stream,
+                         file_system_->Open(dir_->Str() + "/file.blob"));
+    ASSERT_TRUE(input_stream);
+    ASSERT_OK_AND_ASSIGN(uint64_t file_length, input_stream->Length());
+    ASSERT_EQ(file_length, 5);  // Should have footer even if no data
+    std::vector buffer(file_length);
+    ASSERT_OK_AND_ASSIGN(auto read_length, input_stream->Read(buffer.data(), buffer.size()));
+    ASSERT_EQ(read_length, 5);
+    // Expected bytes: four 0x00 bytes followed by 0x01.
+    std::vector expected = {0x00, 0x00, 0x00, 0x00, 0x01};
+    ASSERT_EQ(buffer, expected);
+}
+
+TEST_P(BlobFormatWriterTest, TestLargeBlob) {
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr writer,
+                         BlobFormatWriter::Create(blob_as_descriptor_, output_stream_, struct_type_,
+                                                  file_system_, pool_));
+
+    // Create a temporary large file for testing
std::string large_file_path = dir_->Str() + "/large_test_file.bin"; + ASSERT_OK_AND_ASSIGN(auto large_file_stream, + file_system_->Create(large_file_path, /*overwrite=*/true)); + + // Write data larger than TMP_BUFFER_SIZE (1MB) + const size_t large_size = BlobFormatWriter::kTmpBufferSize * 2 + 1000; // ~2MB + std::vector large_data(large_size, 'A'); + ASSERT_OK_AND_ASSIGN(int32_t written, large_file_stream->Write(large_data.data(), large_size)); + ASSERT_EQ(written, large_size); + ASSERT_OK(large_file_stream->Flush()); + ASSERT_OK(large_file_stream->Close()); + + // Create blob from large file and write it + ASSERT_OK_AND_ASSIGN(std::shared_ptr large_blob, Blob::FromPath(large_file_path)); + ASSERT_OK_AND_ASSIGN(auto array, PrepareBlobArray(large_blob)); + ASSERT_OK(AddBatchOnce(writer, array)); + ASSERT_OK(writer->Flush()); + ASSERT_OK(writer->Finish()); + + // Verify we can read it back + ASSERT_OK_AND_ASSIGN(std::shared_ptr input_stream, + file_system_->Open(dir_->Str() + "/file.blob")); + ASSERT_OK_AND_ASSIGN( + std::unique_ptr reader, + BlobFileBatchReader::Create(input_stream, /*batch_size=*/1024, blob_as_descriptor_, pool_)); + auto schema = arrow::schema(struct_type_->fields()); + ::ArrowSchema c_schema; + ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); + ASSERT_OK( + reader->SetReadSchema(&c_schema, /*predicate=*/nullptr, /*selection_bitmap=*/std::nullopt)); + ASSERT_OK_AND_ASSIGN(auto chunked_array, + paimon::test::ReadResultCollector::CollectResult(reader.get())); + + // check result + if (blob_as_descriptor_) { + auto concat_array = arrow::Concatenate(chunked_array->chunks()).ValueOrDie(); + auto struct_array = arrow::internal::checked_pointer_cast(concat_array); + ASSERT_TRUE(struct_array); + ASSERT_OK_AND_ASSIGN(std::vector> result_blobs, + paimon::test::TestHelper::ToBlobs(struct_array)); + ASSERT_OK_AND_ASSIGN(bool equal, paimon::test::TestHelper::CheckBlobsEqual( + result_blobs, {large_blob}, file_system_)); + ASSERT_TRUE(equal); + } 
else { + auto expected_chunk_array = arrow::ChunkedArray::Make({array}).ValueOrDie(); + ASSERT_TRUE(expected_chunk_array->Equals(chunked_array)); + } +} + +TEST_P(BlobFormatWriterTest, TestAddBatchWithNullValues) { + ASSERT_OK_AND_ASSIGN(std::shared_ptr writer, + BlobFormatWriter::Create(blob_as_descriptor_, output_stream_, struct_type_, + file_system_, pool_)); + + // Write one row with child-level null blob + arrow::StructBuilder struct_builder(struct_type_, arrow::default_memory_pool(), + {std::make_shared()}); + auto blob_builder = static_cast(struct_builder.field_builder(0)); + ASSERT_TRUE(struct_builder.Append().ok()); + ASSERT_TRUE(blob_builder->AppendNull().ok()); + std::shared_ptr null_child_array; + ASSERT_TRUE(struct_builder.Finish(&null_child_array).ok()); + auto c_array = std::make_unique(); + ASSERT_TRUE(arrow::ExportArray(*null_child_array, c_array.get()).ok()); + ASSERT_OK(writer->AddBatch(c_array.get())); + + ASSERT_OK(writer->Flush()); + ASSERT_OK(writer->Finish()); + + // Read back and verify + ASSERT_OK_AND_ASSIGN(std::shared_ptr input_stream, + file_system_->Open(dir_->Str() + "/file.blob")); + ASSERT_TRUE(input_stream); + ASSERT_OK_AND_ASSIGN( + std::unique_ptr reader, + BlobFileBatchReader::Create(input_stream, /*batch_size=*/1024, blob_as_descriptor_, pool_)); + auto schema = arrow::schema(struct_type_->fields()); + ::ArrowSchema c_schema; + ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); + ASSERT_OK( + reader->SetReadSchema(&c_schema, /*predicate=*/nullptr, /*selection_bitmap=*/std::nullopt)); + ASSERT_OK_AND_ASSIGN(auto chunked_array, + paimon::test::ReadResultCollector::CollectResult(reader.get())); + + auto concat_array = arrow::Concatenate(chunked_array->chunks()).ValueOrDie(); + auto result_struct = arrow::internal::checked_pointer_cast(concat_array); + ASSERT_TRUE(result_struct); + ASSERT_EQ(result_struct->length(), 1); + ASSERT_TRUE(result_struct->field(0)->IsNull(0)); + + // Struct-level null should still be rejected + 
arrow::StructBuilder struct_builder2(struct_type_, arrow::default_memory_pool(), + {std::make_shared()}); + ASSERT_TRUE(struct_builder2.AppendNull().ok()); + std::shared_ptr null_struct_array; + ASSERT_TRUE(struct_builder2.Finish(&null_struct_array).ok()); + auto null_c_array = std::make_unique(); + ASSERT_TRUE(arrow::ExportArray(*null_struct_array, null_c_array.get()).ok()); + ASSERT_OK_AND_ASSIGN(std::shared_ptr writer2, + BlobFormatWriter::Create(blob_as_descriptor_, output_stream_, struct_type_, + file_system_, pool_)); + ASSERT_NOK_WITH_MSG(writer2->AddBatch(null_c_array.get()), + "BlobFormatWriter does not support struct-level null."); + ArrowArrayRelease(null_c_array.get()); +} + +TEST_P(BlobFormatWriterTest, TestAddBatchWithZeroLengthBlob) { + ASSERT_OK_AND_ASSIGN(std::shared_ptr writer, + BlobFormatWriter::Create(blob_as_descriptor_, output_stream_, struct_type_, + file_system_, pool_)); + + // Create a zero-length file + std::string zero_file_path = dir_->Str() + "/zero_length_file.bin"; + ASSERT_OK_AND_ASSIGN(auto zero_file_stream, + file_system_->Create(zero_file_path, /*overwrite=*/true)); + ASSERT_OK(zero_file_stream->Flush()); + ASSERT_OK(zero_file_stream->Close()); + + // Create blob from zero-length file + ASSERT_OK_AND_ASSIGN(std::shared_ptr zero_blob, Blob::FromPath(zero_file_path)); + + // This should work - zero-length blobs should be supported + ASSERT_OK_AND_ASSIGN(auto array, PrepareBlobArray(zero_blob)); + ASSERT_OK(AddBatchOnce(writer, array)); + ASSERT_OK(writer->Flush()); + ASSERT_OK(writer->Finish()); + + // Verify the file is the same with java + ASSERT_OK_AND_ASSIGN(std::shared_ptr input_stream, + file_system_->Open(dir_->Str() + "/file.blob")); + ASSERT_TRUE(input_stream); + ASSERT_OK_AND_ASSIGN(uint64_t file_length, input_stream->Length()); + ASSERT_EQ(file_length, 22); + std::vector buffer(file_length); + ASSERT_OK_AND_ASSIGN(auto read_length, + input_stream->Read(reinterpret_cast(buffer.data()), buffer.size())); + 
ASSERT_EQ(read_length, 22); + std::vector expected = {{0xcf, 0x11, 0x4e, 0x58, 0x10, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x53, 0x7f, 0xdf, 0x03, + 0x20, 0x01, 0x00, 0x00, 0x00, 0x01}}; + ASSERT_EQ(buffer, expected); +} + +} // namespace paimon::blob::test diff --git a/src/paimon/format/blob/blob_reader_builder.h b/src/paimon/format/blob/blob_reader_builder.h new file mode 100644 index 0000000..0bffbf3 --- /dev/null +++ b/src/paimon/format/blob/blob_reader_builder.h @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#pragma once
+
+#include
+#include
+#include
+
+#include "paimon/common/utils/options_utils.h"
+#include "paimon/format/blob/blob_file_batch_reader.h"
+#include "paimon/format/reader_builder.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/memory/memory_pool.h"
+
+namespace paimon::blob {
+
+// ReaderBuilder for the blob file format.
+//
+// Collects a batch size, a memory pool and a copy of the format options, and
+// passes them to BlobFileBatchReader::Create() when a reader is built from an
+// input stream. Building a reader from a path is rejected.
+class BlobReaderBuilder : public ReaderBuilder {
+ public:
+    // Stores `batch_size`, copies `options`, and starts with the pool returned
+    // by GetDefaultPool(); WithMemoryPool() substitutes another pool.
+    BlobReaderBuilder(int32_t batch_size, const std::map& options)
+        : batch_size_(batch_size), pool_(GetDefaultPool()), options_(options) {}
+
+    // Replaces the memory pool handed to readers built afterwards.
+    // Returns `this` so that calls can be chained.
+    ReaderBuilder* WithMemoryPool(const std::shared_ptr& pool) override {
+        pool_ = pool;
+        return this;
+    }
+
+    // Creates a BlobFileBatchReader over `input_stream`.
+    //
+    // Options::BLOB_AS_DESCRIPTOR is looked up in the stored options with
+    // `false` passed as the fallback argument. PAIMON_ASSIGN_OR_RAISE is
+    // expected to return early to the caller if that lookup fails. The result
+    // of BlobFileBatchReader::Create() is returned unchanged.
+    Result> Build(
+        const std::shared_ptr& input_stream) const override {
+        PAIMON_ASSIGN_OR_RAISE(
+            bool blob_as_descriptor,
+            OptionsUtils::GetValueFromMap(options_, Options::BLOB_AS_DESCRIPTOR, false));
+        return BlobFileBatchReader::Create(input_stream, batch_size_, blob_as_descriptor, pool_);
+    }
+
+    // Always returns Status::Invalid: this builder only builds readers from an
+    // input stream, so `path` is ignored.
+    Result> Build(const std::string& path) const override {
+        return Status::Invalid("do not support build reader with path in blob format");
+    }
+
+ private:
+    // Forwarded to BlobFileBatchReader::Create() as its batch size argument.
+    int32_t batch_size_;
+    // Pool forwarded to every reader; GetDefaultPool() unless replaced.
+    std::shared_ptr pool_;
+    // Private copy of the options given to the constructor.
+    std::map options_;
+};
+
+}  // namespace paimon::blob
diff --git a/src/paimon/format/blob/blob_stats_extractor.cpp b/src/paimon/format/blob/blob_stats_extractor.cpp
new file mode 100644
index 0000000..12a7abd
--- /dev/null
+++ b/src/paimon/format/blob/blob_stats_extractor.cpp
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/blob/blob_stats_extractor.h"
+
+#include
+#include
+
+#include "arrow/api.h"
+#include "fmt/format.h"
+#include "paimon/common/data/blob_utils.h"
+#include "paimon/format/blob/blob_file_batch_reader.h"
+#include "paimon/format/column_stats.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/status.h"
+
+namespace paimon {
+class MemoryPool;
+}  // namespace paimon
+
+namespace paimon::blob {
+
+// Returns placeholder statistics for the single BLOB column together with the
+// row count of the blob file at `path`.
+//
+// The write schema must consist of exactly one field and that field must be a
+// BLOB field; otherwise Status::Invalid is returned. The schema is checked
+// before the file is opened, so an unusable schema is rejected without
+// touching the file system.
+//
+// The returned statistics always hold one entry created by
+// ColumnStats::CreateStringColumnStats() with no min, no max and a null count
+// of 0; only the row count is taken from the file. Errors from opening the
+// file, creating the reader or reading the row count are passed on through
+// PAIMON_ASSIGN_OR_RAISE.
+Result>
+BlobStatsExtractor::ExtractWithFileInfo(const std::shared_ptr& file_system,
+                                        const std::string& path,
+                                        const std::shared_ptr& pool) {
+    if (write_schema_->num_fields() != 1) {
+        return Status::Invalid(
+            fmt::format("schema field number {} is not 1", write_schema_->num_fields()));
+    }
+    if (!BlobUtils::IsBlobField(write_schema_->field(0))) {
+        return Status::Invalid(
+            fmt::format("field {} is not BLOB", write_schema_->field(0)->ToString()));
+    }
+
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr input_stream, file_system->Open(path));
+    // Debug-only check: a successful Open() is not expected to yield a null stream.
+    assert(input_stream);
+    PAIMON_ASSIGN_OR_RAISE(
+        std::unique_ptr blob_reader,
+        BlobFileBatchReader::Create(input_stream,
+                                    /*batch_size=*/1024, /*blob_as_descriptor=*/true, pool));
+    ColumnStatsVector result_stats;
+    result_stats.push_back(
+        ColumnStats::CreateStringColumnStats(std::nullopt, std::nullopt, /*null_count=*/0));
+    PAIMON_ASSIGN_OR_RAISE(uint64_t num_rows, blob_reader->GetNumberOfRows());
+    // Move the vector into the pair instead of copying it.
+    return std::make_pair(std::move(result_stats), FileInfo(num_rows));
+}
+
+}  // namespace paimon::blob
diff --git a/src/paimon/format/blob/blob_stats_extractor.h
b/src/paimon/format/blob/blob_stats_extractor.h
new file mode 100644
index 0000000..eed11d8
--- /dev/null
+++ b/src/paimon/format/blob/blob_stats_extractor.h
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include
+#include
+#include
+
+#include "arrow/api.h"
+#include "paimon/format/format_stats_extractor.h"
+#include "paimon/result.h"
+#include "paimon/type_fwd.h"
+
+namespace arrow {
+class DataType;
+class Schema;
+}  // namespace arrow
+namespace paimon {
+class FileSystem;
+class MemoryPool;
+}  // namespace paimon
+
+namespace paimon::blob {
+
+// FormatStatsExtractor for blob files.
+//
+// Keeps the write schema it was constructed with. Extract() is a thin wrapper
+// around ExtractWithFileInfo(), which is only declared here and defined out of
+// line.
+class BlobStatsExtractor : public FormatStatsExtractor {
+ public:
+    // Stores `write_schema`; it is dereferenced later by the out-of-line
+    // ExtractWithFileInfo(), so presumably it must be non-null (not checked
+    // here).
+    explicit BlobStatsExtractor(const std::shared_ptr& write_schema)
+        : write_schema_(write_schema) {}
+
+    // Calls ExtractWithFileInfo() with the same arguments and returns only the
+    // first element of its result, dropping the second one. PAIMON_ASSIGN_OR_RAISE
+    // is expected to pass a failure of that call on to the caller.
+    Result Extract(const std::shared_ptr& file_system,
+                   const std::string& path,
+                   const std::shared_ptr& pool) override {
+        PAIMON_ASSIGN_OR_RAISE(auto result, ExtractWithFileInfo(file_system, path, pool));
+        return result.first;
+    }
+
+    // Declared here, defined out of line. Returns a pair; Extract() above uses
+    // its first element as the extraction result.
+    Result> ExtractWithFileInfo(
+        const std::shared_ptr& file_system, const std::string& path,
+        const std::shared_ptr& pool) override;
+
+ private:
+    // Schema handed to the constructor.
+    std::shared_ptr write_schema_;
+};
+
+}  // namespace paimon::blob
diff --git
a/src/paimon/format/blob/blob_stats_extractor_test.cpp b/src/paimon/format/blob/blob_stats_extractor_test.cpp new file mode 100644 index 0000000..e6c8d69 --- /dev/null +++ b/src/paimon/format/blob/blob_stats_extractor_test.cpp @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#include "paimon/format/blob/blob_stats_extractor.h"
+
+#include
+#include
+
+#include "arrow/api.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/blob_utils.h"
+#include "paimon/defs.h"
+#include "paimon/format/column_stats.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::blob::test {
+
+// Fixture shared by the BlobStatsExtractor tests: a default memory pool, a
+// one-field blob schema and a file system instance, all rebuilt in SetUp().
+class BlobStatsExtractorTest : public testing::Test {
+ public:
+    void SetUp() override {
+        pool_ = GetDefaultPool();
+        // Create a blob schema with one blob field
+        blob_field_ = BlobUtils::ToArrowField("blob_field", false);
+        blob_schema_ = arrow::schema({blob_field_});
+        fs_ = std::make_shared();
+    }
+
+    // TEST_F bodies are member functions of classes derived from this fixture,
+    // so the members they use must be protected rather than private.
+ protected:
+    std::shared_ptr pool_;
+    std::shared_ptr blob_field_;
+    std::shared_ptr blob_schema_;
+    std::shared_ptr fs_;
+};
+
+// Runs ExtractWithFileInfo() on four checked-in blob files and verifies, for
+// each, the shape of the returned statistics and the expected row count.
+TEST_F(BlobStatsExtractorTest, TestDifferentBlobFiles) {
+    BlobStatsExtractor extractor(blob_schema_);
+
+    // File name paired with the row count expected for that file.
+    std::vector> test_files = {
+        {"data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob", 3},
+        {"data-d7816e8e-6c6d-4e28-9137-837cdf706350-2.blob", 2},
+        {"data-d7816e8e-6c6d-4e28-9137-837cdf706350-3.blob", 4},
+        {"data-d7816e8e-6c6d-4e28-9137-837cdf706350-4.blob", 1}};
+
+    for (const auto& [filename, expected_rows] : test_files) {
+        std::string blob_file_path =
+            paimon::test::GetDataDir() + "/db_with_blob.db/table_with_blob/bucket-0/" + filename;
+
+        ASSERT_OK_AND_ASSIGN(auto stats_with_info,
+                             extractor.ExtractWithFileInfo(fs_, blob_file_path, pool_));
+
+        // Check stats structure
+        ASSERT_EQ(1u, stats_with_info.first.size());
+        ASSERT_TRUE(stats_with_info.first[0]);
+        ASSERT_EQ(FieldType::STRING, stats_with_info.first[0]->GetFieldType());
+        ASSERT_EQ("min null, max null, null count 0", stats_with_info.first[0]->ToString());
+
+        // Check row count matches expected
+        ASSERT_EQ(expected_rows, stats_with_info.second.GetRowCount())
+            << "Row count mismatch for file: " << filename;
+    }
+}
+
+// Covers three failure paths of ExtractWithFileInfo(), each identified by its
+// error message: a schema with two fields, a schema whose only field is not a
+// blob field, and a path that does not exist.
+TEST_F(BlobStatsExtractorTest, TestInvalidCase) {
+    // Path of a blob data file under the test data directory; used by the two
+    // schema cases below.
+    std::string blob_file_path = paimon::test::GetDataDir() +
+                                 "/db_with_blob.db/table_with_blob/bucket-0/"
+                                 "data-d7816e8e-6c6d-4e28-9137-837cdf706350-1.blob";
+
+    // Should fail because schema has more than 1 field
+    {
+        auto int_field = arrow::field("int_field", arrow::int32());
+        auto multi_field_schema = arrow::schema({blob_field_, int_field});
+        BlobStatsExtractor extractor(multi_field_schema);
+        ASSERT_NOK_WITH_MSG(extractor.ExtractWithFileInfo(fs_, blob_file_path, pool_),
+                            "schema field number 2 is not 1");
+    }
+    // Should fail because field is not a blob field
+    {
+        auto string_field = arrow::field("string_field", arrow::utf8());
+        auto non_blob_schema = arrow::schema({string_field});
+        BlobStatsExtractor extractor(non_blob_schema);
+        ASSERT_NOK_WITH_MSG(extractor.ExtractWithFileInfo(fs_, blob_file_path, pool_),
+                            "field string_field: string is not BLOB");
+    }
+    // Should fail because file doesn't exist
+    {
+        // Valid one-field blob schema from the fixture, so only the path is at fault.
+        BlobStatsExtractor extractor(blob_schema_);
+        std::string non_existent_path = "/path/that/does/not/exist.blob";
+        ASSERT_NOK_WITH_MSG(extractor.ExtractWithFileInfo(fs_, non_existent_path, pool_),
+                            "File '/path/that/does/not/exist.blob' not exists");
+    }
+}
+
+}  // namespace paimon::blob::test
diff --git a/src/paimon/format/blob/blob_writer_builder.h b/src/paimon/format/blob/blob_writer_builder.h
new file mode 100644
index 0000000..1fc1a58
--- /dev/null
+++ b/src/paimon/format/blob/blob_writer_builder.h
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "arrow/api.h"
+#include "paimon/common/utils/options_utils.h"
+#include "paimon/defs.h"
+#include "paimon/format/blob/blob_format_writer.h"
+#include "paimon/format/format_writer.h"
+#include "paimon/format/writer_builder.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace arrow {
+class DataType;
+}  // namespace arrow
+namespace paimon {
+class FileSystem;
+class OutputStream;
+}  // namespace paimon
+
+namespace paimon::blob {
+
+// SpecificFSWriterBuilder for the blob file format.
+//
+// Collects a data type, a copy of the format options, a memory pool and a file
+// system, and passes them to BlobFormatWriter::Create() in Build(). A file
+// system must be supplied through WithFileSystem() before Build() can succeed.
+class BlobWriterBuilder : public SpecificFSWriterBuilder {
+ public:
+    // Stores `data_type`, copies `options`, and starts with the pool returned
+    // by GetDefaultPool(). A null `data_type` is caught by assert only, i.e.
+    // not in builds where assert is compiled out.
+    BlobWriterBuilder(const std::shared_ptr& data_type,
+                      const std::map& options)
+        : pool_(GetDefaultPool()), data_type_(data_type), options_(options) {
+        assert(data_type_);
+    }
+
+    // Replaces the memory pool handed to writers built afterwards.
+    // Returns `this` so that calls can be chained.
+    WriterBuilder* WithMemoryPool(const std::shared_ptr& pool) override {
+        pool_ = pool;
+        return this;
+    }
+
+    // Sets the file system handed to writers built afterwards.
+    // Returns `this` so that calls can be chained.
+    SpecificFSWriterBuilder* WithFileSystem(const std::shared_ptr& fs) override {
+        fs_ = fs;
+        return this;
+    }
+
+    // Creates a BlobFormatWriter that writes to `out`.
+    //
+    // Returns Status::Invalid while no (non-null) file system has been set.
+    // Options::BLOB_AS_DESCRIPTOR is looked up in the stored options with
+    // `false` passed as the fallback argument. `compression` is not used by
+    // this implementation. A null `out` is caught by assert only.
+    Result> Build(const std::shared_ptr& out,
+                  const std::string& compression) override {
+        assert(out);
+        if (fs_ == nullptr) {
+            return Status::Invalid("File system is nullptr. Please call WithFileSystem() first.");
+        }
+        PAIMON_ASSIGN_OR_RAISE(
+            bool blob_as_descriptor,
+            OptionsUtils::GetValueFromMap(options_, Options::BLOB_AS_DESCRIPTOR, false));
+        return BlobFormatWriter::Create(blob_as_descriptor, out, data_type_, fs_, pool_);
+    }
+
+ private:
+    // Pool forwarded to every writer; GetDefaultPool() unless replaced.
+    std::shared_ptr pool_;
+    // Data type given to the constructor, forwarded to BlobFormatWriter::Create().
+    std::shared_ptr data_type_;
+    // Private copy of the options given to the constructor.
+    std::map options_;
+    // Null until WithFileSystem() is called.
+    std::shared_ptr fs_;
+};
+
+}  // namespace paimon::blob
diff --git a/src/paimon/format/blob/blob_writer_builder_test.cpp b/src/paimon/format/blob/blob_writer_builder_test.cpp
new file mode 100644
index 0000000..2350eba
--- /dev/null
+++ b/src/paimon/format/blob/blob_writer_builder_test.cpp
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/blob/blob_writer_builder.h"
+
+#include "arrow/api.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/blob_utils.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::blob::test {
+// Fixture for the BlobWriterBuilder tests: a unique test directory, an output
+// stream for "file.blob" inside it, a file system instance and a struct type
+// with a single blob field, all created in SetUp().
+class BlobWriterBuilderTest : public ::testing::Test {
+ public:
+    void SetUp() override {
+        dir_ = paimon::test::UniqueTestDirectory::Create();
+        ASSERT_TRUE(dir_);
+        file_system_ = std::make_shared();
+        ASSERT_OK_AND_ASSIGN(output_stream_,
+                             file_system_->Create(dir_->Str() + "/file.blob", /*overwrite=*/true));
+        struct_type_ = arrow::struct_({BlobUtils::ToArrowField("blob_col", false)});
+    }
+    void TearDown() override {}
+
+    // TEST_F bodies are member functions of classes derived from this fixture,
+    // so the members they use must be protected rather than private.
+ protected:
+    std::unique_ptr dir_;
+    std::shared_ptr output_stream_;
+    std::shared_ptr file_system_;
+    std::shared_ptr struct_type_;
+};
+
+// Build() must fail with a descriptive error until a file system has been set,
+// and succeed afterwards.
+TEST_F(BlobWriterBuilderTest, TestSimple) {
+    BlobWriterBuilder builder(struct_type_, {});
+    ASSERT_NOK_WITH_MSG(builder.Build(output_stream_, "none"),
+                        "File system is nullptr. Please call WithFileSystem() first.");
+
+    builder.WithFileSystem(file_system_);
+    ASSERT_OK(builder.Build(output_stream_, "none"));
+}
+
+}  // namespace paimon::blob::test