From 1927cc6dce5ef25e120e20f1c2985e0c78a698f6 Mon Sep 17 00:00:00 2001 From: gripleaf <425797155@qq.com> Date: Tue, 16 Jun 2026 21:26:52 +0800 Subject: [PATCH] feat(parquet): support parquet metadata cache --- docs/source/user_guide.rst | 1 + .../user_guide/parquet_metadata_cache.rst | 107 ++++++++++++++++ include/paimon/cache/cache.h | 2 + include/paimon/format/reader_builder.h | 7 + src/paimon/common/io/cache/cache_key.cpp | 4 + .../core/operation/abstract_split_read.cpp | 1 + .../page_filtered_row_group_reader_test.cpp | 3 +- .../parquet/parquet_file_batch_reader.cpp | 6 +- .../parquet/parquet_file_batch_reader.h | 14 +- .../parquet_file_batch_reader_test.cpp | 121 +++++++++++++++++- .../format/parquet/parquet_reader_builder.h | 101 ++++++++++++++- .../parquet/predicate_pushdown_test.cpp | 7 +- .../testing/utils/counting_cache_test_utils.h | 114 +++++++++++++++++ .../testing/utils/manifest_cache_test_utils.h | 74 +---------- test/inte/write_and_read_inte_test.cpp | 90 +++++++++++++ 15 files changed, 567 insertions(+), 85 deletions(-) create mode 100644 docs/source/user_guide/parquet_metadata_cache.rst create mode 100644 src/paimon/testing/utils/counting_cache_test_utils.h diff --git a/docs/source/user_guide.rst b/docs/source/user_guide.rst index 522a35a3c..4c497b5c3 100644 --- a/docs/source/user_guide.rst +++ b/docs/source/user_guide.rst @@ -25,6 +25,7 @@ User Guide user_guide/snapshot user_guide/manifest user_guide/manifest_cache + user_guide/parquet_metadata_cache user_guide/data_types user_guide/primary_key_table user_guide/append_only_table diff --git a/docs/source/user_guide/parquet_metadata_cache.rst b/docs/source/user_guide/parquet_metadata_cache.rst new file mode 100644 index 000000000..58ba0c4c7 --- /dev/null +++ b/docs/source/user_guide/parquet_metadata_cache.rst @@ -0,0 +1,107 @@ +.. Copyright 2026-present Alibaba Inc. + +.. Licensed under the Apache License, Version 2.0 (the "License"); +.. you may not use this file except in compliance with the License. +.. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, software +.. distributed under the License is distributed on an "AS IS" BASIS, +.. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +.. See the License for the specific language governing permissions and +.. limitations under the License. + +Parquet Metadata Cache +====================== + +Overview +-------- + +paimon-cpp can cache serialized Parquet metadata footer bytes for Parquet data files. +The cache is used by ``ParquetReaderBuilder`` before opening the Arrow Parquet +reader. On a cache miss, paimon-cpp loads the Parquet file metadata, serializes +it as a complete metadata footer, and stores those bytes in the public +``Cache`` abstraction. On a cache hit, paimon-cpp parses the cached footer bytes into +``parquet::FileMetaData`` and passes the metadata to the Parquet reader. + +The cache stores serialized metadata footer bytes instead of caching a +``parquet::FileMetaData`` instance. This keeps the cache value compact and +similar to manifest cache values: the cache weight follows the actual cached +bytes, while the Parquet library still owns metadata parsing and validation. + +This optimization is useful when the same Parquet files are opened repeatedly +in the same process, for example repeated ``get`` or ``scan`` requests over the +same snapshot. On a cache hit, the read path avoids reading the Parquet footer +bytes from the filesystem again. paimon-cpp still parses the cached footer bytes +into ``parquet::FileMetaData`` for each reader open. Data pages, page indexes, +and column chunks are still read from the file as usual. + +Configuration +------------- + +Parquet metadata caching is disabled by default. Embedding applications that +need it can provide a custom ``Cache`` implementation and inject it through +``ScanContextBuilder`` or ``ReadContextBuilder``. Parquet reader builders +receive the cache from the read context and create cache keys with +``CacheKind::PARQUET_METADATA`` internally. + +The cache key represents the file footer and is created from the file URI with +position ``-1`` and length ``-1``. Callers do not need to construct this key +directly; they only need to route ``CacheKind::PARQUET_METADATA`` entries to an +appropriate cache backend. + +Example: + +.. code-block:: cpp + + class RoutingCache : public paimon::Cache { + public: + RoutingCache(std::shared_ptr default_cache, + std::shared_ptr parquet_metadata_cache) + : default_cache_(std::move(default_cache)), + parquet_metadata_cache_(std::move(parquet_metadata_cache)) {} + + paimon::Result> Get( + const std::shared_ptr& key, + std::function>( + const std::shared_ptr&)> supplier) override { + return Select(key)->Get(key, std::move(supplier)); + } + + // Put(), Invalidate(), InvalidateAll(), and Size() route in the same way. + + private: + std::shared_ptr Select( + const std::shared_ptr& key) const { + return key && key->GetKind() == paimon::CacheKind::PARQUET_METADATA + ? parquet_metadata_cache_ + : default_cache_; + } + + std::shared_ptr default_cache_; + std::shared_ptr parquet_metadata_cache_; + }; + + auto cache = std::make_shared( + std::make_shared(), + std::make_shared()); + + paimon::ScanContextBuilder scan_builder(table_path); + scan_builder.WithCache(cache); + + paimon::ReadContextBuilder read_builder(table_path); + read_builder.WithCache(cache); + +Passing ``nullptr`` or omitting ``WithCache()`` leaves Parquet metadata caching +disabled. If a file URI cannot be obtained, paimon-cpp also bypasses the cache +and opens the Parquet file normally. + +Future Optimizations +-------------------- + +- Add hit, miss, bypass, and eviction metrics for Parquet metadata cache. +- Add single-flight loading for high-concurrency misses on the same Parquet + file. +- Evaluate sharing cached metadata footer bytes with page-index prefetch logic when + those read paths can use the same cache abstraction. diff --git a/include/paimon/cache/cache.h b/include/paimon/cache/cache.h index a61a60c50..b2e3a14fa 100644 --- a/include/paimon/cache/cache.h +++ b/include/paimon/cache/cache.h @@ -32,6 +32,7 @@ class CacheValue; enum class CacheKind { DEFAULT, MANIFEST, + PARQUET_METADATA, }; class PAIMON_EXPORT CacheKey { @@ -40,6 +41,7 @@ class PAIMON_EXPORT CacheKey { int32_t length, bool is_index); static std::shared_ptr ForKind(const std::string& file_path, int64_t position, int32_t length, CacheKind kind); + static std::shared_ptr ForParquetMeta(const std::string& file_uri); public: virtual ~CacheKey() = default; diff --git a/include/paimon/format/reader_builder.h b/include/paimon/format/reader_builder.h index 8b4d2f021..19aa3d432 100644 --- a/include/paimon/format/reader_builder.h +++ b/include/paimon/format/reader_builder.h @@ -24,6 +24,7 @@ #include "paimon/type_fwd.h" namespace paimon { +class Cache; /// Create a file batch reader based on the file path. Allows you to specify memory pool. class PAIMON_EXPORT ReaderBuilder { @@ -33,6 +34,12 @@ class PAIMON_EXPORT ReaderBuilder { /// Set memory pool to use. virtual ReaderBuilder* WithMemoryPool(const std::shared_ptr& pool) = 0; + /// Inject a cache for reader-specific immutable metadata. + virtual ReaderBuilder* WithCache(const std::shared_ptr& cache) { + (void)cache; + return this; + } + /// Build a file batch reader based on the created `InputStream`. virtual Result> Build( const std::shared_ptr& path) const = 0; diff --git a/src/paimon/common/io/cache/cache_key.cpp b/src/paimon/common/io/cache/cache_key.cpp index 12732bb27..bbc2079be 100644 --- a/src/paimon/common/io/cache/cache_key.cpp +++ b/src/paimon/common/io/cache/cache_key.cpp @@ -31,6 +31,10 @@ std::shared_ptr CacheKey::ForKind(const std::string& file_path, int64_ return key; } +std::shared_ptr CacheKey::ForParquetMeta(const std::string& file_uri) { + return ForKind(file_uri, /*position=*/-1, /*length=*/-1, CacheKind::PARQUET_METADATA); +} + bool PositionCacheKey::IsIndex() const { return is_index_; } diff --git a/src/paimon/core/operation/abstract_split_read.cpp b/src/paimon/core/operation/abstract_split_read.cpp index 5433335c6..4322f8580 100644 --- a/src/paimon/core/operation/abstract_split_read.cpp +++ b/src/paimon/core/operation/abstract_split_read.cpp @@ -117,6 +117,7 @@ Result> AbstractSplitRead::PrepareReaderBuilder( PAIMON_ASSIGN_OR_RAISE(std::unique_ptr reader_builder, file_format->CreateReaderBuilder(options_.GetReadBatchSize())); reader_builder->WithMemoryPool(pool_); + reader_builder->WithCache(options_.GetCache()); return reader_builder; } diff --git a/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp b/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp index a963efaba..c09e803f2 100644 --- a/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp +++ b/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp @@ -120,7 +120,8 @@ class PageFilteredRowGroupReaderTest : public ::testing::Test { options[PARQUET_READ_ENABLE_PAGE_INDEX_FILTER] = "true"; ASSERT_OK_AND_ASSIGN( auto batch_reader, - ParquetFileBatchReader::Create(std::move(in_stream), arrow_pool_, options, batch_size)); + ParquetFileBatchReader::Create(std::move(in_stream), arrow_pool_, options, batch_size, + /*file_metadata=*/nullptr)); auto c_schema = std::make_unique(); ASSERT_TRUE(arrow::ExportSchema(*read_schema, c_schema.get()).ok()); ASSERT_OK(batch_reader->SetReadSchema(c_schema.get(), predicate, diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.cpp b/src/paimon/format/parquet/parquet_file_batch_reader.cpp index 7533cb99a..b65691c29 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader.cpp +++ b/src/paimon/format/parquet/parquet_file_batch_reader.cpp @@ -71,7 +71,8 @@ ParquetFileBatchReader::ParquetFileBatchReader( Result> ParquetFileBatchReader::Create( std::shared_ptr&& input_stream, const std::shared_ptr& pool, - const std::map& options, int32_t batch_size) { + const std::map& options, int32_t batch_size, + std::shared_ptr<::parquet::FileMetaData> file_metadata) { try { assert(input_stream); PAIMON_ASSIGN_OR_RAISE(::parquet::ReaderProperties reader_properties, @@ -81,7 +82,8 @@ Result> ParquetFileBatchReader::Create( CreateArrowReaderProperties(pool, options, batch_size)); ::parquet::arrow::FileReaderBuilder file_reader_builder; - PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_builder.Open(input_stream, reader_properties)); + PAIMON_RETURN_NOT_OK_FROM_ARROW( + file_reader_builder.Open(input_stream, reader_properties, std::move(file_metadata))); std::unique_ptr<::parquet::arrow::FileReader> file_reader; PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_builder.memory_pool(pool.get()) diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.h b/src/paimon/format/parquet/parquet_file_batch_reader.h index 8dc412c30..fd4dae404 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader.h +++ b/src/paimon/format/parquet/parquet_file_batch_reader.h @@ -51,6 +51,9 @@ namespace io { class RandomAccessFile; } // namespace io } // namespace arrow +namespace parquet { +class FileMetaData; +} // namespace parquet namespace paimon { class Metrics; class Predicate; @@ -64,7 +67,12 @@ class ParquetFileBatchReader : public PrefetchFileBatchReader { static Result> Create( std::shared_ptr&& input_stream, const std::shared_ptr& pool, - const std::map& options, int32_t batch_size); + const std::map& options, int32_t batch_size, + std::shared_ptr<::parquet::FileMetaData> file_metadata); + + static Result<::parquet::ReaderProperties> CreateReaderProperties( + const std::shared_ptr& pool, + const std::map& options); // For timestamp type, we return the schema stored in file, e.g., second in parquet file will // store as milli. @@ -128,10 +136,6 @@ class ParquetFileBatchReader : public PrefetchFileBatchReader { const std::map& options, const std::shared_ptr& arrow_pool); - static Result<::parquet::ReaderProperties> CreateReaderProperties( - const std::shared_ptr& pool, - const std::map& options); - static Result<::parquet::ArrowReaderProperties> CreateArrowReaderProperties( const std::shared_ptr& pool, const std::map& options, int32_t batch_size); diff --git a/src/paimon/format/parquet/parquet_file_batch_reader_test.cpp b/src/paimon/format/parquet/parquet_file_batch_reader_test.cpp index f5cf99b9d..dc3230381 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader_test.cpp +++ b/src/paimon/format/parquet/parquet_file_batch_reader_test.cpp @@ -16,6 +16,7 @@ #include "paimon/format/parquet/parquet_file_batch_reader.h" +#include #include #include @@ -38,12 +39,14 @@ #include "paimon/defs.h" #include "paimon/format/parquet/parquet_format_defs.h" #include "paimon/format/parquet/parquet_format_writer.h" +#include "paimon/format/parquet/parquet_reader_builder.h" #include "paimon/fs/file_system.h" #include "paimon/fs/local/local_file_system.h" #include "paimon/memory/memory_pool.h" #include "paimon/predicate/literal.h" #include "paimon/predicate/predicate_builder.h" #include "paimon/reader/batch_reader.h" +#include "paimon/testing/utils/counting_cache_test_utils.h" #include "paimon/testing/utils/read_result_collector.h" #include "paimon/testing/utils/testharness.h" #include "paimon/testing/utils/timezone_guard.h" @@ -56,6 +59,47 @@ class Predicate; namespace paimon::parquet::test { +class FailedUriInputStream : public InputStream { + public: + explicit FailedUriInputStream(const std::shared_ptr& input) : input_(input) {} + + Status Seek(int64_t offset, SeekOrigin origin) override { + return input_->Seek(offset, origin); + } + + Result GetPos() const override { + return input_->GetPos(); + } + + Result Read(char* buffer, int64_t size) override { + return input_->Read(buffer, size); + } + + Result Read(char* buffer, int64_t size, int64_t offset) override { + return input_->Read(buffer, size, offset); + } + + void ReadAsync(char* buffer, int64_t size, int64_t offset, + std::function&& callback) override { + return input_->ReadAsync(buffer, size, offset, std::move(callback)); + } + + Result GetUri() const override { + return Status::Invalid("failed to get uri"); + } + + Result Length() const override { + return input_->Length(); + } + + Status Close() override { + return input_->Close(); + } + + private: + std::shared_ptr input_; +}; + class ParquetFileBatchReaderTest : public ::testing::Test, public ::testing::WithParamInterface { public: @@ -143,7 +187,8 @@ class ParquetFileBatchReaderTest : public ::testing::Test, const std::optional& selection_bitmap, int32_t batch_size) const { EXPECT_OK_AND_ASSIGN( auto parquet_batch_reader, - ParquetFileBatchReader::Create(std::move(in_stream), pool_, options, batch_size)); + ParquetFileBatchReader::Create(std::move(in_stream), pool_, options, batch_size, + /*file_metadata=*/nullptr)); std::unique_ptr c_schema = std::make_unique(); auto arrow_status = arrow::ExportSchema(*read_schema, c_schema.get()); EXPECT_TRUE(arrow_status.ok()); @@ -151,7 +196,7 @@ class ParquetFileBatchReaderTest : public ::testing::Test, return parquet_batch_reader; } - private: + protected: std::string file_path_; std::unique_ptr dir_; std::shared_ptr fs_; @@ -161,6 +206,75 @@ class ParquetFileBatchReaderTest : public ::testing::Test, std::shared_ptr struct_array_; }; +TEST_F(ParquetFileBatchReaderTest, TestParquetMetadataCacheReusesSerializedFooter) { + WriteArray(file_path_, struct_array_, schema_, /*write_batch_size=*/struct_array_->length(), + /*enable_dictionary=*/false, + /*max_row_group_length=*/struct_array_->length()); + + auto cache = std::make_shared(CacheKind::PARQUET_METADATA, + 128 * 1024 * 1024); + auto open_reader = [&]() -> Result> { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr input_stream, fs_->Open(file_path_)); + std::map options; + ParquetReaderBuilder builder(options, batch_size_); + builder.WithMemoryPool(GetDefaultPool())->WithCache(cache); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr reader, + builder.Build(input_stream)); + auto parquet_reader = dynamic_cast(reader.release()); + if (parquet_reader == nullptr) { + return Status::Invalid("failed to cast FileBatchReader to ParquetFileBatchReader"); + } + return std::unique_ptr(parquet_reader); + }; + + ASSERT_OK_AND_ASSIGN(auto reader1, open_reader()); + ASSERT_OK_AND_ASSIGN(auto schema1, reader1->GetFileSchema()); + ASSERT_TRUE(schema1); + ASSERT_TRUE(schema1->release); + schema1->release(schema1.get()); + ASSERT_EQ(1, cache->GetCount()); + ASSERT_EQ(1, cache->SupplierCallCount()); + ASSERT_EQ(1, cache->Size()); + ASSERT_EQ(CacheKind::PARQUET_METADATA, cache->LastKind()); + + ASSERT_OK_AND_ASSIGN(auto reader2, open_reader()); + ASSERT_OK_AND_ASSIGN(auto schema2, reader2->GetFileSchema()); + ASSERT_TRUE(schema2); + ASSERT_TRUE(schema2->release); + schema2->release(schema2.get()); + ASSERT_EQ(2, cache->GetCount()); + ASSERT_EQ(1, cache->SupplierCallCount()); + ASSERT_EQ(1, cache->Size()); + ASSERT_EQ(CacheKind::PARQUET_METADATA, cache->LastKind()); +} + +TEST_F(ParquetFileBatchReaderTest, TestParquetMetadataCacheBypassesWhenGetUriFails) { + WriteArray(file_path_, struct_array_, schema_, /*write_batch_size=*/struct_array_->length(), + /*enable_dictionary=*/false, + /*max_row_group_length=*/struct_array_->length()); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr input_stream, fs_->Open(file_path_)); + auto failed_uri_input_stream = std::make_shared(input_stream); + auto cache = std::make_shared(CacheKind::PARQUET_METADATA, + 128 * 1024 * 1024); + + std::map options; + ParquetReaderBuilder builder(options, batch_size_); + builder.WithMemoryPool(GetDefaultPool())->WithCache(cache); + ASSERT_OK_AND_ASSIGN(std::unique_ptr reader, + builder.Build(failed_uri_input_stream)); + auto parquet_reader = dynamic_cast(reader.get()); + ASSERT_TRUE(parquet_reader); + ASSERT_OK_AND_ASSIGN(auto file_schema, parquet_reader->GetFileSchema()); + ASSERT_TRUE(file_schema); + ASSERT_TRUE(file_schema->release); + file_schema->release(file_schema.get()); + + ASSERT_EQ(0, cache->GetCount()); + ASSERT_EQ(0, cache->SupplierCallCount()); + ASSERT_EQ(0, cache->Size()); +} + TEST_F(ParquetFileBatchReaderTest, TestReadBinaryWrittenFromBinaryAndLargeBinary) { auto check_binary_read_result = [&](const std::shared_ptr& write_type, const std::string& file_name) { @@ -229,7 +343,8 @@ TEST_F(ParquetFileBatchReaderTest, TestSetReadSchema) { std::map options; ASSERT_OK_AND_ASSIGN( auto parquet_batch_reader, - ParquetFileBatchReader::Create(std::move(in_stream), pool_, options, batch_size_)); + ParquetFileBatchReader::Create(std::move(in_stream), pool_, options, batch_size_, + /*file_metadata=*/nullptr)); // test GetFileSchema() ASSERT_OK_AND_ASSIGN(auto c_file_schema, parquet_batch_reader->GetFileSchema()); auto arrow_file_schema = arrow::ImportSchema(c_file_schema.get()).ValueOrDie(); diff --git a/src/paimon/format/parquet/parquet_reader_builder.h b/src/paimon/format/parquet/parquet_reader_builder.h index 8eb5e625d..93a7d5914 100644 --- a/src/paimon/format/parquet/parquet_reader_builder.h +++ b/src/paimon/format/parquet/parquet_reader_builder.h @@ -16,18 +16,28 @@ #pragma once +#include +#include +#include #include #include #include #include +#include "arrow/buffer.h" +#include "arrow/io/memory.h" +#include "fmt/format.h" +#include "paimon/cache/cache.h" #include "paimon/common/utils/arrow/arrow_input_stream_adapter.h" #include "paimon/common/utils/arrow/mem_utils.h" #include "paimon/format/parquet/parquet_file_batch_reader.h" #include "paimon/format/reader_builder.h" #include "paimon/memory/memory_pool.h" +#include "paimon/memory/memory_segment.h" #include "paimon/reader/file_batch_reader.h" #include "paimon/result.h" +#include "parquet/file_reader.h" +#include "parquet/file_writer.h" namespace paimon::parquet { @@ -41,14 +51,29 @@ class ParquetReaderBuilder : public ReaderBuilder { return this; } + ReaderBuilder* WithCache(const std::shared_ptr& cache) override { + cache_ = cache; + return this; + } + Result> Build( const std::shared_ptr& path) const override { PAIMON_ASSIGN_OR_RAISE(int64_t file_length, path->Length()); + std::string file_uri; + if (cache_) { + Result file_uri_result = path->GetUri(); + if (file_uri_result.ok()) { + file_uri = std::move(file_uri_result).value(); + } + } std::shared_ptr arrow_pool = GetArrowPool(pool_); - auto input_stream = + auto unique_input_stream = std::make_unique(path, arrow_pool, file_length); + std::shared_ptr input_stream(std::move(unique_input_stream)); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<::parquet::FileMetaData> file_metadata, + GetCachedParquetMetadata(input_stream, file_uri, arrow_pool)); return ParquetFileBatchReader::Create(std::move(input_stream), arrow_pool, options_, - batch_size_); + batch_size_, std::move(file_metadata)); } Result> Build(const std::string& path) const override { @@ -56,9 +81,81 @@ class ParquetReaderBuilder : public ReaderBuilder { } private: + Result SerializeParquetMetadataFooter( + const std::shared_ptr& input_stream, + const ::parquet::ReaderProperties& reader_properties, + const std::shared_ptr& arrow_pool) const { + constexpr int64_t kParquetFooterSize = 8; + + std::shared_ptr<::parquet::FileMetaData> metadata = + ::parquet::ParquetFileReader::Open(input_stream, reader_properties)->metadata(); + if (metadata == nullptr) { + return Status::Invalid("Failed to read parquet metadata"); + } + + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( + std::shared_ptr output_stream, + arrow::io::BufferOutputStream::Create(metadata->size() + kParquetFooterSize, + arrow_pool.get())); + ::parquet::WriteFileMetaData(*metadata, output_stream.get()); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr metadata_footer, + output_stream->Finish()); + + MemorySegment segment = + MemorySegment::AllocateHeapMemory(metadata_footer->size(), pool_.get()); + std::memcpy(segment.MutableData(), metadata_footer->data(), metadata_footer->size()); + return segment; + } + + static Result> ParseParquetMetadataFooter( + const MemorySegment& segment, const ::parquet::ReaderProperties& reader_properties) { + if (segment.Data() == nullptr || segment.Size() <= 0) { + return Status::Invalid("Parquet metadata cache value is empty"); + } + + auto buffer = std::make_shared( + reinterpret_cast(segment.Data()), segment.Size()); + auto buffer_reader = std::make_shared(buffer); + std::shared_ptr<::parquet::FileMetaData> metadata = + ::parquet::ParquetFileReader::Open(buffer_reader, reader_properties)->metadata(); + if (metadata == nullptr) { + return Status::Invalid("Failed to parse parquet metadata footer"); + } + return metadata; + } + + Result> GetCachedParquetMetadata( + const std::shared_ptr& input_stream, + const std::string& file_uri, const std::shared_ptr& arrow_pool) const { + if (!cache_ || file_uri.empty()) { + return std::shared_ptr<::parquet::FileMetaData>(); + } + PAIMON_ASSIGN_OR_RAISE( + ::parquet::ReaderProperties reader_properties, + ParquetFileBatchReader::CreateReaderProperties(arrow_pool, options_)); + + auto cache_key = CacheKey::ForParquetMeta(file_uri); + auto supplier = + [this, &input_stream, reader_properties, + arrow_pool](const std::shared_ptr&) -> Result> { + PAIMON_ASSIGN_OR_RAISE( + MemorySegment segment, + SerializeParquetMetadataFooter(input_stream, reader_properties, arrow_pool)); + return std::make_shared(segment, CacheCallback()); + }; + + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr cache_value, + cache_->Get(cache_key, supplier)); + if (cache_value == nullptr) { + return Status::Invalid("Parquet metadata cache returned nullptr value"); + } + return ParseParquetMetadataFooter(cache_value->GetSegment(), reader_properties); + } + int32_t batch_size_ = -1; std::shared_ptr pool_; std::map options_; + std::shared_ptr cache_; }; } // namespace paimon::parquet diff --git a/src/paimon/format/parquet/predicate_pushdown_test.cpp b/src/paimon/format/parquet/predicate_pushdown_test.cpp index 942114e9f..7af14083b 100644 --- a/src/paimon/format/parquet/predicate_pushdown_test.cpp +++ b/src/paimon/format/parquet/predicate_pushdown_test.cpp @@ -110,9 +110,10 @@ class PredicatePushdownTest : public ::testing::Test { std::map options; options[paimon::parquet::PARQUET_READ_PREDICATE_NODE_COUNT_LIMIT] = std::to_string(predicate_node_count_limit); - ASSERT_OK_AND_ASSIGN(auto batch_reader, - ParquetFileBatchReader::Create(std::move(in_stream), arrow_pool_, - options, batch_size_)); + ASSERT_OK_AND_ASSIGN( + auto batch_reader, + ParquetFileBatchReader::Create(std::move(in_stream), arrow_pool_, options, batch_size_, + /*file_metadata=*/nullptr)); std::unique_ptr c_schema = std::make_unique(); auto arrow_status = arrow::ExportSchema(*read_schema, c_schema.get()); ASSERT_TRUE(arrow_status.ok()); diff --git a/src/paimon/testing/utils/counting_cache_test_utils.h b/src/paimon/testing/utils/counting_cache_test_utils.h new file mode 100644 index 000000000..5d4b88fc3 --- /dev/null +++ b/src/paimon/testing/utils/counting_cache_test_utils.h @@ -0,0 +1,114 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "paimon/cache/cache.h" +#include "paimon/common/io/cache/lru_cache.h" +#include "paimon/result.h" + +namespace paimon::test { + +class CountingRoutingCache : public Cache { + public: + CountingRoutingCache(CacheKind kind, int64_t max_weight) { + caches_[kind] = std::make_shared(max_weight); + } + + explicit CountingRoutingCache(const std::map& max_weights) { + for (const auto& [kind, max_weight] : max_weights) { + caches_[kind] = std::make_shared(max_weight); + } + } + + Result> Get( + const std::shared_ptr& key, + std::function>(const std::shared_ptr&)> + supplier) override { + ++get_count_; + last_kind_ = key->GetKind(); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr cache, GetCache(key)); + return cache->Get( + key, + [this, supplier = std::move(supplier)](const std::shared_ptr& supplier_key) + -> Result> { + ++supplier_call_count_; + return supplier(supplier_key); + }); + } + + Status Put(const std::shared_ptr& key, + const std::shared_ptr& value) override { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr cache, GetCache(key)); + return cache->Put(key, value); + } + + void Invalidate(const std::shared_ptr& key) override { + Result> cache = GetCache(key); + if (cache.ok()) { + cache.value()->Invalidate(key); + } + } + + void InvalidateAll() override { + for (const auto& [kind, cache] : caches_) { + cache->InvalidateAll(); + } + } + + size_t Size() const override { + size_t size = 0; + for (const auto& [kind, cache] : caches_) { + size += cache->Size(); + } + return size; + } + + int64_t GetCount() const { + return get_count_; + } + + int64_t SupplierCallCount() const { + return supplier_call_count_; + } + + CacheKind LastKind() const { + return last_kind_; + } + + private: + Result> GetCache(const std::shared_ptr& key) const { + auto iter = caches_.find(key->GetKind()); + if (iter == caches_.end()) { + return Status::Invalid("unexpected cache kind"); + } + return iter->second; + } + + std::map> caches_; + int64_t get_count_ = 0; + int64_t supplier_call_count_ = 0; + CacheKind last_kind_ = CacheKind::DEFAULT; +}; + +} // namespace paimon::test diff --git a/src/paimon/testing/utils/manifest_cache_test_utils.h b/src/paimon/testing/utils/manifest_cache_test_utils.h index 8b771287d..7588b4610 100644 --- a/src/paimon/testing/utils/manifest_cache_test_utils.h +++ b/src/paimon/testing/utils/manifest_cache_test_utils.h @@ -16,82 +16,18 @@ #pragma once -#include #include -#include -#include -#include -#include -#include "gtest/gtest.h" -#include "paimon/cache/cache.h" -#include "paimon/common/io/cache/lru_cache.h" -#include "paimon/result.h" +#include "paimon/testing/utils/counting_cache_test_utils.h" namespace paimon::test { -class CountingManifestRoutingCache : public Cache { +class CountingManifestRoutingCache : public CountingRoutingCache { public: - explicit CountingManifestRoutingCache(int64_t max_weight = 64 * 1024 * 1024) { - caches_[CacheKind::MANIFEST] = std::make_shared(max_weight); - } + CountingManifestRoutingCache() : CountingRoutingCache(CacheKind::MANIFEST, 64 * 1024 * 1024) {} - Result> Get( - const std::shared_ptr& key, - std::function>(const std::shared_ptr&)> - supplier) override { - ++get_count_; - return GetCache(key)->Get( - key, - [this, supplier = std::move(supplier)](const std::shared_ptr& supplier_key) - -> Result> { - ++supplier_call_count_; - return supplier(supplier_key); - }); - } - - Status Put(const std::shared_ptr& key, - const std::shared_ptr& value) override { - return GetCache(key)->Put(key, value); - } - - void Invalidate(const std::shared_ptr& key) override { - GetCache(key)->Invalidate(key); - } - - void InvalidateAll() override { - for (const auto& [kind, cache] : caches_) { - cache->InvalidateAll(); - } - } - - size_t Size() const override { - size_t size = 0; - for (const auto& [kind, cache] : caches_) { - size += cache->Size(); - } - return size; - } - - int64_t GetCount() const { - return get_count_; - } - - int64_t SupplierCallCount() const { - return supplier_call_count_; - } - - private: - std::shared_ptr GetCache(const std::shared_ptr& key) const { - EXPECT_EQ(CacheKind::MANIFEST, key->GetKind()); - auto iter = caches_.find(key->GetKind()); - EXPECT_NE(caches_.end(), iter); - return iter == caches_.end() ? nullptr : iter->second; - } - - std::map> caches_; - int64_t get_count_ = 0; - int64_t supplier_call_count_ = 0; + explicit CountingManifestRoutingCache(int64_t max_weight) + : CountingRoutingCache(CacheKind::MANIFEST, max_weight) {} }; } // namespace paimon::test diff --git a/test/inte/write_and_read_inte_test.cpp b/test/inte/write_and_read_inte_test.cpp index 529c9e7f8..b361d6227 100644 --- a/test/inte/write_and_read_inte_test.cpp +++ b/test/inte/write_and_read_inte_test.cpp @@ -42,6 +42,7 @@ #include "paimon/table/source/startup_mode.h" #include "paimon/table/source/table_read.h" #include "paimon/table/source/table_scan.h" +#include "paimon/testing/utils/counting_cache_test_utils.h" #include "paimon/testing/utils/read_result_collector.h" #include "paimon/testing/utils/test_helper.h" #include "paimon/testing/utils/testharness.h" @@ -1098,6 +1099,95 @@ TEST_P(WriteAndReadInteTest, TestAppendWithParquetPageIndexFilter) { ASSERT_TRUE(expected->Equals(read_result)) << read_result->ToString(); } +TEST_P(WriteAndReadInteTest, TestAppendWithParquetMetadataCache) { + auto [file_format, file_system] = GetParam(); + if (file_format != "parquet" || file_system != "local") { + return; + } + + auto test_dir = UniqueTestDirectory::Create("local"); + arrow::FieldVector fields = {arrow::field("f0", arrow::utf8()), + arrow::field("f1", arrow::int32())}; + auto schema = arrow::schema(fields); + std::map options = { + {Options::MANIFEST_FORMAT, "orc"}, {Options::FILE_FORMAT, "parquet"}, + {Options::TARGET_FILE_SIZE, "1048576"}, {Options::BUCKET, "-1"}, + {Options::FILE_SYSTEM, "local"}, + }; + ASSERT_OK_AND_ASSIGN( + auto helper, TestHelper::Create(test_dir->Str(), schema, /*partition_keys=*/{}, + /*primary_keys=*/{}, options, /*is_streaming_mode=*/true)); + std::string table_path = test_dir->Str() + "/foo.db/bar"; + + std::string data = R"([ + ["banana", 2], + ["dog", 1], + ["lucy", 14], + ["mouse", 100] + ])"; + ASSERT_OK_AND_ASSIGN(std::unique_ptr batch, + TestHelper::MakeRecordBatch(arrow::struct_(fields), data, + /*partition_map=*/{}, /*bucket=*/0, {})); + ASSERT_OK_AND_ASSIGN(auto commit_msgs, + helper->WriteAndCommit(std::move(batch), /*commit_identifier=*/0, + /*expected_commit_messages=*/std::nullopt)); + + arrow::FieldVector fields_with_row_kind = fields; + fields_with_row_kind.insert(fields_with_row_kind.begin(), + arrow::field("_VALUE_KIND", arrow::int8())); + auto expected_data_type = arrow::struct_(fields_with_row_kind); + auto expected = std::make_shared( + arrow::ipc::internal::json::ArrayFromJSON(expected_data_type, R"([ + [0, "banana", 2], + [0, "dog", 1], + [0, "lucy", 14], + [0, "mouse", 100] + ])") + .ValueOrDie()); + + auto cache = + std::make_shared(CacheKind::PARQUET_METADATA, 128 * 1024 * 1024); + auto read_once = [&]() -> Result { + ScanContextBuilder scan_context_builder(table_path); + scan_context_builder.AddOption(Options::SCAN_MODE, StartupMode::LatestFull().ToString()); + PAIMON_ASSIGN_OR_RAISE(auto scan_context, scan_context_builder.Finish()); + PAIMON_ASSIGN_OR_RAISE(auto table_scan, TableScan::Create(std::move(scan_context))); + PAIMON_ASSIGN_OR_RAISE(auto result_plan, table_scan->CreatePlan()); + if (result_plan->SnapshotId() != std::optional(1)) { + return Status::Invalid("unexpected snapshot id"); + } + if (result_plan->Splits().empty()) { + return Status::Invalid("no splits found"); + } + + ReadContextBuilder read_context_builder(table_path); + read_context_builder.WithCache(cache); + PAIMON_ASSIGN_OR_RAISE(auto read_context, read_context_builder.Finish()); + PAIMON_ASSIGN_OR_RAISE(auto table_read, TableRead::Create(std::move(read_context))); + PAIMON_ASSIGN_OR_RAISE(auto batch_reader, table_read->CreateReader(result_plan->Splits())); + PAIMON_ASSIGN_OR_RAISE(auto read_result, + ReadResultCollector::CollectResult(batch_reader.get())); + if (!read_result) { + return Status::Invalid("read result is null"); + } + return expected->Equals(read_result); + }; + + ASSERT_OK_AND_ASSIGN(bool first_success, read_once()); + ASSERT_TRUE(first_success); + ASSERT_EQ(1, cache->GetCount()); + ASSERT_EQ(1, cache->SupplierCallCount()); + ASSERT_EQ(1, cache->Size()); + ASSERT_EQ(CacheKind::PARQUET_METADATA, cache->LastKind()); + + ASSERT_OK_AND_ASSIGN(bool second_success, read_once()); + ASSERT_TRUE(second_success); + ASSERT_EQ(2, cache->GetCount()); + ASSERT_EQ(1, cache->SupplierCallCount()); + ASSERT_EQ(1, cache->Size()); + ASSERT_EQ(CacheKind::PARQUET_METADATA, cache->LastKind()); +} + INSTANTIATE_TEST_SUITE_P(FileFormatAndFileSystem, WriteAndReadInteTest, ::testing::ValuesIn(GetTestValuesForWriteAndReadInteTest()));