diff --git a/src/paimon/core/operation/append_only_file_store_write.cpp b/src/paimon/core/operation/append_only_file_store_write.cpp new file mode 100644 index 0000000..427a0fa --- /dev/null +++ b/src/paimon/core/operation/append_only_file_store_write.cpp @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/operation/append_only_file_store_write.h" + +#include + +#include "paimon/common/data/binary_row.h" +#include "paimon/common/table/special_fields.h" +#include "paimon/common/utils/arrow/arrow_utils.h" +#include "paimon/core/append/append_only_writer.h" +#include "paimon/core/append/bucketed_append_compact_manager.h" +#include "paimon/core/compact/noop_compact_manager.h" +#include "paimon/core/core_options.h" +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/io/data_file_path_factory.h" +#include "paimon/core/io/data_file_writer.h" +#include "paimon/core/io/rolling_file_writer.h" +#include "paimon/core/manifest/manifest_file.h" +#include "paimon/core/manifest/manifest_list.h" +#include "paimon/core/operation/append_only_file_store_scan.h" +#include "paimon/core/operation/file_store_scan.h" +#include "paimon/core/operation/internal_read_context.h" +#include "paimon/core/operation/raw_file_split_read.h" +#include "paimon/core/operation/restore_files.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/core/snapshot.h" +#include "paimon/core/utils/file_store_path_factory.h" +#include "paimon/core/utils/snapshot_manager.h" +#include "paimon/executor.h" +#include "paimon/logging.h" +#include "paimon/read_context.h" +#include "paimon/result.h" +namespace arrow { +class Schema; +} // namespace arrow + +namespace paimon { +class DataFilePathFactory; +class MemoryPool; +class SchemaManager; + +AppendOnlyFileStoreWrite::AppendOnlyFileStoreWrite( + const std::shared_ptr& file_store_path_factory, + const std::shared_ptr& snapshot_manager, + const std::shared_ptr& schema_manager, const std::string& commit_user, + const std::string& root_path, const std::shared_ptr& table_schema, + const std::shared_ptr& schema, + const std::shared_ptr& write_schema, + const std::shared_ptr& partition_schema, + const std::shared_ptr& dv_maintainer_factory, + const std::shared_ptr& io_manager, const CoreOptions& options, + bool ignore_previous_files, bool is_streaming_mode, bool ignore_num_bucket_check, + const std::shared_ptr& executor, const std::shared_ptr& pool) + : AbstractFileStoreWrite(file_store_path_factory, snapshot_manager, schema_manager, commit_user, + root_path, table_schema, schema, write_schema, partition_schema, + dv_maintainer_factory, io_manager, options, ignore_previous_files, + is_streaming_mode, ignore_num_bucket_check, executor, pool), + logger_(Logger::GetLogger("AppendOnlyFileStoreWrite")) { + write_cols_ = write_schema->field_names(); + auto schemas = BlobUtils::SeparateBlobSchema(schema_); + if (schemas.blob_schema && schemas.blob_schema->num_fields() > 0) { + with_blob_ = true; + } + // optimize write_cols to null in following cases: + // 1. write_schema contains all columns + // 2. TODO(xinyu.lxy) write_schema contains all columns and append _ROW_ID & _SEQUENCE_NUMBER + // cols + if (schema->Equals(write_schema)) { + write_cols_ = std::nullopt; + } +} + +AppendOnlyFileStoreWrite::~AppendOnlyFileStoreWrite() = default; + +Result> AppendOnlyFileStoreWrite::CreateFileStoreScan( + const std::shared_ptr& scan_filter) const { + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr manifest_list, + ManifestList::Create(options_.GetFileSystem(), options_.GetManifestFormat(), + options_.GetManifestCompression(), file_store_path_factory_, pool_)); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr manifest_file, + ManifestFile::Create(options_.GetFileSystem(), options_.GetManifestFormat(), + options_.GetManifestCompression(), file_store_path_factory_, + options_.GetManifestTargetFileSize(), pool_, options_, + partition_schema_)); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr scan, + AppendOnlyFileStoreScan::Create( + snapshot_manager_, schema_manager_, manifest_list, manifest_file, + table_schema_, schema_, scan_filter, options_, executor_, pool_)); + return scan; +} + +Result>> AppendOnlyFileStoreWrite::CompactRewrite( + const BinaryRow& partition, int32_t bucket, DeletionVector::Factory dv_factory, + const std::vector>& to_compact, + const std::shared_ptr& cancellation_controller) { + if (to_compact.empty()) { + return std::vector>{}; + } + + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr reader, + CreateFilesReader(partition, bucket, dv_factory, to_compact)); + auto rewriter = + std::make_unique>>( + options_.GetTargetFileSize(/*has_primary_key=*/false), + GetDataFileWriterCreator(partition, bucket, write_schema_, write_cols_, to_compact)); + + ScopeGuard reader_guard([&]() { + if (reader) { + reader->Close(); + } + }); + + ScopeGuard rewriter_guard([&]() { + if (rewriter) { + (void)rewriter->Close(); + } + }); + + while (true) { + if (cancellation_controller->IsCancelled()) { + return Status::Cancelled("Compaction cancelled while rewriting files."); + } + PAIMON_ASSIGN_OR_RAISE(BatchReader::ReadBatch batch, reader->NextBatch()); + if (BatchReader::IsEofBatch(batch)) { + break; + } + auto& [c_array, c_schema] = batch; + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr arrow_array, + arrow::ImportArray(c_array.get(), c_schema.get())); + auto struct_array = std::dynamic_pointer_cast(arrow_array); + if (!struct_array) { + return Status::Invalid( + "cannot cast array to StructArray in CompleteRowKindBatchReader"); + } + PAIMON_ASSIGN_OR_RAISE(struct_array, ArrowUtils::RemoveFieldFromStructArray( + struct_array, SpecialFields::ValueKind().Name())); + PAIMON_RETURN_NOT_OK_FROM_ARROW( + arrow::ExportArray(*struct_array, c_array.get(), c_schema.get())); + ArrowSchemaRelease(c_schema.get()); + ScopeGuard guard([array = c_array.get()]() { ArrowArrayRelease(array); }); + PAIMON_RETURN_NOT_OK(rewriter->Write(c_array.get())); + guard.Release(); + } + rewriter_guard.Release(); + PAIMON_RETURN_NOT_OK(rewriter->Close()); + return rewriter->GetResult(); +} + +Result> AppendOnlyFileStoreWrite::CreateWriter( + const BinaryRow& partition, int32_t bucket, + const std::vector>& restore_data_files, + int64_t restore_max_seq_number, const std::shared_ptr& dv_maintainer) { + PAIMON_LOG_DEBUG(logger_, "Creating append only writer for partition %s, bucket %d", + partition.ToString().c_str(), bucket); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr data_file_path_factory, + file_store_path_factory_->CreateDataFilePathFactory(partition, bucket)); + + std::shared_ptr compact_manager; + auto schemas = BlobUtils::SeparateBlobSchema(write_schema_); + if (options_.WriteOnly() || options_.DataEvolutionEnabled() || options_.GetBucket() == -1 || + with_blob_) { + compact_manager = std::make_shared(); + } else { + auto dv_factory = + [dv_maintainer]( + const std::string& file_name) -> Result> { + if (dv_maintainer) { + return dv_maintainer->DeletionVectorOf(file_name).value_or( + std::shared_ptr()); + } + return std::shared_ptr(); + }; + auto cancellation_controller = std::make_shared(); + + auto rewriter = [this, partition, bucket, dv_factory, cancellation_controller]( + const std::vector>& to_compact) + -> Result>> { + return CompactRewrite(partition, bucket, dv_factory, to_compact, + cancellation_controller); + }; + + compact_manager = std::make_shared( + compact_executor_, restore_data_files, dv_maintainer, + options_.GetCompactionMinFileNum(), + options_.GetTargetFileSize(/*has_primary_key=*/false), + options_.GetCompactionFileSize(/*has_primary_key=*/false), + options_.CompactionForceRewriteAllFiles(), rewriter, + compaction_metrics_->CreateReporter(partition, bucket), cancellation_controller); + } + + auto writer = std::make_shared( + options_, table_schema_->Id(), write_schema_, write_cols_, restore_max_seq_number, + data_file_path_factory, compact_manager, pool_); + return std::shared_ptr(writer); +} + +AppendOnlyFileStoreWrite::SingleFileWriterCreator +AppendOnlyFileStoreWrite::GetDataFileWriterCreator( + const BinaryRow& partition, int32_t bucket, const std::shared_ptr& schema, + const std::optional>& write_cols, + const std::vector>& to_compact) const { + return + [this, partition, bucket, schema, write_cols, to_compact]() + -> Result< + std::unique_ptr>>> { + ::ArrowSchema arrow_schema; + ScopeGuard guard([&arrow_schema]() { ArrowSchemaRelease(&arrow_schema); }); + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*schema, &arrow_schema)); + auto format = options_.GetFileFormat(); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr writer_builder, + format->CreateWriterBuilder(&arrow_schema, options_.GetWriteBatchSize())); + writer_builder->WithMemoryPool(pool_); + + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*schema, &arrow_schema)); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr stats_extractor, + format->CreateStatsExtractor(&arrow_schema)); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr data_file_path_factory, + file_store_path_factory_->CreateDataFilePathFactory(partition, bucket)); + auto writer = std::make_unique( + options_.GetFileCompression(), std::function(), + table_schema_->Id(), + std::make_shared(to_compact[0]->min_sequence_number), + FileSource::Compact(), stats_extractor, data_file_path_factory->IsExternalPath(), + write_cols, pool_); + PAIMON_RETURN_NOT_OK(writer->Init(options_.GetFileSystem(), + data_file_path_factory->NewPath(), writer_builder)); + return writer; + }; +} + +Result> AppendOnlyFileStoreWrite::CreateFilesReader( + const BinaryRow& partition, int32_t bucket, DeletionVector::Factory dv_factory, + const std::vector>& files) const { + ReadContextBuilder context_builder(root_path_); + context_builder.SetOptions(options_.ToMap()) + .WithFileSystem(options_.GetFileSystem()) + .EnablePrefetch(true) + .SetPrefetchMaxParallelNum(1) + .SetPrefetchBatchCount(3) + .WithMemoryPool(pool_); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr read_context, context_builder.Finish()); + std::map options = options_.ToMap(); + // TODO(xinyu.lxy): temporarily disabled pre-buffer for parquet, which may cause high + // memory usage during compaction. Will fix via parquet format refactor. + auto new_options = options; + if (new_options.find("parquet.read.enable-pre-buffer") == new_options.end()) { + new_options["parquet.read.enable-pre-buffer"] = "false"; + } + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr internal_read_context, + InternalReadContext::Create(read_context, table_schema_, new_options)); + auto read = std::make_unique(file_store_path_factory_, internal_read_context, + pool_, compact_executor_); + + return read->CreateReader(partition, bucket, files, dv_factory); +} + +} // namespace paimon diff --git a/src/paimon/core/operation/append_only_file_store_write.h b/src/paimon/core/operation/append_only_file_store_write.h new file mode 100644 index 0000000..d1aacb5 --- /dev/null +++ b/src/paimon/core/operation/append_only_file_store_write.h @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "arrow/type.h" +#include "paimon/common/data/binary_row.h" +#include "paimon/core/compact/cancellation_controller.h" +#include "paimon/core/core_options.h" +#include "paimon/core/deletionvectors/deletion_vector.h" +#include "paimon/core/io/single_file_writer.h" +#include "paimon/core/operation/abstract_file_store_write.h" +#include "paimon/core/table/bucket_mode.h" +#include "paimon/file_store_write.h" +#include "paimon/logging.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/result.h" +#include "paimon/type_fwd.h" + +struct ArrowSchema; + +namespace arrow { +class Schema; +} // namespace arrow + +namespace paimon { + +struct DataFileMeta; +class BatchWriter; +class BucketedDvMaintainer; +class FileStorePathFactory; +class FileStoreScan; +class SnapshotManager; +class ScanFilter; +class MetricsImpl; +class BinaryRow; +class CoreOptions; +class Executor; +class Logger; +class MemoryPool; +class SchemaManager; +class TableSchema; +class IOManager; + +class AppendOnlyFileStoreWrite : public AbstractFileStoreWrite { + public: + AppendOnlyFileStoreWrite( + const std::shared_ptr& file_store_path_factory, + const std::shared_ptr& snapshot_manager, + const std::shared_ptr& schema_manager, const std::string& commit_user, + const std::string& root_path, const std::shared_ptr& table_schema, + const std::shared_ptr& schema, + const std::shared_ptr& write_schema, + const std::shared_ptr& partition_schema, + const std::shared_ptr& dv_maintainer_factory, + const std::shared_ptr& io_manager, const CoreOptions& options, + bool ignore_previous_files, bool is_streaming_mode, bool ignore_num_bucket_check, + const std::shared_ptr& executor, const std::shared_ptr& pool); + ~AppendOnlyFileStoreWrite() override; + + /// Rewrites the given files into new compacted files. + /// + /// @param partition The partition of the files. + /// @param bucket The bucket number. + /// @param dv_factory Factory for creating deletion vectors (nullptr if DV is disabled). + /// @param to_compact The files to compact. + /// @param cancellation_controller Controller to cancel the compaction. + /// @return Result containing the new compacted files, or an error Status. + Result>> CompactRewrite( + const BinaryRow& partition, int32_t bucket, DeletionVector::Factory dv_factory, + const std::vector>& to_compact, + const std::shared_ptr& cancellation_controller); + + private: + using SingleFileWriterCreator = std::function< + Result>>>()>; + + Result> CreateWriter( + const BinaryRow& partition, int32_t bucket, + const std::vector>& restore_data_files, + int64_t restore_max_seq_number, + const std::shared_ptr& dv_maintainer) override; + + Result> CreateFileStoreScan( + const std::shared_ptr& filter) const override; + + SingleFileWriterCreator GetDataFileWriterCreator( + const BinaryRow& partition, int32_t bucket, const std::shared_ptr& schema, + const std::optional>& write_cols, + const std::vector>& to_compact) const; + + Result> CreateFilesReader( + const BinaryRow& partition, int32_t bucket, DeletionVector::Factory dv_factory, + const std::vector>& files) const; + + std::optional> write_cols_; + bool with_blob_ = false; + std::unique_ptr logger_; +}; + +} // namespace paimon diff --git a/src/paimon/core/operation/append_only_file_store_write_test.cpp b/src/paimon/core/operation/append_only_file_store_write_test.cpp new file mode 100644 index 0000000..09b9e28 --- /dev/null +++ b/src/paimon/core/operation/append_only_file_store_write_test.cpp @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/operation/append_only_file_store_write.h" + +#include +#include +#include + +#include "arrow/array/array_base.h" +#include "arrow/array/builder_binary.h" +#include "arrow/c/abi.h" +#include "arrow/c/bridge.h" +#include "arrow/c/helpers.h" +#include "arrow/status.h" +#include "arrow/type.h" +#include "gtest/gtest.h" +#include "paimon/catalog/catalog.h" +#include "paimon/catalog/identifier.h" +#include "paimon/common/data/binary_row.h" +#include "paimon/common/data/binary_row_writer.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/operation/restore_files.h" +#include "paimon/core/snapshot.h" +#include "paimon/core/utils/snapshot_manager.h" +#include "paimon/file_store_write.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/record_batch.h" +#include "paimon/status.h" +#include "paimon/testing/utils/testharness.h" +#include "paimon/write_context.h" + +namespace paimon::test { + +class AppendOnlyFileStoreWriteTest : public testing::Test { + public: + void SetUp() override { + fields_ = {arrow::field("f0", arrow::boolean()), + arrow::field("f1", arrow::int8()), + arrow::field("f2", arrow::int8()), + arrow::field("f3", arrow::int16()), + arrow::field("f4", arrow::int16()), + arrow::field("f5", arrow::int32()), + arrow::field("f6", arrow::int32()), + arrow::field("f7", arrow::int64()), + arrow::field("f8", arrow::int64()), + arrow::field("f9", arrow::float32()), + arrow::field("f10", arrow::float64()), + arrow::field("f11", arrow::utf8()), + arrow::field("f12", arrow::binary()), + arrow::field("non-partition-field", arrow::int32())}; + commit_user_ = "test_commit_user"; + } + + private: + arrow::FieldVector fields_; + std::string commit_user_; +}; + +TEST_F(AppendOnlyFileStoreWriteTest, TestWriteWithInvalidBatch) { + { + arrow::Schema typed_schema(fields_); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + + ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), {})); + ASSERT_OK(catalog->CreateDatabase("foo", {}, /*ignore_if_exists=*/false)); + ASSERT_OK(catalog->CreateTable(Identifier("foo", "bar"), &schema, /*partition_keys=*/{}, + /*primary_keys=*/{}, /*options=*/{}, + /*ignore_if_exists=*/false)); + + WriteContextBuilder builder(PathUtil::JoinPath(dir->Str(), "foo.db/bar"), commit_user_); + ASSERT_OK_AND_ASSIGN(std::unique_ptr write_context, builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto file_store_write, + FileStoreWrite::Create(std::move(write_context))); + ASSERT_NOK_WITH_MSG(file_store_write->Write(nullptr), "batch is null pointer"); + } + { + arrow::Schema typed_schema(fields_); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), {})); + ASSERT_OK(catalog->CreateDatabase("foo", {}, /*ignore_if_exists=*/false)); + ASSERT_OK(catalog->CreateTable(Identifier("foo", "bar"), &schema, /*partition_keys=*/{}, + /*primary_keys=*/{}, /*options=*/{}, + /*ignore_if_exists=*/false)); + + WriteContextBuilder context_builder(PathUtil::JoinPath(dir->Str(), "foo.db/bar"), + commit_user_); + ASSERT_OK_AND_ASSIGN(std::unique_ptr write_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto file_store_write, + FileStoreWrite::Create(std::move(write_context))); + auto array = std::make_shared(); + arrow::StringBuilder builder; + for (size_t j = 0; j < 100; j++) { + ASSERT_TRUE(builder.Append(std::to_string(j)).ok()); + } + ASSERT_TRUE(builder.Finish(&array).ok()); + ::ArrowArray arrow_array; + ASSERT_TRUE(arrow::ExportArray(*array, &arrow_array).ok()); + RecordBatchBuilder batch_builder(&arrow_array); + ASSERT_OK_AND_ASSIGN( + std::unique_ptr batch, + batch_builder.SetBucket(1).SetPartition({{"f0", "true"}, {"f3", "1"}}).Finish()); + ASSERT_NOK_WITH_MSG(file_store_write->Write(std::move(batch)), + "batch bucket is 1 while options bucket is -1"); + ArrowArrayRelease(&arrow_array); + } +} + +TEST_F(AppendOnlyFileStoreWriteTest, TestGetMaxSequenceNumberFromMultiPartition) { + WriteContextBuilder builder( + paimon::test::GetDataDir() + + "/orc/multi_partition_append_table.db/multi_partition_append_table/", + commit_user_); + ASSERT_OK_AND_ASSIGN( + std::unique_ptr write_context, + builder.AddOption("file.format", "orc").AddOption("manifest.format", "orc").Finish()); + ASSERT_OK_AND_ASSIGN(auto file_store_write, FileStoreWrite::Create(std::move(write_context))); + auto write = dynamic_cast(file_store_write.get()); + auto pool = GetDefaultPool(); + { + BinaryRow partition(2); + BinaryRowWriter writer(&partition, 20, pool.get()); + writer.WriteInt(0, 20); + writer.WriteInt(1, 1); + ASSERT_OK_AND_ASSIGN(std::shared_ptr restore_files, + write->ScanExistingFileMetas(partition, + /*bucket=*/0)); + ASSERT_EQ(-1, restore_files->TotalBuckets().value()); + ASSERT_EQ(0, DataFileMeta::GetMaxSequenceNumber(restore_files->DataFiles())); + } + { + BinaryRow partition(2); + BinaryRowWriter writer(&partition, 20, pool.get()); + writer.WriteInt(0, 10); + writer.WriteInt(1, 0); + ASSERT_OK_AND_ASSIGN(std::shared_ptr restore_files, + write->ScanExistingFileMetas(partition, + /*bucket=*/0)); + ASSERT_EQ(-1, restore_files->TotalBuckets().value()); + ASSERT_EQ(2, DataFileMeta::GetMaxSequenceNumber(restore_files->DataFiles())); + } + { + BinaryRow partition(2); + BinaryRowWriter writer(&partition, 20, pool.get()); + writer.WriteInt(0, 10); + writer.WriteInt(1, 0); + ASSERT_OK_AND_ASSIGN(std::shared_ptr restore_files, + write->ScanExistingFileMetas(partition, + /*bucket=*/1)); + ASSERT_EQ(std::nullopt, restore_files->TotalBuckets()); + ASSERT_EQ(-1, DataFileMeta::GetMaxSequenceNumber(restore_files->DataFiles())); + } +} + +} // namespace paimon::test diff --git a/src/paimon/core/operation/key_value_file_store_write.cpp b/src/paimon/core/operation/key_value_file_store_write.cpp new file mode 100644 index 0000000..0f51199 --- /dev/null +++ b/src/paimon/core/operation/key_value_file_store_write.cpp @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/operation/key_value_file_store_write.h" + +#include + +#include "paimon/common/data/binary_row.h" +#include "paimon/core/core_options.h" +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/manifest/manifest_file.h" +#include "paimon/core/manifest/manifest_list.h" +#include "paimon/core/mergetree/levels.h" +#include "paimon/core/mergetree/merge_tree_writer.h" +#include "paimon/core/operation/file_store_scan.h" +#include "paimon/core/operation/key_value_file_store_scan.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/core/utils/file_store_path_factory.h" +#include "paimon/core/utils/primary_key_table_utils.h" +#include "paimon/core/utils/snapshot_manager.h" + +namespace arrow { +class Schema; +} // namespace arrow + +namespace paimon { +class DataFilePathFactory; +class Executor; +class MemoryPool; +struct KeyValue; +template +class MergeFunctionWrapper; + +KeyValueFileStoreWrite::KeyValueFileStoreWrite( + const std::shared_ptr& file_store_path_factory, + const std::shared_ptr& snapshot_manager, + const std::shared_ptr& schema_manager, const std::string& commit_user, + const std::string& root_path, const std::shared_ptr& table_schema, + const std::shared_ptr& schema, + const std::shared_ptr& partition_schema, + const std::shared_ptr& dv_maintainer_factory, + const std::shared_ptr& io_manager, + const std::shared_ptr& key_comparator, + const std::shared_ptr& user_defined_seq_comparator, + const std::shared_ptr>& merge_function_wrapper, + const CoreOptions& options, bool ignore_previous_files, bool is_streaming_mode, + bool ignore_num_bucket_check, bool enable_multi_thread_spill, + const std::shared_ptr& executor, const std::shared_ptr& pool) + : AbstractFileStoreWrite(file_store_path_factory, snapshot_manager, schema_manager, commit_user, + root_path, table_schema, schema, /*write_schema=*/schema, + partition_schema, dv_maintainer_factory, io_manager, options, + ignore_previous_files, is_streaming_mode, ignore_num_bucket_check, + executor, pool), + enable_multi_thread_spill_(enable_multi_thread_spill), + key_comparator_(key_comparator), + user_defined_seq_comparator_(user_defined_seq_comparator), + merge_function_wrapper_(merge_function_wrapper), + compact_manager_factory_(std::make_unique( + options_, key_comparator_, user_defined_seq_comparator_, compaction_metrics_, + table_schema_, schema_, schema_manager_, io_manager_, cache_manager_, + file_store_path_factory_, root_path_, pool_)), + logger_(Logger::GetLogger("KeyValueFileStoreWrite")) {} + +Result> KeyValueFileStoreWrite::CreateFileStoreScan( + const std::shared_ptr& scan_filter) const { + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr manifest_list, + ManifestList::Create(options_.GetFileSystem(), options_.GetManifestFormat(), + options_.GetManifestCompression(), file_store_path_factory_, pool_)); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr manifest_file, + ManifestFile::Create(options_.GetFileSystem(), options_.GetManifestFormat(), + options_.GetManifestCompression(), file_store_path_factory_, + options_.GetManifestTargetFileSize(), pool_, options_, + partition_schema_)); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr scan, + KeyValueFileStoreScan::Create( + snapshot_manager_, schema_manager_, manifest_list, manifest_file, + table_schema_, schema_, scan_filter, options_, executor_, pool_)); + return scan; +} + +Result> KeyValueFileStoreWrite::CreateWriter( + const BinaryRow& partition, int32_t bucket, + const std::vector>& restore_data_files, + int64_t restore_max_seq_number, const std::shared_ptr& dv_maintainer) { + PAIMON_LOG_DEBUG(logger_, "Creating key value writer for partition %s, bucket %d", + partition.ToString().c_str(), bucket); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr data_file_path_factory, + file_store_path_factory_->CreateDataFilePathFactory(partition, bucket)); + PAIMON_ASSIGN_OR_RAISE(std::vector trimmed_primary_keys, + table_schema_->TrimmedPrimaryKeys()); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr levels, + Levels::Create(key_comparator_, restore_data_files, options_.GetNumLevels())); + auto compact_strategy = compact_manager_factory_->CreateCompactStrategy(); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr compact_manager, + compact_manager_factory_->CreateCompactManager(partition, bucket, compact_strategy, + compact_executor_, levels, dv_maintainer)); + + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr writer, + MergeTreeWriter::Create( + restore_max_seq_number, trimmed_primary_keys, data_file_path_factory, key_comparator_, + user_defined_seq_comparator_, merge_function_wrapper_, table_schema_->Id(), schema_, + options_, compact_manager, io_manager_, enable_multi_thread_spill_, pool_)); + return writer; +} + +Status KeyValueFileStoreWrite::Close() { + PAIMON_RETURN_NOT_OK(AbstractFileStoreWrite::Close()); + compact_manager_factory_->Close(); + return Status::OK(); +} + +} // namespace paimon diff --git a/src/paimon/core/operation/key_value_file_store_write.h b/src/paimon/core/operation/key_value_file_store_write.h new file mode 100644 index 0000000..1445759 --- /dev/null +++ b/src/paimon/core/operation/key_value_file_store_write.h @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "paimon/core/mergetree/compact/merge_function_wrapper.h" +#include "paimon/core/mergetree/compact/merge_tree_compact_manager_factory.h" +#include "paimon/core/operation/abstract_file_store_write.h" +#include "paimon/core/utils/batch_writer.h" +#include "paimon/logging.h" +#include "paimon/result.h" + +namespace arrow { +class Schema; +} // namespace arrow + +namespace paimon { + +class FieldsComparator; +class FileStoreScan; +class ScanFilter; +class BinaryRow; +class FileStorePathFactory; +class SnapshotManager; +class SchemaManager; +class TableSchema; +class IOManager; +struct KeyValue; +template +class MergeFunctionWrapper; + +class KeyValueFileStoreWrite : public AbstractFileStoreWrite { + public: + KeyValueFileStoreWrite( + const std::shared_ptr& file_store_path_factory, + const std::shared_ptr& snapshot_manager, + const std::shared_ptr& schema_manager, const std::string& commit_user, + const std::string& root_path, const std::shared_ptr& table_schema, + const std::shared_ptr& schema, + const std::shared_ptr& partition_schema, + const std::shared_ptr& dv_maintainer_factory, + const std::shared_ptr& io_manager, + const std::shared_ptr& key_comparator, + const std::shared_ptr& user_defined_seq_comparator, + const std::shared_ptr>& merge_function_wrapper, + const CoreOptions& options, bool ignore_previous_files, bool is_streaming_mode, + bool ignore_num_bucket_check, bool enable_multi_thread_spill, + const std::shared_ptr& executor, const std::shared_ptr& pool); + + Status Close() override; + + private: + Result> CreateWriter( + const BinaryRow& partition, int32_t bucket, + const std::vector>& restore_data_files, + int64_t restore_max_seq_number, + const std::shared_ptr& dv_maintainer) override; + + Result> CreateFileStoreScan( + const std::shared_ptr& filter) const override; + + private: + bool enable_multi_thread_spill_; + std::shared_ptr key_comparator_; + std::shared_ptr user_defined_seq_comparator_; + std::shared_ptr> merge_function_wrapper_; + std::unique_ptr compact_manager_factory_; + std::unique_ptr logger_; +}; + +} // namespace paimon diff --git a/src/paimon/core/operation/key_value_file_store_write_test.cpp b/src/paimon/core/operation/key_value_file_store_write_test.cpp new file mode 100644 index 0000000..be4c715 --- /dev/null +++ b/src/paimon/core/operation/key_value_file_store_write_test.cpp @@ -0,0 +1,391 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/operation/key_value_file_store_write.h" + +#include +#include +#include +#include +#include + +#include "arrow/array/array_base.h" +#include "arrow/array/builder_binary.h" +#include "arrow/array/builder_nested.h" +#include "arrow/c/abi.h" +#include "arrow/c/bridge.h" +#include "arrow/c/helpers.h" +#include "arrow/type.h" +#include "gtest/gtest.h" +#include "paimon/catalog/catalog.h" +#include "paimon/catalog/identifier.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/core/table/sink/commit_message_impl.h" +#include "paimon/file_store_write.h" +#include "paimon/record_batch.h" +#include "paimon/status.h" +#include "paimon/testing/utils/test_helper.h" +#include "paimon/testing/utils/testharness.h" +#include "paimon/write_context.h" + +namespace paimon::test { + +class KeyValueFileStoreWriteTest : public ::testing::Test { + protected: + Result> CreateSingleStringFileStoreWrite( + const std::map& table_options, bool with_temp_directory) { + auto fields = {arrow::field("f0", arrow::utf8(), /*nullable=*/false)}; + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(typed_schema, &schema)); + + auto dir = UniqueTestDirectory::Create(); + if (!dir) { + return Status::Invalid("failed to create test directory"); + } + PAIMON_ASSIGN_OR_RAISE(auto catalog, Catalog::Create(dir->Str(), {})); + PAIMON_RETURN_NOT_OK(catalog->CreateDatabase("foo", {}, /*ignore_if_exists=*/false)); + PAIMON_RETURN_NOT_OK(catalog->CreateTable(Identifier("foo", "bar"), &schema, + /*partition_keys=*/{}, + /*primary_keys=*/{"f0"}, table_options, + /*ignore_if_exists=*/false)); + + WriteContextBuilder context_builder(PathUtil::JoinPath(dir->Str(), "foo.db/bar"), "test"); + if (with_temp_directory) { + context_builder.WithTempDirectory(dir->Str()); + } + + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr write_context, + context_builder.Finish()); + return FileStoreWrite::Create(std::move(write_context)); + } + + Status WriteSingleStringRow(FileStoreWrite* file_store_write, int32_t bucket, + const std::string& value) { + auto fields = {arrow::field("f0", arrow::utf8(), /*nullable=*/false)}; + auto struct_type = arrow::struct_(fields); + arrow::StructBuilder struct_builder(struct_type, arrow::default_memory_pool(), + {std::make_shared()}); + auto string_builder = static_cast(struct_builder.field_builder(0)); + PAIMON_RETURN_NOT_OK_FROM_ARROW(struct_builder.Append()); + PAIMON_RETURN_NOT_OK_FROM_ARROW(string_builder->Append(value)); + + std::shared_ptr array; + PAIMON_RETURN_NOT_OK_FROM_ARROW(struct_builder.Finish(&array)); + ::ArrowArray arrow_array; + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, &arrow_array)); + + RecordBatchBuilder batch_builder(&arrow_array); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr batch, + batch_builder.SetBucket(bucket).Finish()); + Status write_status = file_store_write->Write(std::move(batch)); + if (!ArrowArrayIsReleased(&arrow_array)) { + ArrowArrayRelease(&arrow_array); + } + return write_status; + } +}; + +TEST_F(KeyValueFileStoreWriteTest, TestWriteWithInvalidBatch) { + auto fields = { + arrow::field("f0", arrow::boolean()), arrow::field("f1", arrow::int8()), + arrow::field("f2", arrow::int8()), arrow::field("f3", arrow::int16()), + arrow::field("f4", arrow::int16()), arrow::field("f5", arrow::int32()), + arrow::field("f6", arrow::int32()), arrow::field("f7", arrow::int64()), + arrow::field("f8", arrow::int64()), arrow::field("f9", arrow::float32()), + arrow::field("f10", arrow::float64()), arrow::field("f11", arrow::utf8()), + arrow::field("f12", arrow::binary()), arrow::field("non-partition-field", arrow::int32())}; + std::string commit_user = "test"; + { + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + + ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), {})); + ASSERT_OK(catalog->CreateDatabase("foo", {}, /*ignore_if_exists=*/false)); + ASSERT_OK(catalog->CreateTable(Identifier("foo", "bar"), &schema, /*partition_keys=*/{}, + /*primary_keys=*/{"f1"}, /*options=*/{{"bucket", "1"}}, + /*ignore_if_exists=*/false)); + + WriteContextBuilder builder(PathUtil::JoinPath(dir->Str(), "foo.db/bar"), commit_user); + ASSERT_OK_AND_ASSIGN(std::unique_ptr write_context, builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto file_store_write, + FileStoreWrite::Create(std::move(write_context))); + ASSERT_NOK_WITH_MSG(file_store_write->Write(nullptr), "batch is null pointer"); + } + { + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), {})); + ASSERT_OK(catalog->CreateDatabase("foo", {}, /*ignore_if_exists=*/false)); + ASSERT_OK(catalog->CreateTable(Identifier("foo", "bar"), &schema, /*partition_keys=*/{}, + /*primary_keys=*/{"f1"}, /*options=*/{{"bucket", "-2"}}, + /*ignore_if_exists=*/false)); + + WriteContextBuilder context_builder(PathUtil::JoinPath(dir->Str(), "foo.db/bar"), + commit_user); + ASSERT_OK_AND_ASSIGN(std::unique_ptr write_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto file_store_write, + FileStoreWrite::Create(std::move(write_context))); + auto array = std::make_shared(); + arrow::StringBuilder builder; + for (size_t j = 0; j < 100; j++) { + ASSERT_TRUE(builder.Append(std::to_string(j)).ok()); + } + ASSERT_TRUE(builder.Finish(&array).ok()); + ::ArrowArray arrow_array; + ASSERT_TRUE(arrow::ExportArray(*array, &arrow_array).ok()); + RecordBatchBuilder batch_builder(&arrow_array); + ASSERT_OK_AND_ASSIGN(std::unique_ptr batch, + batch_builder.SetBucket(1).Finish()); + ASSERT_NOK_WITH_MSG(file_store_write->Write(std::move(batch)), + "batch bucket is 1 while options bucket is -2"); + ArrowArrayRelease(&arrow_array); + } + { + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), {})); + ASSERT_OK(catalog->CreateDatabase("foo", {}, /*ignore_if_exists=*/false)); + ASSERT_OK(catalog->CreateTable(Identifier("foo", "bar"), &schema, /*partition_keys=*/{}, + /*primary_keys=*/{"f1"}, /*options=*/{{"bucket", "2"}}, + /*ignore_if_exists=*/false)); + + WriteContextBuilder context_builder(PathUtil::JoinPath(dir->Str(), "foo.db/bar"), + commit_user); + ASSERT_OK_AND_ASSIGN(std::unique_ptr write_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto file_store_write, + FileStoreWrite::Create(std::move(write_context))); + auto array = std::make_shared(); + arrow::StringBuilder builder; + for (size_t j = 0; j < 100; j++) { + ASSERT_TRUE(builder.Append(std::to_string(j)).ok()); + } + ASSERT_TRUE(builder.Finish(&array).ok()); + ::ArrowArray arrow_array; + ASSERT_TRUE(arrow::ExportArray(*array, &arrow_array).ok()); + RecordBatchBuilder batch_builder(&arrow_array); + ASSERT_OK_AND_ASSIGN(std::unique_ptr batch, + batch_builder.SetBucket(3).Finish()); + ASSERT_NOK_WITH_MSG( + file_store_write->Write(std::move(batch)), + "fixed bucketed mode must specify a bucket which in [0, 2) in RecordBatch"); + ArrowArrayRelease(&arrow_array); + } +} + +TEST_F(KeyValueFileStoreWriteTest, TestPrepareCommitShouldSucceedWhenLookupEnabledWithIOManager) { + ASSERT_OK_AND_ASSIGN( + auto file_store_write, + CreateSingleStringFileStoreWrite({{"bucket", "1"}, {Options::FORCE_LOOKUP, "true"}}, + /*with_temp_directory=*/true)); + + ASSERT_OK(WriteSingleStringRow(file_store_write.get(), /*bucket=*/0, "k1")); + ASSERT_OK_AND_ASSIGN(auto commit_messages, + file_store_write->PrepareCommit(/*wait_compaction=*/true)); + ASSERT_EQ(commit_messages.size(), 1); +} + +TEST_F(KeyValueFileStoreWriteTest, + TestPrepareCommitShouldSucceedWhenDefaultCompactRewriterPathEnabled) { + ASSERT_OK_AND_ASSIGN( + auto file_store_write, + CreateSingleStringFileStoreWrite({{"bucket", "1"}}, /*with_temp_directory=*/false)); + + ASSERT_OK(WriteSingleStringRow(file_store_write.get(), /*bucket=*/0, "k1")); + ASSERT_OK_AND_ASSIGN(auto commit_messages, + file_store_write->PrepareCommit(/*wait_compaction=*/true)); + ASSERT_EQ(commit_messages.size(), 1); +} + +TEST_F(KeyValueFileStoreWriteTest, TestSpillSimple) { + auto fields = {arrow::field("f0", arrow::utf8(), /*nullable=*/false)}; + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), {})); + ASSERT_OK(catalog->CreateDatabase("foo", {}, /*ignore_if_exists=*/false)); + ASSERT_OK(catalog->CreateTable(Identifier("foo", "bar"), &schema, + /*partition_keys=*/{}, /*primary_keys=*/{"f0"}, + {{Options::BUCKET, "2"}, + {Options::WRITE_BUFFER_SIZE, "64"}, + {Options::WRITE_BUFFER_SPILLABLE, "true"}}, + /*ignore_if_exists=*/false)); + + WriteContextBuilder context_builder(PathUtil::JoinPath(dir->Str(), "foo.db/bar"), "test"); + context_builder.WithTempDirectory(dir->Str()); + + ASSERT_OK_AND_ASSIGN(std::unique_ptr write_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto file_store_write, FileStoreWrite::Create(std::move(write_context))); + auto key_value_file_store_write = dynamic_cast(file_store_write.get()); + auto get_writer = [&](int32_t bucket) -> std::shared_ptr { + auto partition_iter = key_value_file_store_write->writers_.find(BinaryRow::EmptyRow()); + if (partition_iter != key_value_file_store_write->writers_.end()) { + auto& buckets = partition_iter->second; + auto bucket_iter = buckets.find(bucket); + if (PAIMON_LIKELY(bucket_iter != buckets.end())) { + return bucket_iter->second.writer; + } + } + assert(false); + return nullptr; + }; + + // write bucket 0, not trigger spill + ASSERT_OK(WriteSingleStringRow(file_store_write.get(), /*bucket=*/0, std::string(48, 'a'))); + ASSERT_EQ(TestHelper::CountChannelFiles(dir->GetFileSystem(), dir->Str()), 0); + ASSERT_GT(get_writer(0)->GetMemoryUsage(), 0); + + // write bucket 1, spill bucket 0 (pick largest writer) + ASSERT_OK(WriteSingleStringRow(file_store_write.get(), /*bucket=*/1, std::string(32, 'b'))); + ASSERT_EQ(TestHelper::CountChannelFiles(dir->GetFileSystem(), dir->Str()), 1); + ASSERT_EQ(get_writer(0)->GetMemoryUsage(), 0); + ASSERT_GT(get_writer(1)->GetMemoryUsage(), 0); + + // prepare commit, clean all spill files and memory buffers + ASSERT_OK_AND_ASSIGN(auto commit_messages, + file_store_write->PrepareCommit(/*wait_compaction=*/true)); + ASSERT_EQ(commit_messages.size(), 2); + ASSERT_EQ(TestHelper::CountChannelFiles(dir->GetFileSystem(), dir->Str()), 0); + ASSERT_EQ(get_writer(0)->GetMemoryUsage(), 0); + ASSERT_EQ(get_writer(1)->GetMemoryUsage(), 0); +} + +TEST_F(KeyValueFileStoreWriteTest, TestSpillDiskQuotaExhaustedFallsBackToFlushDataFile) { + auto fields = {arrow::field("f0", arrow::utf8(), /*nullable=*/false)}; + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), {})); + ASSERT_OK(catalog->CreateDatabase("foo", {}, /*ignore_if_exists=*/false)); + ASSERT_OK(catalog->CreateTable(Identifier("foo", "bar"), &schema, + /*partition_keys=*/{}, /*primary_keys=*/{"f0"}, + {{Options::BUCKET, "1"}, + {Options::WRITE_BUFFER_SIZE, "1"}, + {Options::WRITE_BUFFER_SPILLABLE, "true"}, + {Options::WRITE_BUFFER_SPILL_MAX_DISK_SIZE, "1b"}}, + /*ignore_if_exists=*/false)); + ArrowSchemaRelease(&schema); + WriteContextBuilder context_builder(PathUtil::JoinPath(dir->Str(), "foo.db/bar"), "test"); + context_builder.WithTempDirectory(dir->Str()); + + ASSERT_OK_AND_ASSIGN(std::unique_ptr write_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto file_store_write, FileStoreWrite::Create(std::move(write_context))); + + // Disk quota is 1 byte, so spill will exhaust quota immediately and fall back to + // FlushWriteBuffer (writing data files directly instead of spill temp files). + ASSERT_OK(WriteSingleStringRow(file_store_write.get(), /*bucket=*/0, "alice")); + ASSERT_EQ(TestHelper::CountChannelFiles(dir->GetFileSystem(), dir->Str()), 0); + + ASSERT_OK(WriteSingleStringRow(file_store_write.get(), /*bucket=*/0, "bob")); + ASSERT_EQ(TestHelper::CountChannelFiles(dir->GetFileSystem(), dir->Str()), 0); + + ASSERT_OK_AND_ASSIGN(auto commit_messages, + file_store_write->PrepareCommit(/*wait_compaction=*/true)); + ASSERT_EQ(commit_messages.size(), 1); + + // Verify all rows are committed correctly despite disk quota exhaustion. + auto* commit_impl = dynamic_cast(commit_messages[0].get()); + ASSERT_NE(commit_impl, nullptr); + const auto& new_files = commit_impl->GetNewFilesIncrement().NewFiles(); + ASSERT_FALSE(new_files.empty()); + + int64_t total_row_count = 0; + for (const auto& file : new_files) { + total_row_count += file->row_count; + } + ASSERT_EQ(total_row_count, 2); +} + +TEST_F(KeyValueFileStoreWriteTest, TestMultiRoundSpillWithSameKeyDeduplication) { + auto fields = {arrow::field("f0", arrow::utf8(), /*nullable=*/false)}; + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), {})); + ASSERT_OK(catalog->CreateDatabase("foo", {}, /*ignore_if_exists=*/false)); + ASSERT_OK(catalog->CreateTable(Identifier("foo", "bar"), &schema, + /*partition_keys=*/{}, /*primary_keys=*/{"f0"}, + {{Options::BUCKET, "1"}, + {Options::WRITE_BUFFER_SIZE, "1"}, + {Options::WRITE_BUFFER_SPILLABLE, "true"}}, + /*ignore_if_exists=*/false)); + ArrowSchemaRelease(&schema); + WriteContextBuilder context_builder(PathUtil::JoinPath(dir->Str(), "foo.db/bar"), "test"); + context_builder.WithTempDirectory(dir->Str()).WithStreamingMode(true); + + ASSERT_OK_AND_ASSIGN(std::unique_ptr write_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto file_store_write, FileStoreWrite::Create(std::move(write_context))); + + // Round 1: alice, bob, alice (duplicate key) → after dedup: alice + bob = 2 rows + ASSERT_OK(WriteSingleStringRow(file_store_write.get(), /*bucket=*/0, "alice")); + ASSERT_OK(WriteSingleStringRow(file_store_write.get(), /*bucket=*/0, "bob")); + ASSERT_OK(WriteSingleStringRow(file_store_write.get(), /*bucket=*/0, "alice")); + + ASSERT_OK_AND_ASSIGN(auto commit_messages_1, + file_store_write->PrepareCommit(/*wait_compaction=*/true, 0)); + ASSERT_EQ(commit_messages_1.size(), 1); + { + auto* commit_impl = dynamic_cast(commit_messages_1[0].get()); + ASSERT_NE(commit_impl, nullptr); + int64_t total_row_count = 0; + for (const auto& file : commit_impl->GetNewFilesIncrement().NewFiles()) { + total_row_count += file->row_count; + } + ASSERT_EQ(total_row_count, 2); + } + ASSERT_EQ(TestHelper::CountChannelFiles(dir->GetFileSystem(), dir->Str()), 0); + + // Round 2: bob, charlie, charlie (duplicate key) → after dedup: bob + charlie = 2 rows + ASSERT_OK(WriteSingleStringRow(file_store_write.get(), /*bucket=*/0, "bob")); + ASSERT_OK(WriteSingleStringRow(file_store_write.get(), /*bucket=*/0, "charlie")); + ASSERT_OK(WriteSingleStringRow(file_store_write.get(), /*bucket=*/0, "charlie")); + + ASSERT_OK_AND_ASSIGN(auto commit_messages_2, + file_store_write->PrepareCommit(/*wait_compaction=*/true, 1)); + ASSERT_EQ(commit_messages_2.size(), 1); + { + auto* commit_impl = dynamic_cast(commit_messages_2[0].get()); + ASSERT_NE(commit_impl, nullptr); + int64_t total_row_count = 0; + for (const auto& file : commit_impl->GetNewFilesIncrement().NewFiles()) { + total_row_count += file->row_count; + } + ASSERT_EQ(total_row_count, 2); + } + ASSERT_EQ(TestHelper::CountChannelFiles(dir->GetFileSystem(), dir->Str()), 0); +} + +} // namespace paimon::test diff --git a/src/paimon/core/operation/metrics/compaction_metrics.h b/src/paimon/core/operation/metrics/compaction_metrics.h new file mode 100644 index 0000000..aa48a5e --- /dev/null +++ b/src/paimon/core/operation/metrics/compaction_metrics.h @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "paimon/common/data/binary_row.h" +#include "paimon/common/metrics/metrics_impl.h" + +namespace paimon { +/// Metrics to measure a compaction. +class CompactionMetrics { + public: + static constexpr int32_t kCompactionTimeWindow = 100; + + static constexpr char MAX_LEVEL0_FILE_COUNT[] = "maxLevel0FileCount"; + static constexpr char AVG_LEVEL0_FILE_COUNT[] = "avgLevel0FileCount"; + static constexpr char AVG_COMPACTION_TIME[] = "avgCompactionTime"; + static constexpr char COMPACTION_COMPLETED_COUNT[] = "compactionCompletedCount"; + static constexpr char COMPACTION_TOTAL_COUNT[] = "compactionTotalCount"; + static constexpr char COMPACTION_QUEUED_COUNT[] = "compactionQueuedCount"; + static constexpr char MAX_COMPACTION_INPUT_SIZE[] = "maxCompactionInputSize"; + static constexpr char MAX_COMPACTION_OUTPUT_SIZE[] = "maxCompactionOutputSize"; + static constexpr char AVG_COMPACTION_INPUT_SIZE[] = "avgCompactionInputSize"; + static constexpr char AVG_COMPACTION_OUTPUT_SIZE[] = "avgCompactionOutputSize"; + static constexpr char MAX_TOTAL_FILE_SIZE[] = "maxTotalFileSize"; + static constexpr char AVG_TOTAL_FILE_SIZE[] = "avgTotalFileSize"; + + class Reporter { + public: + Reporter(CompactionMetrics* metrics, const BinaryRow& partition, int32_t bucket) + : metrics_(metrics), partition_(partition), bucket_(bucket) {} + + void ReportLevel0FileCount(int64_t count) { + level0_file_count_ = count; + } + void ReportCompactionInputSize(int64_t bytes) { + compaction_input_size_ = bytes; + } + void ReportCompactionOutputSize(int64_t bytes) { + compaction_output_size_ = bytes; + } + void ReportTotalFileSize(int64_t bytes) { + total_file_size_ = bytes; + } + void ReportCompactionTime(int64_t time) { + metrics_->ReportCompactionTime(time); + } + + void IncreaseCompactionsCompletedCount() { + metrics_->IncreaseCompactionsCompletedCount(); + } + void IncreaseCompactionsTotalCount() { + metrics_->IncreaseCompactionsTotalCount(); + } + void IncreaseCompactionsQueuedCount() { + metrics_->IncreaseCompactionsQueuedCount(); + } + void DecreaseCompactionsQueuedCount() { + metrics_->DecreaseCompactionsQueuedCount(); + } + void Unregister() { + metrics_->EraseReporter(partition_, bucket_); + } + + int64_t Level0FileCount() const { + return level0_file_count_; + } + + int64_t CompactionInputSize() const { + return compaction_input_size_; + } + + int64_t CompactionOutputSize() const { + return compaction_output_size_; + } + + int64_t TotalFileSize() const { + return total_file_size_; + } + + private: + CompactionMetrics* metrics_; + BinaryRow partition_; + int32_t bucket_; + + // Data fields for metrics. + int64_t level0_file_count_ = 0; + int64_t compaction_input_size_ = 0; + int64_t compaction_output_size_ = 0; + int64_t total_file_size_ = 0; + }; + + std::shared_ptr CreateReporter(const BinaryRow& partition, int32_t bucket) { + std::lock_guard lock(reporter_mutex_); + std::pair key(partition, bucket); + auto reporter = std::make_shared(this, partition, bucket); + reporters_[key] = reporter; + return reporter; + } + + void EraseReporter(const BinaryRow& partition, int32_t bucket) { + std::lock_guard lock(reporter_mutex_); + reporters_.erase(std::pair{partition, bucket}); + } + + void ReportCompactionTime(int64_t time) { + std::lock_guard lock(compaction_times_mutex_); + compaction_times_.push_back(time); + if (compaction_times_.size() > kCompactionTimeWindow) { + compaction_times_.erase(compaction_times_.begin()); + } + } + + double MaxLevel0FileCount() { + std::lock_guard lock(reporter_mutex_); + int64_t max_val = -1; + for (const auto& [_, reporter] : reporters_) { + max_val = std::max(max_val, reporter->Level0FileCount()); + } + return static_cast(max_val); + } + + double AvgLevel0FileCount() { + std::lock_guard lock(reporter_mutex_); + int64_t sum = 0; + size_t n = 0; + for (const auto& [_, reporter] : reporters_) { + sum += reporter->Level0FileCount(); + n++; + } + return n > 0 ? static_cast(sum) / n : -1; + } + + double MaxCompactionInputSize() { + std::lock_guard lock(reporter_mutex_); + int64_t max_val = -1; + for (const auto& [_, reporter] : reporters_) { + max_val = std::max(max_val, reporter->CompactionInputSize()); + } + return static_cast(max_val); + } + + double MaxCompactionOutputSize() { + std::lock_guard lock(reporter_mutex_); + int64_t max_val = -1; + for (const auto& [_, reporter] : reporters_) { + max_val = std::max(max_val, reporter->CompactionOutputSize()); + } + return static_cast(max_val); + } + + double AvgCompactionInputSize() { + std::lock_guard lock(reporter_mutex_); + int64_t sum = 0; + size_t n = 0; + for (const auto& [_, reporter] : reporters_) { + sum += reporter->CompactionInputSize(); + n++; + } + return n > 0 ? static_cast(sum) / n : -1; + } + + double AvgCompactionOutputSize() { + std::lock_guard lock(reporter_mutex_); + int64_t sum = 0; + size_t n = 0; + for (const auto& [_, reporter] : reporters_) { + sum += reporter->CompactionOutputSize(); + n++; + } + return n > 0 ? static_cast(sum) / n : -1; + } + + double AvgCompactionTime() { + std::lock_guard lock(compaction_times_mutex_); + if (compaction_times_.empty()) { + return 0.0; + } + int64_t sum = 0; + for (const auto& t : compaction_times_) { + sum += t; + } + return static_cast(sum) / compaction_times_.size(); + } + + double MaxTotalFileSize() { + std::lock_guard lock(reporter_mutex_); + int64_t max_val = -1; + for (const auto& [_, reporter] : reporters_) { + max_val = std::max(max_val, reporter->TotalFileSize()); + } + return static_cast(max_val); + } + + double AvgTotalFileSize() { + std::lock_guard lock(reporter_mutex_); + int64_t sum = 0; + size_t n = 0; + for (const auto& [_, reporter] : reporters_) { + sum += reporter->TotalFileSize(); + n++; + } + return n > 0 ? static_cast(sum) / n : -1; + } + + void IncreaseCompactionsCompletedCount() { + compactions_completed_count_++; + } + + void IncreaseCompactionsQueuedCount() { + compactions_queued_count_++; + } + + void DecreaseCompactionsQueuedCount() { + compactions_queued_count_--; + } + + void IncreaseCompactionsTotalCount() { + compactions_total_count_++; + } + + int64_t GetCompactionsCompletedCount() const { + return compactions_completed_count_; + } + + int64_t GetCompactionsTotalCount() const { + return compactions_total_count_; + } + + int64_t GetCompactionsQueuedCount() const { + return compactions_queued_count_; + } + + std::shared_ptr GetMetrics() { + auto metrics = std::make_shared(); + metrics->SetCounter(COMPACTION_COMPLETED_COUNT, GetCompactionsCompletedCount()); + metrics->SetCounter(COMPACTION_TOTAL_COUNT, GetCompactionsTotalCount()); + metrics->SetCounter(COMPACTION_QUEUED_COUNT, GetCompactionsQueuedCount()); + metrics->SetGauge(MAX_LEVEL0_FILE_COUNT, MaxLevel0FileCount()); + metrics->SetGauge(AVG_LEVEL0_FILE_COUNT, AvgLevel0FileCount()); + metrics->SetGauge(AVG_COMPACTION_TIME, AvgCompactionTime()); + metrics->SetGauge(MAX_COMPACTION_INPUT_SIZE, MaxCompactionInputSize()); + metrics->SetGauge(MAX_COMPACTION_OUTPUT_SIZE, MaxCompactionOutputSize()); + metrics->SetGauge(AVG_COMPACTION_INPUT_SIZE, AvgCompactionInputSize()); + metrics->SetGauge(AVG_COMPACTION_OUTPUT_SIZE, AvgCompactionOutputSize()); + metrics->SetGauge(MAX_TOTAL_FILE_SIZE, MaxTotalFileSize()); + metrics->SetGauge(AVG_TOTAL_FILE_SIZE, AvgTotalFileSize()); + return metrics; + } + + private: + std::unordered_map, std::shared_ptr> reporters_; + std::mutex reporter_mutex_; + + std::vector compaction_times_; + std::mutex compaction_times_mutex_; + + std::atomic compactions_completed_count_ = {0}; + std::atomic compactions_total_count_ = {0}; + std::atomic compactions_queued_count_ = {0}; +}; + +} // namespace paimon diff --git a/src/paimon/core/operation/metrics/compaction_metrics_test.cpp b/src/paimon/core/operation/metrics/compaction_metrics_test.cpp new file mode 100644 index 0000000..56919dd --- /dev/null +++ b/src/paimon/core/operation/metrics/compaction_metrics_test.cpp @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/operation/metrics/compaction_metrics.h" + +#include + +#include "gtest/gtest.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +TEST(CompactionMetricsTest, TestReporterAggregationAndCounters) { + CompactionMetrics metrics; + + auto reporter1 = metrics.CreateReporter(BinaryRow::EmptyRow(), 0); + auto reporter2 = metrics.CreateReporter(BinaryRow::EmptyRow(), 1); + + reporter1->ReportLevel0FileCount(10); + reporter2->ReportLevel0FileCount(4); + reporter1->ReportCompactionInputSize(200); + reporter2->ReportCompactionInputSize(100); + reporter1->ReportCompactionOutputSize(150); + reporter2->ReportCompactionOutputSize(90); + reporter1->ReportTotalFileSize(500); + reporter2->ReportTotalFileSize(300); + + reporter1->ReportCompactionTime(50); + reporter2->ReportCompactionTime(150); + + reporter1->IncreaseCompactionsCompletedCount(); + reporter1->IncreaseCompactionsTotalCount(); + reporter2->IncreaseCompactionsTotalCount(); + reporter1->IncreaseCompactionsQueuedCount(); + reporter2->IncreaseCompactionsQueuedCount(); + reporter2->DecreaseCompactionsQueuedCount(); + + auto snapshot = metrics.GetMetrics(); + + ASSERT_OK_AND_ASSIGN(auto completed, + snapshot->GetCounter(CompactionMetrics::COMPACTION_COMPLETED_COUNT)); + EXPECT_EQ(1, completed); + ASSERT_OK_AND_ASSIGN(auto total, + snapshot->GetCounter(CompactionMetrics::COMPACTION_TOTAL_COUNT)); + EXPECT_EQ(2, total); + ASSERT_OK_AND_ASSIGN(auto queued, + snapshot->GetCounter(CompactionMetrics::COMPACTION_QUEUED_COUNT)); + EXPECT_EQ(1, queued); + + ASSERT_OK_AND_ASSIGN(auto max_l0, snapshot->GetGauge(CompactionMetrics::MAX_LEVEL0_FILE_COUNT)); + EXPECT_DOUBLE_EQ(10.0, max_l0); + ASSERT_OK_AND_ASSIGN(auto avg_l0, snapshot->GetGauge(CompactionMetrics::AVG_LEVEL0_FILE_COUNT)); + EXPECT_DOUBLE_EQ(7.0, avg_l0); + + ASSERT_OK_AND_ASSIGN(auto max_input, + snapshot->GetGauge(CompactionMetrics::MAX_COMPACTION_INPUT_SIZE)); + EXPECT_DOUBLE_EQ(200.0, max_input); + ASSERT_OK_AND_ASSIGN(auto avg_input, + snapshot->GetGauge(CompactionMetrics::AVG_COMPACTION_INPUT_SIZE)); + EXPECT_DOUBLE_EQ(150.0, avg_input); + + ASSERT_OK_AND_ASSIGN(auto max_output, + snapshot->GetGauge(CompactionMetrics::MAX_COMPACTION_OUTPUT_SIZE)); + EXPECT_DOUBLE_EQ(150.0, max_output); + ASSERT_OK_AND_ASSIGN(auto avg_output, + snapshot->GetGauge(CompactionMetrics::AVG_COMPACTION_OUTPUT_SIZE)); + EXPECT_DOUBLE_EQ(120.0, avg_output); + + ASSERT_OK_AND_ASSIGN(auto max_total, + snapshot->GetGauge(CompactionMetrics::MAX_TOTAL_FILE_SIZE)); + EXPECT_DOUBLE_EQ(500.0, max_total); + ASSERT_OK_AND_ASSIGN(auto avg_total, + snapshot->GetGauge(CompactionMetrics::AVG_TOTAL_FILE_SIZE)); + EXPECT_DOUBLE_EQ(400.0, avg_total); + + ASSERT_OK_AND_ASSIGN(auto avg_time, snapshot->GetGauge(CompactionMetrics::AVG_COMPACTION_TIME)); + EXPECT_DOUBLE_EQ(100.0, avg_time); +} + +TEST(CompactionMetricsTest, TestUnregisterAndEmptyDefaults) { + CompactionMetrics metrics; + + auto reporter = metrics.CreateReporter(BinaryRow::EmptyRow(), 7); + reporter->ReportLevel0FileCount(9); + reporter->ReportCompactionInputSize(123); + reporter->ReportCompactionOutputSize(45); + reporter->ReportTotalFileSize(999); + + reporter->Unregister(); + + auto snapshot = metrics.GetMetrics(); + + ASSERT_OK_AND_ASSIGN(auto max_l0, snapshot->GetGauge(CompactionMetrics::MAX_LEVEL0_FILE_COUNT)); + EXPECT_DOUBLE_EQ(-1.0, max_l0); + ASSERT_OK_AND_ASSIGN(auto avg_l0, snapshot->GetGauge(CompactionMetrics::AVG_LEVEL0_FILE_COUNT)); + EXPECT_DOUBLE_EQ(-1.0, avg_l0); + + ASSERT_OK_AND_ASSIGN(auto max_input, + snapshot->GetGauge(CompactionMetrics::MAX_COMPACTION_INPUT_SIZE)); + EXPECT_DOUBLE_EQ(-1.0, max_input); + ASSERT_OK_AND_ASSIGN(auto avg_input, + snapshot->GetGauge(CompactionMetrics::AVG_COMPACTION_INPUT_SIZE)); + EXPECT_DOUBLE_EQ(-1.0, avg_input); + + ASSERT_OK_AND_ASSIGN(auto max_output, + snapshot->GetGauge(CompactionMetrics::MAX_COMPACTION_OUTPUT_SIZE)); + EXPECT_DOUBLE_EQ(-1.0, max_output); + ASSERT_OK_AND_ASSIGN(auto avg_output, + snapshot->GetGauge(CompactionMetrics::AVG_COMPACTION_OUTPUT_SIZE)); + EXPECT_DOUBLE_EQ(-1.0, avg_output); + + ASSERT_OK_AND_ASSIGN(auto max_total, + snapshot->GetGauge(CompactionMetrics::MAX_TOTAL_FILE_SIZE)); + EXPECT_DOUBLE_EQ(-1.0, max_total); + ASSERT_OK_AND_ASSIGN(auto avg_total, + snapshot->GetGauge(CompactionMetrics::AVG_TOTAL_FILE_SIZE)); + EXPECT_DOUBLE_EQ(-1.0, avg_total); + + ASSERT_OK_AND_ASSIGN(auto avg_time, snapshot->GetGauge(CompactionMetrics::AVG_COMPACTION_TIME)); + EXPECT_DOUBLE_EQ(0.0, avg_time); +} + +TEST(CompactionMetricsTest, TestCompactionTimeWindow) { + CompactionMetrics metrics; + + for (int64_t t = 1; t <= CompactionMetrics::kCompactionTimeWindow + 10; ++t) { + metrics.ReportCompactionTime(t); + } + + auto snapshot = metrics.GetMetrics(); + ASSERT_OK_AND_ASSIGN(auto avg_time, snapshot->GetGauge(CompactionMetrics::AVG_COMPACTION_TIME)); + + // Only the last kCompactionTimeWindow values are kept. + EXPECT_DOUBLE_EQ(60.5, avg_time); +} + +} // namespace paimon::test