Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/paimon/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "paimon/factories/factory.h" // IWYU pragma: export
#include "paimon/file_store_commit.h" // IWYU pragma: export
#include "paimon/file_store_write.h" // IWYU pragma: export
#include "paimon/format/parquet.h" // IWYU pragma: export
#include "paimon/fs/file_system_factory.h" // IWYU pragma: export
#include "paimon/memory/memory_pool.h" // IWYU pragma: export
#include "paimon/predicate/predicate.h" // IWYU pragma: export
Expand Down
33 changes: 33 additions & 0 deletions include/paimon/format/parquet.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cstdint>

#include "paimon/status.h"
#include "paimon/visibility.h"

namespace paimon::parquet {

/// Resize the process-wide parquet metadata cache.
///
/// `max_bytes <= 0` disables the cache for subsequently created readers, and shrinks
/// the existing cache down to the new limit immediately (entries evicted in LRU order).
/// The cache is initialized eagerly when the parquet file format factory is registered,
/// so this function is safe to call at any time.
///
/// @param max_bytes New upper bound, in bytes, for the parquet metadata cache.
/// @return The status reported by the parquet file format factory's resize operation,
///         or an Invalid status when the factory creator is unavailable or the parquet
///         file format factory cannot be found.
PAIMON_EXPORT Status ResizeParquetMetadataCache(int64_t max_bytes);

} // namespace paimon::parquet
37 changes: 31 additions & 6 deletions src/paimon/common/utils/generic_lru_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once

#include <atomic>
#include <chrono>
#include <cstdint>
#include <functional>
Expand Down Expand Up @@ -63,6 +64,8 @@ class GenericLruCache {
/// Configuration options for the cache.
struct Options {
/// Maximum total weight of all entries. Entries are evicted (LRU) when exceeded.
/// Used only as the initial value for the cache's runtime-mutable max weight;
/// after construction, use GetMaxWeight()/SetMaxWeight() to read or update it.
int64_t max_weight = INT64_MAX;

/// Time in milliseconds after last access before an entry expires.
Expand All @@ -77,7 +80,8 @@ class GenericLruCache {
RemovalCallback removal_callback = nullptr;
};

explicit GenericLruCache(Options options) : options_(std::move(options)) {}
explicit GenericLruCache(Options options)
: options_(std::move(options)), max_weight_(options_.max_weight) {}

/// Look up a key in the cache. On hit, promotes the entry to the front (most recently
/// used) and updates its access time. Returns std::nullopt on miss or if the entry
Expand All @@ -101,7 +105,7 @@ class GenericLruCache {
// Cache miss: load via supplier outside the lock
PAIMON_ASSIGN_OR_RAISE(V value, supplier(key));
int64_t weight = ComputeWeight(key, value);
if (weight > options_.max_weight) {
if (weight > max_weight_.load(std::memory_order_relaxed)) {
return value;
}

Expand All @@ -122,10 +126,11 @@ class GenericLruCache {
/// @return Status::Invalid if the entry's weight exceeds max_weight, Status::OK otherwise.
Status Put(const K& key, V value) {
int64_t weight = ComputeWeight(key, value);
if (weight > options_.max_weight) {
int64_t max_weight = max_weight_.load(std::memory_order_relaxed);
if (weight > max_weight) {
return Status::Invalid(
fmt::format("Entry weight {} exceeds cache max weight {}, entry will not be cached",
weight, options_.max_weight));
weight, max_weight));
}

std::unique_lock<std::shared_mutex> lock(mutex_);
Expand Down Expand Up @@ -176,7 +181,22 @@ class GenericLruCache {

/// @return The maximum weight configured for this cache.
int64_t GetMaxWeight() const {
return options_.max_weight;
return max_weight_.load(std::memory_order_relaxed);
}

/// Update the maximum total weight at runtime. The new limit is written with a
/// relaxed atomic store: no release/acquire ordering is established, and readers
/// load it with relaxed atomics as well, so reading the limit never requires the
/// cache lock. After updating, this method also acquires the write
/// lock and runs EvictIfNeeded() so that:
/// 1. expired entries are reaped opportunistically;
/// 2. when the new limit is smaller than current_weight_, entries are evicted
/// down to the new limit immediately rather than being held until the next
/// insertion (relevant when the cache is being shrunk or disabled and no
/// further inserts are expected).
///
/// @param new_max_weight The new maximum total weight for the cache.
void SetMaxWeight(int64_t new_max_weight) {
// Store the limit before taking the lock so that the eviction pass below, which
// re-reads max_weight_, trims against the new value.
max_weight_.store(new_max_weight, std::memory_order_relaxed);
std::unique_lock<std::shared_mutex> lock(mutex_);
EvictIfNeeded();
}

private:
Expand Down Expand Up @@ -267,7 +287,8 @@ class GenericLruCache {
/// Evict expired entries from the tail, then evict by weight if still over capacity.
void EvictIfNeeded() {
EvictExpired();
while (current_weight_ > options_.max_weight && !lru_list_.empty()) {
while (current_weight_ > max_weight_.load(std::memory_order_relaxed) &&
!lru_list_.empty()) {
RemoveEntry(std::prev(lru_list_.end()), RemovalCause::SIZE);
}
}
Expand Down Expand Up @@ -317,6 +338,10 @@ class GenericLruCache {
}

Options options_;
/// Runtime-mutable maximum total weight. Read on the hot path without holding
/// `mutex_` (e.g. in Get/Put before locking, and inside EvictIfNeeded under
/// the write lock). Writes go through SetMaxWeight() which uses relaxed atomics.
std::atomic<int64_t> max_weight_;
int64_t current_weight_ = 0;
EntryList lru_list_;
EntryMap lru_map_;
Expand Down
3 changes: 3 additions & 0 deletions src/paimon/format/parquet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ set(PAIMON_PARQUET_FILE_FORMAT
page_filtered_row_group_reader.cpp
parquet_timestamp_converter.cpp
parquet_file_batch_reader.cpp
parquet_file_format.cpp
parquet_file_format_factory.cpp
parquet_format_writer.cpp
parquet_metadata_cache.cpp
parquet_schema_util.cpp
parquet_stats_extractor.cpp
parquet_writer_builder.cpp
Expand Down Expand Up @@ -57,6 +59,7 @@ if(PAIMON_BUILD_TESTS)
parquet_field_id_converter_test.cpp
parquet_file_batch_reader_test.cpp
parquet_format_writer_test.cpp
parquet_metadata_cache_test.cpp
parquet_stats_extractor_test.cpp
parquet_writer_builder_test.cpp
predicate_converter_test.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ class PageFilteredRowGroupReaderTest : public ::testing::Test {
options[PARQUET_READ_ENABLE_PAGE_INDEX_FILTER] = "true";
ASSERT_OK_AND_ASSIGN(
auto batch_reader,
ParquetFileBatchReader::Create(std::move(in_stream), arrow_pool_, options, batch_size));
ParquetFileBatchReader::Create(std::move(in_stream), arrow_pool_, options, batch_size,
/*file_metadata=*/nullptr));
auto c_schema = std::make_unique<ArrowSchema>();
ASSERT_TRUE(arrow::ExportSchema(*read_schema, c_schema.get()).ok());
ASSERT_OK(batch_reader->SetReadSchema(c_schema.get(), predicate,
Expand Down
6 changes: 4 additions & 2 deletions src/paimon/format/parquet/parquet_file_batch_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ ParquetFileBatchReader::ParquetFileBatchReader(
Result<std::unique_ptr<ParquetFileBatchReader>> ParquetFileBatchReader::Create(
std::shared_ptr<arrow::io::RandomAccessFile>&& input_stream,
const std::shared_ptr<arrow::MemoryPool>& pool,
const std::map<std::string, std::string>& options, int32_t batch_size) {
const std::map<std::string, std::string>& options, int32_t batch_size,
std::shared_ptr<::parquet::FileMetaData> file_metadata) {
try {
assert(input_stream);
PAIMON_ASSIGN_OR_RAISE(::parquet::ReaderProperties reader_properties,
Expand All @@ -81,7 +82,8 @@ Result<std::unique_ptr<ParquetFileBatchReader>> ParquetFileBatchReader::Create(
CreateArrowReaderProperties(pool, options, batch_size));

::parquet::arrow::FileReaderBuilder file_reader_builder;
PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_builder.Open(input_stream, reader_properties));
PAIMON_RETURN_NOT_OK_FROM_ARROW(
file_reader_builder.Open(input_stream, reader_properties, std::move(file_metadata)));

std::unique_ptr<::parquet::arrow::FileReader> file_reader;
PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_builder.memory_pool(pool.get())
Expand Down
5 changes: 4 additions & 1 deletion src/paimon/format/parquet/parquet_file_batch_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ class ParquetFileBatchReader : public PrefetchFileBatchReader {
static Result<std::unique_ptr<ParquetFileBatchReader>> Create(
std::shared_ptr<arrow::io::RandomAccessFile>&& input_stream,
const std::shared_ptr<arrow::MemoryPool>& pool,
const std::map<std::string, std::string>& options, int32_t batch_size);
const std::map<std::string, std::string>& options, int32_t batch_size,
std::shared_ptr<::parquet::FileMetaData> file_metadata);

Comment thread
gripleaf marked this conversation as resolved.
// For timestamp type, we return the schema stored in file, e.g., second in parquet file will
// store as milli.
Expand Down Expand Up @@ -167,6 +168,8 @@ class ParquetFileBatchReader : public PrefetchFileBatchReader {
const std::vector<int32_t>& src_row_groups);

private:
friend class ParquetReaderBuilder;

std::map<std::string, std::string> options_;
// hold the lifecycle of arrow memory pool.
std::shared_ptr<arrow::MemoryPool> arrow_pool_;
Expand Down
38 changes: 36 additions & 2 deletions src/paimon/format/parquet/parquet_file_batch_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ class ParquetFileBatchReaderTest : public ::testing::Test,
const std::optional<RoaringBitmap32>& selection_bitmap, int32_t batch_size) const {
EXPECT_OK_AND_ASSIGN(
auto parquet_batch_reader,
ParquetFileBatchReader::Create(std::move(in_stream), pool_, options, batch_size));
ParquetFileBatchReader::Create(std::move(in_stream), pool_, options, batch_size,
/*file_metadata=*/nullptr));
std::unique_ptr<ArrowSchema> c_schema = std::make_unique<ArrowSchema>();
auto arrow_status = arrow::ExportSchema(*read_schema, c_schema.get());
EXPECT_TRUE(arrow_status.ok());
Expand Down Expand Up @@ -203,6 +204,38 @@ TEST_F(ParquetFileBatchReaderTest, TestReadBinaryWrittenFromBinaryAndLargeBinary
check_binary_read_result(arrow::large_binary(), "large-binary.parquet");
}

// Writes a one-row file, reads it back, then overwrites the same path with a four-row
// file and reads it again: each read must reflect what is currently on disk.
TEST_F(ParquetFileBatchReaderTest, TestReadRewrittenFileWithoutMetadataCache) {
    auto int_field = arrow::field("f0", arrow::int32());
    auto struct_type = arrow::struct_({int_field});
    auto file_schema = arrow::schema({int_field});

    // Phase 1: initial contents, a single row in a single row group.
    auto initial_rows = std::dynamic_pointer_cast<arrow::StructArray>(
        arrow::ipc::internal::json::ArrayFromJSON(struct_type, R"([[1]])").ValueOrDie());
    WriteArray(file_path_, initial_rows, file_schema,
               /*write_batch_size=*/initial_rows->length(),
               /*enable_dictionary=*/false,
               /*max_row_group_length=*/initial_rows->length());

    auto initial_reader =
        PrepareParquetFileBatchReader(file_path_, file_schema, /*predicate=*/nullptr,
                                      /*selection_bitmap=*/std::nullopt, batch_size_);
    ASSERT_OK_AND_ASSIGN(auto initial_actual,
                         paimon::test::ReadResultCollector::CollectResult(initial_reader.get()));
    auto initial_expected = std::make_shared<arrow::ChunkedArray>(initial_rows);
    ASSERT_TRUE(initial_actual->Equals(initial_expected));

    // Phase 2: overwrite the same path with different contents (four rows).
    auto rewritten_rows = std::dynamic_pointer_cast<arrow::StructArray>(
        arrow::ipc::internal::json::ArrayFromJSON(struct_type, R"([[1], [2], [3], [4]])")
            .ValueOrDie());
    WriteArray(file_path_, rewritten_rows, file_schema,
               /*write_batch_size=*/rewritten_rows->length(),
               /*enable_dictionary=*/false,
               /*max_row_group_length=*/rewritten_rows->length());

    // A fresh reader on the rewritten file must return the new rows, not the old one.
    auto rewritten_reader =
        PrepareParquetFileBatchReader(file_path_, file_schema, /*predicate=*/nullptr,
                                      /*selection_bitmap=*/std::nullopt, batch_size_);
    ASSERT_OK_AND_ASSIGN(
        auto rewritten_actual,
        paimon::test::ReadResultCollector::CollectResult(rewritten_reader.get()));
    auto rewritten_expected = std::make_shared<arrow::ChunkedArray>(rewritten_rows);
    ASSERT_TRUE(rewritten_actual->Equals(rewritten_expected));
}

TEST_F(ParquetFileBatchReaderTest, TestSimple) {
std::string file_name = paimon::test::GetDataDir() +
"/parquet/parquet_append_table.db/parquet_append_table/bucket-0/"
Expand All @@ -229,7 +262,8 @@ TEST_F(ParquetFileBatchReaderTest, TestSetReadSchema) {
std::map<std::string, std::string> options;
ASSERT_OK_AND_ASSIGN(
auto parquet_batch_reader,
ParquetFileBatchReader::Create(std::move(in_stream), pool_, options, batch_size_));
ParquetFileBatchReader::Create(std::move(in_stream), pool_, options, batch_size_,
/*file_metadata=*/nullptr));
// test GetFileSchema()
ASSERT_OK_AND_ASSIGN(auto c_file_schema, parquet_batch_reader->GetFileSchema());
auto arrow_file_schema = arrow::ImportSchema(c_file_schema.get()).ValueOrDie();
Expand Down
37 changes: 37 additions & 0 deletions src/paimon/format/parquet/parquet_file_format.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "paimon/factories/factory_creator.h"
#include "paimon/format/parquet.h"
#include "paimon/format/parquet/parquet_file_format_factory.h"
#include "paimon/status.h"

namespace paimon::parquet {

/// Looks up the registered parquet file format factory through the global
/// FactoryCreator and asks it to resize its metadata cache.
///
/// @param max_bytes New limit forwarded unchanged to the factory's ResizeMetadataCache().
/// @return Invalid if the FactoryCreator instance is null or if the entry registered under
///         the parquet identifier is not a ParquetFileFormatFactory; otherwise whatever
///         ResizeMetadataCache() returns.
Status ResizeParquetMetadataCache(int64_t max_bytes) {
    auto* creator = ::paimon::FactoryCreator::GetInstance();
    if (!creator) {
        return Status::Invalid("FactoryCreator is not initialized");
    }

    // The creator hands back a generic factory; only the parquet factory owns the cache.
    auto* registered = creator->Create(ParquetFileFormatFactory::IDENTIFIER);
    if (auto* parquet_factory = dynamic_cast<ParquetFileFormatFactory*>(registered)) {
        return parquet_factory->ResizeMetadataCache(max_bytes);
    }
    return Status::Invalid("ParquetFileFormatFactory is not registered");
}

} // namespace paimon::parquet
15 changes: 11 additions & 4 deletions src/paimon/format/parquet/parquet_file_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,21 @@
#pragma once

#include <cassert>
#include <cstdint>
#include <map>
#include <memory>
#include <string>
#include <utility>

#include "arrow/c/bridge.h"
#include "arrow/c/helpers.h"
#include "paimon/common/utils/arrow/status_utils.h"
#include "paimon/format/file_format.h"
#include "paimon/format/parquet/parquet_field_id_converter.h"
#include "paimon/format/parquet/parquet_metadata_cache.h"
#include "paimon/format/parquet/parquet_reader_builder.h"
#include "paimon/format/parquet/parquet_stats_extractor.h"
#include "paimon/format/parquet/parquet_writer_builder.h"
#include "paimon/result.h"

struct ArrowSchema;

Expand All @@ -42,15 +45,16 @@ namespace parquet {

class ParquetFileFormat : public FileFormat {
public:
explicit ParquetFileFormat(const std::map<std::string, std::string>& options)
: identifier_("parquet"), options_(options) {}
ParquetFileFormat(const std::map<std::string, std::string>& options,
std::shared_ptr<ParquetMetadataCache> metadata_cache)
: identifier_("parquet"), options_(options), metadata_cache_(std::move(metadata_cache)) {}

const std::string& Identifier() const override {
return identifier_;
}

Result<std::unique_ptr<ReaderBuilder>> CreateReaderBuilder(int32_t batch_size) const override {
return std::make_unique<ParquetReaderBuilder>(options_, batch_size);
return std::make_unique<ParquetReaderBuilder>(options_, batch_size, metadata_cache_);
}

Result<std::unique_ptr<WriterBuilder>> CreateWriterBuilder(::ArrowSchema* schema,
Expand All @@ -72,6 +76,9 @@ class ParquetFileFormat : public FileFormat {
protected:
std::string identifier_;
std::map<std::string, std::string> options_;
/// Process-wide parquet metadata cache injected by the factory. May be nullptr
/// when the cache is disabled.
std::shared_ptr<ParquetMetadataCache> metadata_cache_;
};

} // namespace parquet
Expand Down
18 changes: 17 additions & 1 deletion src/paimon/format/parquet/parquet_file_format_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,30 @@

#include "paimon/factories/factory.h"
#include "paimon/format/parquet/parquet_file_format.h"
#include "paimon/format/parquet/parquet_metadata_cache.h"

namespace paimon::parquet {

const char ParquetFileFormatFactory::IDENTIFIER[] = "parquet";

/// Constructs the factory together with the shared metadata cache it holds, so the
/// cache object exists for the whole lifetime of the factory.
/// NOTE(review): the constructor argument `0` is presumably the initial max weight, which
/// would leave the cache disabled (Create() only injects it while GetMaxWeight() > 0)
/// until ResizeMetadataCache() raises the limit — confirm against ParquetMetadataCache.
ParquetFileFormatFactory::ParquetFileFormatFactory()
: metadata_cache_(std::make_shared<ParquetMetadataCache>(0)) {}

/// Defaulted destructor, defined out of line in this translation unit.
ParquetFileFormatFactory::~ParquetFileFormatFactory() = default;

Result<std::unique_ptr<FileFormat>> ParquetFileFormatFactory::Create(
const std::map<std::string, std::string>& options) const {
return std::make_unique<ParquetFileFormat>(options);
// Inject the cache only if it is currently enabled (max weight > 0). When
// disabled by ResizeMetadataCache(0), pass nullptr so downstream readers behave
// identically to "no cache configured".
std::shared_ptr<ParquetMetadataCache> cache =
metadata_cache_->GetMaxWeight() > 0 ? metadata_cache_ : nullptr;
return std::make_unique<ParquetFileFormat>(options, std::move(cache));
}

/// Forwards the new limit to the shared metadata cache via SetMaxWeight().
/// Create() only hands the cache to new file formats while GetMaxWeight() > 0, so a
/// non-positive `max_bytes` also stops the cache from being injected into file formats
/// created afterwards.
///
/// @param max_bytes New maximum weight for the metadata cache.
/// @return Always Status::OK().
Status ParquetFileFormatFactory::ResizeMetadataCache(int64_t max_bytes) {
metadata_cache_->SetMaxWeight(max_bytes);
return Status::OK();
}

REGISTER_PAIMON_FACTORY(ParquetFileFormatFactory);
Expand Down
Loading
Loading