diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index 32bfa0006..20d8f2314 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -47,6 +47,7 @@ use crate::kernel::arrow::extract::{self as ex, ProvidesColumnByName}; use super::{Action, CommitInfo, Metadata, Protocol}; use crate::kernel::arrow::engine_ext::{ExpressionEvaluatorExt, rb_from_scan_meta}; +use crate::kernel::schema::cast::merge_delta_struct; use crate::kernel::{ARROW_HANDLER, StructType, spawn_blocking_with_span}; use crate::logstore::{LogStore, LogStoreExt}; use crate::{DeltaResult, DeltaTableConfig, DeltaTableError, PartitionFilter, to_kernel_predicate}; @@ -78,6 +79,7 @@ pub struct Snapshot { } impl Snapshot { + pub async fn try_new_with_engine( log_store: &dyn LogStore, engine: Arc, @@ -114,13 +116,12 @@ impl Snapshot { snapshot }; - let schema = Arc::new( - snapshot - .table_configuration() - .schema() - .as_ref() - .try_into_arrow()?, - ); + let table_schema = snapshot.table_configuration().schema(); + let schema = Arc::new (match &config.custom_schema { + Some(custom) => merge_delta_struct(table_schema.as_ref(), custom.as_ref()) + .and_then(|m| (&m).try_into_arrow())?, + None => table_schema.as_ref().try_into_arrow()?, + }); Ok(Self { inner: snapshot, @@ -183,13 +184,12 @@ impl Snapshot { .await .map_err(|e| DeltaTableError::Generic(e.to_string()))??; - let schema = Arc::new( - snapshot - .table_configuration() - .schema() - .as_ref() - .try_into_arrow()?, - ); + let table_schema = snapshot.table_configuration().schema(); + let schema = Arc::new (match &self.config.custom_schema { + Some(custom) => merge_delta_struct(table_schema.as_ref(), custom.as_ref()) + .and_then(|m| (&m).try_into_arrow())?, + None => table_schema.as_ref().try_into_arrow()?, + }); Ok(Arc::new(Self { inner: snapshot, @@ -205,7 +205,15 @@ impl Snapshot { /// Get the table schema of the snapshot pub fn schema(&self) -> KernelSchemaRef { - self.inner.table_configuration().schema() + self.config.custom_schema + .as_ref() + .map(|custom| { + let table_schema = self.inner.table_configuration().schema(); + merge_delta_struct(table_schema.as_ref(), custom.as_ref()) + .expect("Failed to merge schemas") + }) + .map(Arc::new) + .unwrap_or_else(|| self.inner.table_configuration().schema()) } pub fn arrow_schema(&self) -> SchemaRef { @@ -545,9 +553,12 @@ pub(crate) async fn resolve_snapshot( fn read_adds_size(array: &dyn ProvidesColumnByName) -> DeltaResult { if let Some(arr) = ex::extract_and_cast_opt::(array, "add") { - let size = ex::extract_and_cast::(arr, "size")?; - let sum = sum_array_checked::(size)?.unwrap_or_default(); - Ok(sum as usize) + if let Some(size) = ex::extract_and_cast_opt::(arr, "size") { + let sum = sum_array_checked::(size)?.unwrap_or_default(); + Ok(sum as usize) + } else { + Ok(0) + } } else { Ok(0) } @@ -955,4 +966,129 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_snapshot_with_custom_schema() -> TestResult { + use crate::kernel::{DataType, PrimitiveType}; + use crate::operations::create::CreateBuilder; + + let full_schema = StructType::try_new(vec![ + StructField::new("id", DataType::Primitive(PrimitiveType::Long), false), + StructField::new("name", DataType::Primitive(PrimitiveType::String), true), + StructField::new("age", DataType::Primitive(PrimitiveType::Integer), true), + ])?; + + let table = CreateBuilder::new() + .with_location("memory:///test_snapshot_custom_schema") + .with_columns(full_schema.fields().cloned()) + .await?; + + let log_store = table.log_store(); + + let snapshot = Snapshot::try_new(log_store.as_ref(), Default::default(), None).await?; + assert_eq!(snapshot.schema().fields().len(), 3); + assert_eq!(snapshot.arrow_schema().fields().len(), 3); + + let custom_schema = StructType::try_new(vec![ + StructField::new("extra_field", DataType::Primitive(PrimitiveType::String), true), + ])?; + + let mut config = DeltaTableConfig::default(); + config.custom_schema = Some(Arc::new(custom_schema)); + let snapshot_with_custom = Snapshot::try_new(log_store.as_ref(), config, None).await?; + + assert_eq!(snapshot_with_custom.schema().fields().len(), 4); + assert_eq!(snapshot_with_custom.arrow_schema().fields().len(), 4); + + assert!(snapshot_with_custom.schema().field("extra_field").is_some()); + assert!(snapshot_with_custom.schema().field("id").is_some()); + assert!(snapshot_with_custom.schema().field("name").is_some()); + assert!(snapshot_with_custom.schema().field("age").is_some()); + + Ok(()) + } + + #[tokio::test] + async fn test_eager_snapshot_with_custom_schema() -> TestResult { + use crate::kernel::{DataType, PrimitiveType}; + use crate::operations::create::CreateBuilder; + + let full_schema = StructType::try_new(vec![ + StructField::new("id", DataType::Primitive(PrimitiveType::Long), false), + StructField::new("value", DataType::Primitive(PrimitiveType::Integer), true), + StructField::new("description", DataType::Primitive(PrimitiveType::String), true), + ])?; + + let table = CreateBuilder::new() + .with_location("memory:///test_eager_snapshot_custom_schema") + .with_columns(full_schema.fields().cloned()) + .await?; + + let log_store = table.log_store(); + + let snapshot = EagerSnapshot::try_new(log_store.as_ref(), Default::default(), None).await?; + assert_eq!(snapshot.schema().fields().len(), 3); + assert_eq!(snapshot.arrow_schema().fields().len(), 3); + + let custom_schema = StructType::try_new(vec![ + StructField::new("computed_field", DataType::Primitive(PrimitiveType::Double), true), + StructField::new("metadata_field", DataType::Primitive(PrimitiveType::String), true), + ])?; + + let mut config = DeltaTableConfig::default(); + config.custom_schema = Some(Arc::new(custom_schema)); + let snapshot_with_custom = EagerSnapshot::try_new(log_store.as_ref(), config, None).await?; + + assert_eq!(snapshot_with_custom.schema().fields().len(), 5); + assert_eq!(snapshot_with_custom.arrow_schema().fields().len(), 5); + + assert!(snapshot_with_custom.schema().field("id").is_some()); + assert!(snapshot_with_custom.schema().field("value").is_some()); + assert!(snapshot_with_custom.schema().field("description").is_some()); + assert!(snapshot_with_custom.schema().field("computed_field").is_some()); + assert!(snapshot_with_custom.schema().field("metadata_field").is_some()); + + let kernel_schema = snapshot_with_custom.schema(); + let arrow_schema = snapshot_with_custom.arrow_schema(); + assert_eq!(kernel_schema.fields().len(), arrow_schema.fields().len()); + + Ok(()) + } + + #[tokio::test] + async fn test_snapshot_custom_schema_type_override() -> TestResult { + use crate::kernel::{DataType, PrimitiveType}; + use crate::operations::create::CreateBuilder; + + let table_schema = StructType::try_new(vec![ + StructField::new("id", DataType::Primitive(PrimitiveType::Long), false), + StructField::new("count", DataType::Primitive(PrimitiveType::Integer), false), + ])?; + + let table = CreateBuilder::new() + .with_location("memory:///test_snapshot_schema_override") + .with_columns(table_schema.fields().cloned()) + .await?; + + let log_store = table.log_store(); + + let custom_schema = StructType::try_new(vec![ + StructField::new("count", DataType::Primitive(PrimitiveType::Integer), true), + ])?; + + let mut config = DeltaTableConfig::default(); + config.custom_schema = Some(Arc::new(custom_schema)); + let snapshot = Snapshot::try_new(log_store.as_ref(), config, None).await?; + + assert_eq!(snapshot.schema().fields().len(), 2); + + let schema = snapshot.schema(); + let count_field = schema.field("count").unwrap(); + assert!(count_field.is_nullable()); + + let id_field = schema.field("id").unwrap(); + assert!(!id_field.is_nullable()); + + Ok(()) + } } diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs index bdb9eec0c..2047fa719 100644 --- a/crates/core/src/table/builder.rs +++ b/crates/core/src/table/builder.rs @@ -12,10 +12,13 @@ use tracing::debug; use url::Url; use super::normalize_table_url; +use crate::kernel::StructType; use crate::logstore::storage::IORuntime; use crate::logstore::{LogStoreRef, StorageConfig, object_store_factories}; use crate::{DeltaResult, DeltaTable, DeltaTableError}; use crate::kernel::size_limits::LogSizeLimiter; +use arrow::datatypes::Schema as ArrowSchema; +use delta_kernel::engine::arrow_conversion::TryIntoKernel; /// possible version specifications for loading a delta table #[derive(Debug, Copy, Clone, PartialEq, Eq, Default)] @@ -60,6 +63,11 @@ pub struct DeltaTableConfig { #[delta(skip)] pub log_size_limiter: Option, + + #[serde(skip_serializing, skip_deserializing)] + #[delta(skip)] + /// Optional custom schema to use instead of loading from table metadata + pub custom_schema: Option>, } impl Default for DeltaTableConfig { @@ -70,6 +78,7 @@ impl Default for DeltaTableConfig { log_batch_size: 1024, io_runtime: None, log_size_limiter: None, + custom_schema: None, } } } @@ -80,6 +89,11 @@ impl PartialEq for DeltaTableConfig { && self.log_buffer_size == other.log_buffer_size && self.log_batch_size == other.log_batch_size && self.log_size_limiter == other.log_size_limiter + && match (&self.custom_schema, &other.custom_schema) { + (Some(a), Some(b)) => Arc::ptr_eq(a, b) || a == b, + (None, None) => true, + _ => false, + } } } @@ -250,6 +264,28 @@ impl DeltaTableBuilder { self } + /// Set a custom schema to merge with the one from table metadata + /// + /// This allows you to override some of the schema that would normally be read + /// from the Delta log. This can be useful when you want to work with a subset + /// of columns or apply custom schema transformations. + /// + pub fn with_schema(mut self, schema: Arc) -> Self { + self.table_config.custom_schema = Some(schema); + self + } + + /// Set a custom schema from an Arrow schema to merge with the one from table metadata + /// + /// This is a convenience method that converts an Arrow schema to a Delta schema + /// and then sets it as the custom schema. This allows you to override the schema + /// that would normally be read from the Delta log using an Arrow schema. + /// + pub fn with_arrow_schema(self, arrow_schema: Arc) -> DeltaResult { + let delta_schema: StructType = arrow_schema.as_ref().try_into_kernel()?; + Ok(self.with_schema(Arc::new(delta_schema))) + } + /// Storage options for configuring backend object store pub fn storage_options(&self) -> HashMap { let mut storage_options = self.storage_options.clone().unwrap_or_default(); @@ -736,4 +772,142 @@ mod tests { std::fs::remove_dir_all(&test_dir).ok(); } + + #[test] + fn test_with_schema() -> DeltaResult<()> { + use crate::kernel::{DataType, PrimitiveType, StructField}; + + let schema = StructType::try_new(vec![ + StructField::new("id", DataType::Primitive(PrimitiveType::Integer), false), + StructField::new("name", DataType::Primitive(PrimitiveType::String), true), + ])?; + + let table_url = Url::parse("memory:///test").unwrap(); + let builder = DeltaTableBuilder::from_url(table_url)? + .with_schema(Arc::new(schema.clone())); + + // Verify the schema is set in the config + assert!(builder.table_config.custom_schema.is_some()); + let custom_schema = builder.table_config.custom_schema.as_ref().unwrap(); + assert_eq!(custom_schema.fields().len(), 2); + assert_eq!(custom_schema.fields().next().unwrap().name(), "id"); + + Ok(()) + } + + #[tokio::test] + async fn test_with_schema_integration() -> DeltaResult<()> { + use crate::kernel::{DataType, PrimitiveType, StructField}; + use crate::operations::create::CreateBuilder; + + // First create a table with a full schema + let full_schema = StructType::try_new(vec![ + StructField::new("id", DataType::Primitive(PrimitiveType::Integer), false), + StructField::new("name", DataType::Primitive(PrimitiveType::String), true), + StructField::new("age", DataType::Primitive(PrimitiveType::Integer), true), + ])?; + + let table = CreateBuilder::new() + .with_location("memory:///test_custom_schema") + .with_columns(full_schema.fields().cloned()) + .await?; + + // Verify the table was created with the full schema + let snapshot = table.snapshot()?; + assert_eq!(snapshot.schema().fields().len(), 3); + + // Now load the table with a custom schema (subset of columns) + let custom_schema = StructType::try_new(vec![ + StructField::new("id", DataType::Primitive(PrimitiveType::Integer), false), + StructField::new("name", DataType::Primitive(PrimitiveType::String), true), + ])?; + + let table_url = Url::parse("memory:///test_custom_schema").unwrap(); + let table_with_custom = DeltaTableBuilder::from_url(table_url)? + .with_schema(Arc::new(custom_schema.clone())) + .build()?; + + // The table should have the custom schema in its config + assert!(table_with_custom.config.custom_schema.is_some()); + let config_schema = table_with_custom.config.custom_schema.as_ref().unwrap(); + assert_eq!(config_schema.fields().len(), 2); + + Ok(()) + } + + #[tokio::test] + async fn test_with_arrow_schema() -> DeltaResult<()> { + use arrow::datatypes::{DataType as ArrowDataType, Field as ArrowField}; + use crate::kernel::{DataType, PrimitiveType, StructField}; + use crate::operations::create::CreateBuilder; + + // First create a table with a full schema + let full_schema = StructType::try_new(vec![ + StructField::new("id", DataType::Primitive(PrimitiveType::Integer), false), + StructField::new("name", DataType::Primitive(PrimitiveType::String), true), + StructField::new("age", DataType::Primitive(PrimitiveType::Integer), true), + ])?; + + let table = CreateBuilder::new() + .with_location("memory:///test_arrow_schema") + .with_columns(full_schema.fields().cloned()) + .await?; + + // Verify the table was created with the full schema + let snapshot = table.snapshot()?; + assert_eq!(snapshot.schema().fields().len(), 3); + + // Now load the table with a custom Arrow schema (subset of columns) + let arrow_schema = ArrowSchema::new(vec![ + ArrowField::new("id", ArrowDataType::Int32, false), + ArrowField::new("name", ArrowDataType::Utf8, true), + ]); + + let table_url = Url::parse("memory:///test_arrow_schema").unwrap(); + let table_with_custom = DeltaTableBuilder::from_url(table_url)? + .with_arrow_schema(Arc::new(arrow_schema))? + .build()?; + + // The table should have the custom schema in its config + assert!(table_with_custom.config.custom_schema.is_some()); + let config_schema = table_with_custom.config.custom_schema.as_ref().unwrap(); + assert_eq!(config_schema.fields().len(), 2); + + // Verify the field names match + let field_names: Vec<&str> = config_schema.fields() + .map(|f| f.name().as_str()) + .collect(); + assert_eq!(field_names, vec!["id", "name"]); + + Ok(()) + } + + #[test] + fn test_with_arrow_schema_conversion() -> DeltaResult<()> { + use arrow::datatypes::{DataType as ArrowDataType, Field as ArrowField}; + + // Create an Arrow schema + let arrow_schema = ArrowSchema::new(vec![ + ArrowField::new("id", ArrowDataType::Int32, false), + ArrowField::new("name", ArrowDataType::Utf8, true), + ArrowField::new("value", ArrowDataType::Float64, true), + ]); + + let table_url = Url::parse("memory:///test_conversion").unwrap(); + let builder = DeltaTableBuilder::from_url(table_url)? + .with_arrow_schema(Arc::new(arrow_schema))?; + + // Verify the schema was converted and set + assert!(builder.table_config.custom_schema.is_some()); + let delta_schema = builder.table_config.custom_schema.as_ref().unwrap(); + assert_eq!(delta_schema.fields().len(), 3); + + // Verify field names + let field_names: Vec<&str> = delta_schema.fields() + .map(|f| f.name().as_str()) + .collect(); + assert_eq!(field_names, vec!["id", "name", "value"]); + + Ok(()) + } }