feat(parquet): add content defined chunking for arrow writer #9450
kszucs wants to merge 15 commits into apache:main from
Conversation
```diff
 let mut path: Vec<String> = vec![];
 path.extend(path_so_far.iter().copied().map(String::from));
-leaves.push(Arc::new(ColumnDescriptor::new(
+let mut desc = ColumnDescriptor::new(
```
I didn't want to break the API of ColumnDescriptor, so setting repeated_ancestor_def_level below.
You could add a `new_with_repeated_ancestor`, and perhaps change `new` to call that with the default value for `repeated_ancestor_def_level`.
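The suggested pattern might look like this sketch, using a toy stand-in for `ColumnDescriptor` (the real type has more fields; everything except `repeated_ancestor_def_level` is illustrative):

```rust
// Toy stand-in for parquet's ColumnDescriptor; only the constructor pattern
// matters here, not the real field set.
pub struct ColumnDescriptor {
    max_def_level: i16,
    repeated_ancestor_def_level: i16,
}

impl ColumnDescriptor {
    /// Existing public constructor keeps its signature and delegates,
    /// defaulting the new field so existing callers are unaffected.
    pub fn new(max_def_level: i16) -> Self {
        Self::new_with_repeated_ancestor(max_def_level, 0)
    }

    /// New, wider constructor that also accepts the extra level.
    pub fn new_with_repeated_ancestor(
        max_def_level: i16,
        repeated_ancestor_def_level: i16,
    ) -> Self {
        Self {
            max_def_level,
            repeated_ancestor_def_level,
        }
    }
}

fn main() {
    let d = ColumnDescriptor::new(2);
    println!("{} {}", d.max_def_level, d.repeated_ancestor_def_level);
}
```

This keeps the public API source-compatible while avoiding the mutate-after-construction step.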
We don't necessarily need to store the codegen script in the repository. Alternatively we could just reference https://github.com/apache/arrow/blob/main/cpp/src/parquet/chunker_internal_generated.h as a source for cdc_generated.rs. Likely it won't be regenerated at all.
Hi @kszucs. 👋 Apologies, I have been unusually bandwidth constrained lately. I will try to give this a good look in the next few days. Thank you for your patience 🙏 (and for adding this to arrow-rs).

Hi @etseidl! No worries, I really appreciate you taking the time to review!
etseidl
left a comment
Flushing a few early observations/questions. Still need to do the deep dive.
```rust
let cdc_chunkers = match props_ptr.content_defined_chunking() {
    Some(opts) => {
        let chunkers = file_writer
            .schema_descr()
            .columns()
            .iter()
            .map(|desc| ContentDefinedChunker::new(desc, opts))
            .collect::<Result<Vec<_>>>()?;
        Some(chunkers)
    }
    None => None,
};
```
Suggested change:

```rust
let cdc_chunkers = props_ptr.content_defined_chunking().map(|opts| {
    file_writer
        .schema_descr()
        .columns()
        .iter()
        .map(|desc| ContentDefinedChunker::new(desc, opts))
        .collect::<Result<Vec<_>>>()
}).transpose()?;
```
Can simplify this a bit.
```rust
/// A chunk of data with level and value offsets for record-shredded nested data.
#[derive(Debug, Clone, Copy)]
pub(crate) struct Chunk {
```
"chunk" is an overloaded term (I keep thinking column chunk). What do you think of changing this to CdcChunk?
```rust
/// Create a sliced view of this `ArrayLevels` for a CDC chunk.
pub(crate) fn slice_for_chunk(&self, chunk: &Chunk) -> Self {
```
I have trouble with calling this a "view" when it's actually allocating new vectors for the levels and non-null indices. I'm thinking out loud, but I wonder if we could create an actual ArrayLevelsView that uses proper slices of the underlying Vecs and pass that to write_internal.
Nevermind...I tried implementing this but it would require a ton of changes to the level handling. I guess just update the comment to say that copies of data will be made.
```rust
#[test]
fn test_slice_for_chunk_flat() {
    // Required field (no levels): array [1..=6], slice values 2..5
```
Suggested change:

```rust
// Required field (no levels): array [1..=6], slice values 3..=5
```
? Trying to understand value_offset == 2
```rust
self.row_group_writer_factory
    .create_row_group_writer(self.writer.flushed_row_groups().len())?,
),
x => {
```
Is this change necessary? Or is it left over from earlier debugging?
etseidl
left a comment
Just a few more random comments...I'll get into the meat of the chunking tomorrow. Looking good so far!
```rust
    None => return Ok(()),
};

let chunks = in_progress.close()?;
```
Also curious about this change and below. Does the close have to happen before calling next_row_group?
```rust
/// Note that the parquet writer has a related `data_page_size_limit` property that
/// controls the maximum size of a parquet data page after encoding. While setting
/// `data_page_size_limit` to a smaller value than `max_chunk_size` doesn't affect
/// the chunking effectiveness, it results in more small parquet data pages.
```
Suggested change:

```rust
/// Note that the parquet writer has a related [`data_page_size_limit`] property that
/// controls the maximum size of a parquet data page after encoding. While setting
/// `data_page_size_limit` to a smaller value than `max_chunk_size` doesn't affect
/// the chunking effectiveness, it results in more small parquet data pages.
///
/// [`data_page_size_limit`]: WriterPropertiesBuilder::set_data_page_size_limit
```
```rust
min_chunk_size: 256 * 1024,
max_chunk_size: 1024 * 1024,
```
Can you add constants for these above? 🙏
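The requested constants could be as simple as the following sketch (the names are hypothetical, not taken from the PR; only the values come from the defaults above):

```rust
// Hypothetical names for the default CDC chunk-size bounds; the PR may
// choose different names or visibility.
pub const DEFAULT_CDC_MIN_CHUNK_SIZE: usize = 256 * 1024; // 256 KiB
pub const DEFAULT_CDC_MAX_CHUNK_SIZE: usize = 1024 * 1024; // 1 MiB

fn main() {
    println!(
        "min = {}, max = {}",
        DEFAULT_CDC_MIN_CHUNK_SIZE, DEFAULT_CDC_MAX_CHUNK_SIZE
    );
}
```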
```rust
/// EXPERIMENTAL: Returns content-defined chunking options, or `None` if CDC is disabled.
///
/// For more details see [`WriterPropertiesBuilder::set_content_defined_chunking`]
pub fn content_defined_chunking(&self) -> Option<&CdcOptions> {
```
Does this only make sense as a global option, or would making it a per-column property be reasonable?
Which issue does this PR close?
Rationale for this change
Rust implementation of apache/arrow#45360
Traditional Parquet writing splits data pages at fixed sizes, so a single inserted or deleted row causes all subsequent pages to shift, resulting in nearly every byte being re-uploaded to content-addressable storage (CAS) systems. CDC instead determines page boundaries via a rolling gearhash over column values, so unchanged data produces identical pages across different writes, enabling storage cost reductions and faster upload times.
See more details in https://huggingface.co/blog/parquet-cdc
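The boundary-detection idea can be sketched in a few lines of Rust. This is a hypothetical, simplified gear-hash chunker: the real implementation hashes column values rather than raw bytes, enforces min/max chunk sizes, and ships a pre-generated gear table (`cdc_generated.rs`), whereas the table here is a toy stand-in.

```rust
// Toy gear table built from an xorshift PRNG; illustrative only.
fn gear_table() -> [u64; 256] {
    let mut t = [0u64; 256];
    let mut x: u64 = 0x9E37_79B9_7F4A_7C15;
    for e in t.iter_mut() {
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        *e = x;
    }
    t
}

/// Return cut points: a boundary is declared whenever the low bits of the
/// rolling gear hash are all zero. Because the left-shift pushes old bytes
/// out of the 64-bit hash, each hash depends only on the last 64 bytes, so
/// an insertion early in the stream leaves boundaries far past it intact.
fn chunk_boundaries(data: &[u8], mask: u64) -> Vec<usize> {
    let table = gear_table();
    let mut hash: u64 = 0;
    let mut cuts = Vec::new();
    for (i, &b) in data.iter().enumerate() {
        hash = (hash << 1).wrapping_add(table[b as usize]);
        if hash & mask == 0 {
            cuts.push(i + 1);
        }
    }
    cuts
}

fn main() {
    // Deterministic pseudo-random input; mask 63 gives ~64-byte average chunks.
    let data: Vec<u8> = (0..4096u32)
        .map(|i| (i.wrapping_mul(2654435761) >> 24) as u8)
        .collect();
    let cuts = chunk_boundaries(&data, 63);
    println!("{} boundaries", cuts.len());
}
```

Prepending a byte shifts every boundary that is at least 64 bytes into the stream by exactly one position, which is the self-synchronizing property CDC relies on for deduplication.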
The original C++ implementation: apache/arrow#45360
Evaluation tool: https://github.com/huggingface/dataset-dedupe-estimator, where I already integrated this PR to verify that deduplication effectiveness is on par with parquet-cpp (lower is better):
What changes are included in this PR?
- `parquet/src/column/chunker/` module
- changes to `ArrowColumnWriter`
- `CdcOptions` struct (`min_chunk_size`, `max_chunk_size`, `norm_level`)
- `repeated_ancestor_def_level` field added to `ColumnDescriptor` for nested field values iteration

Are these changes tested?
Yes, unit tests are located in `cdc.rs` and ported from the C++ implementation.

Are there any user-facing changes?
New experimental API, disabled by default; no behavior change for existing code.