Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions include/paimon/file_store_write.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


#pragma once

#include <cstdint>
#include <map>
#include <memory>
#include <vector>

#include "paimon/commit_message.h"
#include "paimon/defs.h"
#include "paimon/executor.h"
#include "paimon/memory/memory_pool.h"
#include "paimon/metrics.h"
#include "paimon/result.h"
#include "paimon/status.h"
#include "paimon/type_fwd.h"
#include "paimon/visibility.h"

namespace paimon {
class RecordBatch;
class WriteContext;

/// Interface for write operations in a file store.
class PAIMON_EXPORT FileStoreWrite {
public:
/// Create an instance of `FileStoreWrite`.
///
/// @param context A unique pointer to the `WriteContext` used for write operations.
///
/// @return A Result containing a unique pointer to the `FileStoreWrite` instance.
static Result<std::unique_ptr<FileStoreWrite>> Create(std::unique_ptr<WriteContext> context);

virtual ~FileStoreWrite() = default;

/// Support write an input `RecordBatch` to internal buffer or file.
/// @note If a field in table schema is marked as non-nullable (`nullable = false`),
/// the corresponding array in `batch` must have zero null entries.
virtual Status Write(std::unique_ptr<RecordBatch>&& batch) = 0;

/// Compact data stored in given partition and bucket. Note that compaction process is only
/// submitted and may not be completed when the method returns.
///
/// @param partition the partition to compact
/// @param bucket the bucket to compact
/// @param full_compaction whether to trigger full compaction or just normal compaction
///
/// @return status for compacting the records
virtual Status Compact(const std::map<std::string, std::string>& partition, int32_t bucket,
bool full_compaction) = 0;

/// Generate a list of commit messages with the latest generated data file meta
/// information of the current snapshot.
///
/// When we need commit, call PrepareCommit to get the current {@link CommitMessage}s with the
/// latest generated data file meta information of the current snapshot.
///
/// This function is designed to be called when a commit is required. Depending on the writing
/// scenario, the behavior will differ:
///
/// - For batch write, simply call `PrepareCommit()` without any parameters.
/// - For streaming write, you need to provide both parameters:
/// `PrepareCommit(bool wait_compaction, int64_t commit_identifier)`.
///
/// @param wait_compaction Indicates whether to wait for any ongoing compaction process to
/// complete.
/// @param commit_identifier A unique identifier for the commit operation. This parameter is
/// only relevant in streaming write scenarios.
///
/// @return A Result containing `std::vector<std::shared_ptr<CommitMessage>>` objects,
/// representing the generated commit messages.
virtual Result<std::vector<std::shared_ptr<CommitMessage>>> PrepareCommit(
bool wait_compaction = true, int64_t commit_identifier = BATCH_WRITE_COMMIT_IDENTIFIER) = 0;
virtual std::shared_ptr<Metrics> GetMetrics() const = 0;
virtual Status Close() = 0;
};

} // namespace paimon
250 changes: 250 additions & 0 deletions include/paimon/write_context.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


#pragma once

#include <cstdint>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <vector>

#include "paimon/result.h"
#include "paimon/type_fwd.h"
#include "paimon/visibility.h"

namespace paimon {
class Executor;
class MemoryPool;

/// `WriteContext` is some configuration for write operations.
///
/// Please do not use this class directly, use `WriteContextBuilder` to build a `WriteContext` which
/// has input validation.
/// @see WriteContextBuilder
class PAIMON_EXPORT WriteContext {
public:
WriteContext(const std::string& root_path, const std::string& commit_user,
bool is_streaming_mode, bool ignore_num_bucket_check, bool ignore_previous_files,
bool enable_multi_thread_spill, const std::optional<int32_t>& write_id,
const std::string& branch, const std::vector<std::string>& write_schema,
const std::shared_ptr<MemoryPool>& memory_pool,
const std::shared_ptr<Executor>& executor, const std::string& temp_directory,
const std::shared_ptr<FileSystem>& specific_file_system,
const std::map<std::string, std::string>& fs_scheme_to_identifier_map,
const std::map<std::string, std::string>& options);

~WriteContext();

const std::string& GetRootPath() const {
return root_path_;
}

const std::string& GetCommitUser() const {
return commit_user_;
}

const std::map<std::string, std::string>& GetFileSystemSchemeToIdentifierMap() const {
return fs_scheme_to_identifier_map_;
}

const std::map<std::string, std::string>& GetOptions() const {
return options_;
}

bool IsStreamingMode() const {
return is_streaming_mode_;
}

bool IgnoreNumBucketCheck() const {
return ignore_num_bucket_check_;
}

bool IgnorePreviousFiles() const {
return ignore_previous_files_;
}

const std::optional<int32_t> GetWriteId() const {
return write_id_;
}

const std::string& GetBranch() const {
return branch_;
}

const std::vector<std::string>& GetWriteSchema() const {
return write_schema_;
}

std::shared_ptr<MemoryPool> GetMemoryPool() const {
return memory_pool_;
}

std::shared_ptr<Executor> GetExecutor() const {
return executor_;
}

const std::string& GetTempDirectory() const {
return temp_directory_;
}

std::shared_ptr<FileSystem> GetSpecificFileSystem() const {
return specific_file_system_;
}

bool EnableMultiThreadSpill() const {
return enable_multi_thread_spill_;
}

private:
std::string root_path_;
std::string commit_user_;
std::string branch_;
bool is_streaming_mode_;
bool ignore_num_bucket_check_;
bool ignore_previous_files_;
bool enable_multi_thread_spill_;
std::optional<int32_t> write_id_;
std::vector<std::string> write_schema_;
std::shared_ptr<MemoryPool> memory_pool_;
std::shared_ptr<Executor> executor_;
std::string temp_directory_;
std::shared_ptr<FileSystem> specific_file_system_;
std::map<std::string, std::string> fs_scheme_to_identifier_map_;
std::map<std::string, std::string> options_;
};

/// `WriteContextBuilder` used to build a `WriteContext`, has input validation.
class PAIMON_EXPORT WriteContextBuilder {
public:
/// Constructs a `WriteContextBuilder` with required parameters.
/// @param root_path The root path of the table.
/// @param commit_user The user identifier for commit operations.
WriteContextBuilder(const std::string& root_path, const std::string& commit_user);

~WriteContextBuilder();

/// Set a configuration options map to set some option entries which are not defined in the
/// table schema or whose values you want to overwrite.
/// @note The options map will clear the options added by `AddOption()` before.
/// @param options The configuration options map.
/// @return Reference to this builder for method chaining.
WriteContextBuilder& SetOptions(const std::map<std::string, std::string>& options);

/// Add a single configuration option which is not defined in the table schema or whose value
/// you want to overwrite.
///
/// If you want to add multiple options, call `AddOption()` multiple times or use `SetOptions()`
/// instead.
/// @param key The option key.
/// @param value The option value.
/// @return Reference to this builder for method chaining.
WriteContextBuilder& AddOption(const std::string& key, const std::string& value);

/// Set whether to enable streaming mode (default is false)
/// @return Reference to this builder for method chaining.
WriteContextBuilder& WithStreamingMode(bool is_streaming_mode);

/// Set whether to skip num-bucket consistency check (default is false)
/// @return Reference to this builder for method chaining.
WriteContextBuilder& WithIgnoreNumBucketCheck(bool ignore_num_bucket_check);

/// Set whether the write operation should ignore previously stored files. (default is false)
/// @return Reference to this builder for method chaining.
WriteContextBuilder& WithIgnorePreviousFiles(bool ignore_previous_files);

/// Set custom memory pool for memory management.
/// @param memory_pool The memory pool to use.
/// @return Reference to this builder for method chaining.
WriteContextBuilder& WithMemoryPool(const std::shared_ptr<MemoryPool>& memory_pool);

/// Set custom executor for task execution.
/// @param executor The executor to use.
/// @return Reference to this builder for method chaining.
WriteContextBuilder& WithExecutor(const std::shared_ptr<Executor>& executor);

/// Set the temporary directory path for IO operations (lookup and external disk spill).
/// @param temp_dir The temporary directory path.
/// @return Reference to this builder for method chaining.
WriteContextBuilder& WithTempDirectory(const std::string& temp_dir);

/// For postpone bucket mode in pk table, `WithWriteId()` supposed to be used.
///
/// Each worker must have its own unique `write_id` within a task, which is used as the prefix
/// for its data files. This ensures that files from the same worker share the same prefix and
/// can be consumed by the same compaction reader to preserve input order.
///
/// @return Reference to this builder for method chaining.
WriteContextBuilder& WithWriteId(int32_t write_id);

/// Write to specific branch, default is main.
/// @return Reference to this builder for method chaining.
WriteContextBuilder& WithBranch(const std::string& branch);

/// For data evolution, user can write partial specific fields from table schema.
/// If not set, write all fields in table.
/// @return Reference to this builder for method chaining.
WriteContextBuilder& WithWriteSchema(const std::vector<std::string>& write_schema);

/// Sets a custom file system instance to be used for all file operations in this write context.
/// This bypasses the global file system registry and uses the provided implementation directly.
///
/// @param file_system The file system to use.
/// @return Reference to this builder for method chaining.
/// @note If not set, use default file system (configured in `Options::FILE_SYSTEM`)
WriteContextBuilder& WithFileSystem(const std::shared_ptr<FileSystem>& file_system);

/// Sets a mapping from URI schemes (e.g., "file", "oss") to registered file system
/// identifiers. This allows selecting different pre-registered file system implementations
/// based on the URI scheme at runtime.
///
/// @param fs_scheme_to_identifier_map Map from URI scheme (like "oss") to the corresponding
/// file system identifier.
/// @return Reference to this builder for method chaining.
/// @note
/// - This method is intended for environments where multiple file systems are pre-registered.
/// - The specified identifiers must correspond to file systems that have been registered at
/// compile time or initialization.
/// - Cannot be used together with `WithFileSystem()`.
/// - If not set, use default file system (configured in `Options::FILE_SYSTEM`).
/// Example:
/// builder.WithFileSystemSchemeToIdentifierMap({{"oss", "jindo"}, {"file", "local"}});
///
WriteContextBuilder& WithFileSystemSchemeToIdentifierMap(
const std::map<std::string, std::string>& fs_scheme_to_identifier_map);

/// Set the thread number for write buffer spill operations. (default is 0)
/// If <= 0, threading is disabled for spill IPC read/write.
/// If > 0, sets arrow CPU thread pool capacity for spill operations.
/// @param thread_number The thread number to use for spill operations.
/// @return Reference to this builder for method chaining.
WriteContextBuilder& SetWriteBufferSpillThreadNumber(int32_t thread_number);

/// Build and return a `WriteContext` instance with input validation.
/// @return Result containing the constructed `WriteContext` or an error status.
Result<std::unique_ptr<WriteContext>> Finish();

private:
class Impl;

std::unique_ptr<Impl> impl_;
};

} // namespace paimon
Loading