[Parquet] Split byte-array batches transparently when i32 offset would overflow#9504
vigneshsiva11 wants to merge 2 commits into apache:main from
Conversation
…pache#7973)

When reading Parquet byte-array columns (Utf8 / Binary) into Arrow arrays with 32-bit offsets, the reader previously returned an error the moment accumulated data in a batch exceeded 2 GiB. This made large Parquet row groups (e.g. NLP datasets with long text columns) completely unreadable with default settings.

This commit makes the Parquet reader treat batch_size as a *target* rather than a hard limit: when the next value would overflow the i32 offset type, the decoder stops early and returns the partial batch. The decoder's internal position is left at the unread value, so the following read_records() call resumes seamlessly - no rows are lost, duplicated, or reordered.

Key changes
-----------

* OffsetBuffer::would_overflow(data_len) - new inline helper that uses checked_add to safely detect whether appending data_len bytes would exceed the capacity of offset type I, without any mutation.
* ByteArrayDecoderPlain::read - checks would_overflow before each try_push and breaks out of the loop when true; fixes max_remaining_values accounting to subtract actual reads rather than requested reads.
* ByteArrayDecoderDeltaLength::read - same pattern; advances length_offset and data_offset only by what was actually consumed.
* ByteArrayDecoderDelta::read - checks would_overflow inside the callback closure and uses an overflow flag to distinguish a clean stop from a genuine error.
* ByteArrayDecoderDictionary::read - processes dictionary keys one at a time via decoder.read(1, ...) so that the DictIndexDecoder never advances past an unconsumed key on overflow.

Tests
-----

* test_would_overflow - unit test for the new helper covering both i32 and i64 offset types, including the usize overflow edge case.
* test_plain_decoder_partial_read - confirms a 3-value PLAIN page is correctly split across two read() calls with no data lost.

Fixes apache#7973
Pull request overview
This PR updates the Parquet → Arrow byte-array decoding path to avoid failing when building Arrow arrays with 32-bit offsets would exceed the representable offset range, by stopping early and allowing subsequent read_records() calls to resume from the same value.
Changes:
- Add `OffsetBuffer::would_overflow(data_len)` to detect offset-range overflow before mutating buffers.
- Update byte-array decoders (PLAIN, DELTA_LENGTH, DELTA_BYTE_ARRAY, DICTIONARY) to stop decoding before offset overflow and return a partial count.
- Add unit tests for `would_overflow` and for multi-call reading behavior in the plain decoder.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| parquet/src/arrow/buffer/offset_buffer.rs | Adds would_overflow helper and unit tests validating overflow boundaries. |
| parquet/src/arrow/array_reader/byte_array.rs | Uses would_overflow to stop early across byte-array decoding implementations; adds a decoder test. |
```rust
let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read];

let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
output.values.reserve(total_bytes);

let mut current_offset = self.data_offset;
let mut read = 0;
for length in src_lengths {
    let value_len = *length as usize;
    let end_offset = current_offset + value_len;

    // Stop early rather than overflow the 32-bit offset buffer.
    // length_offset / data_offset are only advanced by what was
    // actually consumed, so the next read() resumes correctly.
    if output.would_overflow(value_len) {
        break;
    }
```
output.values.reserve(total_bytes) happens before any would_overflow checks. For large total_bytes (the exact scenario this PR targets), this can attempt to reserve multi‑GiB memory and potentially abort/OOM even though the decoder will stop early. Consider capping the reservation to the remaining representable offset range (e.g., min(total_bytes, max_appendable_bytes)) or deferring reservation until after you’ve determined how many values will actually be appended in this call.
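The reviewer's suggestion can be sketched as a small standalone function. `capped_reservation` and its parameter names are hypothetical, not the crate's actual API; the point is only to show clamping the reservation to the headroom an `i32` offset buffer still has:

```rust
// Sketch of the suggested fix: never reserve more than the bytes still
// representable by the i32 offset type, so a multi-GiB `total_bytes` from
// a large page cannot trigger a huge allocation that the decoder would
// abandon anyway when it stops early.
fn capped_reservation(values_len: usize, total_bytes: usize) -> usize {
    // Remaining bytes before offsets would exceed i32::MAX.
    let max_appendable = (i32::MAX as usize).saturating_sub(values_len);
    total_bytes.min(max_appendable)
}

fn main() {
    // Small batch with plenty of headroom: reserve everything requested.
    assert_eq!(capped_reservation(0, 4096), 4096);
    // Near the 2 GiB limit: reserve only the remaining headroom.
    assert_eq!(capped_reservation(i32::MAX as usize - 100, 1 << 20), 100);
    println!("ok");
}
```

The same clamp could feed `output.values.reserve(...)` in place of the unconditional `reserve(total_bytes)`.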
```rust
// Process one key at a time so we can stop cleanly when the output
// buffer would overflow. When the closure returns Err the
// DictIndexDecoder does NOT advance its position, so the same key will
// be retried in the next batch – no values are lost or skipped.
while total_read < len {
    let mut overflow = false;
    let n = self.decoder.read(1, |keys| {
        let key = keys[0] as usize;
        if key + 1 >= dict_offsets.len() {
            return Err(general_err!(
                "dictionary key beyond bounds of dictionary: 0..{}",
                dict_offsets.len().saturating_sub(1)
            ));
        }
        let start = dict_offsets[key].as_usize();
        let end = dict_offsets[key + 1].as_usize();
        let value = &dict_values[start..end];

        if output.would_overflow(value.len()) {
            overflow = true;
            return Err(general_err!("index overflow decoding byte array"));
        }
        // Dictionary values were validated at dictionary-page decode time.
        output.try_push(value, false)
    });
```
This dictionary path now calls self.decoder.read(1, ...) in a loop, which adds substantial per-value overhead compared to decoding indices in batches. If possible, consider decoding a larger chunk of keys at a time and stopping without losing already-consumed keys (e.g., by tracking how many keys in the provided slice were successfully processed and advancing the decoder by that amount), or otherwise document/justify the performance tradeoff since dictionary pages are often hot paths.
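The batched alternative the reviewer describes can be sketched as follows. `push_until_full`, `dict_lens`, and `remaining` are simplified stand-ins (not the crate's real types) for the dictionary value lengths and the remaining i32 offset headroom; the idea is to consume a whole slice of keys and report how many were actually pushed, so the caller could advance the real `DictIndexDecoder` by exactly that amount:

```rust
// Decode a chunk of keys, push values until the next one would overflow,
// and return how many keys were actually consumed. Unconsumed keys stay
// available for the next read() call, so no values are lost or skipped.
fn push_until_full(keys: &[i32], dict_lens: &[usize], remaining: &mut usize) -> usize {
    let mut consumed = 0;
    for &key in keys {
        let len = dict_lens[key as usize];
        if len > *remaining {
            // Stop before overflow; this key stays unconsumed and will be
            // retried on the next call.
            break;
        }
        *remaining -= len;
        consumed += 1;
    }
    consumed
}

fn main() {
    let dict_lens = [4usize, 8, 16];
    // Pretend only 20 bytes of offset headroom remain.
    let mut remaining = 20;
    let keys = [0, 1, 2, 0]; // values of 4, 8, 16, and 4 bytes
    let n = push_until_full(&keys, &dict_lens, &mut remaining);
    // 4 + 8 = 12 fits; the 16-byte value would exceed the remaining 8 bytes.
    assert_eq!(n, 2);
    assert_eq!(remaining, 8);
    println!("ok");
}
```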
```diff
 if self.validate_utf8 {
-    output.check_valid_utf8(initial_values_length)?;
+    output.check_valid_utf8(initial_values_length)?
```
The if self.validate_utf8 { ... } block isn’t terminated with a semicolon before Ok(read), which makes this function fail to compile. Add a trailing ; after the if block (or after the check_valid_utf8(...)? inside it).
```diff
-output.check_valid_utf8(initial_values_length)?
+output.check_valid_utf8(initial_values_length)?;
```
```diff
 if self.validate_utf8 {
-    output.check_valid_utf8(initial_values_length)?;
+    output.check_valid_utf8(initial_values_length)?
```
The if self.validate_utf8 { ... } block is missing a terminating semicolon before Ok(read), so this won’t compile. Add ; after the if block (or after check_valid_utf8(...)? inside the block).
```diff
-output.check_valid_utf8(initial_values_length)?
+output.check_valid_utf8(initial_values_length)?;
```
```rust
let n = self.decoder.read(1, |keys| {
    let key = keys[0] as usize;
    if key + 1 >= dict_offsets.len() {
        return Err(general_err!(
            "dictionary key beyond bounds of dictionary: 0..{}",
            dict_offsets.len().saturating_sub(1)
        ));
    }
```
let key = keys[0] as usize; can wrap negative dictionary indices to huge usize values, and key + 1 can overflow (e.g. key == -1) causing a panic in debug builds. Convert with usize::try_from(keys[0]) (returning an error on negatives) and avoid key + 1 overflow by comparing key >= dict_offsets.len().saturating_sub(1) or using checked_add(1).
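The hardening the reviewer suggests can be sketched as a standalone function; `checked_key` is a hypothetical helper (not part of the PR) that keeps the same bounds semantics as the original `key + 1 >= dict_offsets.len()` check while rejecting negative keys and avoiding the `+ 1` overflow:

```rust
// Validate a raw dictionary key: reject negatives via usize::try_from and
// avoid `key + 1` overflow with checked_add, keeping the same bounds rule
// as the original code (key is valid only when key + 1 < dict_offsets_len).
fn checked_key(raw: i32, dict_offsets_len: usize) -> Result<usize, String> {
    let key = usize::try_from(raw)
        .map_err(|_| format!("negative dictionary key: {raw}"))?;
    match key.checked_add(1) {
        Some(end) if end < dict_offsets_len => Ok(key),
        _ => Err(format!(
            "dictionary key beyond bounds of dictionary: 0..{}",
            dict_offsets_len.saturating_sub(1)
        )),
    }
}

fn main() {
    assert_eq!(checked_key(0, 3), Ok(0));       // in bounds
    assert!(checked_key(2, 3).is_err());        // key + 1 == len: out of bounds
    assert!(checked_key(-1, 3).is_err());       // negative: rejected, no wrap
    println!("ok");
}
```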
Which issue does this PR close?
Fixes apache#7973.
Rationale for this change
When reading Parquet byte-array columns (Utf8 / Binary) into Arrow arrays with 32-bit offsets, the reader errors with "index overflow decoding byte array" as soon as the accumulated string/binary data in a single batch exceeds 2 GiB (i32::MAX bytes).
With the default batch_size of 8,192 rows, this means any column where the average value is larger than ~256 KB cannot be read at all, even though the file is perfectly valid and both pyarrow and DuckDB handle it fine by splitting internally.
The correct fix, as discussed in the issue, is for the Parquet reader to treat `batch_size` as a target rather than a hard limit and emit a smaller `RecordBatch` whenever the next value would overflow the offset type.
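The overflow check at the heart of this fix can be sketched as a standalone function. `would_overflow_i32` is a hypothetical stand-in for the generic `OffsetBuffer::would_overflow`, hard-coded to i32 offsets:

```rust
// Returns true if appending `data_len` bytes after `current_end` bytes of
// accumulated data would exceed what an i32 offset can represent. Uses
// checked_add so the usize addition itself cannot wrap.
fn would_overflow_i32(current_end: usize, data_len: usize) -> bool {
    match current_end.checked_add(data_len) {
        // The usize addition overflowed: certainly too large for i32.
        None => true,
        Some(new_end) => new_end > i32::MAX as usize,
    }
}

fn main() {
    // Plenty of headroom: no overflow.
    assert!(!would_overflow_i32(0, 1024));
    // Landing exactly on i32::MAX is still representable.
    assert!(!would_overflow_i32(i32::MAX as usize - 10, 10));
    // One byte past i32::MAX overflows.
    assert!(would_overflow_i32(i32::MAX as usize, 1));
    // The usize overflow edge case the PR's unit test covers.
    assert!(would_overflow_i32(1, usize::MAX));
    println!("ok");
}
```

Because the check mutates nothing, a decoder can call it before every `try_push` and break out of its loop cleanly, which is exactly the pattern the PR applies to all four decoders.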
What changes are included in this PR?
**parquet/src/arrow/buffer/offset_buffer.rs**

`OffsetBuffer::would_overflow(data_len: usize) -> bool` - an inline, zero-allocation helper that uses `checked_add` to safely test whether appending `data_len` bytes would exceed the representable range of offset type `I`, without mutating any state.

**parquet/src/arrow/array_reader/byte_array.rs**

All four byte-array decoders are updated to call `would_overflow` before each `try_push`. When the check fires the decoder breaks out of its loop and returns the partial count. The decoder's internal position is left pointing at the value that didn't fit, so the next `read_records()` call resumes from exactly that value - no rows are lost, duplicated, or reordered.

- `ByteArrayDecoderPlain::read` - checks `would_overflow` before `try_push`; fixes `max_remaining_values` to subtract actual reads, not requested reads
- `ByteArrayDecoderDeltaLength::read` - advances `length_offset` / `data_offset` only by what was consumed
- `ByteArrayDecoderDelta::read` - checks `would_overflow` inside the callback closure; uses an `overflow` flag to distinguish a clean stop from a real error
- `ByteArrayDecoderDictionary::read` - processes keys one at a time via `decoder.read(1, …)` so `DictIndexDecoder` never advances past an unconsumed key

Are these changes tested?
Yes:
- `test_would_overflow` - unit test for the new helper covering both `i32` and `i64` offset types, including the `usize::MAX` edge case.
- `test_plain_decoder_partial_read` - confirms that a 3-value PLAIN page is correctly split across two `read()` calls with no data lost or duplicated.
Are there any user-facing changes?
No breaking changes. Users who previously hit "index overflow decoding byte array" with large string/binary columns will now get their data returned across multiple `RecordBatch`es transparently, with no API or schema changes required.