Skip to content
44 changes: 23 additions & 21 deletions src/paimon/format/parquet/file_reader_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,14 +164,15 @@ void FileReaderWrapper::AdvanceToNextRowGroup() {
current_row_group_idx_++;
// Skip row groups excluded by read range.
while (current_row_group_idx_ < target_row_groups_.size() &&
target_row_groups_[current_row_group_idx_].excluded_by_read_range) {
target_row_groups_[current_row_group_idx_].IsExcludedByReadRange()) {
current_row_group_idx_++;
}
if (current_row_group_idx_ >= target_row_groups_.size()) {
next_row_to_read_ = num_rows_;
} else {
next_row_to_read_ =
all_row_group_ranges_[target_row_groups_[current_row_group_idx_].row_group_index].first;
all_row_group_ranges_[target_row_groups_[current_row_group_idx_].GetRowGroupIndex()]
.first;
}
}

Expand All @@ -181,10 +182,10 @@ Status FileReaderWrapper::SeekToRow(uint64_t row_number) {
filtered_global_offset_ = 0;

for (uint64_t i = 0; i < target_row_groups_.size(); i++) {
if (target_row_groups_[i].excluded_by_read_range) {
if (target_row_groups_[i].IsExcludedByReadRange()) {
continue;
}
int32_t rg_id = target_row_groups_[i].row_group_index;
int32_t rg_id = target_row_groups_[i].GetRowGroupIndex();
uint64_t rg_start = all_row_group_ranges_[rg_id].first;
uint64_t rg_end = all_row_group_ranges_[rg_id].second;
if (row_number > rg_start && row_number < rg_end) {
Expand All @@ -200,9 +201,9 @@ Status FileReaderWrapper::SeekToRow(uint64_t row_number) {
// Rebuild batch_reader_ for non-page-filtered RGs at/after seek position.
std::vector<int32_t> fully_matched_indices;
for (uint64_t j = i; j < target_row_groups_.size(); j++) {
if (!target_row_groups_[j].excluded_by_read_range &&
!target_row_groups_[j].is_partially_matched) {
fully_matched_indices.push_back(target_row_groups_[j].row_group_index);
if (!target_row_groups_[j].IsExcludedByReadRange() &&
!target_row_groups_[j].IsPartiallyMatched()) {
fully_matched_indices.push_back(target_row_groups_[j].GetRowGroupIndex());
}
}
if (!fully_matched_indices.empty()) {
Expand All @@ -222,7 +223,7 @@ Status FileReaderWrapper::SeekToRow(uint64_t row_number) {
}

Result<std::shared_ptr<arrow::RecordBatch>> FileReaderWrapper::NextPageFiltered() {
int32_t rg_id = target_row_groups_[current_row_group_idx_].row_group_index;
int32_t rg_id = target_row_groups_[current_row_group_idx_].GetRowGroupIndex();

// Construct the per-RG streaming reader on demand.
if (!current_page_filtered_reader_) {
Expand All @@ -237,7 +238,7 @@ Result<std::shared_ptr<arrow::RecordBatch>> FileReaderWrapper::NextPageFiltered(
file_reader_->parquet_reader(), target_rg, target_column_indices_,
page_filtered_read_schema_, file_reader_->properties().cache_options(),
pre_buffered, page_ranges, max_chunksize, pool_));
current_filtered_row_ranges_ = target_rg.row_ranges;
current_filtered_row_ranges_ = target_rg.GetRowRanges();
current_filtered_rg_start_ = all_row_group_ranges_[rg_id].first;
filtered_global_offset_ = 0;
}
Expand Down Expand Up @@ -273,7 +274,7 @@ Result<std::shared_ptr<arrow::RecordBatch>> FileReaderWrapper::NextFullyMatched(
return std::shared_ptr<arrow::RecordBatch>();
}

int32_t rg_id = target_row_groups_[current_row_group_idx_].row_group_index;
int32_t rg_id = target_row_groups_[current_row_group_idx_].GetRowGroupIndex();
uint64_t rg_end = all_row_group_ranges_[rg_id].second;
int64_t num_rows = record_batch->num_rows();

Expand All @@ -298,7 +299,7 @@ Result<std::shared_ptr<arrow::RecordBatch>> FileReaderWrapper::Next() {

while (current_row_group_idx_ < target_row_groups_.size()) {
bool is_partially_matched =
target_row_groups_[current_row_group_idx_].is_partially_matched;
target_row_groups_[current_row_group_idx_].IsPartiallyMatched();
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::RecordBatch> batch,
is_partially_matched ? NextPageFiltered() : NextFullyMatched());
if (batch) {
Expand Down Expand Up @@ -368,17 +369,17 @@ std::vector<::arrow::io::ReadRange> FileReaderWrapper::CollectPreBufferRanges(
auto file_metadata = file_reader_->parquet_reader()->metadata();

for (const auto& trg : target_row_groups_) {
if (trg.excluded_by_read_range) continue;
if (trg.IsExcludedByReadRange()) continue;

if (trg.is_partially_matched) {
if (trg.IsPartiallyMatched()) {
// Page-filtered RGs: only matching page byte ranges.
auto page_ranges = PageFilteredRowGroupReader::ComputePageRanges(
file_reader_->parquet_reader(), trg, column_indices);
ranges.insert(ranges.end(), std::make_move_iterator(page_ranges.begin()),
std::make_move_iterator(page_ranges.end()));
} else {
// Fully-matched RGs: entire column chunk ranges.
auto rg_metadata = file_metadata->RowGroup(trg.row_group_index);
auto rg_metadata = file_metadata->RowGroup(trg.GetRowGroupIndex());
for (int32_t col_idx : column_indices) {
auto col_chunk = rg_metadata->ColumnChunk(col_idx);
int64_t offset = col_chunk->data_page_offset();
Expand Down Expand Up @@ -416,12 +417,12 @@ Status FileReaderWrapper::PrepareForReading(const std::vector<TargetRowGroup>& t
std::vector<int32_t> fully_matched_row_groups;
uint64_t active_count = 0;
for (const auto& trg : target_row_groups_) {
if (trg.excluded_by_read_range) {
if (trg.IsExcludedByReadRange()) {
continue;
}
active_count++;
if (!trg.is_partially_matched) {
fully_matched_row_groups.push_back(trg.row_group_index);
if (!trg.IsPartiallyMatched()) {
fully_matched_row_groups.push_back(trg.GetRowGroupIndex());
}
}

Expand Down Expand Up @@ -455,14 +456,15 @@ Status FileReaderWrapper::PrepareForReading(const std::vector<TargetRowGroup>& t
// Reset read state. Find the first non-excluded row group.
uint64_t first_active_idx = 0;
while (first_active_idx < target_row_groups_.size() &&
target_row_groups_[first_active_idx].excluded_by_read_range) {
target_row_groups_[first_active_idx].IsExcludedByReadRange()) {
first_active_idx++;
}
if (first_active_idx >= target_row_groups_.size()) {
next_row_to_read_ = num_rows_;
} else {
next_row_to_read_ =
all_row_group_ranges_[target_row_groups_[first_active_idx].row_group_index].first;
all_row_group_ranges_[target_row_groups_[first_active_idx].GetRowGroupIndex()]
.first;
}
previous_first_row_ = std::numeric_limits<uint64_t>::max();
current_row_group_idx_ = first_active_idx;
Expand All @@ -476,7 +478,7 @@ Status FileReaderWrapper::ApplyReadRanges(
const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges) {
if (read_ranges.empty()) {
for (auto& trg : target_row_groups_) {
trg.excluded_by_read_range = true;
trg.SetExcludedByReadRange(true);
}
reader_initialized_ = false;
return Status::OK();
Expand All @@ -492,7 +494,7 @@ Status FileReaderWrapper::ApplyReadRanges(
}
// Mark each target row group as excluded or not based on the matching set.
for (auto& trg : target_row_groups_) {
trg.excluded_by_read_range = matching_rg_indices.count(trg.row_group_index) == 0;
trg.SetExcludedByReadRange(matching_rg_indices.count(trg.GetRowGroupIndex()) == 0);
}
reader_initialized_ = false;
return Status::OK();
Expand Down
1 change: 1 addition & 0 deletions src/paimon/format/parquet/file_reader_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "arrow/type_fwd.h"
#include "paimon/common/utils/arrow/status_utils.h"
#include "paimon/format/parquet/row_ranges.h"
#include "paimon/format/parquet/target_row_group.h"
#include "paimon/result.h"
#include "paimon/status.h"
#include "parquet/arrow/reader.h"
Expand Down
8 changes: 4 additions & 4 deletions src/paimon/format/parquet/page_filtered_row_group_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,8 @@ Result<std::unique_ptr<arrow::RecordBatchReader>> PageFilteredRowGroupReader::Re
const ::arrow::io::CacheOptions& cache_options, bool pre_buffered,
const std::vector<::arrow::io::ReadRange>& page_ranges, int64_t max_chunksize,
std::shared_ptr<::arrow::MemoryPool> pool) {
const auto& row_ranges = target_row_group.row_ranges;
int32_t row_group_index = target_row_group.row_group_index;
const auto& row_ranges = target_row_group.GetRowRanges();
int32_t row_group_index = target_row_group.GetRowGroupIndex();

if (row_ranges.IsEmpty()) {
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Table> empty_table,
Expand Down Expand Up @@ -289,8 +289,8 @@ Result<std::unique_ptr<arrow::RecordBatchReader>> PageFilteredRowGroupReader::Re
std::vector<::arrow::io::ReadRange> PageFilteredRowGroupReader::ComputePageRanges(
::parquet::ParquetFileReader* parquet_reader, const TargetRowGroup& target_row_group,
const std::vector<int32_t>& column_indices) {
int32_t row_group_index = target_row_group.row_group_index;
const auto& row_ranges = target_row_group.row_ranges;
int32_t row_group_index = target_row_group.GetRowGroupIndex();
const auto& row_ranges = target_row_group.GetRowRanges();

std::vector<::arrow::io::ReadRange> ranges;
auto file_metadata = parquet_reader->metadata();
Expand Down
1 change: 1 addition & 0 deletions src/paimon/format/parquet/page_filtered_row_group_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "arrow/record_batch.h"
#include "arrow/type.h"
#include "paimon/format/parquet/row_ranges.h"
#include "paimon/format/parquet/target_row_group.h"
#include "paimon/result.h"
#include "parquet/column_reader.h"
#include "parquet/file_reader.h"
Expand Down
Loading
Loading