From 2ad5764c1c8baaf0f42c271f31404a9fe3d6b067 Mon Sep 17 00:00:00 2001 From: "yonghao.fyh" Date: Thu, 18 Jun 2026 10:35:47 +0800 Subject: [PATCH] feat: add file store write and restore utilities --- include/paimon/file_store_write.h | 95 +++++ include/paimon/write_context.h | 250 ++++++++++++ .../operation/abstract_file_store_write.cpp | 368 ++++++++++++++++++ .../operation/abstract_file_store_write.h | 159 ++++++++ .../core/operation/file_store_write.cpp | 223 +++++++++++ .../core/operation/file_store_write_test.cpp | 187 +++++++++ .../operation/file_system_write_restore.h | 95 +++++ .../file_system_write_restore_test.cpp | 75 ++++ src/paimon/core/operation/restore_files.h | 77 ++++ src/paimon/core/operation/write_context.cpp | 207 ++++++++++ .../core/operation/write_context_test.cpp | 113 ++++++ src/paimon/core/operation/write_restore.cpp | 45 +++ src/paimon/core/operation/write_restore.h | 50 +++ .../core/operation/write_restore_test.cpp | 97 +++++ 14 files changed, 2041 insertions(+) create mode 100644 include/paimon/file_store_write.h create mode 100644 include/paimon/write_context.h create mode 100644 src/paimon/core/operation/abstract_file_store_write.cpp create mode 100644 src/paimon/core/operation/abstract_file_store_write.h create mode 100644 src/paimon/core/operation/file_store_write.cpp create mode 100644 src/paimon/core/operation/file_store_write_test.cpp create mode 100644 src/paimon/core/operation/file_system_write_restore.h create mode 100644 src/paimon/core/operation/file_system_write_restore_test.cpp create mode 100644 src/paimon/core/operation/restore_files.h create mode 100644 src/paimon/core/operation/write_context.cpp create mode 100644 src/paimon/core/operation/write_context_test.cpp create mode 100644 src/paimon/core/operation/write_restore.cpp create mode 100644 src/paimon/core/operation/write_restore.h create mode 100644 src/paimon/core/operation/write_restore_test.cpp diff --git a/include/paimon/file_store_write.h b/include/paimon/file_store_write.h new file mode 100644 index 0000000..16c6528 --- /dev/null +++ b/include/paimon/file_store_write.h @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#pragma once + +#include +#include +#include +#include + +#include "paimon/commit_message.h" +#include "paimon/defs.h" +#include "paimon/executor.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/metrics.h" +#include "paimon/result.h" +#include "paimon/status.h" +#include "paimon/type_fwd.h" +#include "paimon/visibility.h" + +namespace paimon { +class RecordBatch; +class WriteContext; + +/// Interface for write operations in a file store. +class PAIMON_EXPORT FileStoreWrite { + public: + /// Create an instance of `FileStoreWrite`. + /// + /// @param context A unique pointer to the `WriteContext` used for write operations. + /// + /// @return A Result containing a unique pointer to the `FileStoreWrite` instance. + static Result> Create(std::unique_ptr context); + + virtual ~FileStoreWrite() = default; + + /// Support write an input `RecordBatch` to internal buffer or file. + /// @note If a field in table schema is marked as non-nullable (`nullable = false`), + /// the corresponding array in `batch` must have zero null entries. + virtual Status Write(std::unique_ptr&& batch) = 0; + + /// Compact data stored in given partition and bucket. Note that compaction process is only + /// submitted and may not be completed when the method returns. + /// + /// @param partition the partition to compact + /// @param bucket the bucket to compact + /// @param full_compaction whether to trigger full compaction or just normal compaction + /// + /// @return status for compacting the records + virtual Status Compact(const std::map& partition, int32_t bucket, + bool full_compaction) = 0; + + /// Generate a list of commit messages with the latest generated data file meta + /// information of the current snapshot. + /// + /// When we need commit, call PrepareCommit to get the current {@link CommitMessage}s with the + /// latest generated data file meta information of the current snapshot. + /// + /// This function is designed to be called when a commit is required. Depending on the writing + /// scenario, the behavior will differ: + /// + /// - For batch write, simply call `PrepareCommit()` without any parameters. + /// - For streaming write, you need to provide both parameters: + /// `PrepareCommit(bool wait_compaction, int64_t commit_identifier)`. + /// + /// @param wait_compaction Indicates whether to wait for any ongoing compaction process to + /// complete. + /// @param commit_identifier A unique identifier for the commit operation. This parameter is + /// only relevant in streaming write scenarios. + /// + /// @return A Result containing `std::vector>` objects, + /// representing the generated commit messages. + virtual Result>> PrepareCommit( + bool wait_compaction = true, int64_t commit_identifier = BATCH_WRITE_COMMIT_IDENTIFIER) = 0; + virtual std::shared_ptr GetMetrics() const = 0; + virtual Status Close() = 0; +}; + +} // namespace paimon diff --git a/include/paimon/write_context.h b/include/paimon/write_context.h new file mode 100644 index 0000000..375eb8b --- /dev/null +++ b/include/paimon/write_context.h @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "paimon/result.h" +#include "paimon/type_fwd.h" +#include "paimon/visibility.h" + +namespace paimon { +class Executor; +class MemoryPool; + +/// `WriteContext` is some configuration for write operations. +/// +/// Please do not use this class directly, use `WriteContextBuilder` to build a `WriteContext` which +/// has input validation. +/// @see WriteContextBuilder +class PAIMON_EXPORT WriteContext { + public: + WriteContext(const std::string& root_path, const std::string& commit_user, + bool is_streaming_mode, bool ignore_num_bucket_check, bool ignore_previous_files, + bool enable_multi_thread_spill, const std::optional& write_id, + const std::string& branch, const std::vector& write_schema, + const std::shared_ptr& memory_pool, + const std::shared_ptr& executor, const std::string& temp_directory, + const std::shared_ptr& specific_file_system, + const std::map& fs_scheme_to_identifier_map, + const std::map& options); + + ~WriteContext(); + + const std::string& GetRootPath() const { + return root_path_; + } + + const std::string& GetCommitUser() const { + return commit_user_; + } + + const std::map& GetFileSystemSchemeToIdentifierMap() const { + return fs_scheme_to_identifier_map_; + } + + const std::map& GetOptions() const { + return options_; + } + + bool IsStreamingMode() const { + return is_streaming_mode_; + } + + bool IgnoreNumBucketCheck() const { + return ignore_num_bucket_check_; + } + + bool IgnorePreviousFiles() const { + return ignore_previous_files_; + } + + const std::optional GetWriteId() const { + return write_id_; + } + + const std::string& GetBranch() const { + return branch_; + } + + const std::vector& GetWriteSchema() const { + return write_schema_; + } + + std::shared_ptr GetMemoryPool() const { + return memory_pool_; + } + + std::shared_ptr GetExecutor() const { + return executor_; + } + + const std::string& GetTempDirectory() const { + return temp_directory_; + } + + std::shared_ptr GetSpecificFileSystem() const { + return specific_file_system_; + } + + bool EnableMultiThreadSpill() const { + return enable_multi_thread_spill_; + } + + private: + std::string root_path_; + std::string commit_user_; + std::string branch_; + bool is_streaming_mode_; + bool ignore_num_bucket_check_; + bool ignore_previous_files_; + bool enable_multi_thread_spill_; + std::optional write_id_; + std::vector write_schema_; + std::shared_ptr memory_pool_; + std::shared_ptr executor_; + std::string temp_directory_; + std::shared_ptr specific_file_system_; + std::map fs_scheme_to_identifier_map_; + std::map options_; +}; + +/// `WriteContextBuilder` used to build a `WriteContext`, has input validation. +class PAIMON_EXPORT WriteContextBuilder { + public: + /// Constructs a `WriteContextBuilder` with required parameters. + /// @param root_path The root path of the table. + /// @param commit_user The user identifier for commit operations. + WriteContextBuilder(const std::string& root_path, const std::string& commit_user); + + ~WriteContextBuilder(); + + /// Set a configuration options map to set some option entries which are not defined in the + /// table schema or whose values you want to overwrite. + /// @note The options map will clear the options added by `AddOption()` before. + /// @param options The configuration options map. + /// @return Reference to this builder for method chaining. + WriteContextBuilder& SetOptions(const std::map& options); + + /// Add a single configuration option which is not defined in the table schema or whose value + /// you want to overwrite. + /// + /// If you want to add multiple options, call `AddOption()` multiple times or use `SetOptions()` + /// instead. + /// @param key The option key. + /// @param value The option value. + /// @return Reference to this builder for method chaining. + WriteContextBuilder& AddOption(const std::string& key, const std::string& value); + + /// Set whether to enable streaming mode (default is false) + /// @return Reference to this builder for method chaining. + WriteContextBuilder& WithStreamingMode(bool is_streaming_mode); + + /// Set whether to skip num-bucket consistency check (default is false) + /// @return Reference to this builder for method chaining. + WriteContextBuilder& WithIgnoreNumBucketCheck(bool ignore_num_bucket_check); + + /// Set whether the write operation should ignore previously stored files. (default is false) + /// @return Reference to this builder for method chaining. + WriteContextBuilder& WithIgnorePreviousFiles(bool ignore_previous_files); + + /// Set custom memory pool for memory management. + /// @param memory_pool The memory pool to use. + /// @return Reference to this builder for method chaining. + WriteContextBuilder& WithMemoryPool(const std::shared_ptr& memory_pool); + + /// Set custom executor for task execution. + /// @param executor The executor to use. + /// @return Reference to this builder for method chaining. + WriteContextBuilder& WithExecutor(const std::shared_ptr& executor); + + /// Set the temporary directory path for IO operations (lookup and external disk spill). + /// @param temp_dir The temporary directory path. + /// @return Reference to this builder for method chaining. + WriteContextBuilder& WithTempDirectory(const std::string& temp_dir); + + /// For postpone bucket mode in pk table, `WithWriteId()` supposed to be used. + /// + /// Each worker must have its own unique `write_id` within a task, which is used as the prefix + /// for its data files. This ensures that files from the same worker share the same prefix and + /// can be consumed by the same compaction reader to preserve input order. + /// + /// @return Reference to this builder for method chaining. + WriteContextBuilder& WithWriteId(int32_t write_id); + + /// Write to specific branch, default is main. + /// @return Reference to this builder for method chaining. + WriteContextBuilder& WithBranch(const std::string& branch); + + /// For data evolution, user can write partial specific fields from table schema. + /// If not set, write all fields in table. + /// @return Reference to this builder for method chaining. + WriteContextBuilder& WithWriteSchema(const std::vector& write_schema); + + /// Sets a custom file system instance to be used for all file operations in this write context. + /// This bypasses the global file system registry and uses the provided implementation directly. + /// + /// @param file_system The file system to use. + /// @return Reference to this builder for method chaining. + /// @note If not set, use default file system (configured in `Options::FILE_SYSTEM`) + WriteContextBuilder& WithFileSystem(const std::shared_ptr& file_system); + + /// Sets a mapping from URI schemes (e.g., "file", "oss") to registered file system + /// identifiers. This allows selecting different pre-registered file system implementations + /// based on the URI scheme at runtime. + /// + /// @param fs_scheme_to_identifier_map Map from URI scheme (like "oss") to the corresponding + /// file system identifier. + /// @return Reference to this builder for method chaining. + /// @note + /// - This method is intended for environments where multiple file systems are pre-registered. + /// - The specified identifiers must correspond to file systems that have been registered at + /// compile time or initialization. + /// - Cannot be used together with `WithFileSystem()`. + /// - If not set, use default file system (configured in `Options::FILE_SYSTEM`). + /// Example: + /// builder.WithFileSystemSchemeToIdentifierMap({{"oss", "jindo"}, {"file", "local"}}); + /// + WriteContextBuilder& WithFileSystemSchemeToIdentifierMap( + const std::map& fs_scheme_to_identifier_map); + + /// Set the thread number for write buffer spill operations. (default is 0) + /// If <= 0, threading is disabled for spill IPC read/write. + /// If > 0, sets arrow CPU thread pool capacity for spill operations. + /// @param thread_number The thread number to use for spill operations. + /// @return Reference to this builder for method chaining. + WriteContextBuilder& SetWriteBufferSpillThreadNumber(int32_t thread_number); + + /// Build and return a `WriteContext` instance with input validation. + /// @return Result containing the constructed `WriteContext` or an error status. + Result> Finish(); + + private: + class Impl; + + std::unique_ptr impl_; +}; + +} // namespace paimon diff --git a/src/paimon/core/operation/abstract_file_store_write.cpp b/src/paimon/core/operation/abstract_file_store_write.cpp new file mode 100644 index 0000000..0728cdd --- /dev/null +++ b/src/paimon/core/operation/abstract_file_store_write.cpp @@ -0,0 +1,368 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include "paimon/core/operation/abstract_file_store_write.h" + +#include +#include +#include +#include + +#include "fmt/format.h" +#include "paimon/common/data/binary_row.h" +#include "paimon/common/metrics/metrics_impl.h" +#include "paimon/core/operation/file_store_scan.h" +#include "paimon/core/operation/file_system_write_restore.h" +#include "paimon/core/operation/metrics/compaction_metrics.h" +#include "paimon/core/operation/restore_files.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/core/snapshot.h" +#include "paimon/core/table/bucket_mode.h" +#include "paimon/core/table/sink/commit_message_impl.h" +#include "paimon/core/utils/batch_writer.h" +#include "paimon/core/utils/commit_increment.h" +#include "paimon/core/utils/file_store_path_factory.h" +#include "paimon/core/utils/snapshot_manager.h" +#include "paimon/macros.h" +#include "paimon/record_batch.h" +#include "paimon/scan_context.h" + +namespace arrow { +class Schema; +} // namespace arrow + +namespace paimon { +class Executor; +class MemoryPool; + +AbstractFileStoreWrite::AbstractFileStoreWrite( + const std::shared_ptr& file_store_path_factory, + const std::shared_ptr& snapshot_manager, + const std::shared_ptr& schema_manager, const std::string& commit_user, + const std::string& root_path, const std::shared_ptr& table_schema, + const std::shared_ptr& schema, + const std::shared_ptr& write_schema, + const std::shared_ptr& partition_schema, + const std::shared_ptr& dv_maintainer_factory, + const std::shared_ptr& io_manager, const CoreOptions& options, + bool ignore_previous_files, bool is_streaming_mode, bool ignore_num_bucket_check, + const std::shared_ptr& executor, const std::shared_ptr& pool) + : pool_(pool), + executor_(executor), + file_store_path_factory_(file_store_path_factory), + snapshot_manager_(snapshot_manager), + schema_manager_(schema_manager), + commit_user_(commit_user), + root_path_(root_path), + schema_(schema), + write_schema_(write_schema), + table_schema_(table_schema), + partition_schema_(partition_schema), + dv_maintainer_factory_(dv_maintainer_factory), + io_manager_(io_manager), + options_(options), + compact_executor_(CreateDefaultExecutor(4)), + compaction_metrics_(std::make_shared()), + ignore_previous_files_(ignore_previous_files), + is_streaming_mode_(is_streaming_mode), + ignore_num_bucket_check_(ignore_num_bucket_check), + metrics_(std::make_shared()), + logger_(Logger::GetLogger("AbstractFileStoreWrite")) { + writer_memory_manager_ = + std::make_unique(static_cast(options.GetWriteBufferSize())); + cache_manager_ = std::make_shared(options.GetLookupCacheMaxMemory(), + options.GetLookupCacheHighPrioPoolRatio()); +} + +Status AbstractFileStoreWrite::Write(std::unique_ptr&& batch) { + if (PAIMON_UNLIKELY(batch == nullptr)) { + return Status::Invalid("batch is null pointer"); + } + // in FileStoreWrite::Create() we have checked the table kind and bucket mode, here we only + // check the bucket id in batch + if (options_.GetBucket() == -1) { + assert(table_schema_->PrimaryKeys().empty()); + if (!batch->HasSpecifiedBucket()) { + batch->SetBucket(BucketModeDefine::UNAWARE_BUCKET); + } else if (batch->GetBucket() != BucketModeDefine::UNAWARE_BUCKET) { + return Status::Invalid( + fmt::format("batch bucket is {} while options bucket is -1", batch->GetBucket())); + } + } else if (options_.GetBucket() == BucketModeDefine::POSTPONE_BUCKET) { + assert(!table_schema_->PrimaryKeys().empty()); + if (!batch->HasSpecifiedBucket()) { + batch->SetBucket(BucketModeDefine::POSTPONE_BUCKET); + } else if (batch->GetBucket() != BucketModeDefine::POSTPONE_BUCKET) { + return Status::Invalid( + fmt::format("batch bucket is {} while options bucket is -2", batch->GetBucket())); + } + } else { + assert(options_.GetBucket() > 0); + if (!(batch->GetBucket() >= 0 && batch->GetBucket() < options_.GetBucket())) { + return Status::Invalid( + fmt::format("fixed bucketed mode must specify a bucket which in [0, {}) in " + "RecordBatch", + options_.GetBucket())); + } + } + // check nullability + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( + std::shared_ptr data, + arrow::ImportArray(batch->GetData(), arrow::struct_(write_schema_->fields()))); + PAIMON_RETURN_NOT_OK(ArrowUtils::CheckNullabilityMatch(write_schema_, data)); + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*data, batch->GetData())); + + PAIMON_ASSIGN_OR_RAISE(BinaryRow partition, + file_store_path_factory_->ToBinaryRow(batch->GetPartition())) + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr writer, + GetWriter(partition, batch->GetBucket())); + assert(writer); + PAIMON_RETURN_NOT_OK(writer->Write(std::move(batch))); + PAIMON_RETURN_NOT_OK(writer_memory_manager_->OnWriteCompleted(writer.get())); + return Status::OK(); +} + +Status AbstractFileStoreWrite::Compact(const std::map& partition, + int32_t bucket, bool full_compaction) { + PAIMON_ASSIGN_OR_RAISE(BinaryRow part, file_store_path_factory_->ToBinaryRow(partition)); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr writer, GetWriter(part, bucket)); + assert(writer); + return writer->Compact(full_compaction); +} + +Result>> AbstractFileStoreWrite::PrepareCommit( + bool wait_compaction, int64_t commit_identifier) { + if (batch_committed_) { + return Status::Invalid("batch write mode only support one-time committing."); + } + if (is_streaming_mode_ == false) { + // batch write prepare commit will ignore these params + batch_committed_ = true; + wait_compaction = true; + commit_identifier = std::numeric_limits::max(); + } + int64_t latest_committed_identifier = std::numeric_limits::min(); + for (const auto& kv : writers_) { + const auto& buckets = kv.second; + for (const auto& kv : buckets) { + const auto& writer_container = kv.second; + latest_committed_identifier = std::max( + latest_committed_identifier, writer_container.last_modified_commit_identifier); + } + } + if (latest_committed_identifier == std::numeric_limits::min()) { + // Optimization for the first commit. + // + // If this is the first commit, no writer has previous modified commit, so the value of + // `latestCommittedIdentifier` does not matter. + // + // Without this optimization, we may need to scan through all snapshots only to find + // that there is no previous snapshot by this user, which is very inefficient. + } else { + PAIMON_ASSIGN_OR_RAISE(std::optional latest_snapshot, + snapshot_manager_->LatestSnapshotOfUser(commit_user_)); + if (latest_snapshot == std::nullopt) { + latest_committed_identifier = std::numeric_limits::min(); + } else { + latest_committed_identifier = latest_snapshot.value().CommitIdentifier(); + } + } + + std::vector> result; + auto metrics = compaction_metrics_->GetMetrics(); + for (auto partition_iter = writers_.begin(); partition_iter != writers_.end();) { + auto& partition = partition_iter->first; + auto& buckets = partition_iter->second; + for (auto bucket_iter = buckets.begin(); bucket_iter != buckets.end();) { + int32_t bucket = bucket_iter->first; + WriterContainer& writer_container = bucket_iter->second; + PAIMON_ASSIGN_OR_RAISE(CommitIncrement increment, + writer_container.writer->PrepareCommit(wait_compaction)); + writer_memory_manager_->RefreshWriterMemory(writer_container.writer.get()); + auto compact_deletion_file = increment.GetCompactDeletionFile(); + auto& compact_increment = increment.GetCompactIncrement(); + if (compact_deletion_file) { + PAIMON_ASSIGN_OR_RAISE( + std::optional> dv_index_file_meta, + compact_deletion_file->GetOrCompute()); + if (dv_index_file_meta) { + compact_increment.AddNewIndexFiles({dv_index_file_meta.value()}); + } + } + + auto committable = std::make_shared( + partition, bucket, writer_container.total_buckets, increment.GetNewFilesIncrement(), + compact_increment); + result.push_back(committable); + if (!committable->IsEmpty()) { + writer_container.last_modified_commit_identifier = commit_identifier; + metrics->Merge(writer_container.writer->GetMetrics()); + ++bucket_iter; + continue; + } + // Condition 1: There is no more record waiting to be committed. Note that the + // condition is < (instead of <=), because each commit identifier may have + // multiple snapshots. We must make sure all snapshots of this identifier are + // committed. + // Condition 2: No compaction is in progress. That is, no more changelog will be + // produced. + // + // Condition 3: The writer has no postponed compaction like gentle lookup + // compaction. + if (writer_container.last_modified_commit_identifier < latest_committed_identifier) { + PAIMON_ASSIGN_OR_RAISE(bool has_pending_compaction, + writer_container.writer->CompactNotCompleted()); + if (!has_pending_compaction) { + // Clear writer if no update, and if its latest modification has committed. + // + // We need a mechanism to clear writers, otherwise there will be more and + // more such as yesterday's partition that no longer needs to be written. + PAIMON_LOG_DEBUG(logger_, + "Closing writer for partition %s, bucket %d. " + "Writer's last modified identifier is %ld, " + "while latest committed identifier is %ld, " + "current commit identifier is %ld.", + partition.ToString().c_str(), bucket, + writer_container.last_modified_commit_identifier, + latest_committed_identifier, commit_identifier); + writer_memory_manager_->UnregisterWriter(writer_container.writer.get()); + PAIMON_RETURN_NOT_OK(writer_container.writer->Close()); + bucket_iter = buckets.erase(bucket_iter); + continue; + } + } + metrics->Merge(writer_container.writer->GetMetrics()); + ++bucket_iter; + } + + if (buckets.empty()) { + partition_iter = writers_.erase(partition_iter); + } else { + ++partition_iter; + } + } + + metrics_->Overwrite(metrics); + return result; +} + +Status AbstractFileStoreWrite::Close() { + for (auto& [_, bucket_writers] : writers_) { + for (auto& [_, writer_container] : bucket_writers) { + writer_memory_manager_->UnregisterWriter(writer_container.writer.get()); + PAIMON_RETURN_NOT_OK(writer_container.writer->Close()); + } + } + writers_.clear(); + compact_executor_->ShutdownNow(); + return Status::OK(); +} + +std::shared_ptr AbstractFileStoreWrite::GetMetrics() const { + return metrics_; +} + +int32_t AbstractFileStoreWrite::GetDefaultBucketNum() const { + return options_.GetBucket(); +} + +Result> AbstractFileStoreWrite::ScanExistingFileMetas( + const BinaryRow& partition, int32_t bucket) const { + PAIMON_ASSIGN_OR_RAISE(auto part_values, + file_store_path_factory_->GeneratePartitionVector(partition)); + std::map part_values_map; + for (const auto& [key, value] : part_values) { + part_values_map[key] = value; + } + std::vector> partition_filters; + if (!part_values_map.empty()) { + partition_filters.push_back(part_values_map); + } + auto scan_filter = std::make_shared( + /*predicate=*/nullptr, partition_filters, std::optional(bucket)); + + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr scan, CreateFileStoreScan(scan_filter)); + std::shared_ptr index_file_handler; + if (dv_maintainer_factory_) { + index_file_handler = dv_maintainer_factory_->GetIndexFileHandler(); + } + FileSystemWriteRestore restore(snapshot_manager_, std::move(scan), index_file_handler); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr restore_files, + restore.GetRestoreFiles(partition, bucket, dv_maintainer_factory_ != nullptr)); + + std::optional restored_total_buckets = restore_files->TotalBuckets(); + int32_t total_buckets = GetDefaultBucketNum(); + if (restored_total_buckets) { + total_buckets = restored_total_buckets.value(); + } + + if (!ignore_num_bucket_check_ && total_buckets != options_.GetBucket()) { + return Status::Invalid(fmt::format( + "Try to write table with a new bucket num {}, but the previous " + "bucket num is {}. Please switch to batch mode, and perform INSERT OVERWRITE to " + "rescale current data layout first.", + options_.GetBucket(), total_buckets)); + } + return restore_files; +} + +Result> AbstractFileStoreWrite::GetWriter(const BinaryRow& partition, + int32_t bucket) { + auto partition_iter = writers_.find(partition); + if (partition_iter != writers_.end()) { + auto& buckets = partition_iter->second; + auto bucket_iter = buckets.find(bucket); + if (PAIMON_LIKELY(bucket_iter != buckets.end())) { + return bucket_iter->second.writer; + } + } + + std::shared_ptr restored = RestoreFiles::Empty(); + if (!ignore_previous_files_) { + PAIMON_ASSIGN_OR_RAISE(restored, ScanExistingFileMetas(partition, bucket)); + } + + auto restore_data_files = restored->DataFiles(); + int64_t max_sequence_number = DataFileMeta::GetMaxSequenceNumber(restore_data_files); + std::shared_ptr dv_maintainer; + if (dv_maintainer_factory_) { + PAIMON_ASSIGN_OR_RAISE( + dv_maintainer, + dv_maintainer_factory_->Create(partition, bucket, restored->DeleteVectorsIndex())); + } + + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr writer, + CreateWriter(partition, bucket, restore_data_files, max_sequence_number, dv_maintainer)); + int32_t total_buckets = restored->TotalBuckets().value_or(GetDefaultBucketNum()); + + if (partition_iter == writers_.end()) { + writers_.emplace(partition, + std::unordered_map>( + {{bucket, WriterContainer(writer, total_buckets)}})); + } else { + partition_iter->second.emplace(bucket, WriterContainer(writer, total_buckets)); + } + writer_memory_manager_->RegisterWriter(writer.get()); + + return writer; +} + +} // namespace paimon diff --git a/src/paimon/core/operation/abstract_file_store_write.h b/src/paimon/core/operation/abstract_file_store_write.h new file mode 100644 index 0000000..c93e7c3 --- /dev/null +++ b/src/paimon/core/operation/abstract_file_store_write.h @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "arrow/type.h" +#include "paimon/commit_message.h" +#include "paimon/common/data/binary_row.h" +#include "paimon/common/io/cache/cache_manager.h" +#include "paimon/core/core_options.h" +#include "paimon/core/deletionvectors/bucketed_dv_maintainer.h" +#include "paimon/core/memory/writer_memory_manager.h" +#include "paimon/file_store_write.h" +#include "paimon/logging.h" +#include "paimon/metrics.h" +#include "paimon/result.h" +#include "paimon/status.h" +#include "paimon/type_fwd.h" + +struct ArrowSchema; + +namespace arrow { +class Schema; +} // namespace arrow + +namespace paimon { + +struct DataFileMeta; +class BatchWriter; +class CompactionMetrics; +class FileStoreScan; +class FileStorePathFactory; +class ScanFilter; +class Snapshot; +class SnapshotManager; +class SchemaManager; +class TableSchema; +class MetricsImpl; +class BinaryRow; +class Executor; +class MemoryPool; +class RecordBatch; +class RestoreFiles; +class IOManager; + +class AbstractFileStoreWrite : public FileStoreWrite { + public: + // schema indicates all fields in table schema, write_schema indicates actual write fields while + // "data-evolution.enabled" is true + AbstractFileStoreWrite( + const std::shared_ptr& file_store_path_factory, + const std::shared_ptr& snapshot_manager, + const std::shared_ptr& schema_manager, const std::string& commit_user, + const std::string& root_path, const std::shared_ptr& table_schema, + const std::shared_ptr& schema, + const std::shared_ptr& write_schema, + const std::shared_ptr& partition_schema, + const std::shared_ptr& dv_maintainer_factory, + const std::shared_ptr& io_manager, const CoreOptions& options, + bool ignore_previous_files, bool is_streaming_mode, bool ignore_num_bucket_check, + const std::shared_ptr& executor, const std::shared_ptr& pool); + + Status Write(std::unique_ptr&& batch) override; + Status Compact(const std::map& partition, int32_t bucket, + bool full_compaction) override; + + Result>> PrepareCommit( + bool wait_compaction, int64_t commit_identifier) override; + Status Close() override; + std::shared_ptr GetMetrics() const override; + + const CoreOptions& GetOptions() const { + return options_; + } + + template + struct WriterContainer { + public: + WriterContainer() = default; + WriterContainer(const std::shared_ptr& writer, int32_t total_buckets) + : writer(writer), total_buckets(total_buckets) {} + std::shared_ptr writer; + int64_t last_modified_commit_identifier = std::numeric_limits::min(); + int32_t total_buckets = -1; + }; + + protected: + virtual Result> CreateWriter( + const BinaryRow& partition, int32_t bucket, + const std::vector>& restore_data_files, + int64_t restore_max_seq_number, + const std::shared_ptr& dv_maintainer) = 0; + + virtual Result> CreateFileStoreScan( + const std::shared_ptr& filter) const = 0; + + Result> ScanExistingFileMetas(const BinaryRow& partition, + int32_t bucket) const; + int32_t GetDefaultBucketNum() const; + + std::shared_ptr pool_; + std::shared_ptr executor_; + std::shared_ptr file_store_path_factory_; + std::shared_ptr snapshot_manager_; + std::shared_ptr schema_manager_; + std::string commit_user_; + std::string root_path_; + std::shared_ptr schema_; + std::shared_ptr write_schema_; + std::shared_ptr table_schema_; + std::shared_ptr partition_schema_; + std::shared_ptr dv_maintainer_factory_; + std::shared_ptr io_manager_; + std::shared_ptr cache_manager_; + std::unique_ptr writer_memory_manager_; + + CoreOptions options_; + std::shared_ptr compact_executor_; + std::shared_ptr compaction_metrics_; + + private: + Result> GetWriter(const BinaryRow& partition, int32_t bucket); + + private: + std::unordered_map>> + writers_; + bool ignore_previous_files_ = false; + bool is_streaming_mode_ = false; + bool ignore_num_bucket_check_ = false; + bool batch_committed_ = false; + + std::shared_ptr metrics_; + std::unique_ptr logger_; +}; + +} // namespace paimon diff --git a/src/paimon/core/operation/file_store_write.cpp b/src/paimon/core/operation/file_store_write.cpp new file mode 100644 index 0000000..110627b --- /dev/null +++ b/src/paimon/core/operation/file_store_write.cpp @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include "paimon/file_store_write.h" + +#include +#include +#include + +#include "fmt/format.h" +#include "paimon/common/types/data_field.h" +#include "paimon/common/utils/fields_comparator.h" +#include "paimon/core/core_options.h" +#include "paimon/core/disk/io_manager.h" +#include "paimon/core/manifest/index_manifest_file.h" +#include "paimon/core/mergetree/compact/lookup_merge_function.h" +#include "paimon/core/mergetree/compact/merge_function.h" +#include "paimon/core/mergetree/compact/reducer_merge_function_wrapper.h" +#include "paimon/core/operation/append_only_file_store_write.h" +#include "paimon/core/operation/key_value_file_store_write.h" +#include "paimon/core/options/merge_engine.h" +#include "paimon/core/postpone/postpone_bucket_file_store_write.h" +#include "paimon/core/schema/schema_manager.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/core/table/bucket_mode.h" +#include "paimon/core/utils/field_mapping.h" +#include "paimon/core/utils/file_store_path_factory.h" +#include "paimon/core/utils/primary_key_table_utils.h" +#include "paimon/core/utils/snapshot_manager.h" +#include "paimon/format/file_format.h" +#include "paimon/result.h" +#include "paimon/write_context.h" + +namespace arrow { +class Schema; +} // namespace arrow + +namespace paimon { +struct KeyValue; +template +class MergeFunctionWrapper; + +Result> FileStoreWrite::Create(std::unique_ptr ctx) { + if (ctx == nullptr) { + return Status::Invalid("write context is null pointer"); + } + if (ctx->GetMemoryPool() == nullptr) { + return Status::Invalid("memory pool is null pointer"); + } + if (ctx->GetExecutor() == nullptr) { + return Status::Invalid("executor is null pointer"); + } + + PAIMON_ASSIGN_OR_RAISE(CoreOptions tmp_options, + CoreOptions::FromMap(ctx->GetOptions(), ctx->GetSpecificFileSystem(), + ctx->GetFileSystemSchemeToIdentifierMap())); + std::string branch = ctx->GetBranch(); + auto schema_manager = + std::make_shared(tmp_options.GetFileSystem(), ctx->GetRootPath(), branch); + PAIMON_ASSIGN_OR_RAISE(std::optional> table_schema, + schema_manager->Latest()); + if (table_schema == std::nullopt) { + return Status::Invalid(fmt::format("cannot found latest schema in branch {}", branch)); + } + const auto& schema = table_schema.value(); + auto opts = schema->Options(); + for (const auto& [key, value] : ctx->GetOptions()) { + opts[key] = value; + } + PAIMON_ASSIGN_OR_RAISE(CoreOptions options, + CoreOptions::FromMap(opts, ctx->GetSpecificFileSystem(), + ctx->GetFileSystemSchemeToIdentifierMap())); + auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(schema->Fields()); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr partition_schema, + FieldMapping::GetPartitionSchema(arrow_schema, schema->PartitionKeys())); + + PAIMON_ASSIGN_OR_RAISE(std::vector external_paths, options.CreateExternalPaths()); + PAIMON_ASSIGN_OR_RAISE(std::optional global_index_external_path, + options.CreateGlobalIndexExternalPath()); + + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr file_store_path_factory, + FileStorePathFactory::Create(ctx->GetRootPath(), arrow_schema, schema->PartitionKeys(), + options.GetPartitionDefaultName(), + options.GetWriteFileFormat(/*level=*/0)->Identifier(), + options.DataFilePrefix(), options.LegacyPartitionNameEnabled(), + external_paths, global_index_external_path, + options.IndexFileInDataFileDir(), ctx->GetMemoryPool())); + auto snapshot_manager = + std::make_shared(options.GetFileSystem(), ctx->GetRootPath(), branch); + + std::shared_ptr io_manager; + const auto& io_temp_dir = ctx->GetTempDirectory(); + if (!io_temp_dir.empty()) { + io_manager = std::make_shared(io_temp_dir, options.GetFileSystem()); + } + + bool ignore_previous_files = ctx->IgnorePreviousFiles(); + if (schema->PrimaryKeys().empty()) { + // append table + bool need_dv_maintainer_factory = options.DeletionVectorsEnabled(); + if (options.GetBucket() == -1) { + need_dv_maintainer_factory = false; + ignore_previous_files = true; + } else if (options.GetBucket() <= 0) { + return Status::Invalid( + fmt::format("not support bucket {} in append table", options.GetBucket())); + } + std::shared_ptr write_schema = arrow_schema; + const auto& write_field_names = ctx->GetWriteSchema(); + if (!write_field_names.empty()) { + arrow::FieldVector write_fields; + write_fields.reserve(write_field_names.size()); + for (const auto& field_name : write_field_names) { + auto field = arrow_schema->GetFieldByName(field_name); + if (!field) { + // TODO(xinyu.lxy): support _ROW_ID and _SEQUENCE_NUMBER + return Status::Invalid( + fmt::format("write field {} does not exist in table schema", field_name)); + } + write_fields.push_back(field); + } + write_schema = arrow::schema(write_fields); + } + + std::shared_ptr dv_maintainer_factory; + if (need_dv_maintainer_factory) { + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr index_manifest_file, + IndexManifestFile::Create(options.GetFileSystem(), options.GetManifestFormat(), + options.GetManifestCompression(), file_store_path_factory, + options.GetBucket(), ctx->GetMemoryPool(), options)); + auto index_file_handler = std::make_shared( + options.GetFileSystem(), std::move(index_manifest_file), + std::make_shared(file_store_path_factory), + options.DeletionVectorsBitmap64(), ctx->GetMemoryPool()); + dv_maintainer_factory = + std::make_shared(index_file_handler); + } + + return std::make_unique( + file_store_path_factory, snapshot_manager, schema_manager, ctx->GetCommitUser(), + ctx->GetRootPath(), schema, arrow_schema, write_schema, partition_schema, + dv_maintainer_factory, io_manager, options, ignore_previous_files, + ctx->IsStreamingMode(), ctx->IgnoreNumBucketCheck(), ctx->GetExecutor(), + ctx->GetMemoryPool()); + } else { + // pk table + if (options.GetBucket() == BucketModeDefine::POSTPONE_BUCKET) { + return PostponeBucketFileStoreWrite::Create( + snapshot_manager, schema_manager, ctx->GetCommitUser(), ctx->GetRootPath(), schema, + arrow_schema, partition_schema, io_manager, options, ctx->IsStreamingMode(), + ctx->IgnoreNumBucketCheck(), ctx->GetWriteId(), + ctx->GetFileSystemSchemeToIdentifierMap(), ctx->GetExecutor(), ctx->GetMemoryPool(), + ctx->GetSpecificFileSystem()); + } + if (options.GetBucket() <= 0) { + return Status::Invalid( + fmt::format("not support bucket {} in key value table", options.GetBucket())); + } + PAIMON_ASSIGN_OR_RAISE(std::vector trimmed_primary_keys, + schema->TrimmedPrimaryKeys()); + PAIMON_ASSIGN_OR_RAISE(std::vector trimmed_primary_key_fields, + schema->GetFields(trimmed_primary_keys)); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr key_comparator, + FieldsComparator::Create(trimmed_primary_key_fields, + options.SequenceFieldSortOrderIsAscending())); + auto primary_keys = schema->PrimaryKeys(); + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr merge_function, + PrimaryKeyTableUtils::CreateMergeFunction(arrow_schema, primary_keys, options)); + if (options.NeedLookup() && options.GetMergeEngine() != MergeEngine::FIRST_ROW) { + // don't wrap first row, it is already OK + merge_function = std::make_unique(std::move(merge_function)); + } + std::shared_ptr> merge_function_wrapper = + std::make_shared(std::move(merge_function)); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr sequence_fields_comparator, + PrimaryKeyTableUtils::CreateSequenceFieldsComparator(schema->Fields(), options)); + + std::shared_ptr dv_maintainer_factory; + if (options.DeletionVectorsEnabled()) { + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr index_manifest_file, + IndexManifestFile::Create(options.GetFileSystem(), options.GetManifestFormat(), + options.GetManifestCompression(), file_store_path_factory, + options.GetBucket(), ctx->GetMemoryPool(), options)); + auto index_file_handler = std::make_shared( + options.GetFileSystem(), std::move(index_manifest_file), + std::make_shared(file_store_path_factory), + options.DeletionVectorsBitmap64(), ctx->GetMemoryPool()); + dv_maintainer_factory = + std::make_shared(index_file_handler); + } + + return std::make_unique( + file_store_path_factory, snapshot_manager, schema_manager, ctx->GetCommitUser(), + ctx->GetRootPath(), schema, arrow_schema, partition_schema, dv_maintainer_factory, + io_manager, key_comparator, sequence_fields_comparator, merge_function_wrapper, options, + ignore_previous_files, ctx->IsStreamingMode(), ctx->IgnoreNumBucketCheck(), + ctx->EnableMultiThreadSpill(), ctx->GetExecutor(), ctx->GetMemoryPool()); + } +} + +} // namespace paimon diff --git a/src/paimon/core/operation/file_store_write_test.cpp b/src/paimon/core/operation/file_store_write_test.cpp new file mode 100644 index 0000000..44e73b3 --- /dev/null +++ b/src/paimon/core/operation/file_store_write_test.cpp @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include "paimon/file_store_write.h" + +#include +#include +#include +#include + +#include "arrow/c/abi.h" +#include "arrow/c/bridge.h" +#include "arrow/status.h" +#include "arrow/type.h" +#include "gtest/gtest.h" +#include "paimon/catalog/catalog.h" +#include "paimon/catalog/identifier.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/core/operation/key_value_file_store_write.h" +#include "paimon/core/schema/schema_manager.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/defs.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/result.h" +#include "paimon/testing/utils/testharness.h" +#include "paimon/write_context.h" + +namespace paimon::test { + +TEST(FileStoreWriteTest, TestCreateWithInvalidInput) { + auto dir = UniqueTestDirectory::Create(); + WriteContextBuilder builder(dir->Str(), "commit_user_1"); + ASSERT_OK_AND_ASSIGN(std::unique_ptr ctx1, + builder.WithMemoryPool(nullptr).Finish()); + ASSERT_NOK(FileStoreWrite::Create(std::move(ctx1))); + ASSERT_OK_AND_ASSIGN(std::unique_ptr ctx2, + builder.WithExecutor(nullptr).Finish()); + ASSERT_NOK(FileStoreWrite::Create(std::move(ctx2))); + ASSERT_NOK(FileStoreWrite::Create(/*context=*/nullptr)); +} + +TEST(FileStoreWriteTest, TestCreateAppendTable) { + auto dir = UniqueTestDirectory::Create(); + arrow::FieldVector fields = { + arrow::field("f0", arrow::boolean()), arrow::field("f1", arrow::int8()), + arrow::field("f2", arrow::int8()), arrow::field("f3", arrow::int16()), + arrow::field("f4", arrow::int16()), arrow::field("f5", arrow::int32())}; + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), {})); + ASSERT_OK(catalog->CreateDatabase("foo", {}, /*ignore_if_exists=*/true)); + ASSERT_OK(catalog->CreateTable(Identifier("foo", "bar"), &schema, + /*partition_keys=*/{"f0", "f3"}, /*primary_keys=*/{}, + /*options=*/{}, /*ignore_if_exists=*/false)); + WriteContextBuilder context_builder(PathUtil::JoinPath(dir->Str(), "foo.db/bar"), + "commit_user_1"); + ASSERT_OK_AND_ASSIGN(std::unique_ptr write_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(std::unique_ptr file_store_write, + FileStoreWrite::Create(std::move(write_context))); +} + +TEST(FileStoreWriteTest, TestCreateAppendTableWithInvalidBucket) { + auto dir = UniqueTestDirectory::Create(); + arrow::FieldVector fields = { + arrow::field("f0", arrow::boolean()), arrow::field("f1", arrow::int8()), + arrow::field("f2", arrow::int8()), arrow::field("f3", arrow::int16()), + arrow::field("f4", arrow::int16()), arrow::field("f5", arrow::int32())}; + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + std::map options; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), options)); + ASSERT_OK(catalog->CreateDatabase("foo", options, /*ignore_if_exists=*/true)); + ASSERT_OK(catalog->CreateTable(Identifier("foo", "bar"), &schema, + /*partition_keys=*/{"f0", "f3"}, + /*primary_keys=*/{}, options, /*ignore_if_exists=*/false)); + WriteContextBuilder context_builder(PathUtil::JoinPath(dir->Str(), "foo.db/bar"), + "commit_user_1"); + ASSERT_OK_AND_ASSIGN(std::unique_ptr write_context, + context_builder.AddOption(Options::BUCKET, "-2").Finish()); + ASSERT_NOK_WITH_MSG(FileStoreWrite::Create(std::move(write_context)), + "not support bucket -2 in append table"); +} + +TEST(FileStoreWriteTest, TestCreateAppendTableWithInvalidWriteType) { + auto dir = UniqueTestDirectory::Create(); + arrow::FieldVector fields = { + arrow::field("f0", arrow::boolean()), arrow::field("f1", arrow::int8()), + arrow::field("f2", arrow::int8()), arrow::field("f3", arrow::int16()), + arrow::field("f4", arrow::int16()), arrow::field("f5", arrow::int32())}; + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + std::map options = { + {Options::ROW_TRACKING_ENABLED, "true"}, + {Options::DATA_EVOLUTION_ENABLED, "true"}, + }; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), options)); + ASSERT_OK(catalog->CreateDatabase("foo", options, /*ignore_if_exists=*/true)); + ASSERT_OK(catalog->CreateTable(Identifier("foo", "bar"), &schema, + /*partition_keys=*/{}, + /*primary_keys=*/{}, options, /*ignore_if_exists=*/false)); + WriteContextBuilder context_builder(PathUtil::JoinPath(dir->Str(), "foo.db/bar"), + "commit_user_1"); + ASSERT_OK_AND_ASSIGN(std::unique_ptr write_context, + context_builder.AddOption(Options::BUCKET, "-1") + .WithWriteSchema({"field_non_exist"}) + .Finish()); + ASSERT_NOK_WITH_MSG(FileStoreWrite::Create(std::move(write_context)), + "write field field_non_exist does not exist in table schema"); +} + +TEST(FileStoreWriteTest, TestCreatePrimaryKeyTable) { + auto dir = UniqueTestDirectory::Create(); + arrow::FieldVector fields = { + arrow::field("f0", arrow::boolean()), arrow::field("f1", arrow::int8()), + arrow::field("f2", arrow::int8()), arrow::field("f3", arrow::int16()), + arrow::field("f4", arrow::int16()), arrow::field("f5", arrow::int32())}; + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), {})); + ASSERT_OK(catalog->CreateDatabase("foo", {}, /*ignore_if_exists=*/true)); + std::map options = {{Options::BUCKET, "2"}, + {Options::BUCKET_KEY, "f1"}}; + ASSERT_OK(catalog->CreateTable(Identifier("foo", "bar"), &schema, + /*partition_keys=*/{"f0"}, /*primary_keys=*/{"f0", "f1", "f4"}, + options, /*ignore_if_exists=*/false)); + WriteContextBuilder context_builder(PathUtil::JoinPath(dir->Str(), "foo.db/bar"), + "commit_user_1"); + ASSERT_OK_AND_ASSIGN(std::unique_ptr write_context, + context_builder.AddOption(Options::BUCKET, "2").Finish()); + ASSERT_OK_AND_ASSIGN(std::unique_ptr file_store_write, + FileStoreWrite::Create(std::move(write_context))); + + auto fs = std::make_shared(); + SchemaManager schema_manager(fs, PathUtil::JoinPath(dir->Str(), "foo.db/bar")); + ASSERT_OK_AND_ASSIGN(auto table_schema, schema_manager.Latest()); + ASSERT_TRUE(table_schema); + + ASSERT_OK_AND_ASSIGN(auto trimmed_pk, table_schema.value()->TrimmedPrimaryKeys()); + auto key_value_file_store_write = dynamic_cast(file_store_write.get()); + ASSERT_TRUE(key_value_file_store_write); +} + +TEST(FileStoreWriteTest, TestCreatePrimaryKeyTableWithInvalidBucket) { + auto dir = UniqueTestDirectory::Create(); + arrow::FieldVector fields = { + arrow::field("f0", arrow::boolean()), arrow::field("f1", arrow::int8()), + arrow::field("f2", arrow::int8()), arrow::field("f3", arrow::int16()), + arrow::field("f4", arrow::int16()), arrow::field("f5", arrow::int32())}; + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + std::map options; + options[Options::BUCKET] = "-1"; + ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), options)); + ASSERT_OK(catalog->CreateDatabase("foo", options, /*ignore_if_exists=*/true)); + ASSERT_OK(catalog->CreateTable(Identifier("foo", "bar"), &schema, + /*partition_keys=*/{"f0", "f3"}, + /*primary_keys=*/{"f1", "f4"}, options, + /*ignore_if_exists=*/false)); + WriteContextBuilder context_builder(PathUtil::JoinPath(dir->Str(), "foo.db/bar"), + "commit_user_1"); + ASSERT_OK_AND_ASSIGN(std::unique_ptr write_context, context_builder.Finish()); + ASSERT_NOK_WITH_MSG(FileStoreWrite::Create(std::move(write_context)), + "not support bucket -1 in key value table"); +} + +} // namespace paimon::test diff --git a/src/paimon/core/operation/file_system_write_restore.h b/src/paimon/core/operation/file_system_write_restore.h new file mode 100644 index 0000000..c449188 --- /dev/null +++ b/src/paimon/core/operation/file_system_write_restore.h @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#pragma once + +#include +#include +#include +#include +#include + +#include "paimon/core/core_options.h" +#include "paimon/core/index/index_file_handler.h" +#include "paimon/core/operation/file_store_scan.h" +#include "paimon/core/operation/restore_files.h" +#include "paimon/core/operation/write_restore.h" +#include "paimon/core/utils/snapshot_manager.h" + +namespace paimon { + +/// `WriteRestore` to restore files directly from file system. +class FileSystemWriteRestore : public WriteRestore { + public: + FileSystemWriteRestore(const std::shared_ptr& snapshot_manager, + std::unique_ptr&& scan, + const std::shared_ptr& index_file_handler) + : snapshot_manager_(snapshot_manager), + scan_(std::move(scan)), + index_file_handler_(index_file_handler) {} + + Result LatestCommittedIdentifier(const std::string& user) const override { + // TODO(yonghao.fyh): in java paimon is LatestSnapshotOfUserFromFileSystem + PAIMON_ASSIGN_OR_RAISE(std::optional latest_snapshot, + snapshot_manager_->LatestSnapshotOfUser(user)); + if (latest_snapshot) { + return latest_snapshot.value().CommitIdentifier(); + } + return std::numeric_limits::min(); + } + + Result> GetRestoreFiles( + const BinaryRow& partition, int32_t bucket, + bool scan_deletion_vectors_index) const override { + // TODO(yonghao.fyh): java paimon doesn't use snapshot_manager.LatestSnapshot() here, + // because they don't want to flood the catalog with high concurrency + PAIMON_ASSIGN_OR_RAISE(std::optional snapshot, + snapshot_manager_->LatestSnapshot()); + if (snapshot == std::nullopt) { + return RestoreFiles::Empty(); + } + + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr plan, + scan_->WithSnapshot(snapshot.value())->CreatePlan()); + std::vector entries = plan->Files(); + std::vector> restore_data_files; + PAIMON_ASSIGN_OR_RAISE(std::optional total_buckets, + WriteRestore::ExtractDataFiles(entries, &restore_data_files)); + + std::vector> deletion_vectors_index; + if (scan_deletion_vectors_index) { + PAIMON_ASSIGN_OR_RAISE( + deletion_vectors_index, + index_file_handler_->Scan( + snapshot.value(), std::string(DeletionVectorsIndexFile::DELETION_VECTORS_INDEX), + partition, bucket)); + } + + return std::make_shared(snapshot, total_buckets, restore_data_files, + /*dynamic_bucket_index=*/nullptr, + deletion_vectors_index); + } + + private: + std::shared_ptr snapshot_manager_; + std::unique_ptr scan_; + std::shared_ptr index_file_handler_; +}; + +} // namespace paimon diff --git a/src/paimon/core/operation/file_system_write_restore_test.cpp b/src/paimon/core/operation/file_system_write_restore_test.cpp new file mode 100644 index 0000000..3d9e11f --- /dev/null +++ b/src/paimon/core/operation/file_system_write_restore_test.cpp @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include "paimon/core/operation/file_system_write_restore.h" + +#include +#include + +#include "gtest/gtest.h" +#include "paimon/core/utils/snapshot_manager.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +TEST(FileSystemWriteRestoreTest, LatestCommittedIdentifierNoSnapshot) { + auto fs = std::make_shared(); + auto snapshot_manager = std::make_shared( + fs, paimon::test::GetDataDir() + "/orc/append_09.db/not_exist"); + + FileSystemWriteRestore restore(snapshot_manager, /*scan=*/nullptr, + /*index_file_handler=*/nullptr); + + ASSERT_OK_AND_ASSIGN(int64_t latest_identifier, + restore.LatestCommittedIdentifier("unknown_user")); + ASSERT_EQ(latest_identifier, std::numeric_limits::min()); +} + +TEST(FileSystemWriteRestoreTest, LatestCommittedIdentifierWithSnapshot) { + auto fs = std::make_shared(); + auto snapshot_manager = std::make_shared( + fs, paimon::test::GetDataDir() + "/orc/append_09.db/append_09"); + + FileSystemWriteRestore restore(snapshot_manager, /*scan=*/nullptr, + /*index_file_handler=*/nullptr); + + ASSERT_OK_AND_ASSIGN(int64_t latest_identifier, + restore.LatestCommittedIdentifier("b02e4322-9c5f-41e1-a560-c0156fdf7b9c")); + ASSERT_EQ(latest_identifier, std::numeric_limits::max()); +} + +TEST(FileSystemWriteRestoreTest, GetRestoreFilesReturnsEmptyWhenNoLatestSnapshot) { + auto fs = std::make_shared(); + auto snapshot_manager = std::make_shared( + fs, paimon::test::GetDataDir() + "/orc/append_09.db/not_exist"); + + FileSystemWriteRestore restore(snapshot_manager, /*scan=*/nullptr, + /*index_file_handler=*/nullptr); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr files, + restore.GetRestoreFiles(BinaryRow::EmptyRow(), /*bucket=*/0, + /*scan_deletion_vectors_index=*/true)); + ASSERT_FALSE(files->GetSnapshot().has_value()); + ASSERT_FALSE(files->TotalBuckets().has_value()); + ASSERT_TRUE(files->DataFiles().empty()); + ASSERT_TRUE(files->DeleteVectorsIndex().empty()); +} + +} // namespace paimon::test diff --git a/src/paimon/core/operation/restore_files.h b/src/paimon/core/operation/restore_files.h new file mode 100644 index 0000000..ea5e35f --- /dev/null +++ b/src/paimon/core/operation/restore_files.h @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#pragma once + +#include +#include +#include +#include + +#include "paimon/core/index/index_file_meta.h" +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/snapshot.h" + +namespace paimon { + +/// Restored files with snapshot and total buckets. +class RestoreFiles { + public: + RestoreFiles() = default; + + RestoreFiles(const std::optional& snapshot, + const std::optional& total_buckets, + const std::vector>& data_files, + const std::shared_ptr& dynamic_bucket_index, + const std::vector>& delete_vectors_index) + : snapshot_(snapshot), + total_buckets_(total_buckets), + data_files_(data_files), + dynamic_bucket_index_(dynamic_bucket_index), + delete_vectors_index_(delete_vectors_index) {} + + std::optional GetSnapshot() const { + return snapshot_; + } + std::optional TotalBuckets() const { + return total_buckets_; + } + std::vector> DataFiles() const { + return data_files_; + } + std::shared_ptr DynamicBucketIndex() const { + return dynamic_bucket_index_; + } + std::vector> DeleteVectorsIndex() const { + return delete_vectors_index_; + } + + static std::shared_ptr Empty() { + return std::make_shared(); + } + + private: + std::optional snapshot_; + std::optional total_buckets_; + std::vector> data_files_; + std::shared_ptr dynamic_bucket_index_; + std::vector> delete_vectors_index_; +}; + +} // namespace paimon diff --git a/src/paimon/core/operation/write_context.cpp b/src/paimon/core/operation/write_context.cpp new file mode 100644 index 0000000..dcb34fa --- /dev/null +++ b/src/paimon/core/operation/write_context.cpp @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include "paimon/write_context.h" + +#include + +#include "arrow/util/thread_pool.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/core/utils/branch_manager.h" +#include "paimon/executor.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace paimon { + +WriteContext::WriteContext(const std::string& root_path, const std::string& commit_user, + bool is_streaming_mode, bool ignore_num_bucket_check, + bool ignore_previous_files, bool enable_multi_thread_spill, + const std::optional& write_id, const std::string& branch, + const std::vector& write_schema, + const std::shared_ptr& memory_pool, + const std::shared_ptr& executor, + const std::string& temp_directory, + const std::shared_ptr& specific_file_system, + const std::map& fs_scheme_to_identifier_map, + const std::map& options) + : root_path_(root_path), + commit_user_(commit_user), + branch_(branch), + is_streaming_mode_(is_streaming_mode), + ignore_num_bucket_check_(ignore_num_bucket_check), + ignore_previous_files_(ignore_previous_files), + enable_multi_thread_spill_(enable_multi_thread_spill), + write_id_(write_id), + write_schema_(write_schema), + memory_pool_(memory_pool), + executor_(executor), + temp_directory_(temp_directory), + specific_file_system_(specific_file_system), + fs_scheme_to_identifier_map_(fs_scheme_to_identifier_map), + options_(options) {} + +WriteContext::~WriteContext() = default; + +class WriteContextBuilder::Impl { + public: + friend class WriteContextBuilder; + + void Reset() { + write_id_ = std::nullopt; + is_streaming_mode_ = false; + ignore_num_bucket_check_ = false; + ignore_previous_files_ = false; + spill_thread_number_ = 0; + memory_pool_ = GetDefaultPool(); + executor_ = CreateDefaultExecutor(); + temp_directory_.clear(); + branch_ = BranchManager::DEFAULT_MAIN_BRANCH; + write_schema_.clear(); + fs_scheme_to_identifier_map_.clear(); + specific_file_system_.reset(); + options_.clear(); + } + + private: + std::string root_path_; + std::string commit_user_; + std::string branch_ = BranchManager::DEFAULT_MAIN_BRANCH; + std::optional write_id_; + bool is_streaming_mode_ = false; + bool ignore_num_bucket_check_ = false; + bool ignore_previous_files_ = false; + int32_t spill_thread_number_ = 0; + std::vector write_schema_; + std::shared_ptr memory_pool_ = GetDefaultPool(); + std::shared_ptr executor_ = CreateDefaultExecutor(); + std::string temp_directory_; + std::map fs_scheme_to_identifier_map_; + std::shared_ptr specific_file_system_; + std::map options_; +}; + +WriteContextBuilder::WriteContextBuilder(const std::string& root_path, + const std::string& commit_user) + : impl_(std::make_unique()) { + impl_->root_path_ = root_path; + impl_->commit_user_ = commit_user; +} + +WriteContextBuilder::~WriteContextBuilder() = default; + +WriteContextBuilder& WriteContextBuilder::AddOption(const std::string& key, + const std::string& value) { + impl_->options_[key] = value; + return *this; +} + +WriteContextBuilder& WriteContextBuilder::SetOptions( + const std::map& opts) { + impl_->options_ = opts; + return *this; +} + +WriteContextBuilder& WriteContextBuilder::WithStreamingMode(bool is_streaming_mode) { + impl_->is_streaming_mode_ = is_streaming_mode; + return *this; +} + +WriteContextBuilder& WriteContextBuilder::WithIgnoreNumBucketCheck(bool ignore_num_bucket_check) { + impl_->ignore_num_bucket_check_ = ignore_num_bucket_check; + return *this; +} + +WriteContextBuilder& WriteContextBuilder::WithMemoryPool( + const std::shared_ptr& memory_pool) { + impl_->memory_pool_ = memory_pool; + return *this; +} + +WriteContextBuilder& WriteContextBuilder::WithIgnorePreviousFiles(bool ignore_previous_files) { + impl_->ignore_previous_files_ = ignore_previous_files; + return *this; +} + +WriteContextBuilder& WriteContextBuilder::WithExecutor(const std::shared_ptr& executor) { + impl_->executor_ = executor; + return *this; +} + +WriteContextBuilder& WriteContextBuilder::WithTempDirectory(const std::string& temp_dir) { + impl_->temp_directory_ = temp_dir; + return *this; +} + +WriteContextBuilder& WriteContextBuilder::WithWriteId(int32_t write_id) { + impl_->write_id_ = write_id; + return *this; +} + +WriteContextBuilder& WriteContextBuilder::WithBranch(const std::string& branch) { + impl_->branch_ = branch; + return *this; +} + +WriteContextBuilder& WriteContextBuilder::WithWriteSchema( + const std::vector& write_schema) { + impl_->write_schema_ = write_schema; + return *this; +} + +WriteContextBuilder& WriteContextBuilder::WithFileSystemSchemeToIdentifierMap( + const std::map& fs_scheme_to_identifier_map) { + impl_->fs_scheme_to_identifier_map_ = fs_scheme_to_identifier_map; + return *this; +} + +WriteContextBuilder& WriteContextBuilder::SetWriteBufferSpillThreadNumber(int32_t thread_number) { + impl_->spill_thread_number_ = thread_number; + return *this; +} + +WriteContextBuilder& WriteContextBuilder::WithFileSystem( + const std::shared_ptr& file_system) { + impl_->specific_file_system_ = file_system; + return *this; +} + +Result> WriteContextBuilder::Finish() { + PAIMON_ASSIGN_OR_RAISE(impl_->root_path_, PathUtil::NormalizePath(impl_->root_path_)); + if (impl_->root_path_.empty()) { + return Status::Invalid("root path is empty"); + } + bool enable_multi_thread_spill = impl_->spill_thread_number_ > 0; + if (enable_multi_thread_spill) { + PAIMON_RETURN_NOT_OK_FROM_ARROW( + arrow::SetCpuThreadPoolCapacity(impl_->spill_thread_number_)); + } + auto ctx = std::make_unique( + impl_->root_path_, impl_->commit_user_, impl_->is_streaming_mode_, + impl_->ignore_num_bucket_check_, impl_->ignore_previous_files_, enable_multi_thread_spill, + impl_->write_id_, impl_->branch_, impl_->write_schema_, impl_->memory_pool_, + impl_->executor_, impl_->temp_directory_, impl_->specific_file_system_, + impl_->fs_scheme_to_identifier_map_, impl_->options_); + impl_->Reset(); + return ctx; +} + +} // namespace paimon diff --git a/src/paimon/core/operation/write_context_test.cpp b/src/paimon/core/operation/write_context_test.cpp new file mode 100644 index 0000000..14fba0d --- /dev/null +++ b/src/paimon/core/operation/write_context_test.cpp @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include "paimon/write_context.h" + +#include "gtest/gtest.h" +#include "paimon/executor.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/result.h" +#include "paimon/status.h" +#include "paimon/testing/mock/mock_file_system.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +TEST(WriteContextTest, TestDefaultValue) { + WriteContextBuilder builder("table_root_path", "commit_user_1"); + ASSERT_OK_AND_ASSIGN(auto ctx, builder.Finish()); + ASSERT_EQ(ctx->GetRootPath(), "table_root_path"); + ASSERT_EQ(ctx->GetCommitUser(), "commit_user_1"); + ASSERT_FALSE(ctx->IsStreamingMode()); + ASSERT_FALSE(ctx->IgnoreNumBucketCheck()); + ASSERT_FALSE(ctx->IgnorePreviousFiles()); + ASSERT_FALSE(ctx->EnableMultiThreadSpill()); + ASSERT_EQ(ctx->GetWriteId(), std::nullopt); + ASSERT_EQ(ctx->GetBranch(), "main"); + ASSERT_TRUE(ctx->GetWriteSchema().empty()); + ASSERT_TRUE(ctx->GetMemoryPool()); + ASSERT_TRUE(ctx->GetExecutor()); + ASSERT_TRUE(ctx->GetTempDirectory().empty()); + ASSERT_TRUE(ctx->GetOptions().empty()); + ASSERT_TRUE(ctx->GetFileSystemSchemeToIdentifierMap().empty()); + ASSERT_FALSE(ctx->GetSpecificFileSystem()); +} + +TEST(WriteContextTest, TestSetContent) { + WriteContextBuilder builder("table_root_path", "commit_user_1"); + + auto memory_pool = GetDefaultPool(); + std::shared_ptr executor = CreateDefaultExecutor(); + auto file_system = std::make_shared(); + std::vector write_schema = {"f0", "f1"}; + std::map fs_scheme_to_identifier_map = {{"file", "local"}, + {"oss", "jindo"}}; + + ASSERT_OK_AND_ASSIGN(auto ctx, + builder.WithStreamingMode(true) + .WithIgnoreNumBucketCheck(true) + .WithIgnorePreviousFiles(true) + .WithMemoryPool(memory_pool) + .WithExecutor(executor) + .WithTempDirectory("/tmp/with-all") + .WithWriteId(123) + .WithBranch("test_branch") + .WithWriteSchema(write_schema) + .WithFileSystemSchemeToIdentifierMap(fs_scheme_to_identifier_map) + .WithFileSystem(file_system) + .AddOption("key", "value") + .Finish()); + + ASSERT_TRUE(ctx->IsStreamingMode()); + ASSERT_TRUE(ctx->IgnoreNumBucketCheck()); + ASSERT_TRUE(ctx->IgnorePreviousFiles()); + ASSERT_FALSE(ctx->EnableMultiThreadSpill()); + ASSERT_EQ(ctx->GetMemoryPool(), memory_pool); + ASSERT_EQ(ctx->GetExecutor(), executor); + ASSERT_EQ(ctx->GetTempDirectory(), "/tmp/with-all"); + ASSERT_EQ(ctx->GetWriteId(), 123); + ASSERT_EQ(ctx->GetBranch(), "test_branch"); + ASSERT_EQ(ctx->GetWriteSchema(), write_schema); + ASSERT_EQ(ctx->GetFileSystemSchemeToIdentifierMap(), fs_scheme_to_identifier_map); + ASSERT_EQ(ctx->GetSpecificFileSystem(), file_system); + std::map expected_options = {{"key", "value"}}; + ASSERT_EQ(expected_options, ctx->GetOptions()); +} + +TEST(WriteContextTest, TestSetOptionsOverridesAddedOptions) { + WriteContextBuilder builder("table_root_path", "commit_user_1"); + builder.AddOption("old", "value"); + builder.SetOptions({{"key1", "value1"}, {"key2", "value2"}}); + + ASSERT_OK_AND_ASSIGN(auto ctx, builder.Finish()); + + std::map expected_options = {{"key1", "value1"}, {"key2", "value2"}}; + ASSERT_EQ(expected_options, ctx->GetOptions()); +} + +TEST(WriteContextTest, TestSetWriteBufferSpillThreadNumber) { + WriteContextBuilder builder("table_root_path", "commit_user_1"); + builder.SetWriteBufferSpillThreadNumber(2); + + ASSERT_OK_AND_ASSIGN(auto ctx, builder.Finish()); + + ASSERT_TRUE(ctx->EnableMultiThreadSpill()); +} + +} // namespace paimon::test diff --git a/src/paimon/core/operation/write_restore.cpp b/src/paimon/core/operation/write_restore.cpp new file mode 100644 index 0000000..98ee2ac --- /dev/null +++ b/src/paimon/core/operation/write_restore.cpp @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include "paimon/core/operation/write_restore.h" + +#include +#include +#include + +namespace paimon { + +Result> WriteRestore::ExtractDataFiles( + const std::vector& entries, + std::vector>* data_files) { + std::optional total_buckets; + for (const auto& entry : entries) { + if (total_buckets.has_value() && total_buckets.value() != entry.TotalBuckets()) { + return Status::Invalid(fmt::format( + "Bucket data files has different total bucket number, {} vs {}, this should " + "be a bug.", + total_buckets.value(), entry.TotalBuckets())); + } + total_buckets = entry.TotalBuckets(); + data_files->push_back(entry.File()); + } + return total_buckets; +} + +} // namespace paimon diff --git a/src/paimon/core/operation/write_restore.h b/src/paimon/core/operation/write_restore.h new file mode 100644 index 0000000..94a8e6d --- /dev/null +++ b/src/paimon/core/operation/write_restore.h @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#pragma once + +#include +#include +#include +#include + +#include "fmt/format.h" +#include "paimon/common/data/binary_row.h" +#include "paimon/core/manifest/manifest_entry.h" +#include "paimon/core/operation/restore_files.h" +#include "paimon/result.h" + +namespace paimon { + +/// Restore for write to restore data files by partition and bucket from file system. +class WriteRestore { + public: + static Result> ExtractDataFiles( + const std::vector& entries, + std::vector>* data_files); + + virtual ~WriteRestore() = default; + + virtual Result LatestCommittedIdentifier(const std::string& user) const = 0; + + virtual Result> GetRestoreFiles( + const BinaryRow& partition, int32_t bucket, bool scan_delete_vectors_index) const = 0; +}; + +} // namespace paimon diff --git a/src/paimon/core/operation/write_restore_test.cpp b/src/paimon/core/operation/write_restore_test.cpp new file mode 100644 index 0000000..35d994b --- /dev/null +++ b/src/paimon/core/operation/write_restore_test.cpp @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include "paimon/core/operation/write_restore.h" + +#include +#include +#include +#include + +#include "gtest/gtest.h" +#include "paimon/core/manifest/file_kind.h" + +namespace paimon::test { + +namespace { + +std::shared_ptr CreateDataFileMeta(const std::string& file_name) { + return std::make_shared( + file_name, /*file_size=*/128, /*row_count=*/10, DataFileMeta::EmptyMinKey(), + DataFileMeta::EmptyMaxKey(), SimpleStats::EmptyStats(), SimpleStats::EmptyStats(), + /*min_sequence_number=*/1, /*max_sequence_number=*/1, /*schema_id=*/1, + /*level=*/DataFileMeta::DUMMY_LEVEL, + /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(0, 0), /*delete_row_count=*/std::nullopt, + /*embedded_index=*/nullptr, /*file_source=*/std::nullopt, + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); +} + +ManifestEntry CreateManifestEntry(int32_t total_buckets, const std::string& file_name) { + return ManifestEntry(FileKind::Add(), BinaryRow::EmptyRow(), /*bucket=*/0, total_buckets, + CreateDataFileMeta(file_name)); +} + +} // namespace + +TEST(WriteRestoreTest, ExtractDataFilesEmptyEntries) { + std::vector entries; + std::vector> data_files; + + auto result = WriteRestore::ExtractDataFiles(entries, &data_files); + + ASSERT_TRUE(result.ok()) << result.status().ToString(); + ASSERT_FALSE(result.value().has_value()); + ASSERT_TRUE(data_files.empty()); +} + +TEST(WriteRestoreTest, ExtractDataFilesConsistentTotalBuckets) { + std::vector entries = { + CreateManifestEntry(/*total_buckets=*/4, "file-1.parquet"), + CreateManifestEntry(/*total_buckets=*/4, "file-2.parquet"), + CreateManifestEntry(/*total_buckets=*/4, "file-3.parquet")}; + std::vector> data_files; + + auto result = WriteRestore::ExtractDataFiles(entries, &data_files); + + ASSERT_TRUE(result.ok()) << result.status().ToString(); + ASSERT_TRUE(result.value().has_value()); + ASSERT_EQ(result.value().value(), 4); + ASSERT_EQ(data_files.size(), 3); + ASSERT_EQ(data_files[0]->file_name, "file-1.parquet"); + ASSERT_EQ(data_files[1]->file_name, "file-2.parquet"); + ASSERT_EQ(data_files[2]->file_name, "file-3.parquet"); +} + +TEST(WriteRestoreTest, ExtractDataFilesInconsistentTotalBuckets) { + std::vector entries = { + CreateManifestEntry(/*total_buckets=*/2, "file-1.parquet"), + CreateManifestEntry(/*total_buckets=*/3, "file-2.parquet")}; + std::vector> data_files; + + auto result = WriteRestore::ExtractDataFiles(entries, &data_files); + + ASSERT_FALSE(result.ok()); + ASSERT_NE(result.status().ToString().find("different total bucket number"), std::string::npos); + ASSERT_EQ(data_files.size(), 1); + ASSERT_EQ(data_files[0]->file_name, "file-1.parquet"); +} + +} // namespace paimon::test