From e50dc7ad2440fb6be5960da85dcf6cdf870906ce Mon Sep 17 00:00:00 2001 From: "yonghao.fyh" Date: Thu, 18 Jun 2026 10:46:51 +0800 Subject: [PATCH] feat: add file store commit and manifest merger support --- include/paimon/commit_context.h | 160 +++ include/paimon/file_store_commit.h | 147 +++ src/paimon/core/operation/commit_context.cpp | 135 +++ .../core/operation/file_store_commit.cpp | 140 +++ .../core/operation/file_store_commit_impl.cpp | 1002 +++++++++++++++++ .../core/operation/file_store_commit_impl.h | 236 ++++ .../core/operation/file_store_commit_test.cpp | 78 ++ .../core/operation/manifest_file_merger.cpp | 185 +++ .../core/operation/manifest_file_merger.h | 71 ++ .../operation/manifest_file_merger_test.cpp | 375 ++++++ .../core/operation/metrics/commit_metrics.h | 48 + .../operation/metrics/commit_metrics_test.cpp | 52 + 12 files changed, 2629 insertions(+) create mode 100644 include/paimon/commit_context.h create mode 100644 include/paimon/file_store_commit.h create mode 100644 src/paimon/core/operation/commit_context.cpp create mode 100644 src/paimon/core/operation/file_store_commit.cpp create mode 100644 src/paimon/core/operation/file_store_commit_impl.cpp create mode 100644 src/paimon/core/operation/file_store_commit_impl.h create mode 100644 src/paimon/core/operation/file_store_commit_test.cpp create mode 100644 src/paimon/core/operation/manifest_file_merger.cpp create mode 100644 src/paimon/core/operation/manifest_file_merger.h create mode 100644 src/paimon/core/operation/manifest_file_merger_test.cpp create mode 100644 src/paimon/core/operation/metrics/commit_metrics.h create mode 100644 src/paimon/core/operation/metrics/commit_metrics_test.cpp diff --git a/include/paimon/commit_context.h b/include/paimon/commit_context.h new file mode 100644 index 0000000..b273048 --- /dev/null +++ b/include/paimon/commit_context.h @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/result.h" +#include "paimon/type_fwd.h" +#include "paimon/visibility.h" + +namespace paimon { +class Executor; +class MemoryPool; + +/// `CommitContext` is some configuration for commit operations. +/// +/// Please do not use this class directly, use `CommitContextBuilder` to build a `CommitContext` +/// which has input validation. +/// @see CommitContextBuilder +class PAIMON_EXPORT CommitContext { + public: + CommitContext(const std::string& root_path, const std::string& commit_user, + bool ignore_empty_commit, bool use_rest_catalog_commit, + const std::shared_ptr& memory_pool, + const std::shared_ptr& executor, + const std::shared_ptr& specific_file_system, + const std::map& options); + ~CommitContext(); + + const std::string& GetRootPath() const { + return root_path_; + } + + const std::string& GetCommitUser() const { + return commit_user_; + } + + bool IgnoreEmptyCommit() const { + return ignore_empty_commit_; + } + + bool UseRESTCatalogCommit() const { + return use_rest_catalog_commit_; + } + + std::shared_ptr GetMemoryPool() const { + return memory_pool_; + } + + std::shared_ptr GetExecutor() const { + return executor_; + } + + std::shared_ptr GetSpecificFileSystem() const { + return specific_file_system_; + } + + const std::map& GetOptions() const { + return options_; + } + + private: + std::string root_path_; + std::string commit_user_; + bool ignore_empty_commit_; + bool use_rest_catalog_commit_; + std::shared_ptr memory_pool_; + std::shared_ptr executor_; + std::shared_ptr specific_file_system_; + std::map options_; +}; + +/// `CommitContextBuilder` used to build a `CommitContext`, has input validation. +class PAIMON_EXPORT CommitContextBuilder { + public: + /// Constructs a `CommitContextBuilder` with required parameters. + /// @param root_path The root path of the Paimon table. + /// @param commit_user The user identifier for the commit operation. + CommitContextBuilder(const std::string& root_path, const std::string& commit_user); + + ~CommitContextBuilder(); + + /// Set a configuration options map to set some option entries which are not defined in the + /// table schema or whose values you want to overwrite. + /// @note The options map will clear the options added by `AddOption()` before. + /// @param options The configuration options map. + /// @return Reference to this builder for method chaining. + CommitContextBuilder& SetOptions(const std::map& options); + + /// Add a single configuration option which is not defined in the table schema or whose value + /// you want to overwrite. + /// + /// If you want to add multiple options, call `AddOption()` multiple times or use `SetOptions()` + /// instead. + /// @param key The option key. + /// @param value The option value. + /// @return Reference to this builder for method chaining. + CommitContextBuilder& AddOption(const std::string& key, const std::string& value); + + /// Sets whether to ignore empty commits (default is true). + /// When set to true, commits that don't contain any actual data changes will be ignored. + /// @param ignore_empty_commit True to ignore empty commits, false otherwise. + /// @return Reference to this builder for method chaining. + CommitContextBuilder& IgnoreEmptyCommit(bool ignore_empty_commit); + + /// Sets whether to use REST catalog commit (default is false). + /// @note Temporary interface, will be removed in the future. + /// @param use_rest_catalog_commit True to use REST catalog commit, false otherwise. + /// @return Reference to this builder for method chaining. + CommitContextBuilder& UseRESTCatalogCommit(bool use_rest_catalog_commit); + + /// Sets the memory pool to be used for memory allocation during commit operations. + /// @param memory_pool Shared pointer to the memory pool instance. + /// @return Reference to this builder for method chaining. + CommitContextBuilder& WithMemoryPool(const std::shared_ptr& memory_pool); + + /// Sets the executor to be used for asynchronous operations during commit. + /// @param executor Shared pointer to the executor instance. + /// @return Reference to this builder for method chaining. + CommitContextBuilder& WithExecutor(const std::shared_ptr& executor); + + /// Sets a custom file system instance to be used for all file operations in this commit + /// context. + /// This bypasses the global file system registry and uses the provided implementation directly. + /// + /// @param file_system The file system to use. + /// @return Reference to this builder for method chaining. + /// @note If not set, use default file system (configured in `Options::FILE_SYSTEM`) + CommitContextBuilder& WithFileSystem(const std::shared_ptr& file_system); + + /// Build and return a `CommitContext` instance with input validation. + /// @return Result containing the constructed `CommitContext` or an error status. + Result> Finish(); + + private: + class Impl; + + std::unique_ptr impl_; +}; + +} // namespace paimon diff --git a/include/paimon/file_store_commit.h b/include/paimon/file_store_commit.h new file mode 100644 index 0000000..a702c41 --- /dev/null +++ b/include/paimon/file_store_commit.h @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "paimon/defs.h" +#include "paimon/executor.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/metrics.h" +#include "paimon/result.h" +#include "paimon/status.h" +#include "paimon/type_fwd.h" +#include "paimon/visibility.h" + +namespace paimon { +class CommitContext; +class CommitMessage; + +/// Interface for commit operations in a file store. +/// +/// The `FileStoreCommit` class provides interfaces for committing changes, expiring old snapshots, +/// dropping partitions, and retrieving commit metrics. +class PAIMON_EXPORT FileStoreCommit { + public: + /// Create an instance of `FileStoreCommit`. + /// + /// @param context A unique pointer to the `CommitContext` used for commit operations. + /// + /// @return A Result containing a unique pointer to the `FileStoreCommit` instance. + static Result> Create(std::unique_ptr context); + + virtual ~FileStoreCommit() = default; + + /// Commit changes to the file store. + /// + /// @param commit_messages A vector of commit messages to be committed. + /// @param commit_identifier An optional identifier for the commit operation. Default is + /// `BATCH_WRITE_COMMIT_IDENTIFIER`. + /// @param watermark An optional event-time watermark used to indicate the progress of data + /// processing. Default is std::nullopt. + /// @return Status indicating the success or failure of the commit operation. + virtual Status Commit(const std::vector>& commit_messages, + int64_t commit_identifier = BATCH_WRITE_COMMIT_IDENTIFIER, + std::optional watermark = std::nullopt) = 0; + + /// Filter out all `std::vector` which have been committed and commit the + /// remaining ones. + /// + /// Compared to commit, this method will first check if a commit_identifier has been + /// committed, so this method might be slower. A common usage of this method is to retry the + /// commit process after a failure. + /// + /// @param commit_identifier_and_messages A map containing all {@link CommitMessage}s in + /// question. The key is the commit_identifier. + /// + /// @param watermark An optional event-time watermark used to indicate the progress of data + /// processing. Default is std::nullopt. + /// @return Number of `std::vector` committed. + virtual Result FilterAndCommit( + const std::map>>& + commit_identifier_and_messages, + std::optional watermark = std::nullopt) = 0; + + /// Overwrite from manifest committable and partition. + /// + /// @param partitions A single partition maps each partition key to a partition value. Depending + /// on the user-defined statement, the partition might not include all partition keys. Also + /// note that this partition does not necessarily equal to the partitions of the newly added + /// key-values. This is just the partition to be cleaned up. + /// @param commit_messages Description of the commit messages. + /// @param commit_identifier Unique identifier. + /// @param watermark An optional event-time watermark used to indicate the progress of data + /// processing. Default is std::nullopt. + /// @return Result of the operation. + virtual Status Overwrite(const std::vector>& partitions, + const std::vector>& commit_messages, + int64_t commit_identifier, + std::optional watermark = std::nullopt) = 0; + + /// This is a temporary interface for internal use. It will be removed in a future version. + /// Please do not rely on it for long-term use. + /// + /// @param partitions Description of the partitions. + /// @param commit_messages Description of the commit messages. + /// @param commit_identifier Unique identifier. + /// @param watermark An optional event-time watermark used to indicate the progress of data + /// processing. Default is std::nullopt. + /// @return Result of the operation. + virtual Result FilterAndOverwrite( + const std::vector>& partitions, + const std::vector>& commit_messages, + int64_t commit_identifier, std::optional watermark = std::nullopt) = 0; + + /// If user want to use REST catalog commit, please set + /// `CommitContextBuilder::UseRESTCatalogCommit()`, then call `Commit()` (or + /// `FilterAndCommit()`) normally, then call this method to get the last commit table request, + /// which is a JSON string that can be used to send to REST catalog server. + /// + /// @note Temporary interface for internal use, will be removed in the future. + /// + /// @return A Result containing a JSON string which including `snapshot` and `statistics`, but + /// excluding `tableId`. + virtual Result GetLastCommitTableRequest() = 0; + + /// Expire old snapshot in the file store. + /// + /// @return Result indicating the number of expired items or an error status. + virtual Result Expire() = 0; + + /// Drop specified partitions from the file store. + /// + /// @param partitions A vector of partitions to be dropped. + /// @param commit_identifier An identifier for the commit operation. + /// @return Status indicating the success or failure of the drop partition operation. + virtual Status DropPartition(const std::vector>& partitions, + int64_t commit_identifier) = 0; + + /// Retrieve metrics related to commit operations. + /// + /// @return A shared pointer to a `Metrics` object containing commit metrics. + virtual std::shared_ptr GetCommitMetrics() const = 0; +}; + +} // namespace paimon diff --git a/src/paimon/core/operation/commit_context.cpp b/src/paimon/core/operation/commit_context.cpp new file mode 100644 index 0000000..490a075 --- /dev/null +++ b/src/paimon/core/operation/commit_context.cpp @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/commit_context.h" + +#include + +#include "paimon/common/utils/path_util.h" +#include "paimon/executor.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace paimon { + +CommitContext::CommitContext(const std::string& root_path, const std::string& commit_user, + bool ignore_empty_commit, bool use_rest_catalog_commit, + const std::shared_ptr& memory_pool, + const std::shared_ptr& executor, + const std::shared_ptr& specific_file_system, + const std::map& options) + : root_path_(root_path), + commit_user_(commit_user), + ignore_empty_commit_(ignore_empty_commit), + use_rest_catalog_commit_(use_rest_catalog_commit), + memory_pool_(memory_pool), + executor_(executor), + specific_file_system_(specific_file_system), + options_(options) {} + +CommitContext::~CommitContext() = default; + +class CommitContextBuilder::Impl { + public: + friend class CommitContextBuilder; + + void Reset() { + ignore_empty_commit_ = true; + use_rest_catalog_commit_ = false; + memory_pool_ = GetDefaultPool(); + executor_ = CreateDefaultExecutor(); + specific_file_system_.reset(); + options_.clear(); + } + + private: + std::string root_path_; + std::string commit_user_; + bool ignore_empty_commit_ = true; + bool use_rest_catalog_commit_ = false; + std::shared_ptr memory_pool_ = GetDefaultPool(); + std::shared_ptr executor_ = CreateDefaultExecutor(); + std::shared_ptr specific_file_system_; + std::map options_; +}; + +CommitContextBuilder::CommitContextBuilder(const std::string& root_path, + const std::string& commit_user) + : impl_(std::make_unique()) { + impl_->root_path_ = root_path; + impl_->commit_user_ = commit_user; +} + +CommitContextBuilder::~CommitContextBuilder() = default; + +CommitContextBuilder& CommitContextBuilder::AddOption(const std::string& key, + const std::string& value) { + impl_->options_[key] = value; + return *this; +} + +CommitContextBuilder& CommitContextBuilder::SetOptions( + const std::map& opts) { + impl_->options_ = opts; + return *this; +} + +CommitContextBuilder& CommitContextBuilder::IgnoreEmptyCommit(bool ignore_empty_commit) { + impl_->ignore_empty_commit_ = ignore_empty_commit; + return *this; +} + +CommitContextBuilder& CommitContextBuilder::UseRESTCatalogCommit(bool use_rest_catalog_commit) { + impl_->use_rest_catalog_commit_ = use_rest_catalog_commit; + return *this; +} + +CommitContextBuilder& CommitContextBuilder::WithMemoryPool( + const std::shared_ptr& memory_pool) { + impl_->memory_pool_ = memory_pool; + return *this; +} + +CommitContextBuilder& CommitContextBuilder::WithExecutor( + const std::shared_ptr& executor) { + impl_->executor_ = executor; + return *this; +} + +CommitContextBuilder& CommitContextBuilder::WithFileSystem( + const std::shared_ptr& file_system) { + impl_->specific_file_system_ = file_system; + return *this; +} + +Result> CommitContextBuilder::Finish() { + PAIMON_ASSIGN_OR_RAISE(impl_->root_path_, PathUtil::NormalizePath(impl_->root_path_)); + if (impl_->root_path_.empty()) { + return Status::Invalid("root path is empty"); + } + auto ctx = std::make_unique( + impl_->root_path_, impl_->commit_user_, impl_->ignore_empty_commit_, + impl_->use_rest_catalog_commit_, impl_->memory_pool_, impl_->executor_, + impl_->specific_file_system_, impl_->options_); + impl_->Reset(); + return ctx; +} + +} // namespace paimon diff --git a/src/paimon/core/operation/file_store_commit.cpp b/src/paimon/core/operation/file_store_commit.cpp new file mode 100644 index 0000000..8cc4ba9 --- /dev/null +++ b/src/paimon/core/operation/file_store_commit.cpp @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/file_store_commit.h" + +#include +#include + +#include "paimon/commit_context.h" +#include "paimon/common/types/data_field.h" +#include "paimon/common/utils/binary_row_partition_computer.h" +#include "paimon/core/core_options.h" +#include "paimon/core/manifest/index_manifest_entry.h" +#include "paimon/core/manifest/index_manifest_file.h" +#include "paimon/core/manifest/manifest_file.h" +#include "paimon/core/manifest/manifest_list.h" +#include "paimon/core/operation/expire_snapshots.h" +#include "paimon/core/operation/file_store_commit_impl.h" +#include "paimon/core/schema/schema_manager.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/core/utils/field_mapping.h" +#include "paimon/core/utils/file_store_path_factory.h" +#include "paimon/core/utils/snapshot_manager.h" +#include "paimon/format/file_format.h" +#include "paimon/fs/file_system.h" +#include "paimon/result.h" + +namespace arrow { +class Schema; +} // namespace arrow + +namespace paimon { + +Result> FileStoreCommit::Create( + std::unique_ptr ctx) { + if (ctx == nullptr) { + return Status::Invalid("commit context is null pointer"); + } + if (ctx->GetMemoryPool() == nullptr) { + return Status::Invalid("memory pool is null pointer"); + } + if (ctx->GetExecutor() == nullptr) { + return Status::Invalid("executor is null pointer"); + } + + PAIMON_ASSIGN_OR_RAISE(CoreOptions tmp_options, + CoreOptions::FromMap(ctx->GetOptions(), ctx->GetSpecificFileSystem())); + const std::string& root_path = ctx->GetRootPath(); + auto schema_manager = std::make_shared(tmp_options.GetFileSystem(), root_path); + PAIMON_ASSIGN_OR_RAISE(std::optional> table_schema, + schema_manager->Latest()); + if (table_schema == std::nullopt) { + return Status::Invalid("not found latest schema"); + } + const auto& schema = table_schema.value(); + if (!schema->PrimaryKeys().empty() && + ctx->GetOptions().find("enable-pk-commit-in-inte-test") == ctx->GetOptions().end()) { + return Status::NotImplemented("not support pk table commit yet"); + } + auto opts = schema->Options(); + for (const auto& [key, value] : ctx->GetOptions()) { + opts[key] = value; + } + std::shared_ptr arrow_schema = + DataField::ConvertDataFieldsToArrowSchema(schema->Fields()); + PAIMON_ASSIGN_OR_RAISE(CoreOptions options, + CoreOptions::FromMap(opts, ctx->GetSpecificFileSystem())); + assert(options.GetFileSystem()); + assert(options.GetFileFormat()); + PAIMON_ASSIGN_OR_RAISE(bool is_object_store, FileSystem::IsObjectStore(root_path)); + if (is_object_store && opts.find("enable-object-store-commit-in-inte-test") == opts.end()) { + return Status::NotImplemented( + "commit operation does not support object store file system for now"); + } + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr partition_computer, + BinaryRowPartitionComputer::Create( + table_schema.value()->PartitionKeys(), arrow_schema, options.GetPartitionDefaultName(), + options.LegacyPartitionNameEnabled(), ctx->GetMemoryPool())); + PAIMON_ASSIGN_OR_RAISE(std::vector external_paths, options.CreateExternalPaths()); + PAIMON_ASSIGN_OR_RAISE(std::optional global_index_external_path, + options.CreateGlobalIndexExternalPath()); + + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr path_factory, + FileStorePathFactory::Create( + root_path, arrow_schema, table_schema.value()->PartitionKeys(), + options.GetPartitionDefaultName(), options.GetFileFormat()->Identifier(), + options.DataFilePrefix(), options.LegacyPartitionNameEnabled(), external_paths, + global_index_external_path, options.IndexFileInDataFileDir(), ctx->GetMemoryPool())); + + auto snapshot_manager = std::make_shared(options.GetFileSystem(), root_path); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr manifest_list, + ManifestList::Create(options.GetFileSystem(), options.GetManifestFormat(), + options.GetManifestCompression(), path_factory, ctx->GetMemoryPool())); + + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr partition_schema, + FieldMapping::GetPartitionSchema(arrow_schema, table_schema.value()->PartitionKeys())); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr manifest_file, + ManifestFile::Create(options.GetFileSystem(), options.GetManifestFormat(), + options.GetManifestCompression(), path_factory, + options.GetManifestTargetFileSize(), ctx->GetMemoryPool(), options, + partition_schema)); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr index_manifest_file, + IndexManifestFile::Create(options.GetFileSystem(), options.GetManifestFormat(), + options.GetManifestCompression(), path_factory, + options.GetBucket(), ctx->GetMemoryPool(), options)); + + auto expire_snapshots = std::make_shared( + snapshot_manager, path_factory, manifest_list, manifest_file, options.GetFileSystem(), + options.GetExpireConfig(), ctx->GetExecutor()); + + return std::make_unique( + ctx->GetMemoryPool(), ctx->GetExecutor(), arrow_schema, root_path, ctx->GetCommitUser(), + options, path_factory, std::move(partition_computer), snapshot_manager, + ctx->IgnoreEmptyCommit(), ctx->UseRESTCatalogCommit(), table_schema.value(), manifest_file, + manifest_list, index_manifest_file, expire_snapshots, schema_manager); +} + +} // namespace paimon diff --git a/src/paimon/core/operation/file_store_commit_impl.cpp b/src/paimon/core/operation/file_store_commit_impl.cpp new file mode 100644 index 0000000..06eeeae --- /dev/null +++ b/src/paimon/core/operation/file_store_commit_impl.cpp @@ -0,0 +1,1002 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/core/operation/file_store_commit_impl.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "fmt/format.h" +#include "fmt/ranges.h" +#include "paimon/commit_message.h" +#include "paimon/common/data/binary_row.h" +#include "paimon/common/data/blob_utils.h" +#include "paimon/common/executor/future.h" +#include "paimon/common/metrics/metrics_impl.h" +#include "paimon/common/table/special_fields.h" +#include "paimon/common/utils/binary_row_partition_computer.h" +#include "paimon/common/utils/date_time_utils.h" +#include "paimon/common/utils/scope_guard.h" +#include "paimon/core/catalog/catalog_snapshot_commit.h" +#include "paimon/core/catalog/renaming_snapshot_commit.h" +#include "paimon/core/catalog/snapshot_commit.h" +#include "paimon/core/deletionvectors/deletion_vectors_index_file.h" +#include "paimon/core/index/index_file_meta.h" +#include "paimon/core/io/compact_increment.h" +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/io/data_file_path_factory.h" +#include "paimon/core/io/data_increment.h" +#include "paimon/core/manifest/file_entry.h" +#include "paimon/core/manifest/file_kind.h" +#include "paimon/core/manifest/file_source.h" +#include "paimon/core/manifest/index_manifest_file.h" +#include "paimon/core/manifest/manifest_committable.h" +#include "paimon/core/manifest/manifest_entry.h" +#include "paimon/core/manifest/manifest_file.h" +#include "paimon/core/manifest/manifest_file_meta.h" +#include "paimon/core/manifest/manifest_list.h" +#include "paimon/core/manifest/partition_entry.h" +#include "paimon/core/operation/append_only_file_store_scan.h" +#include "paimon/core/operation/expire_snapshots.h" +#include "paimon/core/operation/file_store_scan.h" +#include "paimon/core/operation/manifest_file_merger.h" +#include "paimon/core/operation/metrics/commit_metrics.h" +#include "paimon/core/partition/partition_statistics.h" +#include "paimon/core/schema/schema_manager.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/core/table/sink/commit_message_impl.h" +#include "paimon/core/utils/duration.h" +#include "paimon/core/utils/file_store_path_factory.h" +#include "paimon/core/utils/snapshot_manager.h" +#include "paimon/fs/file_system.h" +#include "paimon/logging.h" +#include "paimon/metrics.h" +#include "paimon/scan_context.h" + +namespace paimon { +class Executor; +class MemoryPool; + +FileStoreCommitImpl::FileStoreCommitImpl( + const std::shared_ptr& pool, const std::shared_ptr& executor, + const std::shared_ptr& schema, const std::string& root_path, + const std::string& commit_user, const CoreOptions& options, + const std::shared_ptr& path_factory, + std::unique_ptr partition_computer, + const std::shared_ptr& snapshot_manager, bool ignore_empty_commit, + bool use_rest_catalog_commit, const std::shared_ptr& table_schema, + const std::shared_ptr& manifest_file, + const std::shared_ptr& manifest_list, + const std::shared_ptr& index_manifest_file, + const std::shared_ptr& expire_snapshots, + const std::shared_ptr& schema_manager) + : memory_pool_(pool), + executor_(executor), + schema_(schema), + root_path_(root_path), + commit_user_(commit_user), + options_(options), + path_factory_(path_factory), + fs_(options.GetFileSystem()), + partition_computer_(std::move(partition_computer)), + snapshot_manager_(snapshot_manager), + ignore_empty_commit_(ignore_empty_commit), + num_bucket_(options.GetBucket()), + table_schema_(table_schema), + manifest_file_(manifest_file), + manifest_list_(manifest_list), + index_manifest_file_(index_manifest_file), + expire_snapshots_(expire_snapshots), + schema_manager_(schema_manager), + metrics_(std::make_shared()), + logger_(Logger::GetLogger("FileStoreCommitImpl")) { + if (use_rest_catalog_commit) { + snapshot_commit_ = std::make_shared(); + } else { + snapshot_commit_ = std::make_shared(fs_, snapshot_manager_); + } +} + +FileStoreCommitImpl::~FileStoreCommitImpl() = default; + +Result FileStoreCommitImpl::Expire() { + return expire_snapshots_->Expire(); +} + +Status FileStoreCommitImpl::DropPartition( + const std::vector>& partitions, int64_t commit_identifier) { + if (partitions.empty()) { + return Status::Invalid("Drop partition failed: partitions list cannot be empty."); + } + std::string log_msg = fmt::format("Ready to drop partitions {}", partitions); + PAIMON_LOG_DEBUG(logger_, "%s", log_msg.c_str()); + return TryOverwrite(partitions, {}, commit_identifier, std::nullopt); +} + +Result FileStoreCommitImpl::FilterAndCommit( + const std::map>>& + commit_identifier_and_messages, + std::optional watermark) { + std::vector> committables; + for (const auto& [identifier, msgs] : commit_identifier_and_messages) { + committables.push_back(CreateManifestCommittable(identifier, msgs, watermark)); + } + + PAIMON_ASSIGN_OR_RAISE(std::vector> retry_committables, + FilterCommitted(committables)); + if (!retry_committables.empty()) { + PAIMON_RETURN_NOT_OK(CheckFilesExistence(retry_committables)); + for (const auto& committable : retry_committables) { + PAIMON_RETURN_NOT_OK(Commit(committable, /*check_append_files=*/true)); + } + } + return retry_committables.size(); +} + +Status FileStoreCommitImpl::CheckFilesExistence( + const std::vector>& committables) const { + std::vector all_paths; + for (const auto& committable : committables) { + for (const auto& message : committable->FileCommittables()) { + auto msg = dynamic_cast(message.get()); + if (msg) { + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr data_file_path_factory, + path_factory_->CreateDataFilePathFactory(msg->Partition(), msg->Bucket())); + auto collect_files = + [&all_paths, data_file_path_factory]( + const std::vector>& file_metas) { + for (const auto& file_meta : file_metas) { + auto paths = data_file_path_factory->CollectFiles(file_meta); + all_paths.insert(all_paths.end(), paths.begin(), paths.end()); + } + }; + // skip compact before files, deleted index files + DataIncrement new_files_increment = msg->GetNewFilesIncrement(); + collect_files(new_files_increment.NewFiles()); + collect_files(new_files_increment.ChangelogFiles()); + auto new_data_index_metas = new_files_increment.NewIndexFiles(); + for (const auto& data_index_meta : new_data_index_metas) { + all_paths.push_back( + path_factory_->ToIndexFilePath(data_index_meta->FileName())); + } + + CompactIncrement compact_increment = msg->GetCompactIncrement(); + collect_files(compact_increment.CompactBefore()); + collect_files(compact_increment.CompactAfter()); + auto new_compact_index_metas = compact_increment.NewIndexFiles(); + for (const auto& compact_index_meta : new_compact_index_metas) { + all_paths.push_back( + path_factory_->ToIndexFilePath(compact_index_meta->FileName())); + } + } else { + return Status::Invalid("fail to cast commit message to impl"); + } + } + } + std::vector>>> file_exists_futures; + for (const auto& path : all_paths) { + file_exists_futures.push_back( + Via(executor_.get(), [this, path]() -> Result> { + PAIMON_ASSIGN_OR_RAISE(bool exist, fs_->Exists(path)); + return std::pair(exist, path); + })); + } + int32_t not_exist_files_count = 0; + std::vector>> file_exists = CollectAll(file_exists_futures); + std::vector non_exist_files; + for (auto file_exist : file_exists) { + if (!file_exist.ok()) { + return file_exist.status(); + } + if (!file_exist.value().first) { + not_exist_files_count++; + non_exist_files.push_back(file_exist.value().second); + } + } + + if (not_exist_files_count > 0) { + return Status::Invalid(fmt::format( + "Cannot recover from this checkpoint because some files in the snapshot that need to " + "be resubmitted have been deleted: {}. The most likely reason is because you are " + "recovering from a very old savepoint that contains some uncommitted files that have " + "already been deleted.", + fmt::join(non_exist_files, ", "))); + } + return Status::OK(); +} + +Result>> FileStoreCommitImpl::FilterCommitted( + const std::vector>& committables) { + // nothing to filter, fast exit + if (committables.empty()) { + return committables; + } + + for (size_t i = 1; i < committables.size(); i++) { + if (committables[i]->Identifier() < committables[i - 1]->Identifier()) { + return Status::Invalid( + "Committables must be sorted according to identifiers before filtering. This is " + "unexpected."); + } + } + PAIMON_ASSIGN_OR_RAISE(std::optional latest_snapshot, + snapshot_manager_->LatestSnapshotOfUser(commit_user_)); + if (latest_snapshot) { + std::vector> result; + for (const auto& committable : committables) { + // if committable is newer than latest snapshot, then it hasn't been committed + if (committable->Identifier() > latest_snapshot.value().CommitIdentifier()) { + result.push_back(committable); + } else { + // TODO(yonghao.fyh): callback + } + } + return result; + } else { + // if there is no previous snapshots then nothing should be filtered + return committables; + } +} + +Status FileStoreCommitImpl::Overwrite( + const std::vector>& partitions, + const std::vector>& commit_messages, int64_t identifier, + std::optional watermark) { + std::shared_ptr committable = + CreateManifestCommittable(identifier, commit_messages, watermark); + std::vector append_table_files; + std::vector append_changelog_files; + std::vector compact_table_files; + std::vector compact_changelog_files; + std::vector append_table_index_files; + std::vector compact_table_index_files; + PAIMON_RETURN_NOT_OK(CollectChanges(committable->FileCommittables(), &append_table_files, + &append_changelog_files, &compact_table_files, + &compact_changelog_files, &append_table_index_files, + &compact_table_index_files)); + if (!append_table_index_files.empty()) { + return Status::NotImplemented("Overwrite not support index for now"); + } + return TryOverwrite(partitions, append_table_files, identifier, watermark); +} + +Result FileStoreCommitImpl::FilterAndOverwrite( + const std::vector>& partitions, + const std::vector>& commit_messages, int64_t identifier, + std::optional watermark) { + std::shared_ptr committable = + CreateManifestCommittable(identifier, commit_messages, watermark); + std::vector> committables; + committables.push_back(committable); + PAIMON_ASSIGN_OR_RAISE(std::vector> actual_committables, + FilterCommitted(committables)); + if (!actual_committables.empty()) { + std::vector append_table_files; + std::vector append_changelog_files; + std::vector compact_table_files; + std::vector compact_changelog_files; + std::vector append_table_index_files; + std::vector compact_table_index_files; + PAIMON_RETURN_NOT_OK(CollectChanges(actual_committables[0]->FileCommittables(), + &append_table_files, &append_changelog_files, + &compact_table_files, &compact_changelog_files, + &append_table_index_files, &compact_table_index_files)); + if (!append_table_index_files.empty()) { + return Status::NotImplemented("FilterAndOverwrite not support index for now"); + } + PAIMON_RETURN_NOT_OK(TryOverwrite(partitions, append_table_files, identifier, watermark)); + } + return actual_committables.size(); +} + +Result FileStoreCommitImpl::GetLastCommitTableRequest() { + return snapshot_commit_->GetLastCommitTableRequest(); +} + +Result> FileStoreCommitImpl::GetAllFiles( + const Snapshot& snapshot, const std::vector>& partitions) { + auto scan_filter = std::make_shared(/*predicate=*/nullptr, partitions, + /*bucket_filter=*/std::nullopt); + PAIMON_ASSIGN_OR_RAISE( + auto scan, AppendOnlyFileStoreScan::Create( + snapshot_manager_, schema_manager_, manifest_list_, manifest_file_, + table_schema_, schema_, scan_filter, options_, executor_, memory_pool_)); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr plan, + scan->WithSnapshot(snapshot)->CreatePlan()); + // scan existing file metas + return plan->Files(); +} + +Status FileStoreCommitImpl::TryOverwrite( + const std::vector>& partitions, + const std::vector& changes, int64_t commit_identifier, + std::optional watermark) { + int32_t retry_count = 0; + while (true) { + PAIMON_ASSIGN_OR_RAISE(std::optional latest_snapshot, + snapshot_manager_->LatestSnapshot()); + std::vector changes_with_overwrite; + if (latest_snapshot) { + PAIMON_ASSIGN_OR_RAISE(std::vector entries, + GetAllFiles(latest_snapshot.value(), partitions)); + for (const auto& entry : entries) { + changes_with_overwrite.emplace_back(FileKind::Delete(), entry.Partition(), + entry.Bucket(), entry.TotalBuckets(), + entry.File()); + } + } + changes_with_overwrite.insert(changes_with_overwrite.end(), changes.begin(), changes.end()); + PAIMON_ASSIGN_OR_RAISE(bool commit_success, + TryCommitOnce(changes_with_overwrite, /*index_entries=*/{}, + commit_identifier, watermark, + /*log_offsets=*/{}, /*properties=*/{}, + Snapshot::CommitKind::Overwrite(), latest_snapshot, + /*need_conflict_check=*/true)); + if (commit_success) { + break; + } + if (retry_count >= options_.GetCommitMaxRetries()) { + return Status::Invalid( + fmt::format("Commit failed after {} attempts, there maybe exist commit conflicts " + "between multiple jobs.", + options_.GetCommitMaxRetries())); + } + retry_count++; + } + return Status::OK(); +} + +Status FileStoreCommitImpl::Commit(const std::shared_ptr& committable, + bool check_append_files) { + std::vector append_table_files; + std::vector append_changelog_files; + std::vector compact_table_files; + std::vector compact_changelog_files; + std::vector append_table_index_files; + std::vector compact_table_index_files; + PAIMON_RETURN_NOT_OK(CollectChanges(committable->FileCommittables(), &append_table_files, + &append_changelog_files, &compact_table_files, + &compact_changelog_files, &append_table_index_files, + &compact_table_index_files)); + + int32_t attempt = 0; + int32_t generated_snapshot = 0; + Duration duration; + if (!ignore_empty_commit_ || !append_table_files.empty() || !append_table_index_files.empty()) { + PAIMON_ASSIGN_OR_RAISE(int32_t cnt, + TryCommit(append_table_files, append_table_index_files, + committable->Identifier(), committable->Watermark(), + committable->LogOffsets(), committable->Properties(), + Snapshot::CommitKind::Append(), check_append_files)); + attempt += cnt; + ++generated_snapshot; + } + + if (!compact_table_files.empty() || !compact_table_index_files.empty()) { + PAIMON_ASSIGN_OR_RAISE( + int32_t cnt, TryCommit(compact_table_files, compact_table_index_files, + committable->Identifier(), committable->Watermark(), + committable->LogOffsets(), committable->Properties(), + Snapshot::CommitKind::Compact(), /*check_append_files=*/true)); + attempt += cnt; + ++generated_snapshot; + } + auto table_files_added = static_cast(append_table_files.size()); + int32_t table_files_deleted = 0; + int64_t compaction_input_file_size = 0; + int64_t compaction_output_file_size = 0; + for (const auto& entry : compact_table_files) { + const auto& kind = entry.Kind(); + if (kind == FileKind::Add()) { + ++table_files_added; + compaction_output_file_size += entry.File()->file_size; + } else if (kind == FileKind::Delete()) { + ++table_files_deleted; + compaction_input_file_size += entry.File()->file_size; + } + } + metrics_->SetCounter(CommitMetrics::LAST_COMMIT_DURATION, duration.Get()); + metrics_->SetCounter(CommitMetrics::LAST_COMMIT_ATTEMPTS, attempt); + metrics_->SetCounter(CommitMetrics::LAST_TABLE_FILES_ADDED, table_files_added); + metrics_->SetCounter(CommitMetrics::LAST_TABLE_FILES_DELETED, table_files_deleted); + metrics_->SetCounter(CommitMetrics::LAST_TABLE_FILES_APPENDED, append_table_files.size()); + metrics_->SetCounter(CommitMetrics::LAST_TABLE_FILES_COMMIT_COMPACTED, + compact_table_files.size()); + metrics_->SetCounter(CommitMetrics::LAST_CHANGELOG_FILES_APPENDED, + append_changelog_files.size()); + metrics_->SetCounter(CommitMetrics::LAST_CHANGELOG_FILES_COMMIT_COMPACTED, + compact_changelog_files.size()); + metrics_->SetCounter(CommitMetrics::LAST_GENERATED_SNAPSHOTS, generated_snapshot); + metrics_->SetCounter(CommitMetrics::LAST_DELTA_RECORDS_APPENDED, RowCounts(append_table_files)); + metrics_->SetCounter(CommitMetrics::LAST_CHANGELOG_RECORDS_APPENDED, + RowCounts(append_changelog_files)); + metrics_->SetCounter(CommitMetrics::LAST_DELTA_RECORDS_COMMIT_COMPACTED, + RowCounts(compact_table_files)); + metrics_->SetCounter(CommitMetrics::LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED, + RowCounts(compact_changelog_files)); + metrics_->SetCounter(CommitMetrics::LAST_PARTITIONS_WRITTEN, + NumChangedPartitions({append_table_files, compact_table_files})); + metrics_->SetCounter(CommitMetrics::LAST_BUCKETS_WRITTEN, + NumChangedBuckets({append_table_files, compact_table_files})); + metrics_->SetCounter(CommitMetrics::LAST_COMPACTION_INPUT_FILE_SIZE, + compaction_input_file_size); + metrics_->SetCounter(CommitMetrics::LAST_COMPACTION_OUTPUT_FILE_SIZE, + compaction_output_file_size); + return Status::OK(); +} + +Status FileStoreCommitImpl::Commit( + const std::vector>& commit_messages, int64_t identifier, + std::optional watermark) { + std::shared_ptr committable = + CreateManifestCommittable(identifier, commit_messages, watermark); + return Commit(committable, /*check_append_files=*/false); +} + +Result FileStoreCommitImpl::TryCommit(const std::vector& delta_files, + const std::vector& index_entries, + int64_t identifier, std::optional watermark, + std::map log_offsets, + const std::map& properties, + Snapshot::CommitKind commit_kind, + bool check_append_files) { + int32_t retry_count = 0; + int64_t start_millis = DateTimeUtils::GetCurrentUTCTimeUs() / 1000; + while (true) { + PAIMON_ASSIGN_OR_RAISE(std::optional latest_snapshot, + snapshot_manager_->LatestSnapshot()); + PAIMON_ASSIGN_OR_RAISE( + bool commit_success, + TryCommitOnce(delta_files, index_entries, identifier, watermark, log_offsets, + properties, commit_kind, latest_snapshot, check_append_files)); + if (commit_success) { + break; + } + int64_t current_millis = DateTimeUtils::GetCurrentUTCTimeUs() / 1000; + if (current_millis - start_millis > options_.GetCommitTimeout() || + retry_count >= options_.GetCommitMaxRetries()) { + return Status::Invalid( + fmt::format("Commit failed after {} millis with {} retries, there maybe exist " + "commit conflicts between multiple jobs.", + options_.GetCommitTimeout(), options_.GetCommitMaxRetries())); + } + retry_count++; + } + return retry_count + 1; +} + +Result>> FileStoreCommitImpl::ChangedPartitions( + const std::vector& data_files, + const std::vector& index_files) const { + std::set> partitions; + auto add_partition = [&, this](const BinaryRow& partition_row) -> Status { + std::vector> part_values; + PAIMON_ASSIGN_OR_RAISE(part_values, + partition_computer_->GeneratePartitionVector(partition_row)); + if (part_values.empty()) { + return Status::OK(); + } + std::map part_values_map; + for (const auto& [key, value] : part_values) { + part_values_map[key] = value; + } + partitions.insert(part_values_map); + return Status::OK(); + }; + + for (const ManifestEntry& entry : data_files) { + PAIMON_RETURN_NOT_OK(add_partition(entry.Partition())); + } + for (const IndexManifestEntry& entry : index_files) { + if (entry.index_file->IndexType() == DeletionVectorsIndexFile::DELETION_VECTORS_INDEX) { + PAIMON_RETURN_NOT_OK(add_partition(entry.partition)); + } + } + return partitions; +} + +Result> FileStoreCommitImpl::ReadAllEntriesFromChangedPartitions( + const Snapshot& latest_snapshot, + const std::set>& partitions) const { + std::vector> partition_filters(partitions.begin(), + partitions.end()); + auto scan_filter = std::make_shared(/*predicate=*/nullptr, partition_filters, + /*bucket_filter=*/std::nullopt); + PAIMON_ASSIGN_OR_RAISE( + auto scan, AppendOnlyFileStoreScan::Create( + snapshot_manager_, schema_manager_, manifest_list_, manifest_file_, + table_schema_, schema_, scan_filter, options_, executor_, memory_pool_)); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr plan, + scan->WithSnapshot(latest_snapshot)->CreatePlan()); + // scan existing file metas + return plan->Files(); +} + +Status FileStoreCommitImpl::NoConflictsOrFail(const std::string& base_commit_user, + const std::vector& base_entries, + const std::vector& changes) const { + ScopeGuard guard([&]() { + PAIMON_LOG_WARN(logger_, "File deletion conflicts detected! Give up committing. %s", + base_commit_user.c_str()); + }); + std::vector all_entries = base_entries; + all_entries.insert(all_entries.end(), changes.begin(), changes.end()); + std::vector merged_entries; + PAIMON_RETURN_NOT_OK(FileEntry::MergeEntries(all_entries, &merged_entries)); + for (const auto& entry : merged_entries) { + if (entry.Kind() == FileKind::Delete()) { + return Status::Invalid(fmt::format( + "Trying to delete file {} which is not previously added.", entry.FileName())); + } + } + // TODO(yonghao.fyh): check for all LSM level >= 1, key ranges of files do not intersect + guard.Release(); + return Status::OK(); +} + +Result FileStoreCommitImpl::TryCommitOnce( + const std::vector& delta_entries, + const std::vector& index_entries, int64_t identifier, + std::optional watermark, std::map log_offsets, + const std::map& properties, Snapshot::CommitKind commit_kind, + const std::optional& latest_snapshot, bool need_conflict_check) { + std::vector delta_files = delta_entries; + int64_t start_millis = DateTimeUtils::GetCurrentUTCTimeUs() / 1000; + int64_t new_snapshot_id = Snapshot::FIRST_SNAPSHOT_ID; + int64_t first_row_id_start = 0; + if (latest_snapshot) { + new_snapshot_id = latest_snapshot.value().Id() + 1; + std::optional next_row_id = latest_snapshot.value().NextRowId(); + if (next_row_id) { + first_row_id_start = next_row_id.value(); + } + } + + PAIMON_LOG_DEBUG(logger_, "Ready to commit table files to snapshot #%ld", new_snapshot_id); + for (const ManifestEntry& entry : delta_files) { + PAIMON_LOG_DEBUG(logger_, " * %s", entry.ToString().c_str()); + } + + if (need_conflict_check && latest_snapshot) { + std::set> changed_partitions; + PAIMON_ASSIGN_OR_RAISE(changed_partitions, ChangedPartitions(delta_files, index_entries)); + PAIMON_ASSIGN_OR_RAISE( + std::vector base_data_files, + ReadAllEntriesFromChangedPartitions(latest_snapshot.value(), changed_partitions)); + PAIMON_RETURN_NOT_OK( + NoConflictsOrFail(latest_snapshot.value().CommitUser(), base_data_files, delta_files)); + } + + std::vector merge_before_manifests; + std::vector merge_after_manifests; + std::pair base_manifest_list; + std::pair delta_manifest_list; + std::vector delta_statistics; + std::string new_snapshot_path; + + std::optional old_index_manifest; + std::optional index_manifest_name; + ScopeGuard guard([&]() { + int64_t commit_time = ((DateTimeUtils::GetCurrentUTCTimeUs() / 1000) - start_millis) / 1000; + PAIMON_LOG_WARN(logger_, + "Atomic commit failed for snapshot #%ld (path %s) by user %s with " + "identifier %ld and kind %s after %ld seconds. Clean up and try again.", + new_snapshot_id, new_snapshot_path.c_str(), commit_user_.c_str(), + identifier, Snapshot::CommitKind::ToString(commit_kind).c_str(), + commit_time); + + CleanUpTmpManifests(base_manifest_list.first, delta_manifest_list.first, + merge_before_manifests, merge_after_manifests, old_index_manifest, + index_manifest_name); + }); + int64_t next_row_id_start = first_row_id_start; + int64_t previous_total_record_count = 0; + + if (latest_snapshot) { + old_index_manifest = latest_snapshot.value().IndexManifest(); + // TODO(yonghao.fyh): total record count should call scan when its std::nullopt + previous_total_record_count = latest_snapshot.value().TotalRecordCount() != std::nullopt + ? latest_snapshot.value().TotalRecordCount().value() + : 0; + std::vector previous_manifests; + // read all previous manifest files + PAIMON_RETURN_NOT_OK( + manifest_list_->ReadDataManifests(latest_snapshot.value(), &previous_manifests)); + merge_before_manifests.insert(merge_before_manifests.end(), previous_manifests.begin(), + previous_manifests.end()); + // read the last snapshot to complete the bucket's offsets when logOffsets does not + // contain all buckets + std::optional> latest_log_offsets = + latest_snapshot.value().LogOffsets(); + if (latest_log_offsets) { + for (const auto& [key, value] : latest_log_offsets.value()) { + log_offsets.emplace(key, value); + } + } + std::optional latest_watermark = latest_snapshot.value().Watermark(); + if (latest_watermark) { + if (watermark == std::nullopt) { + watermark = latest_watermark; + } else { + watermark = std::max(watermark.value(), latest_watermark.value()); + } + } + } + + // try to merge old manifest files to create base manifest list + PAIMON_ASSIGN_OR_RAISE( + std::vector merged_metas, + ManifestFileMerger::Merge(merge_before_manifests, options_.GetManifestTargetFileSize(), + options_.GetManifestMergeMinCount(), + options_.GetManifestFullCompactionThresholdSize(), + manifest_file_.get())); + merge_after_manifests.insert(merge_after_manifests.end(), merged_metas.begin(), + merged_metas.end()); + PAIMON_ASSIGN_OR_RAISE(base_manifest_list, manifest_list_->Write(merge_after_manifests)); + + if (options_.RowTrackingEnabled()) { + if (options_.RowTrackingPartitionGroupOnCommit()) { + std::unordered_map> delta_files_by_partition; + for (auto& entry : delta_files) { + delta_files_by_partition[entry.Partition()].push_back(std::move(entry)); + } + delta_files.clear(); + for (auto& [_, entries] : delta_files_by_partition) { + delta_files.insert(delta_files.end(), std::make_move_iterator(entries.begin()), + std::make_move_iterator(entries.end())); + } + } + // assigned snapshot id to delta files + AssignSnapshotId(new_snapshot_id, &delta_files); + // assign row id for new files + PAIMON_ASSIGN_OR_RAISE(next_row_id_start, + AssignRowTrackingMeta(first_row_id_start, &delta_files)); + } + + // the added records subtract the deleted records from + int64_t delta_record_count = + ManifestEntry::RecordCountAdd(delta_files) - ManifestEntry::RecordCountDelete(delta_files); + int64_t total_record_count = previous_total_record_count + delta_record_count; + + // write new delta files into manifest files + std::unordered_map partition_entry_map; + PAIMON_RETURN_NOT_OK(PartitionEntry::Merge(delta_files, &partition_entry_map)); + delta_statistics.reserve(partition_entry_map.size()); + for (const auto& [_, partition_entry] : partition_entry_map) { + delta_statistics.push_back(partition_entry); + } + PAIMON_ASSIGN_OR_RAISE(std::vector new_changes_manifests, + manifest_file_->Write(delta_files)); + merge_after_manifests.insert(merge_after_manifests.end(), new_changes_manifests.begin(), + new_changes_manifests.end()); + PAIMON_ASSIGN_OR_RAISE(delta_manifest_list, manifest_list_->Write(new_changes_manifests)); + + PAIMON_ASSIGN_OR_RAISE(index_manifest_name, index_manifest_file_->WriteIndexFiles( + old_index_manifest, index_entries)); + + std::optional> changelog_manifest_list; + std::optional statistics; + int64_t changelog_record_count = 0; + int64_t schema_id = 0; + PAIMON_ASSIGN_OR_RAISE(std::optional> table_schema, + schema_manager_->Latest()); + if (table_schema) { + schema_id = table_schema.value()->Id(); + } + + Snapshot new_snapshot( + new_snapshot_id, schema_id, base_manifest_list.first, base_manifest_list.second, + delta_manifest_list.first, delta_manifest_list.second, + changelog_manifest_list ? std::optional(changelog_manifest_list.value().first) + : std::nullopt, + changelog_manifest_list ? std::optional(changelog_manifest_list.value().second) + : std::nullopt, + index_manifest_name, commit_user_, identifier, commit_kind, + DateTimeUtils::GetCurrentUTCTimeUs() / 1000, log_offsets, total_record_count, + delta_record_count, changelog_record_count, watermark, statistics, + properties.empty() ? std::nullopt + : std::optional>(properties), + next_row_id_start); + + Result commit_result = CommitSnapshotImpl(new_snapshot, delta_statistics); + if (!commit_result.ok()) { + // commit exception, not sure about the situation and should not clean up the files. + PAIMON_LOG_WARN(logger_, "You need call FilterAndCommit to retry commit for exception. %s", + commit_result.status().ToString().c_str()); + + // To prevent the case where an atomic write times out but actually succeeds, + // retrying the commit could lead to the snapshot file being committed multiple times. + // Therefore, retries should be handled by the upper layer, + // which should call FilterAndCommit to avoid duplicate commits. + // Therefore, we should not trigger cleanup here, + // as it may delete meta files from a snapshot that was just written by ourselves, + // leading to an incomplete or corrupted snapshot. + guard.Release(); + return Status::Invalid("You need call FilterAndCommit to retry commit for exception. ", + commit_result.status().ToString()); + } + bool commit_success = commit_result.value(); + if (commit_success) { + PAIMON_LOG_INFO(logger_, + "Successfully commit snapshot %ld to table %s by user %s with identifier " + "%ld and kind %s.", + new_snapshot.Id(), root_path_.c_str(), commit_user_.c_str(), + new_snapshot.CommitIdentifier(), + Snapshot::CommitKind::ToString(new_snapshot.GetCommitKind()).c_str()); + guard.Release(); + return true; + } else { + // commit fails, should clean up the files + return false; + } +} + +void FileStoreCommitImpl::AssignSnapshotId(int64_t snapshot_id, + std::vector* delta_files) const { + for (auto& entry : *delta_files) { + entry.AssignSequenceNumber(/*min_sequence_number=*/snapshot_id, + /*max_sequence_number=*/snapshot_id); + } +} + +Result FileStoreCommitImpl::AssignRowTrackingMeta( + int64_t first_row_id_start, std::vector* delta_files) const { + if (delta_files->empty()) { + return first_row_id_start; + } + // assign row id for new files + int64_t start = first_row_id_start; + int64_t blob_start_default = first_row_id_start; + // Per-blob-field row id tracking: each blob field maintains its own start position, + // keyed by the blob field name (from write_cols[0]). + std::map blob_starts; + // TODO(xinyu.lxy): support vector store file row tracking when vector store is implemented + for (auto& entry : *delta_files) { + if (entry.File()->file_source == std::nullopt) { + return Status::Invalid( + "This is a bug, file source field for row-tracking table must present."); + } + bool contains_row_id = + entry.File()->write_cols.has_value() && + std::find(entry.File()->write_cols->begin(), entry.File()->write_cols->end(), + SpecialFields::RowId().Name()) != entry.File()->write_cols->end(); + if (entry.File()->file_source.value() == FileSource::Append() && + entry.File()->first_row_id == std::nullopt && !contains_row_id) { + int64_t row_count = entry.File()->row_count; + if (BlobUtils::IsBlobFile(entry.File()->file_name)) { + // Use the first write_col as the blob field name to support + // independent row tracking per blob field. + std::string blob_field_name; + if (!entry.File()->write_cols || entry.File()->write_cols->empty()) { + return Status::Invalid(fmt::format( + "invalid blob file {}: does not have write_cols", entry.File()->file_name)); + } + blob_field_name = entry.File()->write_cols->at(0); + int64_t blob_start = blob_starts.count(blob_field_name) + ? blob_starts[blob_field_name] + : blob_start_default; + if (blob_start >= start) { + return Status::Invalid(fmt::format( + "This is a bug, blob start {} should be less than start {} when " + "assigning a blob entry file.", + blob_start, start)); + } + entry.AssignFirstRowId(blob_start); + blob_starts[blob_field_name] = blob_start + row_count; + } else { + entry.AssignFirstRowId(start); + blob_start_default = start; + blob_starts.clear(); + start += row_count; + } + } + // for compact file, do not assign first row id. + } + return start; +} + +Result FileStoreCommitImpl::CommitSnapshotImpl( + const Snapshot& new_snapshot, const std::vector& delta_statistics) { + std::vector statistics; + statistics.reserve(delta_statistics.size()); + for (const auto& entry : delta_statistics) { + PAIMON_ASSIGN_OR_RAISE(PartitionStatistics partition_statistics, + entry.ToPartitionStatistics(partition_computer_.get())); + statistics.emplace_back(std::move(partition_statistics)); + } + Result commit_result = snapshot_commit_->Commit(new_snapshot, statistics); + if (!commit_result.ok()) { + // exception when performing the atomic rename, + // we cannot clean up because we can't determine the success + return Status::Invalid(fmt::format( + "Exception occurs when committing snapshot #{} by user {} with identifier {} and kind " + "{}. Cannot clean up because we can't determine the success. {}", + new_snapshot.Id(), commit_user_, new_snapshot.CommitIdentifier(), + Snapshot::CommitKind::ToString(new_snapshot.GetCommitKind()), + commit_result.status().ToString())); + } + return commit_result; +} + +void FileStoreCommitImpl::CleanUpTmpManifests( + const std::string& base_manifest_list_name, const std::string& delta_manifest_list_name, + const std::vector& merge_before_manifests, + const std::vector& merge_after_manifests, + const std::optional& old_index_manifest, + const std::optional& new_index_manifest) { + if (!base_manifest_list_name.empty()) { + manifest_list_->DeleteQuietly(base_manifest_list_name); + PAIMON_LOG_DEBUG(logger_, "base manifest list %s", base_manifest_list_name.c_str()); + } + if (!delta_manifest_list_name.empty()) { + manifest_list_->DeleteQuietly(delta_manifest_list_name); + PAIMON_LOG_DEBUG(logger_, "delta manifest list %s", delta_manifest_list_name.c_str()); + } + // for faster searching + std::set merge_before_manifest_set; + for (const auto& merge_before_manifest : merge_before_manifests) { + merge_before_manifest_set.emplace(merge_before_manifest.FileName()); + } + // clean up newly merged manifest files + for (const auto& merge_after_manifest : merge_after_manifests) { + if (merge_before_manifest_set.find(merge_after_manifest.FileName()) == + merge_before_manifest_set.end()) { + manifest_list_->DeleteQuietly(merge_after_manifest.FileName()); + PAIMON_LOG_DEBUG(logger_, "delete new file %s", + merge_after_manifest.FileName().c_str()); + } + } + // clean up index manifest + if (new_index_manifest && old_index_manifest != new_index_manifest) { + index_manifest_file_->DeleteQuietly(new_index_manifest.value()); + PAIMON_LOG_DEBUG(logger_, "delete new index file %s", new_index_manifest.value().c_str()); + } +} + +std::shared_ptr FileStoreCommitImpl::CreateManifestCommittable( + int64_t identifier, const std::vector>& commit_messages, + std::optional watermark) { + auto committable = std::make_shared(identifier, watermark); + for (const auto& commit_message : commit_messages) { + committable->AddFileCommittable(commit_message); + } + return committable; +} + +Status FileStoreCommitImpl::CollectChanges( + const std::vector>& commit_messages, + std::vector* append_table_files, + std::vector* append_changelog_files, + std::vector* compact_table_files, + std::vector* compact_changelog_files, + std::vector* append_table_index_files, + std::vector* compact_table_index_files) { + for (const auto& message : commit_messages) { + auto commit_message = std::dynamic_pointer_cast(message); + if (commit_message) { + DataIncrement new_files_increment = commit_message->GetNewFilesIncrement(); + for (const std::shared_ptr& new_file : new_files_increment.NewFiles()) { + append_table_files->push_back(MakeEntry(FileKind::Add(), commit_message, new_file)); + } + for (const std::shared_ptr& deleted_file : + new_files_increment.DeletedFiles()) { + append_table_files->push_back( + MakeEntry(FileKind::Delete(), commit_message, deleted_file)); + } + for (const std::shared_ptr& changelog_file : + new_files_increment.ChangelogFiles()) { + append_changelog_files->push_back( + MakeEntry(FileKind::Add(), commit_message, changelog_file)); + } + for (const std::shared_ptr& deleted_index_file : + new_files_increment.DeletedIndexFiles()) { + append_table_index_files->emplace_back( + FileKind::Delete(), commit_message->Partition(), commit_message->Bucket(), + deleted_index_file); + } + for (const std::shared_ptr& new_index_file : + new_files_increment.NewIndexFiles()) { + append_table_index_files->emplace_back(FileKind::Add(), commit_message->Partition(), + commit_message->Bucket(), new_index_file); + } + CompactIncrement compact_increment = commit_message->GetCompactIncrement(); + for (const std::shared_ptr& compact_before : + compact_increment.CompactBefore()) { + compact_table_files->push_back( + MakeEntry(FileKind::Delete(), commit_message, compact_before)); + } + for (const std::shared_ptr& compact_after : + compact_increment.CompactAfter()) { + compact_table_files->push_back( + MakeEntry(FileKind::Add(), commit_message, compact_after)); + } + for (const std::shared_ptr& changelog_file : + compact_increment.ChangelogFiles()) { + compact_changelog_files->push_back( + MakeEntry(FileKind::Add(), commit_message, changelog_file)); + } + for (const std::shared_ptr& deleted_index_file : + compact_increment.DeletedIndexFiles()) { + compact_table_index_files->emplace_back( + FileKind::Delete(), commit_message->Partition(), commit_message->Bucket(), + deleted_index_file); + } + for (const std::shared_ptr& new_index_file : + compact_increment.NewIndexFiles()) { + compact_table_index_files->emplace_back(FileKind::Add(), + commit_message->Partition(), + commit_message->Bucket(), new_index_file); + } + } else { + return Status::Invalid("fail to cast commit message to commit message impl"); + } + } + return Status::OK(); +} + +ManifestEntry FileStoreCommitImpl::MakeEntry( + const FileKind& kind, const std::shared_ptr& commit_message, + const std::shared_ptr& file) const { + int32_t total_buckets = commit_message->TotalBuckets() == std::nullopt + ? num_bucket_ + : commit_message->TotalBuckets().value(); + return ManifestEntry(kind, commit_message->Partition(), commit_message->Bucket(), total_buckets, + file); +} + +int64_t FileStoreCommitImpl::RowCounts(const std::vector& files) { + return std::accumulate(files.begin(), files.end(), 0L, + [](int64_t row_count, const ManifestEntry& entry) { + return row_count + entry.File()->row_count; + }); +} + +int64_t FileStoreCommitImpl::NumChangedPartitions( + const std::vector>& changes) { + std::unordered_set changed_partitions; + for (const auto& change : changes) { + for (const auto& entry : change) { + changed_partitions.insert(entry.Partition()); + } + } + return static_cast(changed_partitions.size()); +} + +int64_t FileStoreCommitImpl::NumChangedBuckets( + const std::vector>& changes) { + std::unordered_map> changed_partition_buckets; + for (const auto& change : changes) { + for (const auto& entry : change) { + changed_partition_buckets[entry.Partition()].insert(entry.Bucket()); + } + } + return std::accumulate(changed_partition_buckets.begin(), changed_partition_buckets.end(), + int64_t{0}, [](int64_t num_changed_buckets, const auto& bucket) { + return num_changed_buckets + + static_cast(bucket.second.size()); + }); +} + +} // namespace paimon diff --git a/src/paimon/core/operation/file_store_commit_impl.h b/src/paimon/core/operation/file_store_commit_impl.h new file mode 100644 index 0000000..217edc0 --- /dev/null +++ b/src/paimon/core/operation/file_store_commit_impl.h @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "paimon/common/options/memory_size.h" +#include "paimon/core/catalog/snapshot_commit.h" +#include "paimon/core/core_options.h" +#include "paimon/core/manifest/partition_entry.h" +#include "paimon/core/snapshot.h" +#include "paimon/file_store_commit.h" +#include "paimon/logging.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/metrics.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace arrow { +class DataType; +class Schema; +} // namespace arrow + +namespace paimon { + +class CommitContext; +class CommitMessageImpl; +struct DataFileMeta; +class ExpireSnapshots; +class FileKind; +class FileStorePathFactory; +class ManifestEntry; +class ManifestCommittable; +class ManifestFile; +class IndexManifestFile; +struct IndexManifestEntry; +class ManifestList; +class ManifestFileMeta; +class SnapshotManager; +class SchemaManager; +class TableSchema; +class BinaryRowPartitionComputer; +class CommitMessage; +class Executor; +class FileSystem; +class Logger; +class MemoryPool; +class Metrics; +class PartitionEntry; +class SnapshotCommit; + +/// Commit operation which provides commit and overwrite. +class FileStoreCommitImpl : public FileStoreCommit { + public: + FileStoreCommitImpl(const std::shared_ptr& pool, + const std::shared_ptr& executor, + const std::shared_ptr& schema, const std::string& root_path, + const std::string& commit_user, const CoreOptions& options, + const std::shared_ptr& path_factory, + std::unique_ptr partition_computer, + const std::shared_ptr& snapshot_manager, + bool ignore_empty_commit, bool use_rest_catalog_commit, + const std::shared_ptr& table_schema, + const std::shared_ptr& manifest_file, + const std::shared_ptr& manifest_list, + const std::shared_ptr& index_manifest_file, + const std::shared_ptr& expire_snapshots, + const std::shared_ptr& schema_manager); + ~FileStoreCommitImpl() override; + + Status Commit(const std::vector>& commit_messages, + int64_t commit_identifier, + std::optional watermark = std::nullopt) override; + + Result FilterAndCommit( + const std::map>>& + commit_identifier_and_messages, + std::optional watermark = std::nullopt) override; + + Status Overwrite(const std::vector>& partitions, + const std::vector>& commit_messages, + int64_t commit_identifier, + std::optional watermark = std::nullopt) override; + + Result FilterAndOverwrite( + const std::vector>& partitions, + const std::vector>& commit_messages, + int64_t commit_identifier, std::optional watermark = std::nullopt) override; + + Result GetLastCommitTableRequest() override; + + Result Expire() override; + + Status DropPartition(const std::vector>& partitions, + int64_t commit_identifier) override; + + std::shared_ptr GetCommitMetrics() const override { + return metrics_; + } + + Status Init(std::unique_ptr ctx); + + private: + Status Commit(const std::shared_ptr& manifest_committable, + bool check_append_files); + + Status TryOverwrite(const std::vector>& partition, + const std::vector& changes, int64_t commit_identifier, + std::optional watermark); + + Result> GetAllFiles( + const Snapshot& snapshot, + const std::vector>& partitions); + + Result>> FilterCommitted( + const std::vector>& committables); + + std::shared_ptr CreateManifestCommittable( + int64_t identifier, const std::vector>& commit_messages, + std::optional watermark); + + ManifestEntry MakeEntry(const FileKind& kind, + const std::shared_ptr& commit_message, + const std::shared_ptr& file) const; + + Status CollectChanges(const std::vector>& commit_messages, + std::vector* append_table_files, + std::vector* append_changelog_files, + std::vector* compact_table_files, + std::vector* compact_changelog_files, + std::vector* append_table_index_files, + std::vector* compact_table_index_files); + + Result TryCommit(const std::vector& delta_files, + const std::vector& index_entries, + int64_t identifier, std::optional watermark, + std::map log_offsets, + const std::map& properties, + Snapshot::CommitKind commit_kind, bool check_append_files); + Result TryCommitOnce(const std::vector& delta_files, + const std::vector& index_entries, + int64_t commit_identifier, std::optional watermark, + std::map log_offsets, + const std::map& properties, + Snapshot::CommitKind commit_kind, + const std::optional& latest_snapshot, + bool need_conflict_check); + + Result CommitSnapshotImpl(const Snapshot& new_snapshot, + const std::vector& delta_statistics); + + void CleanUpTmpManifests(const std::string& previous_changes_list_name, + const std::string& new_changes_list_name, + const std::vector& old_metas, + const std::vector& new_metas, + const std::optional& old_index_manifest, + const std::optional& new_index_manifest); + + Result> ReadAllEntriesFromChangedPartitions( + const Snapshot& latest_snapshot, + const std::set>& partitions) const; + + Status NoConflictsOrFail(const std::string& base_commit_user, + const std::vector& base_entries, + const std::vector& changes) const; + + Status CheckFilesExistence( + const std::vector>& committables) const; + + void AssignSnapshotId(int64_t snapshot_id, std::vector* delta_files) const; + + Result AssignRowTrackingMeta(int64_t first_row_id_start, + std::vector* delta_files) const; + + Result>> ChangedPartitions( + const std::vector& data_files, + const std::vector& index_files) const; + + static int64_t RowCounts(const std::vector& files); + + static int64_t NumChangedPartitions(const std::vector>& changes); + + static int64_t NumChangedBuckets(const std::vector>& changes); + + private: + std::shared_ptr memory_pool_; + std::shared_ptr executor_; + std::shared_ptr schema_; + std::string root_path_; + std::string commit_user_; + CoreOptions options_; + std::shared_ptr path_factory_; + std::shared_ptr fs_; + + std::unique_ptr partition_computer_; + std::shared_ptr snapshot_manager_; + std::shared_ptr snapshot_commit_; + bool ignore_empty_commit_ = true; + int32_t num_bucket_ = 0; + std::shared_ptr table_schema_; + + std::shared_ptr manifest_file_; + std::shared_ptr manifest_list_; + std::shared_ptr index_manifest_file_; + + std::shared_ptr expire_snapshots_; + std::shared_ptr schema_manager_; + + std::shared_ptr metrics_; + std::shared_ptr logger_; +}; + +} // namespace paimon diff --git a/src/paimon/core/operation/file_store_commit_test.cpp b/src/paimon/core/operation/file_store_commit_test.cpp new file mode 100644 index 0000000..3ee13c9 --- /dev/null +++ b/src/paimon/core/operation/file_store_commit_test.cpp @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/file_store_commit.h" + +#include + +#include "arrow/c/abi.h" +#include "arrow/c/bridge.h" +#include "arrow/status.h" +#include "arrow/type.h" +#include "gtest/gtest.h" +#include "paimon/catalog/catalog.h" +#include "paimon/catalog/identifier.h" +#include "paimon/commit_context.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/core/operation/file_store_commit_impl.h" +#include "paimon/defs.h" +#include "paimon/result.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +TEST(FileStoreCommitTest, TestCreate) { + auto string_field = arrow::field("f0", arrow::utf8()); + auto int_field = arrow::field("f1", arrow::int32()); + auto int_field1 = arrow::field("f2", arrow::int32()); + auto double_field = arrow::field("f3", arrow::float64()); + auto schema = + arrow::schema(arrow::FieldVector({string_field, int_field, int_field1, double_field})); + + ::ArrowSchema arrow_schema; + ASSERT_TRUE(arrow::ExportSchema(*schema, &arrow_schema).ok()); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + + std::map options = {{Options::FILE_FORMAT, "orc"}, + {Options::TARGET_FILE_SIZE, "1024"}, + {Options::FILE_SYSTEM, "local"}, + {Options::BUCKET, "2"}, + {Options::BUCKET_KEY, "f2"}}; + + ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), options)); + ASSERT_OK(catalog->CreateDatabase("foo", options, /*ignore_if_exists=*/false)); + ASSERT_OK(catalog->CreateTable(Identifier("foo", "bar"), &arrow_schema, + /*partition_keys=*/{"f1"}, + /*primary_keys=*/{}, options, + /*ignore_if_exists=*/false)); + std::string table_path = PathUtil::JoinPath(dir->Str(), "foo.db/bar"); + + CommitContextBuilder context_builder(table_path, "commit_user"); + ASSERT_OK_AND_ASSIGN(std::unique_ptr commit_context, + context_builder.AddOption(Options::MANIFEST_FORMAT, "orc") + .AddOption(Options::MANIFEST_TARGET_FILE_SIZE, "8mb") + .AddOption(Options::FILE_SYSTEM, "local") + .Finish()); + ASSERT_OK_AND_ASSIGN(auto commit, FileStoreCommit::Create(std::move(commit_context))); + auto commit_impl = dynamic_cast(commit.get()); + ASSERT_TRUE(commit_impl); +} + +} // namespace paimon::test diff --git a/src/paimon/core/operation/manifest_file_merger.cpp b/src/paimon/core/operation/manifest_file_merger.cpp new file mode 100644 index 0000000..aa308e0 --- /dev/null +++ b/src/paimon/core/operation/manifest_file_merger.cpp @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/core/operation/manifest_file_merger.h" + +#include +#include +#include +#include +#include + +#include "paimon/common/utils/scope_guard.h" +#include "paimon/core/manifest/file_entry.h" +#include "paimon/core/manifest/manifest_entry.h" +#include "paimon/core/manifest/manifest_file.h" +#include "paimon/status.h" + +namespace paimon { + +std::shared_ptr ManifestFileMerger::GetLogger() { + static std::shared_ptr logger = Logger::GetLogger("ManifestFileMerger"); + return logger; +} + +Result> ManifestFileMerger::Merge( + const std::vector& all, int64_t target_file_size, int32_t merge_min_count, + int64_t full_compaction_file_size, ManifestFile* manifest_file) { + if (manifest_file == nullptr) { + return Status::Invalid("manifest_file is null pointer"); + } + + // these are the newly created manifest files, clean them up if exception occurs + std::vector new_metas_for_abort; + ScopeGuard guard([&]() { + for (const auto& meta : new_metas_for_abort) { + manifest_file->DeleteQuietly(meta.FileName()); + } + }); + PAIMON_ASSIGN_OR_RAISE(std::optional> full_compacted, + TryFullCompaction(all, target_file_size, full_compaction_file_size, + manifest_file, &new_metas_for_abort)); + std::vector results; + if (full_compacted != std::nullopt) { + results = full_compacted.value(); + } else { + PAIMON_ASSIGN_OR_RAISE(results, TryMinorCompaction(all, target_file_size, merge_min_count, + manifest_file, &new_metas_for_abort)); + } + guard.Release(); + return results; +} + +// TryFullCompaction aims to perform a full compaction of manifest files. It consolidates +// manifest files into two categories, base and delta, and applies a "full compaction" condition +// if certain thresholds are met. +Result>> ManifestFileMerger::TryFullCompaction( + const std::vector& all, int64_t target_file_size, + int64_t full_compaction_file_size, ManifestFile* manifest_file, + std::vector* new_metas_for_abort) { + // 1. Splitting base and delta sets + std::vector base; + int64_t total_manifest_size = 0; + size_t i = 0; + for (; i < all.size(); i++) { + ManifestFileMeta file = all[i]; + if (file.NumDeletedFiles() == 0 && file.FileSize() >= target_file_size) { + base.push_back(file); + total_manifest_size += file.FileSize(); + } else { + break; + } + } + std::vector delta; + int64_t delta_delete_file_num = 0; + int64_t total_delta_file_size = 0; + for (; i < all.size(); i++) { + const ManifestFileMeta& file = all[i]; + delta.push_back(file); + total_manifest_size += file.FileSize(); + total_delta_file_size += file.FileSize(); + delta_delete_file_num += file.NumDeletedFiles(); + } + // 2. Determining Full Compaction Requirement + if (total_delta_file_size < full_compaction_file_size) { + return std::optional>(); + } + + // 3. Merging Delta Files + PAIMON_LOG_DEBUG(GetLogger(), + "Start Manifest File Full Compaction, pick the number of delete file: %ld, " + "total manifest file size: %ld", + delta_delete_file_num, total_manifest_size); + + if (delta.size() <= 1) { + return std::optional>(); + } + + PAIMON_ASSIGN_OR_RAISE(std::vector merged_delta, + MergeEntries(delta, manifest_file)); + + // 4. Result Construction and Return + std::vector result; + result.insert(result.end(), base.begin(), base.end()); + result.insert(result.end(), merged_delta.begin(), merged_delta.end()); + new_metas_for_abort->insert(new_metas_for_abort->end(), merged_delta.begin(), + merged_delta.end()); + return std::optional>(result); +} + +// TryMinorCompaction focuses on performing a "minor compaction" of manifest files, especially +// those that are smaller than the suggested meta file size. +Result> ManifestFileMerger::TryMinorCompaction( + const std::vector& input, int64_t suggested_meta_size, + int32_t suggested_min_meta_count, ManifestFile* manifest_file, + std::vector* new_metas_for_abort) { + std::vector result; + std::vector candidates; + int64_t total_size = 0; + // merge existing small manifest files + for (const ManifestFileMeta& manifest : input) { + candidates.push_back(manifest); + total_size += manifest.FileSize(); + if (total_size >= suggested_meta_size) { + if (candidates.size() == 1) { + result.push_back(candidates[0]); + } else { + // reach suggested file size, perform merging and produce new file + PAIMON_ASSIGN_OR_RAISE(std::vector merged, + MergeEntries(candidates, manifest_file)); + result.insert(result.end(), merged.begin(), merged.end()); + new_metas_for_abort->insert(new_metas_for_abort->end(), merged.begin(), + merged.end()); + } + candidates.clear(); + total_size = 0; + } + } + + // merge the last bit of manifests if there are too many + if (candidates.size() >= static_cast(suggested_min_meta_count)) { + if (candidates.size() == 1) { + result.push_back(candidates[0]); + } else { + PAIMON_ASSIGN_OR_RAISE(std::vector merged, + MergeEntries(candidates, manifest_file)); + result.insert(result.end(), merged.begin(), merged.end()); + new_metas_for_abort->insert(new_metas_for_abort->end(), merged.begin(), merged.end()); + } + } else { + result.insert(result.end(), candidates.begin(), candidates.end()); + } + return result; +} + +Result> ManifestFileMerger::MergeEntries( + const std::vector& metas, ManifestFile* manifest_file) { + if (metas.size() == 1) { + return std::vector({metas[0]}); + } + std::vector entries; + for (const auto& meta : metas) { + PAIMON_RETURN_NOT_OK(manifest_file->Read(meta.FileName(), /*filter=*/nullptr, &entries)); + } + std::vector result; + PAIMON_RETURN_NOT_OK(FileEntry::MergeEntries(entries, &result)); + return manifest_file->Write(result); +} + +} // namespace paimon diff --git a/src/paimon/core/operation/manifest_file_merger.h b/src/paimon/core/operation/manifest_file_merger.h new file mode 100644 index 0000000..0cc33a7 --- /dev/null +++ b/src/paimon/core/operation/manifest_file_merger.h @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "paimon/core/manifest/file_entry.h" +#include "paimon/core/manifest/manifest_file_meta.h" +#include "paimon/logging.h" +#include "paimon/result.h" + +namespace arrow { +class DataType; +} // namespace arrow + +namespace paimon { + +class ManifestFile; + +/// This file includes several `ManifestFileMeta`, representing all data of the whole +/// table at the corresponding snapshot. +class ManifestFileMerger { + public: + /// Merge several `ManifestFileMeta`s. + /// + /// @note This method is atomic. + static Result> Merge(const std::vector& all, + int64_t manifest_target_file_size, + int32_t merge_min_count, + int64_t full_compaction_file_size, + ManifestFile* manifest_file); + + private: + static Result>> TryFullCompaction( + const std::vector& all, int64_t manifest_target_file_size, + int64_t full_compaction_file_size, ManifestFile* manifest_file, + std::vector* new_metas); + + static Result> TryMinorCompaction( + const std::vector& all, int64_t manifest_target_file_size, + int32_t merge_min_count, ManifestFile* manifest_file, + std::vector* new_metas); + + static Result> MergeEntries( + const std::vector& metas, ManifestFile* manifest_file); + + static std::shared_ptr GetLogger(); +}; + +} // namespace paimon diff --git a/src/paimon/core/operation/manifest_file_merger_test.cpp b/src/paimon/core/operation/manifest_file_merger_test.cpp new file mode 100644 index 0000000..150723f --- /dev/null +++ b/src/paimon/core/operation/manifest_file_merger_test.cpp @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/core/operation/manifest_file_merger.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "arrow/type.h" +#include "fmt/format.h" +#include "gtest/gtest.h" +#include "paimon/common/data/binary_row.h" +#include "paimon/common/data/binary_row_writer.h" +#include "paimon/core/core_options.h" +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/manifest/file_kind.h" +#include "paimon/core/manifest/file_source.h" +#include "paimon/core/manifest/manifest_entry.h" +#include "paimon/core/manifest/manifest_file.h" +#include "paimon/core/stats/simple_stats.h" +#include "paimon/core/utils/field_mapping.h" +#include "paimon/core/utils/file_store_path_factory.h" +#include "paimon/data/timestamp.h" +#include "paimon/format/file_format.h" +#include "paimon/format/file_format_factory.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/status.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +class ManifestFileMergerTest : public testing::Test { + public: + void SetUp() override { + pool_ = GetDefaultPool(); + dir_ = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir_); + test_root_ = dir_->Str(); + partition_type_ = arrow::int64(); + CreateManifestFile(test_root_); + assert(manifest_file_); + } + + void TearDown() override {} + + ManifestEntry MakeEntry(FileKind kind, const std::string& file_name) { + return MakeEntry(kind, file_name, 0); + } + + ManifestEntry MakeEntry(FileKind kind, const std::string& file_name, + std::optional partition) { + BinaryRow binary_row = BinaryRow::EmptyRow(); + if (partition != std::nullopt) { + binary_row = BinaryRow(1); + BinaryRowWriter writer(&binary_row, 0, pool_.get()); + writer.WriteInt(0, partition.value()); + writer.Complete(); + } + + return ManifestEntry( + kind, binary_row, + 0, // not used + 0, // not used + std::make_shared( + file_name, + 0, // not used + 0, // not used + binary_row, // not used + binary_row, // not used + SimpleStats::EmptyStats(), // not used + SimpleStats::EmptyStats(), // not used + 0, // not used + 0, // not used + 0, // not used + 0, // not used + /*extra_files=*/std::vector>(), Timestamp(200000, 0), + 0, // not used + nullptr, // not used + FileSource::Append(), /*value_stats_cols=*/std::nullopt, + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt)); + } + + ManifestFileMeta MakeManifest(const std::vector& entries) { + EXPECT_OK_AND_ASSIGN(std::vector manifest_file_metas, + manifest_file_->Write(entries)); + // force the file size of each manifest to be 3000 + manifest_file_metas[0].file_size_ = 3000; + return manifest_file_metas[0]; + } + + private: + void CreateManifestFile(const std::string& path_str) { + auto file_system = std::make_shared(); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr file_format, + FileFormatFactory::Get("parquet", std::map())); + auto schema = arrow::schema(arrow::FieldVector({arrow::field("f0", partition_type_)})); + ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap({})); + ASSERT_OK_AND_ASSIGN(std::vector external_paths, + options.CreateExternalPaths()); + ASSERT_OK_AND_ASSIGN(std::optional global_index_external_path, + options.CreateGlobalIndexExternalPath()); + + ASSERT_OK_AND_ASSIGN( + static std::shared_ptr path_factory, + FileStorePathFactory::Create( + path_str, schema, /*partition_keys=*/{"f0"}, options.GetPartitionDefaultName(), + options.GetFileFormat()->Identifier(), options.DataFilePrefix(), + options.LegacyPartitionNameEnabled(), external_paths, global_index_external_path, + options.IndexFileInDataFileDir(), pool_)); + ASSERT_OK_AND_ASSIGN(std::shared_ptr partition_schema, + FieldMapping::GetPartitionSchema(schema, {"f0"})); + ASSERT_OK_AND_ASSIGN(manifest_file_, + ManifestFile::Create(file_system, file_format, "zstd", path_factory, + /*target_file_size=*/1024 * 1024, pool_, options, + partition_schema)); + } + + void ContainSameEntryFile( + const std::vector& metas, + const std::vector>& entry_file_name_expected) { + std::vector entries; + for (const auto& meta : metas) { + ASSERT_OK(manifest_file_->Read(meta.FileName(), /*filter=*/nullptr, &entries)); + } + std::vector> entry_file_name_actual; + for (const auto& entry : entries) { + entry_file_name_actual.emplace_back(entry.FileName(), entry.Kind()); + } + ASSERT_EQ(entry_file_name_expected, entry_file_name_actual); + } + + void AssertEquivalentEntries(const std::vector& lhs, + const std::vector& rhs) { + ASSERT_EQ(lhs.size(), rhs.size()); + for (uint32_t i = 0; i < lhs.size(); i++) { + ASSERT_EQ(lhs[i].ToString(), rhs[i].ToString()); + } + } + + static constexpr int64_t MAX_LONG_VALUE = std::numeric_limits::max(); + static constexpr int32_t MAX_INT_VALUE = std::numeric_limits::max(); + + std::unique_ptr dir_; + std::string test_root_; + std::shared_ptr pool_; + std::shared_ptr manifest_file_; + std::shared_ptr partition_type_; +}; + +TEST_F(ManifestFileMergerTest, TestMergeWithoutCompaction) { + std::vector entries; + for (int32_t i = 0; i < 16; i++) { + entries.push_back(MakeEntry(FileKind::Add(), std::to_string(i))); + } + std::vector input; + // base manifest + input.push_back(MakeManifest(entries)); + + // delta manifest + input.push_back(MakeManifest({MakeEntry(FileKind::Add(), "A"), MakeEntry(FileKind::Add(), "B"), + MakeEntry(FileKind::Add(), "C")})); + input.push_back(MakeManifest({MakeEntry(FileKind::Add(), "D")})); + input.push_back( + MakeManifest({MakeEntry(FileKind::Delete(), "A"), MakeEntry(FileKind::Delete(), "B"), + MakeEntry(FileKind::Add(), "F")})); + input.push_back( + MakeManifest({MakeEntry(FileKind::Delete(), "14"), MakeEntry(FileKind::Delete(), "15")})); + input.push_back( + MakeManifest({MakeEntry(FileKind::Delete(), "C"), MakeEntry(FileKind::Delete(), "D"), + MakeEntry(FileKind::Delete(), "F"), MakeEntry(FileKind::Add(), "G")})); + // every file is larger than target manifest file size 500 + ASSERT_OK_AND_ASSIGN( + std::vector actual, + ManifestFileMerger::Merge(input, 500, 3, MAX_LONG_VALUE, manifest_file_.get())); + AssertEquivalentEntries(input, actual); +} + +TEST_F(ManifestFileMergerTest, TestMergeWithoutDeleteFile) { + // entries are All Add(). + std::vector input; + // base + for (int32_t j = 0; j < 6; j++) { + std::vector entries; + for (int32_t i = 1; i < 16; i++) { + entries.push_back(MakeEntry(FileKind::Add(), fmt::format("{}-{}", j, i), j)); + } + input.push_back(MakeManifest(entries)); + } + // delta + input.push_back(MakeManifest({MakeEntry(FileKind::Add(), "A")})); + input.push_back(MakeManifest({MakeEntry(FileKind::Add(), "B")})); + input.push_back(MakeManifest({MakeEntry(FileKind::Add(), "C")})); + input.push_back(MakeManifest({MakeEntry(FileKind::Add(), "D")})); + input.push_back(MakeManifest({MakeEntry(FileKind::Add(), "E")})); + input.push_back(MakeManifest({MakeEntry(FileKind::Add(), "F")})); + input.push_back(MakeManifest({MakeEntry(FileKind::Add(), "G")})); + + ASSERT_OK_AND_ASSIGN(std::vector merged, + ManifestFileMerger::Merge(input, 500, 3, 200, manifest_file_.get())); + AssertEquivalentEntries(input, merged); +} + +TEST_F(ManifestFileMergerTest, TestTriggerMinorCompaction) { + std::vector entries; + for (int32_t i = 0; i < 16; i++) { + entries.push_back(MakeEntry(FileKind::Add(), std::to_string(i))); + } + std::vector input; + // base manifest + input.push_back(MakeManifest(entries)); + + // delta manifest + input.push_back(MakeManifest({MakeEntry(FileKind::Add(), "A"), MakeEntry(FileKind::Add(), "B"), + MakeEntry(FileKind::Add(), "C")})); + input.push_back(MakeManifest({MakeEntry(FileKind::Add(), "D")})); + input.push_back( + MakeManifest({MakeEntry(FileKind::Delete(), "A"), MakeEntry(FileKind::Delete(), "B"), + MakeEntry(FileKind::Add(), "F")})); + input.push_back( + MakeManifest({MakeEntry(FileKind::Delete(), "14"), MakeEntry(FileKind::Delete(), "15")})); + input.push_back( + MakeManifest({MakeEntry(FileKind::Delete(), "C"), MakeEntry(FileKind::Delete(), "D"), + MakeEntry(FileKind::Delete(), "F"), MakeEntry(FileKind::Add(), "G")})); + + std::vector new_metas; + // trigger minor compaction + ASSERT_OK_AND_ASSIGN(std::vector merged, + ManifestFileMerger::TryMinorCompaction( + input, /*manifest_target_file_size=*/5000, + /*merge_min_count=*/30, manifest_file_.get(), &new_metas)); + ASSERT_EQ(3, new_metas.size()); + + std::vector> entry_file_expected; + for (int32_t i = 0; i < 16; i++) { + entry_file_expected.emplace_back(std::to_string(i), FileKind::Add()); + } + entry_file_expected.emplace_back("A", FileKind::Add()); + entry_file_expected.emplace_back("B", FileKind::Add()); + entry_file_expected.emplace_back("C", FileKind::Add()); + entry_file_expected.emplace_back("D", FileKind::Add()); + entry_file_expected.emplace_back("A", FileKind::Delete()); + entry_file_expected.emplace_back("B", FileKind::Delete()); + entry_file_expected.emplace_back("F", FileKind::Add()); + entry_file_expected.emplace_back("14", FileKind::Delete()); + entry_file_expected.emplace_back("15", FileKind::Delete()); + entry_file_expected.emplace_back("C", FileKind::Delete()); + entry_file_expected.emplace_back("D", FileKind::Delete()); + entry_file_expected.emplace_back("F", FileKind::Delete()); + entry_file_expected.emplace_back("G", FileKind::Add()); + ContainSameEntryFile(merged, entry_file_expected); +} + +TEST_F(ManifestFileMergerTest, TestTriggerMinorCompactionWithLastBit) { + std::vector entries; + for (int32_t i = 0; i < 16; i++) { + entries.push_back(MakeEntry(FileKind::Add(), std::to_string(i))); + } + std::vector input; + // base manifest + input.push_back(MakeManifest(entries)); + + // delta manifest + input.push_back(MakeManifest({MakeEntry(FileKind::Add(), "A"), MakeEntry(FileKind::Add(), "B"), + MakeEntry(FileKind::Add(), "C")})); + input.push_back(MakeManifest({MakeEntry(FileKind::Add(), "D")})); + input.push_back( + MakeManifest({MakeEntry(FileKind::Delete(), "A"), MakeEntry(FileKind::Delete(), "B"), + MakeEntry(FileKind::Add(), "F")})); + input.push_back( + MakeManifest({MakeEntry(FileKind::Delete(), "14"), MakeEntry(FileKind::Delete(), "15")})); + input.push_back( + MakeManifest({MakeEntry(FileKind::Delete(), "C"), MakeEntry(FileKind::Delete(), "D"), + MakeEntry(FileKind::Delete(), "F"), MakeEntry(FileKind::Add(), "G")})); + + std::vector new_metas; + // trigger minor compaction + ASSERT_OK_AND_ASSIGN(std::vector merged, + ManifestFileMerger::TryMinorCompaction( + input, /*manifest_target_file_size=*/10000, + /*merge_min_count=*/2, manifest_file_.get(), &new_metas)); + ASSERT_EQ(2, new_metas.size()); + + std::vector> entry_file_expected; + for (int32_t i = 0; i < 16; i++) { + entry_file_expected.emplace_back(std::to_string(i), FileKind::Add()); + } + entry_file_expected.emplace_back("C", FileKind::Add()); + entry_file_expected.emplace_back("D", FileKind::Add()); + entry_file_expected.emplace_back("F", FileKind::Add()); + + entry_file_expected.emplace_back("14", FileKind::Delete()); + entry_file_expected.emplace_back("15", FileKind::Delete()); + entry_file_expected.emplace_back("C", FileKind::Delete()); + entry_file_expected.emplace_back("D", FileKind::Delete()); + entry_file_expected.emplace_back("F", FileKind::Delete()); + entry_file_expected.emplace_back("G", FileKind::Add()); + ContainSameEntryFile(merged, entry_file_expected); +} + +TEST_F(ManifestFileMergerTest, TestTriggerFullCompaction) { + std::vector entries; + for (int32_t i = 0; i < 16; i++) { + entries.push_back(MakeEntry(FileKind::Add(), std::to_string(i))); + } + + std::vector input; + + // base manifest + input.push_back(MakeManifest(entries)); + + // delta manifest + input.push_back(MakeManifest({MakeEntry(FileKind::Add(), "A"), MakeEntry(FileKind::Add(), "B"), + MakeEntry(FileKind::Add(), "C")})); + input.push_back(MakeManifest({MakeEntry(FileKind::Add(), "D")})); + input.push_back( + MakeManifest({MakeEntry(FileKind::Delete(), "A"), MakeEntry(FileKind::Delete(), "B"), + MakeEntry(FileKind::Add(), "F")})); + input.push_back( + MakeManifest({MakeEntry(FileKind::Delete(), "14"), MakeEntry(FileKind::Delete(), "15")})); + input.push_back( + MakeManifest({MakeEntry(FileKind::Delete(), "C"), MakeEntry(FileKind::Delete(), "D"), + MakeEntry(FileKind::Delete(), "F"), MakeEntry(FileKind::Add(), "G")})); + + // no trigger for delta size + std::vector new_metas; + ASSERT_OK_AND_ASSIGN( + std::optional> full_compacted, + ManifestFileMerger::TryFullCompaction(input, /*manifest_target_file_size=*/500, + /*full_compaction_file_size=*/MAX_INT_VALUE, + manifest_file_.get(), &new_metas)); + ASSERT_EQ(std::nullopt, full_compacted); + ASSERT_EQ(0, new_metas.size()); + new_metas.clear(); + + // trigger full compaction + ASSERT_OK_AND_ASSIGN(std::optional> merged, + ManifestFileMerger::TryFullCompaction( + input, /*manifest_target_file_size=*/5000, + /*full_compaction_file_size=*/100, manifest_file_.get(), &new_metas)); + ASSERT_NE(std::nullopt, merged); + ASSERT_GT(new_metas.size(), 0); + + std::vector> entry_file_expected; + for (int32_t i = 0; i < 14; i++) { + entry_file_expected.emplace_back(std::to_string(i), FileKind::Add()); + } + entry_file_expected.emplace_back("G", FileKind::Add()); + ContainSameEntryFile(merged.value(), entry_file_expected); +} + +} // namespace paimon::test diff --git a/src/paimon/core/operation/metrics/commit_metrics.h b/src/paimon/core/operation/metrics/commit_metrics.h new file mode 100644 index 0000000..cb26a12 --- /dev/null +++ b/src/paimon/core/operation/metrics/commit_metrics.h @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +namespace paimon { + +/// Metrics to measure a commit. +class CommitMetrics { + public: + static constexpr char LAST_COMMIT_DURATION[] = "lastCommitDuration"; + static constexpr char LAST_COMMIT_ATTEMPTS[] = "lastCommitAttempts"; + static constexpr char LAST_TABLE_FILES_ADDED[] = "lastTableFilesAdded"; + static constexpr char LAST_TABLE_FILES_DELETED[] = "lastTableFilesDeleted"; + static constexpr char LAST_TABLE_FILES_APPENDED[] = "lastTableFilesAppended"; + static constexpr char LAST_TABLE_FILES_COMMIT_COMPACTED[] = "lastTableFilesCommitCompacted"; + static constexpr char LAST_CHANGELOG_FILES_APPENDED[] = "lastChangelogFilesAppended"; + static constexpr char LAST_CHANGELOG_FILES_COMMIT_COMPACTED[] = + "lastChangelogFileCommitCompacted"; + static constexpr char LAST_GENERATED_SNAPSHOTS[] = "lastGeneratedSnapshots"; + static constexpr char LAST_DELTA_RECORDS_APPENDED[] = "lastDeltaRecordsAppended"; + static constexpr char LAST_CHANGELOG_RECORDS_APPENDED[] = "lastChangelogRecordsAppended"; + static constexpr char LAST_DELTA_RECORDS_COMMIT_COMPACTED[] = "lastDeltaRecordsCommitCompacted"; + static constexpr char LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED[] = + "lastChangelogRecordsCommitCompacted"; + static constexpr char LAST_PARTITIONS_WRITTEN[] = "lastPartitionsWritten"; + static constexpr char LAST_BUCKETS_WRITTEN[] = "lastBucketsWritten"; + static constexpr char LAST_COMPACTION_INPUT_FILE_SIZE[] = "lastCompactionInputFileSize"; + static constexpr char LAST_COMPACTION_OUTPUT_FILE_SIZE[] = "lastCompactionOutputFileSize"; +}; + +} // namespace paimon diff --git a/src/paimon/core/operation/metrics/commit_metrics_test.cpp b/src/paimon/core/operation/metrics/commit_metrics_test.cpp new file mode 100644 index 0000000..787a01d --- /dev/null +++ b/src/paimon/core/operation/metrics/commit_metrics_test.cpp @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/core/operation/metrics/commit_metrics.h" + +#include +#include + +#include "gtest/gtest.h" +#include "paimon/common/metrics/metrics_impl.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +TEST(CommitMetricsTest, TestSimple) { + auto commit_metrics = std::make_shared(); + commit_metrics->SetCounter("some_metric", 100); + commit_metrics->SetCounter(CommitMetrics::LAST_COMMIT_ATTEMPTS, 30); + ASSERT_OK_AND_ASSIGN(uint64_t counter, + commit_metrics->GetCounter(CommitMetrics::LAST_COMMIT_ATTEMPTS)); + ASSERT_EQ(30, counter); + ASSERT_OK_AND_ASSIGN(counter, commit_metrics->GetCounter("some_metric")); + ASSERT_EQ(100, counter); + auto other = std::make_shared(); + other->SetCounter("some_metric_2", 200); + other->SetCounter(CommitMetrics::LAST_COMMIT_ATTEMPTS, 50); + commit_metrics->Merge(other); + ASSERT_OK_AND_ASSIGN(counter, commit_metrics->GetCounter(CommitMetrics::LAST_COMMIT_ATTEMPTS)); + ASSERT_EQ(80, counter); + ASSERT_OK_AND_ASSIGN(counter, commit_metrics->GetCounter("some_metric")); + ASSERT_EQ(100, counter); + ASSERT_OK_AND_ASSIGN(counter, commit_metrics->GetCounter("some_metric_2")); + ASSERT_EQ(200, counter); +} + +} // namespace paimon::test