Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use datafusion::prelude::Expr;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::source::DataSourceExec;
use datafusion_proto::bytes::Serializeable;

Check warning on line 18 in crates/core/src/delta_datafusion/table_provider/next/scan/codec.rs

View workflow job for this annotation

GitHub Actions / Spell Check

"Serializeable" should be "Serializable".
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use delta_kernel::engine::arrow_conversion::TryIntoKernel;
use delta_kernel::expressions::{ColumnName, Expression, FieldTransform, Transform};
Expand All @@ -27,6 +27,7 @@
use crate::delta_datafusion::DeltaScanConfig;
use crate::delta_datafusion::expr_adapter::build_expr_adapter_factory;
use crate::DeltaTableConfig;
use crate::kernel::size_limits::SnapshotLoadMetrics;
use crate::kernel::Snapshot;

/// Codec for serializing/deserializing [`DeltaScanExec`] physical plans.
Expand Down Expand Up @@ -126,11 +127,13 @@
// At upgrade, RECHECK usage sites for DeltaTableConfig, we'll need to re-evaluate if
// stuff begins writing to it
let delta_table_config = DeltaTableConfig::default();
let load_metrics = SnapshotLoadMetrics::from_snapshot(&exec_scan_plan_scan_snapshot);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This is a little strange, as the SnapshotLoadMetrics::from_snapshot could be hidden within Snapshot (since it acts on the inner object) — I don't like the fact that a SnapshotLoadMetrics comes from size_limits, which really seems like an implementation detail.

Snapshot {
inner: exec_scan_plan_scan_snapshot,
//
config: delta_table_config,
schema,
load_metrics,
}
};

Expand Down
41 changes: 27 additions & 14 deletions crates/core/src/kernel/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
use arrow::compute::{filter_record_batch, is_not_null};
use arrow::datatypes::SchemaRef;
use arrow_arith::aggregate::sum_array_checked;
use arrow_array::{Int64Array, StructArray};

Check warning on line 24 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / Integration Tests (LakeFS v1.48)

unused import: `StructArray`

Check warning on line 24 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / cloud (gcp)

unused import: `StructArray`

Check warning on line 24 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / Integration Tests (HDFS)

unused import: `StructArray`

Check warning on line 24 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / aws-native-tls

unused import: `StructArray`

Check warning on line 24 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

unused import: `StructArray`
use delta_kernel::actions::{Remove, Sidecar};
use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
use delta_kernel::engine::arrow_data::ArrowEngineData;
Expand Down Expand Up @@ -55,6 +55,7 @@
pub use iterators::*;
pub use scan::*;
pub use stream::*;
use crate::kernel::size_limits::SnapshotLoadMetrics;

mod iterators;
mod log_data;
Expand All @@ -75,6 +76,8 @@
pub(crate) config: DeltaTableConfig,
/// Logical table schema
pub(crate) schema: SchemaRef,
/// Metrics captured during snapshot loading
pub(crate) load_metrics: SnapshotLoadMetrics,
}

impl Snapshot {
Expand Down Expand Up @@ -106,13 +109,8 @@
}
};

let snapshot = if let Some(limiter) = &config.log_size_limiter {
let segment = limiter.truncate(snapshot.log_segment().clone(), log_store).await?;
let table_configuration = snapshot.table_configuration().clone();
Arc::new(KernelSnapshot::new(segment, table_configuration))
} else {
snapshot
};
let (snapshot, load_metrics) =
size_limits::apply_optional_log_limiter(snapshot, config.log_size_limiter.as_ref(), log_store).await?;

let schema = Arc::new(
snapshot
Expand All @@ -126,6 +124,7 @@
inner: snapshot,
config,
schema,
load_metrics,
})
}

Expand Down Expand Up @@ -191,10 +190,13 @@
.try_into_arrow()?,
);

let load_metrics = SnapshotLoadMetrics::from_snapshot(snapshot.as_ref());

Ok(Arc::new(Self {
inner: snapshot,
schema,
config: self.config.clone(),
load_metrics,
}))
}

Expand Down Expand Up @@ -227,6 +229,11 @@
&self.config
}

/// Returns the [`SnapshotLoadMetrics`] captured while this snapshot was
/// loaded, borrowed for the lifetime of the snapshot.
///
/// NOTE(review): populated at construction (see `Snapshot` builders) —
/// presumably reflects any log-size limiting applied during load; confirm
/// against `size_limits::apply_optional_log_limiter`.
pub fn load_metrics(&self) -> &SnapshotLoadMetrics {
&self.load_metrics
}

/// Get the table root of the snapshot
pub(crate) fn table_root_path(&self) -> DeltaResult<Path> {
Ok(Path::from_url_path(self.inner.table_root().path())?)
Expand Down Expand Up @@ -543,13 +550,11 @@
}
}

fn read_adds_size(array: &dyn ProvidesColumnByName) -> DeltaResult<usize> {
if let Some(arr) = ex::extract_and_cast_opt::<StructArray>(array, "add") {
let size = ex::extract_and_cast::<Int64Array>(arr, "size")?;
let sum = sum_array_checked::<arrow::array::types::Int64Type, _>(size)?.unwrap_or_default();
Ok(sum as usize)
/// Sums the `size` column of a files batch, in bytes.
///
/// Returns `0` when the batch has no `size` column (nothing to sum).
///
/// # Panics
/// Panics if the checked sum overflows `i64`, which would indicate corrupt
/// file-size metadata in the log.
fn read_adds_size(array: &dyn ProvidesColumnByName) -> usize {
    if let Some(size) = ex::extract_and_cast_opt::<Int64Array>(array, "size") {
        // `sum_array_checked` only errs on i64 overflow; state that invariant
        // explicitly instead of a bare `unwrap()` so a panic is diagnosable.
        let total = sum_array_checked::<arrow::array::types::Int64Type, _>(size)
            .expect("sum of file `size` values overflowed i64")
            .unwrap_or_default();
        // NOTE(review): `as usize` assumes the total is non-negative — a
        // negative sum would wrap to a huge value; confirm sizes are >= 0.
        total as usize
    } else {
        0
    }
}

Expand Down Expand Up @@ -677,6 +682,11 @@
self.snapshot.load_config()
}

/// Returns the [`SnapshotLoadMetrics`] captured during snapshot loading.
///
/// Delegates to the inner `snapshot`'s accessor; this wrapper adds no
/// state of its own here.
pub fn load_metrics(&self) -> &SnapshotLoadMetrics {
self.snapshot.load_metrics()
}

/// Well known table configuration
pub fn table_properties(&self) -> &TableProperties {
self.snapshot.table_properties()
Expand Down Expand Up @@ -704,7 +714,7 @@
self
.files
.iter()
.map(|frb| read_adds_size(frb).unwrap_or_default())
.map(|frb| read_adds_size(frb))
.sum()
}

Expand Down Expand Up @@ -830,11 +840,14 @@
.as_ref()
.try_into_arrow()?;

let load_metrics = SnapshotLoadMetrics::from_snapshot(&snapshot);

Ok((
Self {
inner: snapshot,
config: Default::default(),
schema: Arc::new(schema),
load_metrics,
},
log_store,
))
Expand Down
4 changes: 4 additions & 0 deletions crates/core/src/kernel/snapshot/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use url::Url;
use crate::DeltaTableConfig;

use super::{EagerSnapshot, Snapshot};
use super::size_limits::SnapshotLoadMetrics;

impl Serialize for Snapshot {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
Expand Down Expand Up @@ -219,10 +220,13 @@ impl<'de> Visitor<'de> for SnapshotVisitor {
.try_into_arrow()
.map_err(de::Error::custom)?;

let load_metrics = SnapshotLoadMetrics::from_snapshot(&snapshot);

Ok(Snapshot {
inner: Arc::new(snapshot),
schema: Arc::new(schema),
config,
load_metrics,
})
}
}
Expand Down
Loading
Loading