From ef9120b3e943f4e8b61bee9b307173a4dd0d01fe Mon Sep 17 00:00:00 2001 From: Catalin Dobre Date: Tue, 17 Feb 2026 00:04:47 +0200 Subject: [PATCH 1/2] feat: expose snapshot load metrics in order to detect if log truncation was applied or not --- .../table_provider/next/scan/codec.rs | 3 + crates/core/src/kernel/snapshot/mod.rs | 29 +- crates/core/src/kernel/snapshot/serde.rs | 4 + .../core/src/kernel/snapshot/size_limits.rs | 248 +++++++++++++++--- crates/core/src/table/mod.rs | 13 + 5 files changed, 255 insertions(+), 42 deletions(-) diff --git a/crates/core/src/delta_datafusion/table_provider/next/scan/codec.rs b/crates/core/src/delta_datafusion/table_provider/next/scan/codec.rs index 6a8a12679..a6a65a589 100644 --- a/crates/core/src/delta_datafusion/table_provider/next/scan/codec.rs +++ b/crates/core/src/delta_datafusion/table_provider/next/scan/codec.rs @@ -27,6 +27,7 @@ use crate::delta_datafusion::engine::{to_datafusion_expr, to_delta_expression}; use crate::delta_datafusion::DeltaScanConfig; use crate::delta_datafusion::expr_adapter::build_expr_adapter_factory; use crate::DeltaTableConfig; +use crate::kernel::size_limits::SnapshotLoadMetrics; use crate::kernel::Snapshot; /// Codec for serializing/deserializing [`DeltaScanExec`] physical plans. @@ -126,11 +127,13 @@ impl TryFrom<&DeltaScanExec> for DeltaScanExecWire { // At upgrade, RECHECK usage sites for DeltaTableConfig, we'll need to re-evaluate if // stuff begins writing to it let delta_table_config = DeltaTableConfig::default(); + let load_metrics = SnapshotLoadMetrics::from_snapshot(&exec_scan_plan_scan_snapshot); Snapshot { inner: exec_scan_plan_scan_snapshot, // config: delta_table_config, schema, + load_metrics, } }; diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index 32bfa0006..fbe74a9a3 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -55,6 +55,7 @@ pub use self::log_data::*; pub use iterators::*; pub use scan::*; pub use stream::*; +use crate::kernel::size_limits::SnapshotLoadMetrics; mod iterators; mod log_data; @@ -75,6 +76,8 @@ pub struct Snapshot { pub(crate) config: DeltaTableConfig, /// Logical table schema pub(crate) schema: SchemaRef, + /// Metrics captured during snapshot loading + pub(crate) load_metrics: SnapshotLoadMetrics, } impl Snapshot { @@ -106,13 +109,8 @@ impl Snapshot { } }; - let snapshot = if let Some(limiter) = &config.log_size_limiter { - let segment = limiter.truncate(snapshot.log_segment().clone(), log_store).await?; - let table_configuration = snapshot.table_configuration().clone(); - Arc::new(KernelSnapshot::new(segment, table_configuration)) - } else { - snapshot - }; + let (snapshot, load_metrics) = + size_limits::apply_optional_log_limiter(snapshot, config.log_size_limiter.as_ref(), log_store).await?; let schema = Arc::new( snapshot @@ -126,6 +124,7 @@ impl Snapshot { inner: snapshot, config, schema, + load_metrics, }) } @@ -191,10 +190,13 @@ impl Snapshot { .try_into_arrow()?, ); + let load_metrics = SnapshotLoadMetrics::from_snapshot(snapshot.as_ref()); + Ok(Arc::new(Self { inner: snapshot, schema, config: self.config.clone(), + load_metrics, })) } @@ -227,6 +229,11 @@ impl Snapshot { &self.config } + /// Get the metrics captured during snapshot loading + pub fn load_metrics(&self) -> &SnapshotLoadMetrics { + &self.load_metrics + } + /// Get the table root of the snapshot pub(crate) fn table_root_path(&self) -> DeltaResult { Ok(Path::from_url_path(self.inner.table_root().path())?) @@ -677,6 +684,11 @@ impl EagerSnapshot { self.snapshot.load_config() } + /// Get the metrics captured during snapshot loading + pub fn load_metrics(&self) -> &SnapshotLoadMetrics { + self.snapshot.load_metrics() + } + /// Well known table configuration pub fn table_properties(&self) -> &TableProperties { self.snapshot.table_properties() @@ -830,11 +842,14 @@ mod tests { .as_ref() .try_into_arrow()?; + let load_metrics = SnapshotLoadMetrics::from_snapshot(&snapshot); + Ok(( Self { inner: snapshot, config: Default::default(), schema: Arc::new(schema), + load_metrics, }, log_store, )) diff --git a/crates/core/src/kernel/snapshot/serde.rs b/crates/core/src/kernel/snapshot/serde.rs index 7188b8a11..5751748ea 100644 --- a/crates/core/src/kernel/snapshot/serde.rs +++ b/crates/core/src/kernel/snapshot/serde.rs @@ -18,6 +18,7 @@ use url::Url; use crate::DeltaTableConfig; use super::{EagerSnapshot, Snapshot}; +use super::size_limits::SnapshotLoadMetrics; impl Serialize for Snapshot { fn serialize(&self, serializer: S) -> Result @@ -219,10 +220,13 @@ impl<'de> Visitor<'de> for SnapshotVisitor { .try_into_arrow() .map_err(de::Error::custom)?; + let load_metrics = SnapshotLoadMetrics::from_snapshot(&snapshot); + Ok(Snapshot { inner: Arc::new(snapshot), schema: Arc::new(schema), config, + load_metrics, }) } } diff --git a/crates/core/src/kernel/snapshot/size_limits.rs b/crates/core/src/kernel/snapshot/size_limits.rs index 3c451cd15..bb0222f98 100644 --- a/crates/core/src/kernel/snapshot/size_limits.rs +++ b/crates/core/src/kernel/snapshot/size_limits.rs @@ -5,13 +5,169 @@ use std::collections::HashMap; use std::convert::identity; use std::num::{NonZeroU64, NonZeroUsize}; use std::ops::RangeInclusive; +use std::sync::Arc; use delta_kernel::log_segment::LogSegment; use delta_kernel::path::{LogPathFileType, ParsedLogPath}; -use delta_kernel::Version; +use delta_kernel::Snapshot; use itertools::Itertools; use strum::Display; use tracing::{debug, info, trace, warn}; +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct SnapshotLoadMetrics { + /// Current version of the snapshot + pub current_version: i64, + + /// Oldest commit version included in the loaded snapshot + /// None if only checkpoint was loaded (no commits) + pub oldest_version_loaded: Option, + + /// Final log segment size in bytes (after truncation if applied) + pub loaded_log_segment_size: u64, + + /// Whether log size limiting was applied during load + pub log_size_limiter_applied: bool, + + /// Original log segment size in bytes (before any truncation) + /// None if no limiter was configured + pub original_log_segment_size: Option, + + /// Number of commits discarded due to truncation + /// None if no truncation occurred + pub num_commits_discarded: Option, +} + +impl Default for SnapshotLoadMetrics { + fn default() -> Self { + Self { + current_version: 0, + oldest_version_loaded: None, + loaded_log_segment_size: 0, + log_size_limiter_applied: false, + original_log_segment_size: None, + num_commits_discarded: None, + } + } +} + +pub(crate) fn log_segment_total_byte_size(segment: &LogSegment) -> u64 { + segment + .checkpoint_parts + .iter() + .chain(segment.ascending_commit_files.iter()) + .map(|p| p.location.size) + .sum() +} + +pub(crate) async fn apply_optional_log_limiter( + snapshot: Arc, + limiter: Option<&LogSizeLimiter>, + log_store: &dyn LogStore, +) -> DeltaResult<(Arc, SnapshotLoadMetrics)> { + let current_version = snapshot.version() as i64; + + if let Some(limiter) = limiter { + let original_segment = snapshot.log_segment().clone(); + let original_size = log_segment_total_byte_size(&original_segment); + + let (truncated_segment, truncation_info) = + limiter.truncate(original_segment, log_store).await?; + let table_configuration = snapshot.table_configuration().clone(); + + let oldest_version = truncated_segment + .ascending_commit_files + .first() + .map(|p| p.version as i64); + + let metrics = match truncation_info { + Some(info) => SnapshotLoadMetrics::with_truncation( + current_version, + oldest_version, + info.truncated_size, + info.original_size, + info.commits_discarded, + ), + None => SnapshotLoadMetrics::no_truncation( + current_version, + oldest_version, + original_size, + ), + }; + + Ok(( + Arc::new(Snapshot::new(truncated_segment, table_configuration)), + metrics, + )) + } else { + let metrics = SnapshotLoadMetrics::from_snapshot(snapshot.as_ref()); + Ok((snapshot, metrics)) + } +} + +impl SnapshotLoadMetrics { + pub fn from_snapshot(snapshot: &Snapshot) -> Self { + let log_segment = snapshot.log_segment(); + let log_size = log_segment_total_byte_size(log_segment); + let oldest_version = log_segment + .ascending_commit_files + .first() + .map(|p| p.version as i64); + + SnapshotLoadMetrics::no_truncation( + snapshot.version() as i64, + oldest_version, + log_size, + ) + } + + pub fn no_truncation( + version: i64, + oldest_version: Option, + log_segment_size: u64, + ) -> Self { + Self { + current_version: version, + oldest_version_loaded: oldest_version, + loaded_log_segment_size: log_segment_size, + log_size_limiter_applied: false, + original_log_segment_size: None, + num_commits_discarded: None, + } + } + + pub fn with_truncation( + version: i64, + oldest_version: Option, + truncated_size: u64, + original_size: u64, + commits_discarded: usize, + ) -> Self { + Self { + current_version: version, + oldest_version_loaded: oldest_version, + loaded_log_segment_size: truncated_size, + log_size_limiter_applied: true, + original_log_segment_size: Some(original_size), + num_commits_discarded: Some(commits_discarded), + } + } + + pub fn was_truncated(&self) -> bool { + self.log_size_limiter_applied && self.num_commits_discarded.is_some() + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct TruncationInfo { + /// Original log segment size in bytes (before truncation) + pub original_size: u64, + /// Final log segment size in bytes (after truncation) + pub truncated_size: u64, + /// Number of commits discarded + pub commits_discarded: usize, +} + #[derive(Debug, Clone, PartialEq, Display, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum OversizePolicy { @@ -75,15 +231,11 @@ impl LogSizeLimiter { .transpose() } - pub(super) async fn truncate(&self, log_segment: LogSegment, log_store: &dyn LogStore) -> DeltaResult { - let total_size: u64 = log_segment - .checkpoint_parts - .iter() - .chain(log_segment.ascending_commit_files.iter()) - .map(|parsed_path| parsed_path.location.size) - .sum(); + pub(super) async fn truncate(&self, log_segment: LogSegment, log_store: &dyn LogStore) -> DeltaResult<(LogSegment, Option)> { + let total_size: u64 = log_segment_total_byte_size(&log_segment); let total_size = total_size; let size_limit = self.size_limit.get(); + let original_commit_count = log_segment.ascending_commit_files.len(); if total_size > size_limit { warn!( @@ -97,17 +249,28 @@ impl LogSizeLimiter { Table log segment size ({} bytes) exceeds maximum allowed size ({} bytes). Consider increasing the size limit or using an oversize policy other than {}. "#, total_size, self.size_limit, self.oversize_policy))), - OversizePolicy::UseTruncatedCommitLog(num_commits) => - truncated_commit_log(log_segment, log_store, num_commits, size_limit).await, + OversizePolicy::UseTruncatedCommitLog(num_commits) => { + let (truncated_segment, truncated_size) = + truncated_commit_log(log_segment, log_store, num_commits, size_limit).await?; + let final_commit_count = truncated_segment.ascending_commit_files.len(); + let commits_discarded = original_commit_count.saturating_sub(final_commit_count); + + let truncation_info = TruncationInfo { + original_size: total_size, + truncated_size, + commits_discarded, + }; + Ok((truncated_segment, Some(truncation_info))) + } } } else { debug!("Log segment size ({} bytes) is within the limit of {} bytes", total_size, size_limit); - Ok(log_segment) + Ok((log_segment, None)) } } } -async fn truncated_commit_log(log_segment: LogSegment, log_store: &dyn LogStore, num_commits: &NonZeroUsize, size_limit: u64) -> DeltaResult { +async fn truncated_commit_log(log_segment: LogSegment, log_store: &dyn LogStore, num_commits: &NonZeroUsize, size_limit: u64) -> DeltaResult<(LogSegment, u64)> { let num_commits = num_commits.get(); let truncated_log: Vec = if log_segment.ascending_commit_files.len() < num_commits { let segment_version = log_segment.end_version as usize; @@ -126,25 +289,32 @@ async fn truncated_commit_log(log_segment: LogSegment, log_store: &dyn LogStore, }; let mut truncated_log_size = 0_u64; // keep track of the total size to cut it shorter if needed let latest_commit_file = truncated_log.last().cloned(); - Ok(LogSegment { + let final_commits: Vec = truncated_log.into_iter() + .rev() + .take_while(|file_meta| { + truncated_log_size += file_meta.location.size; + truncated_log_size <= size_limit + }) + .collect::>() + .into_iter() + .rev() + .collect(); + + // Calculate the actual final size + let final_size: u64 = final_commits.iter().map(|f| f.location.size).sum(); + + let segment = LogSegment { end_version: log_segment.end_version, - ascending_commit_files: truncated_log.into_iter() - .rev() - .take_while(|file_meta| { - truncated_log_size += file_meta.location.size; - truncated_log_size <= size_limit - }) - .collect::>() - .into_iter() - .rev() - .collect(), + ascending_commit_files: final_commits, checkpoint_parts: vec![], ascending_compaction_files: vec![], log_root: log_store.log_root_url(), checkpoint_version: None, latest_crc_file: None, latest_commit_file, - }) + }; + + Ok((segment, final_size)) } async fn list_commit_files( @@ -272,7 +442,9 @@ mod tests { let segment = create_log_segment(&log_store, None).await?; assert_segment_with_checkpoint(&segment, 90, 10); // total size < size limit - assert_eq!(limiter.truncate(segment.clone(), &log_store).await?, segment); + let (truncated_segment, truncation_info) = limiter.truncate(segment.clone(), &log_store).await?; + assert_eq!(truncated_segment, segment); + assert_eq!(truncation_info, None); Ok(()) } @@ -311,15 +483,18 @@ mod tests { let segment = create_log_segment(&log_store, Some(25)).await?; assert_segment_with_checkpoint(&segment, 25, 0); - assert_segment_with_commits_only(&limiter.truncate(segment, &log_store).await?, 16..=25); + let (truncated_segment, truncation_info) = limiter.truncate(segment, &log_store).await?; + assert_segment_with_commits_only(&truncated_segment, 16..=25); let segment = create_log_segment(&log_store, Some(7)).await?; assert_segment_with_checkpoint(&segment, 5, 2); - assert_segment_with_commits_only(&limiter.truncate(segment, &log_store).await?, 0..=7); + let (truncated_segment, truncation_info) = limiter.truncate(segment, &log_store).await?; + assert_segment_with_commits_only(&truncated_segment, 0..=7); let segment = create_log_segment(&log_store, Some(19)).await?; assert_segment_with_checkpoint(&segment, 15, 4); - assert_segment_with_commits_only(&limiter.truncate(segment, &log_store).await?, 10..=19); + let (truncated_segment, truncation_info) = limiter.truncate(segment, &log_store).await?; + assert_segment_with_commits_only(&truncated_segment, 10..=19); Ok(()) } @@ -337,12 +512,14 @@ mod tests { let segment = create_log_segment(&log_store, Some(30)).await?; assert_segment_with_commits_only(&segment, 0..=30); // size limit not exceeded: 31 commits * 10 bytes < 500 bytes, segment not truncated - assert_eq!(limiter.truncate(segment.clone(), &log_store).await?, segment); + let (truncated_segment, truncation_info) = limiter.truncate(segment.clone(), &log_store).await?; + assert_eq!(truncated_segment, segment); let segment = create_log_segment(&log_store, Some(75)).await?; assert_segment_with_commits_only(&segment, 0..=75); // size limit exceeded: 75 commits * 10 bytes > 500 bytes; keeps the last 10 commits - assert_segment_with_commits_only(&limiter.truncate(segment, &log_store).await?, 66..=75); + let (truncated_segment, truncation_info) = limiter.truncate(segment, &log_store).await?; + assert_segment_with_commits_only(&truncated_segment, 66..=75); Ok(()) } @@ -360,7 +537,8 @@ mod tests { let segment = create_log_segment(&log_store, Some(70)).await?; assert_segment_with_checkpoint(&segment, 50, 20); // less than 50 commits available in the vacuumed store - assert_segment_with_commits_only(&limiter.truncate(segment, &log_store).await?, 30..=70); + let (truncated_segment, truncation_info) = limiter.truncate(segment, &log_store).await?; + assert_segment_with_commits_only(&truncated_segment, 30..=70); Ok(()) } @@ -378,7 +556,8 @@ mod tests { let segment = create_log_segment(&log_store, None).await?; assert_segment_with_checkpoint(&segment, 125, 25); // only loads 50 commits instead of the configured 100 to stay within the size limit - assert_segment_with_commits_only(&limiter.truncate(segment, &log_store).await?, 101..=150); + let (truncated_segment, truncation_info) = limiter.truncate(segment, &log_store).await?; + assert_segment_with_commits_only(&truncated_segment, 101..=150); Ok(()) } @@ -398,7 +577,8 @@ mod tests { let segment = create_log_segment(&log_store, Some(23)).await?; assert_segment_with_checkpoint(&segment, 20, 3); - assert_segment_with_commits_only(&limiter.truncate(segment, &log_store).await?, 4..=23 ); + let (truncated_segment, truncation_info) = limiter.truncate(segment, &log_store).await?; + assert_segment_with_commits_only(&truncated_segment, 4..=23 ); Ok(()) } @@ -456,8 +636,6 @@ mod tests { use object_store::{GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result as ObjectStoreResult}; use std::ops::RangeInclusive; use std::sync::Arc; - use rand::seq::SliceRandom; - use rand::thread_rng; use url::Url; use uuid::Uuid; diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index 35380cc89..d7bca50a3 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -33,6 +33,7 @@ mod columns; // Re-exposing for backwards compatibility pub use columns::*; +use crate::kernel::size_limits::SnapshotLoadMetrics; /// In memory representation of a Delta Table /// @@ -358,6 +359,18 @@ impl DeltaTable { self.state.as_ref().ok_or(DeltaTableError::NotInitialized) } + /// Returns the metrics captured during snapshot loading. + /// + /// This method provides access to information about how the snapshot was loaded, + /// including whether log size limiting was applied and if truncation occurred. + /// + /// ## Returns + /// + /// A reference to the snapshot load metrics if the table has been loaded, `None` otherwise. + pub fn snapshot_load_metrics(&self) -> Option<&SnapshotLoadMetrics> { + self.state.as_ref().map(|s| s.snapshot().load_metrics()) + } + /// Time travel Delta table to the latest version that's created at or before provided /// `datetime` argument. /// From 6b24db50b7e292339f0edfad64ad8f0821c27f88 Mon Sep 17 00:00:00 2001 From: dvlascenco Date: Wed, 25 Mar 2026 18:23:51 +0200 Subject: [PATCH 2/2] fixup: calculation of total file size --- crates/core/src/kernel/snapshot/mod.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index fbe74a9a3..08eb79618 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -550,13 +550,11 @@ pub(crate) async fn resolve_snapshot( } } -fn read_adds_size(array: &dyn ProvidesColumnByName) -> DeltaResult { - if let Some(arr) = ex::extract_and_cast_opt::(array, "add") { - let size = ex::extract_and_cast::(arr, "size")?; - let sum = sum_array_checked::(size)?.unwrap_or_default(); - Ok(sum as usize) +fn read_adds_size(array: &dyn ProvidesColumnByName) -> usize { + if let Some(size) = ex::extract_and_cast_opt::(array, "size") { + sum_array_checked::(size).unwrap().unwrap_or_default() as usize } else { - Ok(0) + 0 } } @@ -716,7 +714,7 @@ impl EagerSnapshot { self .files .iter() - .map(|frb| read_adds_size(frb).unwrap_or_default()) + .map(|frb| read_adds_size(frb)) .sum() }