From 8364c73e88b35b6f1b9626e34a4fd7a84690f228 Mon Sep 17 00:00:00 2001 From: "lisizhuo.lsz" Date: Thu, 18 Jun 2026 11:06:35 +0800 Subject: [PATCH] feat: introduce table scan, split and plan --- include/paimon/table/source/data_split.h | 84 +++++ include/paimon/table/source/plan.h | 38 ++ include/paimon/table/source/split.h | 69 ++++ include/paimon/table/source/startup_mode.h | 71 ++++ include/paimon/table/source/table_read.h | 79 ++++ include/paimon/table/source/table_scan.h | 48 +++ .../core/table/source/abstract_table_scan.h | 125 +++++++ .../core/table/source/data_split_impl.cpp | 167 +++++++++ .../core/table/source/data_split_impl.h | 195 ++++++++++ src/paimon/core/table/source/deletion_file.h | 152 ++++++++ .../core/table/source/deletion_file_test.cpp | 64 ++++ src/paimon/core/table/source/plan_impl.cpp | 30 ++ src/paimon/core/table/source/plan_impl.h | 52 +++ src/paimon/core/table/source/scan_mode.h | 35 ++ src/paimon/core/table/source/split.cpp | 241 ++++++++++++ .../core/table/source/split_generator.h | 76 ++++ .../table/source/split_generator_test.cpp | 351 ++++++++++++++++++ src/paimon/core/table/source/startup_mode.cpp | 81 ++++ .../core/table/source/startup_mode_test.cpp | 44 +++ src/paimon/core/table/source/table_scan.cpp | 282 ++++++++++++++ .../core/table/source/table_scan_test.cpp | 63 ++++ 21 files changed, 2347 insertions(+) create mode 100644 include/paimon/table/source/data_split.h create mode 100644 include/paimon/table/source/plan.h create mode 100644 include/paimon/table/source/split.h create mode 100644 include/paimon/table/source/startup_mode.h create mode 100644 include/paimon/table/source/table_read.h create mode 100644 include/paimon/table/source/table_scan.h create mode 100644 src/paimon/core/table/source/abstract_table_scan.h create mode 100644 src/paimon/core/table/source/data_split_impl.cpp create mode 100644 src/paimon/core/table/source/data_split_impl.h create mode 100644 src/paimon/core/table/source/deletion_file.h create mode 100644 src/paimon/core/table/source/deletion_file_test.cpp create mode 100644 src/paimon/core/table/source/plan_impl.cpp create mode 100644 src/paimon/core/table/source/plan_impl.h create mode 100644 src/paimon/core/table/source/scan_mode.h create mode 100644 src/paimon/core/table/source/split.cpp create mode 100644 src/paimon/core/table/source/split_generator.h create mode 100644 src/paimon/core/table/source/split_generator_test.cpp create mode 100644 src/paimon/core/table/source/startup_mode.cpp create mode 100644 src/paimon/core/table/source/startup_mode_test.cpp create mode 100644 src/paimon/core/table/source/table_scan.cpp create mode 100644 src/paimon/core/table/source/table_scan_test.cpp diff --git a/include/paimon/table/source/data_split.h b/include/paimon/table/source/data_split.h new file mode 100644 index 0000000..865e785 --- /dev/null +++ b/include/paimon/table/source/data_split.h @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "paimon/data/timestamp.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/result.h" +#include "paimon/table/source/split.h" +#include "paimon/visibility.h" + +namespace paimon { +class MemoryPool; + +/// Input data split for reading operation. Needed by most batch computation engines. +class PAIMON_EXPORT DataSplit : public Split { + public: + /// Metadata structure for simple data files. + /// + /// Contains essential information about a data file including its location, + /// size, row count, sequence numbers, schema information, and timestamps. + /// This structure is used to track file metadata without loading the actual file content. + struct SimpleDataFileMeta { + SimpleDataFileMeta(const std::string& _file_path, int64_t _file_size, int64_t _row_count, + int64_t _min_sequence_number, int64_t _max_sequence_number, + int64_t _schema_id, int32_t _level, const Timestamp& _creation_time, + const std::optional& _delete_row_count) + : file_path(_file_path), + file_size(_file_size), + row_count(_row_count), + min_sequence_number(_min_sequence_number), + max_sequence_number(_max_sequence_number), + schema_id(_schema_id), + level(_level), + creation_time(_creation_time), + delete_row_count(_delete_row_count) {} + + /// Absolute path of the data file. + /// + /// If external path is enabled, `file_path` indicates the actual location in the external + /// storage system. + std::string file_path; + int64_t file_size; + int64_t row_count; + int64_t min_sequence_number; + int64_t max_sequence_number; + int64_t schema_id; + int32_t level; + Timestamp creation_time; + std::optional delete_row_count; + + bool operator==(const SimpleDataFileMeta& other) const; + + std::string ToString() const; + }; + + /// Get the list of metadata for all data files in this split. + /// @note This method will be removed in future versions and is only used for append tables. + virtual std::vector GetFileList() const = 0; +}; +} // namespace paimon diff --git a/include/paimon/table/source/plan.h b/include/paimon/table/source/plan.h new file mode 100644 index 0000000..1c71743 --- /dev/null +++ b/include/paimon/table/source/plan.h @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/table/source/split.h" + +namespace paimon { +/// %Result plan of this `TableScan`. +class PAIMON_EXPORT Plan { + public: + virtual ~Plan() = default; + /// %Result splits. + virtual const std::vector>& Splits() const = 0; + /// Snapshot id of this plan, return `std::nullopt` if the table is empty. + virtual std::optional SnapshotId() const = 0; +}; +} // namespace paimon diff --git a/include/paimon/table/source/split.h b/include/paimon/table/source/split.h new file mode 100644 index 0000000..a968f02 --- /dev/null +++ b/include/paimon/table/source/split.h @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "paimon/memory/memory_pool.h" +#include "paimon/result.h" +#include "paimon/visibility.h" + +namespace paimon { +class MemoryPool; + +/// An input split for reading operation. Needed by most batch computation engines. Support +/// Serialize and Deserialize, compatible with java version. +/// This split can be either a `DataSplit` (for direct data file reads) or an `IndexedSplit` +/// (for reads leveraging global indexes). +class PAIMON_EXPORT Split { + public: + virtual ~Split() = default; + + /// Deserialize a `Split` from a binary buffer. + /// + /// Creates a `Split` instance from its serialized binary representation. + /// This is typically used in distributed computing scenarios where splits + /// are transmitted between different nodes or processes. + /// + /// @param buffer Const pointer to the binary data containing the serialized `Split`. + /// @param length Size of the buffer in bytes. + /// @param pool Memory pool for allocating objects during deserialization. + /// @return Result containing the deserialized `Split` or an error status. + static Result> Deserialize(const char* buffer, size_t length, + const std::shared_ptr& pool); + + /// Serialize a `Split` to a binary string. + /// + /// Converts a `Split` instance to its binary representation for storage + /// or transmission. The serialized data can later be deserialized using + /// the Deserialize method. + /// + /// @param split The `Split` instance to serialize. + /// @param pool Memory pool for allocating temporary objects during serialization. + /// @return Result containing the serialized binary data as a string or an error status. + static Result Serialize(const std::shared_ptr& split, + const std::shared_ptr& pool); +}; +} // namespace paimon diff --git a/include/paimon/table/source/startup_mode.h b/include/paimon/table/source/startup_mode.h new file mode 100644 index 0000000..e43c243 --- /dev/null +++ b/include/paimon/table/source/startup_mode.h @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once +#include + +#include "paimon/result.h" +#include "paimon/visibility.h" + +namespace paimon { +/// Specifies the startup mode for log consumer. +class PAIMON_EXPORT StartupMode { + public: + /// Determines actual startup mode according to other table properties. If "scan.snapshot-id" is + /// set the actual startup mode will be "from-snapshot", otherwise the actual startup mode will + /// be "latest-full". + static const StartupMode Default(); + + /// For streaming sources, produces the latest snapshot on the table upon first startup, and + /// continue to read the latest changes. For batch sources, just produce the latest snapshot but + /// does not read new changes. + static const StartupMode LatestFull(); + + /// For streaming sources, continuously reads latest changes without producing a snapshot at the + /// beginning. For batch sources, behaves the same as the "latest-full" startup mode. + static const StartupMode Latest(); + + /// For streaming sources, continuously reads changes starting from snapshot specified by + /// "scan.snapshot-id", without producing a snapshot at the beginning. For batch sources, + /// produces a snapshot specified by "scan.snapshot-id" but does not read new changes. + static const StartupMode FromSnapshot(); + + /// For streaming sources, produces from snapshot specified by "scan.snapshot-id" on the table + /// upon first startup, and continuously reads changes. For batch sources, produces a snapshot + /// specified by "scan.snapshot-id" but does not read new changes + static const StartupMode FromSnapshotFull(); + + /// Starts from a timestamp specified by either "scan.timestamp-millis" or + /// "scan.timestamp". For batch sources, produces the latest snapshot whose + /// timestamp is <= the specified timestamp. For streaming sources, continuously + /// reads changes starting from the first snapshot at or after the timestamp. + static const StartupMode FromTimestamp(); + + public: + std::string ToString() const; + bool operator==(const StartupMode& other) const; + static Result FromString(const std::string& str); + + private: + explicit StartupMode(const std::string& value) : value_(value) {} + + private: + std::string value_; +}; +} // namespace paimon diff --git a/include/paimon/table/source/table_read.h b/include/paimon/table/source/table_read.h new file mode 100644 index 0000000..157561f --- /dev/null +++ b/include/paimon/table/source/table_read.h @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/executor.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/read_context.h" +#include "paimon/reader/batch_reader.h" +#include "paimon/result.h" +#include "paimon/table/source/split.h" +#include "paimon/visibility.h" + +namespace paimon { +class MemoryPool; +class ReadContext; + +/// Given a `Split` or a list of `Split`, generate a reader for batch reading. +class PAIMON_EXPORT TableRead { + public: + virtual ~TableRead() = default; + + /// Create an instance of `TableRead`. + /// + /// @param context A unique pointer to the `ReadContext` used for read operations. + /// @return A Result containing a unique pointer to the `TableRead` instance. + static Result> Create(std::unique_ptr context); + + /// Creates a `BatchReader` instance for reading data. + /// + /// This method creates a BatchReader that will be responsible for reading data from the + /// provided splits. + /// + /// @param splits A vector of shared pointers to `Split` instances representing the + /// data to be read. + /// @return A Result containing a unique pointer to the `BatchReader` instance. + /// @note `BatchReader`s created by the same `TableRead` are not thread-safe for + /// concurrent reading. + virtual Result> CreateReader( + const std::vector>& splits); + + /// Creates a `BatchReader` instance for a single split. + /// + /// @param split A shared pointer to the `Split` instance that defines the data to be + /// read. + /// @return A Result containing a unique pointer to the `BatchReader` instance. + virtual Result> CreateReader( + const std::shared_ptr& split) = 0; + + protected: + explicit TableRead(const std::shared_ptr& memory_pool); + + std::shared_ptr GetMemoryPool() const { + return pool_; + } + + private: + std::shared_ptr pool_; +}; +} // namespace paimon diff --git a/include/paimon/table/source/table_scan.h b/include/paimon/table/source/table_scan.h new file mode 100644 index 0000000..c9b4291 --- /dev/null +++ b/include/paimon/table/source/table_scan.h @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +#include "paimon/result.h" +#include "paimon/table/source/plan.h" +#include "paimon/type_fwd.h" +#include "paimon/visibility.h" + +namespace paimon { +class ScanContext; + +/// A scanner interface for reading table's meta and create a plan. +class PAIMON_EXPORT TableScan { + public: + /// Create an instance of `TableScan`. + /// + /// @param context A unique pointer to the `ScanContext` used for scan operations. + /// @return A Result containing a unique pointer to the `TableScan` instance. + static Result> Create(std::unique_ptr context); + + virtual ~TableScan() = default; + + /// Create a scan plan. + /// + /// @return A Result containing a shared pointer to the created `Plan` or an error status. + virtual Result> CreatePlan() = 0; +}; +} // namespace paimon diff --git a/src/paimon/core/table/source/abstract_table_scan.h b/src/paimon/core/table/source/abstract_table_scan.h new file mode 100644 index 0000000..363fe0d --- /dev/null +++ b/src/paimon/core/table/source/abstract_table_scan.h @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +#include "paimon/core/core_options.h" +#include "paimon/core/table/source/snapshot/continuous_from_snapshot_full_starting_scanner.h" +#include "paimon/core/table/source/snapshot/continuous_from_snapshot_starting_scanner.h" +#include "paimon/core/table/source/snapshot/continuous_latest_starting_scanner.h" +#include "paimon/core/table/source/snapshot/full_starting_scanner.h" +#include "paimon/core/table/source/snapshot/snapshot_reader.h" +#include "paimon/core/table/source/snapshot/static_from_snapshot_starting_scanner.h" +#include "paimon/core/table/source/snapshot/static_from_tag_starting_scanner.h" +#include "paimon/table/source/startup_mode.h" +#include "paimon/table/source/table_scan.h" +namespace paimon { +/// An abstraction layer above `FileStoreScan` to provide input split generation. +class AbstractTableScan : public TableScan { + public: + AbstractTableScan(const CoreOptions& core_options, + const std::shared_ptr& snapshot_reader) + : core_options_(core_options), snapshot_reader_(snapshot_reader) {} + + protected: + Result> CreateStartingScanner(bool is_streaming) const { + const auto& snapshot_manager = snapshot_reader_->GetSnapshotManager(); + auto startup_mode = core_options_.GetStartupMode(); + std::optional specified_snapshot_id = core_options_.GetScanSnapshotId(); + if (startup_mode == StartupMode::LatestFull()) { + return std::make_shared(snapshot_manager); + } else if (startup_mode == StartupMode::Latest()) { + if (is_streaming) { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr starting_scanner, + ContinuousLatestStartingScanner::Create(snapshot_manager)); + return starting_scanner; + } else { + return std::shared_ptr(new FullStartingScanner(snapshot_manager)); + } + } else if (startup_mode == StartupMode::FromSnapshot()) { + const std::optional scan_tag_name = core_options_.GetScanTagName(); + if (specified_snapshot_id != std::nullopt) { + return is_streaming + ? std::shared_ptr( + new ContinuousFromSnapshotStartingScanner( + snapshot_manager, specified_snapshot_id.value())) + : std::shared_ptr(new StaticFromSnapshotStartingScanner( + snapshot_manager, specified_snapshot_id.value())); + } else if (scan_tag_name != std::nullopt) { + if (is_streaming) { + return Status::Invalid("Cannot scan from tag in streaming mode"); + } + return std::make_shared(snapshot_manager, + scan_tag_name.value()); + } else { + return Status::Invalid( + "scan.snapshot-id or scan.tag-name must be set when startup mode is " + "FROM_SNAPSHOT"); + } + } else if (startup_mode == StartupMode::FromSnapshotFull()) { + if (specified_snapshot_id != std::nullopt) { + return is_streaming + ? std::shared_ptr( + new ContinuousFromSnapshotFullStartingScanner( + snapshot_manager, specified_snapshot_id.value())) + : std::shared_ptr(new StaticFromSnapshotStartingScanner( + snapshot_manager, specified_snapshot_id.value())); + } else { + return Status::Invalid( + "scan.snapshot-id must be set when startup mode is FROM_SNAPSHOT_FULL"); + } + } else if (startup_mode == StartupMode::FromTimestamp()) { + std::optional timestamp_millis = core_options_.GetScanTimestampMillis(); + if (timestamp_millis == std::nullopt) { + return Status::Invalid( + "scan.timestamp-millis or scan.timestamp must be set when startup mode is " + "FROM_TIMESTAMP"); + } + if (is_streaming) { + PAIMON_ASSIGN_OR_RAISE( + std::optional earlier_snapshot, + snapshot_manager->EarlierThanTimeMillis(timestamp_millis.value())); + int64_t start_id = + earlier_snapshot ? earlier_snapshot->Id() + 1 : Snapshot::FIRST_SNAPSHOT_ID; + return std::make_shared(snapshot_manager, + start_id); + } else { + PAIMON_ASSIGN_OR_RAISE( + std::optional snapshot, + snapshot_manager->EarlierOrEqualTimeMillis(timestamp_millis.value())); + if (snapshot == std::nullopt) { + return Status::Invalid(fmt::format( + "There is currently no snapshot earlier than or equal to timestamp [{}]", + timestamp_millis.value())); + } + return std::make_shared(snapshot_manager, + snapshot->Id()); + } + } + return Status::Invalid( + fmt::format("Unsupported snapshot startup mode {}", startup_mode.ToString())); + } + + protected: + CoreOptions core_options_; + std::shared_ptr snapshot_reader_; +}; +} // namespace paimon diff --git a/src/paimon/core/table/source/data_split_impl.cpp b/src/paimon/core/table/source/data_split_impl.cpp new file mode 100644 index 0000000..020b501 --- /dev/null +++ b/src/paimon/core/table/source/data_split_impl.cpp @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/core/table/source/data_split_impl.h" + +#include +#include +#include +#include + +namespace paimon { + +bool DataSplit::SimpleDataFileMeta::operator==(const SimpleDataFileMeta& other) const { + if (this == &other) { + return true; + } + return file_path == other.file_path && file_size == other.file_size && + row_count == other.row_count && min_sequence_number == other.min_sequence_number && + max_sequence_number == other.max_sequence_number && schema_id == other.schema_id && + level == other.level && creation_time == other.creation_time && + delete_row_count == other.delete_row_count; +} +std::string DataSplit::SimpleDataFileMeta::ToString() const { + return fmt::format( + "{{filePath: {}, fileSize: {}, rowCount: {}, minSequenceNumber: {}, " + "maxSequenceNumber:{}, schemaId: {}, level: {}, creationTime: {}, deleteRowCount: " + "{}}}", + file_path, file_size, row_count, min_sequence_number, max_sequence_number, schema_id, level, + creation_time.ToString(), + delete_row_count == std::nullopt ? "null" : std::to_string(delete_row_count.value())); +} + +Result> DataSplitImpl::LatestFileCreationEpochMillis() const { + if (data_files_.empty()) { + return std::optional(); + } + int64_t epoch = INT64_MIN; + for (const auto& file : data_files_) { + PAIMON_ASSIGN_OR_RAISE(int64_t epoch_milli, file->CreationTimeEpochMillis()); + epoch = std::max(epoch, epoch_milli); + } + return std::optional(epoch); +} + +int64_t DataSplitImpl::RowCount() const { + int64_t row_count = 0; + for (const auto& file : data_files_) { + row_count += file->row_count; + } + return row_count; +} + +std::vector DataSplitImpl::GetFileList() const { + std::vector result_files; + result_files.reserve(data_files_.size()); + for (const auto& file : data_files_) { + std::string result_file_path; + if (!file->external_path) { + result_file_path = PathUtil::JoinPath(bucket_path_, file->file_name); + } else { + result_file_path = file->external_path.value(); + } + result_files.emplace_back(result_file_path, file->file_size, file->row_count, + file->min_sequence_number, file->max_sequence_number, + file->schema_id, file->level, file->creation_time, + file->delete_row_count); + } + return result_files; +} + +bool DataSplitImpl::operator==(const DataSplitImpl& other) const { + if (this == &other) { + return true; + } + return snapshot_id_ == other.snapshot_id_ && partition_ == other.partition_ && + bucket_ == other.bucket_ && bucket_path_ == other.bucket_path_ && + total_buckets_ == other.total_buckets_ && + ObjectUtils::Equal(before_files_, other.before_files_) && + before_deletion_files_ == other.before_deletion_files_ && + ObjectUtils::Equal(data_files_, other.data_files_) && + data_deletion_files_ == other.data_deletion_files_ && + is_streaming_ == other.is_streaming_ && raw_convertible_ == other.raw_convertible_; +} + +bool DataSplitImpl::TEST_Equal(const DataSplitImpl& other) const { + if (this == &other) { + return true; + } + return snapshot_id_ == other.snapshot_id_ && partition_ == other.partition_ && + bucket_ == other.bucket_ && bucket_path_ == other.bucket_path_ && + total_buckets_ == other.total_buckets_ && + ObjectUtils::TEST_Equal(before_files_, other.before_files_) && + before_deletion_files_ == other.before_deletion_files_ && + ObjectUtils::TEST_Equal(data_files_, other.data_files_) && + data_deletion_files_ == other.data_deletion_files_ && + is_streaming_ == other.is_streaming_ && raw_convertible_ == other.raw_convertible_; +} + +int64_t DataSplitImpl::PartialMergedRowCount() const { + if (!raw_convertible_) { + return 0; + } + int64_t sum = 0; + for (size_t i = 0; i < data_files_.size(); i++) { + const auto& data_file = data_files_[i]; + if (data_deletion_files_.empty() || data_deletion_files_[i] == std::nullopt) { + sum += data_file->row_count; + } else if (data_deletion_files_[i].value().cardinality != std::nullopt) { + sum += data_file->row_count - data_deletion_files_[i].value().cardinality.value(); + } + } + return sum; +} + +Result>>> +DataSplitImpl::GetFileMetaSerializer(int32_t version, const std::shared_ptr& pool) { + if (version == 1) { + // TODO(xinyu.lxy): C++ paimon do not support data file meta 08 + return Status::NotImplemented("Do not support data file meta 08."); + } else if (version == 2) { + return std::make_unique(pool); + } else if (version == 3 || version == 4) { + return std::make_unique(pool); + } else if (version == 5 || version == 6) { + return std::make_unique(pool); + } else if (version == 7) { + return std::make_unique(pool); + } else if (version == VERSION) { + return std::make_unique(pool); + } else { + return Status::Invalid( + fmt::format("Expecting DataSplit version to be smaller or equal than {}, but found {}.", + VERSION, version)); + } +} + +std::string DataSplitImpl::ToString() const { + return fmt::format( + "snapshotId={}, partition={}, bucket={}, bucketPath={}, totalBuckets={}, " + "beforeFiles={}, " + "beforeDeletionFiles={}, dataFiles={}, dataDeletionFiles={}, isStreaming={}, " + "rawConvertible={}", + snapshot_id_, partition_.ToString(), bucket_, bucket_path_, + total_buckets_ == std::nullopt ? "null" : std::to_string(total_buckets_.value()), + StringUtils::VectorToString(before_files_), + StringUtils::VectorToString(before_deletion_files_), + StringUtils::VectorToString(data_files_), StringUtils::VectorToString(data_deletion_files_), + is_streaming_, raw_convertible_); +} + +} // namespace paimon diff --git a/src/paimon/core/table/source/data_split_impl.h b/src/paimon/core/table/source/data_split_impl.h new file mode 100644 index 0000000..ce1124a --- /dev/null +++ b/src/paimon/core/table/source/data_split_impl.h @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "paimon/common/utils/object_utils.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/common/utils/preconditions.h" +#include "paimon/common/utils/string_utils.h" +#include "paimon/core/io/data_file_meta_09_serializer.h" +#include "paimon/core/io/data_file_meta_10_serializer.h" +#include "paimon/core/io/data_file_meta_12_serializer.h" +#include "paimon/core/io/data_file_meta_first_row_id_legacy_serializer.h" +#include "paimon/core/io/data_file_meta_serializer.h" +#include "paimon/core/table/source/deletion_file.h" +#include "paimon/table/source/data_split.h" + +namespace paimon { +/// Input splits. Needed by most batch computation engines. +class DataSplitImpl : public DataSplit { + public: + static constexpr int64_t MAGIC = -2394839472490812314L; + static constexpr int32_t VERSION = 8; + + int64_t SnapshotId() const { + return snapshot_id_; + } + + const BinaryRow& Partition() const { + return partition_; + } + + int32_t Bucket() const { + return bucket_; + } + + const std::string& BucketPath() const { + return bucket_path_; + } + + const std::optional& TotalBuckets() const { + return total_buckets_; + } + + const std::vector>& BeforeFiles() const { + return before_files_; + } + + const std::vector>& BeforeDeletionFiles() const { + return before_deletion_files_; + } + + const std::vector>& DataFiles() const { + return data_files_; + } + + const std::vector>& DeletionFiles() const { + return data_deletion_files_; + } + + bool IsStreaming() const { + return is_streaming_; + } + + bool RawConvertible() const { + return raw_convertible_; + } + + Result> LatestFileCreationEpochMillis() const; + + int64_t RowCount() const; + + std::vector GetFileList() const override; + + bool operator==(const DataSplitImpl& other) const; + bool TEST_Equal(const DataSplitImpl& other) const; + + /// Obtain merged row count as much as possible. There are two scenarios where accurate row + /// count + /// can be calculated: + /// + /// 1. raw file and no deletion file. + /// + /// 2. raw file + deletion file with cardinality. + int64_t PartialMergedRowCount() const; + + // Builder + /// Builder for `DataSplitImpl`. + class Builder { + public: + Builder(const BinaryRow& partition, int32_t bucket, const std::string& bucket_path, + std::vector>&& data_files) + : split_(std::shared_ptr( + new DataSplitImpl(partition, bucket, bucket_path, std::move(data_files)))) {} + + const std::vector>& DataFiles() { + return split_->DataFiles(); + } + + Builder& WithTotalBuckets(const std::optional& total_buckets) { + split_->total_buckets_ = total_buckets; + return *this; + } + + Builder& WithSnapshot(int64_t snapshot) { + split_->snapshot_id_ = snapshot; + return *this; + } + + Builder& WithBeforeFiles(std::vector>&& before_files) { + split_->before_files_ = std::move(before_files); + return *this; + } + + Builder& WithBeforeDeletionFiles( + const std::vector>& before_deletion_files) { + split_->before_deletion_files_ = before_deletion_files; + return *this; + } + + Builder& WithDataDeletionFiles( + const std::vector>& data_deletion_files) { + split_->data_deletion_files_ = data_deletion_files; + return *this; + } + + Builder& IsStreaming(bool is_streaming) { + split_->is_streaming_ = is_streaming; + return *this; + } + + Builder& RawConvertible(bool raw_convertible) { + split_->raw_convertible_ = raw_convertible; + return *this; + } + + Result> Build() const { + PAIMON_RETURN_NOT_OK(Preconditions::CheckArgument(split_->bucket_ != -1)); + return split_; + } + + private: + std::shared_ptr split_; + }; + + static Result>>> + GetFileMetaSerializer(int32_t version, const std::shared_ptr& pool); + + std::string ToString() const; + + private: + DataSplitImpl(const BinaryRow& partition, int32_t bucket, const std::string& bucket_path, + std::vector>&& data_files) + : partition_(partition), + bucket_(bucket), + bucket_path_(bucket_path), + data_files_(std::move(data_files)) {} + + private: + int64_t snapshot_id_ = 0; + BinaryRow partition_ = BinaryRow::EmptyRow(); + int32_t bucket_ = -1; + std::string bucket_path_; + std::optional total_buckets_; + + std::vector> before_files_; + std::vector> before_deletion_files_; + std::vector> data_files_; + std::vector> data_deletion_files_; + + bool is_streaming_ = false; + bool raw_convertible_ = false; +}; +} // namespace paimon diff --git a/src/paimon/core/table/source/deletion_file.h b/src/paimon/core/table/source/deletion_file.h new file mode 100644 index 0000000..2f7c5c4 --- /dev/null +++ b/src/paimon/core/table/source/deletion_file.h @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "fmt/core.h" +#include "fmt/format.h" +#include "fmt/ranges.h" +#include "paimon/common/io/memory_segment_output_stream.h" +#include "paimon/io/data_input_stream.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace paimon { +/// Deletion file for data file, the first 4 bytes are length, should, the following is the bitmap +/// content. +/// +/// The first 4 bytes are length, should equal to `length`. +/// Next 4 bytes are the magic number, should be equal to 1581511376. +/// The remaining content should be a RoaringBitmap. +/// Member path indicates the deletion vector index file name of the corresponding data file in +/// DataSplits. +struct DeletionFile { + DeletionFile(const std::string& _path, int64_t _offset, int64_t _length, + const std::optional& _cardinality) + : path(_path), offset(_offset), length(_length), cardinality(_cardinality) {} + + bool operator==(const DeletionFile& other) const { + if (this == &other) { + return true; + } + return path == other.path && offset == other.offset && length == other.length && + cardinality == other.cardinality; + } + + std::string ToString() const { + return fmt::format( + "{{path = {}, offset = {}, length = {}, cardinality = {}}}", path, offset, length, + cardinality == std::nullopt ? "null" : std::to_string(cardinality.value())); + } + + static void Serialize(const std::optional& file, MemorySegmentOutputStream* out) { + if (file == std::nullopt) { + out->WriteValue(0); + } else { + out->WriteValue(1); + out->WriteString(file.value().path); + out->WriteValue(file.value().offset); + out->WriteValue(file.value().length); + if (file.value().cardinality == std::nullopt) { + out->WriteValue(-1); + } else { + out->WriteValue(file.value().cardinality.value()); + } + } + } + + static void SerializeList(const std::vector>& files, + MemorySegmentOutputStream* out) { + if (files.empty()) { + out->WriteValue(0); + } else { + out->WriteValue(1); + out->WriteValue(files.size()); + for (const auto& file : files) { + Serialize(file, out); + } + } + } + + static Result>> DeserializeList(DataInputStream* in, + int32_t version) { + std::vector> files; + PAIMON_ASSIGN_OR_RAISE(char has_deletion_file, in->ReadValue()); + if (has_deletion_file == static_cast(1)) { + PAIMON_ASSIGN_OR_RAISE(int32_t size, in->ReadValue()); + files.reserve(size); + for (int32_t i = 0; i < size; i++) { + std::optional file; + if (version >= 4) { + PAIMON_ASSIGN_OR_RAISE(file, Deserialize(in)); + } else if (version >= 1 && version <= 3) { + PAIMON_ASSIGN_OR_RAISE(file, DeserializeV3(in)); + } else { + return Status::Invalid( + fmt::format("Unsupported deletion file version: {}", version)); + } + files.emplace_back(std::move(file)); + } + } + return files; + } + + private: + static Result> Deserialize(DataInputStream* in) { + char has_deletion_file = 0; + PAIMON_ASSIGN_OR_RAISE(has_deletion_file, in->ReadValue()); + if (has_deletion_file == static_cast(0)) { + return std::optional(); + } + PAIMON_ASSIGN_OR_RAISE(std::string path, in->ReadString()); + PAIMON_ASSIGN_OR_RAISE(int64_t offset, in->ReadValue()); + PAIMON_ASSIGN_OR_RAISE(int64_t length, in->ReadValue()); + PAIMON_ASSIGN_OR_RAISE(int64_t cardinality, in->ReadValue()); + return std::optional( + DeletionFile(path, offset, length, + cardinality == -1 ? std::nullopt : std::optional(cardinality))); + } + + static Result> DeserializeV3(DataInputStream* in) { + PAIMON_ASSIGN_OR_RAISE(char has_deletion_file, in->ReadValue()); + if (has_deletion_file == static_cast(0)) { + return std::optional(); + } + PAIMON_ASSIGN_OR_RAISE(std::string path, in->ReadString()); + PAIMON_ASSIGN_OR_RAISE(int64_t offset, in->ReadValue()); + PAIMON_ASSIGN_OR_RAISE(int64_t length, in->ReadValue()); + return std::optional(DeletionFile(path, offset, length, std::nullopt)); + } + + public: + std::string path = ""; + int64_t offset = -1; + int64_t length = -1; + // the number of deleted rows. + std::optional cardinality; +}; +} // namespace paimon diff --git a/src/paimon/core/table/source/deletion_file_test.cpp b/src/paimon/core/table/source/deletion_file_test.cpp new file mode 100644 index 0000000..b283992 --- /dev/null +++ b/src/paimon/core/table/source/deletion_file_test.cpp @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/core/table/source/deletion_file.h" + +#include + +#include "gtest/gtest.h" +#include "paimon/common/memory/memory_segment_utils.h" +#include "paimon/io/byte_array_input_stream.h" +#include "paimon/memory/bytes.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +TEST(DeletionFileTest, TestSimple) { + { + DeletionFile df("my_path", 100, 233, std::nullopt); + ASSERT_EQ("{path = my_path, offset = 100, length = 233, cardinality = null}", + df.ToString()); + } + { + DeletionFile df("my_path", 100, 233, 234); + ASSERT_EQ("{path = my_path, offset = 100, length = 233, cardinality = 234}", df.ToString()); + } +} + +TEST(DeletionFileTest, TestSerializeAndDeserialize) { + auto pool = GetDefaultPool(); + MemorySegmentOutputStream out(/*segment_size=*/8, pool); + DeletionFile df("my_path", 100, 233, 234); + ASSERT_EQ(df, df); + DeletionFile::SerializeList({df, std::nullopt}, &out); + auto bytes = MemorySegmentUtils::CopyToBytes(out.Segments(), 0, out.CurrentSize(), pool.get()); + auto byte_array_input_stream = + std::make_shared(bytes->data(), bytes->size()); + DataInputStream in(byte_array_input_stream); + ASSERT_OK_AND_ASSIGN(std::vector> deletion_files, + DeletionFile::DeserializeList(&in, /*version=*/4)); + ASSERT_EQ(2, deletion_files.size()); + ASSERT_TRUE(deletion_files[0]); + ASSERT_EQ(deletion_files[0], df); + ASSERT_FALSE(deletion_files[1]); + ASSERT_EQ("{path = my_path, offset = 100, length = 233, cardinality = 234}", + deletion_files[0].value().ToString()); +} + +} // namespace paimon::test diff --git a/src/paimon/core/table/source/plan_impl.cpp b/src/paimon/core/table/source/plan_impl.cpp new file mode 100644 index 0000000..1d418c5 --- /dev/null +++ b/src/paimon/core/table/source/plan_impl.cpp @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/core/table/source/plan_impl.h" + +namespace paimon { + +const std::shared_ptr PlanImpl::EmptyPlan() { + static const std::shared_ptr empty_plan = + std::make_shared(std::optional(), std::vector>()); + return empty_plan; +} + +} // namespace paimon diff --git a/src/paimon/core/table/source/plan_impl.h b/src/paimon/core/table/source/plan_impl.h new file mode 100644 index 0000000..e609ccb --- /dev/null +++ b/src/paimon/core/table/source/plan_impl.h @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "paimon/table/source/plan.h" + +namespace paimon { + +/// An implementation of `Plan`. +class PlanImpl : public Plan { + public: + PlanImpl(const std::optional& snapshot_id, + const std::vector>& splits) + : snapshot_id_(snapshot_id), splits_(splits) {} + + std::optional SnapshotId() const override { + return snapshot_id_; + } + + const std::vector>& Splits() const override { + return splits_; + } + + static const std::shared_ptr EmptyPlan(); + + private: + std::optional snapshot_id_; + std::vector> splits_; +}; +} // namespace paimon diff --git a/src/paimon/core/table/source/scan_mode.h b/src/paimon/core/table/source/scan_mode.h new file mode 100644 index 0000000..c236aad --- /dev/null +++ b/src/paimon/core/table/source/scan_mode.h @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +namespace paimon { +/// Scan which part of the snapshot. +enum class ScanMode { + /// Scan complete data files of a snapshot. + ALL = 0, + + /// Only scan newly changed files of a snapshot. + DELTA = 1 + + /// Only scan changelog files of a snapshot. + /* CHANGELOG = 2 */ +}; + +} // namespace paimon diff --git a/src/paimon/core/table/source/split.cpp b/src/paimon/core/table/source/split.cpp new file mode 100644 index 0000000..007df3c --- /dev/null +++ b/src/paimon/core/table/source/split.cpp @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +#include "fmt/format.h" +#include "paimon/common/data/binary_row.h" +#include "paimon/common/io/memory_segment_output_stream.h" +#include "paimon/common/memory/memory_segment_utils.h" +#include "paimon/common/utils/serialization_utils.h" +#include "paimon/core/global_index/indexed_split_impl.h" +#include "paimon/core/io/data_file_meta_serializer.h" +#include "paimon/core/table/source/data_split_impl.h" +#include "paimon/core/table/source/deletion_file.h" +#include "paimon/core/table/source/fallback_data_split.h" +#include "paimon/core/utils/object_serializer.h" +#include "paimon/global_index/indexed_split.h" +#include "paimon/io/byte_array_input_stream.h" +#include "paimon/io/data_input_stream.h" +#include "paimon/memory/bytes.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/status.h" +#include "paimon/table/source/data_split.h" +namespace paimon { +struct DataFileMeta; +namespace { +Status WriteDataSplit(const std::shared_ptr& data_split_impl, + MemorySegmentOutputStream* out, const std::shared_ptr& pool) { + out->WriteValue(DataSplitImpl::MAGIC); + out->WriteValue(DataSplitImpl::VERSION); + out->WriteValue(data_split_impl->SnapshotId()); + + PAIMON_RETURN_NOT_OK(SerializationUtils::SerializeBinaryRow(data_split_impl->Partition(), out)); + out->WriteValue(data_split_impl->Bucket()); + out->WriteString(data_split_impl->BucketPath()); + + std::optional total_buckets = data_split_impl->TotalBuckets(); + if (total_buckets == std::nullopt) { + out->WriteValue(false); + } else { + out->WriteValue(true); + out->WriteValue(total_buckets.value()); + } + + DataFileMetaSerializer serializer(pool); + PAIMON_RETURN_NOT_OK(serializer.SerializeList(data_split_impl->BeforeFiles(), out)); + + DeletionFile::SerializeList(data_split_impl->BeforeDeletionFiles(), out); + PAIMON_RETURN_NOT_OK(serializer.SerializeList(data_split_impl->DataFiles(), out)); + DeletionFile::SerializeList(data_split_impl->DeletionFiles(), out); + out->WriteValue(data_split_impl->IsStreaming()); + out->WriteValue(data_split_impl->RawConvertible()); + return Status::OK(); +} + +Result> ReadDataSplitWithoutMagicNumber( + int64_t magic, DataInputStream* in, const std::shared_ptr& pool) { + int32_t version = 1; + if (magic == DataSplitImpl::MAGIC) { + PAIMON_ASSIGN_OR_RAISE(version, in->ReadValue()); + } + + // version 1 does not write magic number in, so the first long is snapshot id. + int64_t snapshot_id = magic; + if (version != 1) { + PAIMON_ASSIGN_OR_RAISE(snapshot_id, in->ReadValue()); + } + + PAIMON_ASSIGN_OR_RAISE(BinaryRow partition, + SerializationUtils::DeserializeBinaryRow(in, pool.get())); + int32_t bucket = -1; + PAIMON_ASSIGN_OR_RAISE(bucket, in->ReadValue()); + std::string bucket_path; + PAIMON_ASSIGN_OR_RAISE(bucket_path, in->ReadString()); + + std::optional total_buckets; + if (version >= 6) { + PAIMON_ASSIGN_OR_RAISE(bool total_buckets_exist, in->ReadValue()); + if (total_buckets_exist) { + PAIMON_ASSIGN_OR_RAISE(total_buckets, in->ReadValue()); + } + } + + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr>> data_file_serializer, + DataSplitImpl::GetFileMetaSerializer(version, pool)); + std::vector> before_files; + PAIMON_ASSIGN_OR_RAISE(before_files, data_file_serializer->DeserializeList(in)); + // compatible for deletion file + std::vector> before_deletion_files; + PAIMON_ASSIGN_OR_RAISE(before_deletion_files, DeletionFile::DeserializeList(in, version)); + + std::vector> data_files; + PAIMON_ASSIGN_OR_RAISE(data_files, data_file_serializer->DeserializeList(in)); + // compatible for deletion file + std::vector> data_deletion_files; + PAIMON_ASSIGN_OR_RAISE(data_deletion_files, DeletionFile::DeserializeList(in, version)); + + bool is_streaming = false; + PAIMON_ASSIGN_OR_RAISE(is_streaming, in->ReadValue()); + bool raw_convertible = false; + PAIMON_ASSIGN_OR_RAISE(raw_convertible, in->ReadValue()); + + DataSplitImpl::Builder builder(partition, bucket, bucket_path, std::move(data_files)); + builder.WithTotalBuckets(total_buckets) + .WithSnapshot(snapshot_id) + .WithBeforeFiles(std::move(before_files)) + .IsStreaming(is_streaming) + .RawConvertible(raw_convertible); + if (!before_deletion_files.empty()) { + builder.WithBeforeDeletionFiles(before_deletion_files); + } + if (!data_deletion_files.empty()) { + builder.WithDataDeletionFiles(data_deletion_files); + } + return builder.Build(); +} + +} // namespace + +Result Split::Serialize(const std::shared_ptr& split, + const std::shared_ptr& pool) { + MemorySegmentOutputStream out(MemorySegmentOutputStream::DEFAULT_SEGMENT_SIZE, pool); + if (auto data_split_impl = std::dynamic_pointer_cast(split)) { + PAIMON_RETURN_NOT_OK(WriteDataSplit(data_split_impl, &out, pool)); + } else if (auto indexed_split_impl = std::dynamic_pointer_cast(split)) { + out.WriteValue(IndexedSplitImpl::MAGIC); + out.WriteValue(IndexedSplitImpl::VERSION); + auto inner_split_impl = + std::dynamic_pointer_cast(indexed_split_impl->GetDataSplit()); + if (!inner_split_impl) { + return Status::Invalid("inner split in IndexedSplit is supposed to be DataSplit"); + } + PAIMON_RETURN_NOT_OK(WriteDataSplit(inner_split_impl, &out, pool)); + auto row_ranges = indexed_split_impl->RowRanges(); + out.WriteValue(row_ranges.size()); + for (const auto& range : row_ranges) { + out.WriteValue(range.from); + out.WriteValue(range.to); + } + + auto scores = indexed_split_impl->Scores(); + if (!scores.empty()) { + out.WriteValue(true); + out.WriteValue(scores.size()); + for (const auto& score : scores) { + out.WriteValue(score); + } + } else { + out.WriteValue(false); + } + } else { + return Status::Invalid("invalid split, cannot cast to DataSplit or IndexedSplit"); + } + PAIMON_UNIQUE_PTR bytes = + MemorySegmentUtils::CopyToBytes(out.Segments(), 0, out.CurrentSize(), pool.get()); + return std::string(bytes->data(), bytes->size()); +} + +Result> Split::Deserialize(const char* buffer, size_t length, + const std::shared_ptr& pool) { + auto input_stream = std::make_shared(buffer, length); + DataInputStream in(input_stream); + + int64_t magic = -1; + PAIMON_ASSIGN_OR_RAISE(magic, in.ReadValue()); + + if (magic == IndexedSplitImpl::MAGIC) { + PAIMON_ASSIGN_OR_RAISE(int32_t version, in.ReadValue()); + if (version != IndexedSplitImpl::VERSION) { + return Status::Invalid(fmt::format("Unsupported IndexedSplit version: {}", version)); + } + PAIMON_ASSIGN_OR_RAISE(int64_t data_split_magic, in.ReadValue()); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr data_split, + ReadDataSplitWithoutMagicNumber(data_split_magic, &in, pool)); + PAIMON_ASSIGN_OR_RAISE(int32_t range_size, in.ReadValue()); + std::vector row_ranges; + row_ranges.reserve(range_size); + for (int32_t i = 0; i < range_size; ++i) { + PAIMON_ASSIGN_OR_RAISE(int64_t range_from, in.ReadValue()); + PAIMON_ASSIGN_OR_RAISE(int64_t range_to, in.ReadValue()); + row_ranges.emplace_back(range_from, range_to); + } + std::vector scores; + PAIMON_ASSIGN_OR_RAISE(bool has_scores, in.ReadValue()); + if (has_scores) { + PAIMON_ASSIGN_OR_RAISE(int32_t scores_length, in.ReadValue()); + scores.resize(scores_length); + for (int32_t i = 0; i < scores_length; ++i) { + PAIMON_ASSIGN_OR_RAISE(float score, in.ReadValue()); + scores[i] = score; + } + } + // TODO(lisizhuo.lsz): support fallback split in IndexedSplit + PAIMON_ASSIGN_OR_RAISE(int64_t pos, in.GetPos()); + PAIMON_ASSIGN_OR_RAISE(int64_t stream_length, in.Length()); + if (pos == stream_length) { + return std::make_shared(data_split, row_ranges, scores); + } else if (pos == stream_length - 1) { + return Status::Invalid( + "invalid IndexedSplit, do not support FallbackSplit in IndexedSplit"); + } else { + return Status::Invalid( + fmt::format("invalid IndexedSplit, remaining {} bytes after deserializing", + stream_length - pos)); + } + } else if (magic == DataSplitImpl::MAGIC) { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr data_split, + ReadDataSplitWithoutMagicNumber(magic, &in, pool)); + PAIMON_ASSIGN_OR_RAISE(int64_t pos, in.GetPos()); + PAIMON_ASSIGN_OR_RAISE(int64_t stream_length, in.Length()); + if (pos == stream_length) { + return data_split; + } else if (pos == stream_length - 1) { + PAIMON_ASSIGN_OR_RAISE(bool is_fallback, in.ReadValue()); + return std::make_shared(data_split, is_fallback); + } else { + return Status::Invalid(fmt::format( + "invalid data split byte stream, remaining {} bytes after deserializing", + stream_length - pos)); + } + } + return Status::Invalid("invalid split, must be DataSplit or IndexedSplit"); +} +} // namespace paimon diff --git a/src/paimon/core/table/source/split_generator.h b/src/paimon/core/table/source/split_generator.h new file mode 100644 index 0000000..e7a5661 --- /dev/null +++ b/src/paimon/core/table/source/split_generator.h @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once +#include +#include +#include +#include + +#include "paimon/core/io/data_file_meta.h" +#include "paimon/result.h" + +namespace paimon { +struct DataFileMeta; + +/// Generate splits from `DataFileMeta`s. +class SplitGenerator { + public: + struct SplitGroup { + static SplitGroup RawConvertibleGroup(std::vector>&& files) { + return SplitGroup(std::move(files), true); + } + + static SplitGroup NonRawConvertibleGroup( + std::vector>&& files) { + return SplitGroup(std::move(files), false); + } + + bool operator==(const SplitGroup& other) const { + if (this == &other) { + return true; + } + if (files.size() != other.files.size()) { + return false; + } + for (size_t i = 0; i < files.size(); ++i) { + if (!(*(files[i]) == *(other.files[i]))) { + return false; + } + } + return raw_convertible == other.raw_convertible; + } + + std::vector> files; + bool raw_convertible; + + private: + SplitGroup(std::vector>&& _files, bool _raw_convertible) + : files(std::move(_files)), raw_convertible(_raw_convertible) {} + }; + + public: + virtual ~SplitGenerator() = default; + virtual Result> SplitForBatch( + std::vector>&& files) const = 0; + + virtual Result> SplitForStreaming( + std::vector>&& files) const = 0; +}; +} // namespace paimon diff --git a/src/paimon/core/table/source/split_generator_test.cpp b/src/paimon/core/table/source/split_generator_test.cpp new file mode 100644 index 0000000..caf9e70 --- /dev/null +++ b/src/paimon/core/table/source/split_generator_test.cpp @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/core/table/source/split_generator.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "arrow/type_fwd.h" +#include "gtest/gtest.h" +#include "paimon/common/data/binary_row.h" +#include "paimon/common/types/data_field.h" +#include "paimon/common/utils/fields_comparator.h" +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/manifest/file_source.h" +#include "paimon/core/options/merge_engine.h" +#include "paimon/core/stats/simple_stats.h" +#include "paimon/core/table/bucket_mode.h" +#include "paimon/core/table/source/append_only_split_generator.h" +#include "paimon/core/table/source/merge_tree_split_generator.h" +#include "paimon/data/timestamp.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/result.h" +#include "paimon/status.h" +#include "paimon/testing/utils/binary_row_generator.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +class SplitGeneratorTest : public testing::Test { + public: + void SetUp() override { + pool_ = GetDefaultPool(); + key_comparator_ = + FieldsComparator::Create({DataField(0, arrow::field("f0", arrow::int32(), false))}, + /*is_ascending_order=*/true) + .value(); + } + void TearDown() override {} + + std::shared_ptr CreateDataFileMeta(const std::string& file_name, + int64_t file_size, int64_t min_sequence_number, + int64_t max_sequence_number) { + return std::make_shared( + file_name, file_size, /*row_count=*/1, /*min_key=*/BinaryRow::EmptyRow(), + /*max_key=*/BinaryRow::EmptyRow(), /*key_stats=*/SimpleStats::EmptyStats(), + /*value_stats=*/SimpleStats::EmptyStats(), min_sequence_number, max_sequence_number, + /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(0, 0), /*delete_row_count=*/0, + /*embedded_index=*/nullptr, FileSource::Append(), /*value_stats_cols=*/std::nullopt, + /*external_path=*/std::optional(), + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + } + + std::shared_ptr CreateDataFileMeta(const std::string& file_name, int32_t level, + int32_t min_key, int32_t max_key, + int64_t max_sequence_number) { + return CreateDataFileMeta(file_name, level, min_key, max_key, max_sequence_number, 0l); + } + + std::shared_ptr CreateDataFileMeta(const std::string& file_name, int32_t level, + int32_t min_key, int32_t max_key, + int64_t max_sequence_number, + std::optional delete_row_count) { + return std::make_shared( + file_name, max_key - min_key + 1, /*row_count=*/max_key - min_key + 1, + BinaryRowGenerator::GenerateRow({min_key}, pool_.get()), + BinaryRowGenerator::GenerateRow({max_key}, pool_.get()), + /*key_stats=*/SimpleStats::EmptyStats(), + /*value_stats=*/SimpleStats::EmptyStats(), 0, max_sequence_number, + /*schema_id=*/0, + /*level=*/level, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(0, 0), /*delete_row_count=*/delete_row_count, + /*embedded_index=*/nullptr, FileSource::Append(), + /*external_path=*/std::nullopt, + /*value_stats_cols=*/std::nullopt, /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + } + + std::shared_ptr CreateDataFileMeta(const std::string& file_name, int32_t min_key, + int32_t max_key) { + return std::make_shared( + file_name, /*file_size=*/max_key - min_key + 1, + /*row_count=*/max_key - min_key + 1, /*min_key=*/ + BinaryRowGenerator::GenerateRow({min_key}, pool_.get()), + /*max_key=*/BinaryRowGenerator::GenerateRow({max_key}, pool_.get()), + /*key_stats=*/SimpleStats::EmptyStats(), + /*value_stats=*/SimpleStats::EmptyStats(), /*min_sequence_number=*/0, + /*max_sequence_number=*/0, + /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(0, 0), /*delete_row_count=*/0, + /*embedded_index=*/nullptr, FileSource::Append(), /*external_path=*/std::nullopt, + /*value_stats_cols=*/std::nullopt, /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + } + + static void CheckResult(const std::vector& result_groups, + const std::vector>& expected_file_names, + const std::vector& expected_raw_convertible) { + std::vector result_raw_convertible; + for (const auto& group : result_groups) { + result_raw_convertible.push_back(group.raw_convertible); + } + ASSERT_EQ(result_raw_convertible, expected_raw_convertible); + CheckResult(result_groups, expected_file_names); + } + + static void CheckResult(const std::vector& result_groups, + const std::vector>& expected_file_names) { + std::vector> expected = expected_file_names; + std::vector> result; + for (const auto& group : result_groups) { + std::vector one_group_files; + for (const auto& file : group.files) { + one_group_files.push_back(file->file_name); + } + result.push_back(std::move(one_group_files)); + } + for (auto& re : result) { + std::sort(re.begin(), re.end()); + } + for (auto& ep : expected) { + std::sort(ep.begin(), ep.end()); + } + std::sort(result.begin(), result.end()); + std::sort(expected.begin(), expected.end()); + ASSERT_EQ(result, expected); + } + + private: + std::shared_ptr pool_; + std::shared_ptr key_comparator_; +}; + +TEST_F(SplitGeneratorTest, TestAppend) { + std::vector> files = { + CreateDataFileMeta("1", 11, 0, 20), CreateDataFileMeta("2", 13, 21, 30), + CreateDataFileMeta("3", 46, 31, 40), CreateDataFileMeta("4", 23, 41, 50), + CreateDataFileMeta("5", 4, 51, 60), CreateDataFileMeta("6", 101, 61, 100)}; + { + auto tmp_files = files; + AppendOnlySplitGenerator split_generator(/*target_split_size=*/40, /*open_file_cost=*/2, + BucketMode::HASH_FIXED); + ASSERT_OK_AND_ASSIGN(std::vector split_groups, + split_generator.SplitForBatch(std::move(tmp_files))); + std::vector> expected = {{"1", "2"}, {"3"}, {"4", "5"}, {"6"}}; + CheckResult(split_groups, expected); + } + { + auto tmp_files = files; + AppendOnlySplitGenerator split_generator(/*target_split_size=*/70, /*open_file_cost=*/2, + BucketMode::HASH_FIXED); + ASSERT_OK_AND_ASSIGN(std::vector split_groups, + split_generator.SplitForBatch(std::move(tmp_files))); + std::vector> expected = {{"1", "2", "3"}, {"4", "5"}, {"6"}}; + CheckResult(split_groups, expected); + } + { + auto tmp_files = files; + AppendOnlySplitGenerator split_generator(/*target_split_size=*/40, /*open_file_cost=*/20, + BucketMode::HASH_FIXED); + ASSERT_OK_AND_ASSIGN(std::vector split_groups, + split_generator.SplitForBatch(std::move(tmp_files))); + std::vector> expected = {{"1", "2"}, {"3"}, {"4"}, {"5"}, {"6"}}; + CheckResult(split_groups, expected); + } + { + auto tmp_files = files; + AppendOnlySplitGenerator split_generator(/*target_split_size=*/40, /*open_file_cost=*/2, + BucketMode::BUCKET_UNAWARE); + ASSERT_OK_AND_ASSIGN(std::vector split_groups, + split_generator.SplitForStreaming(std::move(tmp_files))); + std::vector> expected = {{"1", "2"}, {"3"}, {"4", "5"}, {"6"}}; + CheckResult(split_groups, expected); + } + { + auto tmp_files = files; + AppendOnlySplitGenerator split_generator(/*target_split_size=*/40, /*open_file_cost=*/2, + BucketMode::HASH_FIXED); + ASSERT_OK_AND_ASSIGN(std::vector split_groups, + split_generator.SplitForStreaming(std::move(tmp_files))); + std::vector> expected = {{"1", "2", "3", "4", "5", "6"}}; + CheckResult(split_groups, expected); + } +} + +TEST_F(SplitGeneratorTest, TestMergeTree) { + std::vector> files = { + CreateDataFileMeta("1", 0, 10), CreateDataFileMeta("2", 0, 12), + CreateDataFileMeta("3", 15, 60), CreateDataFileMeta("4", 18, 40), + CreateDataFileMeta("5", 82, 85), CreateDataFileMeta("6", 100, 200)}; + { + auto tmp_files = files; + MergeTreeSplitGenerator split_generator(/*target_split_size=*/100, /*open_file_cost=*/2, + /*deletion_vectors_enabled=*/false, + MergeEngine::DEDUPLICATE, key_comparator_); + ASSERT_OK_AND_ASSIGN(std::vector split_groups, + split_generator.SplitForBatch(std::move(tmp_files))); + std::vector> expected = {{"1", "2", "3", "4", "5"}, {"6"}}; + CheckResult(split_groups, expected); + } + { + auto tmp_files = files; + MergeTreeSplitGenerator split_generator(/*target_split_size=*/100, /*open_file_cost=*/30, + /*deletion_vectors_enabled=*/false, + MergeEngine::DEDUPLICATE, key_comparator_); + ASSERT_OK_AND_ASSIGN(std::vector split_groups, + split_generator.SplitForBatch(std::move(tmp_files))); + std::vector> expected = {{"1", "2", "3", "4"}, {"5"}, {"6"}}; + CheckResult(split_groups, expected); + } +} + +TEST_F(SplitGeneratorTest, TestSplitRawConvertible) { + MergeTreeSplitGenerator split_generator(/*target_split_size=*/100, /*open_file_cost=*/2, + /*deletion_vectors_enabled=*/false, + MergeEngine::DEDUPLICATE, key_comparator_); + { + // When level0 exists, should not be rawConvertible + std::vector> files = { + CreateDataFileMeta("1", 0, 0, 10, 10l), CreateDataFileMeta("2", 0, 10, 20, 20l)}; + ASSERT_OK_AND_ASSIGN(std::vector split_groups, + split_generator.SplitForBatch(std::move(files))); + std::vector> expected = {{"1", "2"}}; + std::vector expected_raw_convertible = {false}; + CheckResult(split_groups, expected, expected_raw_convertible); + } + { + // When deleteRowCount > 0, should not be rawConvertible + std::vector> files = { + CreateDataFileMeta("1", 1, 0, 10, 10l, /*delete_row_count=*/1l), + CreateDataFileMeta("2", 1, 10, 20, 20l)}; + ASSERT_OK_AND_ASSIGN(std::vector split_groups, + split_generator.SplitForBatch(std::move(files))); + std::vector> expected = {{"1", "2"}}; + std::vector expected_raw_convertible = {false}; + CheckResult(split_groups, expected, expected_raw_convertible); + } + { + // No level0 and deleteRowCount == 0: + // All in one level, should be rawConvertible + std::vector> files = { + CreateDataFileMeta("1", 1, 0, 10, 10l), CreateDataFileMeta("2", 1, 10, 20, 20l)}; + ASSERT_OK_AND_ASSIGN(std::vector split_groups, + split_generator.SplitForBatch(std::move(files))); + std::vector> expected = {{"1", "2"}}; + std::vector expected_raw_convertible = {true}; + CheckResult(split_groups, expected, expected_raw_convertible); + } + { + // Not all in one level, should not be rawConvertible + std::vector> files = { + CreateDataFileMeta("1", 1, 0, 10, 10l), CreateDataFileMeta("2", 2, 10, 20, 20l)}; + ASSERT_OK_AND_ASSIGN(std::vector split_groups, + split_generator.SplitForBatch(std::move(files))); + std::vector> expected = {{"1", "2"}}; + std::vector expected_raw_convertible = {false}; + CheckResult(split_groups, expected, expected_raw_convertible); + } + { + // Not all in one level but with deletion vectors enabled, should be rawConvertible + std::vector> files = { + CreateDataFileMeta("1", 1, 0, 10, 10l), CreateDataFileMeta("2", 2, 10, 20, 20l)}; + MergeTreeSplitGenerator split_generator_dv(/*target_split_size=*/100, /*open_file_cost=*/2, + /*deletion_vectors_enabled=*/true, + MergeEngine::DEDUPLICATE, key_comparator_); + ASSERT_OK_AND_ASSIGN(std::vector split_groups, + split_generator_dv.SplitForBatch(std::move(files))); + std::vector> expected = {{"1", "2"}}; + std::vector expected_raw_convertible = {true}; + CheckResult(split_groups, expected, expected_raw_convertible); + } + { + // Not all in one level but with first row merge engine, should be rawConvertible + std::vector> files = { + CreateDataFileMeta("1", 1, 0, 10, 10l), CreateDataFileMeta("2", 2, 10, 20, 20l)}; + MergeTreeSplitGenerator split_generator_dv(/*target_split_size=*/100, /*open_file_cost=*/2, + /*deletion_vectors_enabled=*/false, + MergeEngine::FIRST_ROW, key_comparator_); + ASSERT_OK_AND_ASSIGN(std::vector split_groups, + split_generator_dv.SplitForBatch(std::move(files))); + std::vector> expected = {{"1", "2"}}; + std::vector expected_raw_convertible = {true}; + CheckResult(split_groups, expected, expected_raw_convertible); + } + { + // Split with one file should be rawConvertible + std::vector> files = { + CreateDataFileMeta("1", 1, 0, 10, 10L), CreateDataFileMeta("2", 2, 0, 12, 12L), + CreateDataFileMeta("3", 3, 15, 60, 60L), CreateDataFileMeta("4", 4, 18, 40, 40L), + CreateDataFileMeta("5", 5, 82, 85, 85L), CreateDataFileMeta("6", 6, 100, 200, 200L)}; + ASSERT_OK_AND_ASSIGN(std::vector split_groups, + split_generator.SplitForBatch(std::move(files))); + std::vector> expected = {{"1", "2", "3", "4", "5"}, {"6"}}; + std::vector expected_raw_convertible = {false, true}; + CheckResult(split_groups, expected, expected_raw_convertible); + } + { + // test convertible for old version + std::vector> files = { + CreateDataFileMeta("1", 1, 0, 10, 10l, std::nullopt), + CreateDataFileMeta("2", 1, 10, 20, 20l, std::nullopt)}; + ASSERT_OK_AND_ASSIGN(std::vector split_groups, + split_generator.SplitForBatch(std::move(files))); + std::vector> expected = {{"1", "2"}}; + std::vector expected_raw_convertible = {true}; + CheckResult(split_groups, expected, expected_raw_convertible); + } +} + +TEST_F(SplitGeneratorTest, TestMergeTreeSplitRawConvertible) { + MergeTreeSplitGenerator split_generator(/*target_split_size=*/100, /*open_file_cost=*/2, + /*deletion_vectors_enabled=*/false, + MergeEngine::DEDUPLICATE, key_comparator_); + std::vector> files = { + CreateDataFileMeta("1", 0, 0, 10, 10L), CreateDataFileMeta("2", 0, 0, 12, 12L), + CreateDataFileMeta("3", 0, 13, 20, 20L), CreateDataFileMeta("4", 0, 21, 200, 200L), + CreateDataFileMeta("5", 0, 201, 210, 210L), CreateDataFileMeta("6", 0, 211, 220, 220L)}; + ASSERT_OK_AND_ASSIGN(std::vector split_groups, + split_generator.SplitForBatch(std::move(files))); + std::vector> expected = {{"1", "2", "3"}, {"4"}, {"5", "6"}}; + std::vector expected_raw_convertible = {false, true, false}; + CheckResult(split_groups, expected, expected_raw_convertible); +} + +} // namespace paimon::test diff --git a/src/paimon/core/table/source/startup_mode.cpp b/src/paimon/core/table/source/startup_mode.cpp new file mode 100644 index 0000000..f6e3cec --- /dev/null +++ b/src/paimon/core/table/source/startup_mode.cpp @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/table/source/startup_mode.h" + +#include "fmt/format.h" +#include "paimon/status.h" + +namespace paimon { + +const StartupMode StartupMode::Default() { + static const StartupMode mode = StartupMode("default"); + return mode; +} + +const StartupMode StartupMode::LatestFull() { + static const StartupMode mode = StartupMode("latest-full"); + return mode; +} +const StartupMode StartupMode::Latest() { + static const StartupMode mode = StartupMode("latest"); + return mode; +} +const StartupMode StartupMode::FromSnapshot() { + static const StartupMode mode = StartupMode("from-snapshot"); + return mode; +} +const StartupMode StartupMode::FromSnapshotFull() { + static const StartupMode mode = StartupMode("from-snapshot-full"); + return mode; +} +const StartupMode StartupMode::FromTimestamp() { + static const StartupMode mode = StartupMode("from-timestamp"); + return mode; +} + +std::string StartupMode::ToString() const { + return value_; +} + +bool StartupMode::operator==(const StartupMode& other) const { + if (this == &other) { + return true; + } + return value_ == other.value_; +} + +Result StartupMode::FromString(const std::string& str) { + if (str == StartupMode::Default().ToString()) { + return StartupMode::Default(); + } else if (str == StartupMode::LatestFull().ToString()) { + return StartupMode::LatestFull(); + } else if (str == StartupMode::Latest().ToString()) { + return StartupMode::Latest(); + } else if (str == StartupMode::FromSnapshot().ToString()) { + return StartupMode::FromSnapshot(); + } else if (str == StartupMode::FromSnapshotFull().ToString()) { + return StartupMode::FromSnapshotFull(); + } else if (str == StartupMode::FromTimestamp().ToString()) { + return StartupMode::FromTimestamp(); + } else { + return Status::Invalid(fmt::format("invalid startup mode {}", str)); + } +} +} // namespace paimon diff --git a/src/paimon/core/table/source/startup_mode_test.cpp b/src/paimon/core/table/source/startup_mode_test.cpp new file mode 100644 index 0000000..d455c57 --- /dev/null +++ b/src/paimon/core/table/source/startup_mode_test.cpp @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/table/source/startup_mode.h" + +#include "gtest/gtest.h" +#include "paimon/status.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +TEST(StartupModeTest, FromString) { + ASSERT_OK_AND_ASSIGN(StartupMode mode, StartupMode::FromString("default")); + ASSERT_EQ(StartupMode::Default(), mode); + ASSERT_OK_AND_ASSIGN(mode, StartupMode::FromString("latest-full")); + ASSERT_EQ(StartupMode::LatestFull(), mode); + ASSERT_OK_AND_ASSIGN(mode, StartupMode::FromString("latest")); + ASSERT_EQ(StartupMode::Latest(), mode); + ASSERT_OK_AND_ASSIGN(mode, StartupMode::FromString("from-snapshot")); + ASSERT_EQ(StartupMode::FromSnapshot(), mode); + ASSERT_OK_AND_ASSIGN(mode, StartupMode::FromString("from-snapshot-full")); + ASSERT_EQ(StartupMode::FromSnapshotFull(), mode); + ASSERT_OK_AND_ASSIGN(mode, StartupMode::FromString("from-timestamp")); + ASSERT_EQ(StartupMode::FromTimestamp(), mode); + ASSERT_NOK(StartupMode::FromString("unknown")); +} + +} // namespace paimon::test diff --git a/src/paimon/core/table/source/table_scan.cpp b/src/paimon/core/table/source/table_scan.cpp new file mode 100644 index 0000000..5afe993 --- /dev/null +++ b/src/paimon/core/table/source/table_scan.cpp @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/table/source/table_scan.h" + +#include +#include +#include +#include +#include + +#include "fmt/format.h" +#include "paimon/common/predicate/predicate_validator.h" +#include "paimon/common/types/data_field.h" +#include "paimon/common/utils/fields_comparator.h" +#include "paimon/core/core_options.h" +#include "paimon/core/index/index_file_handler.h" +#include "paimon/core/manifest/index_manifest_file.h" +#include "paimon/core/manifest/manifest_file.h" +#include "paimon/core/manifest/manifest_list.h" +#include "paimon/core/operation/append_only_file_store_scan.h" +#include "paimon/core/operation/data_evolution_file_store_scan.h" +#include "paimon/core/operation/file_store_scan.h" +#include "paimon/core/operation/key_value_file_store_scan.h" +#include "paimon/core/schema/schema_manager.h" +#include "paimon/core/schema/schema_validation.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/core/table/bucket_mode.h" +#include "paimon/core/table/source/abstract_table_scan.h" +#include "paimon/core/table/source/append_only_split_generator.h" +#include "paimon/core/table/source/data_evolution_batch_scan.h" +#include "paimon/core/table/source/data_evolution_split_generator.h" +#include "paimon/core/table/source/data_table_batch_scan.h" +#include "paimon/core/table/source/data_table_stream_scan.h" +#include "paimon/core/table/source/merge_tree_split_generator.h" +#include "paimon/core/table/source/snapshot/snapshot_reader.h" +#include "paimon/core/table/source/split_generator.h" +#include "paimon/core/table/system/system_table.h" +#include "paimon/core/utils/branch_manager.h" +#include "paimon/core/utils/field_mapping.h" +#include "paimon/core/utils/file_store_path_factory.h" +#include "paimon/core/utils/index_file_path_factories.h" +#include "paimon/core/utils/snapshot_manager.h" +#include "paimon/format/file_format.h" +#include "paimon/result.h" +#include "paimon/scan_context.h" +#include "paimon/status.h" + +namespace arrow { +class Schema; +} // namespace arrow + +namespace paimon { +class Executor; +class MemoryPool; + +namespace { + +class TableScanImpl { + public: + static Result> CreateFileStoreScan( + const std::shared_ptr& path_factory, + const std::shared_ptr& arrow_schema, + const std::shared_ptr& table_schema, const CoreOptions& core_options, + const std::shared_ptr& executor, const std::shared_ptr& memory_pool, + const ScanContext* context) { + auto fs = core_options.GetFileSystem(); + auto manifest_file_format = core_options.GetManifestFormat(); + std::string branch = BranchManager::NormalizeBranch(core_options.GetBranch()); + auto snapshot_manager = std::make_shared(fs, context->GetPath(), branch); + // TODO(liancheng.lsz): support fallback branch in scan + auto schema_manager = std::make_shared(fs, context->GetPath(), branch); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr manifest_list, + ManifestList::Create(fs, manifest_file_format, core_options.GetManifestCompression(), + path_factory, memory_pool)); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr partition_schema, + FieldMapping::GetPartitionSchema(arrow_schema, table_schema->PartitionKeys())); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr manifest_file, + ManifestFile::Create(fs, manifest_file_format, core_options.GetManifestCompression(), + path_factory, core_options.GetManifestTargetFileSize(), + memory_pool, core_options, partition_schema)); + if (table_schema->PrimaryKeys().empty()) { + if (core_options.DataEvolutionEnabled()) { + return DataEvolutionFileStoreScan::Create( + snapshot_manager, schema_manager, manifest_list, manifest_file, table_schema, + arrow_schema, context->GetScanFilters(), core_options, executor, memory_pool); + } + return AppendOnlyFileStoreScan::Create( + snapshot_manager, schema_manager, manifest_list, manifest_file, table_schema, + arrow_schema, context->GetScanFilters(), core_options, executor, memory_pool); + } + return KeyValueFileStoreScan::Create( + snapshot_manager, schema_manager, manifest_list, manifest_file, table_schema, + arrow_schema, context->GetScanFilters(), core_options, executor, memory_pool); + } + + static Result> CreateSplitGenerator( + const std::shared_ptr& table_schema, const CoreOptions& core_options, + const ScanContext* context) { + auto source_split_target_size = core_options.GetSourceSplitTargetSize(); + auto source_split_open_file_cost = core_options.GetSourceSplitOpenFileCost(); + if (table_schema->PrimaryKeys().empty()) { + if (core_options.DataEvolutionEnabled()) { + return std::make_unique(source_split_target_size, + source_split_open_file_cost); + } + BucketMode bucket_mode = (core_options.GetBucket() == -1 ? BucketMode::BUCKET_UNAWARE + : BucketMode::HASH_FIXED); + return std::make_unique( + source_split_target_size, source_split_open_file_cost, bucket_mode); + } else { + // TODO(liancheng.lsz): support evolution + PAIMON_ASSIGN_OR_RAISE(std::vector trimmed_primary_keys, + table_schema->TrimmedPrimaryKeys()); + PAIMON_ASSIGN_OR_RAISE(std::vector trimmed_pk_fields, + table_schema->GetFields(trimmed_primary_keys)); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr key_comparator, + FieldsComparator::Create(trimmed_pk_fields, /*is_ascending_order=*/true)); + return std::make_unique( + source_split_target_size, source_split_open_file_cost, + core_options.DeletionVectorsEnabled(), core_options.GetMergeEngine(), + key_comparator); + } + } + + static Result> CreateIndexFileHandler( + const CoreOptions& core_options, const std::shared_ptr& path_factory, + const std::shared_ptr& memory_pool) { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr index_manifest_file, + IndexManifestFile::Create( + core_options.GetFileSystem(), core_options.GetManifestFormat(), + core_options.GetManifestCompression(), path_factory, + core_options.GetBucket(), memory_pool, core_options)); + return std::make_unique( + core_options.GetFileSystem(), std::move(index_manifest_file), + std::make_shared(path_factory), + core_options.DeletionVectorsBitmap64(), memory_pool); + } +}; + +Result> NewDataTableScan(const std::shared_ptr& context); + +} // namespace + +Result> TableScan::Create(std::unique_ptr context) { + if (context == nullptr) { + return Status::Invalid("scan context is null pointer"); + } + if (context->GetMemoryPool() == nullptr) { + return Status::Invalid("memory pool is null pointer"); + } + if (context->GetExecutor() == nullptr) { + return Status::Invalid("executor is null pointer"); + } + + std::shared_ptr shared_context = std::move(context); + // load schema + PAIMON_ASSIGN_OR_RAISE(CoreOptions tmp_options, + CoreOptions::FromMap(shared_context->GetOptions(), + shared_context->GetSpecificFileSystem())); + PAIMON_ASSIGN_OR_RAISE(std::optional system_table_path, + SystemTableLoader::TryParsePath(shared_context->GetPath())); + if (system_table_path) { + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr system_table, + SystemTableLoader::LoadFromPath(tmp_options.GetFileSystem(), shared_context->GetPath(), + shared_context->GetOptions())); + return system_table->NewScan(shared_context); + } + return NewDataTableScan(shared_context); +} + +namespace { + +Result> NewDataTableScan(const std::shared_ptr& context) { + PAIMON_ASSIGN_OR_RAISE( + CoreOptions tmp_options, + CoreOptions::FromMap(context->GetOptions(), context->GetSpecificFileSystem())); + std::string branch = BranchManager::NormalizeBranch(tmp_options.GetBranch()); + SchemaManager schema_manager(tmp_options.GetFileSystem(), context->GetPath(), branch); + PAIMON_ASSIGN_OR_RAISE(std::optional> latest_table_schema, + schema_manager.Latest()); + if (latest_table_schema == std::nullopt) { + return Status::Invalid("not found latest schema"); + } + const auto& table_schema = latest_table_schema.value(); + if (table_schema->Id() != TableSchema::FIRST_SCHEMA_ID && + !table_schema->PrimaryKeys().empty()) { + return Status::NotImplemented( + "do not support schema evolution in pk table while scan process"); + } + // merge options + auto options = table_schema->Options(); + for (const auto& [key, value] : context->GetOptions()) { + options[key] = value; + } + PAIMON_ASSIGN_OR_RAISE(CoreOptions core_options, + CoreOptions::FromMap(options, context->GetSpecificFileSystem())); + // validate options + if (core_options.GetBucket() == -1) { + if (!table_schema->PrimaryKeys().empty()) { + return Status::NotImplemented(fmt::format( + "do not support pk table bucket={} in scan process", core_options.GetBucket())); + } + } else if (core_options.GetBucket() < 1 && + !SchemaValidation::IsPostponeBucketTable(*table_schema, core_options.GetBucket())) { + return Status::Invalid( + fmt::format("do not support bucket={} in scan process", core_options.GetBucket())); + } + + // validate schema and scan filter + auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(table_schema->Fields()); + if (context->GetScanFilters() && context->GetScanFilters()->GetPredicate()) { + PAIMON_RETURN_NOT_OK(PredicateValidator::ValidatePredicateWithSchema( + *arrow_schema, context->GetScanFilters()->GetPredicate(), + /*validate_field_idx=*/false)); + PAIMON_RETURN_NOT_OK(PredicateValidator::ValidatePredicateWithLiterals( + context->GetScanFilters()->GetPredicate())); + } + PAIMON_ASSIGN_OR_RAISE(std::vector external_paths, + core_options.CreateExternalPaths()); + PAIMON_ASSIGN_OR_RAISE(std::optional global_index_external_path, + core_options.CreateGlobalIndexExternalPath()); + + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr path_factory, + FileStorePathFactory::Create( + context->GetPath(), arrow_schema, table_schema->PartitionKeys(), + core_options.GetPartitionDefaultName(), core_options.GetFileFormat()->Identifier(), + core_options.DataFilePrefix(), core_options.LegacyPartitionNameEnabled(), + external_paths, global_index_external_path, core_options.IndexFileInDataFileDir(), + context->GetMemoryPool())); + + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr file_store_scan, + TableScanImpl::CreateFileStoreScan( + path_factory, arrow_schema, table_schema, core_options, + context->GetExecutor(), context->GetMemoryPool(), context.get())); + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr split_generator, + TableScanImpl::CreateSplitGenerator(table_schema, core_options, context.get())); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr index_file_handler, + TableScanImpl::CreateIndexFileHandler(core_options, path_factory, + context->GetMemoryPool())); + auto snapshot_reader = std::make_shared( + file_store_scan, path_factory, std::move(split_generator), std::move(index_file_handler)); + if (context->IsStreamingMode()) { + return std::make_unique(core_options, snapshot_reader); + } + auto batch_scan = + std::make_unique(/*pk_table=*/!table_schema->PrimaryKeys().empty(), + core_options, snapshot_reader, context->GetLimit()); + if (!core_options.DataEvolutionEnabled()) { + return batch_scan; + } + return std::make_unique( + context->GetPath(), snapshot_reader, std::move(batch_scan), context->GetGlobalIndexResult(), + core_options, context->GetMemoryPool(), context->GetExecutor()); +} + +} // namespace + +} // namespace paimon diff --git a/src/paimon/core/table/source/table_scan_test.cpp b/src/paimon/core/table/source/table_scan_test.cpp new file mode 100644 index 0000000..8e43ad4 --- /dev/null +++ b/src/paimon/core/table/source/table_scan_test.cpp @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/table/source/table_scan.h" + +#include +#include +#include + +#include "gtest/gtest.h" +#include "paimon/defs.h" +#include "paimon/scan_context.h" +#include "paimon/status.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +TEST(TableScanTest, TestNoSnapshot) { + std::string path = paimon::test::GetDataDir() + + "/orc/append_table_with_nested_type.db/append_table_with_nested_type/"; + ScanContextBuilder builder(path); + builder.AddOption(Options::FILE_FORMAT, "orc"); + ASSERT_OK_AND_ASSIGN(auto context, builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(context))); + ASSERT_OK_AND_ASSIGN(auto plan, table_scan->CreatePlan()); + ASSERT_FALSE(plan->SnapshotId()); + ASSERT_TRUE(plan->Splits().empty()); +} + +TEST(TableScanTest, TestNonExistTable) { + std::string path = paimon::test::GetDataDir() + "/non-exist.db/non-exist/"; + ScanContextBuilder builder(path); + builder.AddOption(Options::FILE_FORMAT, "orc"); + ASSERT_OK_AND_ASSIGN(auto context, builder.Finish()); + ASSERT_NOK_WITH_MSG(TableScan::Create(std::move(context)), "not found latest schema"); +} + +TEST(TableScanTest, TestNoSchemaEvolution) { + // do not bear schema evolution in scan + std::string path = + paimon::test::GetDataDir() + "/orc/pk_table_with_alter_table.db/pk_table_with_alter_table/"; + ScanContextBuilder builder(path); + builder.AddOption(Options::FILE_FORMAT, "orc"); + ASSERT_OK_AND_ASSIGN(auto context, builder.Finish()); + ASSERT_NOK_WITH_MSG(TableScan::Create(std::move(context)), "do not support schema evolution"); +} + +} // namespace paimon::test