Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 160 additions & 0 deletions include/paimon/commit_context.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <map>
#include <memory>
#include <string>

#include "paimon/result.h"
#include "paimon/type_fwd.h"
#include "paimon/visibility.h"

namespace paimon {
class Executor;
class MemoryPool;

/// `CommitContext` is some configuration for commit operations.
///
/// Please do not use this class directly, use `CommitContextBuilder` to build a `CommitContext`
/// which has input validation.
/// @see CommitContextBuilder
class PAIMON_EXPORT CommitContext {
public:
CommitContext(const std::string& root_path, const std::string& commit_user,
bool ignore_empty_commit, bool use_rest_catalog_commit,
const std::shared_ptr<MemoryPool>& memory_pool,
const std::shared_ptr<Executor>& executor,
const std::shared_ptr<FileSystem>& specific_file_system,
const std::map<std::string, std::string>& options);
~CommitContext();

const std::string& GetRootPath() const {
return root_path_;
}

const std::string& GetCommitUser() const {
return commit_user_;
}

bool IgnoreEmptyCommit() const {
return ignore_empty_commit_;
}

bool UseRESTCatalogCommit() const {
return use_rest_catalog_commit_;
}

std::shared_ptr<MemoryPool> GetMemoryPool() const {
return memory_pool_;
}

std::shared_ptr<Executor> GetExecutor() const {
return executor_;
}

std::shared_ptr<FileSystem> GetSpecificFileSystem() const {
return specific_file_system_;
}

const std::map<std::string, std::string>& GetOptions() const {
return options_;
}

private:
std::string root_path_;
std::string commit_user_;
bool ignore_empty_commit_;
bool use_rest_catalog_commit_;
std::shared_ptr<MemoryPool> memory_pool_;
std::shared_ptr<Executor> executor_;
std::shared_ptr<FileSystem> specific_file_system_;
std::map<std::string, std::string> options_;
};

/// `CommitContextBuilder` used to build a `CommitContext`, has input validation.
class PAIMON_EXPORT CommitContextBuilder {
public:
/// Constructs a `CommitContextBuilder` with required parameters.
/// @param root_path The root path of the Paimon table.
/// @param commit_user The user identifier for the commit operation.
CommitContextBuilder(const std::string& root_path, const std::string& commit_user);

~CommitContextBuilder();

/// Set a configuration options map to set some option entries which are not defined in the
/// table schema or whose values you want to overwrite.
/// @note The options map will clear the options added by `AddOption()` before.
/// @param options The configuration options map.
/// @return Reference to this builder for method chaining.
CommitContextBuilder& SetOptions(const std::map<std::string, std::string>& options);

/// Add a single configuration option which is not defined in the table schema or whose value
/// you want to overwrite.
///
/// If you want to add multiple options, call `AddOption()` multiple times or use `SetOptions()`
/// instead.
/// @param key The option key.
/// @param value The option value.
/// @return Reference to this builder for method chaining.
CommitContextBuilder& AddOption(const std::string& key, const std::string& value);

/// Sets whether to ignore empty commits (default is true).
/// When set to true, commits that don't contain any actual data changes will be ignored.
/// @param ignore_empty_commit True to ignore empty commits, false otherwise.
/// @return Reference to this builder for method chaining.
CommitContextBuilder& IgnoreEmptyCommit(bool ignore_empty_commit);

/// Sets whether to use REST catalog commit (default is false).
/// @note Temporary interface, will be removed in the future.
/// @param use_rest_catalog_commit True to use REST catalog commit, false otherwise.
/// @return Reference to this builder for method chaining.
CommitContextBuilder& UseRESTCatalogCommit(bool use_rest_catalog_commit);

/// Sets the memory pool to be used for memory allocation during commit operations.
/// @param memory_pool Shared pointer to the memory pool instance.
/// @return Reference to this builder for method chaining.
CommitContextBuilder& WithMemoryPool(const std::shared_ptr<MemoryPool>& memory_pool);

/// Sets the executor to be used for asynchronous operations during commit.
/// @param executor Shared pointer to the executor instance.
/// @return Reference to this builder for method chaining.
CommitContextBuilder& WithExecutor(const std::shared_ptr<Executor>& executor);

/// Sets a custom file system instance to be used for all file operations in this commit
/// context.
/// This bypasses the global file system registry and uses the provided implementation directly.
///
/// @param file_system The file system to use.
/// @return Reference to this builder for method chaining.
/// @note If not set, use default file system (configured in `Options::FILE_SYSTEM`)
CommitContextBuilder& WithFileSystem(const std::shared_ptr<FileSystem>& file_system);

/// Build and return a `CommitContext` instance with input validation.
/// @return Result containing the constructed `CommitContext` or an error status.
Result<std::unique_ptr<CommitContext>> Finish();

private:
class Impl;

std::unique_ptr<Impl> impl_;
};

} // namespace paimon
147 changes: 147 additions & 0 deletions include/paimon/file_store_commit.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <cstdint>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <vector>

#include "paimon/defs.h"
#include "paimon/executor.h"
#include "paimon/memory/memory_pool.h"
#include "paimon/metrics.h"
#include "paimon/result.h"
#include "paimon/status.h"
#include "paimon/type_fwd.h"
#include "paimon/visibility.h"

namespace paimon {
class CommitContext;
class CommitMessage;

/// Interface for commit operations in a file store.
///
/// The `FileStoreCommit` class provides interfaces for committing changes, expiring old snapshots,
/// dropping partitions, and retrieving commit metrics.
class PAIMON_EXPORT FileStoreCommit {
public:
/// Create an instance of `FileStoreCommit`.
///
/// @param context A unique pointer to the `CommitContext` used for commit operations.
///
/// @return A Result containing a unique pointer to the `FileStoreCommit` instance.
static Result<std::unique_ptr<FileStoreCommit>> Create(std::unique_ptr<CommitContext> context);

virtual ~FileStoreCommit() = default;

/// Commit changes to the file store.
///
/// @param commit_messages A vector of commit messages to be committed.
/// @param commit_identifier An optional identifier for the commit operation. Default is
/// `BATCH_WRITE_COMMIT_IDENTIFIER`.
/// @param watermark An optional event-time watermark used to indicate the progress of data
/// processing. Default is std::nullopt.
/// @return Status indicating the success or failure of the commit operation.
virtual Status Commit(const std::vector<std::shared_ptr<CommitMessage>>& commit_messages,
int64_t commit_identifier = BATCH_WRITE_COMMIT_IDENTIFIER,
std::optional<int64_t> watermark = std::nullopt) = 0;

/// Filter out all `std::vector<CommitMessage>` which have been committed and commit the
/// remaining ones.
///
/// Compared to commit, this method will first check if a commit_identifier has been
/// committed, so this method might be slower. A common usage of this method is to retry the
/// commit process after a failure.
///
/// @param commit_identifier_and_messages A map containing all {@link CommitMessage}s in
/// question. The key is the commit_identifier.
///
/// @param watermark An optional event-time watermark used to indicate the progress of data
/// processing. Default is std::nullopt.
/// @return Number of `std::vector<CommitMessage>` committed.
virtual Result<int32_t> FilterAndCommit(
const std::map<int64_t, std::vector<std::shared_ptr<CommitMessage>>>&
commit_identifier_and_messages,
std::optional<int64_t> watermark = std::nullopt) = 0;

/// Overwrite from manifest committable and partition.
///
/// @param partitions A single partition maps each partition key to a partition value. Depending
/// on the user-defined statement, the partition might not include all partition keys. Also
/// note that this partition does not necessarily equal to the partitions of the newly added
/// key-values. This is just the partition to be cleaned up.
/// @param commit_messages Description of the commit messages.
/// @param commit_identifier Unique identifier.
/// @param watermark An optional event-time watermark used to indicate the progress of data
/// processing. Default is std::nullopt.
/// @return Result of the operation.
virtual Status Overwrite(const std::vector<std::map<std::string, std::string>>& partitions,
const std::vector<std::shared_ptr<CommitMessage>>& commit_messages,
int64_t commit_identifier,
std::optional<int64_t> watermark = std::nullopt) = 0;

/// This is a temporary interface for internal use. It will be removed in a future version.
/// Please do not rely on it for long-term use.
///
/// @param partitions Description of the partitions.
/// @param commit_messages Description of the commit messages.
/// @param commit_identifier Unique identifier.
/// @param watermark An optional event-time watermark used to indicate the progress of data
/// processing. Default is std::nullopt.
/// @return Result of the operation.
virtual Result<int32_t> FilterAndOverwrite(
const std::vector<std::map<std::string, std::string>>& partitions,
const std::vector<std::shared_ptr<CommitMessage>>& commit_messages,
int64_t commit_identifier, std::optional<int64_t> watermark = std::nullopt) = 0;

/// If user want to use REST catalog commit, please set
/// `CommitContextBuilder::UseRESTCatalogCommit()`, then call `Commit()` (or
/// `FilterAndCommit()`) normally, then call this method to get the last commit table request,
/// which is a JSON string that can be used to send to REST catalog server.
///
/// @note Temporary interface for internal use, will be removed in the future.
///
/// @return A Result containing a JSON string which including `snapshot` and `statistics`, but
/// excluding `tableId`.
virtual Result<std::string> GetLastCommitTableRequest() = 0;

/// Expire old snapshot in the file store.
///
/// @return Result<int32_t> indicating the number of expired items or an error status.
virtual Result<int32_t> Expire() = 0;

/// Drop specified partitions from the file store.
///
/// @param partitions A vector of partitions to be dropped.
/// @param commit_identifier An identifier for the commit operation.
/// @return Status indicating the success or failure of the drop partition operation.
virtual Status DropPartition(const std::vector<std::map<std::string, std::string>>& partitions,
int64_t commit_identifier) = 0;

/// Retrieve metrics related to commit operations.
///
/// @return A shared pointer to a `Metrics` object containing commit metrics.
virtual std::shared_ptr<Metrics> GetCommitMetrics() const = 0;
};

} // namespace paimon
Loading