From 6d3427b8f4fe8b4458526b3ee50e18fb5405a5ee Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Wed, 27 May 2026 12:32:58 +0200 Subject: [PATCH 1/3] GH-3598: Expose getRowRanges(int) and add getCompressedBytesForRowRanges ### Rationale for this change Opening up APIs needed by a later materialization feature in Spark. External readers (e.g. a Spark-side scanner) need (a) the column-index-derived row ranges that may pass the configured filter for a row group, and (b) a metadata-only estimate of the on-disk compressed bytes those ranges correspond to for the currently requested columns, so they can plan I/O without reading column data. ### What changes are included in this PR? - `getRowRanges(int blockIndex)`: made public; returns row ranges that may pass the configured filter. With no filter, shortcuts to all rows of the row group. - `getCompressedBytesForRowRanges(int blockIndex, RowRanges rowRanges)`: metadata-only sum of compressed page sizes for the reader's currently requested columns whose pages overlap the given row ranges. Dictionary pages are not represented in OffsetIndex and are therefore excluded. ### Are these changes tested? Yes. `TestParquetFileReaderRowRanges` covers: no-filter row ranges cover all rows, empty ranges short-circuit to 0, full ranges equal the per-page OffsetIndex sum and are strictly less than the column-chunk total (proving dictionary-page exclusion), and partial ranges fall between 0 and the full total. ### Are there any user-facing changes? No. 
Closes #3598 Co-authored-by: Matt Butrovich --- .../parquet/hadoop/ParquetFileReader.java | 60 ++++++- .../TestParquetFileReaderRowRanges.java | 159 ++++++++++++++++++ 2 files changed, 216 insertions(+), 3 deletions(-) create mode 100644 parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderRowRanges.java diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index e0b0d76e0e..bd2bcaf225 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -1489,9 +1489,23 @@ public ColumnIndexStore getColumnIndexStore(int blockIndex) { return ciStore; } - private RowRanges getRowRanges(int blockIndex) { - assert FilterCompat.isFilteringRequired(options.getRecordFilter()) - : "Should not be invoked if filter is null or NOOP"; + /** + * Computes the {@link RowRanges} within the given row group that may pass the configured filter + * (set via {@link ParquetReadOptions} or {@link ParquetInputFormat#setFilterPredicate}). If no + * filter is configured, returns a {@link RowRanges} covering all rows in the row group. + * + *

<p>This computation is metadata-only: it consults each filter-referenced column's column + * index from the file footer; no column data is read from disk. The result can be passed to + * {@link #readFilteredRowGroup(int, RowRanges)} (intersected with any caller-supplied row + * ranges if desired) to read only the matching pages. + * + * @param blockIndex the row group (block) index + * @return row ranges within the block that may pass the configured filter + */ + public RowRanges getRowRanges(int blockIndex) { + if (!FilterCompat.isFilteringRequired(options.getRecordFilter())) { + return RowRanges.createSingle(blocks.get(blockIndex).getRowCount()); + } RowRanges rowRanges = blockRowRanges.get(blockIndex); if (rowRanges == null) { rowRanges = ColumnIndexFilter.calculateRowRanges( @@ -1504,6 +1518,46 @@ private RowRanges getRowRanges(int blockIndex) { return rowRanges; } + /** + * Returns the total compressed byte count of this reader's requested columns' pages whose + * row ranges intersect {@code rowRanges} within the given row group. The set of columns is + * taken from the reader's currently configured requested schema (see + * {@link #setRequestedSchema}). Metadata-only: consults each column's {@link OffsetIndex} + * from the file footer; no column data is read. + * + *

<p>Page size here is {@link OffsetIndex#getCompressedPageSize} (includes page header). + * Dictionary pages are not represented in {@link OffsetIndex} and are therefore excluded + * from the sum. + * + * @param blockIndex row group index + * @param rowRanges row ranges to intersect against pages + * @return sum of compressed page sizes across requested columns for pages overlapping + * {@code rowRanges} + * @throws ColumnIndexStore.MissingOffsetIndexException if any requested column lacks an + * offset index + */ + public long getCompressedBytesForRowRanges(int blockIndex, RowRanges rowRanges) { + if (rowRanges.rowCount() == 0 || paths.isEmpty()) { + return 0L; + } + BlockMetaData block = blocks.get(blockIndex); + long blockRowCount = block.getRowCount(); + ColumnIndexStore ciStore = getColumnIndexStore(blockIndex); + long total = 0L; + for (ColumnPath path : paths.keySet()) { + OffsetIndex offsetIndex = ciStore.getOffsetIndex(path); + int pageCount = offsetIndex.getPageCount(); + for (int i = 0; i < pageCount; i++) { + long from = offsetIndex.getFirstRowIndex(i); + long to = offsetIndex.getLastRowIndex(i, blockRowCount); + if (rowRanges.isOverlapping(from, to)) { + total += offsetIndex.getCompressedPageSize(i); + } + } + } + return total; + } + public boolean skipNextRowGroup() { return advanceToNextBlock(); } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderRowRanges.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderRowRanges.java new file mode 100644 index 0000000000..48ff4321d0 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderRowRanges.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop; + +import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.PrimitiveIterator; +import java.util.stream.IntStream; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.HadoopReadOptions; +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import org.apache.parquet.internal.filter2.columnindex.RowRanges; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * Tests {@link ParquetFileReader#getRowRanges(int)} and + * {@link ParquetFileReader#getCompressedBytesForRowRanges(int, RowRanges)}. 
+ */ +public class TestParquetFileReaderRowRanges { + + private static final int ROW_COUNT = 10_000; + private static final MessageType SCHEMA = + MessageTypeParser.parseMessageType("message test { required int64 id; required int64 grp; }"); + + @Rule + public final TemporaryFolder temp = new TemporaryFolder(); + + private Path file; + + @Before + public void writeFile() throws IOException { + File f = temp.newFile(); + f.delete(); + file = new Path(f.toURI()); + + // Small page size produces many pages per column chunk; low-cardinality `grp` + // ensures dictionary encoding kicks in so we can verify dictionary-page exclusion. + try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(file) + .withType(SCHEMA) + .withWriteMode(OVERWRITE) + .withRowGroupSize(64L * 1024 * 1024) + .withPageSize(4 * 1024) + .withDictionaryEncoding(true) + .build()) { + SimpleGroupFactory factory = new SimpleGroupFactory(SCHEMA); + for (int i = 0; i < ROW_COUNT; i++) { + writer.write(factory.newGroup().append("id", (long) i).append("grp", (long) (i % 8))); + } + } + } + + private ParquetFileReader openReader() throws IOException { + Configuration conf = new Configuration(); + ParquetReadOptions options = HadoopReadOptions.builder(conf).build(); + return ParquetFileReader.open(HadoopInputFile.fromPath(file, conf), options); + } + + @Test + public void getRowRangesWithoutFilterCoversAllRows() throws IOException { + try (ParquetFileReader reader = openReader()) { + assertEquals(1, reader.getRowGroups().size()); + BlockMetaData block = reader.getRowGroups().get(0); + + RowRanges ranges = reader.getRowRanges(0); + + assertEquals(block.getRowCount(), ranges.rowCount()); + assertTrue(ranges.isOverlapping(0L, block.getRowCount() - 1)); + } + } + + @Test + public void getCompressedBytesForEmptyRangesIsZero() throws IOException { + try (ParquetFileReader reader = openReader()) { + assertEquals(0L, reader.getCompressedBytesForRowRanges(0, RowRanges.EMPTY)); + } + } + + @Test + public void 
getCompressedBytesForFullRangesEqualsOffsetIndexSum() throws IOException { + try (ParquetFileReader reader = openReader()) { + BlockMetaData block = reader.getRowGroups().get(0); + RowRanges full = reader.getRowRanges(0); + + long expected = 0L; + long columnChunkTotal = 0L; + for (ColumnChunkMetaData col : block.getColumns()) { + OffsetIndex oi = reader.readOffsetIndex(col); + for (int p = 0; p < oi.getPageCount(); p++) { + expected += oi.getCompressedPageSize(p); + } + columnChunkTotal += col.getTotalSize(); + } + + assertEquals(expected, reader.getCompressedBytesForRowRanges(0, full)); + + // Dictionary pages aren't represented in OffsetIndex, so the per-page sum + // must be strictly smaller than the column-chunk totals (which include them). + assertTrue( + "expected dictionary-page exclusion: " + expected + " < " + columnChunkTotal, + expected < columnChunkTotal); + } + } + + @Test + public void getCompressedBytesForPartialRangesIsBetweenZeroAndFull() throws IOException { + try (ParquetFileReader reader = openReader()) { + BlockMetaData block = reader.getRowGroups().get(0); + RowRanges full = reader.getRowRanges(0); + long fullBytes = reader.getCompressedBytesForRowRanges(0, full); + + // Build a partial RowRanges from the first half of the pages of an arbitrary column; + // since all columns share row counts, the resulting range applies to every column. 
+ OffsetIndex anyOi = reader.readOffsetIndex(block.getColumns().get(0)); + int halfPageCount = Math.max(1, anyOi.getPageCount() / 2); + PrimitiveIterator.OfInt pages = IntStream.range(0, halfPageCount).iterator(); + RowRanges partial = RowRanges.create(block.getRowCount(), pages, anyOi); + + long partialBytes = reader.getCompressedBytesForRowRanges(0, partial); + + assertTrue("partial bytes should be > 0: " + partialBytes, partialBytes > 0); + assertTrue( + "partial bytes should be < full bytes: " + partialBytes + " < " + fullBytes, + partialBytes < fullBytes); + } + } +} From 1bffbc2095c62149bfb8042f5b07a8438d123697 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Mon, 8 Jun 2026 16:17:18 +0200 Subject: [PATCH 2/3] remove `getCompressedBytesForRowRanges()` --- .../parquet/hadoop/ParquetFileReader.java | 40 ----------- .../TestParquetFileReaderRowRanges.java | 67 +------------------ 2 files changed, 2 insertions(+), 105 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index bd2bcaf225..655268bf21 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -1518,46 +1518,6 @@ public RowRanges getRowRanges(int blockIndex) { return rowRanges; } - /** - * Returns the total compressed byte count of this reader's requested columns' pages whose - * row ranges intersect {@code rowRanges} within the given row group. The set of columns is - * taken from the reader's currently configured requested schema (see - * {@link #setRequestedSchema}). Metadata-only: consults each column's {@link OffsetIndex} - * from the file footer; no column data is read. - * - *

<p>Page size here is {@link OffsetIndex#getCompressedPageSize} (includes page header). - * Dictionary pages are not represented in {@link OffsetIndex} and are therefore excluded - * from the sum. - * - * @param blockIndex row group index - * @param rowRanges row ranges to intersect against pages - * @return sum of compressed page sizes across requested columns for pages overlapping - * {@code rowRanges} - * @throws ColumnIndexStore.MissingOffsetIndexException if any requested column lacks an - * offset index - */ - public long getCompressedBytesForRowRanges(int blockIndex, RowRanges rowRanges) { - if (rowRanges.rowCount() == 0 || paths.isEmpty()) { - return 0L; - } - BlockMetaData block = blocks.get(blockIndex); - long blockRowCount = block.getRowCount(); - ColumnIndexStore ciStore = getColumnIndexStore(blockIndex); - long total = 0L; - for (ColumnPath path : paths.keySet()) { - OffsetIndex offsetIndex = ciStore.getOffsetIndex(path); - int pageCount = offsetIndex.getPageCount(); - for (int i = 0; i < pageCount; i++) { - long from = offsetIndex.getFirstRowIndex(i); - long to = offsetIndex.getLastRowIndex(i, blockRowCount); - if (rowRanges.isOverlapping(from, to)) { - total += offsetIndex.getCompressedPageSize(i); - } - } - } - return total; - } - public boolean skipNextRowGroup() { return advanceToNextBlock(); } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderRowRanges.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderRowRanges.java index 48ff4321d0..e56bfc8d6f 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderRowRanges.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderRowRanges.java @@ -24,8 +24,6 @@ import java.io.File; import java.io.IOException; -import java.util.PrimitiveIterator; -import java.util.stream.IntStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import 
org.apache.parquet.HadoopReadOptions; @@ -34,9 +32,7 @@ import org.apache.parquet.example.data.simple.SimpleGroupFactory; import org.apache.parquet.hadoop.example.ExampleParquetWriter; import org.apache.parquet.hadoop.metadata.BlockMetaData; -import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.util.HadoopInputFile; -import org.apache.parquet.internal.column.columnindex.OffsetIndex; import org.apache.parquet.internal.filter2.columnindex.RowRanges; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.MessageTypeParser; @@ -46,8 +42,7 @@ import org.junit.rules.TemporaryFolder; /** - * Tests {@link ParquetFileReader#getRowRanges(int)} and - * {@link ParquetFileReader#getCompressedBytesForRowRanges(int, RowRanges)}. + * Tests {@link ParquetFileReader#getRowRanges(int)}. */ public class TestParquetFileReaderRowRanges { @@ -66,14 +61,12 @@ public void writeFile() throws IOException { f.delete(); file = new Path(f.toURI()); - // Small page size produces many pages per column chunk; low-cardinality `grp` - // ensures dictionary encoding kicks in so we can verify dictionary-page exclusion. + // Small page size produces many pages per column chunk. 
try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(file) .withType(SCHEMA) .withWriteMode(OVERWRITE) .withRowGroupSize(64L * 1024 * 1024) .withPageSize(4 * 1024) - .withDictionaryEncoding(true) .build()) { SimpleGroupFactory factory = new SimpleGroupFactory(SCHEMA); for (int i = 0; i < ROW_COUNT; i++) { @@ -100,60 +93,4 @@ public void getRowRangesWithoutFilterCoversAllRows() throws IOException { assertTrue(ranges.isOverlapping(0L, block.getRowCount() - 1)); } } - - @Test - public void getCompressedBytesForEmptyRangesIsZero() throws IOException { - try (ParquetFileReader reader = openReader()) { - assertEquals(0L, reader.getCompressedBytesForRowRanges(0, RowRanges.EMPTY)); - } - } - - @Test - public void getCompressedBytesForFullRangesEqualsOffsetIndexSum() throws IOException { - try (ParquetFileReader reader = openReader()) { - BlockMetaData block = reader.getRowGroups().get(0); - RowRanges full = reader.getRowRanges(0); - - long expected = 0L; - long columnChunkTotal = 0L; - for (ColumnChunkMetaData col : block.getColumns()) { - OffsetIndex oi = reader.readOffsetIndex(col); - for (int p = 0; p < oi.getPageCount(); p++) { - expected += oi.getCompressedPageSize(p); - } - columnChunkTotal += col.getTotalSize(); - } - - assertEquals(expected, reader.getCompressedBytesForRowRanges(0, full)); - - // Dictionary pages aren't represented in OffsetIndex, so the per-page sum - // must be strictly smaller than the column-chunk totals (which include them). 
- assertTrue( - "expected dictionary-page exclusion: " + expected + " < " + columnChunkTotal, - expected < columnChunkTotal); - } - } - - @Test - public void getCompressedBytesForPartialRangesIsBetweenZeroAndFull() throws IOException { - try (ParquetFileReader reader = openReader()) { - BlockMetaData block = reader.getRowGroups().get(0); - RowRanges full = reader.getRowRanges(0); - long fullBytes = reader.getCompressedBytesForRowRanges(0, full); - - // Build a partial RowRanges from the first half of the pages of an arbitrary column; - // since all columns share row counts, the resulting range applies to every column. - OffsetIndex anyOi = reader.readOffsetIndex(block.getColumns().get(0)); - int halfPageCount = Math.max(1, anyOi.getPageCount() / 2); - PrimitiveIterator.OfInt pages = IntStream.range(0, halfPageCount).iterator(); - RowRanges partial = RowRanges.create(block.getRowCount(), pages, anyOi); - - long partialBytes = reader.getCompressedBytesForRowRanges(0, partial); - - assertTrue("partial bytes should be > 0: " + partialBytes, partialBytes > 0); - assertTrue( - "partial bytes should be < full bytes: " + partialBytes + " < " + fullBytes, - partialBytes < fullBytes); - } - } } From b04cbb81e8a26eda15f2aa2d82e67f0d0e4deeca Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Tue, 9 Jun 2026 12:17:15 +0200 Subject: [PATCH 3/3] address review finding --- .../apache/parquet/hadoop/ParquetFileReader.java | 15 +++++++++++++-- .../hadoop/TestParquetFileReaderRowRanges.java | 10 ++++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index 655268bf21..8355a1554a 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -1492,7 +1492,8 @@ public ColumnIndexStore 
getColumnIndexStore(int blockIndex) { /** * Computes the {@link RowRanges} within the given row group that may pass the configured filter * (set via {@link ParquetReadOptions} or {@link ParquetInputFormat#setFilterPredicate}). If no - * filter is configured, returns a {@link RowRanges} covering all rows in the row group. + * filter is configured, returns a {@link RowRanges} covering all rows in the row group. If the + * row group has no rows, returns {@link RowRanges#EMPTY}. * *

<p>This computation is metadata-only: it consults each filter-referenced column's column * index from the file footer; no column data is read from disk. The result can be passed to @@ -1501,10 +1502,20 @@ public ColumnIndexStore getColumnIndexStore(int blockIndex) { * * @param blockIndex the row group (block) index * @return row ranges within the block that may pass the configured filter + * @throws IllegalArgumentException if {@code blockIndex} is out of range */ public RowRanges getRowRanges(int blockIndex) { + if (blockIndex < 0 || blockIndex >= blocks.size()) { + throw new IllegalArgumentException(String.format( + "Invalid block index %s, the valid block index range are: [%s, %s]", + blockIndex, 0, blocks.size() - 1)); + } + long rowCount = blocks.get(blockIndex).getRowCount(); + if (rowCount == 0L) { + return RowRanges.EMPTY; + } if (!FilterCompat.isFilteringRequired(options.getRecordFilter())) { - return RowRanges.createSingle(blocks.get(blockIndex).getRowCount()); + return RowRanges.createSingle(rowCount); } RowRanges rowRanges = blockRowRanges.get(blockIndex); if (rowRanges == null) { diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderRowRanges.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderRowRanges.java index e56bfc8d6f..e445caf2bc 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderRowRanges.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderRowRanges.java @@ -20,6 +20,7 @@ import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import java.io.File; @@ -93,4 +94,13 @@ public void getRowRangesWithoutFilterCoversAllRows() throws IOException { assertTrue(ranges.isOverlapping(0L, block.getRowCount() - 1)); } } + + @Test + public void 
getRowRangesRejectsOutOfRangeBlockIndex() throws IOException { + try (ParquetFileReader reader = openReader()) { + int blockCount = reader.getRowGroups().size(); + assertThrows(IllegalArgumentException.class, () -> reader.getRowRanges(-1)); + assertThrows(IllegalArgumentException.class, () -> reader.getRowRanges(blockCount)); + } + } }