feat(parquet): add content defined chunking for arrow writer#9450

Open
kszucs wants to merge 15 commits into apache:main from kszucs:content-defined-chunking

Conversation


@kszucs kszucs commented Feb 20, 2026

Which issue does this PR close?

  • Closes #NNN.

Rationale for this change

Rust implementation of apache/arrow#45360

Traditional Parquet writing splits data pages at fixed sizes, so a single inserted or deleted row shifts all subsequent page boundaries, meaning nearly every byte must be re-uploaded to content-addressable storage (CAS) systems. CDC instead determines page boundaries via a rolling gear hash over column values, so unchanged data produces identical pages across different writes, enabling storage cost reductions and faster uploads.
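As a rough illustration of the technique (a hypothetical byte-oriented sketch, not this PR's implementation — the PR hashes column values and uses a generated gear table with normalized masks), a gear-hash chunker rolls a hash forward one input at a time and declares a boundary when the hash's low bits are zero, bounded by min/max chunk sizes:

```rust
// Illustrative content-defined chunker. The gear table, mask, and
// byte-at-a-time interface are simplifications for exposition.

fn gear_table() -> [u64; 256] {
    // Deterministic pseudo-random table (illustrative only; the real
    // implementation ships a pre-generated table).
    let mut t = [0u64; 256];
    let mut x = 0x9E37_79B9_7F4A_7C15u64;
    for e in t.iter_mut() {
        x = x
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        *e = x;
    }
    t
}

struct Chunker {
    table: [u64; 256],
    hash: u64,
    size: usize,
    min_size: usize,
    max_size: usize,
    mask: u64,
}

impl Chunker {
    fn new(min_size: usize, max_size: usize, mask: u64) -> Self {
        Self { table: gear_table(), hash: 0, size: 0, min_size, max_size, mask }
    }

    /// Roll one byte into the hash; returns true at a chunk boundary.
    fn push(&mut self, byte: u8) -> bool {
        self.size += 1;
        self.hash = (self.hash << 1).wrapping_add(self.table[byte as usize]);
        // Never cut before min_size; always cut at max_size.
        if self.size < self.min_size {
            return false;
        }
        if self.size >= self.max_size || self.hash & self.mask == 0 {
            self.hash = 0;
            self.size = 0;
            return true;
        }
        false
    }
}
```

Because the boundary decision depends only on recent content, identical value runs produce identical cut points regardless of what precedes them, which is what makes the resulting pages deduplicate well.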

See more details in https://huggingface.co/blog/parquet-cdc

The original C++ implementation apache/arrow#45360

Evaluation tool: https://github.com/huggingface/dataset-dedupe-estimator, where I have already integrated this PR to verify that deduplication effectiveness is on par with parquet-cpp (lower is better):

(figure: deduplicated size comparison, this PR vs parquet-cpp)

What changes are included in this PR?

  • Content-defined chunker at parquet/src/column/chunker/
  • Arrow writer integration in ArrowColumnWriter
  • Writer properties via CdcOptions struct (min_chunk_size, max_chunk_size, norm_level)
  • ColumnDescriptor: added repeated_ancestor_def_level field for nested field value iteration

Are these changes tested?

Yes — unit tests are located in cdc.rs and ported from the C++ implementation.

Are there any user-facing changes?

New experimental API, disabled by default — no behavior change for existing code:

// Simple toggle (256 KiB min, 1 MiB max, norm_level 0)
let props = WriterProperties::builder()
    .set_content_defined_chunking(true)
    .build();

// Explicit CDC parameters
let props = WriterProperties::builder()
    .set_cdc_options(CdcOptions { min_chunk_size: 128 * 1024, max_chunk_size: 512 * 1024, norm_level: 1 })
    .build();

@github-actions github-actions bot added the parquet Changes to the parquet crate label Feb 20, 2026
@kszucs kszucs marked this pull request as ready for review February 25, 2026 08:12
@kszucs kszucs requested review from alamb and etseidl February 25, 2026 11:19
let mut path: Vec<String> = vec![];
path.extend(path_so_far.iter().copied().map(String::from));
leaves.push(Arc::new(ColumnDescriptor::new(
let mut desc = ColumnDescriptor::new(
Member Author:

I didn't want to break the API of ColumnDescriptor, so setting repeated_ancestor_def_level below.

Contributor:

You could add a new_with_repeated_ancestor, and perhaps change new to call that with the default value for repeated_ancestor_def_level.
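A minimal sketch of that suggestion (simplified, hypothetical fields — not the crate's actual ColumnDescriptor signature): the existing constructor stays source-compatible by delegating to a new one that also takes the extra level.

```rust
// Illustrative only: keep `new` stable by delegating to a wider constructor.
struct ColumnDescriptor {
    max_def_level: i16,
    max_rep_level: i16,
    repeated_ancestor_def_level: i16,
}

impl ColumnDescriptor {
    fn new(max_def_level: i16, max_rep_level: i16) -> Self {
        // Default of 0 preserves the old behavior for existing callers.
        Self::new_with_repeated_ancestor(max_def_level, max_rep_level, 0)
    }

    fn new_with_repeated_ancestor(
        max_def_level: i16,
        max_rep_level: i16,
        repeated_ancestor_def_level: i16,
    ) -> Self {
        Self { max_def_level, max_rep_level, repeated_ancestor_def_level }
    }
}
```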

@kszucs kszucs Feb 25, 2026

We don't necessarily need to store the codegen script in the repository. Alternatively we could just reference https://github.com/apache/arrow/blob/main/cpp/src/parquet/chunker_internal_generated.h as a source for cdc_generated.rs. Likely it won't be regenerated at all.


kszucs commented Mar 6, 2026

@alamb @etseidl could you please take a look? Let me know if you need extra context or if you have limited bandwidth.


etseidl commented Mar 6, 2026

Hi @kszucs. 👋 Apologies, I have been unusually bandwidth constrained lately. I will try to give this a good look in the next few days. Thank you for your patience 🙏 (and for adding this to arrow-rs).


kszucs commented Mar 6, 2026

Hi @etseidl! No worries, I really appreciate you taking the time to review!

@etseidl etseidl left a comment


Flushing a few early observations/questions. Still need to do the deep dive.

Comment on lines +269 to +280
let cdc_chunkers = match props_ptr.content_defined_chunking() {
Some(opts) => {
let chunkers = file_writer
.schema_descr()
.columns()
.iter()
.map(|desc| ContentDefinedChunker::new(desc, opts))
.collect::<Result<Vec<_>>>()?;
Some(chunkers)
}
None => None,
};
Contributor:

Suggested change
let cdc_chunkers = match props_ptr.content_defined_chunking() {
Some(opts) => {
let chunkers = file_writer
.schema_descr()
.columns()
.iter()
.map(|desc| ContentDefinedChunker::new(desc, opts))
.collect::<Result<Vec<_>>>()?;
Some(chunkers)
}
None => None,
};
let cdc_chunkers = props_ptr.content_defined_chunking().map(|opts| {
file_writer
.schema_descr()
.columns()
.iter()
.map(|desc| ContentDefinedChunker::new(desc, opts))
.collect::<Result<Vec<_>>>()
}).transpose()?;

Can simplify this a bit.
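The suggested rewrite hinges on Option::transpose, which turns an `Option<Result<T, E>>` into a `Result<Option<T>, E>` so `?` can be applied after the map. A minimal standalone illustration:

```rust
// Map a fallible operation over an Option, then transpose so that
// the error (if any) can be propagated with `?` while `None` stays `Ok(None)`.
fn parse_opt(s: Option<&str>) -> Result<Option<i32>, std::num::ParseIntError> {
    s.map(|v| v.parse::<i32>()).transpose()
}
```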


/// A chunk of data with level and value offsets for record-shredded nested data.
#[derive(Debug, Clone, Copy)]
pub(crate) struct Chunk {
Contributor:

"chunk" is an overloaded term (I keep thinking column chunk). What do you think of changing this to CdcChunk?

Comment on lines +806 to +807
/// Create a sliced view of this `ArrayLevels` for a CDC chunk.
pub(crate) fn slice_for_chunk(&self, chunk: &Chunk) -> Self {
@etseidl etseidl Mar 6, 2026

I have trouble with calling this a "view" when it's actually allocating new vectors for the levels and non-null indices. I'm thinking out loud, but I wonder if we could create an actual ArrayLevelsView that uses proper slices of the underlying Vecs and pass that to write_internal.

Nevermind...I tried implementing this but it would require a ton of changes to the level handling. I guess just update the comment to say that copies of data will be made.


#[test]
fn test_slice_for_chunk_flat() {
// Required field (no levels): array [1..=6], slice values 2..5
Contributor:

Suggested change
// Required field (no levels): array [1..=6], slice values 2..5
// Required field (no levels): array [1..=6], slice values 3..=5

? Trying to understand value_offset == 2

self.row_group_writer_factory
.create_row_group_writer(self.writer.flushed_row_groups().len())?,
),
x => {
Contributor:

Is this change necessary? Or is it left over from earlier debugging?

@etseidl etseidl left a comment

Just a few more random comments...I'll get into the meat of the chunking tomorrow. Looking good so far!

None => return Ok(()),
};

let chunks = in_progress.close()?;
Contributor:

Also curious about this change and below. Does the close have to happen before calling next_row_group?

Comment on lines +86 to +89
/// Note that the parquet writer has a related `data_page_size_limit` property that
/// controls the maximum size of a parquet data page after encoding. While setting
/// `data_page_size_limit` to a smaller value than `max_chunk_size` doesn't affect
/// the chunking effectiveness, it results in more small parquet data pages.
Contributor:

Suggested change
/// Note that the parquet writer has a related `data_page_size_limit` property that
/// controls the maximum size of a parquet data page after encoding. While setting
/// `data_page_size_limit` to a smaller value than `max_chunk_size` doesn't affect
/// the chunking effectiveness, it results in more small parquet data pages.
/// Note that the parquet writer has a related [`data_page_size_limit`] property that
/// controls the maximum size of a parquet data page after encoding. While setting
/// `data_page_size_limit` to a smaller value than `max_chunk_size` doesn't affect
/// the chunking effectiveness, it results in more small parquet data pages.
///
/// [`data_page_size_limit`]: WriterPropertiesBuilder::set_data_page_size_limit

Comment on lines +108 to +109
min_chunk_size: 256 * 1024,
max_chunk_size: 1024 * 1024,
Contributor:

Can you add constants for these above? 🙏
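One possible shape for such constants (the names here are hypothetical, not from the PR):

```rust
// Hypothetical names for the defaults quoted in the snippet above.
pub const DEFAULT_CDC_MIN_CHUNK_SIZE: usize = 256 * 1024; // 256 KiB
pub const DEFAULT_CDC_MAX_CHUNK_SIZE: usize = 1024 * 1024; // 1 MiB
```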

/// EXPERIMENTAL: Returns content-defined chunking options, or `None` if CDC is disabled.
///
/// For more details see [`WriterPropertiesBuilder::set_content_defined_chunking`]
pub fn content_defined_chunking(&self) -> Option<&CdcOptions> {
Contributor:

Does this only make sense as a global option, or would making it a per-column property be reasonable?
