From ee910c270fedee35b610cd29e291f409419f4bbf Mon Sep 17 00:00:00 2001 From: lxy264173 Date: Thu, 18 Jun 2026 10:49:59 +0800 Subject: [PATCH] feat: Migrate lookup file support for pk compact --- .../default_lookup_serializer_factory.h | 67 ++ .../core/mergetree/lookup/file_position.h | 28 + .../lookup/lookup_serializer_factory.h | 48 + .../lookup/persist_empty_processor.h | 59 ++ .../lookup/persist_position_processor.h | 79 ++ .../core/mergetree/lookup/persist_processor.h | 56 ++ .../lookup/persist_processor_test.cpp | 155 +++ .../lookup/persist_value_and_pos_processor.h | 107 ++ .../lookup/persist_value_processor.h | 93 ++ .../mergetree/lookup/positioned_key_value.h | 30 + src/paimon/core/mergetree/lookup_file.cpp | 104 ++ src/paimon/core/mergetree/lookup_file.h | 105 ++ .../core/mergetree/lookup_file_test.cpp | 225 +++++ src/paimon/core/mergetree/lookup_levels.cpp | 446 +++++++++ src/paimon/core/mergetree/lookup_levels.h | 156 +++ .../core/mergetree/lookup_levels_test.cpp | 946 ++++++++++++++++++ src/paimon/core/mergetree/lookup_utils.h | 121 +++ 17 files changed, 2825 insertions(+) create mode 100644 src/paimon/core/mergetree/lookup/default_lookup_serializer_factory.h create mode 100644 src/paimon/core/mergetree/lookup/file_position.h create mode 100644 src/paimon/core/mergetree/lookup/lookup_serializer_factory.h create mode 100644 src/paimon/core/mergetree/lookup/persist_empty_processor.h create mode 100644 src/paimon/core/mergetree/lookup/persist_position_processor.h create mode 100644 src/paimon/core/mergetree/lookup/persist_processor.h create mode 100644 src/paimon/core/mergetree/lookup/persist_processor_test.cpp create mode 100644 src/paimon/core/mergetree/lookup/persist_value_and_pos_processor.h create mode 100644 src/paimon/core/mergetree/lookup/persist_value_processor.h create mode 100644 src/paimon/core/mergetree/lookup/positioned_key_value.h create mode 100644 src/paimon/core/mergetree/lookup_file.cpp create mode 100644 src/paimon/core/mergetree/lookup_file.h create mode 100644 src/paimon/core/mergetree/lookup_file_test.cpp create mode 100644 src/paimon/core/mergetree/lookup_levels.cpp create mode 100644 src/paimon/core/mergetree/lookup_levels.h create mode 100644 src/paimon/core/mergetree/lookup_levels_test.cpp create mode 100644 src/paimon/core/mergetree/lookup_utils.h diff --git a/src/paimon/core/mergetree/lookup/default_lookup_serializer_factory.h b/src/paimon/core/mergetree/lookup/default_lookup_serializer_factory.h new file mode 100644 index 0000000..feb725c --- /dev/null +++ b/src/paimon/core/mergetree/lookup/default_lookup_serializer_factory.h @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include "paimon/common/data/serializer/row_compacted_serializer.h" +#include "paimon/common/utils/arrow/arrow_utils.h" +#include "paimon/core/mergetree/lookup/lookup_serializer_factory.h" +namespace paimon { +/// A `LookupSerializerFactory` using `RowCompactedSerializer`. +class DefaultLookupSerializerFactory : public LookupSerializerFactory { + public: + static constexpr char kVersion[] = "v1"; + + std::string Version() const override { + return kVersion; + } + + Result CreateSerializer(const std::shared_ptr& schema, + const std::shared_ptr& pool) const override { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr serializer, + RowCompactedSerializer::Create(schema, pool)); + return LookupSerializerFactory::SerializeFunc( + [serializer = + std::move(serializer)](const InternalRow& row) -> Result> { + return serializer->SerializeToBytes(row); + }); + } + + Result CreateDeserializer( + const std::string& file_ser_version, const std::shared_ptr& current_schema, + const std::shared_ptr& file_schema, + const std::shared_ptr& pool) const override { + if (Version() != file_ser_version) { + return Status::Invalid(fmt::format( + "file_ser_version {} mismatch DefaultLookupSerializerFactory version {}", + file_ser_version, Version())); + } + if (!ArrowUtils::EqualsIgnoreNullable(arrow::struct_(file_schema->fields()), + arrow::struct_(current_schema->fields()))) { + return Status::Invalid( + fmt::format("current_schema {} must be equal with file_schema {} in " + "DefaultLookupSerializerFactory", + current_schema->ToString(), file_schema->ToString())); + } + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr serializer, + RowCompactedSerializer::Create(current_schema, pool)); + return LookupSerializerFactory::DeserializeFunc( + [serializer = std::move(serializer)](const std::shared_ptr& bytes) + -> Result> { return serializer->Deserialize(bytes); }); + } +}; +} // namespace paimon diff --git a/src/paimon/core/mergetree/lookup/file_position.h b/src/paimon/core/mergetree/lookup/file_position.h new file mode 100644 index 0000000..a2dea3a --- /dev/null +++ b/src/paimon/core/mergetree/lookup/file_position.h @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include +#include +namespace paimon { +/// File name and row position for DeletionVector. +struct FilePosition { + std::string file_name; + int64_t row_position; +}; +} // namespace paimon diff --git a/src/paimon/core/mergetree/lookup/lookup_serializer_factory.h b/src/paimon/core/mergetree/lookup/lookup_serializer_factory.h new file mode 100644 index 0000000..b63ee26 --- /dev/null +++ b/src/paimon/core/mergetree/lookup/lookup_serializer_factory.h @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include + +#include "arrow/api.h" +#include "paimon/common/data/internal_row.h" +#include "paimon/memory/bytes.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/result.h" +namespace paimon { +/// Factory to create serializer for lookup. +class LookupSerializerFactory { + public: + virtual ~LookupSerializerFactory() = default; + + virtual std::string Version() const = 0; + + using SerializeFunc = std::function>(const InternalRow&)>; + using DeserializeFunc = + std::function>(const std::shared_ptr&)>; + + virtual Result CreateSerializer( + const std::shared_ptr& schema, + const std::shared_ptr& pool) const = 0; + + virtual Result CreateDeserializer( + const std::string& file_ser_version, const std::shared_ptr& current_schema, + const std::shared_ptr& file_schema, + const std::shared_ptr& pool) const = 0; +}; +} // namespace paimon diff --git a/src/paimon/core/mergetree/lookup/persist_empty_processor.h b/src/paimon/core/mergetree/lookup/persist_empty_processor.h new file mode 100644 index 0000000..ece2b1c --- /dev/null +++ b/src/paimon/core/mergetree/lookup/persist_empty_processor.h @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include "paimon/core/mergetree/lookup/lookup_serializer_factory.h" +#include "paimon/core/mergetree/lookup/persist_processor.h" +namespace paimon { +/// A `PersistProcessor` to return `bool` only. +class PersistEmptyProcessor : public PersistProcessor { + public: + bool WithPosition() const override { + return false; + } + + Result> PersistToDisk(const KeyValue& kv) const override { + return Bytes::EmptyBytes(); + } + + Result ReadFromDisk(std::shared_ptr key, int32_t level, + const std::shared_ptr& value_bytes, + const std::string& file_name) const override { + return true; + } + + /// Factory to create `PersistProcessor`. + class Factory : public PersistProcessor::Factory { + public: + std::string Identifier() const override { + return "empty"; + } + + Result>> Create( + const std::string& file_ser_version, + const std::shared_ptr& serializer_factory, + const std::shared_ptr& file_schema, + const std::shared_ptr& pool) const override { + return std::unique_ptr(new PersistEmptyProcessor()); + } + }; + + private: + PersistEmptyProcessor() = default; +}; +} // namespace paimon diff --git a/src/paimon/core/mergetree/lookup/persist_position_processor.h b/src/paimon/core/mergetree/lookup/persist_position_processor.h new file mode 100644 index 0000000..9b209dc --- /dev/null +++ b/src/paimon/core/mergetree/lookup/persist_position_processor.h @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include "paimon/common/utils/var_length_int_utils.h" +#include "paimon/core/mergetree/lookup/file_position.h" +#include "paimon/core/mergetree/lookup/lookup_serializer_factory.h" +#include "paimon/core/mergetree/lookup/persist_processor.h" + +namespace paimon { +/// A `PersistProcessor` to return `FilePosition`. +class PersistPositionProcessor : public PersistProcessor { + public: + bool WithPosition() const override { + return true; + } + + Result> PersistToDisk(const KeyValue& kv) const override { + return Status::Invalid( + "invalid operation, do not support persist to disk without position in " + "PersistPositionProcessor"); + } + + Result> PersistToDisk(const KeyValue& kv, + int64_t row_position) const override { + auto bytes = std::make_shared(VarLengthIntUtils::kMaxVarLongSize, pool_.get()); + PAIMON_ASSIGN_OR_RAISE(int64_t len, + VarLengthIntUtils::EncodeLong(row_position, bytes->data())); + std::shared_ptr copy_bytes = Bytes::CopyOf(*bytes, len, pool_.get()); + return copy_bytes; + } + + Result ReadFromDisk(std::shared_ptr key, int32_t level, + const std::shared_ptr& value_bytes, + const std::string& file_name) const override { + int32_t decode_offset = 0; + PAIMON_ASSIGN_OR_RAISE(int64_t row_position, + VarLengthIntUtils::DecodeLong(value_bytes->data(), &decode_offset)); + return FilePosition{file_name, row_position}; + } + + /// Factory to create `PersistProcessor`. + class Factory : public PersistProcessor::Factory { + public: + std::string Identifier() const override { + return "position"; + } + + Result>> Create( + const std::string& file_ser_version, + const std::shared_ptr& serializer_factory, + const std::shared_ptr& file_schema, + const std::shared_ptr& pool) const override { + return std::unique_ptr(new PersistPositionProcessor(pool)); + } + }; + + private: + explicit PersistPositionProcessor(const std::shared_ptr& pool) : pool_(pool) {} + + private: + std::shared_ptr pool_; +}; +} // namespace paimon diff --git a/src/paimon/core/mergetree/lookup/persist_processor.h b/src/paimon/core/mergetree/lookup/persist_processor.h new file mode 100644 index 0000000..5d0b7bc --- /dev/null +++ b/src/paimon/core/mergetree/lookup/persist_processor.h @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include "paimon/core/key_value.h" +#include "paimon/memory/bytes.h" +namespace paimon { +class LookupSerializerFactory; +/// Processor to process value. +template +class PersistProcessor { + public: + virtual ~PersistProcessor() = default; + + virtual bool WithPosition() const = 0; + + virtual Result> PersistToDisk(const KeyValue& kv) const = 0; + + virtual Result> PersistToDisk(const KeyValue& kv, + int64_t row_position) const { + return Status::Invalid("Not support for PersistToDisk with position"); + } + + virtual Result ReadFromDisk(std::shared_ptr key, int32_t level, + const std::shared_ptr& value_bytes, + const std::string& file_name) const = 0; + + /// Factory to create `PersistProcessor`. + class Factory { + public: + virtual ~Factory() = default; + virtual std::string Identifier() const = 0; + + virtual Result>> Create( + const std::string& file_ser_version, + const std::shared_ptr& serializer_factory, + const std::shared_ptr& file_schema, + const std::shared_ptr& pool) const = 0; + }; +}; +} // namespace paimon diff --git a/src/paimon/core/mergetree/lookup/persist_processor_test.cpp b/src/paimon/core/mergetree/lookup/persist_processor_test.cpp new file mode 100644 index 0000000..b0478a5 --- /dev/null +++ b/src/paimon/core/mergetree/lookup/persist_processor_test.cpp @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/mergetree/lookup/persist_processor.h" + +#include "gtest/gtest.h" +#include "paimon/core/mergetree/lookup/default_lookup_serializer_factory.h" +#include "paimon/core/mergetree/lookup/persist_empty_processor.h" +#include "paimon/core/mergetree/lookup/persist_position_processor.h" +#include "paimon/core/mergetree/lookup/persist_value_and_pos_processor.h" +#include "paimon/core/mergetree/lookup/persist_value_processor.h" +#include "paimon/testing/utils/binary_row_generator.h" +#include "paimon/testing/utils/testharness.h" +namespace paimon::test { +class PersistProcessorTest : public testing::Test { + public: + void CheckResult(const KeyValue& kv) { + ASSERT_EQ(kv_.key, kv.key); + + ASSERT_EQ(kv.value->GetFieldCount(), 4); + ASSERT_FALSE(kv.value->IsNullAt(0)); + ASSERT_EQ(std::string(kv.value->GetStringView(0)), std::string("Alice")); + ASSERT_EQ(kv.value->GetInt(1), 10); + ASSERT_TRUE(kv.value->IsNullAt(2)); + ASSERT_EQ(kv.value->GetDouble(3), 10.1); + + ASSERT_EQ(kv_.level, kv.level); + ASSERT_EQ(kv_.sequence_number, kv.sequence_number); + ASSERT_EQ(*kv_.value_kind, *kv.value_kind); + } + + private: + std::shared_ptr pool_ = GetDefaultPool(); + KeyValue kv_ = KeyValue(RowKind::Insert(), /*sequence_number=*/500, /*level=*/4, /*key=*/ + BinaryRowGenerator::GenerateRowPtr({10}, pool_.get()), + /*value=*/ + BinaryRowGenerator::GenerateRowPtr( + {std::string("Alice"), 10, NullType(), 10.1}, pool_.get())); + std::shared_ptr file_schema_ = + arrow::schema({arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), + arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}); + std::shared_ptr serializer_factory_ = + std::make_shared(); + std::string file_name_ = "test.file"; +}; + +TEST_F(PersistProcessorTest, TestEmptyProcessor) { + auto processor_factory = std::make_shared(); + ASSERT_EQ(processor_factory->Identifier(), "empty"); + ASSERT_OK_AND_ASSIGN(auto processor, + processor_factory->Create(serializer_factory_->Version(), + /*serializer_factory=*/serializer_factory_, + /*file_schema=*/file_schema_, pool_)); + ASSERT_FALSE(processor->WithPosition()); + ASSERT_OK_AND_ASSIGN(auto bytes, processor->PersistToDisk(kv_)); + ASSERT_EQ(bytes->size(), 0); + ASSERT_OK_AND_ASSIGN(bool value, + processor->ReadFromDisk(kv_.key, kv_.level, bytes, file_name_)); + ASSERT_TRUE(value); + + ASSERT_NOK_WITH_MSG(processor->PersistToDisk(kv_, /*row_position=*/30), + "Not support for PersistToDisk with position"); +} + +TEST_F(PersistProcessorTest, TestPositionProcessor) { + auto processor_factory = std::make_shared(); + ASSERT_EQ(processor_factory->Identifier(), "position"); + ASSERT_OK_AND_ASSIGN(auto processor, + processor_factory->Create(serializer_factory_->Version(), + /*serializer_factory=*/serializer_factory_, + /*file_schema=*/file_schema_, pool_)); + ASSERT_TRUE(processor->WithPosition()); + ASSERT_OK_AND_ASSIGN(auto bytes, processor->PersistToDisk(kv_, /*row_position=*/30)); + ASSERT_OK_AND_ASSIGN(FilePosition file_position, + processor->ReadFromDisk(kv_.key, kv_.level, bytes, file_name_)); + ASSERT_EQ(file_position.file_name, file_name_); + ASSERT_EQ(file_position.row_position, 30); + + ASSERT_NOK_WITH_MSG(processor->PersistToDisk(kv_), + "invalid operation, do not support persist to disk without position in " + "PersistPositionProcessor"); +} + +TEST_F(PersistProcessorTest, TestValueProcessor) { + auto processor_factory = std::make_shared(file_schema_); + ASSERT_EQ(processor_factory->Identifier(), "value"); + ASSERT_OK_AND_ASSIGN(auto processor, + processor_factory->Create(serializer_factory_->Version(), + /*serializer_factory=*/serializer_factory_, + /*file_schema=*/file_schema_, pool_)); + ASSERT_FALSE(processor->WithPosition()); + ASSERT_OK_AND_ASSIGN(auto bytes, processor->PersistToDisk(kv_)); + ASSERT_OK_AND_ASSIGN(KeyValue kv, + processor->ReadFromDisk(kv_.key, kv_.level, bytes, file_name_)); + CheckResult(kv); + + ASSERT_NOK_WITH_MSG(processor->PersistToDisk(kv_, /*row_position=*/30), + "Not support for PersistToDisk with position"); +} + +TEST_F(PersistProcessorTest, TestInvalidValueProcessor) { + std::shared_ptr current_schema = + arrow::schema({arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), + arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float32())}); + + auto processor_factory = std::make_shared(current_schema); + ASSERT_EQ(processor_factory->Identifier(), "value"); + // test schema not equal + ASSERT_NOK_WITH_MSG(processor_factory->Create(serializer_factory_->Version(), + /*serializer_factory=*/serializer_factory_, + /*file_schema=*/file_schema_, pool_), + "f3: float must be equal with file_schema"); + // test version mismatch + ASSERT_NOK_WITH_MSG( + processor_factory->Create("invalid version", + /*serializer_factory=*/serializer_factory_, + /*file_schema=*/current_schema, pool_), + "file_ser_version invalid version mismatch DefaultLookupSerializerFactory version v1"); +} + +TEST_F(PersistProcessorTest, TestValueAndPositionProcessor) { + auto processor_factory = std::make_shared(file_schema_); + ASSERT_EQ(processor_factory->Identifier(), "position-and-value"); + ASSERT_OK_AND_ASSIGN(auto processor, + processor_factory->Create(serializer_factory_->Version(), + /*serializer_factory=*/serializer_factory_, + /*file_schema=*/file_schema_, pool_)); + ASSERT_TRUE(processor->WithPosition()); + ASSERT_OK_AND_ASSIGN(auto bytes, processor->PersistToDisk(kv_, /*row_position=*/30)); + ASSERT_OK_AND_ASSIGN(PositionedKeyValue pos_kv, + processor->ReadFromDisk(kv_.key, kv_.level, bytes, file_name_)); + CheckResult(pos_kv.key_value); + ASSERT_EQ(pos_kv.file_name, file_name_); + ASSERT_EQ(pos_kv.row_position, 30); + + ASSERT_NOK_WITH_MSG(processor->PersistToDisk(kv_), + "invalid operation, do not support persist to disk without position in " + "PersistValueAndPosProcessor"); +} +} // namespace paimon::test diff --git a/src/paimon/core/mergetree/lookup/persist_value_and_pos_processor.h b/src/paimon/core/mergetree/lookup/persist_value_and_pos_processor.h new file mode 100644 index 0000000..0120c30 --- /dev/null +++ b/src/paimon/core/mergetree/lookup/persist_value_and_pos_processor.h @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include "paimon/core/mergetree/lookup/lookup_serializer_factory.h" +#include "paimon/core/mergetree/lookup/persist_processor.h" +#include "paimon/core/mergetree/lookup/persist_value_processor.h" +#include "paimon/core/mergetree/lookup/positioned_key_value.h" +namespace paimon { +/// A `PersistProcessor` to return `PositionedKeyValue`. +class PersistValueAndPosProcessor : public PersistProcessor { + public: + static constexpr size_t kMetaLen = sizeof(int64_t) + sizeof(int64_t) + sizeof(int8_t); + + bool WithPosition() const override { + return true; + } + + Result> PersistToDisk(const KeyValue& kv) const override { + return Status::Invalid( + "invalid operation, do not support persist to disk without position in " + "PersistValueAndPosProcessor"); + } + + Result> PersistToDisk(const KeyValue& kv, + int64_t row_position) const override { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr vbytes, serializer_(*(kv.value))); + auto bytes = std::make_shared(vbytes->size() + kMetaLen, pool_.get()); + auto segment = MemorySegment::Wrap(bytes); + segment.Put(0, *vbytes); + segment.PutValue(bytes->size() - kMetaLen, row_position); + segment.PutValue(bytes->size() - PersistValueProcessor::kMetaLen, + kv.sequence_number); + segment.Put(bytes->size() - 1, static_cast(kv.value_kind->ToByteValue())); + return bytes; + } + + Result ReadFromDisk(std::shared_ptr key, int32_t level, + const std::shared_ptr& value_bytes, + const std::string& file_name) const override { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr value, deserializer_(value_bytes)); + auto memory_segment = MemorySegment::Wrap(value_bytes); + auto row_position = memory_segment.GetValue(value_bytes->size() - kMetaLen); + auto sequence_number = + memory_segment.GetValue(value_bytes->size() - PersistValueProcessor::kMetaLen); + PAIMON_ASSIGN_OR_RAISE(const RowKind* row_kind, + RowKind::FromByteValue((*value_bytes)[value_bytes->size() - 1])); + return PositionedKeyValue{ + KeyValue(row_kind, sequence_number, level, std::move(key), std::move(value)), file_name, + row_position}; + } + + /// Factory to create `PersistProcessor`. + class Factory : public PersistProcessor::Factory { + public: + explicit Factory(const std::shared_ptr& current_schema) + : current_schema_(current_schema) {} + + std::string Identifier() const override { + return "position-and-value"; + } + + Result>> Create( + const std::string& file_ser_version, + const std::shared_ptr& serializer_factory, + const std::shared_ptr& file_schema, + const std::shared_ptr& pool) const override { + PAIMON_ASSIGN_OR_RAISE(LookupSerializerFactory::SerializeFunc serializer, + serializer_factory->CreateSerializer(current_schema_, pool)); + PAIMON_ASSIGN_OR_RAISE(LookupSerializerFactory::DeserializeFunc deserializer, + serializer_factory->CreateDeserializer( + file_ser_version, current_schema_, file_schema, pool)); + return std::unique_ptr(new PersistValueAndPosProcessor( + std::move(serializer), std::move(deserializer), pool)); + } + + private: + std::shared_ptr current_schema_; + }; + + private: + PersistValueAndPosProcessor(LookupSerializerFactory::SerializeFunc serializer, + LookupSerializerFactory::DeserializeFunc deserializer, + const std::shared_ptr& pool) + : pool_(pool), serializer_(std::move(serializer)), deserializer_(std::move(deserializer)) {} + + private: + std::shared_ptr pool_; + LookupSerializerFactory::SerializeFunc serializer_; + LookupSerializerFactory::DeserializeFunc deserializer_; +}; +} // namespace paimon diff --git a/src/paimon/core/mergetree/lookup/persist_value_processor.h b/src/paimon/core/mergetree/lookup/persist_value_processor.h new file mode 100644 index 0000000..5d73416 --- /dev/null +++ b/src/paimon/core/mergetree/lookup/persist_value_processor.h @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "paimon/core/mergetree/lookup/lookup_serializer_factory.h" +#include "paimon/core/mergetree/lookup/persist_processor.h" +namespace paimon { +/// A `PersistProcessor` to return `KeyValue`. +class PersistValueProcessor : public PersistProcessor { + public: + static constexpr size_t kMetaLen = sizeof(int64_t) + sizeof(int8_t); + + bool WithPosition() const override { + return false; + } + + Result> PersistToDisk(const KeyValue& kv) const override { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr vbytes, serializer_(*(kv.value))); + auto bytes = std::make_shared(vbytes->size() + kMetaLen, pool_.get()); + auto segment = MemorySegment::Wrap(bytes); + segment.Put(0, *vbytes); + segment.PutValue(bytes->size() - kMetaLen, kv.sequence_number); + segment.Put(bytes->size() - 1, static_cast(kv.value_kind->ToByteValue())); + return bytes; + } + + Result ReadFromDisk(std::shared_ptr key, int32_t level, + const std::shared_ptr& value_bytes, + const std::string& file_name) const override { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr value, deserializer_(value_bytes)); + auto sequence_number = + MemorySegment::Wrap(value_bytes).GetValue(value_bytes->size() - kMetaLen); + PAIMON_ASSIGN_OR_RAISE(const RowKind* row_kind, + RowKind::FromByteValue((*value_bytes)[value_bytes->size() - 1])); + return KeyValue(row_kind, sequence_number, level, std::move(key), std::move(value)); + } + + /// Factory to create `PersistProcessor`. + class Factory : public PersistProcessor::Factory { + public: + explicit Factory(const std::shared_ptr& current_schema) + : current_schema_(current_schema) {} + + std::string Identifier() const override { + return "value"; + } + + Result>> Create( + const std::string& file_ser_version, + const std::shared_ptr& serializer_factory, + const std::shared_ptr& file_schema, + const std::shared_ptr& pool) const override { + PAIMON_ASSIGN_OR_RAISE(LookupSerializerFactory::SerializeFunc serializer, + serializer_factory->CreateSerializer(current_schema_, pool)); + PAIMON_ASSIGN_OR_RAISE(LookupSerializerFactory::DeserializeFunc deserializer, + serializer_factory->CreateDeserializer( + file_ser_version, current_schema_, file_schema, pool)); + return std::unique_ptr( + new PersistValueProcessor(std::move(serializer), std::move(deserializer), pool)); + } + + private: + std::shared_ptr current_schema_; + }; + + private: + PersistValueProcessor(LookupSerializerFactory::SerializeFunc serializer, + LookupSerializerFactory::DeserializeFunc deserializer, + const std::shared_ptr& pool) + : pool_(pool), serializer_(std::move(serializer)), deserializer_(std::move(deserializer)) {} + + private: + std::shared_ptr pool_; + LookupSerializerFactory::SerializeFunc serializer_; + LookupSerializerFactory::DeserializeFunc deserializer_; +}; +} // namespace paimon diff --git a/src/paimon/core/mergetree/lookup/positioned_key_value.h b/src/paimon/core/mergetree/lookup/positioned_key_value.h new file mode 100644 index 0000000..7d60d7a --- /dev/null +++ b/src/paimon/core/mergetree/lookup/positioned_key_value.h @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "paimon/core/key_value.h" + +namespace paimon { +/// `KeyValue` with file name and row position for DeletionVector. +struct PositionedKeyValue { + KeyValue key_value; + std::string file_name; + int64_t row_position; +}; +} // namespace paimon diff --git a/src/paimon/core/mergetree/lookup_file.cpp b/src/paimon/core/mergetree/lookup_file.cpp new file mode 100644 index 0000000..e8336e8 --- /dev/null +++ b/src/paimon/core/mergetree/lookup_file.cpp @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/mergetree/lookup_file.h" + +#include "fmt/format.h" +#include "paimon/common/utils/binary_row_partition_computer.h" + +namespace paimon { +LookupFile::LookupFile(const std::shared_ptr& fs, const std::string& local_file, + int64_t file_size_bytes, int32_t level, int64_t schema_id, + const std::string& ser_version, std::unique_ptr&& reader, + Callback callback) + : fs_(fs), + local_file_(local_file), + file_size_bytes_(file_size_bytes), + level_(level), + schema_id_(schema_id), + ser_version_(ser_version), + reader_(std::move(reader)), + callback_(std::move(callback)) {} + +LookupFile::~LookupFile() { + if (!closed_) { + [[maybe_unused]] auto status = Close(); + } +} + +Result> LookupFile::GetResult(const std::shared_ptr& key) { + if (closed_) { + return Status::Invalid("GetResult failed in LookupFile, reader is closed"); + } + request_count_++; + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr res, reader_->Lookup(key)); + if (res) { + hit_count_++; + } + return res; +} + +Status LookupFile::Close() { + closed_ = true; + if (callback_) { + callback_(); + } + PAIMON_RETURN_NOT_OK(reader_->Close()); + return fs_->Delete(local_file_, /*recursive=*/false); +} + +int64_t LookupFile::FileWeigh(const std::string& /*file_name*/, + const std::shared_ptr& lookup_file) { + if (!lookup_file || lookup_file->IsClosed()) { + return 0; + } + return lookup_file->file_size_bytes_; +} + +void LookupFile::RemovalCallback(const std::string& /*file_name*/, + const std::shared_ptr& lookup_file, + LookupFile::LookupFileCache::RemovalCause /*cause*/) { + if (lookup_file && !lookup_file->IsClosed()) { + [[maybe_unused]] auto status = lookup_file->Close(); + } +} + +std::shared_ptr LookupFile::CreateLookupFileCache( + int64_t file_retention_ms, int64_t max_disk_size) { + LookupFile::LookupFileCache::Options options; + options.expire_after_access_ms = file_retention_ms; + options.max_weight = max_disk_size; + options.weigh_func = &LookupFile::FileWeigh; + options.removal_callback = &LookupFile::RemovalCallback; + return std::make_shared(std::move(options)); +} + +Result LookupFile::LocalFilePrefix( + const std::shared_ptr& partition_type, const BinaryRow& partition, + int32_t bucket, const std::string& remote_file_name) { + if (partition.GetFieldCount() == 0) { + return fmt::format("{}-{}", std::to_string(bucket), remote_file_name); + } else { + PAIMON_ASSIGN_OR_RAISE(std::string part_str, BinaryRowPartitionComputer::PartToSimpleString( + partition_type, partition, + /*delimiter=*/"-", /*max_length=*/20)); + return fmt::format("{}-{}-{}", part_str, bucket, remote_file_name); + } +} + +} // namespace paimon diff --git a/src/paimon/core/mergetree/lookup_file.h b/src/paimon/core/mergetree/lookup_file.h new file mode 100644 index 0000000..517e415 --- /dev/null +++ b/src/paimon/core/mergetree/lookup_file.h @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include "paimon/common/data/binary_row.h" +#include "paimon/common/lookup/lookup_store_factory.h" +#include "paimon/common/utils/generic_lru_cache.h" +#include "paimon/fs/file_system.h" + +namespace paimon { +/// Lookup file for cache remote file to local. +class LookupFile { + public: + using Callback = std::function; + /// Type alias for the global lookup file cache. + /// Key: data file name (string), Value: shared_ptr + /// Weight is measured in bytes (file size on disk). + using LookupFileCache = GenericLruCache>; + + LookupFile(const std::shared_ptr& fs, const std::string& local_file, + int64_t file_size_bytes, int32_t level, int64_t schema_id, + const std::string& ser_version, std::unique_ptr&& reader, + Callback callback); + + ~LookupFile(); + + const std::string& LocalFile() const { + return local_file_; + } + + int64_t SchemaId() const { + return schema_id_; + } + + const std::string& SerVersion() const { + return ser_version_; + } + + int32_t Level() const { + return level_; + } + + bool IsClosed() const { + return closed_; + } + + bool operator==(const LookupFile& other) const { + if (this == &other) { + return true; + } + return local_file_ == other.local_file_; + } + + Result> GetResult(const std::shared_ptr& key); + + Status Close(); + + /// Create a global LookupFileCache with the given retention and max disk size (in bytes). + static std::shared_ptr CreateLookupFileCache(int64_t file_retention_ms, + int64_t max_disk_size); + + static Result LocalFilePrefix(const std::shared_ptr& partition_type, + const BinaryRow& partition, int32_t bucket, + const std::string& remote_file_name); + + private: + /// Compute the weight of a lookup file in bytes for cache eviction. + static int64_t FileWeigh(const std::string& file_name, + const std::shared_ptr& lookup_file); + + /// Removal callback for the global LookupFileCache. + static void RemovalCallback(const std::string& file_name, + const std::shared_ptr& lookup_file, + LookupFileCache::RemovalCause cause); + + private: + std::shared_ptr fs_; + std::string local_file_; + int64_t file_size_bytes_ = 0; + int32_t level_; + int64_t schema_id_; + std::string ser_version_; + std::unique_ptr reader_; + Callback callback_; + int64_t request_count_ = 0; + int64_t hit_count_ = 0; + bool closed_ = false; +}; + +} // namespace paimon diff --git a/src/paimon/core/mergetree/lookup_file_test.cpp b/src/paimon/core/mergetree/lookup_file_test.cpp new file mode 100644 index 0000000..a9a9051 --- /dev/null +++ b/src/paimon/core/mergetree/lookup_file_test.cpp @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/mergetree/lookup_file.h" + +#include "gtest/gtest.h" +#include "paimon/testing/utils/binary_row_generator.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +TEST(LookupFileTest, TestSimple) { + class FakeLookupStoreReader : public LookupStoreReader { + public: + FakeLookupStoreReader(const std::map& kvs, + std::shared_ptr& pool) + : pool_(pool), kvs_(kvs) {} + Result> Lookup(const std::shared_ptr& key) const override { + auto iter = kvs_.find(std::string(key->data(), key->size())); + if (iter == kvs_.end()) { + return std::shared_ptr(); + } + return std::make_shared(iter->second, pool_.get()); + } + Status Close() override { + return Status::OK(); + } + + private: + std::shared_ptr pool_; + std::map kvs_; + }; + auto pool = GetDefaultPool(); + auto tmp_dir = UniqueTestDirectory::Create("local"); + ASSERT_TRUE(tmp_dir); + auto fs = tmp_dir->GetFileSystem(); + std::string local_file = tmp_dir->Str() + "/test.file"; + ASSERT_OK(fs->WriteFile(local_file, "testdata", /*overwrite=*/false)); + ASSERT_TRUE(fs->Exists(local_file).value()); + + std::map kvs = {{"aa", "aa1"}, {"bb", "bb1"}}; + auto lookup_file = std::make_shared( + fs, local_file, /*file_size_bytes=*/0, /*level=*/3, /*schema_id=*/1, + /*ser_version=*/"v1", std::make_unique(kvs, pool), + /*callback=*/nullptr); + ASSERT_EQ(lookup_file->LocalFile(), local_file); + ASSERT_EQ(lookup_file->Level(), 3); + ASSERT_EQ(lookup_file->SchemaId(), 1); + ASSERT_EQ(lookup_file->SerVersion(), "v1"); + { + ASSERT_OK_AND_ASSIGN(auto value, + lookup_file->GetResult(std::make_shared("aa", pool.get()))); + ASSERT_TRUE(value); + ASSERT_EQ(std::string(value->data(), value->size()), "aa1"); + } + { + ASSERT_OK_AND_ASSIGN(auto value, + lookup_file->GetResult(std::make_shared("bb", pool.get()))); + ASSERT_TRUE(value); + ASSERT_EQ(std::string(value->data(), value->size()), "bb1"); + } + { + ASSERT_OK_AND_ASSIGN( + auto value, lookup_file->GetResult(std::make_shared("non-exist", pool.get()))); + ASSERT_FALSE(value); + } + ASSERT_FALSE(lookup_file->IsClosed()); + ASSERT_EQ(lookup_file->request_count_, 3); + ASSERT_EQ(lookup_file->hit_count_, 2); + + ASSERT_OK(lookup_file->Close()); + ASSERT_TRUE(lookup_file->IsClosed()); + ASSERT_FALSE(fs->Exists(local_file).value()); +} + +TEST(LookupFileTest, TestLocalFilePrefix) { + auto pool = GetDefaultPool(); + { + auto schema = arrow::schema({ + arrow::field("f0", arrow::utf8()), + arrow::field("f1", arrow::int32()), + }); + auto partition = BinaryRowGenerator::GenerateRow({std::string("20240731"), 10}, pool.get()); + ASSERT_OK_AND_ASSIGN(std::string ret, LookupFile::LocalFilePrefix( + schema, partition, /*bucket=*/3, "test.orc")); + ASSERT_EQ(ret, "20240731-10-3-test.orc"); + } + { + auto schema = arrow::schema({}); + auto partition = BinaryRow::EmptyRow(); + ASSERT_OK_AND_ASSIGN(std::string ret, LookupFile::LocalFilePrefix( + schema, partition, /*bucket=*/3, "test.orc")); + ASSERT_EQ(ret, "3-test.orc"); + } +} + +TEST(LookupFileTest, TestLookupFileCacheLifecycle) { + // This test covers: cache creation, put multiple entries, replacement, + // invalidation, weight-based eviction, and verifying local files are deleted. + auto pool = GetDefaultPool(); + auto tmp_dir = UniqueTestDirectory::Create("local"); + ASSERT_TRUE(tmp_dir); + auto fs = tmp_dir->GetFileSystem(); + + class FakeLookupStoreReader : public LookupStoreReader { + public: + Result> Lookup( + const std::shared_ptr& /*key*/) const override { + return std::shared_ptr(); + } + Status Close() override { + return Status::OK(); + } + }; + + std::vector call_back_files; + // Helper to create a local file with given size and return a LookupFile + auto make_lookup_file = [&](const std::string& name, + int64_t size) -> std::shared_ptr { + std::string path = tmp_dir->Str() + "/" + name; + std::string data(size, 'x'); + EXPECT_OK(fs->WriteFile(path, data, /*overwrite=*/false)); + LookupFile::Callback callback = [&call_back_files, name = name]() { + call_back_files.push_back(name); + }; + return std::make_shared( + fs, path, size, /*level=*/1, /*schema_id=*/0, + /*ser_version=*/"v1", std::make_unique(), std::move(callback)); + }; + + // Create a cache: max_weight = 300 bytes, no expiration + auto cache = LookupFile::CreateLookupFileCache(/*file_retention_ms=*/-1, /*max_disk_size=*/300); + ASSERT_EQ(cache->Size(), 0); + ASSERT_EQ(cache->GetCurrentWeight(), 0); + + // --- Phase 1: Put multiple entries --- + auto file_a = make_lookup_file("a.sst", 100); + auto file_b = make_lookup_file("b.sst", 100); + auto file_c = make_lookup_file("c.sst", 100); + std::string path_a = file_a->LocalFile(); + std::string path_b = file_b->LocalFile(); + std::string path_c = file_c->LocalFile(); + + ASSERT_OK(cache->Put("a", file_a)); + ASSERT_OK(cache->Put("b", file_b)); + ASSERT_OK(cache->Put("c", file_c)); + ASSERT_EQ(cache->Size(), 3); + ASSERT_EQ(cache->GetCurrentWeight(), 300); + + // All local files should exist + ASSERT_TRUE(fs->Exists(path_a).value()); + ASSERT_TRUE(fs->Exists(path_b).value()); + ASSERT_TRUE(fs->Exists(path_c).value()); + + // --- Phase 2: Replace an entry --- + // Replace "b" with a new file; old file_b should be closed and deleted + auto file_b2 = make_lookup_file("b2.sst", 80); + std::string path_b2 = file_b2->LocalFile(); + ASSERT_OK(cache->Put("b", file_b2)); + ASSERT_EQ(cache->Size(), 3); + ASSERT_EQ(cache->GetCurrentWeight(), 280); // 100 + 80 + 100 + + // Old b.sst should be deleted by RemovalCallback (REPLACED cause) + ASSERT_FALSE(fs->Exists(path_b).value()); + ASSERT_EQ(call_back_files, std::vector({"b.sst"})); + // New b2.sst should exist + ASSERT_TRUE(fs->Exists(path_b2).value()); + + // --- Phase 3: Weight-based eviction --- + // Add a large file that pushes total over 300 bytes + auto file_d = make_lookup_file("d.sst", 150); + std::string path_d = file_d->LocalFile(); + ASSERT_OK(cache->Put("d", file_d)); + // Total would be 100 + 80 + 100 + 150 = 430 > 300 + // LRU eviction should remove "a" first (least recently used), then "c" + // After eviction: weight should be 230 + ASSERT_EQ(cache->GetCurrentWeight(), 230); + + // "a" and "c" were LRU (inserted first, never accessed again), should be evicted and file + // deleted + ASSERT_FALSE(cache->GetIfPresent("a").has_value()); + ASSERT_FALSE(cache->GetIfPresent("c").has_value()); + ASSERT_EQ(call_back_files, std::vector({"b.sst", "a.sst", "c.sst"})); + ASSERT_FALSE(fs->Exists(path_a).value()); + ASSERT_FALSE(fs->Exists(path_c).value()); + + // "d" should be in cache + ASSERT_TRUE(cache->GetIfPresent("d").has_value()); + ASSERT_TRUE(fs->Exists(path_d).value()); + + // --- Phase 4: Explicit invalidation --- + // Invalidate "b" (the replaced entry) + cache->Invalidate("b"); + ASSERT_FALSE(cache->GetIfPresent("b").has_value()); + // b2.sst should be deleted + ASSERT_FALSE(fs->Exists(path_b2).value()); + ASSERT_EQ(call_back_files, std::vector({"b.sst", "a.sst", "c.sst", "b2.sst"})); + + // --- Phase 5: InvalidateAll --- + cache->InvalidateAll(); + ASSERT_EQ(cache->Size(), 0); + ASSERT_EQ(cache->GetCurrentWeight(), 0); + // d.sst should be deleted + ASSERT_FALSE(fs->Exists(path_d).value()); + std::vector> file_status_list; + ASSERT_OK(fs->ListDir(tmp_dir->Str(), &file_status_list)); + ASSERT_TRUE(file_status_list.empty()); + ASSERT_EQ(call_back_files, + std::vector({"b.sst", "a.sst", "c.sst", "b2.sst", "d.sst"})); +} +} // namespace paimon::test diff --git a/src/paimon/core/mergetree/lookup_levels.cpp b/src/paimon/core/mergetree/lookup_levels.cpp new file mode 100644 index 0000000..92eca73 --- /dev/null +++ b/src/paimon/core/mergetree/lookup_levels.cpp @@ -0,0 +1,446 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/mergetree/lookup_levels.h" + +#include "fmt/format.h" +#include "paimon/common/table/special_fields.h" +#include "paimon/common/utils/scope_guard.h" +#include "paimon/core/io/key_value_data_file_record_reader.h" +#include "paimon/core/mergetree/lookup/file_position.h" +#include "paimon/core/mergetree/lookup/positioned_key_value.h" +#include "paimon/core/mergetree/lookup/remote_lookup_file_manager.h" +#include "paimon/core/mergetree/lookup_utils.h" +#include "paimon/core/operation/internal_read_context.h" +#include "paimon/result.h" +namespace paimon { +template +Result>> LookupLevels::Create( + const std::shared_ptr& fs, const BinaryRow& partition, int32_t bucket, + const CoreOptions& options, const std::shared_ptr& schema_manager, + const std::shared_ptr& io_manager, + const std::shared_ptr& path_factory, + const std::shared_ptr& table_schema, const std::shared_ptr& levels, + DeletionVector::Factory dv_factory, + const std::shared_ptr::Factory>& processor_factory, + const std::shared_ptr& serializer_factory, + const std::shared_ptr& lookup_store_factory, + const std::shared_ptr& lookup_file_cache, + const std::shared_ptr& remote_lookup_file_manager, + const std::shared_ptr& pool) { + PAIMON_ASSIGN_OR_RAISE(std::vector pk_fields, + table_schema->TrimmedPrimaryKeyFields()); + + auto pk_schema = DataField::ConvertDataFieldsToArrowSchema(pk_fields); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr key_serializer, + RowCompactedSerializer::Create(pk_schema, pool)); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr key_comparator, + FieldsComparator::Create(pk_fields, /*is_ascending_order=*/true)); + + PAIMON_ASSIGN_OR_RAISE(std::vector partition_fields, + table_schema->GetFields(table_schema->PartitionKeys())); + auto partition_schema = DataField::ConvertDataFieldsToArrowSchema(partition_fields); + + // TODO(xinyu.lxy): set executor + ReadContextBuilder read_context_builder(path_factory->RootPath()); + read_context_builder.SetOptions(options.ToMap()) + .WithFileSystem(options.GetFileSystem()) + .EnablePrefetch(true) + .SetPrefetchMaxParallelNum(1) + .SetPrefetchBatchCount(3) + .WithMemoryPool(pool); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr read_context, + read_context_builder.Finish()); + // TODO(xinyu.lxy): temporarily disabled pre-buffer for parquet, which may cause high memory + // usage during compaction. Will fix via parquet format refactor. + auto new_options = options.ToMap(); + if (new_options.find("parquet.read.enable-pre-buffer") == new_options.end()) { + new_options["parquet.read.enable-pre-buffer"] = "false"; + } + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr internal_read_context, + InternalReadContext::Create(read_context, table_schema, new_options)); + auto split_read = std::make_unique(path_factory, internal_read_context, pool, + CreateDefaultExecutor()); + + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr data_file_path_factory, + path_factory->CreateDataFilePathFactory(partition, bucket)); + + return std::unique_ptr(new LookupLevels( + fs, partition, bucket, options, schema_manager, io_manager, std::move(key_comparator), + data_file_path_factory, std::move(split_read), table_schema, partition_schema, pk_schema, + levels, dv_factory, processor_factory, std::move(key_serializer), serializer_factory, + lookup_store_factory, lookup_file_cache, remote_lookup_file_manager, pool)); +} +template +Result> LookupLevels::Lookup(const std::shared_ptr& key, + int32_t start_level) { + auto lookup = [this](const std::shared_ptr& key, + const SortedRun& level) -> Result> { + return this->Lookup(key, level); + }; + auto lookup_level0 = + [this](const std::shared_ptr& key, + const std::set, Levels::Level0Comparator>& level0) + -> Result> { return this->LookupLevel0(key, level0); }; + return LookupUtils::Lookup(*levels_, key, start_level, std::function(lookup), + std::function(lookup_level0)); +} + +template +Result> LookupLevels::LookupLevel0( + const std::shared_ptr& key, + const std::set, Levels::Level0Comparator>& level0) { + auto lookup = [this](const std::shared_ptr& key, + const std::shared_ptr& file) -> Result> { + return this->Lookup(key, file); + }; + return LookupUtils::LookupLevel0(key_comparator_, key, level0, std::function(lookup)); +} + +template +Result> LookupLevels::Lookup(const std::shared_ptr& key, + const SortedRun& level) { + auto lookup = [this](const std::shared_ptr& key, + const std::shared_ptr& file) -> Result> { + return this->Lookup(key, file); + }; + return LookupUtils::Lookup(key_comparator_, key, level, std::function(lookup)); +} + +template +LookupLevels::LookupLevels( + const std::shared_ptr& fs, const BinaryRow& partition, int32_t bucket, + const CoreOptions& options, const std::shared_ptr& schema_manager, + const std::shared_ptr& io_manager, + std::unique_ptr&& key_comparator, + const std::shared_ptr& data_file_path_factory, + std::unique_ptr&& split_read, + const std::shared_ptr& table_schema, + const std::shared_ptr& partition_schema, + const std::shared_ptr& key_schema, const std::shared_ptr& levels, + DeletionVector::Factory dv_factory, + const std::shared_ptr::Factory>& processor_factory, + std::unique_ptr&& key_serializer, + const std::shared_ptr& serializer_factory, + const std::shared_ptr& lookup_store_factory, + const std::shared_ptr& lookup_file_cache, + const std::shared_ptr& remote_lookup_file_manager, + const std::shared_ptr& pool) + : pool_(pool), + fs_(fs), + partition_(partition), + bucket_(bucket), + options_(options), + schema_manager_(schema_manager), + io_manager_(io_manager), + key_comparator_(std::move(key_comparator)), + data_file_path_factory_(data_file_path_factory), + split_read_(std::move(split_read)), + table_schema_(table_schema), + partition_schema_(partition_schema), + key_schema_(key_schema), + levels_(levels), + dv_factory_(dv_factory), + processor_factory_(processor_factory), + key_serializer_(std::move(key_serializer)), + serializer_factory_(serializer_factory), + lookup_store_factory_(lookup_store_factory), + lookup_file_cache_(lookup_file_cache), + remote_lookup_file_manager_(remote_lookup_file_manager) { + if constexpr (std::is_same_v) { + // if T is FilePosition, only read key fields to create sst file is enough + value_schema_ = key_schema_; + } else { + value_schema_ = DataField::ConvertDataFieldsToArrowSchema(table_schema->Fields()); + } + read_schema_ = SpecialFields::CompleteSequenceAndValueKindField(value_schema_); + levels_->AddDropFileCallback(this); +} + +template +LookupLevels::~LookupLevels() { + [[maybe_unused]] auto status = Close(); +} + +template +void LookupLevels::NotifyDropFile(const std::string& file) { + lookup_file_cache_->Invalidate(file); +} + +template +Result> LookupLevels::Lookup(const std::shared_ptr& key, + const std::shared_ptr& file) { + auto cached = lookup_file_cache_->GetIfPresent(file->file_name); + std::shared_ptr lookup_file; + bool new_created = false; + if (cached.has_value()) { + lookup_file = cached.value(); + } else { + PAIMON_ASSIGN_OR_RAISE(lookup_file, CreateLookupFile(file)); + new_created = true; + } + + // Ensure newly created lookup files are always added to cache, even on lookup error + if (new_created) { + PAIMON_RETURN_NOT_OK(AddLocalFile(file, lookup_file)); + } + + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr key_bytes, + key_serializer_->SerializeToBytes(*key)); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr value_bytes, lookup_file->GetResult(key_bytes)); + if (!value_bytes) { + return std::optional(); + } + + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr> processor, + GetOrCreateProcessor(lookup_file->SchemaId(), lookup_file->SerVersion())); + PAIMON_ASSIGN_OR_RAISE( + T result, processor->ReadFromDisk(key, lookup_file->Level(), value_bytes, file->file_name)); + return std::optional(std::move(result)); +} + +template +Result> LookupLevels::CreateLookupFile( + const std::shared_ptr& file) { + PAIMON_ASSIGN_OR_RAISE( + std::string prefix, + LookupFile::LocalFilePrefix(partition_schema_, partition_, bucket_, file->file_name)); + PAIMON_ASSIGN_OR_RAISE(FileIOChannel::ID channel_id, io_manager_->CreateChannel(prefix)); + std::string kv_file_path = channel_id.GetPath(); + + int64_t schema_id = table_schema_->Id(); + std::string file_ser_version = serializer_factory_->Version(); + auto download_ser_version = TryToDownloadRemoteSst(file, kv_file_path); + if (download_ser_version.has_value()) { + // use schema id from remote file + schema_id = file->schema_id; + file_ser_version = download_ser_version.value(); + } else { + PAIMON_RETURN_NOT_OK(CreateSstFileFromDataFile(file, kv_file_path)); + } + + // Get file size for cache weight calculation + PAIMON_ASSIGN_OR_RAISE(auto file_status, fs_->GetFileStatus(kv_file_path)); + int64_t file_size = file_status->GetLen(); + + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr reader, + lookup_store_factory_->CreateReader(fs_, kv_file_path, pool_)); + + // Callback to remove from own_cached_files_ when evicted from global cache + std::string file_name = file->file_name; + auto callback = [this, file_name = file_name]() { own_cached_files_.erase(file_name); }; + + return std::make_shared(fs_, kv_file_path, file_size, file->level, schema_id, + file_ser_version, std::move(reader), std::move(callback)); +} + +template +std::optional LookupLevels::RemoteSst( + const std::shared_ptr& file) const { + // Find the first extra file that ends with REMOTE_LOOKUP_FILE_SUFFIX + std::optional sst_file_name; + for (const auto& extra_file : file->extra_files) { + if (extra_file.has_value()) { + const std::string& name = extra_file.value(); + if (StringUtils::EndsWith(name, REMOTE_LOOKUP_FILE_SUFFIX)) { + sst_file_name = name; + break; + } + } + } + if (!sst_file_name.has_value()) { + return std::nullopt; + } + + // Parse: {fileName}.{length}.{processorId}.{serVersion}.lookup + // Split by '.' + const std::string& name = sst_file_name.value(); + std::vector parts = StringUtils::Split(name, "."); + // Need at least 3 parts: ...processorId.serVersion.lookup + if (parts.size() < 3) { + return std::nullopt; + } + + // parts[size-1] is "lookup", parts[size-2] is serVersion, parts[size-3] is processorId + const std::string& processor_id = parts[parts.size() - 3]; + if (processor_id != processor_factory_->Identifier()) { + return std::nullopt; + } + + const std::string& ser_version = parts[parts.size() - 2]; + return RemoteSstFile{name, ser_version}; +} + +template +std::string LookupLevels::NewRemoteSst(const std::shared_ptr& file, + int64_t length) const { + return fmt::format("{}.{}.{}.{}{}", file->file_name, std::to_string(length), + processor_factory_->Identifier(), serializer_factory_->Version(), + REMOTE_LOOKUP_FILE_SUFFIX); +} + +template +Status LookupLevels::AddLocalFile(const std::shared_ptr& file, + const std::shared_ptr& lookup_file) { + own_cached_files_.insert(file->file_name); + return lookup_file_cache_->Put(file->file_name, lookup_file); +} + +template +std::optional LookupLevels::TryToDownloadRemoteSst( + const std::shared_ptr& file, const std::string& local_file) { + if (remote_lookup_file_manager_ == nullptr) { + return std::nullopt; + } + auto remote_sst_file = RemoteSst(file); + if (!remote_sst_file.has_value()) { + return std::nullopt; + } + + const auto& remote_sst = remote_sst_file.value(); + + // Validate schema matched, no error status here + auto processor_result = GetOrCreateProcessor(file->schema_id, remote_sst.ser_version); + if (!processor_result.ok()) { + return std::nullopt; + } + + bool success = + remote_lookup_file_manager_->TryToDownload(file, remote_sst.sst_file_name, local_file); + if (!success) { + return std::nullopt; + } + + return remote_sst.ser_version; +} +template +Status LookupLevels::CreateSstFileFromDataFile(const std::shared_ptr& file, + const std::string& kv_file_path) { + if constexpr (std::is_same_v) { + // Short-circuit logic: if T is bool, just write empty lookup file. + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr bloom_filter, + LookupStoreFactory::BfGenerator(file->row_count, options_, pool_.get())); + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr kv_writer, + lookup_store_factory_->CreateWriter(fs_, kv_file_path, bloom_filter, pool_)); + return kv_writer->Close(); + } + // Prepare reader to iterate KeyValue + PAIMON_ASSIGN_OR_RAISE( + std::vector> raw_readers, + split_read_->CreateRawFileReaders(partition_, {file}, read_schema_, + /*predicate=*/nullptr, dv_factory_, + /*row_ranges=*/std::nullopt, data_file_path_factory_)); + if (raw_readers.size() != 1) { + return Status::Invalid("Unexpected, CreateSstFileFromDataFile only create single reader"); + } + auto& raw_reader = raw_readers[0]; + auto reader = std::make_unique(std::move(raw_reader), key_schema_, + value_schema_, file->level, pool_); + + // Create processor to persist value + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr> processor, + GetOrCreateProcessor(table_schema_->Id(), serializer_factory_->Version())); + + // Prepare writer to write lookup file + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr bloom_filter, + LookupStoreFactory::BfGenerator(file->row_count, options_, pool_.get())); + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr kv_writer, + lookup_store_factory_->CreateWriter(fs_, kv_file_path, bloom_filter, pool_)); + + ScopeGuard write_guard([&]() -> void { + [[maybe_unused]] auto status = kv_writer->Close(); + reader->Close(); + [[maybe_unused]] auto delete_status = fs_->Delete(kv_file_path, /*recursive=*/false); + }); + + // Read each KeyValue and write to lookup file with or without position. + while (true) { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr iter, + reader->NextBatch()); + if (iter == nullptr) { + break; + } + auto typed_iter = dynamic_cast(iter.get()); + assert(typed_iter); + while (true) { + PAIMON_ASSIGN_OR_RAISE(bool has_next, typed_iter->HasNext()); + if (!has_next) { + break; + } + std::pair kv_and_pos; + PAIMON_ASSIGN_OR_RAISE(kv_and_pos, typed_iter->NextWithFilePos()); + const auto& [file_pos, kv] = kv_and_pos; + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr key_bytes, + key_serializer_->SerializeToBytes(*(kv.key))); + std::shared_ptr value_bytes; + if (processor->WithPosition()) { + PAIMON_ASSIGN_OR_RAISE(value_bytes, processor->PersistToDisk(kv, file_pos)); + } else { + PAIMON_ASSIGN_OR_RAISE(value_bytes, processor->PersistToDisk(kv)); + } + PAIMON_RETURN_NOT_OK(kv_writer->Put(std::move(key_bytes), std::move(value_bytes))); + } + } + PAIMON_RETURN_NOT_OK(kv_writer->Close()); + write_guard.Release(); + return Status::OK(); +} +template +Result>> LookupLevels::GetOrCreateProcessor( + int64_t schema_id, const std::string& ser_version) { + auto key = std::make_pair(schema_id, ser_version); + auto iter = schema_id_and_ser_version_to_processors_.find(key); + if (iter != schema_id_and_ser_version_to_processors_.end()) { + return iter->second; + } + auto file_schema = value_schema_; + if (table_schema_->Id() != schema_id) { + PAIMON_ASSIGN_OR_RAISE(auto file_table_schema, schema_manager_->ReadSchema(schema_id)); + file_schema = DataField::ConvertDataFieldsToArrowSchema(file_table_schema->Fields()); + } + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr> processor, + processor_factory_->Create(ser_version, serializer_factory_, file_schema, pool_)); + schema_id_and_ser_version_to_processors_[key] = processor; + return processor; +} + +template +Status LookupLevels::Close() { + levels_->RemoveDropFileCallback(this); + // Move own_cached_files_ to a local copy before iterating. + // Invalidate triggers LookupFile::Close() -> callback -> own_cached_files_.erase(), + // which would invalidate iterators if we iterated over own_cached_files_ directly. + auto cached_files_copy = std::move(own_cached_files_); + own_cached_files_.clear(); + for (const auto& cached_file : cached_files_copy) { + lookup_file_cache_->Invalidate(cached_file); + } + return Status::OK(); +} + +template class LookupLevels; +template class LookupLevels; +template class LookupLevels; +template class LookupLevels; +} // namespace paimon diff --git a/src/paimon/core/mergetree/lookup_levels.h b/src/paimon/core/mergetree/lookup_levels.h new file mode 100644 index 0000000..9562595 --- /dev/null +++ b/src/paimon/core/mergetree/lookup_levels.h @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include +#include +#include + +#include "paimon/common/data/serializer/row_compacted_serializer.h" +#include "paimon/core/disk/io_manager.h" +#include "paimon/core/io/key_value_data_file_record_reader.h" +#include "paimon/core/mergetree/lookup/lookup_serializer_factory.h" +#include "paimon/core/mergetree/lookup/persist_processor.h" +#include "paimon/core/mergetree/lookup_file.h" +#include "paimon/core/mergetree/lookup_utils.h" +#include "paimon/core/operation/raw_file_split_read.h" +#include "paimon/core/schema/schema_manager.h" +#include "paimon/result.h" + +namespace paimon { +class RemoteLookupFileManager; + +/// Remote sst file with serVersion. +struct RemoteSstFile { + std::string sst_file_name; + std::string ser_version; +}; + +/// Provide lookup by key. +template +class LookupLevels : public Levels::DropFileCallback { + public: + static Result>> Create( + const std::shared_ptr& fs, const BinaryRow& partition, int32_t bucket, + const CoreOptions& options, const std::shared_ptr& schema_manager, + const std::shared_ptr& io_manager, + const std::shared_ptr& path_factory, + const std::shared_ptr& table_schema, const std::shared_ptr& levels, + DeletionVector::Factory dv_factory, + const std::shared_ptr::Factory>& processor_factory, + const std::shared_ptr& serializer_factory, + const std::shared_ptr& lookup_store_factory, + const std::shared_ptr& lookup_file_cache, + const std::shared_ptr& remote_lookup_file_manager, + const std::shared_ptr& pool); + + const std::shared_ptr& GetLevels() const { + return levels_; + } + + Result> Lookup(const std::shared_ptr& key, int32_t start_level); + + Result> LookupLevel0( + const std::shared_ptr& key, + const std::set, Levels::Level0Comparator>& level0); + + Result> Lookup(const std::shared_ptr& key, + const SortedRun& level); + + void NotifyDropFile(const std::string& file) override; + + std::optional RemoteSst(const std::shared_ptr& file) const; + + std::string NewRemoteSst(const std::shared_ptr& file, int64_t length) const; + + Result> CreateLookupFile(const std::shared_ptr& file); + + Status AddLocalFile(const std::shared_ptr& file, + const std::shared_ptr& lookup_file); + + ~LookupLevels() override; + + Status Close(); + + private: + LookupLevels(const std::shared_ptr& fs, const BinaryRow& partition, int32_t bucket, + const CoreOptions& options, const std::shared_ptr& schema_manager, + const std::shared_ptr& io_manager, + std::unique_ptr&& key_comparator, + const std::shared_ptr& data_file_path_factory, + std::unique_ptr&& split_read, + const std::shared_ptr& table_schema, + const std::shared_ptr& partition_schema, + const std::shared_ptr& key_schema, + const std::shared_ptr& levels, DeletionVector::Factory dv_factory, + const std::shared_ptr::Factory>& processor_factory, + std::unique_ptr&& key_serializer, + const std::shared_ptr& serializer_factory, + const std::shared_ptr& lookup_store_factory, + const std::shared_ptr& lookup_file_cache, + const std::shared_ptr& remote_lookup_file_manager, + const std::shared_ptr& pool); + + Result> Lookup(const std::shared_ptr& key, + const std::shared_ptr& file); + + Status CreateSstFileFromDataFile(const std::shared_ptr& file, + const std::string& kv_file_path); + + /// Try to download remote sst file. Returns the ser_version if successful. + std::optional TryToDownloadRemoteSst(const std::shared_ptr& file, + const std::string& local_file); + + Result>> GetOrCreateProcessor( + int64_t schema_id, const std::string& ser_version); + + static constexpr const char* REMOTE_LOOKUP_FILE_SUFFIX = ".lookup"; + + private: + std::shared_ptr pool_; + std::shared_ptr fs_; + BinaryRow partition_; + int32_t bucket_; + CoreOptions options_; + std::shared_ptr schema_manager_; + std::shared_ptr io_manager_; + std::shared_ptr key_comparator_; + std::shared_ptr data_file_path_factory_; + std::unique_ptr split_read_; + + std::shared_ptr table_schema_; + std::shared_ptr partition_schema_; + std::shared_ptr read_schema_; + std::shared_ptr key_schema_; + std::shared_ptr value_schema_; + std::shared_ptr levels_; + DeletionVector::Factory dv_factory_; + + std::shared_ptr::Factory> processor_factory_; + std::unique_ptr key_serializer_; + std::shared_ptr serializer_factory_; + std::shared_ptr lookup_store_factory_; + + std::shared_ptr lookup_file_cache_; + std::set own_cached_files_; + std::map, std::shared_ptr>> + schema_id_and_ser_version_to_processors_; + + std::shared_ptr remote_lookup_file_manager_; +}; +} // namespace paimon diff --git a/src/paimon/core/mergetree/lookup_levels_test.cpp b/src/paimon/core/mergetree/lookup_levels_test.cpp new file mode 100644 index 0000000..a136b62 --- /dev/null +++ b/src/paimon/core/mergetree/lookup_levels_test.cpp @@ -0,0 +1,946 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/mergetree/lookup_levels.h" + +#include +#include + +#include "arrow/api.h" +#include "arrow/c/abi.h" +#include "arrow/c/bridge.h" +#include "arrow/ipc/json_simple.h" +#include "gtest/gtest.h" +#include "paimon/catalog/catalog.h" +#include "paimon/common/utils/fields_comparator.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/core/compact/noop_compact_manager.h" +#include "paimon/core/core_options.h" +#include "paimon/core/disk/io_manager.h" +#include "paimon/core/io/data_file_path_factory.h" +#include "paimon/core/mergetree/compact/deduplicate_merge_function.h" +#include "paimon/core/mergetree/compact/reducer_merge_function_wrapper.h" +#include "paimon/core/mergetree/lookup/default_lookup_serializer_factory.h" +#include "paimon/core/mergetree/lookup/persist_value_and_pos_processor.h" +#include "paimon/core/mergetree/lookup/positioned_key_value.h" +#include "paimon/core/mergetree/merge_tree_writer.h" +#include "paimon/core/schema/schema_manager.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/record_batch.h" +#include "paimon/testing/utils/binary_row_generator.h" +#include "paimon/testing/utils/testharness.h" +namespace paimon::test { +class LookupLevelsTest : public testing::Test { + public: + void SetUp() override { + pool_ = GetDefaultPool(); + arrow::FieldVector fields = { + arrow::field("key", arrow::int32()), + arrow::field("value", arrow::int32()), + }; + arrow_schema_ = arrow::schema(fields); + key_schema_ = arrow::schema({fields[0]}); + tmp_dir_ = UniqueTestDirectory::Create("local"); + dir_ = UniqueTestDirectory::Create("local"); + fs_ = dir_->GetFileSystem(); + noop_compact_manager_ = std::make_shared(); + io_manager_ = std::make_shared(tmp_dir_->Str(), tmp_dir_->GetFileSystem()); + } + + void TearDown() override {} + + Result> NewFiles(int32_t level, int64_t last_sequence_number, + const std::string& table_path, + const CoreOptions& options, + const std::string& src_array_str) const { + std::shared_ptr src_array = + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(arrow_schema_->fields()), + src_array_str) + .ValueOrDie(); + + // prepare writer + PAIMON_ASSIGN_OR_RAISE(auto path_factory, CreateFileStorePathFactory(table_path, options)); + PAIMON_ASSIGN_OR_RAISE(auto data_path_factory, path_factory->CreateDataFilePathFactory( + BinaryRow::EmptyRow(), /*bucket=*/0)); + PAIMON_ASSIGN_OR_RAISE(auto key_comparator, CreateKeyComparator()); + auto mfunc = std::make_unique(/*ignore_delete=*/false); + auto merge_function_wrapper = + std::make_shared(std::move(mfunc)); + + PAIMON_ASSIGN_OR_RAISE( + auto writer, MergeTreeWriter::Create( + /*last_sequence_number=*/last_sequence_number, + std::vector({"key"}), data_path_factory, key_comparator, + /*user_defined_seq_comparator=*/nullptr, merge_function_wrapper, + /*schema_id=*/0, arrow_schema_, options, noop_compact_manager_, + /*io_manager=*/nullptr, /*enable_multi_thread_spill=*/false, pool_)); + + // write data + ArrowArray c_src_array; + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*src_array, &c_src_array)); + RecordBatchBuilder batch_builder(&c_src_array); + batch_builder.SetBucket(0); + PAIMON_ASSIGN_OR_RAISE(auto batch, batch_builder.Finish()); + PAIMON_RETURN_NOT_OK(writer->Write(std::move(batch))); + // get file meta + PAIMON_ASSIGN_OR_RAISE(auto commit_increment, + writer->PrepareCommit(/*wait_compaction=*/false)); + const auto& file_metas = commit_increment.GetNewFilesIncrement().NewFiles(); + EXPECT_EQ(file_metas.size(), 1); + auto file_meta = file_metas[0]; + file_meta->level = level; + return file_meta; + } + + Result> CreateKeyComparator() const { + std::vector key_fields = {DataField(0, key_schema_->field(0))}; + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr key_comparator, + FieldsComparator::Create(key_fields, + /*is_ascending_order=*/true)); + return key_comparator; + } + + Result CreateTable(const std::map& options) const { + ::ArrowSchema c_schema; + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*arrow_schema_, &c_schema)); + + PAIMON_ASSIGN_OR_RAISE(auto catalog, Catalog::Create(dir_->Str(), {})); + PAIMON_RETURN_NOT_OK(catalog->CreateDatabase("foo", {}, /*ignore_if_exists=*/false)); + PAIMON_RETURN_NOT_OK(catalog->CreateTable(Identifier("foo", "bar"), &c_schema, + /*partition_keys=*/{}, + /*primary_keys=*/{"key"}, options, + /*ignore_if_exists=*/false)); + return PathUtil::JoinPath(dir_->Str(), "foo.db/bar"); + } + + Result> CreateFileStorePathFactory( + const std::string& table_path, const CoreOptions& options) const { + PAIMON_ASSIGN_OR_RAISE(std::vector external_paths, + options.CreateExternalPaths()); + PAIMON_ASSIGN_OR_RAISE(std::optional global_index_external_path, + options.CreateGlobalIndexExternalPath()); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr path_factory, + FileStorePathFactory::Create( + table_path, arrow_schema_, /*partition_keys=*/{}, options.GetPartitionDefaultName(), + options.GetFileFormat()->Identifier(), options.DataFilePrefix(), + options.LegacyPartitionNameEnabled(), external_paths, global_index_external_path, + options.IndexFileInDataFileDir(), pool_)); + return path_factory; + } + + Result>> CreateLookupLevels( + const std::string& table_path, const std::shared_ptr& levels, + std::shared_ptr lookup_file_cache = nullptr) const { + auto schema_manager = std::make_shared(fs_, table_path); + PAIMON_ASSIGN_OR_RAISE(auto table_schema, schema_manager->ReadSchema(0)); + PAIMON_ASSIGN_OR_RAISE(CoreOptions options, CoreOptions::FromMap(table_schema->Options())); + + auto processor_factory = + std::make_shared(arrow_schema_); + auto serializer_factory = std::make_shared(); + PAIMON_ASSIGN_OR_RAISE(auto key_comparator, + RowCompactedSerializer::CreateSliceComparator(key_schema_, pool_)); + PAIMON_ASSIGN_OR_RAISE( + auto lookup_store_factory, + LookupStoreFactory::Create(key_comparator, + std::make_shared(1024 * 1024, 0.0), options)); + PAIMON_ASSIGN_OR_RAISE(auto path_factory, CreateFileStorePathFactory(table_path, options)); + if (!lookup_file_cache) { + lookup_file_cache = LookupFile::CreateLookupFileCache( + options.GetLookupCacheFileRetentionMs(), options.GetLookupCacheMaxDiskSize()); + } + return LookupLevels::Create( + fs_, BinaryRow::EmptyRow(), /*bucket=*/0, options, schema_manager, io_manager_, + path_factory, table_schema, levels, + /*dv_factory=*/{}, processor_factory, serializer_factory, lookup_store_factory, + lookup_file_cache, + /*remote_lookup_file_manager=*/nullptr, pool_); + } + + private: + std::shared_ptr pool_; + std::shared_ptr arrow_schema_; + std::shared_ptr key_schema_; + std::unique_ptr tmp_dir_; + std::unique_ptr dir_; + std::shared_ptr fs_; + std::shared_ptr noop_compact_manager_; + std::shared_ptr io_manager_; +}; + +TEST_F(LookupLevelsTest, TestMultiLevels) { + std::map options = {}; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + ASSERT_OK_AND_ASSIGN(auto table_path, CreateTable(options)); + ASSERT_OK_AND_ASSIGN(auto key_comparator, CreateKeyComparator()); + + ASSERT_OK_AND_ASSIGN(auto file0, NewFiles(/*level=*/1, /*last_sequence_number=*/0, table_path, + core_options, "[[1, 11], [3, 33], [5, 5]]")); + ASSERT_OK_AND_ASSIGN(auto file1, NewFiles(/*level=*/2, /*last_sequence_number=*/3, table_path, + core_options, "[[2, 22], [5, 55]]")); + std::vector> files = {file0, file1}; + ASSERT_OK_AND_ASSIGN(std::shared_ptr levels, + Levels::Create(key_comparator, files, /*num_levels=*/3)); + + ASSERT_OK_AND_ASSIGN(auto lookup_levels, CreateLookupLevels(table_path, levels)); + + // only in level 1 + ASSERT_OK_AND_ASSIGN(auto positioned_kv, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({1}, pool_.get()), + /*start_level=*/1)); + ASSERT_TRUE(positioned_kv); + ASSERT_EQ(positioned_kv.value().key_value.sequence_number, 1); + ASSERT_EQ(positioned_kv.value().key_value.level, 1); + ASSERT_EQ(positioned_kv.value().key_value.value->GetInt(1), 11); + + // only in level 2 + ASSERT_OK_AND_ASSIGN(positioned_kv, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({2}, pool_.get()), + /*start_level=*/1)); + ASSERT_TRUE(positioned_kv); + ASSERT_EQ(positioned_kv.value().key_value.sequence_number, 4); + ASSERT_EQ(positioned_kv.value().key_value.level, 2); + ASSERT_EQ(positioned_kv.value().key_value.value->GetInt(1), 22); + + // both in level 1 and level 2 + ASSERT_OK_AND_ASSIGN(positioned_kv, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({5}, pool_.get()), + /*start_level=*/1)); + ASSERT_TRUE(positioned_kv); + ASSERT_EQ(positioned_kv.value().key_value.sequence_number, 3); + ASSERT_EQ(positioned_kv.value().key_value.level, 1); + ASSERT_EQ(positioned_kv.value().key_value.value->GetInt(1), 5); + + // no exists + ASSERT_OK_AND_ASSIGN(positioned_kv, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({4}, pool_.get()), + /*start_level=*/1)); + ASSERT_FALSE(positioned_kv); + + ASSERT_EQ(lookup_levels->lookup_file_cache_->Size(), 2); + ASSERT_EQ(lookup_levels->schema_id_and_ser_version_to_processors_.size(), 1); + ASSERT_EQ(lookup_levels->GetLevels()->NonEmptyHighestLevel(), 2); + + // test lookup file in tmp dir + std::vector> file_status_list; + ASSERT_OK(fs_->ListDir(tmp_dir_->Str(), &file_status_list)); + ASSERT_EQ(file_status_list.size(), 1); + auto channel_dir = file_status_list[0]->GetPath(); + file_status_list.clear(); + ASSERT_OK(fs_->ListDir(channel_dir, &file_status_list)); + ASSERT_EQ(file_status_list.size(), 2); + ASSERT_EQ(levels->drop_file_callbacks_.size(), 1); + + // test close will rm local lookup file + ASSERT_OK(lookup_levels->Close()); + file_status_list.clear(); + ASSERT_OK(fs_->ListDir(channel_dir, &file_status_list)); + ASSERT_TRUE(file_status_list.empty()); + ASSERT_TRUE(levels->drop_file_callbacks_.empty()); + ASSERT_EQ(lookup_levels->lookup_file_cache_->Size(), 0); +} + +TEST_F(LookupLevelsTest, TestMultiFiles) { + std::map options = {}; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + ASSERT_OK_AND_ASSIGN(auto table_path, CreateTable(options)); + ASSERT_OK_AND_ASSIGN(auto key_comparator, CreateKeyComparator()); + + ASSERT_OK_AND_ASSIGN(auto file0, NewFiles(/*level=*/1, /*last_sequence_number=*/0, table_path, + core_options, "[[1, 11], [2, 22]]")); + ASSERT_OK_AND_ASSIGN(auto file1, NewFiles(/*level=*/1, /*last_sequence_number=*/2, table_path, + core_options, "[[4, 44], [5, 55]]")); + ASSERT_OK_AND_ASSIGN(auto file2, NewFiles(/*level=*/1, /*last_sequence_number=*/4, table_path, + core_options, "[[7, 77], [8, 88]]")); + ASSERT_OK_AND_ASSIGN(auto file3, NewFiles(/*level=*/1, /*last_sequence_number=*/6, table_path, + core_options, "[[10, 1010], [11, 1111]]")); + + std::vector> files = {file0, file1, file2, file3}; + ASSERT_OK_AND_ASSIGN(std::shared_ptr levels, + Levels::Create(key_comparator, files, /*num_levels=*/3)); + + ASSERT_OK_AND_ASSIGN(auto lookup_levels, CreateLookupLevels(table_path, levels)); + + std::map contains = {{1, 11}, {2, 22}, {4, 44}, {5, 55}, + {7, 77}, {8, 88}, {10, 1010}, {11, 1111}}; + for (const auto& [key, value] : contains) { + ASSERT_OK_AND_ASSIGN( + auto positioned_kv, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({key}, pool_.get()), + /*start_level=*/1)); + ASSERT_TRUE(positioned_kv); + ASSERT_EQ(positioned_kv.value().key_value.level, 1); + ASSERT_EQ(positioned_kv.value().key_value.value->GetInt(1), value); + } + + std::vector not_contains = {0, 3, 6, 9, 12}; + for (const auto& key : not_contains) { + ASSERT_OK_AND_ASSIGN( + auto positioned_kv, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({key}, pool_.get()), + /*start_level=*/1)); + ASSERT_FALSE(positioned_kv); + } +} + +TEST_F(LookupLevelsTest, TestLookupEmptyLevel) { + std::map options = {}; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + ASSERT_OK_AND_ASSIGN(auto table_path, CreateTable(options)); + ASSERT_OK_AND_ASSIGN(auto key_comparator, CreateKeyComparator()); + + ASSERT_OK_AND_ASSIGN(auto file0, NewFiles(/*level=*/1, /*last_sequence_number=*/0, table_path, + core_options, "[[1, 11], [3, 33], [5, 5]]")); + ASSERT_OK_AND_ASSIGN(auto file1, NewFiles(/*level=*/3, /*last_sequence_number=*/3, table_path, + core_options, "[[2, 22], [5, 55]]")); + std::vector> files = {file0, file1}; + ASSERT_OK_AND_ASSIGN(std::shared_ptr levels, + Levels::Create(key_comparator, files, /*num_levels=*/3)); + + ASSERT_OK_AND_ASSIGN(auto lookup_levels, CreateLookupLevels(table_path, levels)); + + ASSERT_OK_AND_ASSIGN(auto positioned_kv, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({2}, pool_.get()), + /*start_level=*/1)); + ASSERT_TRUE(positioned_kv); + ASSERT_EQ(positioned_kv.value().key_value.sequence_number, 4); + ASSERT_EQ(positioned_kv.value().key_value.level, 3); + ASSERT_EQ(positioned_kv.value().key_value.value->GetInt(1), 22); +} + +TEST_F(LookupLevelsTest, TestLookupLevel0) { + std::map options = {}; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + ASSERT_OK_AND_ASSIGN(auto table_path, CreateTable(options)); + ASSERT_OK_AND_ASSIGN(auto key_comparator, CreateKeyComparator()); + + ASSERT_OK_AND_ASSIGN(auto file0, NewFiles(/*level=*/0, /*last_sequence_number=*/0, table_path, + core_options, "[[1, 0]]")); + ASSERT_OK_AND_ASSIGN(auto file1, NewFiles(/*level=*/1, /*last_sequence_number=*/1, table_path, + core_options, "[[1, 11], [3, 33], [5, 5]]")); + ASSERT_OK_AND_ASSIGN(auto file2, NewFiles(/*level=*/2, /*last_sequence_number=*/4, table_path, + core_options, "[[2, 22], [5, 55]]")); + + std::vector> files = {file0, file1, file2}; + ASSERT_OK_AND_ASSIGN(std::shared_ptr levels, + Levels::Create(key_comparator, files, /*num_levels=*/3)); + + ASSERT_OK_AND_ASSIGN(auto lookup_levels, CreateLookupLevels(table_path, levels)); + + ASSERT_OK_AND_ASSIGN(auto positioned_kv, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({1}, pool_.get()), + /*start_level=*/0)); + ASSERT_TRUE(positioned_kv); + ASSERT_EQ(positioned_kv.value().key_value.sequence_number, 1); + ASSERT_EQ(positioned_kv.value().key_value.level, 0); + ASSERT_EQ(positioned_kv.value().key_value.value->GetInt(1), 0); +} + +TEST_F(LookupLevelsTest, TestLookupLevel0NotInLevel0) { + std::map options = {}; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + ASSERT_OK_AND_ASSIGN(auto table_path, CreateTable(options)); + ASSERT_OK_AND_ASSIGN(auto key_comparator, CreateKeyComparator()); + + ASSERT_OK_AND_ASSIGN(auto file0, NewFiles(/*level=*/1, /*last_sequence_number=*/0, table_path, + core_options, "[[1, 11], [3, 33], [5, 5]]")); + ASSERT_OK_AND_ASSIGN(auto file1, NewFiles(/*level=*/2, /*last_sequence_number=*/3, table_path, + core_options, "[[2, 22], [5, 55]]")); + std::vector> files = {file0, file1}; + ASSERT_OK_AND_ASSIGN(std::shared_ptr levels, + Levels::Create(key_comparator, files, /*num_levels=*/3)); + + ASSERT_OK_AND_ASSIGN(auto lookup_levels, CreateLookupLevels(table_path, levels)); + + ASSERT_OK_AND_ASSIGN(auto positioned_kv, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({1}, pool_.get()), + /*start_level=*/0)); + ASSERT_TRUE(positioned_kv); + ASSERT_EQ(positioned_kv.value().key_value.sequence_number, 1); + ASSERT_EQ(positioned_kv.value().key_value.level, 1); + ASSERT_EQ(positioned_kv.value().key_value.value->GetInt(1), 11); +} + +TEST_F(LookupLevelsTest, TestLookupLevel0WithMultipleFiles) { + std::map options = {}; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + ASSERT_OK_AND_ASSIGN(auto table_path, CreateTable(options)); + ASSERT_OK_AND_ASSIGN(auto key_comparator, CreateKeyComparator()); + + ASSERT_OK_AND_ASSIGN(auto file0, NewFiles(/*level=*/0, /*last_sequence_number=*/0, table_path, + core_options, "[[1, 0], [4, 44]]")); + ASSERT_OK_AND_ASSIGN(auto file1, NewFiles(/*level=*/0, /*last_sequence_number=*/2, table_path, + core_options, "[[1, 11], [3, 33], [5, 5]]")); + ASSERT_OK_AND_ASSIGN(auto file2, NewFiles(/*level=*/2, /*last_sequence_number=*/5, table_path, + core_options, "[[2, 22], [5, 55]]")); + + std::vector> files = {file0, file1, file2}; + ASSERT_OK_AND_ASSIGN(std::shared_ptr levels, + Levels::Create(key_comparator, files, /*num_levels=*/3)); + + ASSERT_OK_AND_ASSIGN(auto lookup_levels, CreateLookupLevels(table_path, levels)); + + ASSERT_OK_AND_ASSIGN(auto positioned_kv, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({1}, pool_.get()), + /*start_level=*/0)); + ASSERT_TRUE(positioned_kv); + ASSERT_EQ(positioned_kv.value().key_value.sequence_number, 3); + ASSERT_EQ(positioned_kv.value().key_value.level, 0); + ASSERT_EQ(positioned_kv.value().key_value.value->GetInt(1), 11); + + ASSERT_OK_AND_ASSIGN(positioned_kv, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({3}, pool_.get()), + /*start_level=*/0)); + ASSERT_TRUE(positioned_kv); + ASSERT_EQ(positioned_kv.value().key_value.sequence_number, 4); + ASSERT_EQ(positioned_kv.value().key_value.level, 0); + ASSERT_EQ(positioned_kv.value().key_value.value->GetInt(1), 33); + + ASSERT_OK_AND_ASSIGN(positioned_kv, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({4}, pool_.get()), + /*start_level=*/0)); + ASSERT_TRUE(positioned_kv); + ASSERT_EQ(positioned_kv.value().key_value.sequence_number, 2); + ASSERT_EQ(positioned_kv.value().key_value.level, 0); + ASSERT_EQ(positioned_kv.value().key_value.value->GetInt(1), 44); + + ASSERT_OK_AND_ASSIGN(positioned_kv, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({5}, pool_.get()), + /*start_level=*/2)); + ASSERT_TRUE(positioned_kv); + ASSERT_EQ(positioned_kv.value().key_value.sequence_number, 7); + ASSERT_EQ(positioned_kv.value().key_value.level, 2); + ASSERT_EQ(positioned_kv.value().key_value.value->GetInt(1), 55); + + ASSERT_OK_AND_ASSIGN(positioned_kv, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({4}, pool_.get()), + /*start_level=*/2)); + ASSERT_FALSE(positioned_kv); +} + +TEST_F(LookupLevelsTest, TestWithPosition) { + std::map options = {}; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + ASSERT_OK_AND_ASSIGN(auto table_path, CreateTable(options)); + ASSERT_OK_AND_ASSIGN(auto key_comparator, CreateKeyComparator()); + + ASSERT_OK_AND_ASSIGN(auto file0, NewFiles(/*level=*/1, /*last_sequence_number=*/0, table_path, + core_options, "[[1, 11], [3, 33], [5, 5]]")); + ASSERT_OK_AND_ASSIGN(auto file1, NewFiles(/*level=*/2, /*last_sequence_number=*/3, table_path, + core_options, "[[2, 22], [5, 55]]")); + std::vector> files = {file0, file1}; + ASSERT_OK_AND_ASSIGN(std::shared_ptr levels, + Levels::Create(key_comparator, files, /*num_levels=*/3)); + + ASSERT_OK_AND_ASSIGN(auto lookup_levels, CreateLookupLevels(table_path, levels)); + + // only in level 1 + ASSERT_OK_AND_ASSIGN(auto positioned_kv, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({1}, pool_.get()), + /*start_level=*/1)); + ASSERT_TRUE(positioned_kv); + ASSERT_EQ(positioned_kv.value().row_position, 0); + + // only in level 2 + ASSERT_OK_AND_ASSIGN(positioned_kv, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({2}, pool_.get()), + /*start_level=*/1)); + ASSERT_TRUE(positioned_kv); + ASSERT_EQ(positioned_kv.value().row_position, 0); + + // both in level 1 and level 2 + ASSERT_OK_AND_ASSIGN(positioned_kv, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({5}, pool_.get()), + /*start_level=*/1)); + ASSERT_TRUE(positioned_kv); + ASSERT_EQ(positioned_kv.value().row_position, 2); + + // no exists + ASSERT_OK_AND_ASSIGN(positioned_kv, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({4}, pool_.get()), + /*start_level=*/1)); + ASSERT_FALSE(positioned_kv); +} + +TEST_F(LookupLevelsTest, TestLevelsWithValueFieldAppearBeforeKey) { + arrow::FieldVector fields = { + arrow::field("value", arrow::int32()), + arrow::field("key", arrow::int32()), + }; + arrow_schema_ = arrow::schema(fields); + key_schema_ = arrow::schema({fields[1]}); + + std::map options = {}; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + ASSERT_OK_AND_ASSIGN(auto table_path, CreateTable(options)); + ASSERT_OK_AND_ASSIGN(auto key_comparator, CreateKeyComparator()); + + ASSERT_OK_AND_ASSIGN(auto file0, NewFiles(/*level=*/1, /*last_sequence_number=*/0, table_path, + core_options, "[[11, 1], [33, 3], [5, 5]]")); + ASSERT_OK_AND_ASSIGN(auto file1, NewFiles(/*level=*/2, /*last_sequence_number=*/3, table_path, + core_options, "[[22, 2], [55, 5]]")); + std::vector> files = {file0, file1}; + ASSERT_OK_AND_ASSIGN(std::shared_ptr levels, + Levels::Create(key_comparator, files, /*num_levels=*/3)); + + ASSERT_OK_AND_ASSIGN(auto lookup_levels, CreateLookupLevels(table_path, levels)); + + // only in level 1 + ASSERT_OK_AND_ASSIGN(auto positioned_kv, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({1}, pool_.get()), + /*start_level=*/1)); + ASSERT_TRUE(positioned_kv); + ASSERT_EQ(positioned_kv.value().key_value.sequence_number, 1); + ASSERT_EQ(positioned_kv.value().key_value.level, 1); + ASSERT_EQ(positioned_kv.value().key_value.value->GetInt(0), 11); + + // only in level 2 + ASSERT_OK_AND_ASSIGN(positioned_kv, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({2}, pool_.get()), + /*start_level=*/1)); + ASSERT_TRUE(positioned_kv); + ASSERT_EQ(positioned_kv.value().key_value.sequence_number, 4); + ASSERT_EQ(positioned_kv.value().key_value.level, 2); + ASSERT_EQ(positioned_kv.value().key_value.value->GetInt(0), 22); + + // both in level 1 and level 2 + ASSERT_OK_AND_ASSIGN(positioned_kv, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({5}, pool_.get()), + /*start_level=*/1)); + ASSERT_TRUE(positioned_kv); + ASSERT_EQ(positioned_kv.value().key_value.sequence_number, 3); + ASSERT_EQ(positioned_kv.value().key_value.level, 1); + ASSERT_EQ(positioned_kv.value().key_value.value->GetInt(0), 5); + + // no exists + ASSERT_OK_AND_ASSIGN(positioned_kv, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({4}, pool_.get()), + /*start_level=*/1)); + ASSERT_FALSE(positioned_kv); +} + +TEST_F(LookupLevelsTest, TestDropFileCallbackOnUpdate) { + std::map options = {}; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + ASSERT_OK_AND_ASSIGN(auto table_path, CreateTable(options)); + ASSERT_OK_AND_ASSIGN(auto key_comparator, CreateKeyComparator()); + + ASSERT_OK_AND_ASSIGN(auto file0, NewFiles(/*level=*/1, /*last_sequence_number=*/0, table_path, + core_options, "[[1, 11], [3, 33]]")); + ASSERT_OK_AND_ASSIGN(auto file1, NewFiles(/*level=*/2, /*last_sequence_number=*/2, table_path, + core_options, "[[2, 22], [5, 55]]")); + std::vector> files = {file0, file1}; + ASSERT_OK_AND_ASSIGN(std::shared_ptr levels, + Levels::Create(key_comparator, files, /*num_levels=*/3)); + + ASSERT_OK_AND_ASSIGN(auto lookup_levels, CreateLookupLevels(table_path, levels)); + + // Trigger lookup to populate the cache for both files. + ASSERT_OK_AND_ASSIGN(auto positioned_kv, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({1}, pool_.get()), + /*start_level=*/1)); + ASSERT_TRUE(positioned_kv); + ASSERT_OK_AND_ASSIGN(positioned_kv, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({2}, pool_.get()), + /*start_level=*/1)); + ASSERT_TRUE(positioned_kv); + + // Both files should be cached now. + ASSERT_EQ(lookup_levels->lookup_file_cache_->Size(), 2); + ASSERT_TRUE(lookup_levels->lookup_file_cache_->GetIfPresent(file0->file_name).has_value()); + ASSERT_TRUE(lookup_levels->lookup_file_cache_->GetIfPresent(file1->file_name).has_value()); + + // Update: remove file0 from level1, add a new file to level1. + ASSERT_OK_AND_ASSIGN(auto new_file, NewFiles(/*level=*/1, /*last_sequence_number=*/4, + table_path, core_options, "[[1, 111], [3, 333]]")); + ASSERT_OK(levels->Update(/*before=*/{file0}, /*after=*/{new_file})); + + // file0 was dropped, so its cache entry should be invalidated. + ASSERT_EQ(lookup_levels->lookup_file_cache_->Size(), 1); + ASSERT_FALSE(lookup_levels->lookup_file_cache_->GetIfPresent(file0->file_name).has_value()); + // file1 was not dropped, so its cache entry should still exist. + ASSERT_TRUE(lookup_levels->lookup_file_cache_->GetIfPresent(file1->file_name).has_value()); +} + +TEST_F(LookupLevelsTest, TestRemoteSst) { + std::map options = {}; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + ASSERT_OK_AND_ASSIGN(auto table_path, CreateTable(options)); + ASSERT_OK_AND_ASSIGN(auto key_comparator, CreateKeyComparator()); + + ASSERT_OK_AND_ASSIGN(auto file0, NewFiles(/*level=*/1, /*last_sequence_number=*/0, table_path, + core_options, "[[1, 11]]")); + std::vector> files = {file0}; + ASSERT_OK_AND_ASSIGN(std::shared_ptr levels, + Levels::Create(key_comparator, files, /*num_levels=*/3)); + + ASSERT_OK_AND_ASSIGN(auto lookup_levels, CreateLookupLevels(table_path, levels)); + + // The processor identifier for PositionedKeyValue is "position-and-value" + // RemoteSst format: {fileName}.{length}.{processorId}.{serVersion}.lookup + + // 1. Empty extra_files: should return nullopt + auto result = lookup_levels->RemoteSst(file0); + ASSERT_FALSE(result.has_value()); + + // 2. extra_files with only nullopt elements: should return nullopt + auto meta_with_nullopt_extras = file0->CopyWithExtraFiles({std::nullopt, std::nullopt}); + result = lookup_levels->RemoteSst(meta_with_nullopt_extras); + ASSERT_FALSE(result.has_value()); + + // 3. extra_files with non-.lookup suffix: should return nullopt + auto meta_with_non_lookup = file0->CopyWithExtraFiles({std::string("some_file.index")}); + result = lookup_levels->RemoteSst(meta_with_non_lookup); + ASSERT_FALSE(result.has_value()); + + // 4. .lookup file with fewer than 3 dot-separated parts (e.g. "x.lookup"): + // split("x.lookup", ".") = ["x", "lookup"], size=2 < 3, should return nullopt + auto meta_with_short_lookup = file0->CopyWithExtraFiles({std::string("x.lookup")}); + result = lookup_levels->RemoteSst(meta_with_short_lookup); + ASSERT_FALSE(result.has_value()); + + // 5. .lookup file with mismatched processorId: should return nullopt + // Format: name.100.wrong_processor.v1.lookup + // parts = ["name", "100", "wrong_processor", "v1", "lookup"] + // processorId = parts[size-3] = "wrong_processor" != "position-and-value" + auto meta_with_wrong_processor = + file0->CopyWithExtraFiles({std::string("name.100.wrong_processor.v1.lookup")}); + result = lookup_levels->RemoteSst(meta_with_wrong_processor); + ASSERT_FALSE(result.has_value()); + + // 6. Valid .lookup file with matching processorId: should return RemoteSstFile + // Format: data.orc.1024.position-and-value.v1.lookup + std::string valid_lookup_name = "data.orc.1024.position-and-value.v1.lookup"; + auto meta_with_valid_lookup = file0->CopyWithExtraFiles({std::string(valid_lookup_name)}); + result = lookup_levels->RemoteSst(meta_with_valid_lookup); + ASSERT_TRUE(result.has_value()); + ASSERT_EQ(result.value().sst_file_name, valid_lookup_name); + ASSERT_EQ(result.value().ser_version, "v1"); + + // 7. Multiple extra_files: first matching .lookup is used, non-matching ones are skipped + auto meta_with_multiple_extras = + file0->CopyWithExtraFiles({std::string("changelog.orc"), std::nullopt, + std::string("first.100.position-and-value.ver1.lookup"), + std::string("second.200.position-and-value.ver2.lookup")}); + result = lookup_levels->RemoteSst(meta_with_multiple_extras); + ASSERT_TRUE(result.has_value()); + ASSERT_EQ(result.value().sst_file_name, "first.100.position-and-value.ver1.lookup"); + ASSERT_EQ(result.value().ser_version, "ver1"); + + // 8. Verify NewRemoteSst generates a name that RemoteSst can correctly parse + std::string generated_name = lookup_levels->NewRemoteSst(file0, /*length=*/2048); + auto meta_with_generated = file0->CopyWithExtraFiles({std::string(generated_name)}); + result = lookup_levels->RemoteSst(meta_with_generated); + ASSERT_TRUE(result.has_value()); + ASSERT_EQ(result.value().sst_file_name, generated_name); + + // 9. Exactly 3 parts with matching processorId: "position-and-value.v2.lookup" + // parts = ["position-and-value", "v2", "lookup"] + // processorId = parts[0] = "position-and-value" (matches) + auto meta_with_minimal_parts = + file0->CopyWithExtraFiles({std::string("position-and-value.v2.lookup")}); + result = lookup_levels->RemoteSst(meta_with_minimal_parts); + ASSERT_TRUE(result.has_value()); + ASSERT_EQ(result.value().sst_file_name, "position-and-value.v2.lookup"); + ASSERT_EQ(result.value().ser_version, "v2"); + + ASSERT_OK(lookup_levels->Close()); +} + +TEST_F(LookupLevelsTest, TestLookupFileCacheIntegration) { + // This single test covers multiple cache scenarios: + // 1. Cache is populated on first lookup (cache miss -> create -> put) + // 2. Subsequent lookups hit the cache (no new files created) + // 3. Two LookupLevels instances share the same global cache + // 4. Close() invalidates only own_cached_files_ from the shared cache + // 5. NotifyDropFile triggers cache invalidation and file deletion + // 6. Local lookup files are deleted when evicted from cache + + std::map options = {}; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + ASSERT_OK_AND_ASSIGN(auto table_path, CreateTable(options)); + ASSERT_OK_AND_ASSIGN(auto key_comparator, CreateKeyComparator()); + + // Create two data files at different levels + ASSERT_OK_AND_ASSIGN(auto file0, NewFiles(/*level=*/1, /*last_sequence_number=*/0, table_path, + core_options, "[[1, 11], [2, 22]]")); + ASSERT_OK_AND_ASSIGN(auto file1, NewFiles(/*level=*/2, /*last_sequence_number=*/2, table_path, + core_options, "[[3, 33], [4, 44]]")); + + // Create a shared global cache + auto shared_cache = LookupFile::CreateLookupFileCache(/*file_retention_ms=*/-1, + /*max_disk_size=*/INT64_MAX); + ASSERT_EQ(shared_cache->Size(), 0); + + // --- Scenario 1: First lookup populates the cache --- + // Instance 1: uses file0 and file1 + std::vector> files1 = {file0, file1}; + ASSERT_OK_AND_ASSIGN(std::shared_ptr levels1, + Levels::Create(key_comparator, files1, /*num_levels=*/3)); + ASSERT_OK_AND_ASSIGN(auto lookup_levels1, + CreateLookupLevels(table_path, levels1, shared_cache)); + + // Lookup key=1 -> triggers cache miss, creates lookup file for file0 + ASSERT_OK_AND_ASSIGN( + auto result, lookup_levels1->Lookup(BinaryRowGenerator::GenerateRowPtr({1}, pool_.get()), + /*start_level=*/1)); + ASSERT_TRUE(result); + ASSERT_EQ(result.value().key_value.value->GetInt(1), 11); + ASSERT_EQ(shared_cache->Size(), 1); + + // Lookup key=3 -> triggers cache miss, creates lookup file for file1 + ASSERT_OK_AND_ASSIGN( + result, lookup_levels1->Lookup(BinaryRowGenerator::GenerateRowPtr({3}, pool_.get()), + /*start_level=*/1)); + ASSERT_TRUE(result); + ASSERT_EQ(result.value().key_value.value->GetInt(1), 33); + ASSERT_EQ(shared_cache->Size(), 2); + + // --- Scenario 2: Subsequent lookup hits the cache (no new file created) --- + ASSERT_OK_AND_ASSIGN( + result, lookup_levels1->Lookup(BinaryRowGenerator::GenerateRowPtr({2}, pool_.get()), + /*start_level=*/1)); + ASSERT_TRUE(result); + ASSERT_EQ(result.value().key_value.value->GetInt(1), 22); + // Cache size should not increase (file0 was already cached) + ASSERT_EQ(shared_cache->Size(), 2); + + // --- Scenario 3: Two LookupLevels share the same cache --- + // Create a second data file set that has no overlap with lookup_levels1. + ASSERT_OK_AND_ASSIGN(auto file2, NewFiles(/*level=*/1, /*last_sequence_number=*/4, table_path, + core_options, "[[5, 55], [6, 66]]")); + std::vector> files2 = {file2}; + ASSERT_OK_AND_ASSIGN(std::shared_ptr levels2, + Levels::Create(key_comparator, files2, /*num_levels=*/3)); + ASSERT_OK_AND_ASSIGN(auto lookup_levels2, + CreateLookupLevels(table_path, levels2, shared_cache)); + + // Lookup key=5 in instance 2 -> cache miss, creates lookup file for file2 + ASSERT_OK_AND_ASSIGN( + result, lookup_levels2->Lookup(BinaryRowGenerator::GenerateRowPtr({5}, pool_.get()), + /*start_level=*/1)); + ASSERT_TRUE(result); + ASSERT_EQ(result.value().key_value.value->GetInt(1), 55); + ASSERT_EQ(shared_cache->Size(), 3); // file0, file1, file2 + + // Collect local file paths for later verification + std::vector> tmp_files; + ASSERT_OK(fs_->ListDir(tmp_dir_->Str(), &tmp_files)); + ASSERT_EQ(tmp_files.size(), 1); + auto channel_dir = tmp_files[0]->GetPath(); + tmp_files.clear(); + ASSERT_OK(fs_->ListDir(channel_dir, &tmp_files)); + ASSERT_EQ(tmp_files.size(), 3); + + // --- Scenario 4: Close instance 1 invalidates only its own files --- + // Instance 1 owns file0 and file1 in the cache + ASSERT_OK(lookup_levels1->Close()); + // file0 and file1 should be evicted (only owned by instance 1) + ASSERT_FALSE(shared_cache->GetIfPresent(file0->file_name).has_value()); + ASSERT_FALSE(shared_cache->GetIfPresent(file1->file_name).has_value()); + // file2 should still be in cache (owned by instance 2, not invalidated) + ASSERT_TRUE(shared_cache->GetIfPresent(file2->file_name).has_value()); + ASSERT_EQ(shared_cache->Size(), 1); + + // --- Scenario 5: Close instance 2 cleans up remaining files --- + ASSERT_OK(lookup_levels2->Close()); + ASSERT_EQ(shared_cache->Size(), 0); + + // All local lookup files should be deleted + tmp_files.clear(); + ASSERT_OK(fs_->ListDir(channel_dir, &tmp_files)); + ASSERT_TRUE(tmp_files.empty()); +} + +TEST_F(LookupLevelsTest, TestCacheEvictionBySmallMaxDiskSize) { + // Verify that when max_disk_size is small enough to hold only 2 lookup files, + // adding a 3rd triggers weight-based (SIZE) eviction of the LRU entry. + // After eviction, subsequent lookups on the evicted file still work by + // re-creating the lookup file on demand. + + std::map options = {}; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + ASSERT_OK_AND_ASSIGN(auto table_path, CreateTable(options)); + ASSERT_OK_AND_ASSIGN(auto key_comparator, CreateKeyComparator()); + + // Create 3 data files at level 1. + ASSERT_OK_AND_ASSIGN(auto file0, NewFiles(/*level=*/1, /*last_sequence_number=*/0, table_path, + core_options, "[[1, 11], [2, 22]]")); + ASSERT_OK_AND_ASSIGN(auto file1, NewFiles(/*level=*/1, /*last_sequence_number=*/2, table_path, + core_options, "[[3, 33], [4, 44]]")); + ASSERT_OK_AND_ASSIGN(auto file2, NewFiles(/*level=*/1, /*last_sequence_number=*/4, table_path, + core_options, "[[5, 55], [6, 66]]")); + + std::vector> files = {file0, file1, file2}; + + // Phase 1: Probe with unlimited cache to measure per-file weights. + ASSERT_OK_AND_ASSIGN(std::shared_ptr probe_levels_obj, + Levels::Create(key_comparator, files, /*num_levels=*/3)); + auto unlimited_cache = LookupFile::CreateLookupFileCache(/*file_retention_ms=*/-1, + /*max_disk_size=*/INT64_MAX); + ASSERT_OK_AND_ASSIGN(auto probe_levels, + CreateLookupLevels(table_path, probe_levels_obj, unlimited_cache)); + + // Trigger lookups to populate cache for all 3 files. + ASSERT_OK_AND_ASSIGN(auto result, + probe_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({1}, pool_.get()), + /*start_level=*/1)); + ASSERT_TRUE(result); + int64_t weight_after_file0 = unlimited_cache->GetCurrentWeight(); + + ASSERT_OK_AND_ASSIGN(result, + probe_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({3}, pool_.get()), + /*start_level=*/1)); + ASSERT_TRUE(result); + int64_t weight_after_file1 = unlimited_cache->GetCurrentWeight(); + + ASSERT_OK_AND_ASSIGN(result, + probe_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({5}, pool_.get()), + /*start_level=*/1)); + ASSERT_TRUE(result); + int64_t weight_after_file2 = unlimited_cache->GetCurrentWeight(); + + ASSERT_EQ(unlimited_cache->Size(), 3); + int64_t file0_weight = weight_after_file0; + int64_t file1_weight = weight_after_file1 - weight_after_file0; + int64_t file2_weight = weight_after_file2 - weight_after_file1; + ASSERT_GT(file0_weight, 0); + ASSERT_GT(file1_weight, 0); + ASSERT_GT(file2_weight, 0); + + // Set max_disk_size to exactly hold file0 + file1 but not file2. + int64_t max_disk_for_two = file0_weight + file1_weight + file2_weight - 1; + ASSERT_OK(probe_levels->Close()); + + // Phase 2: Create a new LookupLevels with the constrained cache. + ASSERT_OK_AND_ASSIGN(std::shared_ptr levels2, + Levels::Create(key_comparator, files, /*num_levels=*/3)); + auto small_cache = LookupFile::CreateLookupFileCache(/*file_retention_ms=*/-1, + /*max_disk_size=*/max_disk_for_two); + ASSERT_OK_AND_ASSIGN(auto lookup_levels, CreateLookupLevels(table_path, levels2, small_cache)); + + // Lookup file0: fits in cache. + ASSERT_OK_AND_ASSIGN(result, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({1}, pool_.get()), + /*start_level=*/1)); + ASSERT_TRUE(result); + ASSERT_EQ(result.value().key_value.value->GetInt(1), 11); + ASSERT_EQ(small_cache->Size(), 1); + + // Lookup file1: still fits (file0 + file1 <= max). + ASSERT_OK_AND_ASSIGN(result, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({3}, pool_.get()), + /*start_level=*/1)); + ASSERT_TRUE(result); + ASSERT_EQ(result.value().key_value.value->GetInt(1), 33); + ASSERT_EQ(small_cache->Size(), 2); + ASSERT_TRUE(small_cache->GetIfPresent(file0->file_name).has_value()); + ASSERT_TRUE(small_cache->GetIfPresent(file1->file_name).has_value()); + + // Lookup file2: total weight exceeds max, should evict file0 (LRU). + ASSERT_OK_AND_ASSIGN(result, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({5}, pool_.get()), + /*start_level=*/1)); + ASSERT_TRUE(result); + ASSERT_EQ(result.value().key_value.value->GetInt(1), 55); + + // file0 should have been evicted (LRU). + ASSERT_FALSE(small_cache->GetIfPresent(file0->file_name).has_value()); + ASSERT_TRUE(small_cache->GetIfPresent(file1->file_name).has_value()); + ASSERT_TRUE(small_cache->GetIfPresent(file2->file_name).has_value()); + + // Lookup key=1 again (file0 was evicted): should re-create the lookup file. + ASSERT_OK_AND_ASSIGN(result, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({1}, pool_.get()), + /*start_level=*/1)); + ASSERT_TRUE(result); + ASSERT_EQ(result.value().key_value.value->GetInt(1), 11); + + // file0 is back in cache; file1 should now be evicted (it was LRU). + ASSERT_TRUE(small_cache->GetIfPresent(file0->file_name).has_value()); + ASSERT_FALSE(small_cache->GetIfPresent(file1->file_name).has_value()); + ASSERT_TRUE(small_cache->GetIfPresent(file2->file_name).has_value()); + + ASSERT_OK(lookup_levels->Close()); +} + +TEST_F(LookupLevelsTest, TestCacheEvictionByExpiration) { + // Verify that when expire_after_access_ms is very short, cached lookup files + // expire and are evicted. Subsequent lookups re-create the files. + + std::map options = {}; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + ASSERT_OK_AND_ASSIGN(auto table_path, CreateTable(options)); + ASSERT_OK_AND_ASSIGN(auto key_comparator, CreateKeyComparator()); + + ASSERT_OK_AND_ASSIGN(auto file0, NewFiles(/*level=*/1, /*last_sequence_number=*/0, table_path, + core_options, "[[1, 11], [2, 22]]")); + ASSERT_OK_AND_ASSIGN(auto file1, NewFiles(/*level=*/2, /*last_sequence_number=*/2, table_path, + core_options, "[[3, 33], [4, 44]]")); + + std::vector> files = {file0, file1}; + ASSERT_OK_AND_ASSIGN(std::shared_ptr levels, + Levels::Create(key_comparator, files, /*num_levels=*/3)); + + // Create a cache with a very short expiration (300ms). + auto expiring_cache = LookupFile::CreateLookupFileCache(/*file_retention_ms=*/300, + /*max_disk_size=*/INT64_MAX); + ASSERT_OK_AND_ASSIGN(auto lookup_levels, + CreateLookupLevels(table_path, levels, expiring_cache)); + + // Lookup to populate cache. + ASSERT_OK_AND_ASSIGN(auto result, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({1}, pool_.get()), + /*start_level=*/1)); + ASSERT_TRUE(result); + ASSERT_EQ(result.value().key_value.value->GetInt(1), 11); + + ASSERT_OK_AND_ASSIGN(result, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({3}, pool_.get()), + /*start_level=*/1)); + ASSERT_TRUE(result); + ASSERT_EQ(result.value().key_value.value->GetInt(1), 33); + ASSERT_EQ(expiring_cache->Size(), 2); + + // Wait for entries to expire. + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + // Entries should be expired now. GetIfPresent triggers expiration check. + ASSERT_FALSE(expiring_cache->GetIfPresent(file0->file_name).has_value()); + ASSERT_FALSE(expiring_cache->GetIfPresent(file1->file_name).has_value()); + ASSERT_EQ(expiring_cache->Size(), 0); + + // Lookups should still work by re-creating the lookup files. + ASSERT_OK_AND_ASSIGN(result, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({1}, pool_.get()), + /*start_level=*/1)); + ASSERT_TRUE(result); + ASSERT_EQ(result.value().key_value.value->GetInt(1), 11); + ASSERT_EQ(expiring_cache->Size(), 1); + + ASSERT_OK_AND_ASSIGN(result, + lookup_levels->Lookup(BinaryRowGenerator::GenerateRowPtr({3}, pool_.get()), + /*start_level=*/1)); + ASSERT_TRUE(result); + ASSERT_EQ(result.value().key_value.value->GetInt(1), 33); + ASSERT_EQ(expiring_cache->Size(), 2); + + ASSERT_OK(lookup_levels->Close()); +} +} // namespace paimon::test diff --git a/src/paimon/core/mergetree/lookup_utils.h b/src/paimon/core/mergetree/lookup_utils.h new file mode 100644 index 0000000..eecc7e5 --- /dev/null +++ b/src/paimon/core/mergetree/lookup_utils.h @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include + +#include "paimon/common/utils/fields_comparator.h" +#include "paimon/core/mergetree/levels.h" +#include "paimon/core/mergetree/sorted_run.h" +#include "paimon/result.h" + +namespace paimon { +/// Utils for lookup. +class LookupUtils { + public: + LookupUtils() = delete; + ~LookupUtils() = delete; + + template + static Result> Lookup( + const Levels& levels, const std::shared_ptr& key, int32_t start_level, + std::function>(const std::shared_ptr&, + const SortedRun&)> + lookup, + std::function>( + const std::shared_ptr&, + const std::set, Levels::Level0Comparator>&)> + level0_lookup) { + std::optional result; + for (int32_t i = start_level; i < levels.NumberOfLevels(); ++i) { + if (i == 0) { + PAIMON_ASSIGN_OR_RAISE(result, level0_lookup(key, levels.GetLevel0())); + } else { + PAIMON_ASSIGN_OR_RAISE(SortedRun level, Levels::RunOfLevel(i, levels.GetLevels())); + PAIMON_ASSIGN_OR_RAISE(result, lookup(key, level)); + } + if (result) { + break; + } + } + return result; + } + + template + static Result> LookupLevel0( + const std::shared_ptr& key_comparator, + const std::shared_ptr& target, + const std::set, Levels::Level0Comparator>& level0, + std::function>(const std::shared_ptr&, + const std::shared_ptr&)> + lookup) { + std::optional result; + for (const auto& file : level0) { + if (key_comparator->CompareTo(file->max_key, *target) >= 0 && + key_comparator->CompareTo(file->min_key, *target) <= 0) { + PAIMON_ASSIGN_OR_RAISE(result, lookup(target, file)); + if (result) { + break; + } + } + } + return result; + } + + template + static Result> Lookup( + const std::shared_ptr& key_comparator, + const std::shared_ptr& target, const SortedRun& level, + std::function>(const std::shared_ptr&, + const std::shared_ptr&)> + lookup) { + if (level.IsEmpty()) { + return std::optional(); + } + const auto& files = level.Files(); + int32_t left = 0; + auto right = static_cast(files.size()) - 1; + + // binary search restart positions to find the restart position immediately before the + // target key + while (left < right) { + int32_t mid = (left + right) / 2; + if (key_comparator->CompareTo(files[mid]->max_key, *target) < 0) { + // Key at "mid.max" < "target". Therefore all + // files at or before "mid" are uninteresting. + left = mid + 1; + } else { + // Key at "mid.max" >= "target". Therefore all files + // after "mid" are uninteresting. + right = mid; + } + } + int32_t index = right; + // if the index is now pointing to the last file, check if the largest key in the block is + // smaller than the target key. If so, we need to seek beyond the end of this file + if (index == static_cast(files.size() - 1) && + key_comparator->CompareTo(files[index]->max_key, *target) < 0) { + index++; + } + + // if files does not have a next, it means the key does not exist in this level + return index < static_cast(files.size()) ? lookup(target, files[index]) + : std::optional(); + } +}; +} // namespace paimon