diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index efcd1fe2190b..2032991d06ce 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -170,6 +170,11 @@ name = "arrow_writer" required-features = ["arrow"] path = "./tests/arrow_writer.rs" +[[test]] +name = "ieee754_nan_interop" +required-features = ["arrow"] +path = "./tests/ieee754_nan_interop.rs" + [[test]] name = "encryption" required-features = ["arrow"] diff --git a/parquet/src/arrow/arrow_reader/statistics.rs b/parquet/src/arrow/arrow_reader/statistics.rs index 19d3e34f5243..4faaaa517dcf 100644 --- a/parquet/src/arrow/arrow_reader/statistics.rs +++ b/parquet/src/arrow/arrow_reader/statistics.rs @@ -1335,6 +1335,35 @@ where Ok(array) } +/// Extracts the NaN count statistics from an iterator +/// of parquet page [`ColumnIndexMetaData`]'s to an [`ArrayRef`] +/// +/// The returned Array is an [`UInt64Array`] +pub(crate) fn nan_counts_page_statistics<'a, I>(iterator: I) -> Result +where + I: Iterator, +{ + let chunks: Vec<_> = iterator.collect(); + let total_capacity: usize = chunks.iter().map(|(len, _)| *len).sum(); + let mut values = Vec::with_capacity(total_capacity); + let mut nulls = NullBufferBuilder::new(total_capacity); + for (len, index) in chunks { + match index.nan_counts() { + Some(counts) => { + values.extend(counts.iter().map(|&x| x as u64)); + nulls.append_n_non_nulls(len); + } + None => { + values.resize(values.len() + len, 0); + nulls.append_n_nulls(len); + } + } + } + let null_buffer = nulls.build(); + let array = UInt64Array::new(values.into(), null_buffer); + Ok(array) +} + /// Extracts Parquet statistics as Arrow arrays /// /// This is used to convert Parquet statistics to Arrow [`ArrayRef`], with @@ -1647,6 +1676,28 @@ impl<'a> StatisticsConverter<'a> { Ok(UInt64Array::from_iter(null_counts)) } + /// Extract the NaN counts from row group statistics in [`RowGroupMetaData`] + /// + /// See docs on [`Self::row_group_mins`] for details + pub fn row_group_nan_counts(&self, metadatas: I) 
-> Result + where + I: IntoIterator, + { + let Some(parquet_index) = self.parquet_column_index else { + let num_row_groups = metadatas.into_iter().count(); + return Ok(UInt64Array::from_iter(std::iter::repeat_n( + None, + num_row_groups, + ))); + }; + + let nan_counts = metadatas + .into_iter() + .map(|x| x.column(parquet_index).statistics()) + .map(|s| s.and_then(|s| s.nan_count_opt())); + Ok(UInt64Array::from_iter(nan_counts)) + } + /// Extract the minimum values from Data Page statistics. /// /// In Parquet files, in addition to the Column Chunk level statistics @@ -1786,6 +1837,35 @@ impl<'a> StatisticsConverter<'a> { null_counts_page_statistics(iter) } + /// Returns a [`UInt64Array`] with NaN counts for each data page. + /// + /// See docs on [`Self::data_page_mins`] for details. + pub fn data_page_nan_counts( + &self, + column_page_index: &ParquetColumnIndex, + column_offset_index: &ParquetOffsetIndex, + row_group_indices: I, + ) -> Result + where + I: IntoIterator, + { + let Some(parquet_index) = self.parquet_column_index else { + let num_row_groups = row_group_indices.into_iter().count(); + return Ok(UInt64Array::new_null(num_row_groups)); + }; + + let iter = row_group_indices.into_iter().map(|rg_index| { + let column_page_index_per_row_group_per_column = + &column_page_index[*rg_index][parquet_index]; + let num_data_pages = &column_offset_index[*rg_index][parquet_index] + .page_locations() + .len(); + + (*num_data_pages, column_page_index_per_row_group_per_column) + }); + nan_counts_page_statistics(iter) + } + /// Returns a [`UInt64Array`] with row counts for each data page. 
/// /// This function iterates over the given row group indexes and computes diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index 228d229b3088..650bfe114a87 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -289,6 +289,7 @@ impl FallbackEncoder { encoding, min_value, max_value, + nan_count: None, variable_length_bytes, }) } @@ -411,6 +412,7 @@ impl DictEncoder { encoding: Encoding::RLE_DICTIONARY, min_value, max_value, + nan_count: None, variable_length_bytes, } } diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 2ef71d5745a2..0d4bc6532f24 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -1663,6 +1663,7 @@ fn get_fsb_array_slice( #[cfg(test)] mod tests { use super::*; + use std::cmp::Ordering; use std::collections::HashMap; use std::fs::File; @@ -2918,10 +2919,120 @@ mod tests { for column in row_group.columns() { assert!(column.offset_index_offset().is_some()); assert!(column.offset_index_length().is_some()); - assert!(column.column_index_offset().is_none()); - assert!(column.column_index_length().is_none()); + assert!(column.column_index_offset().is_some()); + assert!(column.column_index_length().is_some()); } } + assert!(file_meta_data.column_index().is_some()); + if let Some(col_indexes) = file_meta_data.column_index() { + for rg_idx in col_indexes { + for idx in rg_idx { + assert!(idx.nan_counts().is_some()); + let float_idx = match idx { + ColumnIndexMetaData::DOUBLE(idx) => idx, + _ => panic!("expected double statistics"), + }; + for i in 0..idx.num_pages() as usize { + assert_eq!(float_idx.nan_count(i), Some(10)); + assert_eq!( + f64::NAN.total_cmp(float_idx.min_value(i).unwrap()), + Ordering::Equal + ); + assert_eq!( + f64::NAN.total_cmp(float_idx.max_value(i).unwrap()), + Ordering::Equal + ); + } + } + } + } + } + + #[test] + fn 
check_page_offset_index_with_mixed_nan() { + let schema = Arc::new(Schema::new(vec![Field::new( + "col", + DataType::Float64, + true, + )])); + + let mut out = Vec::with_capacity(1024); + let props = WriterProperties::builder() + .set_data_page_row_count_limit(10) + .build(); + let mut writer = ArrowWriter::try_new(&mut out, schema.clone(), Some(props)) + .expect("Unable to write file"); + + // write a page of all NaN (since batch min and max are NaN, global min/max are NaN) + let values = Arc::new(Float64Array::from(vec![f64::NAN; 10])); + let batch = RecordBatch::try_new(schema.clone(), vec![values]).unwrap(); + writer.write(&batch).unwrap(); + + // write a page of all -NaN (batch min/max is -NaN, should update global min to -NaN) + let values = Arc::new(Float64Array::from(vec![-f64::NAN; 10])); + let batch = RecordBatch::try_new(schema.clone(), vec![values]).unwrap(); + writer.write(&batch).unwrap(); + + // write a page of all 0 (non-NaN should override global min/max, now 0/0) + let values = Arc::new(Float64Array::from(vec![0_f64; 10])); + let batch = RecordBatch::try_new(schema.clone(), vec![values]).unwrap(); + writer.write(&batch).unwrap(); + + // write a mixed page (should now have min -1, max 1) + let values = Arc::new(Float64Array::from(vec![ + -1.0, + 0.0, + f64::NAN, + -f64::NAN, + 1.0, + ])); + let batch = RecordBatch::try_new(schema.clone(), vec![values]).unwrap(); + writer.write(&batch).unwrap(); + + let file_meta_data = writer.close().unwrap(); + + // check the column chunk stats are correct + let col_stats = file_meta_data + .row_group(0) + .column(0) + .statistics() + .expect("missing column chunk statistics"); + + assert_eq!(col_stats.nan_count_opt(), Some(22)); + assert_eq!(col_stats.min_bytes_opt(), Some((-1.0f64).as_bytes())); + assert_eq!(col_stats.max_bytes_opt(), Some(1.0f64.as_bytes())); + + assert!(file_meta_data.column_index().is_some()); + let col_idx = &file_meta_data.column_index().as_ref().unwrap()[0][0]; + 
assert_eq!(col_idx.num_pages(), 4); + + // test each page + let float_idx = match col_idx { + ColumnIndexMetaData::DOUBLE(idx) => idx, + _ => panic!("expected double statistics"), + }; + + assert_eq!(float_idx.nan_counts, Some(vec![10, 10, 0, 2])); + assert_eq!( + f64::NAN.total_cmp(float_idx.min_value(0).unwrap()), + Ordering::Equal + ); + assert_eq!( + f64::NAN.total_cmp(float_idx.max_value(0).unwrap()), + Ordering::Equal + ); + assert_eq!( + (-f64::NAN).total_cmp(float_idx.min_value(1).unwrap()), + Ordering::Equal + ); + assert_eq!( + (-f64::NAN).total_cmp(float_idx.max_value(1).unwrap()), + Ordering::Equal + ); + assert_eq!(float_idx.min_value(2), Some(&0.0)); + assert_eq!(float_idx.max_value(2), Some(&0.0)); + assert_eq!(float_idx.min_value(3), Some(&-1.0)); + assert_eq!(float_idx.max_value(3), Some(&1.0)); } #[test] diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs index ba8ffc2e92c3..22c84532595c 100644 --- a/parquet/src/basic.rs +++ b/parquet/src/basic.rs @@ -1159,6 +1159,8 @@ pub enum SortOrder { UNSIGNED, /// Comparison is undefined. UNDEFINED, + /// Use IEEE 754 total order. + TOTAL_ORDER, } impl SortOrder { @@ -1179,6 +1181,8 @@ pub enum ColumnOrder { /// Column uses the order defined by its logical or physical type /// (if there is no logical type), parquet-format 2.4.0+. TYPE_DEFINED_ORDER(SortOrder), + /// Column ordering to use for floating point types. + IEEE_754_TOTAL_ORDER, // The following are not defined in the Parquet spec and should always be last. /// Undefined column order, means legacy behaviour before parquet-format 2.4.0. /// Sort order is always SIGNED. @@ -1199,14 +1203,36 @@ impl ColumnOrder { converted_type: ConvertedType, physical_type: Type, ) -> SortOrder { - Self::sort_order_for_type(logical_type.as_ref(), converted_type, physical_type) + Self::column_order_for_type(logical_type.as_ref(), converted_type, physical_type) + .sort_order() + } + + /// Returns the `ColumnOrder` for a physical/logical type. 
+ pub fn column_order_for_type( + logical_type: Option<&LogicalType>, + converted_type: ConvertedType, + physical_type: Type, + ) -> ColumnOrder { + if Some(&LogicalType::Float16) == logical_type + || matches!(physical_type, Type::FLOAT | Type::DOUBLE) + { + ColumnOrder::IEEE_754_TOTAL_ORDER + } else { + let sort_order = + Self::sort_order_for_type(logical_type, converted_type, physical_type, true); + ColumnOrder::TYPE_DEFINED_ORDER(sort_order) + } } /// Returns sort order for a physical/logical type. + /// + /// `is_type_defined` indicates whether the column order for this type is + /// [`ColumnOrder::TYPE_DEFINED_ORDER`]. pub fn sort_order_for_type( logical_type: Option<&LogicalType>, converted_type: ConvertedType, physical_type: Type, + is_type_defined: bool, ) -> SortOrder { match logical_type { Some(logical) => match logical { @@ -1224,18 +1250,28 @@ impl ColumnOrder { LogicalType::Timestamp { .. } => SortOrder::SIGNED, LogicalType::Unknown => SortOrder::UNDEFINED, LogicalType::Uuid => SortOrder::UNSIGNED, - LogicalType::Float16 => SortOrder::SIGNED, + LogicalType::Float16 => { + if is_type_defined { + SortOrder::SIGNED + } else { + SortOrder::TOTAL_ORDER + } + } LogicalType::Variant { .. } | LogicalType::Geometry { .. } | LogicalType::Geography { .. } | LogicalType::_Unknown { .. } => SortOrder::UNDEFINED, }, // Fall back to converted type - None => Self::get_converted_sort_order(converted_type, physical_type), + None => Self::get_converted_sort_order(converted_type, physical_type, is_type_defined), } } - fn get_converted_sort_order(converted_type: ConvertedType, physical_type: Type) -> SortOrder { + fn get_converted_sort_order( + converted_type: ConvertedType, + physical_type: Type, + is_type_defined: bool, + ) -> SortOrder { match converted_type { // Unsigned byte-wise comparison. ConvertedType::UTF8 @@ -1270,24 +1306,35 @@ impl ColumnOrder { } // Fall back to physical type. 
- ConvertedType::NONE => Self::get_default_sort_order(physical_type), + ConvertedType::NONE => Self::get_default_sort_order(physical_type, is_type_defined), } } /// Returns default sort order based on physical type. - fn get_default_sort_order(physical_type: Type) -> SortOrder { + fn get_default_sort_order(physical_type: Type, is_type_defined: bool) -> SortOrder { match physical_type { // Order: false, true Type::BOOLEAN => SortOrder::UNSIGNED, Type::INT32 | Type::INT64 => SortOrder::SIGNED, Type::INT96 => SortOrder::UNDEFINED, // Notes to remember when comparing float/double values: - // If the min is a NaN, it should be ignored. - // If the max is a NaN, it should be ignored. - // If the min is +0, the row group may contain -0 values as well. - // If the max is -0, the row group may contain +0 values as well. - // When looking for NaN values, min and max should be ignored. - Type::FLOAT | Type::DOUBLE => SortOrder::SIGNED, + // If legacy TYPE_DEFINED_ORDER is specified: + // If the min is a NaN, it should be ignored. + // If the max is a NaN, it should be ignored. + // If the min is +0, the row group may contain -0 values as well. + // If the max is -0, the row group may contain +0 values as well. + // When looking for NaN values, min and max should be ignored. + // If IEEE_754_TOTAL_ORDER: + // Examine nan_count to see if NaNs are present. + // If min/max are NaN, that means only NaNs are present. + // If min/max are not NaN, they are ordered according to total order. 
+ Type::FLOAT | Type::DOUBLE => { + if is_type_defined { + SortOrder::SIGNED + } else { + SortOrder::TOTAL_ORDER + } + } // Unsigned byte-wise comparison Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => SortOrder::UNSIGNED, } @@ -1297,6 +1344,7 @@ impl ColumnOrder { pub fn sort_order(&self) -> SortOrder { match *self { ColumnOrder::TYPE_DEFINED_ORDER(order) => order, + ColumnOrder::IEEE_754_TOTAL_ORDER => SortOrder::TOTAL_ORDER, ColumnOrder::UNDEFINED => SortOrder::SIGNED, ColumnOrder::UNKNOWN => SortOrder::UNDEFINED, } @@ -1315,6 +1363,10 @@ impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for ColumnOrder { prot.skip_empty_struct()?; Self::TYPE_DEFINED_ORDER(SortOrder::SIGNED) } + 2 => { + prot.skip_empty_struct()?; + Self::IEEE_754_TOTAL_ORDER + } _ => { prot.skip(field_ident.field_type)?; Self::UNKNOWN @@ -1339,6 +1391,10 @@ impl WriteThrift for ColumnOrder { writer.write_field_begin(FieldType::Struct, 1, 0)?; writer.write_struct_end()?; } + Self::IEEE_754_TOTAL_ORDER => { + writer.write_field_begin(FieldType::Struct, 2, 0)?; + writer.write_struct_end()?; + } _ => return Err(general_err!("Attempt to write undefined ColumnOrder")), } // write end of struct for this union @@ -2181,6 +2237,7 @@ mod tests { assert_eq!(SortOrder::SIGNED.to_string(), "SIGNED"); assert_eq!(SortOrder::UNSIGNED.to_string(), "UNSIGNED"); assert_eq!(SortOrder::UNDEFINED.to_string(), "UNDEFINED"); + assert_eq!(SortOrder::TOTAL_ORDER.to_string(), "TOTAL_ORDER"); } #[test] @@ -2197,6 +2254,10 @@ mod tests { ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNDEFINED).to_string(), "TYPE_DEFINED_ORDER(UNDEFINED)" ); + assert_eq!( + ColumnOrder::IEEE_754_TOTAL_ORDER.to_string(), + "IEEE_754_TOTAL_ORDER" + ); assert_eq!(ColumnOrder::UNDEFINED.to_string(), "UNDEFINED"); } @@ -2213,7 +2274,12 @@ mod tests { fn check_sort_order(types: Vec, expected_order: SortOrder) { for tpe in types { assert_eq!( - ColumnOrder::get_sort_order(Some(tpe), ConvertedType::NONE, Type::BYTE_ARRAY), + 
ColumnOrder::column_order_for_type( + Some(&tpe), + ConvertedType::NONE, + Type::BYTE_ARRAY + ) + .sort_order(), expected_order ); } @@ -2292,10 +2358,12 @@ mod tests { is_adjusted_to_u_t_c: true, unit: TimeUnit::NANOS, }, - LogicalType::Float16, ]; check_sort_order(signed, SortOrder::SIGNED); + let float = vec![LogicalType::Float16]; + check_sort_order(float, SortOrder::TOTAL_ORDER); + // Undefined comparison let undefined = vec![ LogicalType::List, @@ -2316,7 +2384,7 @@ mod tests { fn check_sort_order(types: Vec, expected_order: SortOrder) { for tpe in types { assert_eq!( - ColumnOrder::get_sort_order(None, tpe, Type::BYTE_ARRAY), + ColumnOrder::column_order_for_type(None, tpe, Type::BYTE_ARRAY).sort_order(), expected_order ); } @@ -2368,35 +2436,43 @@ mod tests { fn test_column_order_get_default_sort_order() { // Comparison based on physical type assert_eq!( - ColumnOrder::get_default_sort_order(Type::BOOLEAN), + ColumnOrder::get_default_sort_order(Type::BOOLEAN, true), SortOrder::UNSIGNED ); assert_eq!( - ColumnOrder::get_default_sort_order(Type::INT32), + ColumnOrder::get_default_sort_order(Type::INT32, true), SortOrder::SIGNED ); assert_eq!( - ColumnOrder::get_default_sort_order(Type::INT64), + ColumnOrder::get_default_sort_order(Type::INT64, true), SortOrder::SIGNED ); assert_eq!( - ColumnOrder::get_default_sort_order(Type::INT96), + ColumnOrder::get_default_sort_order(Type::INT96, true), SortOrder::UNDEFINED ); assert_eq!( - ColumnOrder::get_default_sort_order(Type::FLOAT), + ColumnOrder::get_default_sort_order(Type::FLOAT, false), + SortOrder::TOTAL_ORDER + ); + assert_eq!( + ColumnOrder::get_default_sort_order(Type::DOUBLE, false), + SortOrder::TOTAL_ORDER + ); + assert_eq!( + ColumnOrder::get_default_sort_order(Type::FLOAT, true), SortOrder::SIGNED ); assert_eq!( - ColumnOrder::get_default_sort_order(Type::DOUBLE), + ColumnOrder::get_default_sort_order(Type::DOUBLE, true), SortOrder::SIGNED ); assert_eq!( - 
ColumnOrder::get_default_sort_order(Type::BYTE_ARRAY), + ColumnOrder::get_default_sort_order(Type::BYTE_ARRAY, true), SortOrder::UNSIGNED ); assert_eq!( - ColumnOrder::get_default_sort_order(Type::FIXED_LEN_BYTE_ARRAY), + ColumnOrder::get_default_sort_order(Type::FIXED_LEN_BYTE_ARRAY, true), SortOrder::UNSIGNED ); } @@ -2415,6 +2491,10 @@ mod tests { ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNDEFINED).sort_order(), SortOrder::UNDEFINED ); + assert_eq!( + ColumnOrder::IEEE_754_TOTAL_ORDER.sort_order(), + SortOrder::TOTAL_ORDER + ); assert_eq!(ColumnOrder::UNDEFINED.sort_order(), SortOrder::SIGNED); } diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs index 11d4f3142a20..bb0107965cc1 100644 --- a/parquet/src/column/writer/encoder.rs +++ b/parquet/src/column/writer/encoder.rs @@ -16,7 +16,6 @@ // under the License. use bytes::Bytes; -use half::f16; use crate::basic::{ConvertedType, Encoding, LogicalType, Type}; use crate::bloom_filter::Sbbf; @@ -65,6 +64,7 @@ pub struct DataPageValues { pub encoding: Encoding, pub min_value: Option, pub max_value: Option, + pub nan_count: Option, pub variable_length_bytes: Option, } @@ -137,12 +137,18 @@ pub struct ColumnValueEncoderImpl { statistics_enabled: EnabledStatistics, min_value: Option, max_value: Option, + nan_count: Option, bloom_filter: Option, variable_length_bytes: Option, geo_stats_accumulator: Option>, } impl ColumnValueEncoderImpl { + fn is_floating_point_column(&self) -> bool { + matches!(self.descr.physical_type(), Type::FLOAT | Type::DOUBLE) + || self.descr.logical_type_ref() == Some(&LogicalType::Float16) + } + fn min_max(&self, values: &[T::T], value_indices: Option<&[usize]>) -> Option<(T::T, T::T)> { match value_indices { Some(indices) => get_min_max(&self.descr, indices.iter().map(|x| &values[*x])), @@ -155,6 +161,12 @@ impl ColumnValueEncoderImpl { // INTERVAL, Geometry, and Geography have undefined sort order, so don't write min/max stats for them && 
self.descr.converted_type() != ConvertedType::INTERVAL { + // Count NaN values for floating point types + if self.is_floating_point_column() { + let nan_count = slice.iter().filter(|v| is_nan(&self.descr, *v)).count() as u64; + *self.nan_count.get_or_insert(0) += nan_count; + } + if let Some(accumulator) = self.geo_stats_accumulator.as_deref_mut() { update_geo_stats_accumulator(accumulator, slice.iter()); } else if let Some((min, max)) = self.min_max(slice, None) { @@ -221,6 +233,7 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { bloom_filter, min_value: None, max_value: None, + nan_count: None, variable_length_bytes: None, geo_stats_accumulator, }) @@ -316,6 +329,7 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { num_values: std::mem::take(&mut self.num_values), min_value: self.min_value.take(), max_value: self.max_value.take(), + nan_count: self.nan_count.take(), variable_length_bytes: self.variable_length_bytes.take(), }) } @@ -325,24 +339,35 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { } } +// Get min and max values for all values in `iter`. +// +// For floating point we need to compare NaN values until we encounter a non-NaN +// value which then becomes the new min/max. After this, only non-NaN values are +// evaluated. If all values are NaN, then the min/max NaNs as determined by +// IEEE 754 total order are returned. 
fn get_min_max<'a, T, I>(descr: &ColumnDescriptor, mut iter: I) -> Option<(T, T)> where T: ParquetValueType + 'a, I: Iterator, { - let first = loop { - let next = iter.next()?; - if !is_nan(descr, next) { - break next; - } - }; + let first = iter.next()?; + let mut min_max_nan = is_nan(descr, first); let mut min = first; let mut max = first; for val in iter { - if is_nan(descr, val) { + // skip NaNs if we've encounter non-NaN + if !min_max_nan && is_nan(descr, val) { continue; } + // if min/max are NaN, check for non-NaN and reset + if min_max_nan && !is_nan(descr, val) { + min = val; + max = val; + min_max_nan = false; + continue; + } + // both are NaN or non-NaN, so do the comparison if compare_greater(descr, min, val) { min = val; } @@ -351,37 +376,7 @@ where } } - // Float/Double statistics have special case for zero. - // - // If computed min is zero, whether negative or positive, - // the spec states that the min should be written as -0.0 - // (negative zero) - // - // For max, it has similar logic but will be written as 0.0 - // (positive zero) - let min = replace_zero(min, descr, -0.0); - let max = replace_zero(max, descr, 0.0); - - Some((min, max)) -} - -#[inline] -fn replace_zero(val: &T, descr: &ColumnDescriptor, replace: f32) -> T { - match T::PHYSICAL_TYPE { - Type::FLOAT if f32::from_le_bytes(val.as_bytes().try_into().unwrap()) == 0.0 => { - T::try_from_le_slice(&f32::to_le_bytes(replace)).unwrap() - } - Type::DOUBLE if f64::from_le_bytes(val.as_bytes().try_into().unwrap()) == 0.0 => { - T::try_from_le_slice(&f64::to_le_bytes(replace as f64)).unwrap() - } - Type::FIXED_LEN_BYTE_ARRAY - if descr.logical_type_ref() == Some(LogicalType::Float16).as_ref() - && f16::from_le_bytes(val.as_bytes().try_into().unwrap()) == f16::NEG_ZERO => - { - T::try_from_le_slice(&f16::to_le_bytes(f16::from_f32(replace))).unwrap() - } - _ => val.clone(), - } + Some((min.clone(), max.clone())) } fn update_geo_stats_accumulator<'a, T, I>(bounder: &mut dyn GeoStatsAccumulator, 
iter: I) diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 46f90d3f7762..c419fb0dcdb4 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -23,6 +23,7 @@ use half::f16; use crate::bloom_filter::Sbbf; use crate::file::page_index::column_index::ColumnIndexMetaData; use crate::file::page_index::offset_index::OffsetIndexMetaData; +use std::cmp::Ordering; use std::collections::{BTreeSet, VecDeque}; use std::str; @@ -211,6 +212,7 @@ struct PageMetrics { num_buffered_values: u32, num_buffered_rows: u32, num_page_nulls: u64, + num_page_nans: Option, repetition_level_histogram: Option, definition_level_histogram: Option, } @@ -238,6 +240,7 @@ impl PageMetrics { self.num_buffered_values = 0; self.num_buffered_rows = 0; self.num_page_nulls = 0; + self.num_page_nans = None; self.repetition_level_histogram .as_mut() .map(LevelHistogram::reset); @@ -274,6 +277,7 @@ struct ColumnMetrics { min_column_value: Option, max_column_value: Option, num_column_nulls: u64, + num_column_nans: Option, column_distinct_count: Option, variable_length_bytes: Option, repetition_level_histogram: Option, @@ -794,17 +798,31 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { page_statistics: Option<&ValueStatistics>, page_variable_length_bytes: Option, ) { + // Determine if this is a floating-point column + let is_float_column = matches!(self.descr.physical_type(), Type::FLOAT | Type::DOUBLE) + || (self.descr.physical_type() == Type::FIXED_LEN_BYTE_ARRAY + && self.descr.logical_type_ref() == Some(&LogicalType::Float16)); + // update the column index let null_page = (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls; // a page contains only null values, // and writers have to set the corresponding entries in min_values and max_values to byte[0] if null_page && self.column_index_builder.valid() { + // For float columns, always provide Some(n), even if n is 0 + // For non-float columns, 
always provide None + let nan_count = if is_float_column { + Some(self.page_metrics.num_page_nans.unwrap_or(0) as i64) + } else { + None + }; + self.column_index_builder.append( null_page, vec![], vec![], self.page_metrics.num_page_nulls as i64, + nan_count, ); } else if self.column_index_builder.valid() { // from page statistics @@ -838,6 +856,14 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { } self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone())); + // For float columns, always provide Some(n), even if n is 0 + // For non-float columns, always provide None + let nan_count = if is_float_column { + Some(stat.nan_count_opt().unwrap_or(0) as i64) + } else { + None + }; + if self.can_truncate_value() { self.column_index_builder.append( null_page, @@ -852,6 +878,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { ) .0, self.page_metrics.num_page_nulls as i64, + nan_count, ); } else { self.column_index_builder.append( @@ -859,6 +886,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { stat.min_bytes_opt().unwrap().to_vec(), stat.max_bytes_opt().unwrap().to_vec(), self.page_metrics.num_page_nulls as i64, + nan_count, ); } } @@ -1027,6 +1055,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls; + if let Some(nan_count) = values_data.nan_count { + *self.column_metrics.num_column_nans.get_or_insert(0) += nan_count; + self.page_metrics.num_page_nans = Some(nan_count); + } + let page_statistics = match (values_data.min_value, values_data.max_value) { (Some(min), Some(max)) => { // Update chunk level statistics @@ -1040,7 +1073,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { None, Some(self.page_metrics.num_page_nulls), false, - ), + ) + .with_nan_count(values_data.nan_count), ) } _ => None, @@ -1212,6 +1246,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { 
Some(self.column_metrics.num_column_nulls), false, ) + .with_nan_count(self.column_metrics.num_column_nans) .with_backwards_compatible_min_max(backwards_compatible_min_max) .into(); @@ -1360,11 +1395,39 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { } fn update_min(descr: &ColumnDescriptor, val: &T, min: &mut Option) { - update_stat::(descr, val, min, |cur| compare_greater(descr, cur, val)) + if min.is_none() { + *min = Some(val.clone()); + } else { + // safe to unwrap min since we've already tested for None + let is_min_nan = is_nan(descr, min.as_ref().unwrap()); + let is_val_nan = is_nan(descr, val); + match (is_min_nan, is_val_nan) { + // current min is not NaN, but incoming is NaN: skip + (false, true) => {} + // current min is NaN, but incoming is not: assign val to min + (true, false) => *min = Some(val.clone()), + // both NaN or non-NaN, safe to call update_stat() + _ => update_stat::(val, min, |cur| compare_greater(descr, cur, val)), + } + } } fn update_max(descr: &ColumnDescriptor, val: &T, max: &mut Option) { - update_stat::(descr, val, max, |cur| compare_greater(descr, val, cur)) + if max.is_none() { + *max = Some(val.clone()); + } else { + // safe to unwrap max since we've already tested for None + let is_max_nan = is_nan(descr, max.as_ref().unwrap()); + let is_val_nan = is_nan(descr, val); + match (is_max_nan, is_val_nan) { + // current max is not NaN, but incoming is NaN: skip + (false, true) => {} + // current max is NaN, but incoming is not: assign val to max + (true, false) => *max = Some(val.clone()), + // both NaN or non-NaN, safe to call update_stat() + _ => update_stat::(val, max, |cur| compare_greater(descr, val, cur)), + } + } } #[inline] @@ -1381,23 +1444,15 @@ fn is_nan(descr: &ColumnDescriptor, val: &T) -> bool { } } -/// Perform a conditional update of `cur`, skipping any NaN values +/// Perform a conditional update of `cur` /// -/// If `cur` is `None`, sets `cur` to `Some(val)`, otherwise calls `should_update` with -/// 
the value of `cur`, and updates `cur` to `Some(val)` if it returns `true` -fn update_stat( - descr: &ColumnDescriptor, - val: &T, - cur: &mut Option, - should_update: F, -) where +/// Calls `should_update` with the value of `cur`, and updates `cur` to `Some(val)` if it +/// returns `true`. `cur` must not be `None` or this will panic. +fn update_stat(val: &T, cur: &mut Option, should_update: F) +where F: Fn(&T) -> bool, { - if is_nan(descr, val) { - return; - } - - if cur.as_ref().is_none_or(should_update) { + if should_update(cur.as_ref().unwrap()) { *cur = Some(val.clone()); } } @@ -1405,6 +1460,16 @@ fn update_stat( /// Evaluate `a > b` according to underlying logical type. fn compare_greater(descr: &ColumnDescriptor, a: &T, b: &T) -> bool { match T::PHYSICAL_TYPE { + Type::FLOAT => { + let a = f32::from_le_bytes(a.as_bytes().try_into().unwrap()); + let b = f32::from_le_bytes(b.as_bytes().try_into().unwrap()); + return a.total_cmp(&b) == Ordering::Greater; + } + Type::DOUBLE => { + let a = f64::from_le_bytes(a.as_bytes().try_into().unwrap()); + let b = f64::from_le_bytes(b.as_bytes().try_into().unwrap()); + return a.total_cmp(&b) == Ordering::Greater; + } Type::INT32 | Type::INT64 => { if let Some(LogicalType::Integer { is_signed: false, .. 
@@ -1482,7 +1547,7 @@ fn compare_greater_unsigned_int(a: &T, b: &T) -> bool { fn compare_greater_f16(a: &[u8], b: &[u8]) -> bool { let a = f16::from_le_bytes(a.try_into().unwrap()); let b = f16::from_le_bytes(b.try_into().unwrap()); - a > b + a.total_cmp(&b) == Ordering::Greater } /// Signed comparison of bytes arrays @@ -2572,7 +2637,7 @@ mod tests { #[test] fn test_float_statistics() { let stats = statistics_roundtrip::(&[-1.0, 3.0, -2.0, 2.0]); - assert!(stats.is_min_max_backwards_compatible()); + assert!(!stats.is_min_max_backwards_compatible()); if let Statistics::Float(stats) = stats { assert_eq!(stats.min_opt().unwrap(), &-2.0); assert_eq!(stats.max_opt().unwrap(), &3.0); @@ -2584,7 +2649,7 @@ mod tests { #[test] fn test_double_statistics() { let stats = statistics_roundtrip::(&[-1.0, 3.0, -2.0, 2.0]); - assert!(stats.is_min_max_backwards_compatible()); + assert!(!stats.is_min_max_backwards_compatible()); if let Statistics::Double(stats) = stats { assert_eq!(stats.min_opt().unwrap(), &-2.0); assert_eq!(stats.max_opt().unwrap(), &3.0); @@ -2629,6 +2694,139 @@ mod tests { } } + #[test] + fn test_ieee754_total_order_float() { + // Test IEEE 754 total order for f32 + // Order should be: -NaN < -Inf < -1.0 < -0.0 < +0.0 < 1.0 < +Inf < +NaN + let neg_nan = f32::from_bits(0xffc00000); + let neg_inf = f32::NEG_INFINITY; + let neg_one = -1.0_f32; + let neg_zero = -0.0_f32; + let pos_zero = 0.0_f32; + let pos_one = 1.0_f32; + let pos_inf = f32::INFINITY; + let pos_nan = f32::from_bits(0x7fc00000); + + let values = vec![ + pos_nan, neg_zero, pos_inf, neg_one, neg_nan, pos_one, neg_inf, pos_zero, + ]; + + let stats = statistics_roundtrip::(&values); + if let Statistics::Float(stats) = stats { + // With IEEE 754 total order, min should be -NaN, max should be +NaN + // But since we filter out NaN values, min should be -Inf, max should be +Inf + assert_eq!(stats.min_opt().unwrap(), &neg_inf); + assert_eq!(stats.max_opt().unwrap(), &pos_inf); + 
assert_eq!(stats.nan_count_opt(), Some(2)); // neg_nan and pos_nan + } else { + panic!("Expected float statistics"); + } + } + + #[test] + fn test_ieee754_total_order_float_only_nan() { + // Test IEEE 754 total order for f32 + // Order should be: -NaN < -Inf < -1.0 < -0.0 < +0.0 < 1.0 < +Inf < +NaN + let neg_nan1 = f32::from_bits(0xffc00000); + let neg_nan2 = f32::from_bits(0xffc00001); + let neg_nan3 = f32::from_bits(0xffc00002); + let pos_nan1 = f32::from_bits(0x7fc00000); + let pos_nan2 = f32::from_bits(0x7fc00001); + let pos_nan3 = f32::from_bits(0x7fc00002); + + let values = vec![neg_nan1, neg_nan2, neg_nan3, pos_nan1, pos_nan2, pos_nan3]; + + let stats = statistics_roundtrip::(&values); + if let Statistics::Float(stats) = stats { + // With IEEE 754 total order, min should be -NaN, max should be +NaN + // But since we filter out NaN values, min should be -Inf, max should be +Inf + assert_eq!( + stats.min_opt().unwrap().total_cmp(&neg_nan3), + Ordering::Equal + ); + assert_eq!( + stats.max_opt().unwrap().total_cmp(&pos_nan3), + Ordering::Equal + ); + assert_eq!(stats.nan_count_opt(), Some(6)); + } else { + panic!("Expected float statistics"); + } + } + + #[test] + fn test_ieee754_total_order_double() { + // Test IEEE 754 total order for f64 + let neg_nan = f64::from_bits(0xfff8000000000000); + let neg_inf = f64::NEG_INFINITY; + let neg_one = -1.0_f64; + let neg_zero = -0.0_f64; + let pos_zero = 0.0_f64; + let pos_one = 1.0_f64; + let pos_inf = f64::INFINITY; + let pos_nan = f64::from_bits(0x7ff8000000000000); + + let values = vec![ + pos_nan, neg_zero, pos_inf, neg_one, neg_nan, pos_one, neg_inf, pos_zero, + ]; + + let stats = statistics_roundtrip::(&values); + if let Statistics::Double(stats) = stats { + // With IEEE 754 total order, and NaN filtering + assert_eq!(stats.min_opt().unwrap(), &neg_inf); + assert_eq!(stats.max_opt().unwrap(), &pos_inf); + assert_eq!(stats.nan_count_opt(), Some(2)); + } else { + panic!("Expected double statistics"); + } + } + + 
#[test] + fn test_ieee754_total_order_double_only_nan() { + // Test IEEE 754 total order for f64 + // Order should be: -NaN < -Inf < -1.0 < -0.0 < +0.0 < 1.0 < +Inf < +NaN + let neg_nan1 = f64::from_bits(0xfff8000000000000); + let neg_nan2 = f64::from_bits(0xfff8000000000001); + let neg_nan3 = f64::from_bits(0xfff8000000000002); + let pos_nan1 = f64::from_bits(0x7ff8000000000000); + let pos_nan2 = f64::from_bits(0x7ff8000000000001); + let pos_nan3 = f64::from_bits(0x7ff8000000000002); + + let values = vec![neg_nan1, neg_nan2, neg_nan3, pos_nan1, pos_nan2, pos_nan3]; + + let stats = statistics_roundtrip::(&values); + if let Statistics::Double(stats) = stats { + // With IEEE 754 total order, min should be -NaN, max should be +NaN + // But since we filter out NaN values, min should be -Inf, max should be +Inf + assert_eq!( + stats.min_opt().unwrap().total_cmp(&neg_nan3), + Ordering::Equal + ); + assert_eq!( + stats.max_opt().unwrap().total_cmp(&pos_nan3), + Ordering::Equal + ); + assert_eq!(stats.nan_count_opt(), Some(6)); + } else { + panic!("Expected float statistics"); + } + } + + #[test] + fn test_ieee754_total_order_zeros() { + // Test that -0.0 and +0.0 are handled correctly + let values = vec![-0.0_f32, 0.0_f32, -0.0_f32, 0.0_f32]; + + let stats = statistics_roundtrip::(&values); + if let Statistics::Float(stats) = stats { + // With IEEE 754 total order, -0.0 < +0.0 + assert_eq!(stats.min_opt().unwrap().to_bits(), (-0.0_f32).to_bits()); + assert_eq!(stats.max_opt().unwrap().to_bits(), 0.0_f32.to_bits()); + } else { + panic!("Expected float statistics"); + } + } + #[test] fn test_column_writer_check_float16_min_max() { let input = [ @@ -2642,7 +2840,7 @@ mod tests { .collect::>(); let stats = float16_statistics_roundtrip(&input); - assert!(stats.is_min_max_backwards_compatible()); + assert!(!stats.is_min_max_backwards_compatible()); assert_eq!( stats.min_opt().unwrap(), &ByteArray::from(-f16::from_f32(2.0)) @@ -2661,12 +2859,13 @@ mod tests { .collect::>(); let 
stats = float16_statistics_roundtrip(&input); - assert!(stats.is_min_max_backwards_compatible()); + assert!(!stats.is_min_max_backwards_compatible()); assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE)); assert_eq!( stats.max_opt().unwrap(), &ByteArray::from(f16::ONE + f16::ONE) ); + assert_eq!(stats.nan_count_opt(), Some(1)); } #[test] @@ -2677,12 +2876,13 @@ mod tests { .collect::>(); let stats = float16_statistics_roundtrip(&input); - assert!(stats.is_min_max_backwards_compatible()); + assert!(!stats.is_min_max_backwards_compatible()); assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE)); assert_eq!( stats.max_opt().unwrap(), &ByteArray::from(f16::ONE + f16::ONE) ); + assert_eq!(stats.nan_count_opt(), Some(1)); } #[test] @@ -2693,12 +2893,13 @@ mod tests { .collect::>(); let stats = float16_statistics_roundtrip(&input); - assert!(stats.is_min_max_backwards_compatible()); + assert!(!stats.is_min_max_backwards_compatible()); assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE)); assert_eq!( stats.max_opt().unwrap(), &ByteArray::from(f16::ONE + f16::ONE) ); + assert_eq!(stats.nan_count_opt(), Some(1)); } #[test] @@ -2709,9 +2910,16 @@ mod tests { .collect::>(); let stats = float16_statistics_roundtrip(&input); - assert!(stats.min_bytes_opt().is_none()); - assert!(stats.max_bytes_opt().is_none()); - assert!(stats.is_min_max_backwards_compatible()); + assert_eq!( + stats.min_bytes_opt(), + Some(ByteArray::from(f16::NAN).as_bytes()) + ); + assert_eq!( + stats.max_bytes_opt(), + Some(ByteArray::from(f16::NAN).as_bytes()) + ); + assert!(!stats.is_min_max_backwards_compatible()); + assert_eq!(stats.nan_count_opt(), Some(2)); } #[test] @@ -2722,8 +2930,8 @@ mod tests { .collect::>(); let stats = float16_statistics_roundtrip(&input); - assert!(stats.is_min_max_backwards_compatible()); - assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO)); + assert!(!stats.is_min_max_backwards_compatible()); + 
assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ZERO)); assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO)); } @@ -2735,9 +2943,9 @@ mod tests { .collect::>(); let stats = float16_statistics_roundtrip(&input); - assert!(stats.is_min_max_backwards_compatible()); + assert!(!stats.is_min_max_backwards_compatible()); assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO)); - assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO)); + assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO)); } #[test] @@ -2748,8 +2956,8 @@ mod tests { .collect::>(); let stats = float16_statistics_roundtrip(&input); - assert!(stats.is_min_max_backwards_compatible()); - assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO)); + assert!(!stats.is_min_max_backwards_compatible()); + assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ZERO)); assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::PI)); } @@ -2761,18 +2969,19 @@ mod tests { .collect::>(); let stats = float16_statistics_roundtrip(&input); - assert!(stats.is_min_max_backwards_compatible()); + assert!(!stats.is_min_max_backwards_compatible()); assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(-f16::PI)); - assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO)); + assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO)); } #[test] fn test_float_statistics_nan_middle() { let stats = statistics_roundtrip::(&[1.0, f32::NAN, 2.0]); - assert!(stats.is_min_max_backwards_compatible()); + assert!(!stats.is_min_max_backwards_compatible()); if let Statistics::Float(stats) = stats { assert_eq!(stats.min_opt().unwrap(), &1.0); assert_eq!(stats.max_opt().unwrap(), &2.0); + assert_eq!(stats.nan_count_opt(), Some(1)) } else { panic!("expecting Statistics::Float"); } @@ -2781,10 +2990,11 @@ mod tests { #[test] fn test_float_statistics_nan_start() { let stats = statistics_roundtrip::(&[f32::NAN, 1.0, 2.0]); - 
assert!(stats.is_min_max_backwards_compatible()); + assert!(!stats.is_min_max_backwards_compatible()); if let Statistics::Float(stats) = stats { assert_eq!(stats.min_opt().unwrap(), &1.0); assert_eq!(stats.max_opt().unwrap(), &2.0); + assert_eq!(stats.nan_count_opt(), Some(1)) } else { panic!("expecting Statistics::Float"); } @@ -2793,19 +3003,20 @@ mod tests { #[test] fn test_float_statistics_nan_only() { let stats = statistics_roundtrip::(&[f32::NAN, f32::NAN]); - assert!(stats.min_bytes_opt().is_none()); - assert!(stats.max_bytes_opt().is_none()); - assert!(stats.is_min_max_backwards_compatible()); + assert_eq!(stats.min_bytes_opt(), Some(f32::NAN.as_bytes())); + assert_eq!(stats.max_bytes_opt(), Some(f32::NAN.as_bytes())); + assert_eq!(stats.nan_count_opt(), Some(2)); + assert!(!stats.is_min_max_backwards_compatible()); assert!(matches!(stats, Statistics::Float(_))); } #[test] fn test_float_statistics_zero_only() { let stats = statistics_roundtrip::(&[0.0]); - assert!(stats.is_min_max_backwards_compatible()); + assert!(!stats.is_min_max_backwards_compatible()); if let Statistics::Float(stats) = stats { - assert_eq!(stats.min_opt().unwrap(), &-0.0); - assert!(stats.min_opt().unwrap().is_sign_negative()); + assert_eq!(stats.min_opt().unwrap(), &0.0); + assert!(stats.min_opt().unwrap().is_sign_positive()); assert_eq!(stats.max_opt().unwrap(), &0.0); assert!(stats.max_opt().unwrap().is_sign_positive()); } else { @@ -2816,12 +3027,12 @@ mod tests { #[test] fn test_float_statistics_neg_zero_only() { let stats = statistics_roundtrip::(&[-0.0]); - assert!(stats.is_min_max_backwards_compatible()); + assert!(!stats.is_min_max_backwards_compatible()); if let Statistics::Float(stats) = stats { assert_eq!(stats.min_opt().unwrap(), &-0.0); assert!(stats.min_opt().unwrap().is_sign_negative()); - assert_eq!(stats.max_opt().unwrap(), &0.0); - assert!(stats.max_opt().unwrap().is_sign_positive()); + assert_eq!(stats.max_opt().unwrap(), &-0.0); + 
assert!(stats.max_opt().unwrap().is_sign_negative()); } else { panic!("expecting Statistics::Float"); } @@ -2830,10 +3041,10 @@ mod tests { #[test] fn test_float_statistics_zero_min() { let stats = statistics_roundtrip::(&[0.0, 1.0, f32::NAN, 2.0]); - assert!(stats.is_min_max_backwards_compatible()); + assert!(!stats.is_min_max_backwards_compatible()); if let Statistics::Float(stats) = stats { - assert_eq!(stats.min_opt().unwrap(), &-0.0); - assert!(stats.min_opt().unwrap().is_sign_negative()); + assert_eq!(stats.min_opt().unwrap(), &0.0); + assert!(stats.min_opt().unwrap().is_sign_positive()); assert_eq!(stats.max_opt().unwrap(), &2.0); } else { panic!("expecting Statistics::Float"); @@ -2843,11 +3054,11 @@ mod tests { #[test] fn test_float_statistics_neg_zero_max() { let stats = statistics_roundtrip::(&[-0.0, -1.0, f32::NAN, -2.0]); - assert!(stats.is_min_max_backwards_compatible()); + assert!(!stats.is_min_max_backwards_compatible()); if let Statistics::Float(stats) = stats { assert_eq!(stats.min_opt().unwrap(), &-2.0); - assert_eq!(stats.max_opt().unwrap(), &0.0); - assert!(stats.max_opt().unwrap().is_sign_positive()); + assert_eq!(stats.max_opt().unwrap(), &-0.0); + assert!(stats.max_opt().unwrap().is_sign_negative()); } else { panic!("expecting Statistics::Float"); } @@ -2856,10 +3067,11 @@ mod tests { #[test] fn test_double_statistics_nan_middle() { let stats = statistics_roundtrip::(&[1.0, f64::NAN, 2.0]); - assert!(stats.is_min_max_backwards_compatible()); + assert!(!stats.is_min_max_backwards_compatible()); if let Statistics::Double(stats) = stats { assert_eq!(stats.min_opt().unwrap(), &1.0); assert_eq!(stats.max_opt().unwrap(), &2.0); + assert_eq!(stats.nan_count_opt(), Some(1)) } else { panic!("expecting Statistics::Double"); } @@ -2868,10 +3080,11 @@ mod tests { #[test] fn test_double_statistics_nan_start() { let stats = statistics_roundtrip::(&[f64::NAN, 1.0, 2.0]); - assert!(stats.is_min_max_backwards_compatible()); + 
assert!(!stats.is_min_max_backwards_compatible()); if let Statistics::Double(stats) = stats { assert_eq!(stats.min_opt().unwrap(), &1.0); assert_eq!(stats.max_opt().unwrap(), &2.0); + assert_eq!(stats.nan_count_opt(), Some(1)) } else { panic!("expecting Statistics::Double"); } @@ -2880,19 +3093,20 @@ mod tests { #[test] fn test_double_statistics_nan_only() { let stats = statistics_roundtrip::(&[f64::NAN, f64::NAN]); - assert!(stats.min_bytes_opt().is_none()); - assert!(stats.max_bytes_opt().is_none()); + assert_eq!(stats.min_bytes_opt(), Some(f64::NAN.as_bytes())); + assert_eq!(stats.max_bytes_opt(), Some(f64::NAN.as_bytes())); + assert_eq!(stats.nan_count_opt(), Some(2)); assert!(matches!(stats, Statistics::Double(_))); - assert!(stats.is_min_max_backwards_compatible()); + assert!(!stats.is_min_max_backwards_compatible()); } #[test] fn test_double_statistics_zero_only() { let stats = statistics_roundtrip::(&[0.0]); - assert!(stats.is_min_max_backwards_compatible()); + assert!(!stats.is_min_max_backwards_compatible()); if let Statistics::Double(stats) = stats { - assert_eq!(stats.min_opt().unwrap(), &-0.0); - assert!(stats.min_opt().unwrap().is_sign_negative()); + assert_eq!(stats.min_opt().unwrap(), &0.0); + assert!(stats.min_opt().unwrap().is_sign_positive()); assert_eq!(stats.max_opt().unwrap(), &0.0); assert!(stats.max_opt().unwrap().is_sign_positive()); } else { @@ -2903,12 +3117,12 @@ mod tests { #[test] fn test_double_statistics_neg_zero_only() { let stats = statistics_roundtrip::(&[-0.0]); - assert!(stats.is_min_max_backwards_compatible()); + assert!(!stats.is_min_max_backwards_compatible()); if let Statistics::Double(stats) = stats { assert_eq!(stats.min_opt().unwrap(), &-0.0); assert!(stats.min_opt().unwrap().is_sign_negative()); - assert_eq!(stats.max_opt().unwrap(), &0.0); - assert!(stats.max_opt().unwrap().is_sign_positive()); + assert_eq!(stats.max_opt().unwrap(), &-0.0); + assert!(stats.max_opt().unwrap().is_sign_negative()); } else { 
panic!("expecting Statistics::Double"); } @@ -2917,10 +3131,10 @@ mod tests { #[test] fn test_double_statistics_zero_min() { let stats = statistics_roundtrip::(&[0.0, 1.0, f64::NAN, 2.0]); - assert!(stats.is_min_max_backwards_compatible()); + assert!(!stats.is_min_max_backwards_compatible()); if let Statistics::Double(stats) = stats { - assert_eq!(stats.min_opt().unwrap(), &-0.0); - assert!(stats.min_opt().unwrap().is_sign_negative()); + assert_eq!(stats.min_opt().unwrap(), &0.0); + assert!(stats.min_opt().unwrap().is_sign_positive()); assert_eq!(stats.max_opt().unwrap(), &2.0); } else { panic!("expecting Statistics::Double"); @@ -2930,11 +3144,11 @@ mod tests { #[test] fn test_double_statistics_neg_zero_max() { let stats = statistics_roundtrip::(&[-0.0, -1.0, f64::NAN, -2.0]); - assert!(stats.is_min_max_backwards_compatible()); + assert!(!stats.is_min_max_backwards_compatible()); if let Statistics::Double(stats) = stats { assert_eq!(stats.min_opt().unwrap(), &-2.0); - assert_eq!(stats.max_opt().unwrap(), &0.0); - assert!(stats.max_opt().unwrap().is_sign_positive()); + assert_eq!(stats.max_opt().unwrap(), &-0.0); + assert!(stats.max_opt().unwrap().is_sign_negative()); } else { panic!("expecting Statistics::Double"); } diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs index 9304b6c25a2b..681556dde5ba 100644 --- a/parquet/src/file/metadata/mod.rs +++ b/parquet/src/file/metadata/mod.rs @@ -1438,6 +1438,7 @@ pub struct ColumnIndexBuilder { min_values: Vec>, max_values: Vec>, null_counts: Vec, + nan_counts: Vec>, boundary_order: BoundaryOrder, /// contains the concatenation of the histograms of all pages repetition_level_histograms: Option>, @@ -1462,6 +1463,7 @@ impl ColumnIndexBuilder { min_values: Vec::new(), max_values: Vec::new(), null_counts: Vec::new(), + nan_counts: Vec::new(), boundary_order: BoundaryOrder::UNORDERED, repetition_level_histograms: None, definition_level_histograms: None, @@ -1470,17 +1472,24 @@ impl 
ColumnIndexBuilder { } /// Append statistics for the next page + /// + /// For floating-point columns (FLOAT, DOUBLE, or FLOAT16), `nan_count` must always + /// be `Some(n)`, even if n is 0. For non-floating-point columns, `nan_count` must + /// always be `None`. This requirement ensures correct serialization according to + /// the Parquet specification. pub fn append( &mut self, null_page: bool, min_value: Vec, max_value: Vec, null_count: i64, + nan_count: Option, ) { self.null_pages.push(null_page); self.min_values.push(min_value); self.max_values.push(max_value); self.null_counts.push(null_count); + self.nan_counts.push(nan_count); } /// Append the given page-level histograms to the [`ColumnIndex`] histograms. @@ -1528,51 +1537,79 @@ impl ColumnIndexBuilder { pub fn build(self) -> Result { Ok(match self.column_type { Type::BOOLEAN => { - let index = self.build_page_index()?; + let index = self.build_page_index(false)?; ColumnIndexMetaData::BOOLEAN(index) } Type::INT32 => { - let index = self.build_page_index()?; + let index = self.build_page_index(false)?; ColumnIndexMetaData::INT32(index) } Type::INT64 => { - let index = self.build_page_index()?; + let index = self.build_page_index(false)?; ColumnIndexMetaData::INT64(index) } Type::INT96 => { - let index = self.build_page_index()?; + let index = self.build_page_index(false)?; ColumnIndexMetaData::INT96(index) } Type::FLOAT => { - let index = self.build_page_index()?; + let index = self.build_page_index(true)?; ColumnIndexMetaData::FLOAT(index) } Type::DOUBLE => { - let index = self.build_page_index()?; + let index = self.build_page_index(true)?; ColumnIndexMetaData::DOUBLE(index) } Type::BYTE_ARRAY => { - let index = self.build_byte_array_index()?; + let index = self.build_byte_array_index(false)?; ColumnIndexMetaData::BYTE_ARRAY(index) } Type::FIXED_LEN_BYTE_ARRAY => { - let index = self.build_byte_array_index()?; + let index = self.build_byte_array_index(true)?; 
ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) } }) } - fn build_page_index(self) -> Result> + fn build_nan_counts(nan_counts: &[Option]) -> Option> { + let has_some = nan_counts.iter().any(|x| x.is_some()); + let has_none = nan_counts.iter().any(|x| x.is_none()); + + if has_some && !has_none { + Some(nan_counts.iter().map(|x| x.unwrap()).collect()) + } else if !has_some && has_none { + None + } else { + debug_assert!( + false, + "Mixed Some/None in nan_counts - caller should provide consistent values" + ); + Some(nan_counts.iter().map(|x| x.unwrap_or(0)).collect()) + } + } + + fn build_page_index(self, may_have_nan: bool) -> Result> where T: ParquetValueType, { let min_values: Vec<&[u8]> = self.min_values.iter().map(|v| v.as_slice()).collect(); let max_values: Vec<&[u8]> = self.max_values.iter().map(|v| v.as_slice()).collect(); + // Parquet spec requires nan_counts to be either present for all pages or absent entirely. + // Callers must ensure consistency: + // - For floating-point columns: all pages must have Some(n) + // - For non-floating-point columns: all pages must have None + let nan_counts = if may_have_nan && !self.nan_counts.is_empty() { + Self::build_nan_counts(&self.nan_counts) + } else { + None + }; + PrimitiveColumnIndex::try_new( self.null_pages, self.boundary_order, Some(self.null_counts), + nan_counts, self.repetition_level_histograms, self.definition_level_histograms, min_values, @@ -1580,14 +1617,25 @@ impl ColumnIndexBuilder { ) } - fn build_byte_array_index(self) -> Result { + fn build_byte_array_index(self, may_have_nan: bool) -> Result { let min_values: Vec<&[u8]> = self.min_values.iter().map(|v| v.as_slice()).collect(); let max_values: Vec<&[u8]> = self.max_values.iter().map(|v| v.as_slice()).collect(); + // Parquet spec requires nan_counts to be either present for all pages or absent entirely. 
+ // Callers must ensure consistency: + // - For floating-point columns: all pages must have Some(n) + // - For non-floating-point columns: all pages must have None + let nan_counts = if may_have_nan && !self.nan_counts.is_empty() { + Self::build_nan_counts(&self.nan_counts) + } else { + None + }; + ByteArrayColumnIndex::try_new( self.null_pages, self.boundary_order, Some(self.null_counts), + nan_counts, self.repetition_level_histograms, self.definition_level_histograms, min_values, @@ -2019,14 +2067,14 @@ mod tests { .build(); #[cfg(not(feature = "encryption"))] - let base_expected_size = 2766; + let base_expected_size = 2830; #[cfg(feature = "encryption")] - let base_expected_size = 2934; + let base_expected_size = 2998; assert_eq!(parquet_meta.memory_size(), base_expected_size); let mut column_index = ColumnIndexBuilder::new(Type::BOOLEAN); - column_index.append(false, vec![1u8], vec![2u8, 3u8], 4); + column_index.append(false, vec![1u8], vec![2u8, 3u8], 4, None); let column_index = column_index.build().unwrap(); let native_index = match column_index { ColumnIndexMetaData::BOOLEAN(index) => index, @@ -2050,9 +2098,9 @@ mod tests { .build(); #[cfg(not(feature = "encryption"))] - let bigger_expected_size = 3192; + let bigger_expected_size = 3280; #[cfg(feature = "encryption")] - let bigger_expected_size = 3360; + let bigger_expected_size = 3448; // more set fields means more memory usage assert!(bigger_expected_size > base_expected_size); @@ -2099,7 +2147,7 @@ mod tests { .set_row_groups(row_group_meta.clone()) .build(); - let base_expected_size = 2058; + let base_expected_size = 2090; assert_eq!(parquet_meta_data.memory_size(), base_expected_size); let footer_key = "0123456789012345".as_bytes(); @@ -2125,7 +2173,7 @@ mod tests { .set_file_decryptor(Some(decryptor)) .build(); - let expected_size_with_decryptor = 3072; + let expected_size_with_decryptor = 3104; assert!(expected_size_with_decryptor > base_expected_size); assert_eq!( diff --git 
a/parquet/src/file/metadata/thrift/mod.rs b/parquet/src/file/metadata/thrift/mod.rs index 88cb96f35555..1e6ee29d9a18 100644 --- a/parquet/src/file/metadata/thrift/mod.rs +++ b/parquet/src/file/metadata/thrift/mod.rs @@ -113,6 +113,7 @@ struct Statistics<'a> { 6: optional binary<'a> min_value; 7: optional bool is_max_value_exact; 8: optional bool is_min_value_exact; + 9: optional i64 nan_count; } ); @@ -207,6 +208,8 @@ fn convert_stats( .transpose()?; // Generic distinct count (count of distinct values occurring) let distinct_count = stats.distinct_count.map(|value| value as u64); + // Generic nan count for floating point types + let nan_count = stats.nan_count.map(|value| value as u64); // Whether or not statistics use deprecated min/max fields. let old_format = stats.min_value.is_none() && stats.max_value.is_none(); // Generic min value as bytes. @@ -287,19 +290,25 @@ fn convert_stats( }; FStatistics::int96(min, max, distinct_count, null_count, old_format) } - Type::FLOAT => FStatistics::float( - min.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())), - max.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())), - distinct_count, - null_count, - old_format, + Type::FLOAT => FStatistics::Float( + ValueStatistics::new( + min.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())), + max.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())), + distinct_count, + null_count, + old_format, + ) + .with_nan_count(nan_count), ), - Type::DOUBLE => FStatistics::double( - min.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())), - max.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())), - distinct_count, - null_count, - old_format, + Type::DOUBLE => FStatistics::Double( + ValueStatistics::new( + min.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())), + max.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())), + distinct_count, + null_count, + old_format, + ) + .with_nan_count(nan_count), ), 
Type::BYTE_ARRAY => FStatistics::ByteArray( ValueStatistics::new( @@ -320,6 +329,7 @@ fn convert_stats( null_count, old_format, ) + .with_nan_count(nan_count) .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false)) .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)), ), @@ -873,6 +883,7 @@ pub(crate) fn parquet_metadata_from_bytes( column.logical_type_ref(), column.converted_type(), column.physical_type(), + true, ); cos[i] = ColumnOrder::TYPE_DEFINED_ORDER(sort_order); } @@ -990,6 +1001,7 @@ pub(crate) struct PageStatistics { 6: optional binary min_value; 7: optional bool is_max_value_exact; 8: optional bool is_min_value_exact; + 9: optional i64 nan_count; } ); @@ -1852,6 +1864,7 @@ pub(crate) mod tests { min_value: None, is_max_value_exact: None, is_min_value_exact: None, + nan_count: None, }; let decoded_none = super::convert_stats(&column_descr, Some(none_null_count)) .unwrap() @@ -1867,6 +1880,7 @@ pub(crate) mod tests { min_value: None, is_max_value_exact: None, is_min_value_exact: None, + nan_count: None, }; let decoded_zero = super::convert_stats(&column_descr, Some(zero_null_count)) .unwrap() diff --git a/parquet/src/file/metadata/writer.rs b/parquet/src/file/metadata/writer.rs index 275b4ff28e56..143e9c52e73b 100644 --- a/parquet/src/file/metadata/writer.rs +++ b/parquet/src/file/metadata/writer.rs @@ -204,21 +204,16 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> { let offset_indexes = self.finalize_offset_indexes()?; // We only include ColumnOrder for leaf nodes. - // Currently only supported ColumnOrder is TypeDefinedOrder so we set this - // for all leaf nodes. - // Even if the column has an undefined sort order, such as INTERVAL, this - // is still technically the defined TYPEORDER so it should still be set. 
let column_orders = self .schema_descr .columns() .iter() .map(|col| { - let sort_order = ColumnOrder::sort_order_for_type( + ColumnOrder::column_order_for_type( col.logical_type_ref(), col.converted_type(), col.physical_type(), - ); - ColumnOrder::TYPE_DEFINED_ORDER(sort_order) + ) }) .collect(); diff --git a/parquet/src/file/page_index/column_index.rs b/parquet/src/file/page_index/column_index.rs index d0050443323a..1d9f5cdbb4b4 100644 --- a/parquet/src/file/page_index/column_index.rs +++ b/parquet/src/file/page_index/column_index.rs @@ -43,6 +43,7 @@ pub struct ColumnIndex { pub(crate) null_counts: Option>, pub(crate) repetition_level_histograms: Option>, pub(crate) definition_level_histograms: Option>, + pub(crate) nan_counts: Option>, } impl ColumnIndex { @@ -58,6 +59,13 @@ impl ColumnIndex { self.null_counts.as_ref().map(|nc| nc[idx]) } + /// Returns the number of NaN values in the page indexed by `idx` + /// + /// Returns `None` if no NaN counts have been set in the index + pub fn nan_count(&self, idx: usize) -> Option { + self.nan_counts.as_ref().map(|nc| nc[idx]) + } + /// Returns the repetition level histogram for the page indexed by `idx` pub fn repetition_level_histogram(&self, idx: usize) -> Option<&[i64]> { if let Some(rep_hists) = self.repetition_level_histograms.as_ref() { @@ -95,10 +103,12 @@ pub struct PrimitiveColumnIndex { } impl PrimitiveColumnIndex { + #[allow(clippy::too_many_arguments)] pub(crate) fn try_new( null_pages: Vec, boundary_order: BoundaryOrder, null_counts: Option>, + nan_counts: Option>, repetition_level_histograms: Option>, definition_level_histograms: Option>, min_bytes: Vec<&[u8]>, @@ -130,6 +140,7 @@ impl PrimitiveColumnIndex { null_counts, repetition_level_histograms, definition_level_histograms, + nan_counts, }, min_values, max_values, @@ -141,6 +152,7 @@ impl PrimitiveColumnIndex { index.null_pages, index.boundary_order, index.null_counts, + index.nan_counts, index.repetition_level_histograms, 
index.definition_level_histograms, index.min_values, @@ -263,11 +275,18 @@ impl WriteThrift for PrimitiveColumnIndex { .write_thrift_field(writer, 6, last_field_id)?; } if self.definition_level_histograms.is_some() { - self.definition_level_histograms + last_field_id = self + .definition_level_histograms .as_ref() .unwrap() .write_thrift_field(writer, 7, last_field_id)?; } + if self.nan_counts.is_some() { + self.nan_counts + .as_ref() + .unwrap() + .write_thrift_field(writer, 8, last_field_id)?; + } writer.write_struct_end() } } @@ -284,10 +303,12 @@ pub struct ByteArrayColumnIndex { } impl ByteArrayColumnIndex { + #[allow(clippy::too_many_arguments)] pub(crate) fn try_new( null_pages: Vec, boundary_order: BoundaryOrder, null_counts: Option>, + nan_counts: Option>, repetition_level_histograms: Option>, definition_level_histograms: Option>, min_values: Vec<&[u8]>, @@ -333,6 +354,7 @@ impl ByteArrayColumnIndex { null_pages, boundary_order, null_counts, + nan_counts, repetition_level_histograms, definition_level_histograms, }, @@ -348,6 +370,7 @@ impl ByteArrayColumnIndex { index.null_pages, index.boundary_order, index.null_counts, + index.nan_counts, index.repetition_level_histograms, index.definition_level_histograms, index.min_values, @@ -442,11 +465,18 @@ impl WriteThrift for ByteArrayColumnIndex { .write_thrift_field(writer, 6, last_field_id)?; } if self.definition_level_histograms.is_some() { - self.definition_level_histograms + last_field_id = self + .definition_level_histograms .as_ref() .unwrap() .write_thrift_field(writer, 7, last_field_id)?; } + if self.nan_counts.is_some() { + self.nan_counts + .as_ref() + .unwrap() + .write_thrift_field(writer, 8, last_field_id)?; + } writer.write_struct_end() } } @@ -563,6 +593,23 @@ impl ColumnIndexMetaData { } } + /// Returns array of NaN counts, one per page. 
+ /// + /// Returns `None` if now null counts have been set in the index + pub fn nan_counts(&self) -> Option<&Vec> { + match self { + Self::NONE => None, + Self::BOOLEAN(index) => index.nan_counts.as_ref(), + Self::INT32(index) => index.nan_counts.as_ref(), + Self::INT64(index) => index.nan_counts.as_ref(), + Self::INT96(index) => index.nan_counts.as_ref(), + Self::FLOAT(index) => index.nan_counts.as_ref(), + Self::DOUBLE(index) => index.nan_counts.as_ref(), + Self::BYTE_ARRAY(index) => index.nan_counts.as_ref(), + Self::FIXED_LEN_BYTE_ARRAY(index) => index.nan_counts.as_ref(), + } + } + /// Returns the number of pages pub fn num_pages(&self) -> u64 { colidx_enum_func!(self, num_pages) @@ -575,6 +622,13 @@ impl ColumnIndexMetaData { colidx_enum_func!(self, null_count, idx) } + /// Returns the number of NaN values in the page indexed by `idx` + /// + /// Returns `None` if no null counts have been set in the index + pub fn nan_count(&self, idx: usize) -> Option { + colidx_enum_func!(self, nan_count, idx) + } + /// Returns the repetition level histogram for the page indexed by `idx` pub fn repetition_level_histogram(&self, idx: usize) -> Option<&[i64]> { colidx_enum_func!(self, repetition_level_histogram, idx) @@ -676,6 +730,7 @@ mod tests { null_pages: vec![false], boundary_order: BoundaryOrder::ASCENDING, null_counts: Some(vec![0]), + nan_counts: None, repetition_level_histograms: Some(vec![1, 2]), definition_level_histograms: Some(vec![1, 2, 3]), }, @@ -700,6 +755,7 @@ mod tests { null_pages: vec![true], boundary_order: BoundaryOrder::ASCENDING, null_counts: Some(vec![1]), + nan_counts: None, repetition_level_histograms: None, definition_level_histograms: Some(vec![1, 0]), }, @@ -727,6 +783,7 @@ mod tests { &[], // this shouldn't be empty as null_pages[1] is false ], null_counts: None, + nan_counts: None, repetition_level_histograms: None, definition_level_histograms: None, boundary_order: BoundaryOrder::UNORDERED, diff --git 
a/parquet/src/file/page_index/index_reader.rs b/parquet/src/file/page_index/index_reader.rs index ce7fc5fbaac5..0f2a56ae2b8f 100644 --- a/parquet/src/file/page_index/index_reader.rs +++ b/parquet/src/file/page_index/index_reader.rs @@ -159,6 +159,7 @@ pub(super) struct ThriftColumnIndex<'a> { 5: optional list null_counts 6: optional list repetition_level_histograms; 7: optional list definition_level_histograms; + 8: optional list nan_counts } ); diff --git a/parquet/src/file/statistics.rs b/parquet/src/file/statistics.rs index 9682fd54b8df..44a7dce52e48 100644 --- a/parquet/src/file/statistics.rs +++ b/parquet/src/file/statistics.rs @@ -139,6 +139,8 @@ pub(crate) fn from_thrift_page_stats( .transpose()?; // Generic distinct count (count of distinct values occurring) let distinct_count = stats.distinct_count.map(|value| value as u64); + // Generic nan count for floating point types + let nan_count = stats.nan_count.map(|value| value as u64); // Whether or not statistics use deprecated min/max fields. let old_format = stats.min_value.is_none() && stats.max_value.is_none(); // Generic min value as bytes. 
@@ -230,19 +232,29 @@ pub(crate) fn from_thrift_page_stats( }; Statistics::int96(min, max, distinct_count, null_count, old_format) } - Type::FLOAT => Statistics::float( - min.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())), - max.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())), - distinct_count, - null_count, - old_format, + Type::FLOAT => Statistics::Float( + ValueStatistics::new( + min.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())), + max.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())), + distinct_count, + null_count, + old_format, + ) + .with_nan_count(nan_count) + .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false)) + .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)), ), - Type::DOUBLE => Statistics::double( - min.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())), - max.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())), - distinct_count, - null_count, - old_format, + Type::DOUBLE => Statistics::Double( + ValueStatistics::new( + min.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())), + max.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())), + distinct_count, + null_count, + old_format, + ) + .with_nan_count(nan_count) + .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false)) + .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)), ), Type::BYTE_ARRAY => Statistics::ByteArray( ValueStatistics::new( @@ -263,6 +275,12 @@ pub(crate) fn from_thrift_page_stats( null_count, old_format, ) + // Note: We set nan_count here even though we can't verify if this is Float16. + // The spec says nan_count should only be set for Float16 logical type, + // but this function doesn't have access to logical type information. + // Writers should only set nan_count for Float16, and readers should + // handle this gracefully. 
+ .with_nan_count(nan_count) .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false)) .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)), ), @@ -288,6 +306,11 @@ pub(crate) fn page_stats_to_thrift(stats: Option<&Statistics>) -> Option) -> Option Option { + statistics_enum_func![self, nan_count_opt] + } + /// Returns `true` if the min value is set, and is an exact min value. pub fn min_is_exact(&self) -> bool { statistics_enum_func![self, min_is_exact] @@ -511,6 +540,8 @@ pub struct ValueStatistics { // Distinct count could be omitted in some cases distinct_count: Option, null_count: Option, + // NaN count for floating point types + nan_count: Option, // Whether or not the min or max values are exact, or truncated. is_max_value_exact: bool, @@ -541,6 +572,7 @@ impl ValueStatistics { max, distinct_count, null_count, + nan_count: None, is_min_max_deprecated, is_min_max_backwards_compatible: is_min_max_deprecated, } @@ -580,6 +612,16 @@ impl ValueStatistics { } } + /// Returns NaN count for floating point types. + pub fn nan_count_opt(&self) -> Option { + self.nan_count + } + + /// Set the NaN count for floating point types. + pub fn with_nan_count(self, nan_count: Option) -> Self { + Self { nan_count, ..self } + } + /// Returns min value of the statistics, if known. 
pub fn min_opt(&self) -> Option<&T> { self.min.as_ref() @@ -698,6 +740,8 @@ impl fmt::Debug for ValueStatistics { #[cfg(test)] mod tests { + use core::f32; + use super::*; #[test] @@ -729,6 +773,7 @@ mod tests { min_value: None, is_max_value_exact: None, is_min_value_exact: None, + nan_count: None, }; from_thrift_page_stats(Type::INT32, Some(thrift_stats)).unwrap(); @@ -1050,6 +1095,7 @@ mod tests { min_value: None, is_max_value_exact: None, is_min_value_exact: None, + nan_count: None, }; let err = from_thrift_page_stats(Type::BOOLEAN, Some(tstatistics)).unwrap_err(); assert_eq!( @@ -1103,6 +1149,7 @@ mod tests { min_value: None, is_max_value_exact: None, is_min_value_exact: None, + nan_count: None, }; let err = from_thrift_page_stats(Type::INT96, Some(thrift_stats.clone())).unwrap_err(); @@ -1141,4 +1188,141 @@ mod tests { _ => unreachable!(), } } + + #[test] + fn test_nan_count_float() { + // Test NaN count for f32 + let stats = Statistics::Float( + ValueStatistics::new(Some(1.0_f32), Some(5.0_f32), None, Some(0), false) + .with_nan_count(Some(3)), + ); + + assert_eq!(stats.nan_count_opt(), Some(3)); + + // Verify round-trip through thrift + let thrift_stats = page_stats_to_thrift(Some(&stats)).unwrap(); + assert_eq!(thrift_stats.nan_count, Some(3)); + + let round_tripped = from_thrift_page_stats(Type::FLOAT, Some(thrift_stats)) + .unwrap() + .unwrap(); + assert_eq!(round_tripped.nan_count_opt(), Some(3)); + } + + #[test] + fn test_nan_count_double() { + // Test NaN count for f64 + let stats = Statistics::Double( + ValueStatistics::new(Some(1.0_f64), Some(5.0_f64), None, Some(0), false) + .with_nan_count(Some(5)), + ); + + assert_eq!(stats.nan_count_opt(), Some(5)); + + // Verify round-trip through thrift + let thrift_stats = page_stats_to_thrift(Some(&stats)).unwrap(); + assert_eq!(thrift_stats.nan_count, Some(5)); + + let round_tripped = from_thrift_page_stats(Type::DOUBLE, Some(thrift_stats)) + .unwrap() + .unwrap(); + assert_eq!(round_tripped.nan_count_opt(), 
Some(5)); + } + + #[test] + fn test_nan_count_none_for_non_float() { + // NaN count should not be set for non-floating point types + let stats = Statistics::int32(Some(1), Some(100), None, Some(0), false); + assert_eq!(stats.nan_count_opt(), None); + + let thrift_stats = page_stats_to_thrift(Some(&stats)).unwrap(); + assert_eq!(thrift_stats.nan_count, None); + } + + #[test] + fn test_nan_count_backwards_compatible() { + // Test that missing nan_count field is handled correctly + let thrift_stats = PageStatistics { + min: None, + max: None, + min_value: Some(vec![0, 0, 0, 0]), // 0.0_f32 in bytes + max_value: Some(vec![0, 0, 128, 63]), // 1.0_f32 in bytes + null_count: Some(0), + distinct_count: None, + nan_count: None, // Not set + is_min_value_exact: None, + is_max_value_exact: None, + }; + + let stats = from_thrift_page_stats(Type::FLOAT, Some(thrift_stats)) + .unwrap() + .unwrap(); + + // nan_count should be None when not provided + assert_eq!(stats.nan_count_opt(), None); + } + + #[test] + fn test_statistics_with_nan_min_max() { + // Test that when there are only NaN values, min/max are NaN + let stats = Statistics::Float( + ValueStatistics::new( + Some(f32::NAN), // min and max should have NaN values + Some(f32::NAN), + None, + Some(0), + false, + ) + .with_nan_count(Some(10)), // All values are NaN + ); + + assert_eq!(stats.min_bytes_opt(), Some(f32::NAN.as_bytes())); + assert_eq!(stats.max_bytes_opt(), Some(f32::NAN.as_bytes())); + assert_eq!(stats.nan_count_opt(), Some(10)); + + // Verify serialization handles this case + let thrift_stats = page_stats_to_thrift(Some(&stats)).unwrap(); + assert_eq!(thrift_stats.min_value, Some(f32::NAN.as_bytes().to_vec())); + assert_eq!(thrift_stats.max_value, Some(f32::NAN.as_bytes().to_vec())); + assert_eq!(thrift_stats.nan_count, Some(10)); + } + + #[test] + fn test_nan_count_too_large() { + // Test that nan_count larger than i64::MAX is not serialized + let stats = Statistics::Float( + 
ValueStatistics::new(Some(1.0_f32), Some(2.0_f32), None, Some(0), false) + .with_nan_count(Some(u64::MAX)), + ); + + let thrift_stats = page_stats_to_thrift(Some(&stats)).unwrap(); + // u64::MAX can't fit in i64, so it should be None + assert_eq!(thrift_stats.nan_count, None); + } + + #[test] + fn test_nan_counts_in_column_index() { + // Test that nan_counts are properly collected in page index + use crate::file::metadata::ColumnIndexBuilder; + + // Test for floating-point column - all pages must have Some(n) + let mut float_builder = ColumnIndexBuilder::new(Type::FLOAT); + float_builder.append(false, vec![0u8; 4], vec![255u8; 4], 0, Some(5)); + float_builder.append(false, vec![0u8; 4], vec![255u8; 4], 2, Some(3)); + float_builder.append(false, vec![0u8; 4], vec![255u8; 4], 0, Some(0)); // No NaN but still Some(0) + + let float_column_index = float_builder.build().unwrap(); + // Verify nan_counts field is properly set for float column + assert_eq!(float_column_index.nan_counts(), Some(&vec![5, 3, 0])); + + // Test for non-floating-point column - all pages must have None + let mut int_builder = ColumnIndexBuilder::new(Type::INT32); + int_builder.append(false, vec![0u8; 4], vec![255u8; 4], 0, None); + int_builder.append(false, vec![0u8; 4], vec![255u8; 4], 2, None); + int_builder.append(false, vec![0u8; 4], vec![255u8; 4], 0, None); + + let int_column_index = int_builder.build().unwrap(); + // Verify nan_counts field is None for non-float column + assert_eq!(int_column_index.nan_counts(), None); + } } diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 7d69904451d3..2cfbf1a91ebf 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -1226,7 +1226,7 @@ mod tests { // INTERVAL ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNDEFINED), // Float16 - ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED), + ColumnOrder::IEEE_754_TOTAL_ORDER, // String ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNSIGNED), ]; diff --git 
a/parquet/src/schema/types.rs b/parquet/src/schema/types.rs index 2925557e7b86..c169d02c510d 100644 --- a/parquet/src/schema/types.rs +++ b/parquet/src/schema/types.rs @@ -995,13 +995,24 @@ impl ColumnDescriptor { } } - /// Returns the sort order for this column + /// Returns the sort order for this column as currently defined for the logical or + /// physical type. + /// + /// Returns `SortOrder::UNDEFINED` for non-primitive types. pub fn sort_order(&self) -> SortOrder { - ColumnOrder::sort_order_for_type( - self.logical_type_ref(), - self.converted_type(), - self.physical_type(), - ) + match self.primitive_type.as_ref() { + Type::PrimitiveType { + basic_info, + physical_type, + .. + } => ColumnOrder::column_order_for_type( + basic_info.logical_type_ref(), + basic_info.converted_type(), + *physical_type, + ) + .sort_order(), + _ => SortOrder::UNDEFINED, + } } } diff --git a/parquet/tests/arrow_reader/statistics.rs b/parquet/tests/arrow_reader/statistics.rs index 4f7ddcff4ad1..c2d0ad8ba76d 100644 --- a/parquet/tests/arrow_reader/statistics.rs +++ b/parquet/tests/arrow_reader/statistics.rs @@ -781,7 +781,7 @@ async fn test_float_16() { expected_min: Arc::new(Float16Array::from(vec![ f16::from_f32(-5.), f16::from_f32(-4.), - f16::from_f32(-0.), + f16::from_f32(0.), f16::from_f32(5.), ])), // maxes are [-1, 0, 4, 9] @@ -817,7 +817,7 @@ async fn test_float_32() { Test { reader: &reader, // mins are [-5, -4, 0, 5] - expected_min: Arc::new(Float32Array::from(vec![-5., -4., -0., 5.0])), + expected_min: Arc::new(Float32Array::from(vec![-5., -4., 0., 5.0])), // maxes are [-1, 0, 4, 9] expected_max: Arc::new(Float32Array::from(vec![-1., 0., 4., 9.])), // nulls are [0, 0, 0, 0] @@ -846,7 +846,7 @@ async fn test_float_64() { Test { reader: &reader, // mins are [-5, -4, 0, 5] - expected_min: Arc::new(Float64Array::from(vec![-5., -4., -0., 5.0])), + expected_min: Arc::new(Float64Array::from(vec![-5., -4., 0., 5.0])), // maxes are [-1, 0, 4, 9] expected_max: 
Arc::new(Float64Array::from(vec![-1., 0., 4., 9.])), // nulls are [0, 0, 0, 0] @@ -1897,7 +1897,7 @@ async fn test_float64() { Test { reader: &reader, - expected_min: Arc::new(Float64Array::from(vec![-5.0, -4.0, -0.0, 5.0])), + expected_min: Arc::new(Float64Array::from(vec![-5.0, -4.0, 0.0, 5.0])), expected_max: Arc::new(Float64Array::from(vec![-1.0, 0.0, 4.0, 9.0])), expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])), @@ -1925,7 +1925,7 @@ async fn test_float16() { Test { reader: &reader, expected_min: Arc::new(Float16Array::from( - vec![-5.0, -4.0, -0.0, 5.0] + vec![-5.0, -4.0, 0.0, 5.0] .into_iter() .map(f16::from_f32) .collect::>(), diff --git a/parquet/tests/ieee754_nan_interop.rs b/parquet/tests/ieee754_nan_interop.rs new file mode 100644 index 000000000000..cc3326dae555 --- /dev/null +++ b/parquet/tests/ieee754_nan_interop.rs @@ -0,0 +1,448 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Interoperability test for https://github.com/apache/parquet-format/pull/514. +//! Demonstrate reading NaN statistics and counts from a file generated with +//! parquet-java, and show that on write we produce the same statistics.
+ +use bytes::Bytes; +use core::f32; +use half::f16; +use std::{path::PathBuf, sync::Arc}; + +use arrow::util::test_util::parquet_test_data; +use arrow_array::{Array, Float16Array, Float32Array, Float64Array, RecordBatch, UInt64Array}; +use arrow_schema::{DataType, Field, Schema}; +use parquet::{ + arrow::{ + ArrowWriter, + arrow_reader::{ArrowReaderBuilder, ArrowReaderOptions, statistics::StatisticsConverter}, + }, + errors::Result, + file::{metadata::ParquetMetaData, properties::WriterProperties}, + schema::types::SchemaDescriptor, +}; + +const NAN_COUNTS: [u64; 5] = [0, 4, 10, 0, 0]; + +const FLOAT_NEG_NAN_SMALL: f32 = f32::from_bits(0xffffffff); +const FLOAT_NEG_NAN_LARGE: f32 = f32::from_bits(0xfff00001); +const FLOAT_NAN_SMALL: f32 = f32::from_bits(0x7fc00001); +const FLOAT_NAN_LARGE: f32 = f32::from_bits(0x7fffffff); + +const FLOAT_MINS: [f32; 5] = [-2.0, -2.0, FLOAT_NEG_NAN_SMALL, 0.0, -5.0]; +const FLOAT_MAXS: [f32; 5] = [5.0, 3.0, FLOAT_NAN_LARGE, 5.0, -0.0]; + +fn validate_float_metadata( + metadata: &ParquetMetaData, + arrow_schema: &Schema, + parquet_schema: &SchemaDescriptor, +) -> Result<()> { + let converter = StatisticsConverter::try_new("float_ieee754", arrow_schema, parquet_schema)?; + let row_group_indices: Vec<_> = (0..metadata.num_row_groups()).collect(); + + // verify column statistics mins + let exp: Arc = Arc::new(Float32Array::from(FLOAT_MINS.to_vec())); + let mins = converter.row_group_mins(metadata.row_groups())?; + assert_eq!(&mins, &exp); + + // verify page mins (should be 1 page per row group, so should be same) + let page_mins = converter.data_page_mins( + metadata.column_index().unwrap(), + metadata.offset_index().unwrap(), + &row_group_indices, + )?; + assert_eq!(&page_mins, &exp); + + let exp: Arc = Arc::new(Float32Array::from(FLOAT_MAXS.to_vec())); + let maxs = converter.row_group_maxes(metadata.row_groups())?; + assert_eq!(&maxs, &exp); + + // verify page maxs (should be 1 page per row group, so should be same) + let page_maxs = 
converter.data_page_maxes( + metadata.column_index().unwrap(), + metadata.offset_index().unwrap(), + &row_group_indices, + )?; + assert_eq!(&page_maxs, &exp); + + let exp = UInt64Array::from(NAN_COUNTS.to_vec()); + let nans = converter.row_group_nan_counts(metadata.row_groups())?; + assert_eq!(&nans, &exp); + + let page_nans = converter.data_page_nan_counts( + metadata.column_index().unwrap(), + metadata.offset_index().unwrap(), + &row_group_indices, + )?; + assert_eq!(&page_nans, &exp); + + Ok(()) +} + +const DOUBLE_NEG_NAN_SMALL: f64 = f64::from_bits(0xffffffffffffffff); +const DOUBLE_NEG_NAN_LARGE: f64 = f64::from_bits(0xfff0000000000001); +const DOUBLE_NAN_SMALL: f64 = f64::from_bits(0x7ff0000000000001); +const DOUBLE_NAN_LARGE: f64 = f64::from_bits(0x7fffffffffffffff); + +const DOUBLE_MINS: [f64; 5] = [-2.0, -2.0, DOUBLE_NEG_NAN_SMALL, 0.0, -5.0]; +const DOUBLE_MAXS: [f64; 5] = [5.0, 3.0, DOUBLE_NAN_LARGE, 5.0, -0.0]; + +fn validate_double_metadata( + metadata: &ParquetMetaData, + arrow_schema: &Schema, + parquet_schema: &SchemaDescriptor, +) -> Result<()> { + let converter = StatisticsConverter::try_new("double_ieee754", arrow_schema, parquet_schema)?; + let row_group_indices: Vec<_> = (0..metadata.num_row_groups()).collect(); + + // verify column statistics mins + let exp: Arc = Arc::new(Float64Array::from(DOUBLE_MINS.to_vec())); + let mins = converter.row_group_mins(metadata.row_groups())?; + assert_eq!(&mins, &exp); + + // verify page mins (should be 1 page per row group, so should be same) + let page_mins = converter.data_page_mins( + metadata.column_index().unwrap(), + metadata.offset_index().unwrap(), + &row_group_indices, + )?; + assert_eq!(&page_mins, &exp); + + let exp: Arc = Arc::new(Float64Array::from(DOUBLE_MAXS.to_vec())); + let maxs = converter.row_group_maxes(metadata.row_groups())?; + assert_eq!(&maxs, &exp); + + // verify page maxs (should be 1 page per row group, so should be same) + let page_maxs = converter.data_page_maxes( + 
metadata.column_index().unwrap(), + metadata.offset_index().unwrap(), + &row_group_indices, + )?; + assert_eq!(&page_maxs, &exp); + + let exp = UInt64Array::from(NAN_COUNTS.to_vec()); + let nans = converter.row_group_nan_counts(metadata.row_groups())?; + assert_eq!(&nans, &exp); + + let page_nans = converter.data_page_nan_counts( + metadata.column_index().unwrap(), + metadata.offset_index().unwrap(), + &row_group_indices, + )?; + assert_eq!(&page_nans, &exp); + + Ok(()) +} + +const FLOAT16_NEG_NAN_SMALL: f16 = f16::from_bits(0xffff); +const FLOAT16_NEG_NAN_LARGE: f16 = f16::from_bits(0xfc01); +const FLOAT16_NAN_SMALL: f16 = f16::from_bits(0x7c01); +const FLOAT16_NAN_LARGE: f16 = f16::from_bits(0x7fff); + +const FLOAT16_MINS: [f16; 5] = [ + f16::from_bits(0xc000), + f16::from_bits(0xc000), + FLOAT16_NEG_NAN_SMALL, + f16::from_bits(0x0000), + f16::from_bits(0xc500), +]; +const FLOAT16_MAXS: [f16; 5] = [ + f16::from_bits(0x4500), + f16::from_bits(0x4200), + FLOAT16_NAN_LARGE, + f16::from_bits(0x4500), + f16::from_bits(0x8000), +]; + +fn validate_float16_metadata( + metadata: &ParquetMetaData, + arrow_schema: &Schema, + parquet_schema: &SchemaDescriptor, +) -> Result<()> { + let converter = StatisticsConverter::try_new("float16_ieee754", arrow_schema, parquet_schema)?; + let row_group_indices: Vec<_> = (0..metadata.num_row_groups()).collect(); + + // verify column statistics mins + let exp: Arc = Arc::new(Float16Array::from(FLOAT16_MINS.to_vec())); + let mins = converter.row_group_mins(metadata.row_groups())?; + assert_eq!(&mins, &exp); + + // verify page mins (should be 1 page per row group, so should be same) + let page_mins = converter.data_page_mins( + metadata.column_index().unwrap(), + metadata.offset_index().unwrap(), + &row_group_indices, + )?; + assert_eq!(&page_mins, &exp); + + let exp: Arc = Arc::new(Float16Array::from(FLOAT16_MAXS.to_vec())); + let maxs = converter.row_group_maxes(metadata.row_groups())?; + assert_eq!(&maxs, &exp); + + // verify page maxs 
(should be 1 page per row group, so should be same) + let page_maxs = converter.data_page_maxes( + metadata.column_index().unwrap(), + metadata.offset_index().unwrap(), + &row_group_indices, + )?; + assert_eq!(&page_maxs, &exp); + + let exp = UInt64Array::from(NAN_COUNTS.to_vec()); + let nans = converter.row_group_nan_counts(metadata.row_groups())?; + assert_eq!(&nans, &exp); + + let page_nans = converter.data_page_nan_counts( + metadata.column_index().unwrap(), + metadata.offset_index().unwrap(), + &row_group_indices, + )?; + assert_eq!(&page_nans, &exp); + + Ok(()) +} + +fn validate_metadata( + metadata: &ParquetMetaData, + arrow_schema: &Schema, + parquet_schema: &SchemaDescriptor, +) -> Result<()> { + validate_float_metadata(metadata, arrow_schema, parquet_schema)?; + validate_double_metadata(metadata, arrow_schema, parquet_schema)?; + validate_float16_metadata(metadata, arrow_schema, parquet_schema) +} + +#[test] +fn test_ieee754_interop() { + // 1) read interop file + // 2) validate stats are as expected + // 3) rewrite file, check validate metadata from writer + // 4) re-read what we've written, again validate metadata + let parquet_testing_data = parquet_test_data(); + let path = PathBuf::from(parquet_testing_data).join("floating_orders_nan_count.parquet"); + println!("Reading file: {path:?}"); + + let file = std::fs::File::open(&path).unwrap(); + let options = ArrowReaderOptions::new() + .with_page_index_policy(parquet::file::metadata::PageIndexPolicy::Required); + let builder = ArrowReaderBuilder::try_new_with_options(file, options).unwrap(); + let file_metadata = builder.metadata().clone(); + let schema = builder.schema().clone(); + let parquet_schema = builder.parquet_schema().clone(); + + println!("validate interop file"); + validate_metadata(file_metadata.as_ref(), schema.as_ref(), &parquet_schema) + .expect("validate read metadata"); + + let reader = builder.build().unwrap(); + let mut outbuf = Vec::new(); + { + let writer_options = 
WriterProperties::builder() + .set_max_row_group_row_count(Some(10)) + .build(); + let mut writer = ArrowWriter::try_new(&mut outbuf, schema.clone(), Some(writer_options)) + .expect("create arrow writer"); + for maybe_batch in reader { + let batch = maybe_batch.expect("reading batch"); + writer.write(&batch).expect("writing data"); + } + let write_meta = writer.close().expect("closing file"); + println!("validate writer output"); + validate_metadata(&write_meta, schema.as_ref(), &parquet_schema) + .expect("validate written metadata"); + } + + //fs::write("output.pq", outbuf.clone()).unwrap(); + + // now re-validate the bit we've written + let options = ArrowReaderOptions::new() + .with_page_index_policy(parquet::file::metadata::PageIndexPolicy::Required); + let builder = ArrowReaderBuilder::try_new_with_options(Bytes::from(outbuf), options).unwrap(); + let file_metadata = builder.metadata().clone(); + let schema = builder.schema().clone(); + let parquet_schema = builder.parquet_schema().clone(); + + println!("validate from rust output"); + validate_metadata(file_metadata.as_ref(), schema.as_ref(), &parquet_schema) + .expect("validate re-read metadata"); +} + +// This test replicates the data produced by the parquet-java code that generated +// parquet-testing/data/floating_orders_nan_count.parquet +#[test] +fn test_ieee754_interop2() { + // define schema + let schema = Schema::new(vec![ + Field::new("float_ieee754", DataType::Float32, false), + Field::new("double_ieee754", DataType::Float64, false), + Field::new("float16_ieee754", DataType::Float16, false), + ]); + let schema = Arc::new(schema); + + let mut outbuf = Vec::new(); + { + let writer_options = WriterProperties::builder() + .set_max_row_group_row_count(Some(10)) + .build(); + let mut writer = ArrowWriter::try_new(&mut outbuf, schema.clone(), Some(writer_options)) + .expect("create arrow writer"); + + // this only works for non-NaN cases + let make_batch = |data: &[f32]| -> RecordBatch { + let arr1 = 
Float32Array::from(data.to_vec()); + let arr2 = Float64Array::from(data.iter().map(|v| *v as f64).collect::>()); + let arr3 = + Float16Array::from(data.iter().map(|v| f16::from_f32(*v)).collect::>()); + + RecordBatch::try_new( + schema.clone(), + vec![Arc::new(arr1), Arc::new(arr2), Arc::new(arr3)], + ) + .unwrap() + }; + + // batch 1: no NaNs + let batch = make_batch(&[-2.0f32, -1.0, -0.0, 0.0, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0]); + writer.write(&batch).expect("writing batch1"); + + // batch 2: mixed + let float_data = vec![ + FLOAT_NEG_NAN_SMALL, + -2.0, + FLOAT_NEG_NAN_LARGE, + -1.0, + -0.0, + 0.0, + 1.0, + FLOAT_NAN_SMALL, + 3.0, + FLOAT_NAN_LARGE, + ]; + let double_data = vec![ + DOUBLE_NEG_NAN_SMALL, + -2.0, + DOUBLE_NEG_NAN_LARGE, + -1.0, + -0.0, + 0.0, + 1.0, + DOUBLE_NAN_SMALL, + 3.0, + DOUBLE_NAN_LARGE, + ]; + let float16_data = vec![ + FLOAT16_NEG_NAN_SMALL, + f16::from_f32(-2.0), + FLOAT16_NEG_NAN_LARGE, + f16::from_f32(-1.0), + f16::from_f32(-0.0), + f16::from_f32(0.0), + f16::from_f32(1.0), + FLOAT16_NAN_SMALL, + f16::from_f32(3.0), + FLOAT16_NAN_LARGE, + ]; + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Float32Array::from(float_data)), + Arc::new(Float64Array::from(double_data)), + Arc::new(Float16Array::from(float16_data)), + ], + ) + .unwrap(); + writer.write(&batch).expect("writing batch2"); + + // batch 3: all NaN + let float_data = vec![ + FLOAT_NEG_NAN_SMALL, + FLOAT_NEG_NAN_LARGE, + FLOAT_NAN_SMALL, + FLOAT_NAN_LARGE, + FLOAT_NEG_NAN_SMALL, + FLOAT_NEG_NAN_LARGE, + FLOAT_NAN_SMALL, + FLOAT_NAN_LARGE, + FLOAT_NEG_NAN_SMALL, + FLOAT_NAN_LARGE, + ]; + let double_data = vec![ + DOUBLE_NEG_NAN_SMALL, + DOUBLE_NEG_NAN_LARGE, + DOUBLE_NAN_SMALL, + DOUBLE_NAN_LARGE, + DOUBLE_NEG_NAN_SMALL, + DOUBLE_NEG_NAN_LARGE, + DOUBLE_NAN_SMALL, + DOUBLE_NAN_LARGE, + DOUBLE_NEG_NAN_SMALL, + DOUBLE_NAN_LARGE, + ]; + let float16_data = vec![ + FLOAT16_NEG_NAN_SMALL, + FLOAT16_NEG_NAN_LARGE, + FLOAT16_NAN_SMALL, + FLOAT16_NAN_LARGE, + 
FLOAT16_NEG_NAN_SMALL, + FLOAT16_NEG_NAN_LARGE, + FLOAT16_NAN_SMALL, + FLOAT16_NAN_LARGE, + FLOAT16_NEG_NAN_SMALL, + FLOAT16_NAN_LARGE, + ]; + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Float32Array::from(float_data)), + Arc::new(Float64Array::from(double_data)), + Arc::new(Float16Array::from(float16_data)), + ], + ) + .unwrap(); + writer.write(&batch).expect("writing batch3"); + + // batch 4: 0 min + let batch = make_batch(&[0.0f32, 0.0, 0.0, 0.5, 1.0, 1.5, 2.0, 3.0, 4.0, 5.0]); + writer.write(&batch).expect("writing batch4"); + + // batch 5: -0 max + let batch = make_batch(&[ + -5.0f32, -4.0, -3.0, -2.0, -1.5, -1.0, -0.5, -0.0, -0.0, -0.0, + ]); + writer.write(&batch).expect("writing batch5"); + + let write_meta = writer.close().expect("closing file"); + let parquet_schema = write_meta.file_metadata().schema_descr(); + println!("validate writer output"); + validate_metadata(&write_meta, schema.as_ref(), parquet_schema) + .expect("validate written metadata"); + } + + //fs::write("output2.pq", outbuf.clone()).unwrap(); + + // now re-validate the bit we've written + let options = ArrowReaderOptions::new() + .with_page_index_policy(parquet::file::metadata::PageIndexPolicy::Required); + let builder = ArrowReaderBuilder::try_new_with_options(Bytes::from(outbuf), options).unwrap(); + let file_metadata = builder.metadata().clone(); + let schema = builder.schema().clone(); + let parquet_schema = builder.parquet_schema().clone(); + + println!("validate from rust output"); + validate_metadata(file_metadata.as_ref(), schema.as_ref(), &parquet_schema) + .expect("validate re-read metadata"); +}