diff --git a/src/paimon/common/global_index/btree/btree_compatibility_test.cpp b/src/paimon/common/global_index/btree/btree_compatibility_test.cpp new file mode 100644 index 0000000..e5e95e9 --- /dev/null +++ b/src/paimon/common/global_index/btree/btree_compatibility_test.cpp @@ -0,0 +1,931 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include +#include +#include +#include + +#include "arrow/c/bridge.h" +#include "gtest/gtest.h" +#include "paimon/common/global_index/btree/btree_global_indexer.h" +#include "paimon/common/global_index/btree/btree_index_meta.h" +#include "paimon/common/global_index/btree/key_serializer.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/string_utils.h" +#include "paimon/fs/file_system.h" +#include "paimon/fs/file_system_factory.h" +#include "paimon/global_index/io/global_index_file_reader.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/predicate/literal.h" +#include "paimon/testing/utils/testharness.h" +namespace paimon::test { +class BTreeCompatibilityTest : public ::testing::Test { + public: + void SetUp() override { + pool_ = GetDefaultPool(); + ASSERT_OK_AND_ASSIGN(fs_, FileSystemFactory::Get("local", "/", {})); + data_dir_ = GetDataDir() + "/global_index/btree/btree_compatibility_data"; + } + + struct CsvRecord { + int64_t row_id; + std::string key; // "NULL" if is_null + bool is_null; + }; + + std::string ReadFileAsString(const std::string& path) const { + EXPECT_OK_AND_ASSIGN(auto input, fs_->Open(path)); + EXPECT_OK_AND_ASSIGN(auto length, input->Length()); + std::string buffer(static_cast(length), '\0'); + EXPECT_OK_AND_ASSIGN([[maybe_unused]] auto bytes_read, + input->Read(buffer.data(), static_cast(length))); + return buffer; + } + + // Parse a CSV file into a vector of CsvRecord + std::vector ParseCsvFile(const std::string& csv_path) const { + std::vector records; + std::string content = ReadFileAsString(csv_path); + if (content.empty()) { + return records; + } + + std::istringstream iss(content); + std::string line; + // Skip header line: "row_id,key,is_null" + std::getline(iss, line); + + while (std::getline(iss, line)) { + if (line.empty()) { + continue; + } + std::istringstream ss(line); + std::string row_id_str, key_str, is_null_str; + std::getline(ss, row_id_str, ','); + std::getline(ss, key_str, ','); + std::getline(ss, is_null_str, ','); + + CsvRecord rec; + rec.row_id = std::stoll(row_id_str); + rec.key = key_str; + rec.is_null = (is_null_str == "true"); + records.push_back(rec); + } + return records; + } + + std::set CollectRowIds(const std::shared_ptr& result) const { + std::set ids; + EXPECT_OK_AND_ASSIGN(auto iter, result->CreateIterator()); + while (iter->HasNext()) { + ids.insert(iter->Next()); + } + return ids; + } + + std::set CollectMatchingRows(const std::vector& records, + std::function predicate) const { + std::set ids; + for (const auto& rec : records) { + if (predicate(rec)) { + ids.insert(rec.row_id); + } + } + return ids; + } + + Result> CreateReaderFromFiles( + const std::string& bin_path, const std::string& meta_path, + const std::shared_ptr& arrow_type) const { + auto meta_str = ReadFileAsString(meta_path); + std::shared_ptr meta_bytes = Bytes::AllocateBytes(meta_str, pool_.get()); + PAIMON_ASSIGN_OR_RAISE(auto file_status, fs_->GetFileStatus(bin_path)); + auto file_size = static_cast(file_status->GetLen()); + + GlobalIndexIOMeta io_meta(bin_path, file_size, meta_bytes); + std::vector metas = {io_meta}; + + auto schema = arrow::schema({arrow::field("testField", arrow_type)}); + auto c_schema = std::make_unique(); + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*schema, c_schema.get())); + + auto file_reader = std::make_shared(fs_); + std::map options; + PAIMON_ASSIGN_OR_RAISE(auto indexer, BTreeGlobalIndexer::Create(options)); + return indexer->CreateReader(c_schema.get(), file_reader, metas, pool_); + } + + void RunIntQueries(const std::shared_ptr& reader, + const std::vector& records) const { + // VisitIsNull + { + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIsNull()); + auto actual_ids = CollectRowIds(result); + auto expected_ids = + CollectMatchingRows(records, [](const CsvRecord& r) { return r.is_null; }); + ASSERT_EQ(actual_ids, expected_ids); + } + + // VisitIsNotNull + { + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIsNotNull()); + auto actual_ids = CollectRowIds(result); + auto expected_ids = + CollectMatchingRows(records, [](const CsvRecord& r) { return !r.is_null; }); + ASSERT_EQ(actual_ids, expected_ids); + } + + // VisitEqual for the first non-null key + for (const auto& rec : records) { + if (!rec.is_null) { + int32_t key_val = std::stoi(rec.key); + Literal literal(key_val); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(literal)); + auto actual_ids = CollectRowIds(result); + auto expected_ids = CollectMatchingRows(records, [key_val](const CsvRecord& r) { + return !r.is_null && std::stoi(r.key) == key_val; + }); + ASSERT_EQ(actual_ids, expected_ids); + break; + } + } + + // VisitEqual for the last non-null key + for (auto it = records.rbegin(); it != records.rend(); ++it) { + if (!it->is_null) { + int32_t key_val = std::stoi(it->key); + Literal literal(key_val); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(literal)); + auto actual_ids = CollectRowIds(result); + auto expected_ids = CollectMatchingRows(records, [key_val](const CsvRecord& r) { + return !r.is_null && std::stoi(r.key) == key_val; + }); + ASSERT_EQ(actual_ids, expected_ids); + break; + } + } + + // VisitEqual for a non-existent key + { + Literal literal(static_cast(-999)); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(literal)); + auto actual_ids = CollectRowIds(result); + ASSERT_TRUE(actual_ids.empty()); + } + + int32_t mid_key = -1; + { + std::vector non_null_keys; + for (const auto& rec : records) { + if (!rec.is_null) { + non_null_keys.push_back(std::stoi(rec.key)); + } + } + ASSERT_FALSE(non_null_keys.empty()); + std::sort(non_null_keys.begin(), non_null_keys.end()); + mid_key = non_null_keys[non_null_keys.size() / 2]; + } + // VisitLessThan for a mid-range key + { + Literal literal(mid_key); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitLessThan(literal)); + auto actual_ids = CollectRowIds(result); + auto expected_ids = CollectMatchingRows(records, [mid_key](const CsvRecord& r) { + return !r.is_null && std::stoi(r.key) < mid_key; + }); + ASSERT_EQ(actual_ids, expected_ids); + } + + // VisitGreaterOrEqual for a mid-range key + { + Literal literal(mid_key); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitGreaterOrEqual(literal)); + auto actual_ids = CollectRowIds(result); + auto expected_ids = CollectMatchingRows(records, [mid_key](const CsvRecord& r) { + return !r.is_null && std::stoi(r.key) >= mid_key; + }); + ASSERT_EQ(actual_ids, expected_ids); + } + + // VisitIn for multiple keys + { + std::set unique_keys; + for (const auto& rec : records) { + if (!rec.is_null) { + unique_keys.insert(std::stoi(rec.key)); + } + } + ASSERT_GE(unique_keys.size(), 3); + auto it = unique_keys.begin(); + int32_t k1 = *it++; + int32_t k2 = *it++; + int32_t k3 = *it++; + std::vector in_literals = {Literal(k1), Literal(k2), Literal(k3)}; + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIn(in_literals)); + auto actual_ids = CollectRowIds(result); + auto expected_ids = CollectMatchingRows(records, [k1, k2, k3](const CsvRecord& r) { + if (r.is_null) { + return false; + } + int32_t v = std::stoi(r.key); + return v == k1 || v == k2 || v == k3; + }); + ASSERT_EQ(actual_ids, expected_ids); + } + + // VisitNotEqual for the first non-null key + for (const auto& rec : records) { + if (!rec.is_null) { + int32_t key_val = std::stoi(rec.key); + Literal literal(key_val); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitNotEqual(literal)); + auto actual_ids = CollectRowIds(result); + auto expected_ids = CollectMatchingRows(records, [key_val](const CsvRecord& r) { + return !r.is_null && std::stoi(r.key) != key_val; + }); + ASSERT_EQ(actual_ids, expected_ids); + break; + } + } + } + + // Run string-type queries against a reader with CSV records as ground truth + void RunStringQueries(const std::shared_ptr& reader, + const std::vector& records) const { + // VisitIsNull + { + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIsNull()); + auto actual_ids = CollectRowIds(result); + auto expected_ids = + CollectMatchingRows(records, [](const CsvRecord& r) { return r.is_null; }); + ASSERT_EQ(actual_ids, expected_ids); + } + + // VisitIsNotNull + { + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIsNotNull()); + auto actual_ids = CollectRowIds(result); + auto expected_ids = + CollectMatchingRows(records, [](const CsvRecord& r) { return !r.is_null; }); + ASSERT_EQ(actual_ids, expected_ids); + } + + // VisitEqual for the first non-null key + for (const auto& rec : records) { + if (!rec.is_null) { + Literal literal(FieldType::STRING, rec.key.c_str(), + static_cast(rec.key.size())); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(literal)); + auto actual_ids = CollectRowIds(result); + auto expected_ids = CollectMatchingRows( + records, [&rec](const CsvRecord& r) { return !r.is_null && r.key == rec.key; }); + ASSERT_EQ(actual_ids, expected_ids); + break; + } + } + + // VisitEqual for the last non-null key + for (auto it = records.rbegin(); it != records.rend(); ++it) { + if (!it->is_null) { + Literal literal(FieldType::STRING, it->key.c_str(), + static_cast(it->key.size())); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(literal)); + auto actual_ids = CollectRowIds(result); + auto expected_ids = CollectMatchingRows( + records, [&it](const CsvRecord& r) { return !r.is_null && r.key == it->key; }); + ASSERT_EQ(actual_ids, expected_ids); + break; + } + } + + // VisitEqual for a non-existent key + { + std::string non_existent = "zzz_non_existent_key"; + Literal literal(FieldType::STRING, non_existent.c_str(), + static_cast(non_existent.size())); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(literal)); + auto actual_ids = CollectRowIds(result); + ASSERT_TRUE(actual_ids.empty()); + } + + std::string mid_key; + { + std::vector non_null_keys; + for (const auto& rec : records) { + if (!rec.is_null) { + non_null_keys.push_back(rec.key); + } + } + ASSERT_FALSE(non_null_keys.empty()); + std::sort(non_null_keys.begin(), non_null_keys.end()); + mid_key = non_null_keys[non_null_keys.size() / 2]; + } + // VisitLessThan for a mid-range key + { + Literal literal(FieldType::STRING, mid_key.c_str(), + static_cast(mid_key.size())); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitLessThan(literal)); + auto actual_ids = CollectRowIds(result); + auto expected_ids = CollectMatchingRows( + records, [&mid_key](const CsvRecord& r) { return !r.is_null && r.key < mid_key; }); + ASSERT_EQ(actual_ids, expected_ids); + } + + // VisitGreaterOrEqual for a mid-range key + { + Literal literal(FieldType::STRING, mid_key.c_str(), + static_cast(mid_key.size())); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitGreaterOrEqual(literal)); + auto actual_ids = CollectRowIds(result); + auto expected_ids = CollectMatchingRows( + records, [&mid_key](const CsvRecord& r) { return !r.is_null && r.key >= mid_key; }); + ASSERT_EQ(actual_ids, expected_ids); + } + + // VisitStartsWith + { + std::string prefix_str = "test_000"; + Literal literal(FieldType::STRING, prefix_str.c_str(), + static_cast(prefix_str.size())); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitStartsWith(literal)); + auto actual_ids = CollectRowIds(result); + auto expected_ids = CollectMatchingRows(records, [&prefix_str](const CsvRecord& r) { + return !r.is_null && StringUtils::StartsWith(r.key, prefix_str); + }); + ASSERT_EQ(actual_ids, expected_ids); + } + } + + // Run float-type queries against a reader with CSV records as ground truth + void RunFloatQueries(const std::shared_ptr& reader, + const std::vector& records) const { + // VisitIsNull + { + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIsNull()); + auto actual_ids = CollectRowIds(result); + auto expected_ids = + CollectMatchingRows(records, [](const CsvRecord& r) { return r.is_null; }); + ASSERT_EQ(actual_ids, expected_ids); + } + + // VisitIsNotNull + { + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIsNotNull()); + auto actual_ids = CollectRowIds(result); + auto expected_ids = + CollectMatchingRows(records, [](const CsvRecord& r) { return !r.is_null; }); + ASSERT_EQ(actual_ids, expected_ids); + } + // check special value + { + float value = -INFINITY; + Literal literal(value); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(literal)); + auto actual_ids = CollectRowIds(result); + auto expected_ids = CollectMatchingRows( + records, [](const CsvRecord& r) { return !r.is_null && r.key == "-infinity"; }); + ASSERT_EQ(actual_ids, expected_ids); + } + { + float value = INFINITY; + Literal literal(value); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(literal)); + auto actual_ids = CollectRowIds(result); + auto expected_ids = CollectMatchingRows( + records, [](const CsvRecord& r) { return !r.is_null && r.key == "+infinity"; }); + ASSERT_EQ(actual_ids, expected_ids); + } + { + Literal literal(static_cast(std::nan(""))); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(literal)); + auto actual_ids = CollectRowIds(result); + auto expected_ids = CollectMatchingRows( + records, [](const CsvRecord& r) { return !r.is_null && r.key == "NaN"; }); + ASSERT_EQ(actual_ids, expected_ids); + } + { + Literal literal(static_cast(-0.00f)); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(literal)); + auto actual_ids = CollectRowIds(result); + auto expected_ids = CollectMatchingRows( + records, [](const CsvRecord& r) { return !r.is_null && r.key == "-0.00f"; }); + ASSERT_EQ(actual_ids, expected_ids); + } + { + Literal literal(static_cast(0.00f)); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(literal)); + auto actual_ids = CollectRowIds(result); + auto expected_ids = CollectMatchingRows( + records, [](const CsvRecord& r) { return !r.is_null && r.key == "0.00f"; }); + ASSERT_EQ(actual_ids, expected_ids); + } + + // VisitEqual for the first non-null key + for (const auto& rec : records) { + if (!rec.is_null && rec.key != "-infinity") { + float key_val = std::stof(rec.key); + Literal literal(key_val); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(literal)); + auto actual_ids = CollectRowIds(result); + auto expected_ids = CollectMatchingRows(records, [key_val](const CsvRecord& r) { + return !r.is_null && std::stof(r.key) == key_val; + }); + ASSERT_EQ(actual_ids, expected_ids); + break; + } + } + + // VisitEqual for the last non-null key + for (auto it = records.rbegin(); it != records.rend(); ++it) { + if (!it->is_null && it->key != "+infinity" && it->key != "NaN") { + float key_val = std::stof(it->key); + Literal literal(key_val); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(literal)); + auto actual_ids = CollectRowIds(result); + auto expected_ids = CollectMatchingRows(records, [key_val](const CsvRecord& r) { + return !r.is_null && std::stof(r.key) == key_val; + }); + ASSERT_EQ(actual_ids, expected_ids); + break; + } + } + + // VisitEqual for a non-existent key + { + Literal literal(static_cast(-99.99)); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(literal)); + auto actual_ids = CollectRowIds(result); + ASSERT_TRUE(actual_ids.empty()); + } + + float mid_key = -1; + { + std::vector non_null_keys; + for (const auto& rec : records) { + if (!rec.is_null) { + non_null_keys.push_back(std::stof(rec.key)); + } + } + ASSERT_FALSE(non_null_keys.empty()); + mid_key = non_null_keys[non_null_keys.size() / 2]; + } + // VisitLessThan for a mid-range key + { + Literal literal(mid_key); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitLessThan(literal)); + auto actual_ids = CollectRowIds(result); + auto expected_ids = CollectMatchingRows(records, [mid_key](const CsvRecord& r) { + return !r.is_null && std::stof(r.key) < mid_key; + }); + ASSERT_EQ(actual_ids, expected_ids); + } + + // VisitGreaterOrEqual for a mid-range key + { + Literal literal(mid_key); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitGreaterOrEqual(literal)); + auto actual_ids = CollectRowIds(result); + auto expected_ids = CollectMatchingRows(records, [mid_key](const CsvRecord& r) { + return !r.is_null && (std::stof(r.key) >= mid_key || r.key == "NaN"); + }); + ASSERT_EQ(actual_ids, expected_ids); + } + + // VisitIn for multiple keys + { + float k1 = -9.27f; + float k2 = 62.91f; + float k3 = 108.17f; + std::vector in_literals = {Literal(k1), Literal(k2), Literal(k3)}; + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIn(in_literals)); + auto actual_ids = CollectRowIds(result); + auto expected_ids = CollectMatchingRows(records, [k1, k2, k3](const CsvRecord& r) { + if (r.is_null) { + return false; + } + float v = std::stof(r.key); + return v == k1 || v == k2 || v == k3; + }); + ASSERT_EQ(actual_ids, expected_ids); + } + + // VisitNotEqual for the first non-null key + for (const auto& rec : records) { + if (!rec.is_null) { + float key_val = std::stof(rec.key); + Literal literal(key_val); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitNotEqual(literal)); + auto actual_ids = CollectRowIds(result); + auto expected_ids = CollectMatchingRows( + records, [rec](const CsvRecord& r) { return !r.is_null && r.key != rec.key; }); + ASSERT_EQ(actual_ids, expected_ids); + break; + } + } + } + + class LocalGlobalIndexFileReader : public GlobalIndexFileReader { + public: + explicit LocalGlobalIndexFileReader(const std::shared_ptr& fs) : fs_(fs) {} + + Result> GetInputStream( + const std::string& file_path) const override { + return fs_->Open(file_path); + } + + private: + std::shared_ptr fs_; + }; + + private: + std::shared_ptr pool_; + std::shared_ptr fs_; + std::string data_dir_; +}; + +TEST_F(BTreeCompatibilityTest, ReadAndQueryIntData) { + for (int32_t count : {50, 100, 500, 1000, 5000}) { + std::string prefix = "btree_test_int_" + std::to_string(count); + std::string bin_path = data_dir_ + "/" + prefix + ".bin"; + std::string meta_path = bin_path + ".meta"; + std::string csv_path = data_dir_ + "/" + prefix + ".csv"; + + auto records = ParseCsvFile(csv_path); + ASSERT_EQ(static_cast(records.size()), count); + + ASSERT_OK_AND_ASSIGN(auto reader, + CreateReaderFromFiles(bin_path, meta_path, arrow::int32())); + RunIntQueries(reader, records); + } +} + +TEST_F(BTreeCompatibilityTest, ReadAndQueryStringData) { + for (int32_t count : {50, 100, 500, 1000, 5000}) { + std::string prefix = "btree_test_string_" + std::to_string(count); + std::string bin_path = data_dir_ + "/" + prefix + ".bin"; + std::string meta_path = bin_path + ".meta"; + std::string csv_path = data_dir_ + "/" + prefix + ".csv"; + + auto records = ParseCsvFile(csv_path); + ASSERT_EQ(static_cast(records.size()), count); + + ASSERT_OK_AND_ASSIGN(auto reader, + CreateReaderFromFiles(bin_path, meta_path, arrow::utf8())); + RunStringQueries(reader, records); + } +} + +TEST_F(BTreeCompatibilityTest, ReadAndQueryFloatData) { + int32_t count = 50; + std::string prefix = "btree_test_float_" + std::to_string(count); + std::string bin_path = data_dir_ + "/" + prefix + ".bin"; + std::string meta_path = bin_path + ".meta"; + std::string csv_path = data_dir_ + "/" + prefix + ".csv"; + + auto records = ParseCsvFile(csv_path); + ASSERT_EQ(static_cast(records.size()), count); + + ASSERT_OK_AND_ASSIGN(auto reader, CreateReaderFromFiles(bin_path, meta_path, arrow::float32())); + RunFloatQueries(reader, records); +} + +TEST_F(BTreeCompatibilityTest, AllNulls) { + std::string prefix = "btree_test_int_all_nulls"; + std::string bin_path = data_dir_ + "/" + prefix + ".bin"; + std::string meta_path = bin_path + ".meta"; + std::string csv_path = data_dir_ + "/" + prefix + ".csv"; + + auto records = ParseCsvFile(csv_path); + ASSERT_FALSE(records.empty()); + auto count = static_cast(records.size()); + + ASSERT_OK_AND_ASSIGN(auto reader, CreateReaderFromFiles(bin_path, meta_path, arrow::int32())); + + // All rows should be null + { + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIsNull()); + auto actual_ids = CollectRowIds(result); + ASSERT_EQ(static_cast(actual_ids.size()), count); + for (int32_t i = 0; i < count; ++i) { + ASSERT_TRUE(actual_ids.count(i)); + } + } + + // No rows should be non-null + { + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIsNotNull()); + auto actual_ids = CollectRowIds(result); + ASSERT_TRUE(actual_ids.empty()); + } + + // VisitEqual should return empty for any key + { + Literal literal(static_cast(42)); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(literal)); + auto actual_ids = CollectRowIds(result); + ASSERT_TRUE(actual_ids.empty()); + } +} + +TEST_F(BTreeCompatibilityTest, NoNulls) { + std::string prefix = "btree_test_int_no_nulls"; + std::string bin_path = data_dir_ + "/" + prefix + ".bin"; + std::string meta_path = bin_path + ".meta"; + std::string csv_path = data_dir_ + "/" + prefix + ".csv"; + + auto records = ParseCsvFile(csv_path); + ASSERT_FALSE(records.empty()); + auto count = static_cast(records.size()); + + ASSERT_OK_AND_ASSIGN(auto reader, CreateReaderFromFiles(bin_path, meta_path, arrow::int32())); + + // No rows should be null + { + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIsNull()); + auto actual_ids = CollectRowIds(result); + ASSERT_TRUE(actual_ids.empty()); + } + + // All rows should be non-null + { + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIsNotNull()); + auto actual_ids = CollectRowIds(result); + ASSERT_EQ(static_cast(actual_ids.size()), count); + } + + // VisitEqual for each unique key + { + std::set tested_keys; + for (const auto& rec : records) { + if (!rec.is_null && tested_keys.find(rec.key) == tested_keys.end()) { + tested_keys.insert(rec.key); + int32_t key_val = std::stoi(rec.key); + Literal literal(key_val); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(literal)); + auto actual_ids = CollectRowIds(result); + auto expected_ids = CollectMatchingRows(records, [key_val](const CsvRecord& r) { + return !r.is_null && std::stoi(r.key) == key_val; + }); + ASSERT_EQ(actual_ids, expected_ids); + } + } + } + + int32_t max_key = 0; + for (const auto& rec : records) { + if (!rec.is_null) { + max_key = std::max(max_key, std::stoi(rec.key)); + } + } + + // VisitLessOrEqual for the max key should return all rows + { + Literal literal(max_key); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitLessOrEqual(literal)); + auto actual_ids = CollectRowIds(result); + ASSERT_EQ(static_cast(actual_ids.size()), count); + } + + // VisitGreaterThan for the max key should return empty + { + Literal literal(max_key); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitGreaterThan(literal)); + auto actual_ids = CollectRowIds(result); + ASSERT_TRUE(actual_ids.empty()); + } +} + +TEST_F(BTreeCompatibilityTest, DuplicateKeys) { + std::string prefix = "btree_test_int_duplicates"; + std::string bin_path = data_dir_ + "/" + prefix + ".bin"; + std::string meta_path = bin_path + ".meta"; + std::string csv_path = data_dir_ + "/" + prefix + ".csv"; + + auto records = ParseCsvFile(csv_path); + ASSERT_FALSE(records.empty()); + + ASSERT_OK_AND_ASSIGN(auto reader, CreateReaderFromFiles(bin_path, meta_path, arrow::int32())); + + // VisitIsNull + { + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIsNull()); + auto actual_ids = CollectRowIds(result); + auto expected_ids = + CollectMatchingRows(records, [](const CsvRecord& r) { return r.is_null; }); + ASSERT_EQ(actual_ids, expected_ids); + } + + // VisitIsNotNull + { + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIsNotNull()); + auto actual_ids = CollectRowIds(result); + auto expected_ids = + CollectMatchingRows(records, [](const CsvRecord& r) { return !r.is_null; }); + ASSERT_EQ(actual_ids, expected_ids); + } + + // VisitEqual for each unique key + { + std::set tested_keys; + for (const auto& rec : records) { + if (!rec.is_null && tested_keys.find(rec.key) == tested_keys.end()) { + tested_keys.insert(rec.key); + int32_t key_val = std::stoi(rec.key); + Literal literal(key_val); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(literal)); + auto actual_ids = CollectRowIds(result); + auto expected_ids = CollectMatchingRows(records, [key_val](const CsvRecord& r) { + return !r.is_null && std::stoi(r.key) == key_val; + }); + ASSERT_EQ(actual_ids, expected_ids); + } + } + } + + // VisitIn for keys 0, 5, 9 + { + std::vector in_literals = {Literal(static_cast(0)), + Literal(static_cast(5)), + Literal(static_cast(9))}; + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIn(in_literals)); + auto actual_ids = CollectRowIds(result); + auto expected_ids = CollectMatchingRows(records, [](const CsvRecord& r) { + if (r.is_null) { + return false; + } + int32_t v = std::stoi(r.key); + return v == 0 || v == 5 || v == 9; + }); + ASSERT_EQ(actual_ids, expected_ids); + } +} + +TEST_F(BTreeCompatibilityTest, MetaDeserialization) { + // Test int_50 meta + { + std::string meta_path = data_dir_ + "/btree_test_int_50.bin.meta"; + auto meta_str = ReadFileAsString(meta_path); + std::shared_ptr meta_bytes = Bytes::AllocateBytes(meta_str, pool_.get()); + + auto meta = BTreeIndexMeta::Deserialize(meta_bytes, pool_.get()); + ASSERT_TRUE(meta); + + ASSERT_TRUE(meta->HasNulls()); + ASSERT_FALSE(meta->OnlyNulls()); + + ASSERT_TRUE(meta->FirstKey()); + ASSERT_OK_AND_ASSIGN(auto min_key, + KeySerializer::DeserializeKey(MemorySlice::Wrap(meta->FirstKey()), + arrow::int32(), pool_.get())); + ASSERT_EQ(min_key, Literal(3)); + + ASSERT_TRUE(meta->LastKey()); + ASSERT_OK_AND_ASSIGN(auto max_key, + KeySerializer::DeserializeKey(MemorySlice::Wrap(meta->LastKey()), + arrow::int32(), pool_.get())); + ASSERT_EQ(max_key, Literal(143)); + } + + // Test float_50 meta + { + std::string meta_path = data_dir_ + "/btree_test_float_50.bin.meta"; + auto meta_str = ReadFileAsString(meta_path); + std::shared_ptr meta_bytes = Bytes::AllocateBytes(meta_str, pool_.get()); + + auto meta = BTreeIndexMeta::Deserialize(meta_bytes, pool_.get()); + ASSERT_TRUE(meta); + + ASSERT_TRUE(meta->HasNulls()); + ASSERT_FALSE(meta->OnlyNulls()); + + ASSERT_TRUE(meta->FirstKey()); + ASSERT_OK_AND_ASSIGN(auto min_key, + KeySerializer::DeserializeKey(MemorySlice::Wrap(meta->FirstKey()), + arrow::float32(), pool_.get())); + ASSERT_EQ(min_key, Literal(static_cast(-INFINITY))); + + ASSERT_TRUE(meta->LastKey()); + ASSERT_OK_AND_ASSIGN(auto max_key, + KeySerializer::DeserializeKey(MemorySlice::Wrap(meta->LastKey()), + arrow::float32(), pool_.get())); + ASSERT_EQ(max_key, Literal(static_cast(std::nan("")))); + } + + // Test all_nulls meta + { + std::string meta_path = data_dir_ + "/btree_test_int_all_nulls.bin.meta"; + auto meta_str = ReadFileAsString(meta_path); + std::shared_ptr meta_bytes = Bytes::AllocateBytes(meta_str, pool_.get()); + + auto meta = BTreeIndexMeta::Deserialize(meta_bytes, pool_.get()); + ASSERT_TRUE(meta); + + ASSERT_TRUE(meta->HasNulls()); + ASSERT_TRUE(meta->OnlyNulls()); + ASSERT_FALSE(meta->FirstKey()); + ASSERT_FALSE(meta->LastKey()); + } + + // Test no_nulls meta + { + std::string meta_path = data_dir_ + "/btree_test_int_no_nulls.bin.meta"; + auto meta_str = ReadFileAsString(meta_path); + std::shared_ptr meta_bytes = Bytes::AllocateBytes(meta_str, pool_.get()); + + auto meta = BTreeIndexMeta::Deserialize(meta_bytes, pool_.get()); + ASSERT_TRUE(meta); + + ASSERT_TRUE(meta->FirstKey()); + ASSERT_OK_AND_ASSIGN(auto min_key, + KeySerializer::DeserializeKey(MemorySlice::Wrap(meta->FirstKey()), + arrow::int32(), pool_.get())); + ASSERT_EQ(min_key, Literal(4)); + + ASSERT_TRUE(meta->LastKey()); + ASSERT_OK_AND_ASSIGN(auto max_key, + KeySerializer::DeserializeKey(MemorySlice::Wrap(meta->LastKey()), + arrow::int32(), pool_.get())); + ASSERT_EQ(max_key, Literal(158)); + } + + // Test string_50 meta + { + std::string meta_path = data_dir_ + "/btree_test_string_50.bin.meta"; + auto meta_str = ReadFileAsString(meta_path); + std::shared_ptr meta_bytes = Bytes::AllocateBytes(meta_str, pool_.get()); + + auto meta = BTreeIndexMeta::Deserialize(meta_bytes, pool_.get()); + ASSERT_TRUE(meta); + + ASSERT_TRUE(meta->HasNulls()); + ASSERT_FALSE(meta->OnlyNulls()); + + ASSERT_TRUE(meta->FirstKey()); + ASSERT_OK_AND_ASSIGN(auto min_key, + KeySerializer::DeserializeKey(MemorySlice::Wrap(meta->FirstKey()), + arrow::utf8(), pool_.get())); + std::string min_key_str = "test_00000"; + ASSERT_EQ(min_key, Literal(FieldType::STRING, min_key_str.data(), min_key_str.size())); + + ASSERT_TRUE(meta->LastKey()); + ASSERT_OK_AND_ASSIGN(auto max_key, + KeySerializer::DeserializeKey(MemorySlice::Wrap(meta->LastKey()), + arrow::utf8(), pool_.get())); + std::string max_key_str = "test_00049"; + ASSERT_EQ(max_key, Literal(FieldType::STRING, max_key_str.data(), max_key_str.size())); + } +} + +TEST_F(BTreeCompatibilityTest, RowCountConsistency) { + std::vector>> test_cases = { + {"btree_test_int_50", arrow::int32()}, {"btree_test_int_100", arrow::int32()}, + {"btree_test_int_500", arrow::int32()}, {"btree_test_string_50", arrow::utf8()}, + {"btree_test_string_100", arrow::utf8()}, {"btree_test_int_all_nulls", arrow::int32()}, + {"btree_test_int_no_nulls", arrow::int32()}, {"btree_test_int_duplicates", arrow::int32()}, + }; + + for (const auto& [prefix, arrow_type] : test_cases) { + std::string bin_path = data_dir_ + "/" + prefix + ".bin"; + std::string meta_path = bin_path + ".meta"; + std::string csv_path = data_dir_ + "/" + prefix + ".csv"; + + auto records = ParseCsvFile(csv_path); + ASSERT_FALSE(records.empty()); + auto count = static_cast(records.size()); + + ASSERT_OK_AND_ASSIGN(auto reader, CreateReaderFromFiles(bin_path, meta_path, arrow_type)); + + ASSERT_OK_AND_ASSIGN(auto null_result, reader->VisitIsNull()); + auto null_ids = CollectRowIds(null_result); + + ASSERT_OK_AND_ASSIGN(auto non_null_result, reader->VisitIsNotNull()); + auto non_null_ids = CollectRowIds(non_null_result); + + // Null and non-null should be disjoint + for (auto id : null_ids) { + ASSERT_EQ(non_null_ids.count(id), 0u); + } + + // Total should equal record count + ASSERT_EQ(static_cast(null_ids.size() + non_null_ids.size()), count); + } +} + +} // namespace paimon::test diff --git a/src/paimon/common/global_index/btree/btree_global_index_factory.cpp b/src/paimon/common/global_index/btree/btree_global_index_factory.cpp new file mode 100644 index 0000000..3b4faa8 --- /dev/null +++ b/src/paimon/common/global_index/btree/btree_global_index_factory.cpp @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/common/global_index/btree/btree_global_index_factory.h" + +#include + +#include "paimon/common/global_index/btree/btree_global_indexer.h" +namespace paimon { + +const char BTreeGlobalIndexerFactory::IDENTIFIER[] = "btree-global"; + +Result> BTreeGlobalIndexerFactory::Create( + const std::map& options) const { + return BTreeGlobalIndexer::Create(options); +} + +REGISTER_PAIMON_FACTORY(BTreeGlobalIndexerFactory); + +} // namespace paimon diff --git a/src/paimon/common/global_index/btree/btree_global_index_factory.h b/src/paimon/common/global_index/btree/btree_global_index_factory.h new file mode 100644 index 0000000..0d8c875 --- /dev/null +++ b/src/paimon/common/global_index/btree/btree_global_index_factory.h @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/global_index/global_indexer_factory.h" + +namespace paimon { +/// Factory for creating btree global indexers. +class BTreeGlobalIndexerFactory : public GlobalIndexerFactory { + public: + static const char IDENTIFIER[]; + + const char* Identifier() const override { + return IDENTIFIER; + } + + Result> Create( + const std::map& options) const override; +}; + +} // namespace paimon diff --git a/src/paimon/common/global_index/btree/btree_global_index_reader.cpp b/src/paimon/common/global_index/btree/btree_global_index_reader.cpp new file mode 100644 index 0000000..2ed8798 --- /dev/null +++ b/src/paimon/common/global_index/btree/btree_global_index_reader.cpp @@ -0,0 +1,450 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/common/global_index/btree/btree_global_index_reader.h" + +#include "fmt/format.h" +#include "paimon/common/global_index/btree/key_serializer.h" +#include "paimon/common/memory/memory_slice.h" +#include "paimon/common/memory/memory_slice_input.h" +#include "paimon/global_index/bitmap_global_index_result.h" +#include "paimon/memory/bytes.h" +#include "paimon/predicate/literal.h" +namespace paimon { + +Result> BTreeGlobalIndexReader::Create( + const std::shared_ptr& sst_file_reader, RoaringBitmap64&& null_bitmap, + const std::optional& min_key_slice, + const std::optional& max_key_slice, + const std::shared_ptr& key_type, const std::shared_ptr& pool) { + std::optional min_key; + std::optional max_key; + if (min_key_slice) { + PAIMON_ASSIGN_OR_RAISE( + min_key, KeySerializer::DeserializeKey(min_key_slice.value(), key_type, pool.get())); + } + if (max_key_slice) { + PAIMON_ASSIGN_OR_RAISE( + max_key, KeySerializer::DeserializeKey(max_key_slice.value(), key_type, pool.get())); + } + return std::shared_ptr(new BTreeGlobalIndexReader( + sst_file_reader, std::move(null_bitmap), std::move(min_key), std::move(max_key), + min_key_slice, max_key_slice, key_type, pool)); +} + +BTreeGlobalIndexReader::BTreeGlobalIndexReader( + const std::shared_ptr& sst_file_reader, RoaringBitmap64&& null_bitmap, + std::optional min_key, std::optional max_key, + std::optional min_key_slice, std::optional max_key_slice, + const std::shared_ptr& key_type, const std::shared_ptr& pool) + : pool_(pool), + sst_file_reader_(sst_file_reader), + null_bitmap_(std::move(null_bitmap)), + min_key_(std::move(min_key)), + max_key_(std::move(max_key)), + min_key_slice_(std::move(min_key_slice)), + max_key_slice_(std::move(max_key_slice)), + key_type_(key_type), + comparator_(KeySerializer::CreateComparator(key_type, pool)) {} + +Result> BTreeGlobalIndexReader::VisitIsNotNull() { + return std::make_shared( + [reader = shared_from_this()]() -> Result { + return reader->AllNonNullRows(); + }); +} + +Result> BTreeGlobalIndexReader::VisitIsNull() { + return std::make_shared( + [reader = shared_from_this()]() -> Result { + return reader->null_bitmap_; + }); +} + +Result> BTreeGlobalIndexReader::VisitStartsWith( + const Literal& prefix) { + if (prefix.IsNull()) { + return Status::Invalid("StartsWith pattern cannot be null"); + } + + if (prefix.GetType() == FieldType::STRING) { + auto prefix_str = prefix.GetValue(); + if (prefix_str.empty()) { + return std::make_shared( + [reader = shared_from_this()]() -> Result { + return reader->AllNonNullRows(); + }); + } + + // Compute the exclusive upper bound for the prefix range. + // Increment the last byte; carry over if it overflows 0xFF. + std::string upper_str = prefix_str; + bool overflow = true; + for (int32_t i = static_cast(upper_str.size()) - 1; i >= 0 && overflow; --i) { + auto c = static_cast(upper_str[i]); + if (c < 0xFF) { + upper_str[i] = static_cast(c + 1); + overflow = false; + } else { + upper_str[i] = 0x00; + } + } + + if (!overflow) { + Literal upper_bound(FieldType::STRING, upper_str.data(), upper_str.size()); + return std::make_shared( + [reader = shared_from_this(), prefix = prefix, + upper_bound = std::move(upper_bound)]() -> Result { + return reader->RangeQuery(prefix, upper_bound, /*from_inclusive=*/true, + /*to_inclusive=*/false); + }); + } + + // All bytes were 0xFF, use max_key_ as upper bound + return std::make_shared( + [reader = shared_from_this(), prefix = prefix]() -> Result { + return reader->RangeQuery(prefix, reader->max_key_, /*from_inclusive=*/true, + /*to_inclusive=*/true); + }); + } + + return std::make_shared( + [reader = shared_from_this()]() -> Result { + return reader->AllNonNullRows(); + }); +} + +Result> BTreeGlobalIndexReader::VisitEndsWith( + const Literal& suffix) { + return std::make_shared( + [reader = shared_from_this()]() -> Result { + return reader->AllNonNullRows(); + }); +} + +Result> BTreeGlobalIndexReader::VisitContains( + const Literal& literal) { + return std::make_shared( + [reader = shared_from_this()]() -> Result { + return reader->AllNonNullRows(); + }); +} + +Result> BTreeGlobalIndexReader::VisitLike( + const Literal& literal) { + if (literal.IsNull()) { + return Status::Invalid("LIKE pattern cannot be null"); + } + if (literal.GetType() == FieldType::STRING) { + auto pattern = literal.GetValue(); + + bool is_prefix_pattern = false; + std::string prefix; + + size_t first_wildcard = pattern.find_first_of("_%"); + + if (first_wildcard != std::string::npos && pattern[first_wildcard] == '%' && + first_wildcard == pattern.length() - 1) { + is_prefix_pattern = true; + prefix = pattern.substr(0, first_wildcard); + } + + if (is_prefix_pattern) { + Literal prefix_literal(FieldType::STRING, prefix.data(), prefix.length()); + return VisitStartsWith(prefix_literal); + } + } + return std::make_shared( + [reader = shared_from_this()]() -> Result { + return reader->AllNonNullRows(); + }); +} + +Result> BTreeGlobalIndexReader::VisitLessThan( + const Literal& literal) { + return std::make_shared( + [reader = shared_from_this(), literal = literal]() -> Result { + return reader->RangeQuery(reader->min_key_, literal, /*from_inclusive=*/true, + /*to_inclusive=*/false); + }); +} + +Result> BTreeGlobalIndexReader::VisitGreaterOrEqual( + const Literal& literal) { + return std::make_shared( + [reader = shared_from_this(), literal = literal]() -> Result { + return reader->RangeQuery(literal, reader->max_key_, /*from_inclusive=*/true, + /*to_inclusive=*/true); + }); +} + +Result> BTreeGlobalIndexReader::VisitNotEqual( + const Literal& literal) { + return std::make_shared( + [reader = shared_from_this(), literal = literal]() -> Result { + PAIMON_ASSIGN_OR_RAISE(RoaringBitmap64 result, reader->AllNonNullRows()); + PAIMON_ASSIGN_OR_RAISE(RoaringBitmap64 equal_result, + reader->RangeQuery(literal, literal, /*from_inclusive=*/true, + /*to_inclusive=*/true)); + result -= equal_result; + return result; + }); +} + +Result> BTreeGlobalIndexReader::VisitLessOrEqual( + const Literal& literal) { + return std::make_shared( + [reader = shared_from_this(), literal = literal]() -> Result { + return reader->RangeQuery(reader->min_key_, literal, /*from_inclusive=*/true, + /*to_inclusive=*/true); + }); +} + +Result> BTreeGlobalIndexReader::VisitEqual( + const Literal& literal) { + return std::make_shared( + [reader = shared_from_this(), literal = literal]() -> Result { + return reader->RangeQuery(literal, literal, /*from_inclusive=*/true, + /*to_inclusive=*/true); + }); +} + +Result> BTreeGlobalIndexReader::VisitGreaterThan( + const Literal& literal) { + return std::make_shared( + [reader = shared_from_this(), literal = literal]() -> Result { + return reader->RangeQuery(literal, reader->max_key_, /*from_inclusive=*/false, + /*to_inclusive=*/true); + }); +} + +Result> BTreeGlobalIndexReader::VisitIn( + const std::vector& literals) { + return std::make_shared( + [reader = shared_from_this(), literals = literals]() -> Result { + RoaringBitmap64 result; + for (const auto& literal : literals) { + PAIMON_ASSIGN_OR_RAISE(RoaringBitmap64 literal_result, + reader->RangeQuery(literal, literal, /*from_inclusive=*/true, + /*to_inclusive=*/true)); + result |= literal_result; + } + return result; + }); +} + +Result> BTreeGlobalIndexReader::VisitNotIn( + const std::vector& literals) { + return std::make_shared( + [reader = shared_from_this(), literals = literals]() -> Result { + PAIMON_ASSIGN_OR_RAISE(RoaringBitmap64 result, reader->AllNonNullRows()); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr in_result, + reader->VisitIn(literals)); + auto* typed_in_result = dynamic_cast(in_result.get()); + if (!typed_in_result) { + return Status::Invalid( + "VisitIn should return BitmapGlobalIndexResult in BTreeGlobalIndexReader"); + } + PAIMON_ASSIGN_OR_RAISE(const RoaringBitmap64* in_bitmap, typed_in_result->GetBitmap()); + result -= (*in_bitmap); + return result; + }); +} + +Result> BTreeGlobalIndexReader::VisitVectorSearch( + const std::shared_ptr& vector_search) { + return Status::Invalid("Vector search not supported in BTree index"); +} + +Result> BTreeGlobalIndexReader::VisitFullTextSearch( + const std::shared_ptr& full_text_search) { + return Status::Invalid("Full text search not supported in BTree index"); +} + +Result BTreeGlobalIndexReader::RangeQuery(const std::optional& from, + const std::optional& to, + bool from_inclusive, bool to_inclusive) { + if (!from || !to) { + return RoaringBitmap64(); + } + + // Create an index block iterator to iterate through data blocks + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr from_bytes, + KeySerializer::SerializeKey(from.value(), key_type_, pool_.get())); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr to_bytes, + KeySerializer::SerializeKey(to.value(), key_type_, pool_.get())); + MemorySlice from_slice = MemorySlice::Wrap(from_bytes); + MemorySlice to_slice = MemorySlice::Wrap(to_bytes); + + // Determine if we can skip lower/upper bound checks using cached serialized min/max keys. + // When from == min_key_, all entries are >= from, so skip lower bound comparison. + // When to == max_key_, all entries are <= to, so skip upper bound comparison. + bool skip_from_check = false; + bool skip_to_check = false; + if (min_key_slice_) { + PAIMON_ASSIGN_OR_RAISE(int32_t cmp_min, comparator_(from_slice, min_key_slice_.value())); + skip_from_check = (cmp_min == 0) && from_inclusive; + } + if (max_key_slice_) { + PAIMON_ASSIGN_OR_RAISE(int32_t cmp_max, comparator_(to_slice, max_key_slice_.value())); + skip_to_check = (cmp_max == 0) && to_inclusive; + } + + RoaringBitmap64 result; + + // Per-data-block row-id buffer. Collect row-ids within a single data + // block and flush them with one AddMany call. + std::vector block_row_ids; + + auto index_iterator = sst_file_reader_->CreateIndexIterator(); + PAIMON_ASSIGN_OR_RAISE([[maybe_unused]] bool seek_result, index_iterator->SeekTo(from_slice)); + + bool first_block = true; + // After SeekTo positions at the first entry >= from, only entries in the first + // block before the seek position could be < from. Once we pass them, all subsequent + // entries are guaranteed >= from, so we can skip the lower bound check. + bool passed_from_bound = skip_from_check; + + while (index_iterator->HasNext()) { + // Get the next data block + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr data_iterator, + sst_file_reader_->GetNextBlock(index_iterator)); + + if (!data_iterator || !data_iterator->HasNext()) { + assert(false); + break; + } + + // For the first block, we need to seek within the block to the exact position + if (first_block) { + PAIMON_ASSIGN_OR_RAISE([[maybe_unused]] bool found, data_iterator->SeekTo(from_slice)); + first_block = false; + } + + block_row_ids.clear(); + + if (skip_to_check && passed_from_bound) { + // Fast path: no boundary checks needed, skip key parsing entirely + while (data_iterator->HasNext()) { + PAIMON_ASSIGN_OR_RAISE(MemorySlice value, data_iterator->SkipKeyAndReadValue()); + PAIMON_RETURN_NOT_OK(DeserializeRowIds(value, &block_row_ids)); + } + } else if (skip_to_check) { + // Only need to check lower bound (first few entries in first block) + while (data_iterator->HasNext()) { + PAIMON_ASSIGN_OR_RAISE(BlockEntry entry, data_iterator->Next()); + if (!passed_from_bound) { + PAIMON_ASSIGN_OR_RAISE(int32_t cmp_from, comparator_(entry.key, from_slice)); + if (!from_inclusive && cmp_from == 0) { + continue; + } + if (cmp_from > 0) { + passed_from_bound = true; + } + } + PAIMON_RETURN_NOT_OK(DeserializeRowIds(entry.value, &block_row_ids)); + } + passed_from_bound = true; + } else if (passed_from_bound) { + // Only need to check upper bound + bool reached_end = false; + while (data_iterator->HasNext()) { + PAIMON_ASSIGN_OR_RAISE(BlockEntry entry, data_iterator->Next()); + PAIMON_ASSIGN_OR_RAISE(int32_t cmp_to, comparator_(entry.key, to_slice)); + if (cmp_to > 0 || (!to_inclusive && cmp_to == 0)) { + reached_end = true; + break; + } + PAIMON_RETURN_NOT_OK(DeserializeRowIds(entry.value, &block_row_ids)); + } + if (!block_row_ids.empty()) { + result.AddMany(block_row_ids.size(), block_row_ids.data()); + } + if (reached_end) { + return result; + } + continue; + } else { + // Need to check both bounds + bool reached_end = false; + while (data_iterator->HasNext()) { + PAIMON_ASSIGN_OR_RAISE(BlockEntry entry, data_iterator->Next()); + if (!passed_from_bound) { + PAIMON_ASSIGN_OR_RAISE(int32_t cmp_from, comparator_(entry.key, from_slice)); + if (!from_inclusive && cmp_from == 0) { + continue; + } + if (cmp_from > 0) { + passed_from_bound = true; + } + } + PAIMON_ASSIGN_OR_RAISE(int32_t cmp_to, comparator_(entry.key, to_slice)); + if (cmp_to > 0 || (!to_inclusive && cmp_to == 0)) { + reached_end = true; + break; + } + PAIMON_RETURN_NOT_OK(DeserializeRowIds(entry.value, &block_row_ids)); + } + passed_from_bound = true; + if (!block_row_ids.empty()) { + result.AddMany(block_row_ids.size(), block_row_ids.data()); + } + if (reached_end) { + return result; + } + continue; + } + + // Flush this block's row-ids in one batched call + if (!block_row_ids.empty()) { + result.AddMany(block_row_ids.size(), block_row_ids.data()); + } + } + + return result; +} + +Status BTreeGlobalIndexReader::DeserializeRowIds(const MemorySlice& slice, + std::vector* out) const { + auto input = slice.ToInput(); + PAIMON_ASSIGN_OR_RAISE(int32_t num_row_ids, input.ReadVarLenInt()); + if (num_row_ids <= 0) { + return Status::Invalid(fmt::format( + "Invalid row id length {} in DeserializeRowIds for BTreeGlobalIndexReader, must > 0", + num_row_ids)); + } + out->reserve(out->size() + static_cast(num_row_ids)); + for (int32_t i = 0; i < num_row_ids; i++) { + PAIMON_ASSIGN_OR_RAISE(int64_t row_id, input.ReadVarLenLong()); + out->push_back(row_id); + } + return Status::OK(); +} + +Result BTreeGlobalIndexReader::AllNonNullRows() { + // Traverse all data to avoid returning null values, which is very advantageous in + // situations where there are many null values + // TODO(xinyu.lxy) do not traverse all data if less null values + if (!min_key_) { + return RoaringBitmap64(); + } + return RangeQuery(min_key_, max_key_, /*from_inclusive=*/true, /*to_inclusive=*/true); +} + +} // namespace paimon diff --git a/src/paimon/common/global_index/btree/btree_global_index_reader.h b/src/paimon/common/global_index/btree/btree_global_index_reader.h new file mode 100644 index 0000000..1bdbe71 --- /dev/null +++ b/src/paimon/common/global_index/btree/btree_global_index_reader.h @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "arrow/api.h" +#include "paimon/common/global_index/btree/btree_defs.h" +#include "paimon/common/global_index/btree/key_serializer.h" +#include "paimon/common/sst/sst_file_reader.h" +#include "paimon/global_index/global_index_io_meta.h" +#include "paimon/global_index/global_index_reader.h" +#include "paimon/utils/roaring_bitmap64.h" +namespace paimon { + +/// Reader for BTree Global Index files. +/// This reader evaluates filter predicates against a BTree-based SST file +/// where each key maps to a list of row IDs. +class BTreeGlobalIndexReader : public GlobalIndexReader, + public std::enable_shared_from_this { + public: + static Result> Create( + const std::shared_ptr& sst_file_reader, RoaringBitmap64&& null_bitmap, + const std::optional& min_key_slice, + const std::optional& max_key_slice, + const std::shared_ptr& key_type, const std::shared_ptr& pool); + + Result> VisitIsNotNull() override; + + Result> VisitIsNull() override; + + Result> VisitEqual(const Literal& literal) override; + + Result> VisitNotEqual(const Literal& literal) override; + + Result> VisitLessThan(const Literal& literal) override; + + Result> VisitLessOrEqual(const Literal& literal) override; + + Result> VisitGreaterThan(const Literal& literal) override; + + Result> VisitGreaterOrEqual(const Literal& literal) override; + + Result> VisitIn( + const std::vector& literals) override; + + Result> VisitNotIn( + const std::vector& literals) override; + + Result> VisitStartsWith(const Literal& prefix) override; + + Result> VisitEndsWith(const Literal& suffix) override; + + Result> VisitContains(const Literal& literal) override; + + Result> VisitLike(const Literal& literal) override; + + Result> VisitVectorSearch( + const std::shared_ptr& vector_search) override; + + Result> VisitFullTextSearch( + const std::shared_ptr& full_text_search) override; + + bool IsThreadSafe() const override { + return false; + } + + std::string GetIndexType() const override { + return BtreeDefs::kIdentifier; + } + + private: + BTreeGlobalIndexReader(const std::shared_ptr& sst_file_reader, + RoaringBitmap64&& null_bitmap, std::optional min_key, + std::optional max_key, std::optional min_key_slice, + std::optional max_key_slice, + const std::shared_ptr& key_type, + const std::shared_ptr& pool); + + Result RangeQuery(const std::optional& from, + const std::optional& to, bool from_inclusive, + bool to_inclusive); + + Status DeserializeRowIds(const MemorySlice& slice, std::vector* out) const; + + Result AllNonNullRows(); + + std::shared_ptr pool_; + std::shared_ptr sst_file_reader_; + RoaringBitmap64 null_bitmap_; + std::optional min_key_; + std::optional max_key_; + /// Cached serialized min/max key slices to avoid repeated serialization in RangeQuery. + std::optional min_key_slice_; + std::optional max_key_slice_; + + std::shared_ptr key_type_; + MemorySlice::SliceComparator comparator_; +}; + +} // namespace paimon diff --git a/src/paimon/common/global_index/btree/btree_global_index_writer.cpp b/src/paimon/common/global_index/btree/btree_global_index_writer.cpp new file mode 100644 index 0000000..2e3c99a --- /dev/null +++ b/src/paimon/common/global_index/btree/btree_global_index_writer.cpp @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/common/global_index/btree/btree_global_index_writer.h" + +#include + +#include "arrow/c/bridge.h" +#include "arrow/c/helpers.h" +#include "fmt/format.h" +#include "paimon/common/compression/block_compression_factory.h" +#include "paimon/common/global_index/btree/btree_defs.h" +#include "paimon/common/global_index/btree/key_serializer.h" +#include "paimon/common/global_index/global_index_utils.h" +#include "paimon/common/memory/memory_slice_output.h" +#include "paimon/common/predicate/literal_converter.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/crc32c.h" +#include "paimon/common/utils/preconditions.h" +#include "paimon/memory/bytes.h" +namespace paimon { +Result> BTreeGlobalIndexWriter::Create( + const std::string& field_name, const std::shared_ptr& arrow_type, + const std::shared_ptr& file_writer, int32_t block_size, + const std::shared_ptr& compression_factory, + const std::shared_ptr& pool) { + auto key_field = arrow_type->GetFieldByName(field_name); + PAIMON_RETURN_NOT_OK(Preconditions::CheckNotNull( + key_field, + fmt::format("field {} not in arrow_array when Create BTreeGlobalIndexWriter", field_name))); + PAIMON_ASSIGN_OR_RAISE(std::string index_file_name, + file_writer->NewFileName(BtreeDefs::kIdentifier)); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr output_stream, + file_writer->NewOutputStream(index_file_name)); + auto sst_file_writer = std::make_unique(output_stream, /*bloom_filter=*/nullptr, + block_size, compression_factory, pool); + return std::shared_ptr(new BTreeGlobalIndexWriter( + field_name, arrow_type, key_field->type(), file_writer, index_file_name, output_stream, + std::move(sst_file_writer), pool)); +} + +BTreeGlobalIndexWriter::BTreeGlobalIndexWriter( + const std::string& field_name, const std::shared_ptr& arrow_type, + const std::shared_ptr& key_type, + const std::shared_ptr& file_writer, const std::string& index_file_name, + const std::shared_ptr& output_stream, std::unique_ptr&& sst_writer, + const std::shared_ptr& pool) + : field_name_(field_name), + arrow_type_(arrow_type), + key_type_(key_type), + pool_(pool), + file_writer_(file_writer), + index_file_name_(index_file_name), + output_stream_(output_stream), + sst_writer_(std::move(sst_writer)) {} + +Status BTreeGlobalIndexWriter::AddBatch(::ArrowArray* arrow_array, + std::vector&& relative_row_ids) { + PAIMON_RETURN_NOT_OK(GlobalIndexUtils::CheckRelativeRowIds( + arrow_array, relative_row_ids, /*expected_next_row_id=*/std::nullopt)); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr array, + arrow::ImportArray(arrow_array, arrow_type_)); + auto struct_array = std::dynamic_pointer_cast(array); + PAIMON_RETURN_NOT_OK(Preconditions::CheckNotNull( + struct_array, "arrow array must be struct array when AddBatch to BTreeGlobalIndexWriter")); + auto value_array = struct_array->GetFieldByName(field_name_); + PAIMON_RETURN_NOT_OK(Preconditions::CheckNotNull( + value_array, + fmt::format("field {} not in arrow_array when AddBatch to BTreeGlobalIndexWriter", + field_name_))); + + // Process each element in the array + PAIMON_ASSIGN_OR_RAISE(std::vector literals, + LiteralConverter::ConvertLiteralsFromArray(*value_array, + /*own_data=*/true)); + for (size_t i = 0; i < literals.size(); ++i) { + int64_t row_id = relative_row_ids[i]; + const auto& literal = literals[i]; + if (literal.IsNull()) { + // Track null values + null_bitmap_.Add(row_id); + continue; + } + if (last_key_) { + PAIMON_ASSIGN_OR_RAISE(int32_t cmp, literal.CompareTo(last_key_.value())); + if (cmp > 0) { + PAIMON_RETURN_NOT_OK(Flush()); + } else if (cmp < 0) { + return Status::Invalid( + fmt::format("Users must keep written keys monotonically incremental in " + "BTreeGlobalIndexWriter, current literal {}, last_key {}", + literal.ToString(), last_key_.value().ToString())); + } + } + last_key_ = literal; + current_row_ids_.push_back(row_id); + if (!first_key_) { + first_key_ = literal; + } + } + return Status::OK(); +} + +Status BTreeGlobalIndexWriter::Flush() { + if (current_row_ids_.empty()) { + return Status::OK(); + } + MemorySliceOutput output(current_row_ids_.size() * 9 + 5, pool_.get()); + if (current_row_ids_.size() > INT32_MAX) { + return Status::Invalid("invalid row id numbers, exceed INT32_MAX"); + } + PAIMON_RETURN_NOT_OK(output.WriteVarLenInt(static_cast(current_row_ids_.size()))); + for (int64_t row_id : current_row_ids_) { + PAIMON_RETURN_NOT_OK(output.WriteVarLenLong(row_id)); + } + current_row_ids_.clear(); + assert(last_key_); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr key_bytes, + KeySerializer::SerializeKey(last_key_.value(), key_type_, pool_.get())); + return sst_writer_->Write(std::move(key_bytes), output.ToSlice().CopyBytes(pool_.get())); +} + +Result> BTreeGlobalIndexWriter::WriteNullBitmap( + const std::shared_ptr& out) { + if (null_bitmap_.IsEmpty()) { + return std::optional(); + } + std::shared_ptr bitmap_bytes = null_bitmap_.Serialize(pool_.get()); + uint32_t crc = CRC32C::calculate(bitmap_bytes->data(), bitmap_bytes->size()); + + MemorySliceOutput slice_out(bitmap_bytes->size() + 4, pool_.get()); + slice_out.WriteBytes(bitmap_bytes); + slice_out.WriteValue(static_cast(crc)); + + // Get current position for the block handle + PAIMON_ASSIGN_OR_RAISE(int64_t offset, out->GetPos()); + PAIMON_RETURN_NOT_OK(sst_writer_->WriteSlice(slice_out.ToSlice())); + return std::optional(BlockHandle(offset, bitmap_bytes->size())); +} + +Result> BTreeGlobalIndexWriter::Finish() { + // write remaining row ids + PAIMON_RETURN_NOT_OK(Flush()); + + // Flush any remaining data in the data block writer + PAIMON_RETURN_NOT_OK(sst_writer_->Flush()); + + // Write null bitmap first + PAIMON_ASSIGN_OR_RAISE(std::optional null_bitmap_handle, + WriteNullBitmap(output_stream_)); + // write bloom filter (currently is always null, but we could add it for equal + // and in condition.) + PAIMON_ASSIGN_OR_RAISE(std::optional bloom_filter_handle, + sst_writer_->WriteBloomFilter()); + // Write index block + PAIMON_ASSIGN_OR_RAISE(BlockHandle index_block_handle, sst_writer_->WriteIndexBlock()); + + // Write BTree file footer + auto footer = std::make_shared(bloom_filter_handle, index_block_handle, + null_bitmap_handle); + auto footer_slice = BTreeFileFooter::Write(footer, pool_.get()); + PAIMON_RETURN_NOT_OK(sst_writer_->WriteSlice(footer_slice)); + + PAIMON_RETURN_NOT_OK(output_stream_->Close()); + + if (!first_key_ && null_bitmap_.IsEmpty()) { + return Status::Invalid("Should never write an empty btree index file."); + } + + // Get file size + std::shared_ptr first_key_bytes; + std::shared_ptr last_key_bytes; + if (first_key_) { + PAIMON_ASSIGN_OR_RAISE(first_key_bytes, KeySerializer::SerializeKey( + first_key_.value(), key_type_, pool_.get())); + } + if (last_key_) { + PAIMON_ASSIGN_OR_RAISE( + last_key_bytes, KeySerializer::SerializeKey(last_key_.value(), key_type_, pool_.get())); + } + // Create index meta + auto index_meta = + std::make_shared(first_key_bytes, last_key_bytes, !null_bitmap_.IsEmpty()); + auto meta_bytes = index_meta->Serialize(pool_.get()); + + // Create GlobalIndexIOMeta + std::string file_path = file_writer_->ToPath(index_file_name_); + PAIMON_ASSIGN_OR_RAISE(int64_t file_size, file_writer_->GetFileSize(index_file_name_)); + GlobalIndexIOMeta io_meta(file_path, file_size, meta_bytes); + return std::vector{io_meta}; +} + +} // namespace paimon diff --git a/src/paimon/common/global_index/btree/btree_global_index_writer.h b/src/paimon/common/global_index/btree/btree_global_index_writer.h new file mode 100644 index 0000000..f8c9ff9 --- /dev/null +++ b/src/paimon/common/global_index/btree/btree_global_index_writer.h @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include "arrow/api.h" +#include "paimon/common/global_index/btree/btree_file_footer.h" +#include "paimon/common/global_index/btree/btree_index_meta.h" +#include "paimon/common/sst/sst_file_writer.h" +#include "paimon/global_index/global_index_writer.h" +#include "paimon/global_index/io/global_index_file_writer.h" +#include "paimon/predicate/literal.h" +#include "paimon/utils/roaring_bitmap64.h" +namespace paimon { + +/// Writer for BTree Global Index files. +/// This writer builds an SST file where each key maps to a list of row IDs. +/// Note that users must keep written keys monotonically incremental. All null keys are stored in a +/// separate bitmap, which will be serialized and appended to the file end on close. The layout is +/// as below: +/// +/// +-----------------------------------+------+ +/// | Footer | | +/// +-----------------------------------+ | +/// | Index Block | +--> Loaded on open +/// +-----------------------------------+ | +/// | Bloom Filter Block | | +/// +-----------------------------------+------+ +/// | Null Bitmap Block | | +/// +-----------------------------------+ | +/// | Data Block | | +/// +-----------------------------------+ +--> Loaded on requested +/// | ...... | | +/// +-----------------------------------+ | +/// | Data Block | | +/// +-----------------------------------+------+ +/// +/// For efficiency, we combine entries with the same keys and store a compact list of row ids for +/// each key. +class BTreeGlobalIndexWriter : public GlobalIndexWriter { + public: + /// Factory method that may fail during initialization (e.g., + /// Arrow schema import). Use this instead of the constructor. + static Result> Create( + const std::string& field_name, const std::shared_ptr& arrow_type, + const std::shared_ptr& file_writer, int32_t block_size, + const std::shared_ptr& compression_factory, + const std::shared_ptr& pool); + + ~BTreeGlobalIndexWriter() override = default; + + Status AddBatch(::ArrowArray* arrow_array, std::vector&& relative_row_ids) override; + + /// Finish writing and return the index metadata. + Result> Finish() override; + + private: + BTreeGlobalIndexWriter(const std::string& field_name, + const std::shared_ptr& arrow_type, + const std::shared_ptr& key_type, + const std::shared_ptr& file_writer, + const std::string& index_file_name, + const std::shared_ptr& output_stream, + std::unique_ptr&& sst_writer, + const std::shared_ptr& pool); + + Status Flush(); + + Result> WriteNullBitmap(const std::shared_ptr& out); + + private: + std::string field_name_; + std::shared_ptr arrow_type_; + std::shared_ptr key_type_; + std::shared_ptr pool_; + + std::shared_ptr file_writer_; + std::string index_file_name_; + std::shared_ptr output_stream_; + std::unique_ptr sst_writer_; + + std::optional first_key_; + std::optional last_key_; + + // Null bitmap tracking + RoaringBitmap64 null_bitmap_; + std::vector current_row_ids_; +}; + +} // namespace paimon diff --git a/src/paimon/common/global_index/btree/btree_global_indexer.cpp b/src/paimon/common/global_index/btree/btree_global_indexer.cpp new file mode 100644 index 0000000..fd41008 --- /dev/null +++ b/src/paimon/common/global_index/btree/btree_global_indexer.cpp @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "paimon/common/global_index/btree/btree_global_indexer.h" + +#include +#include + +#include "arrow/c/bridge.h" +#include "paimon/common/compression/block_compression_factory.h" +#include "paimon/common/global_index/btree/btree_file_footer.h" +#include "paimon/common/global_index/btree/btree_global_index_writer.h" +#include "paimon/common/global_index/btree/btree_index_meta.h" +#include "paimon/common/global_index/btree/key_serializer.h" +#include "paimon/common/global_index/btree/lazy_filtered_btree_reader.h" +#include "paimon/common/memory/memory_slice.h" +#include "paimon/common/memory/memory_slice_input.h" +#include "paimon/common/options/memory_size.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/crc32c.h" +#include "paimon/common/utils/options_utils.h" +#include "paimon/common/utils/preconditions.h" +#include "paimon/core/options/compress_options.h" +#include "paimon/executor.h" +#include "paimon/global_index/bitmap_global_index_result.h" +#include "paimon/memory/bytes.h" +#include "paimon/utils/roaring_bitmap64.h" +namespace paimon { +Result> BTreeGlobalIndexer::Create( + const std::map& options) { + // parse cache options + PAIMON_ASSIGN_OR_RAISE(std::string cache_size_str, OptionsUtils::GetValueFromMap( + options, BtreeDefs::kBtreeIndexCacheSize, + BtreeDefs::kDefaultBtreeIndexCacheSize)); + PAIMON_ASSIGN_OR_RAISE(int64_t cache_size, MemorySize::ParseBytes(cache_size_str)); + + PAIMON_ASSIGN_OR_RAISE( + double high_priority_pool_ratio, + OptionsUtils::GetValueFromMap(options, BtreeDefs::kBtreeIndexHighPriorityPoolRatio, + BtreeDefs::kDefaultBtreeIndexHighPriorityPoolRatio)); + auto cache_manager = std::make_shared(cache_size, high_priority_pool_ratio); + return std::unique_ptr(new BTreeGlobalIndexer(cache_manager, options)); +} + +Result> BTreeGlobalIndexer::CreateWriter( + const std::string& field_name, ::ArrowSchema* arrow_schema, + const std::shared_ptr& file_writer, + const std::shared_ptr& pool) const { + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr arrow_type, + arrow::ImportType(arrow_schema)); + // check data type + auto struct_type = std::dynamic_pointer_cast(arrow_type); + PAIMON_RETURN_NOT_OK(Preconditions::CheckNotNull( + struct_type, "arrow schema must be struct type when create BTreeGlobalIndexWriter")); + + // parse options + PAIMON_ASSIGN_OR_RAISE( + std::string block_size_str, + OptionsUtils::GetValueFromMap(options_, BtreeDefs::kBtreeIndexBlockSize, + BtreeDefs::kDefaultBtreeIndexBlockSize)); + PAIMON_ASSIGN_OR_RAISE(int64_t block_size, MemorySize::ParseBytes(block_size_str)); + if (block_size > INT32_MAX) { + return Status::Invalid("invalid block size, exceed INT32_MAX"); + } + PAIMON_ASSIGN_OR_RAISE( + std::string compress_str, + OptionsUtils::GetValueFromMap(options_, BtreeDefs::kBtreeIndexCompression, + BtreeDefs::kDefaultBtreeIndexCompression)); + PAIMON_ASSIGN_OR_RAISE( + int32_t compress_level, + OptionsUtils::GetValueFromMap(options_, BtreeDefs::kBtreeIndexCompressionLevel, + BtreeDefs::kDefaultBtreeIndexCompressionLevel)); + CompressOptions compress_options{compress_str, compress_level}; + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr compression_factory, + BlockCompressionFactory::Create(compress_options)); + return BTreeGlobalIndexWriter::Create(field_name, struct_type, file_writer, + static_cast(block_size), compression_factory, + pool); +} + +Result> BTreeGlobalIndexer::CreateReader( + ::ArrowSchema* arrow_schema, const std::shared_ptr& file_reader, + const std::vector& files, const std::shared_ptr& pool) const { + // Get field type from arrow schema + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr schema, + arrow::ImportSchema(arrow_schema)); + if (schema->num_fields() != 1) { + return Status::Invalid( + "invalid schema for BTreeGlobalIndexReader, supposed to have single field."); + } + auto key_type = schema->field(0)->type(); + + std::optional read_buffer_size; + if (auto iter = options_.find(BtreeDefs::kBtreeIndexReadBufferSize); iter != options_.end()) { + PAIMON_ASSIGN_OR_RAISE(int64_t tmp_buffer_size, MemorySize::ParseBytes(iter->second)); + if (tmp_buffer_size <= 0 || tmp_buffer_size > INT_MAX) { + return Status::Invalid( + fmt::format("In BTreeGlobalIndexer::CreateReader: option {} is {}, exceed INT_MAX " + "or less than 0", + BtreeDefs::kBtreeIndexReadBufferSize, iter->second)); + } + read_buffer_size = static_cast(tmp_buffer_size); + } + // TODO(lisizhuo.lsz): Allow users to specify an executor + std::shared_ptr executor = CreateDefaultExecutor(); + return std::make_shared(read_buffer_size, files, key_type, file_reader, + cache_manager_, pool, executor); +} + +} // namespace paimon diff --git a/src/paimon/common/global_index/btree/btree_global_indexer.h b/src/paimon/common/global_index/btree/btree_global_indexer.h new file mode 100644 index 0000000..d1f5e92 --- /dev/null +++ b/src/paimon/common/global_index/btree/btree_global_indexer.h @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "paimon/common/global_index/btree/btree_defs.h" +#include "paimon/common/global_index/btree/btree_global_index_reader.h" +#include "paimon/common/sst/block_cache.h" +#include "paimon/common/sst/block_handle.h" +#include "paimon/global_index/global_indexer.h" +#include "paimon/global_index/io/global_index_file_reader.h" +#include "paimon/utils/roaring_bitmap64.h" +namespace paimon { +/// The indexer for btree index. We do not build a B-tree directly in memory, instead, we form a +/// logical B-tree via multi-level metadata over SST files that store the actual data, as below: +/// +/// BTree-Index +/// / | +/// / ... | +/// / | +/// +--------------------------------------+ +------------+ +/// | SST File | | | +/// +--------------------------------------+ | | +/// | Root Index | | | +/// | / ... | | ... | SST File | +/// | Leaf Index ... Leaf Index | | | +/// | / ... | / ... | | | | +/// | DataBlock ... DataBlock | | | +/// +--------------------------------------+ +------------+ +/// +/// This approach significantly reduces memory pressure during index reads. + +class BTreeGlobalIndexer : public GlobalIndexer { + public: + static Result> Create( + const std::map& options); + + Result> CreateWriter( + const std::string& field_name, ::ArrowSchema* arrow_schema, + const std::shared_ptr& file_writer, + const std::shared_ptr& pool) const override; + + Result> CreateReader( + ::ArrowSchema* arrow_schema, const std::shared_ptr& file_reader, + const std::vector& files, + const std::shared_ptr& pool) const override; + + private: + BTreeGlobalIndexer(const std::shared_ptr& cache_manager, + const std::map& options) + : cache_manager_(cache_manager), options_(options) {} + + private: + std::shared_ptr cache_manager_; + std::map options_; +}; + +} // namespace paimon diff --git a/src/paimon/common/global_index/btree/lazy_filtered_btree_reader.cpp b/src/paimon/common/global_index/btree/lazy_filtered_btree_reader.cpp new file mode 100644 index 0000000..bc66e17 --- /dev/null +++ b/src/paimon/common/global_index/btree/lazy_filtered_btree_reader.cpp @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/common/global_index/btree/lazy_filtered_btree_reader.h" + +#include +#include + +#include "paimon/common/executor/future.h" +#include "paimon/common/global_index/btree/btree_file_footer.h" +#include "paimon/common/global_index/btree/btree_global_index_reader.h" +#include "paimon/common/global_index/btree/btree_index_meta.h" +#include "paimon/common/global_index/btree/key_serializer.h" +#include "paimon/common/global_index/union_global_index_reader.h" +#include "paimon/common/memory/memory_slice.h" +#include "paimon/common/memory/memory_slice_input.h" +#include "paimon/common/sst/block_cache.h" +#include "paimon/common/sst/sst_file_reader.h" +#include "paimon/common/utils/crc32c.h" +#include "paimon/global_index/bitmap_global_index_result.h" +#include "paimon/io/buffered_input_stream.h" +#include "paimon/utils/roaring_bitmap64.h" + +namespace paimon { +LazyFilteredBTreeReader::LazyFilteredBTreeReader( + std::optional read_buffer_size, const std::vector& files, + const std::shared_ptr& key_type, + const std::shared_ptr& file_reader, + const std::shared_ptr& cache_manager, const std::shared_ptr& pool, + const std::shared_ptr& executor) + : read_buffer_size_(read_buffer_size), + pool_(pool), + file_selector_(files, key_type, pool), + key_type_(key_type), + file_reader_(file_reader), + cache_manager_(cache_manager), + executor_(executor) {} + +Result> LazyFilteredBTreeReader::VisitIsNotNull() { + return DispatchVisit( + [this]() { return file_selector_.VisitIsNotNull(); }, + [](const std::shared_ptr& reader) { return reader->VisitIsNotNull(); }); +} + +Result> LazyFilteredBTreeReader::VisitIsNull() { + return DispatchVisit( + [this]() { return file_selector_.VisitIsNull(); }, + [](const std::shared_ptr& reader) { return reader->VisitIsNull(); }); +} + +Result> LazyFilteredBTreeReader::VisitEqual( + const Literal& literal) { + return DispatchVisit([this, &literal]() { return file_selector_.VisitEqual(literal); }, + [&literal](const std::shared_ptr& reader) { + return reader->VisitEqual(literal); + }); +} + +Result> LazyFilteredBTreeReader::VisitNotEqual( + const Literal& literal) { + return DispatchVisit([this, &literal]() { return file_selector_.VisitNotEqual(literal); }, + [&literal](const std::shared_ptr& reader) { + return reader->VisitNotEqual(literal); + }); +} + +Result> LazyFilteredBTreeReader::VisitLessThan( + const Literal& literal) { + return DispatchVisit([this, &literal]() { return file_selector_.VisitLessThan(literal); }, + [&literal](const std::shared_ptr& reader) { + return reader->VisitLessThan(literal); + }); +} + +Result> LazyFilteredBTreeReader::VisitLessOrEqual( + const Literal& literal) { + return DispatchVisit([this, &literal]() { return file_selector_.VisitLessOrEqual(literal); }, + [&literal](const std::shared_ptr& reader) { + return reader->VisitLessOrEqual(literal); + }); +} + +Result> LazyFilteredBTreeReader::VisitGreaterThan( + const Literal& literal) { + return DispatchVisit([this, &literal]() { return file_selector_.VisitGreaterThan(literal); }, + [&literal](const std::shared_ptr& reader) { + return reader->VisitGreaterThan(literal); + }); +} + +Result> LazyFilteredBTreeReader::VisitGreaterOrEqual( + const Literal& literal) { + return DispatchVisit([this, &literal]() { return file_selector_.VisitGreaterOrEqual(literal); }, + [&literal](const std::shared_ptr& reader) { + return reader->VisitGreaterOrEqual(literal); + }); +} + +Result> LazyFilteredBTreeReader::VisitIn( + const std::vector& literals) { + return DispatchVisit([this, &literals]() { return file_selector_.VisitIn(literals); }, + [&literals](const std::shared_ptr& reader) { + return reader->VisitIn(literals); + }); +} + +Result> LazyFilteredBTreeReader::VisitNotIn( + const std::vector& literals) { + return DispatchVisit([this, &literals]() { return file_selector_.VisitNotIn(literals); }, + [&literals](const std::shared_ptr& reader) { + return reader->VisitNotIn(literals); + }); +} + +Result> LazyFilteredBTreeReader::VisitStartsWith( + const Literal& prefix) { + return DispatchVisit([this, &prefix]() { return file_selector_.VisitStartsWith(prefix); }, + [&prefix](const std::shared_ptr& reader) { + return reader->VisitStartsWith(prefix); + }); +} + +Result> LazyFilteredBTreeReader::VisitEndsWith( + const Literal& suffix) { + return DispatchVisit([this, &suffix]() { return file_selector_.VisitEndsWith(suffix); }, + [&suffix](const std::shared_ptr& reader) { + return reader->VisitEndsWith(suffix); + }); +} + +Result> LazyFilteredBTreeReader::VisitContains( + const Literal& literal) { + return DispatchVisit([this, &literal]() { return file_selector_.VisitContains(literal); }, + [&literal](const std::shared_ptr& reader) { + return reader->VisitContains(literal); + }); +} + +Result> LazyFilteredBTreeReader::VisitLike( + const Literal& literal) { + return DispatchVisit([this, &literal]() { return file_selector_.VisitLike(literal); }, + [&literal](const std::shared_ptr& reader) { + return reader->VisitLike(literal); + }); +} + +Result> LazyFilteredBTreeReader::VisitVectorSearch( + const std::shared_ptr& vector_search) { + return Status::Invalid("LazyFilteredBTreeReader does not support vector search"); +} + +Result> LazyFilteredBTreeReader::VisitFullTextSearch( + const std::shared_ptr& full_text_search) { + return Status::Invalid("LazyFilteredBTreeReader does not support full text search"); +} + +Result> LazyFilteredBTreeReader::DispatchVisit( + SelectAction select_files, ReaderAction action) { + PAIMON_ASSIGN_OR_RAISE(std::vector selected_files, select_files()); + if (selected_files.empty()) { + return std::make_shared([]() { return RoaringBitmap64(); }); + } + + // Create a UnionGlobalIndexReader from cached readers for the selected files + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr union_reader, + CreateUnionReader(selected_files)); + + // Delegate the action to the union reader + return action(union_reader); +} + +Result> LazyFilteredBTreeReader::CreateUnionReader( + const std::vector& files) { + std::vector> readers; + readers.reserve(files.size()); + for (const auto& meta : files) { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr reader, GetOrCreateReader(meta)); + readers.push_back(std::move(reader)); + } + + return std::make_shared(std::move(readers), executor_); +} + +Result> LazyFilteredBTreeReader::GetOrCreateReader( + const GlobalIndexIOMeta& meta) { + auto iterator = reader_cache_.find(meta.file_path); + if (iterator != reader_cache_.end()) { + return iterator->second; + } + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr reader, CreateSingleReader(meta)); + reader_cache_[meta.file_path] = reader; + return reader; +} + +Result> LazyFilteredBTreeReader::CreateSingleReader( + const GlobalIndexIOMeta& meta) { + // Create comparator based on field type + auto comparator = KeySerializer::CreateComparator(key_type_, pool_); + + // Get min/max key slices from meta data (keep as slices; Create() will deserialize) + auto index_meta = BTreeIndexMeta::Deserialize(meta.metadata, pool_.get()); + std::optional min_key_slice; + std::optional max_key_slice; + if (index_meta->FirstKey()) { + min_key_slice = MemorySlice::Wrap(index_meta->FirstKey()); + } + if (index_meta->LastKey()) { + max_key_slice = MemorySlice::Wrap(index_meta->LastKey()); + } + + // Open input stream and create block cache + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr input_stream, + file_reader_->GetInputStream(meta.file_path)); + if (read_buffer_size_) { + input_stream = std::make_shared( + input_stream, read_buffer_size_.value(), pool_.get()); + } + + auto block_cache = + std::make_shared(meta.file_path, input_stream, cache_manager_, pool_); + + // Read footer + PAIMON_ASSIGN_OR_RAISE(MemorySegment footer_segment, + block_cache->GetBlock(meta.file_size - BTreeFileFooter::kEncodingLength, + BTreeFileFooter::kEncodingLength, true, + /*decompress_func=*/nullptr)); + auto footer_slice = MemorySlice::Wrap(footer_segment); + auto footer_input = footer_slice.ToInput(); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr footer, + BTreeFileFooter::Read(&footer_input)); + + // Read null bitmap + PAIMON_ASSIGN_OR_RAISE(RoaringBitmap64 null_bitmap, + ReadNullBitmap(block_cache, footer->GetNullBitmapHandle())); + + // Create SST file reader + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr sst_file_reader, + SstFileReader::Create(footer->GetIndexBlockHandle(), footer->GetBloomFilterHandle(), + comparator, block_cache, pool_)); + + return BTreeGlobalIndexReader::Create(sst_file_reader, std::move(null_bitmap), min_key_slice, + max_key_slice, key_type_, pool_); +} + +Result LazyFilteredBTreeReader::ReadNullBitmap( + const std::shared_ptr& cache, const std::optional& block_handle) { + RoaringBitmap64 null_bitmap; + if (!block_handle.has_value()) { + return null_bitmap; + } + + // Read bytes and CRC value + PAIMON_ASSIGN_OR_RAISE( + MemorySegment segment, + cache->GetBlock(block_handle->Offset(), block_handle->Size() + 4, /*is_index=*/false, + /*decompress_func=*/nullptr)); + + auto slice = MemorySlice::Wrap(segment); + auto slice_input = slice.ToInput(); + + // Read null bitmap data + auto null_bitmap_bytes = slice_input.ReadSliceView(block_handle->Size()).CopyBytes(pool_.get()); + + // Calculate and verify CRC32C checksum + uint32_t calculated_crc = + CRC32C::calculate(null_bitmap_bytes->data(), null_bitmap_bytes->size()); + int32_t expected_crc = slice_input.ReadInt(); + if (calculated_crc != static_cast(expected_crc)) { + return Status::Invalid(fmt::format( + "CRC check failure during decoding null bitmap. Expected: {}, Calculated: {}", + expected_crc, calculated_crc)); + } + + // Deserialize null bitmap + PAIMON_RETURN_NOT_OK( + null_bitmap.Deserialize(null_bitmap_bytes->data(), null_bitmap_bytes->size())); + return null_bitmap; +} + +} // namespace paimon diff --git a/src/paimon/common/global_index/btree/lazy_filtered_btree_reader.h b/src/paimon/common/global_index/btree/lazy_filtered_btree_reader.h new file mode 100644 index 0000000..0603e78 --- /dev/null +++ b/src/paimon/common/global_index/btree/lazy_filtered_btree_reader.h @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "arrow/api.h" +#include "paimon/common/global_index/btree/btree_defs.h" +#include "paimon/common/global_index/btree/btree_file_meta_selector.h" +#include "paimon/common/io/cache/cache_manager.h" +#include "paimon/common/sst/block_cache.h" +#include "paimon/common/sst/block_handle.h" +#include "paimon/executor.h" +#include "paimon/global_index/global_index_reader.h" +#include "paimon/global_index/io/global_index_file_reader.h" +#include "paimon/utils/roaring_bitmap64.h" + +namespace paimon { +class LazyFilteredBTreeReader : public GlobalIndexReader { + public: + LazyFilteredBTreeReader(std::optional read_buffer_size, + const std::vector& files, + const std::shared_ptr& key_type, + const std::shared_ptr& file_reader, + const std::shared_ptr& cache_manager, + const std::shared_ptr& pool, + const std::shared_ptr& executor); + + Result> VisitIsNotNull() override; + Result> VisitIsNull() override; + Result> VisitEqual(const Literal& literal) override; + Result> VisitNotEqual(const Literal& literal) override; + Result> VisitLessThan(const Literal& literal) override; + Result> VisitLessOrEqual(const Literal& literal) override; + Result> VisitGreaterThan(const Literal& literal) override; + Result> VisitGreaterOrEqual(const Literal& literal) override; + Result> VisitIn( + const std::vector& literals) override; + Result> VisitNotIn( + const std::vector& literals) override; + Result> VisitStartsWith(const Literal& prefix) override; + Result> VisitEndsWith(const Literal& suffix) override; + Result> VisitContains(const Literal& literal) override; + Result> VisitLike(const Literal& literal) override; + + Result> VisitVectorSearch( + const std::shared_ptr& vector_search) override; + + Result> VisitFullTextSearch( + const std::shared_ptr& full_text_search) override; + + bool IsThreadSafe() const override { + return false; + } + + std::string GetIndexType() const override { + return BtreeDefs::kIdentifier; + } + + private: + using SelectAction = std::function>()>; + using ReaderAction = std::function>( + const std::shared_ptr&)>; + + Result> DispatchVisit(SelectAction select_files, + ReaderAction action); + Result> CreateUnionReader( + const std::vector& files); + Result> GetOrCreateReader(const GlobalIndexIOMeta& meta); + Result> CreateSingleReader(const GlobalIndexIOMeta& meta); + Result ReadNullBitmap(const std::shared_ptr& cache, + const std::optional& block_handle); + + private: + std::optional read_buffer_size_; + std::shared_ptr pool_; + BTreeFileMetaSelector file_selector_; + std::shared_ptr key_type_; + std::shared_ptr file_reader_; + std::shared_ptr cache_manager_; + std::map> reader_cache_; + std::shared_ptr executor_; +}; + +} // namespace paimon diff --git a/src/paimon/common/global_index/btree/lazy_filtered_btree_reader_test.cpp b/src/paimon/common/global_index/btree/lazy_filtered_btree_reader_test.cpp new file mode 100644 index 0000000..655efa9 --- /dev/null +++ b/src/paimon/common/global_index/btree/lazy_filtered_btree_reader_test.cpp @@ -0,0 +1,466 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/common/global_index/btree/lazy_filtered_btree_reader.h" + +#include +#include +#include + +#include "arrow/c/bridge.h" +#include "arrow/ipc/json_simple.h" +#include "gtest/gtest.h" +#include "paimon/common/global_index/btree/btree_global_index_writer.h" +#include "paimon/common/global_index/btree/btree_global_indexer.h" +#include "paimon/executor.h" +#include "paimon/fs/file_system.h" +#include "paimon/global_index/bitmap_global_index_result.h" +#include "paimon/global_index/io/global_index_file_reader.h" +#include "paimon/global_index/io/global_index_file_writer.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/predicate/literal.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +class FakeLazyFileWriter : public GlobalIndexFileWriter { + public: + FakeLazyFileWriter(const std::shared_ptr& fs, const std::string& base_path) + : fs_(fs), base_path_(base_path) {} + + Result NewFileName(const std::string& prefix) const override { + return prefix + "_" + std::to_string(file_counter_++); + } + + Result> NewOutputStream( + const std::string& file_name) const override { + return fs_->Create(base_path_ + "/" + file_name, true); + } + + Result GetFileSize(const std::string& file_name) const override { + PAIMON_ASSIGN_OR_RAISE(auto file_status, fs_->GetFileStatus(base_path_ + "/" + file_name)); + return static_cast(file_status->GetLen()); + } + + std::string ToPath(const std::string& file_name) const override { + return base_path_ + "/" + file_name; + } + + private: + std::shared_ptr fs_; + std::string base_path_; + mutable int64_t file_counter_ = 0; +}; + +class FakeLazyFileReader : public GlobalIndexFileReader { + public: + FakeLazyFileReader(const std::shared_ptr& fs, const std::string& base_path) + : fs_(fs), base_path_(base_path) {} + + Result> GetInputStream( + const std::string& file_path) const override { + return fs_->Open(file_path); + } + + private: + std::shared_ptr fs_; + std::string base_path_; +}; + +class LazyFilteredBTreeReaderTest : public ::testing::Test { + public: + void SetUp() override { + test_dir_ = UniqueTestDirectory::Create("local"); + pool_ = GetDefaultPool(); + fs_ = test_dir_->GetFileSystem(); + base_path_ = test_dir_->Str(); + file_writer_ = std::make_shared(fs_, base_path_); + + // Create 3 btree index files with different key ranges: + // + // file1: keys [1, 1, null, 2, 2], row_ids [0, 1, 2, 3, 4] + // range [1, 2], has_null=true + // + // file2: keys [5, 6, 7], row_ids [5, 6, 7] + // range [5, 7], has_null=false + // + // file3: keys [10, null, 12], row_ids [8, 9, 10] + // range [10, 12], has_null=true + all_metas_ = {}; + WriteSingleFile(R"([[1],[1],[null],[2],[2]])", {0, 1, 2, 3, 4}); + WriteSingleFile(R"([[5],[6],[7]])", {5, 6, 7}); + WriteSingleFile(R"([[10],[null],[12]])", {8, 9, 10}); + } + + void WriteSingleFile(const std::string& json_data, const std::vector& row_ids) { + auto c_schema = CreateArrowSchema(); + std::map options = {{BtreeDefs::kBtreeIndexBlockSize, "4096"}, + {BtreeDefs::kBtreeIndexCompression, "NONE"}}; + ASSERT_OK_AND_ASSIGN(auto indexer, BTreeGlobalIndexer::Create(options)); + ASSERT_OK_AND_ASSIGN( + auto writer, indexer->CreateWriter("int_field", c_schema.get(), file_writer_, pool_)); + auto array = arrow::ipc::internal::json::ArrayFromJSON( + arrow::struct_({arrow::field("int_field", arrow::int32())}), json_data) + .ValueOrDie(); + ArrowArray c_array; + ASSERT_TRUE(arrow::ExportArray(*array, &c_array).ok()); + + ASSERT_OK(writer->AddBatch(&c_array, std::vector(row_ids))); + ASSERT_OK_AND_ASSIGN(auto metas, writer->Finish()); + ASSERT_EQ(metas.size(), 1); + all_metas_.push_back(metas[0]); + } + + std::unique_ptr CreateArrowSchema() const { + auto schema = arrow::schema({arrow::field("int_field", arrow::int32())}); + auto c_schema = std::make_unique(); + EXPECT_TRUE(arrow::ExportSchema(*schema, c_schema.get()).ok()); + return c_schema; + } + + std::shared_ptr CreateReader( + const std::shared_ptr& executor = nullptr) const { + auto file_reader = std::make_shared(fs_, base_path_); + auto cache_manager = std::make_shared(1024 * 1024, 0.5); + return std::make_shared(/*read_buffer_size=*/std::nullopt, + all_metas_, arrow::int32(), file_reader, + cache_manager, pool_, executor); + } + + void CheckResult(const std::shared_ptr& result, + const std::vector& expected) const { + ASSERT_TRUE(result); + auto typed_result = std::dynamic_pointer_cast(result); + ASSERT_TRUE(typed_result); + ASSERT_OK_AND_ASSIGN(const RoaringBitmap64* bitmap, typed_result->GetBitmap()); + ASSERT_TRUE(bitmap); + ASSERT_EQ(*bitmap, RoaringBitmap64::From(expected)) + << "result=" << bitmap->ToString() + << ", expected=" << RoaringBitmap64::From(expected).ToString(); + } + + void CheckEmpty(const std::shared_ptr& result) const { + ASSERT_TRUE(result); + auto typed_result = std::dynamic_pointer_cast(result); + ASSERT_TRUE(typed_result); + ASSERT_OK_AND_ASSIGN(const RoaringBitmap64* bitmap, typed_result->GetBitmap()); + ASSERT_TRUE(bitmap); + ASSERT_TRUE(bitmap->IsEmpty()); + } + + private: + std::unique_ptr test_dir_; + std::shared_ptr pool_; + std::shared_ptr fs_; + std::string base_path_; + std::shared_ptr file_writer_; + std::vector all_metas_; +}; + +// --- VisitEqual --- + +TEST_F(LazyFilteredBTreeReaderTest, TestVisitEqualHitSingleFile) { + auto reader = CreateReader(); + // key=1 is in file1 only -> rows 0, 1 + Literal literal_1(1); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(literal_1)); + CheckResult(result, {0, 1}); +} + +TEST_F(LazyFilteredBTreeReaderTest, TestVisitEqualHitDifferentFile) { + auto reader = CreateReader(); + // key=6 is in file2 only -> row 6 + Literal literal_6(6); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(literal_6)); + CheckResult(result, {6}); +} + +TEST_F(LazyFilteredBTreeReaderTest, TestVisitEqualNoMatch) { + auto reader = CreateReader(); + // key=99 is out of all ranges -> empty bitmap + Literal literal_99(99); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(literal_99)); + CheckEmpty(result); +} + +// --- VisitNotEqual --- + +TEST_F(LazyFilteredBTreeReaderTest, TestVisitNotEqual) { + auto reader = CreateReader(); + // NotEqual 5 -> all non-null rows except row 5 + // non-null rows: 0,1,3,4 (file1) + 5,6,7 (file2) + 8,10 (file3) + // minus row 5 (key=5) -> 0,1,3,4,6,7,8,10 + Literal literal_5(5); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitNotEqual(literal_5)); + CheckResult(result, {0, 1, 3, 4, 6, 7, 8, 10}); +} + +// --- VisitLessThan --- + +TEST_F(LazyFilteredBTreeReaderTest, TestVisitLessThan) { + auto reader = CreateReader(); + // LessThan 5 -> keys 1,2 from file1 -> rows 0,1,3,4 + Literal literal_5(5); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitLessThan(literal_5)); + CheckResult(result, {0, 1, 3, 4}); +} + +TEST_F(LazyFilteredBTreeReaderTest, TestVisitLessThanNoMatch) { + auto reader = CreateReader(); + // LessThan 1 -> no key < 1 -> empty + Literal literal_1(1); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitLessThan(literal_1)); + CheckEmpty(result); +} + +// --- VisitLessOrEqual --- + +TEST_F(LazyFilteredBTreeReaderTest, TestVisitLessOrEqual) { + auto reader = CreateReader(); + // LessOrEqual 5 -> keys 1,2 from file1 + key 5 from file2 -> rows 0,1,3,4,5 + Literal literal_5(5); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitLessOrEqual(literal_5)); + CheckResult(result, {0, 1, 3, 4, 5}); +} + +// --- VisitGreaterThan --- + +TEST_F(LazyFilteredBTreeReaderTest, TestVisitGreaterThan) { + auto reader = CreateReader(); + // GreaterThan 7 -> keys 10,12 from file3 -> rows 8,10 + Literal literal_7(7); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitGreaterThan(literal_7)); + CheckResult(result, {8, 10}); +} + +TEST_F(LazyFilteredBTreeReaderTest, TestVisitGreaterThanNoMatch) { + auto reader = CreateReader(); + // GreaterThan 12 -> no key > 12 -> empty + Literal literal_12(12); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitGreaterThan(literal_12)); + CheckEmpty(result); +} + +// --- VisitGreaterOrEqual --- + +TEST_F(LazyFilteredBTreeReaderTest, TestVisitGreaterOrEqual) { + auto reader = CreateReader(); + // GreaterOrEqual 6 -> keys 6,7 from file2 + keys 10,12 from file3 -> rows 6,7,8,10 + Literal literal_6(6); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitGreaterOrEqual(literal_6)); + CheckResult(result, {6, 7, 8, 10}); +} + +// --- VisitIn --- + +TEST_F(LazyFilteredBTreeReaderTest, TestVisitInAcrossFiles) { + auto reader = CreateReader(); + // IN(2, 7) -> file1 has key 2 (rows 3,4), file2 has key 7 (row 7) + std::vector literals = {Literal(2), Literal(7)}; + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIn(literals)); + CheckResult(result, {3, 4, 7}); +} + +TEST_F(LazyFilteredBTreeReaderTest, TestVisitInNoMatch) { + auto reader = CreateReader(); + // IN(99, 100) -> out of all ranges -> empty + std::vector literals = {Literal(99), Literal(100)}; + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIn(literals)); + CheckEmpty(result); +} + +// --- VisitNotIn --- + +TEST_F(LazyFilteredBTreeReaderTest, TestVisitNotIn) { + auto reader = CreateReader(); + // NotIn(1, 5, 10) -> all non-null rows minus rows with key=1,5,10 + // non-null rows: 0,1,3,4 + 5,6,7 + 8,10 + // remove key=1 (rows 0,1), key=5 (row 5), key=10 (row 8) + // -> 3,4,6,7,10 + std::vector literals = {Literal(1), Literal(5), Literal(10)}; + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitNotIn(literals)); + CheckResult(result, {3, 4, 6, 7, 10}); +} + +// --- VisitIsNull --- + +TEST_F(LazyFilteredBTreeReaderTest, TestVisitIsNull) { + auto reader = CreateReader(); + // file1 has_null=true (row 2), file3 has_null=true (row 9) + // file2 has no nulls + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIsNull()); + CheckResult(result, {2, 9}); +} + +// --- VisitIsNotNull --- + +TEST_F(LazyFilteredBTreeReaderTest, TestVisitIsNotNull) { + auto reader = CreateReader(); + // All non-null rows across all files: 0,1,3,4 + 5,6,7 + 8,10 + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIsNotNull()); + CheckResult(result, {0, 1, 3, 4, 5, 6, 7, 8, 10}); +} + +// --- VisitVectorSearch / VisitFullTextSearch --- + +TEST_F(LazyFilteredBTreeReaderTest, TestVisitVectorSearchNotSupported) { + auto reader = CreateReader(); + ASSERT_NOK_WITH_MSG(reader->VisitVectorSearch(nullptr), + "LazyFilteredBTreeReader does not support vector search"); +} + +TEST_F(LazyFilteredBTreeReaderTest, TestVisitFullTextSearchNotSupported) { + auto reader = CreateReader(); + ASSERT_NOK_WITH_MSG(reader->VisitFullTextSearch(nullptr), + "LazyFilteredBTreeReader does not support full text search"); +} + +// --- GetIndexType / IsThreadSafe --- + +TEST_F(LazyFilteredBTreeReaderTest, TestGetIndexType) { + auto reader = CreateReader(); + ASSERT_EQ(reader->GetIndexType(), BtreeDefs::kIdentifier); +} + +TEST_F(LazyFilteredBTreeReaderTest, TestIsNotThreadSafe) { + auto reader = CreateReader(); + ASSERT_FALSE(reader->IsThreadSafe()); +} + +// --- Reader cache verification --- + +TEST_F(LazyFilteredBTreeReaderTest, TestReaderCacheReuse) { + auto reader = CreateReader(); + // Call VisitEqual twice on the same file's range to verify reader cache works + Literal literal_1(1); + ASSERT_OK_AND_ASSIGN(auto result1, reader->VisitEqual(literal_1)); + CheckResult(result1, {0, 1}); + + Literal literal_2(2); + ASSERT_OK_AND_ASSIGN(auto result2, reader->VisitEqual(literal_2)); + CheckResult(result2, {3, 4}); +} + +// --- Empty files list --- + +TEST_F(LazyFilteredBTreeReaderTest, TestEmptyFilesList) { + std::vector empty_metas; + auto file_reader = std::make_shared(fs_, base_path_); + auto cache_manager = std::make_shared(1024 * 1024, 0.5); + auto reader = std::make_shared( + /*read_buffer_size=*/std::nullopt, empty_metas, arrow::int32(), file_reader, cache_manager, + pool_, /*executor=*/nullptr); + + // Any query on empty files should return empty bitmap + Literal literal_1(1); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(literal_1)); + CheckEmpty(result); + + ASSERT_OK_AND_ASSIGN(result, reader->VisitIsNotNull()); + CheckEmpty(result); +} + +// ============================================================================ +// Parallel execution with Executor +// ============================================================================ + +TEST_F(LazyFilteredBTreeReaderTest, TestParallelVisitEqual) { + std::shared_ptr executor = CreateDefaultExecutor(); + auto reader = CreateReader(executor); + // key=1 is in file1 -> rows 0, 1 + Literal literal_1(1); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(literal_1)); + CheckResult(result, {0, 1}); +} + +TEST_F(LazyFilteredBTreeReaderTest, TestParallelVisitNotEqual) { + std::shared_ptr executor = CreateDefaultExecutor(); + auto reader = CreateReader(executor); + // NotEqual 5 -> all non-null rows except row 5 -> 0,1,3,4,6,7,8,10 + Literal literal_5(5); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitNotEqual(literal_5)); + CheckResult(result, {0, 1, 3, 4, 6, 7, 8, 10}); +} + +TEST_F(LazyFilteredBTreeReaderTest, TestParallelVisitLessThan) { + std::shared_ptr executor = CreateDefaultExecutor(); + auto reader = CreateReader(executor); + // LessThan 5 -> keys 1,2 from file1 -> rows 0,1,3,4 + Literal literal_5(5); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitLessThan(literal_5)); + CheckResult(result, {0, 1, 3, 4}); +} + +TEST_F(LazyFilteredBTreeReaderTest, TestParallelVisitGreaterOrEqual) { + std::shared_ptr executor = CreateDefaultExecutor(); + auto reader = CreateReader(executor); + // GreaterOrEqual 6 -> keys 6,7 from file2 + keys 10,12 from file3 -> rows 6,7,8,10 + Literal literal_6(6); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitGreaterOrEqual(literal_6)); + CheckResult(result, {6, 7, 8, 10}); +} + +TEST_F(LazyFilteredBTreeReaderTest, TestParallelVisitIn) { + std::shared_ptr executor = CreateDefaultExecutor(); + auto reader = CreateReader(executor); + // IN(2, 7) -> file1 has key 2 (rows 3,4), file2 has key 7 (row 7) + std::vector literals = {Literal(2), Literal(7)}; + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIn(literals)); + CheckResult(result, {3, 4, 7}); +} + +TEST_F(LazyFilteredBTreeReaderTest, TestParallelVisitIsNull) { + std::shared_ptr executor = CreateDefaultExecutor(); + auto reader = CreateReader(executor); + // file1 has_null=true (row 2), file3 has_null=true (row 9) + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIsNull()); + CheckResult(result, {2, 9}); +} + +TEST_F(LazyFilteredBTreeReaderTest, TestParallelVisitIsNotNull) { + std::shared_ptr executor = CreateDefaultExecutor(); + auto reader = CreateReader(executor); + // All non-null rows across all files: 0,1,3,4 + 5,6,7 + 8,10 + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIsNotNull()); + CheckResult(result, {0, 1, 3, 4, 5, 6, 7, 8, 10}); +} + +TEST_F(LazyFilteredBTreeReaderTest, TestParallelVisitEqualNoMatch) { + std::shared_ptr executor = CreateDefaultExecutor(); + auto reader = CreateReader(executor); + // key=99 out of range -> empty bitmap + Literal literal_99(99); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(literal_99)); + CheckEmpty(result); +} + +TEST_F(LazyFilteredBTreeReaderTest, TestParallelEmptyFilesList) { + std::shared_ptr executor = CreateDefaultExecutor(); + std::vector empty_metas; + auto file_reader = std::make_shared(fs_, base_path_); + auto cache_manager = std::make_shared(1024 * 1024, 0.5); + auto reader = std::make_shared( + /*read_buffer_size=*/std::nullopt, empty_metas, arrow::int32(), file_reader, cache_manager, + pool_, executor); + Literal literal_1(1); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(literal_1)); + CheckEmpty(result); +} + +} // namespace paimon::test