Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 154 additions & 18 deletions crates/core/src/kernel/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use crate::kernel::arrow::extract::{self as ex, ProvidesColumnByName};

use super::{Action, CommitInfo, Metadata, Protocol};
use crate::kernel::arrow::engine_ext::{ExpressionEvaluatorExt, rb_from_scan_meta};
use crate::kernel::schema::cast::merge_delta_struct;
use crate::kernel::{ARROW_HANDLER, StructType, spawn_blocking_with_span};
use crate::logstore::{LogStore, LogStoreExt};
use crate::{DeltaResult, DeltaTableConfig, DeltaTableError, PartitionFilter, to_kernel_predicate};
Expand Down Expand Up @@ -78,6 +79,7 @@ pub struct Snapshot {
}

impl Snapshot {

pub async fn try_new_with_engine(
log_store: &dyn LogStore,
engine: Arc<dyn Engine>,
Expand Down Expand Up @@ -114,13 +116,12 @@ impl Snapshot {
snapshot
};

let schema = Arc::new(
snapshot
.table_configuration()
.schema()
.as_ref()
.try_into_arrow()?,
);
let table_schema = snapshot.table_configuration().schema();
let schema = Arc::new (match &config.custom_schema {
Some(custom) => merge_delta_struct(table_schema.as_ref(), custom.as_ref())
.and_then(|m| (&m).try_into_arrow())?,
None => table_schema.as_ref().try_into_arrow()?,
});

Ok(Self {
inner: snapshot,
Expand Down Expand Up @@ -183,13 +184,12 @@ impl Snapshot {
.await
.map_err(|e| DeltaTableError::Generic(e.to_string()))??;

let schema = Arc::new(
snapshot
.table_configuration()
.schema()
.as_ref()
.try_into_arrow()?,
);
let table_schema = snapshot.table_configuration().schema();
let schema = Arc::new (match &self.config.custom_schema {
Some(custom) => merge_delta_struct(table_schema.as_ref(), custom.as_ref())
.and_then(|m| (&m).try_into_arrow())?,
None => table_schema.as_ref().try_into_arrow()?,
});

Ok(Arc::new(Self {
inner: snapshot,
Expand All @@ -205,7 +205,15 @@ impl Snapshot {

/// Get the table schema of the snapshot
pub fn schema(&self) -> KernelSchemaRef {
self.inner.table_configuration().schema()
self.config.custom_schema
.as_ref()
.map(|custom| {
let table_schema = self.inner.table_configuration().schema();
merge_delta_struct(table_schema.as_ref(), custom.as_ref())
.expect("Failed to merge schemas")
})
.map(Arc::new)
.unwrap_or_else(|| self.inner.table_configuration().schema())
}

pub fn arrow_schema(&self) -> SchemaRef {
Expand Down Expand Up @@ -545,9 +553,12 @@ pub(crate) async fn resolve_snapshot(

fn read_adds_size(array: &dyn ProvidesColumnByName) -> DeltaResult<usize> {
if let Some(arr) = ex::extract_and_cast_opt::<StructArray>(array, "add") {
let size = ex::extract_and_cast::<Int64Array>(arr, "size")?;
let sum = sum_array_checked::<arrow::array::types::Int64Type, _>(size)?.unwrap_or_default();
Ok(sum as usize)
if let Some(size) = ex::extract_and_cast_opt::<Int64Array>(arr, "size") {
let sum = sum_array_checked::<arrow::array::types::Int64Type, _>(size)?.unwrap_or_default();
Ok(sum as usize)
} else {
Ok(0)
}
} else {
Ok(0)
}
Expand Down Expand Up @@ -955,4 +966,129 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_snapshot_with_custom_schema() -> TestResult {
use crate::kernel::{DataType, PrimitiveType};
use crate::operations::create::CreateBuilder;

let full_schema = StructType::try_new(vec![
StructField::new("id", DataType::Primitive(PrimitiveType::Long), false),
StructField::new("name", DataType::Primitive(PrimitiveType::String), true),
StructField::new("age", DataType::Primitive(PrimitiveType::Integer), true),
])?;

let table = CreateBuilder::new()
.with_location("memory:///test_snapshot_custom_schema")
.with_columns(full_schema.fields().cloned())
.await?;

let log_store = table.log_store();

let snapshot = Snapshot::try_new(log_store.as_ref(), Default::default(), None).await?;
assert_eq!(snapshot.schema().fields().len(), 3);
assert_eq!(snapshot.arrow_schema().fields().len(), 3);

let custom_schema = StructType::try_new(vec![
StructField::new("extra_field", DataType::Primitive(PrimitiveType::String), true),
])?;

let mut config = DeltaTableConfig::default();
config.custom_schema = Some(Arc::new(custom_schema));
let snapshot_with_custom = Snapshot::try_new(log_store.as_ref(), config, None).await?;

assert_eq!(snapshot_with_custom.schema().fields().len(), 4);
assert_eq!(snapshot_with_custom.arrow_schema().fields().len(), 4);

assert!(snapshot_with_custom.schema().field("extra_field").is_some());
assert!(snapshot_with_custom.schema().field("id").is_some());
assert!(snapshot_with_custom.schema().field("name").is_some());
assert!(snapshot_with_custom.schema().field("age").is_some());

Ok(())
}

#[tokio::test]
async fn test_eager_snapshot_with_custom_schema() -> TestResult {
use crate::kernel::{DataType, PrimitiveType};
use crate::operations::create::CreateBuilder;

let full_schema = StructType::try_new(vec![
StructField::new("id", DataType::Primitive(PrimitiveType::Long), false),
StructField::new("value", DataType::Primitive(PrimitiveType::Integer), true),
StructField::new("description", DataType::Primitive(PrimitiveType::String), true),
])?;

let table = CreateBuilder::new()
.with_location("memory:///test_eager_snapshot_custom_schema")
.with_columns(full_schema.fields().cloned())
.await?;

let log_store = table.log_store();

let snapshot = EagerSnapshot::try_new(log_store.as_ref(), Default::default(), None).await?;
assert_eq!(snapshot.schema().fields().len(), 3);
assert_eq!(snapshot.arrow_schema().fields().len(), 3);

let custom_schema = StructType::try_new(vec![
StructField::new("computed_field", DataType::Primitive(PrimitiveType::Double), true),
StructField::new("metadata_field", DataType::Primitive(PrimitiveType::String), true),
])?;

let mut config = DeltaTableConfig::default();
config.custom_schema = Some(Arc::new(custom_schema));
let snapshot_with_custom = EagerSnapshot::try_new(log_store.as_ref(), config, None).await?;

assert_eq!(snapshot_with_custom.schema().fields().len(), 5);
assert_eq!(snapshot_with_custom.arrow_schema().fields().len(), 5);

assert!(snapshot_with_custom.schema().field("id").is_some());
assert!(snapshot_with_custom.schema().field("value").is_some());
assert!(snapshot_with_custom.schema().field("description").is_some());
assert!(snapshot_with_custom.schema().field("computed_field").is_some());
assert!(snapshot_with_custom.schema().field("metadata_field").is_some());

let kernel_schema = snapshot_with_custom.schema();
let arrow_schema = snapshot_with_custom.arrow_schema();
assert_eq!(kernel_schema.fields().len(), arrow_schema.fields().len());

Ok(())
}

#[tokio::test]
async fn test_snapshot_custom_schema_type_override() -> TestResult {
use crate::kernel::{DataType, PrimitiveType};
use crate::operations::create::CreateBuilder;

let table_schema = StructType::try_new(vec![
StructField::new("id", DataType::Primitive(PrimitiveType::Long), false),
StructField::new("count", DataType::Primitive(PrimitiveType::Integer), false),
])?;

let table = CreateBuilder::new()
.with_location("memory:///test_snapshot_schema_override")
.with_columns(table_schema.fields().cloned())
.await?;

let log_store = table.log_store();

let custom_schema = StructType::try_new(vec![
StructField::new("count", DataType::Primitive(PrimitiveType::Integer), true),
])?;

let mut config = DeltaTableConfig::default();
config.custom_schema = Some(Arc::new(custom_schema));
let snapshot = Snapshot::try_new(log_store.as_ref(), config, None).await?;

assert_eq!(snapshot.schema().fields().len(), 2);

let schema = snapshot.schema();
let count_field = schema.field("count").unwrap();
assert!(count_field.is_nullable());

let id_field = schema.field("id").unwrap();
assert!(!id_field.is_nullable());

Ok(())
}
}
Loading
Loading