Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ name = "arrow_writer"
required-features = ["arrow"]
path = "./tests/arrow_writer.rs"

# Integration test exercising IEEE 754 NaN statistics interoperability.
# Like the other arrow-based integration tests above, it only builds when
# the `arrow` feature is enabled.
[[test]]
name = "ieee754_nan_interop"
required-features = ["arrow"]
path = "./tests/ieee754_nan_interop.rs"

[[test]]
name = "encryption"
required-features = ["arrow"]
Expand Down
80 changes: 80 additions & 0 deletions parquet/src/arrow/arrow_reader/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1335,6 +1335,35 @@ where
Ok(array)
}

/// Extracts the NaN count statistics from an iterator
/// of parquet page [`ColumnIndexMetaData`]'s to an [`ArrayRef`]
///
/// The returned Array is an [`UInt64Array`]
pub(crate) fn nan_counts_page_statistics<'a, I>(iterator: I) -> Result<UInt64Array>
where
    I: Iterator<Item = (usize, &'a ColumnIndexMetaData)>,
{
    // Materialize the iterator so the total number of pages is known
    // up-front, allowing a single allocation for values and validity.
    let page_chunks: Vec<_> = iterator.collect();
    let num_pages: usize = page_chunks.iter().map(|(n, _)| *n).sum();

    let mut counts: Vec<u64> = Vec::with_capacity(num_pages);
    let mut validity = NullBufferBuilder::new(num_pages);

    for (num, column_index) in page_chunks {
        if let Some(nan_counts) = column_index.nan_counts() {
            // One NaN count per page; widen each count to u64.
            counts.extend(nan_counts.iter().map(|&c| c as u64));
            validity.append_n_non_nulls(num);
        } else {
            // No NaN statistics for this chunk: emit `num` null slots
            // (the zero values are padding behind the null mask).
            counts.resize(counts.len() + num, 0);
            validity.append_n_nulls(num);
        }
    }

    Ok(UInt64Array::new(counts.into(), validity.build()))
}

/// Extracts Parquet statistics as Arrow arrays
///
/// This is used to convert Parquet statistics to Arrow [`ArrayRef`], with
Expand Down Expand Up @@ -1647,6 +1676,28 @@ impl<'a> StatisticsConverter<'a> {
Ok(UInt64Array::from_iter(null_counts))
}

/// Extract the NaN counts from row group statistics in [`RowGroupMetaData`]
///
/// See docs on [`Self::row_group_mins`] for details
pub fn row_group_nan_counts<I>(&self, metadatas: I) -> Result<UInt64Array>
where
    I: IntoIterator<Item = &'a RowGroupMetaData>,
{
    // Without a resolved parquet column index there are no statistics
    // to read: return one null entry per row group.
    let parquet_index = match self.parquet_column_index {
        Some(idx) => idx,
        None => {
            let num_row_groups = metadatas.into_iter().count();
            return Ok(UInt64Array::from_iter(std::iter::repeat_n(
                None,
                num_row_groups,
            )));
        }
    };

    // Pull the optional NaN count out of each row group's column chunk
    // statistics; missing statistics map to a null entry.
    Ok(UInt64Array::from_iter(metadatas.into_iter().map(|rg| {
        rg.column(parquet_index)
            .statistics()
            .and_then(|s| s.nan_count_opt())
    })))
}

/// Extract the minimum values from Data Page statistics.
///
/// In Parquet files, in addition to the Column Chunk level statistics
Expand Down Expand Up @@ -1786,6 +1837,35 @@ impl<'a> StatisticsConverter<'a> {
null_counts_page_statistics(iter)
}

/// Returns a [`UInt64Array`] with NaN counts for each data page.
///
/// See docs on [`Self::data_page_mins`] for details.
pub fn data_page_nan_counts<I>(
    &self,
    column_page_index: &ParquetColumnIndex,
    column_offset_index: &ParquetOffsetIndex,
    row_group_indices: I,
) -> Result<UInt64Array>
where
    I: IntoIterator<Item = &'a usize>,
{
    // Without a resolved parquet column index there are no statistics
    // to read: return one null entry per requested row group.
    let Some(parquet_index) = self.parquet_column_index else {
        let num_row_groups = row_group_indices.into_iter().count();
        return Ok(UInt64Array::new_null(num_row_groups));
    };

    let iter = row_group_indices.into_iter().map(|rg_index| {
        // Page-level column index for this column in this row group
        let column_page_index_per_row_group_per_column =
            &column_page_index[*rg_index][parquet_index];
        // Number of data pages comes from the offset index page locations.
        // Bind the count by value (`usize` is `Copy`); the original took a
        // needless reference and immediately dereferenced it.
        let num_data_pages = column_offset_index[*rg_index][parquet_index]
            .page_locations()
            .len();

        (num_data_pages, column_page_index_per_row_group_per_column)
    });
    nan_counts_page_statistics(iter)
}

/// Returns a [`UInt64Array`] with row counts for each data page.
///
/// This function iterates over the given row group indexes and computes
Expand Down
2 changes: 2 additions & 0 deletions parquet/src/arrow/arrow_writer/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ impl FallbackEncoder {
encoding,
min_value,
max_value,
nan_count: None,
variable_length_bytes,
})
}
Expand Down Expand Up @@ -411,6 +412,7 @@ impl DictEncoder {
encoding: Encoding::RLE_DICTIONARY,
min_value,
max_value,
nan_count: None,
variable_length_bytes,
}
}
Expand Down
115 changes: 113 additions & 2 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1663,6 +1663,7 @@ fn get_fsb_array_slice(
#[cfg(test)]
mod tests {
use super::*;
use std::cmp::Ordering;
use std::collections::HashMap;

use std::fs::File;
Expand Down Expand Up @@ -2918,10 +2919,120 @@ mod tests {
for column in row_group.columns() {
assert!(column.offset_index_offset().is_some());
assert!(column.offset_index_length().is_some());
assert!(column.column_index_offset().is_none());
assert!(column.column_index_length().is_none());
assert!(column.column_index_offset().is_some());
assert!(column.column_index_length().is_some());
}
}
assert!(file_meta_data.column_index().is_some());
if let Some(col_indexes) = file_meta_data.column_index() {
for rg_idx in col_indexes {
for idx in rg_idx {
assert!(idx.nan_counts().is_some());
let float_idx = match idx {
ColumnIndexMetaData::DOUBLE(idx) => idx,
_ => panic!("expected double statistics"),
};
for i in 0..idx.num_pages() as usize {
assert_eq!(float_idx.nan_count(i), Some(10));
assert_eq!(
f64::NAN.total_cmp(float_idx.min_value(i).unwrap()),
Ordering::Equal
);
assert_eq!(
f64::NAN.total_cmp(float_idx.max_value(i).unwrap()),
Ordering::Equal
);
}
}
}
}
}

#[test]
fn check_page_offset_index_with_mixed_nan() {
    // Single nullable Float64 column so page statistics are written as a
    // DOUBLE column index with per-page NaN counts.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "col",
        DataType::Float64,
        true,
    )]));

    let mut out = Vec::with_capacity(1024);
    // Limit each data page to 10 rows so every batch below lands on its
    // own page, giving 4 pages total.
    let props = WriterProperties::builder()
        .set_data_page_row_count_limit(10)
        .build();
    let mut writer = ArrowWriter::try_new(&mut out, schema.clone(), Some(props))
        .expect("Unable to write file");

    // write a page of all NaN (since batch min and max are NaN, global min/max are NaN)
    let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
    let batch = RecordBatch::try_new(schema.clone(), vec![values]).unwrap();
    writer.write(&batch).unwrap();

    // write a page of all -NaN (batch min/max is -NaN, should update global min to -NaN)
    let values = Arc::new(Float64Array::from(vec![-f64::NAN; 10]));
    let batch = RecordBatch::try_new(schema.clone(), vec![values]).unwrap();
    writer.write(&batch).unwrap();

    // write a page of all 0 (non-NaN should override global min/max, now 0/0)
    let values = Arc::new(Float64Array::from(vec![0_f64; 10]));
    let batch = RecordBatch::try_new(schema.clone(), vec![values]).unwrap();
    writer.write(&batch).unwrap();

    // write a mixed page (should now have min -1, max 1)
    let values = Arc::new(Float64Array::from(vec![
        -1.0,
        0.0,
        f64::NAN,
        -f64::NAN,
        1.0,
    ]));
    let batch = RecordBatch::try_new(schema.clone(), vec![values]).unwrap();
    writer.write(&batch).unwrap();

    let file_meta_data = writer.close().unwrap();

    // check the column chunk stats are correct
    let col_stats = file_meta_data
        .row_group(0)
        .column(0)
        .statistics()
        .expect("missing column chunk statistics");

    // 22 NaNs total across the file: 10 (page 1) + 10 (page 2) + 2 (page 4).
    // Chunk-level min/max ignore NaN values entirely.
    assert_eq!(col_stats.nan_count_opt(), Some(22));
    assert_eq!(col_stats.min_bytes_opt(), Some((-1.0f64).as_bytes()));
    assert_eq!(col_stats.max_bytes_opt(), Some(1.0f64.as_bytes()));

    assert!(file_meta_data.column_index().is_some());
    let col_idx = &file_meta_data.column_index().as_ref().unwrap()[0][0];
    assert_eq!(col_idx.num_pages(), 4);

    // test each page
    let float_idx = match col_idx {
        ColumnIndexMetaData::DOUBLE(idx) => idx,
        _ => panic!("expected double statistics"),
    };

    // Per-page NaN counts: all-NaN, all -NaN, all zero, mixed (2 NaNs).
    assert_eq!(float_idx.nan_counts, Some(vec![10, 10, 0, 2]));
    // total_cmp is used because NaN == NaN is false under IEEE 754 partial
    // ordering; total_cmp distinguishes NaN from -NaN by sign bit.
    assert_eq!(
        f64::NAN.total_cmp(float_idx.min_value(0).unwrap()),
        Ordering::Equal
    );
    assert_eq!(
        f64::NAN.total_cmp(float_idx.max_value(0).unwrap()),
        Ordering::Equal
    );
    assert_eq!(
        (-f64::NAN).total_cmp(float_idx.min_value(1).unwrap()),
        Ordering::Equal
    );
    assert_eq!(
        (-f64::NAN).total_cmp(float_idx.max_value(1).unwrap()),
        Ordering::Equal
    );
    assert_eq!(float_idx.min_value(2), Some(&0.0));
    assert_eq!(float_idx.max_value(2), Some(&0.0));
    assert_eq!(float_idx.min_value(3), Some(&-1.0));
    assert_eq!(float_idx.max_value(3), Some(&1.0));
}

#[test]
Expand Down
Loading
Loading