diff --git a/include/paimon/api.h b/include/paimon/api.h index d0c15187f..3f6304df7 100644 --- a/include/paimon/api.h +++ b/include/paimon/api.h @@ -23,6 +23,7 @@ #include "paimon/factories/factory.h" // IWYU pragma: export #include "paimon/file_store_commit.h" // IWYU pragma: export #include "paimon/file_store_write.h" // IWYU pragma: export +#include "paimon/format/parquet.h" // IWYU pragma: export #include "paimon/fs/file_system_factory.h" // IWYU pragma: export #include "paimon/memory/memory_pool.h" // IWYU pragma: export #include "paimon/predicate/predicate.h" // IWYU pragma: export diff --git a/include/paimon/format/parquet.h b/include/paimon/format/parquet.h new file mode 100644 index 000000000..364873bea --- /dev/null +++ b/include/paimon/format/parquet.h @@ -0,0 +1,33 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "paimon/status.h" +#include "paimon/visibility.h" + +namespace paimon::parquet { + +/// Resize the process-wide parquet metadata cache. `max_bytes <= 0` disables the +/// cache for subsequently created readers, and shrinks the existing cache down to +/// the new limit immediately (entries evicted in LRU order). The cache is +/// initialized eagerly when the parquet file format factory is registered, so this +/// function is safe to call at any time. +PAIMON_EXPORT Status ResizeParquetMetadataCache(int64_t max_bytes); + +} // namespace paimon::parquet diff --git a/src/paimon/common/utils/generic_lru_cache.h b/src/paimon/common/utils/generic_lru_cache.h index 9d799fcb8..b6a1b3b2f 100644 --- a/src/paimon/common/utils/generic_lru_cache.h +++ b/src/paimon/common/utils/generic_lru_cache.h @@ -16,6 +16,7 @@ #pragma once +#include #include #include #include @@ -63,6 +64,8 @@ class GenericLruCache { /// Configuration options for the cache. struct Options { /// Maximum total weight of all entries. Entries are evicted (LRU) when exceeded. + /// Used only as the initial value for the cache's runtime-mutable max weight; + /// after construction, use GetMaxWeight()/SetMaxWeight() to read or update it. int64_t max_weight = INT64_MAX; /// Time in milliseconds after last access before an entry expires. @@ -77,7 +80,8 @@ class GenericLruCache { RemovalCallback removal_callback = nullptr; }; - explicit GenericLruCache(Options options) : options_(std::move(options)) {} + explicit GenericLruCache(Options options) + : options_(std::move(options)), max_weight_(options_.max_weight) {} /// Look up a key in the cache. On hit, promotes the entry to the front (most recently /// used) and updates its access time. Returns std::nullopt on miss or if the entry @@ -101,7 +105,7 @@ class GenericLruCache { // Cache miss: load via supplier outside the lock PAIMON_ASSIGN_OR_RAISE(V value, supplier(key)); int64_t weight = ComputeWeight(key, value); - if (weight > options_.max_weight) { + if (weight > max_weight_.load(std::memory_order_relaxed)) { return value; } @@ -122,10 +126,11 @@ class GenericLruCache { /// @return Status::Invalid if the entry's weight exceeds max_weight, Status::OK otherwise. Status Put(const K& key, V value) { int64_t weight = ComputeWeight(key, value); - if (weight > options_.max_weight) { + int64_t max_weight = max_weight_.load(std::memory_order_relaxed); + if (weight > max_weight) { return Status::Invalid( fmt::format("Entry weight {} exceeds cache max weight {}, entry will not be cached", - weight, options_.max_weight)); + weight, max_weight)); } std::unique_lock lock(mutex_); @@ -176,7 +181,22 @@ class GenericLruCache { /// @return The maximum weight configured for this cache. int64_t GetMaxWeight() const { - return options_.max_weight; + return max_weight_.load(std::memory_order_relaxed); + } + + /// Update the maximum total weight at runtime. The new limit is published with + /// release semantics so subsequent insertions on other threads observe it without + /// needing the cache lock. After updating, this method also acquires the write + /// lock and runs EvictIfNeeded() so that: + /// 1. expired entries are reaped opportunistically; + /// 2. when the new limit is smaller than current_weight_, entries are evicted + /// down to the new limit immediately rather than being held until the next + /// insertion (relevant when the cache is being shrunk or disabled and no + /// further inserts are expected). + void SetMaxWeight(int64_t new_max_weight) { + max_weight_.store(new_max_weight, std::memory_order_relaxed); + std::unique_lock lock(mutex_); + EvictIfNeeded(); } private: @@ -267,7 +287,8 @@ class GenericLruCache { /// Evict expired entries from the tail, then evict by weight if still over capacity. void EvictIfNeeded() { EvictExpired(); - while (current_weight_ > options_.max_weight && !lru_list_.empty()) { + while (current_weight_ > max_weight_.load(std::memory_order_relaxed) && + !lru_list_.empty()) { RemoveEntry(std::prev(lru_list_.end()), RemovalCause::SIZE); } } @@ -317,6 +338,10 @@ class GenericLruCache { } Options options_; + /// Runtime-mutable maximum total weight. Read on the hot path without holding + /// `mutex_` (e.g. in Get/Put before locking, and inside EvictIfNeeded under + /// the write lock). Writes go through SetMaxWeight() which uses relaxed atomics. + std::atomic max_weight_; int64_t current_weight_ = 0; EntryList lru_list_; EntryMap lru_map_; diff --git a/src/paimon/format/parquet/CMakeLists.txt b/src/paimon/format/parquet/CMakeLists.txt index 6e72e0c69..c7c65a734 100644 --- a/src/paimon/format/parquet/CMakeLists.txt +++ b/src/paimon/format/parquet/CMakeLists.txt @@ -19,8 +19,10 @@ set(PAIMON_PARQUET_FILE_FORMAT page_filtered_row_group_reader.cpp parquet_timestamp_converter.cpp parquet_file_batch_reader.cpp + parquet_file_format.cpp parquet_file_format_factory.cpp parquet_format_writer.cpp + parquet_metadata_cache.cpp parquet_schema_util.cpp parquet_stats_extractor.cpp parquet_writer_builder.cpp @@ -57,6 +59,7 @@ if(PAIMON_BUILD_TESTS) parquet_field_id_converter_test.cpp parquet_file_batch_reader_test.cpp parquet_format_writer_test.cpp + parquet_metadata_cache_test.cpp parquet_stats_extractor_test.cpp parquet_writer_builder_test.cpp predicate_converter_test.cpp diff --git a/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp b/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp index a963efaba..c09e803f2 100644 --- a/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp +++ b/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp @@ -120,7 +120,8 @@ class PageFilteredRowGroupReaderTest : public ::testing::Test { options[PARQUET_READ_ENABLE_PAGE_INDEX_FILTER] = "true"; ASSERT_OK_AND_ASSIGN( auto batch_reader, - ParquetFileBatchReader::Create(std::move(in_stream), arrow_pool_, options, batch_size)); + ParquetFileBatchReader::Create(std::move(in_stream), arrow_pool_, options, batch_size, + /*file_metadata=*/nullptr)); auto c_schema = std::make_unique(); ASSERT_TRUE(arrow::ExportSchema(*read_schema, c_schema.get()).ok()); ASSERT_OK(batch_reader->SetReadSchema(c_schema.get(), predicate, diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.cpp b/src/paimon/format/parquet/parquet_file_batch_reader.cpp index 7533cb99a..b65691c29 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader.cpp +++ b/src/paimon/format/parquet/parquet_file_batch_reader.cpp @@ -71,7 +71,8 @@ ParquetFileBatchReader::ParquetFileBatchReader( Result> ParquetFileBatchReader::Create( std::shared_ptr&& input_stream, const std::shared_ptr& pool, - const std::map& options, int32_t batch_size) { + const std::map& options, int32_t batch_size, + std::shared_ptr<::parquet::FileMetaData> file_metadata) { try { assert(input_stream); PAIMON_ASSIGN_OR_RAISE(::parquet::ReaderProperties reader_properties, @@ -81,7 +82,8 @@ Result> ParquetFileBatchReader::Create( CreateArrowReaderProperties(pool, options, batch_size)); ::parquet::arrow::FileReaderBuilder file_reader_builder; - PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_builder.Open(input_stream, reader_properties)); + PAIMON_RETURN_NOT_OK_FROM_ARROW( + file_reader_builder.Open(input_stream, reader_properties, std::move(file_metadata))); std::unique_ptr<::parquet::arrow::FileReader> file_reader; PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_builder.memory_pool(pool.get()) diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.h b/src/paimon/format/parquet/parquet_file_batch_reader.h index 8dc412c30..9884b9982 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader.h +++ b/src/paimon/format/parquet/parquet_file_batch_reader.h @@ -64,7 +64,8 @@ class ParquetFileBatchReader : public PrefetchFileBatchReader { static Result> Create( std::shared_ptr&& input_stream, const std::shared_ptr& pool, - const std::map& options, int32_t batch_size); + const std::map& options, int32_t batch_size, + std::shared_ptr<::parquet::FileMetaData> file_metadata); // For timestamp type, we return the schema stored in file, e.g., second in parquet file will // store as milli. @@ -167,6 +168,8 @@ class ParquetFileBatchReader : public PrefetchFileBatchReader { const std::vector& src_row_groups); private: + friend class ParquetReaderBuilder; + std::map options_; // hold the lifecycle of arrow memory pool. std::shared_ptr arrow_pool_; diff --git a/src/paimon/format/parquet/parquet_file_batch_reader_test.cpp b/src/paimon/format/parquet/parquet_file_batch_reader_test.cpp index f5cf99b9d..a2eb23e2e 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader_test.cpp +++ b/src/paimon/format/parquet/parquet_file_batch_reader_test.cpp @@ -143,7 +143,8 @@ class ParquetFileBatchReaderTest : public ::testing::Test, const std::optional& selection_bitmap, int32_t batch_size) const { EXPECT_OK_AND_ASSIGN( auto parquet_batch_reader, - ParquetFileBatchReader::Create(std::move(in_stream), pool_, options, batch_size)); + ParquetFileBatchReader::Create(std::move(in_stream), pool_, options, batch_size, + /*file_metadata=*/nullptr)); std::unique_ptr c_schema = std::make_unique(); auto arrow_status = arrow::ExportSchema(*read_schema, c_schema.get()); EXPECT_TRUE(arrow_status.ok()); @@ -203,6 +204,38 @@ TEST_F(ParquetFileBatchReaderTest, TestReadBinaryWrittenFromBinaryAndLargeBinary check_binary_read_result(arrow::large_binary(), "large-binary.parquet"); } +TEST_F(ParquetFileBatchReaderTest, TestReadRewrittenFileWithoutMetadataCache) { + auto field = arrow::field("f0", arrow::int32()); + auto schema = arrow::schema({field}); + + auto first_array = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({field}), R"([[1]])") + .ValueOrDie()); + WriteArray(file_path_, first_array, schema, /*write_batch_size=*/first_array->length(), + /*enable_dictionary=*/false, /*max_row_group_length=*/first_array->length()); + + auto first_reader = PrepareParquetFileBatchReader( + file_path_, schema, /*predicate=*/nullptr, /*selection_bitmap=*/std::nullopt, batch_size_); + ASSERT_OK_AND_ASSIGN(auto first_result, + paimon::test::ReadResultCollector::CollectResult(first_reader.get())); + auto first_expected = std::make_shared(first_array); + ASSERT_TRUE(first_result->Equals(first_expected)); + + auto second_array = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({field}), + R"([[1], [2], [3], [4]])") + .ValueOrDie()); + WriteArray(file_path_, second_array, schema, /*write_batch_size=*/second_array->length(), + /*enable_dictionary=*/false, /*max_row_group_length=*/second_array->length()); + + auto second_reader = PrepareParquetFileBatchReader( + file_path_, schema, /*predicate=*/nullptr, /*selection_bitmap=*/std::nullopt, batch_size_); + ASSERT_OK_AND_ASSIGN(auto second_result, + paimon::test::ReadResultCollector::CollectResult(second_reader.get())); + auto second_expected = std::make_shared(second_array); + ASSERT_TRUE(second_result->Equals(second_expected)); +} + TEST_F(ParquetFileBatchReaderTest, TestSimple) { std::string file_name = paimon::test::GetDataDir() + "/parquet/parquet_append_table.db/parquet_append_table/bucket-0/" @@ -229,7 +262,8 @@ TEST_F(ParquetFileBatchReaderTest, TestSetReadSchema) { std::map options; ASSERT_OK_AND_ASSIGN( auto parquet_batch_reader, - ParquetFileBatchReader::Create(std::move(in_stream), pool_, options, batch_size_)); + ParquetFileBatchReader::Create(std::move(in_stream), pool_, options, batch_size_, + /*file_metadata=*/nullptr)); // test GetFileSchema() ASSERT_OK_AND_ASSIGN(auto c_file_schema, parquet_batch_reader->GetFileSchema()); auto arrow_file_schema = arrow::ImportSchema(c_file_schema.get()).ValueOrDie(); diff --git a/src/paimon/format/parquet/parquet_file_format.cpp b/src/paimon/format/parquet/parquet_file_format.cpp new file mode 100644 index 000000000..aee3995f2 --- /dev/null +++ b/src/paimon/format/parquet/parquet_file_format.cpp @@ -0,0 +1,37 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/factories/factory_creator.h" +#include "paimon/format/parquet.h" +#include "paimon/format/parquet/parquet_file_format_factory.h" +#include "paimon/status.h" + +namespace paimon::parquet { + +Status ResizeParquetMetadataCache(int64_t max_bytes) { + auto* factory_creator = ::paimon::FactoryCreator::GetInstance(); + if (factory_creator == nullptr) { + return Status::Invalid("FactoryCreator is not initialized"); + } + auto* factory = dynamic_cast( + factory_creator->Create(ParquetFileFormatFactory::IDENTIFIER)); + if (factory == nullptr) { + return Status::Invalid("ParquetFileFormatFactory is not registered"); + } + return factory->ResizeMetadataCache(max_bytes); +} + +} // namespace paimon::parquet diff --git a/src/paimon/format/parquet/parquet_file_format.h b/src/paimon/format/parquet/parquet_file_format.h index ef3982ad7..ec1e6e77f 100644 --- a/src/paimon/format/parquet/parquet_file_format.h +++ b/src/paimon/format/parquet/parquet_file_format.h @@ -17,18 +17,21 @@ #pragma once #include +#include #include #include #include +#include #include "arrow/c/bridge.h" -#include "arrow/c/helpers.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/format/file_format.h" #include "paimon/format/parquet/parquet_field_id_converter.h" +#include "paimon/format/parquet/parquet_metadata_cache.h" #include "paimon/format/parquet/parquet_reader_builder.h" #include "paimon/format/parquet/parquet_stats_extractor.h" #include "paimon/format/parquet/parquet_writer_builder.h" +#include "paimon/result.h" struct ArrowSchema; @@ -42,15 +45,16 @@ namespace parquet { class ParquetFileFormat : public FileFormat { public: - explicit ParquetFileFormat(const std::map& options) - : identifier_("parquet"), options_(options) {} + ParquetFileFormat(const std::map& options, + std::shared_ptr metadata_cache) + : identifier_("parquet"), options_(options), metadata_cache_(std::move(metadata_cache)) {} const std::string& Identifier() const override { return identifier_; } Result> CreateReaderBuilder(int32_t batch_size) const override { - return std::make_unique(options_, batch_size); + return std::make_unique(options_, batch_size, metadata_cache_); } Result> CreateWriterBuilder(::ArrowSchema* schema, @@ -72,6 +76,9 @@ class ParquetFileFormat : public FileFormat { protected: std::string identifier_; std::map options_; + /// Process-wide parquet metadata cache injected by the factory. May be nullptr + /// when the cache is disabled. + std::shared_ptr metadata_cache_; }; } // namespace parquet diff --git a/src/paimon/format/parquet/parquet_file_format_factory.cpp b/src/paimon/format/parquet/parquet_file_format_factory.cpp index a203ce1f6..6602b4bce 100644 --- a/src/paimon/format/parquet/parquet_file_format_factory.cpp +++ b/src/paimon/format/parquet/parquet_file_format_factory.cpp @@ -20,14 +20,30 @@ #include "paimon/factories/factory.h" #include "paimon/format/parquet/parquet_file_format.h" +#include "paimon/format/parquet/parquet_metadata_cache.h" namespace paimon::parquet { const char ParquetFileFormatFactory::IDENTIFIER[] = "parquet"; +ParquetFileFormatFactory::ParquetFileFormatFactory() + : metadata_cache_(std::make_shared(0)) {} + +ParquetFileFormatFactory::~ParquetFileFormatFactory() = default; + Result> ParquetFileFormatFactory::Create( const std::map& options) const { - return std::make_unique(options); + // Inject the cache only if it is currently enabled (max weight > 0). When + // disabled by ResizeMetadataCache(0), pass nullptr so downstream readers behave + // identically to "no cache configured". + std::shared_ptr cache = + metadata_cache_->GetMaxWeight() > 0 ? metadata_cache_ : nullptr; + return std::make_unique(options, std::move(cache)); +} + +Status ParquetFileFormatFactory::ResizeMetadataCache(int64_t max_bytes) { + metadata_cache_->SetMaxWeight(max_bytes); + return Status::OK(); } REGISTER_PAIMON_FACTORY(ParquetFileFormatFactory); diff --git a/src/paimon/format/parquet/parquet_file_format_factory.h b/src/paimon/format/parquet/parquet_file_format_factory.h index fd9975bfa..d5c05a01a 100644 --- a/src/paimon/format/parquet/parquet_file_format_factory.h +++ b/src/paimon/format/parquet/parquet_file_format_factory.h @@ -16,6 +16,7 @@ #pragma once +#include #include #include #include @@ -26,16 +27,34 @@ namespace paimon::parquet { +class ParquetMetadataCache; + class ParquetFileFormatFactory : public FileFormatFactory { public: static const char IDENTIFIER[]; + ParquetFileFormatFactory(); + ~ParquetFileFormatFactory() override; + const char* Identifier() const override { return IDENTIFIER; } Result> Create( const std::map& options) const override; + + /// Resize the process-wide parquet metadata cache held by this factory. + /// `max_bytes <= 0` disables further caching: subsequent Create() calls inject + /// nullptr instead of the cache instance, and the cache is shrunk down to the + /// new limit immediately (entries evicted in LRU order). Note: the cache + /// instance itself is preserved across resizes so that the cache can be + /// re-enabled later without losing the singleton identity. + Status ResizeMetadataCache(int64_t max_bytes); + + private: + /// Always non-null. Holds all currently cached entries; effective capacity is + /// controlled by the cache's internal max weight. + std::shared_ptr metadata_cache_; }; } // namespace paimon::parquet diff --git a/src/paimon/format/parquet/parquet_metadata_cache.cpp b/src/paimon/format/parquet/parquet_metadata_cache.cpp new file mode 100644 index 000000000..0caf0f215 --- /dev/null +++ b/src/paimon/format/parquet/parquet_metadata_cache.cpp @@ -0,0 +1,59 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/format/parquet/parquet_metadata_cache.h" + +#include "parquet/metadata.h" + +namespace paimon::parquet { + +ParquetMetadataCache::ParquetMetadataCache(int64_t max_weight_bytes) + : cache_(MetadataLruCache::Options{ + .max_weight = max_weight_bytes, + .expire_after_access_ms = -1, + // Weight each entry by the size of its thrift-encoded footer so the cache + // bounds real memory usage rather than entry count. + .weigh_func = [](const std::string& key, + const std::shared_ptr<::parquet::FileMetaData>& value) -> int64_t { + auto weight = static_cast(key.size()); + if (value) { + weight += static_cast(value->size()); + } + return weight; + }}) {} + +Result> ParquetMetadataCache::Get( + const std::string& uri, const MetadataLoader& loader) { + auto supplier = + [&loader](const std::string&) -> Result> { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<::parquet::FileMetaData> metadata, loader()); + if (metadata == nullptr) { + return Status::Invalid("Parquet metadata loader returned nullptr"); + } + return metadata; + }; + return cache_.Get(uri, supplier); +} + +int64_t ParquetMetadataCache::GetMaxWeight() const { + return cache_.GetMaxWeight(); +} + +void ParquetMetadataCache::SetMaxWeight(int64_t max_weight_bytes) { + cache_.SetMaxWeight(max_weight_bytes); +} + +} // namespace paimon::parquet diff --git a/src/paimon/format/parquet/parquet_metadata_cache.h b/src/paimon/format/parquet/parquet_metadata_cache.h new file mode 100644 index 000000000..351673390 --- /dev/null +++ b/src/paimon/format/parquet/parquet_metadata_cache.h @@ -0,0 +1,77 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "paimon/common/utils/generic_lru_cache.h" +#include "paimon/result.h" + +namespace parquet { +class FileMetaData; +} // namespace parquet + +namespace paimon::parquet { + +/// Cache for parsed Parquet FileMetaData (the footer), keyed by the file's URI. +/// +/// Paimon data files are immutable: once written, a given path never changes its +/// content. Therefore a parsed FileMetaData is valid for the lifetime of the file and +/// can be safely reused across reader creations that share this cache instance without +/// any invalidation/staleness check. Reusing it lets the underlying Parquet reader skip +/// re-reading and re-parsing the footer, which is especially impactful when many +/// readers are opened for the same file (e.g. prefetch parallelism). +/// +/// The cached FileMetaData is an immutable, read-only object and is shared via +/// shared_ptr, so concurrent reuse from multiple reader threads is safe. +class ParquetMetadataCache { + public: + using MetadataLoader = std::function>()>; + + explicit ParquetMetadataCache(int64_t max_weight_bytes); + + /// Look up cached metadata for the given file uri. On a cache miss, invokes + /// `loader` (outside the cache lock) to produce the metadata and inserts the result + /// into the cache before returning it. If `loader` fails, the error is propagated + /// and nothing is cached. Modeled after CacheManager::GetPage so callers do not + /// have to orchestrate Get/Put themselves. + Result> Get(const std::string& uri, + const MetadataLoader& loader); + + /// @return The maximum total weight (in bytes) currently configured. + int64_t GetMaxWeight() const; + + /// Update the maximum total weight (in bytes) at runtime. The new limit is + /// published immediately; if it is smaller than the current total weight, + /// entries are evicted in LRU order until the cache fits within the new limit. + /// Expired entries are also reaped opportunistically. + void SetMaxWeight(int64_t max_weight_bytes); + + private: + // This cache only uses GenericLruCache::Get with a supplier. Avoid adding + // Put-style replacement paths here: GenericLruCache compares shared_ptr + // values by dereferencing them, while FileMetaData is cached by pointer + // identity. + using MetadataLruCache = GenericLruCache>; + + MetadataLruCache cache_; +}; + +} // namespace paimon::parquet diff --git a/src/paimon/format/parquet/parquet_metadata_cache_test.cpp b/src/paimon/format/parquet/parquet_metadata_cache_test.cpp new file mode 100644 index 000000000..3eee5c48f --- /dev/null +++ b/src/paimon/format/parquet/parquet_metadata_cache_test.cpp @@ -0,0 +1,121 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/format/parquet/parquet_metadata_cache.h" + +#include +#include + +#include "arrow/api.h" +#include "arrow/array/array_nested.h" +#include "arrow/c/abi.h" +#include "arrow/c/bridge.h" +#include "arrow/ipc/json_simple.h" +#include "gtest/gtest.h" +#include "paimon/common/utils/arrow/arrow_input_stream_adapter.h" +#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/format/parquet/parquet_format_defs.h" +#include "paimon/format/parquet/parquet_format_writer.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/testharness.h" +#include "parquet/file_reader.h" + +namespace paimon::parquet::test { + +class ParquetMetadataCacheTest : public ::testing::Test { + public: + void SetUp() override { + dir_ = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(dir_); + fs_ = std::make_shared(); + pool_ = GetArrowPool(GetDefaultPool()); + file_path_ = PathUtil::JoinPath(dir_->Str(), "test.parquet"); + } + + void WriteSingleIntColumnFile(const std::string& file_path) { + auto field = arrow::field("f0", arrow::int32()); + auto schema = arrow::schema({field}); + auto array = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({field}), R"([[1], [2]])") + .ValueOrDie()); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr out, + fs_->Create(file_path, /*overwrite=*/true)); + ::parquet::WriterProperties::Builder builder; + builder.write_batch_size(array->length()); + builder.max_row_group_length(array->length()); + builder.disable_dictionary(); + auto writer_properties = builder.build(); + ASSERT_OK_AND_ASSIGN(auto format_writer, ParquetFormatWriter::Create( + out, schema, writer_properties, + DEFAULT_PARQUET_WRITER_MAX_MEMORY_USE, pool_)); + auto arrow_array = std::make_unique(); + ASSERT_TRUE(arrow::ExportArray(*array, arrow_array.get()).ok()); + ASSERT_OK(format_writer->AddBatch(arrow_array.get())); + ASSERT_OK(format_writer->Finish()); + ASSERT_OK(out->Close()); + } + + protected: + std::shared_ptr dir_; + std::shared_ptr fs_; + std::shared_ptr pool_; + std::string file_path_; +}; + +TEST_F(ParquetMetadataCacheTest, ReusesEntryForSameKeyAndMissesForDifferentKey) { + WriteSingleIntColumnFile(file_path_); + auto length = fs_->GetFileStatus(file_path_).value()->GetLen(); + + ParquetMetadataCache cache(/*max_weight_bytes=*/128 * 1024 * 1024); + int load_count = 0; + auto loader = [&]() -> Result> { + ++load_count; + PAIMON_ASSIGN_OR_RAISE(auto input_stream, fs_->Open(file_path_)); + auto in_stream = + std::make_shared(std::move(input_stream), pool_, length); + return ::parquet::ReadMetaData(in_stream); + }; + + ASSERT_OK_AND_ASSIGN(auto metadata1, cache.Get(file_path_, loader)); + ASSERT_OK_AND_ASSIGN(auto metadata2, cache.Get(file_path_, loader)); + ASSERT_EQ(metadata1.get(), metadata2.get()); + ASSERT_EQ(1, load_count); + + // A different cache key must trigger a fresh loader call. The string itself + // does not need to refer to a real file - the loader closure decides what to + // open. Using `file_path_ + "#different-key"` keeps that intent explicit. + ASSERT_OK_AND_ASSIGN(auto metadata3, cache.Get(file_path_ + "#different-key", loader)); + ASSERT_NE(metadata1.get(), metadata3.get()); + ASSERT_EQ(2, load_count); +} + +TEST_F(ParquetMetadataCacheTest, NullLoaderResultIsNotCached) { + ParquetMetadataCache cache(/*max_weight_bytes=*/128 * 1024 * 1024); + int load_count = 0; + auto loader = [&]() -> Result> { + ++load_count; + return std::shared_ptr<::parquet::FileMetaData>(); + }; + + ASSERT_NOK_WITH_MSG(cache.Get(file_path_, loader), "Parquet metadata loader returned nullptr"); + ASSERT_NOK_WITH_MSG(cache.Get(file_path_, loader), "Parquet metadata loader returned nullptr"); + ASSERT_EQ(2, load_count); +} + +} // namespace paimon::parquet::test diff --git a/src/paimon/format/parquet/parquet_reader_builder.h b/src/paimon/format/parquet/parquet_reader_builder.h index 8eb5e625d..507039c32 100644 --- a/src/paimon/format/parquet/parquet_reader_builder.h +++ b/src/paimon/format/parquet/parquet_reader_builder.h @@ -16,6 +16,8 @@ #pragma once +#include +#include #include #include #include @@ -24,17 +26,24 @@ #include "paimon/common/utils/arrow/arrow_input_stream_adapter.h" #include "paimon/common/utils/arrow/mem_utils.h" #include "paimon/format/parquet/parquet_file_batch_reader.h" +#include "paimon/format/parquet/parquet_metadata_cache.h" #include "paimon/format/reader_builder.h" #include "paimon/memory/memory_pool.h" #include "paimon/reader/file_batch_reader.h" #include "paimon/result.h" +#include "paimon/status.h" +#include "parquet/file_reader.h" namespace paimon::parquet { class ParquetReaderBuilder : public ReaderBuilder { public: - ParquetReaderBuilder(const std::map& options, int32_t batch_size) - : batch_size_(batch_size), pool_(GetDefaultPool()), options_(options) {} + ParquetReaderBuilder(const std::map& options, int32_t batch_size, + std::shared_ptr metadata_cache) + : batch_size_(batch_size), + pool_(GetDefaultPool()), + options_(options), + metadata_cache_(std::move(metadata_cache)) {} ReaderBuilder* WithMemoryPool(const std::shared_ptr& pool) override { pool_ = pool; @@ -46,9 +55,13 @@ class ParquetReaderBuilder : public ReaderBuilder { PAIMON_ASSIGN_OR_RAISE(int64_t file_length, path->Length()); std::shared_ptr arrow_pool = GetArrowPool(pool_); auto input_stream = - std::make_unique(path, arrow_pool, file_length); + std::make_shared(path, arrow_pool, file_length); + + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<::parquet::FileMetaData> file_metadata, + LoadCachedMetadata(path, input_stream, arrow_pool)); + return ParquetFileBatchReader::Create(std::move(input_stream), arrow_pool, options_, - batch_size_); + batch_size_, std::move(file_metadata)); } Result> Build(const std::string& path) const override { @@ -56,9 +69,42 @@ class ParquetReaderBuilder : public ReaderBuilder { } private: + /// Resolve the cached Parquet FileMetaData for `path` if a metadata cache is + /// configured. Returns nullptr (not an error) when the cache is disabled or + /// the input stream does not expose a stable URI; downstream code treats a + /// null metadata as "load it lazily on first use". + Result> LoadCachedMetadata( + const std::shared_ptr& path, + const std::shared_ptr& input_stream, + const std::shared_ptr& arrow_pool) const { + if (metadata_cache_ == nullptr) { + return std::shared_ptr<::parquet::FileMetaData>(); + } + PAIMON_ASSIGN_OR_RAISE(std::string file_uri, path->GetUri()); + if (file_uri.empty()) { + return std::shared_ptr<::parquet::FileMetaData>(); + } + PAIMON_ASSIGN_OR_RAISE( + ::parquet::ReaderProperties reader_properties, + ParquetFileBatchReader::CreateReaderProperties(arrow_pool, options_)); + auto loader = [input_stream, reader_properties]() + -> Result> { + try { + return ::parquet::ParquetFileReader::Open(input_stream, reader_properties) + ->metadata(); + } catch (const std::exception& e) { + return Status::Invalid("ParquetReaderBuilder::Build: ", e.what()); + } catch (...) { + return Status::UnknownError("ParquetReaderBuilder::Build: unknown error"); + } + }; + return metadata_cache_->Get(file_uri, loader); + } + int32_t batch_size_ = -1; std::shared_ptr pool_; std::map options_; + std::shared_ptr metadata_cache_; }; } // namespace paimon::parquet diff --git a/src/paimon/format/parquet/predicate_pushdown_test.cpp b/src/paimon/format/parquet/predicate_pushdown_test.cpp index 942114e9f..3c2e88cd6 100644 --- a/src/paimon/format/parquet/predicate_pushdown_test.cpp +++ b/src/paimon/format/parquet/predicate_pushdown_test.cpp @@ -112,7 +112,8 @@ class PredicatePushdownTest : public ::testing::Test { std::to_string(predicate_node_count_limit); ASSERT_OK_AND_ASSIGN(auto batch_reader, ParquetFileBatchReader::Create(std::move(in_stream), arrow_pool_, - options, batch_size_)); + options, batch_size_, + /*file_metadata=*/nullptr)); std::unique_ptr c_schema = std::make_unique(); auto arrow_status = arrow::ExportSchema(*read_schema, c_schema.get()); ASSERT_TRUE(arrow_status.ok());