diff --git a/python/python/lance/blob.py b/python/python/lance/blob.py index 02adfdc95e..f67101bbff 100644 --- a/python/python/lance/blob.py +++ b/python/python/lance/blob.py @@ -25,6 +25,9 @@ class Blob: uri: Optional[str] = None position: Optional[int] = None size: Optional[int] = None + # Rows sharing the same positive ref_id are de-duplicated in storage. + # 0 / None means no sharing. + ref_id: Optional[int] = None def __post_init__(self) -> None: if self.data is not None and self.uri is not None: @@ -74,6 +77,7 @@ def __init__(self) -> None: pa.field("uri", pa.utf8(), nullable=True), pa.field("position", pa.uint64(), nullable=True), pa.field("size", pa.uint64(), nullable=True), + pa.field("ref_id", pa.uint32(), nullable=True), ] ) pa.ExtensionType.__init__(self, storage_type, "lance.blob.v2") @@ -119,6 +123,7 @@ def from_pylist(cls, values: list[Any]) -> "BlobArray": uri_values: list[Optional[str]] = [] position_values: list[Optional[int]] = [] size_values: list[Optional[int]] = [] + ref_id_values: list[Optional[int]] = [] null_mask: list[bool] = [] for v in values: @@ -127,6 +132,7 @@ def from_pylist(cls, values: list[Any]) -> "BlobArray": uri_values.append(None) position_values.append(None) size_values.append(None) + ref_id_values.append(None) null_mask.append(True) continue @@ -135,6 +141,7 @@ def from_pylist(cls, values: list[Any]) -> "BlobArray": uri_values.append(v.uri) position_values.append(v.position) size_values.append(v.size) + ref_id_values.append(v.ref_id) null_mask.append(False) continue @@ -145,6 +152,7 @@ def from_pylist(cls, values: list[Any]) -> "BlobArray": uri_values.append(v) position_values.append(None) size_values.append(None) + ref_id_values.append(None) null_mask.append(False) continue @@ -153,6 +161,7 @@ def from_pylist(cls, values: list[Any]) -> "BlobArray": uri_values.append(None) position_values.append(None) size_values.append(None) + ref_id_values.append(None) null_mask.append(False) continue @@ -165,10 +174,11 @@ def 
from_pylist(cls, values: list[Any]) -> "BlobArray":
         uri_arr = pa.array(uri_values, type=pa.utf8())
         position_arr = pa.array(position_values, type=pa.uint64())
         size_arr = pa.array(size_values, type=pa.uint64())
+        ref_id_arr = pa.array(ref_id_values, type=pa.uint32())
         mask_arr = pa.array(null_mask, type=pa.bool_())
         storage = pa.StructArray.from_arrays(
-            [data_arr, uri_arr, position_arr, size_arr],
-            names=["data", "uri", "position", "size"],
+            [data_arr, uri_arr, position_arr, size_arr, ref_id_arr],
+            names=["data", "uri", "position", "size", "ref_id"],
             mask=mask_arr,
         )
         return pa.ExtensionArray.from_storage(BlobType(), storage)  # type: ignore[return-value]
diff --git a/python/test_ref_id_dedup.py b/python/test_ref_id_dedup.py
new file mode 100644
index 0000000000..af7921dfb7
--- /dev/null
+++ b/python/test_ref_id_dedup.py
@@ -0,0 +1,118 @@
+#!/usr/bin/env python3
+"""Verify ref_id dedup works on all three blob storage paths.
+
+For each size class (Inline, Packed, Dedicated), writes N_ROWS rows that all
+share the same ref_id. The physical storage should hold 1 copy, not N_ROWS.
+Exits with a non-zero status if any case fails read-back or de-duplication.
+"""
+import os
+import shutil
+from pathlib import Path
+
+import lance
+import pyarrow as pa
+from lance.blob import Blob, blob_array, blob_field
+
+WORK = Path("./ref_id_dedup_test")
+N_ROWS = 20
+# Largest (bytes on disk / bytes in one payload) ratio still counted as
+# de-duplicated; allows some overhead for descriptors, headers, etc.
+MAX_AMPLIFICATION = 3.0
+
+
+def make_batch(n_rows: int, payload: bytes, ref_id: int) -> pa.RecordBatch:
+    """Build `n_rows` rows whose blobs all share `payload` and `ref_id`."""
+    images = blob_array([Blob(data=payload, ref_id=ref_id) for _ in range(n_rows)])
+    schema = pa.schema([
+        pa.field("row_id", pa.int64()),
+        blob_field("blob"),
+    ])
+    return pa.RecordBatch.from_arrays(
+        [pa.array(range(n_rows), type=pa.int64()), images],
+        schema=schema,
+    )
+
+
+def dataset_size_bytes(ds_path: Path) -> tuple[int, int, int]:
+    """Return (main_lance_bytes, sidecar_bytes, sidecar_count)."""
+    main_bytes = 0
+    sidecar_bytes = 0
+    sidecar_count = 0
+    for p in ds_path.rglob("*"):
+        if not p.is_file():
+            continue
+        if p.suffix == ".blob":
+            sidecar_bytes += p.stat().st_size
+            sidecar_count += 1
+        elif p.suffix == ".lance":
+            main_bytes += p.stat().st_size
+    return main_bytes, sidecar_bytes, sidecar_count
+
+
+def run_case(label: str, payload_size: int, ref_id: int) -> bool:
+    """Write N_ROWS rows sharing `ref_id`, report sizes, and judge the result.
+
+    Returns True when every row reads back the written payload and the bytes
+    on disk stay below MAX_AMPLIFICATION times a single payload copy.
+    """
+    case_dir = WORK / label
+    case_dir.mkdir(parents=True, exist_ok=True)
+    ds_path = case_dir / "ds.lance"
+
+    payload = os.urandom(payload_size)
+    batch = make_batch(N_ROWS, payload, ref_id)
+
+    lance.write_dataset(batch, str(ds_path), mode="create", data_storage_version="2.2")
+
+    main_b, side_b, side_n = dataset_size_bytes(ds_path)
+    ideal = payload_size
+
+    print(f"\n=== {label} (payload={payload_size:,}B, ref_id={ref_id}, rows={N_ROWS}) ===")
+    print(f" main .lance: {main_b:>13,} B")
+    print(f" sidecar total: {side_b:>13,} B ({side_n} files)")
+    print(f" ideal (1 copy): {ideal:>13,} B")
+    print(f" naive ({N_ROWS} copies): {ideal * N_ROWS:>11,} B")
+
+    # Read back to verify correctness
+    ds = lance.dataset(str(ds_path))
+    blobs = ds.take_blobs("blob", indices=list(range(N_ROWS)))
+    read = [bytes(b.read()) for b in blobs]
+    ok = all(b == payload for b in read)
+    # Count distinct payloads directly rather than via their hashes.
+    unique = len(set(read))
+    print(f" read-back OK: {ok} ({unique} unique contents out of {N_ROWS} rows)")
+
+    # Dedup verdict
+    total_storage = main_b + side_b
+    amp = total_storage / ideal if ideal else 0
+    dedup_works = amp < MAX_AMPLIFICATION
+    verdict = "✓ DEDUP" if dedup_works else "✗ DUPLICATED"
+    print(f" amplification: {amp:.2f}x [{verdict}]")
+    return ok and dedup_works
+
+
+def main() -> None:
+    """Run all three storage-path cases; exit non-zero if any fails."""
+    if WORK.exists():
+        shutil.rmtree(WORK)
+    WORK.mkdir(parents=True)
+
+    cases = [
+        # Inline path: 32 KB (< 64 KB)
+        ("inline_32kb", 32 * 1024, 101),
+        # Packed path: 1 MB (> 64 KB, < 4 MB)
+        ("packed_1mb", 1 * 1024 * 1024, 102),
+        # Dedicated path: 6 MB (> 4 MB)
+        ("dedicated_6mb", 6 * 1024 * 1024, 103),
+    ]
+    # Run every case before failing so the full report is always printed.
+    failed = []
+    for label, payload_size, ref_id in cases:
+        if not run_case(label, payload_size, ref_id):
+            failed.append(label)
+    if failed:
+        raise SystemExit(f"ref_id dedup check failed for: {', '.join(failed)}")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/rust/lance-core/src/datatypes.rs b/rust/lance-core/src/datatypes.rs
index d395420e13..93b491265b 100644
--- a/rust/lance-core/src/datatypes.rs
+++ b/rust/lance-core/src/datatypes.rs
@@ -53,6 +53,7 @@ pub static BLOB_V2_DESC_FIELDS: LazyLock = LazyLock::new(|| {
         ArrowField::new("size", DataType::UInt64, false),
         ArrowField::new("blob_id", DataType::UInt32, false),
         ArrowField::new("blob_uri", DataType::Utf8, false),
+        ArrowField::new("ref_id", DataType::UInt32, true),
     ])
 });
 
diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs
index 41e2068803..d9199a8dee 100644
--- a/rust/lance-encoding/src/decoder.rs
+++ b/rust/lance-encoding/src/decoder.rs
@@ -739,7 +739,23 @@ impl CoreFieldDecoderStrategy {
             return Ok(scheduler);
         }
 
-        // Maybe a blob descriptions struct?
+        // Blob v2 extension type: route the 6-field descriptor struct
+        // through the structural scheduler. We match by type name because
+        // PyArrow extension metadata may not round-trip through Lance's
+        // schema serialization.
+ let type_str = data_type.to_string(); + if type_str.contains("lance.blob.v2") || type_str.contains("BlobType>") { + let column_info = column_infos.expect_next()?; + let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new( + column_info.as_ref(), + self.decompressor_strategy.as_ref(), + self.cache_repetition_index, + field, + )?); + column_infos.next_top_level(); + return Ok(scheduler); + } + // Maybe a blob descriptions struct? (Blob v1) if field.is_blob() { let column_info = column_infos.peek(); if column_info.page_infos.iter().any(|page| { diff --git a/rust/lance-encoding/src/encodings/logical/blob.rs b/rust/lance-encoding/src/encodings/logical/blob.rs index cad2112baf..17c5aa6c0c 100644 --- a/rust/lance-encoding/src/encodings/logical/blob.rs +++ b/rust/lance-encoding/src/encodings/logical/blob.rs @@ -228,9 +228,14 @@ impl FieldEncoder for BlobStructuralEncoder { } } -/// Blob v2 structural encoder +/// Blob v2 structural encoder with ref_id deduplication on the Inline path. pub struct BlobV2StructuralEncoder { descriptor_encoder: Box, + /// Maps ref_id -> (position, size) for Inline blobs already appended to + /// the out-of-line buffer. Subsequent rows with the same ref_id reuse the + /// cached coordinates instead of re-appending identical bytes. + /// Lives only for the duration of this encoder (single write session). + ref_dedup_tmp_map: HashMap, } impl BlobV2StructuralEncoder { @@ -258,7 +263,10 @@ impl BlobV2StructuralEncoder { Arc::new(HashMap::new()), )?); - Ok(Self { descriptor_encoder }) + Ok(Self { + descriptor_encoder, + ref_dedup_tmp_map: HashMap::new(), + }) } } @@ -314,6 +322,11 @@ impl FieldEncoder for BlobV2StructuralEncoder { Error::invalid_input_source("Blob v2 struct missing `position` field".into()) })? .as_primitive::(); + // Optional ref_id column: if present and > 0, dedup Inline bytes by + // sharing the same out-of-line buffer position across rows. 
+ let ref_id_col = struct_arr + .column_by_name("ref_id") + .map(|c| c.as_primitive::()); let row_count = struct_arr.len(); @@ -322,8 +335,15 @@ impl FieldEncoder for BlobV2StructuralEncoder { let mut size_builder = PrimitiveBuilder::::with_capacity(row_count); let mut blob_id_builder = PrimitiveBuilder::::with_capacity(row_count); let mut uri_builder = StringBuilder::with_capacity(row_count, row_count * 16); + let mut ref_id_builder = PrimitiveBuilder::::with_capacity(row_count); for i in 0..row_count { + let ref_id = ref_id_col + .as_ref() + .filter(|col| !col.is_null(i)) + .map(|col| col.value(i)) + .unwrap_or(0); + let (kind_value, position_value, size_value, blob_id_value, uri_value) = if struct_arr.is_null(i) || kind_col.is_null(i) { (BlobKind::Inline as u8, 0, 0, 0, "".to_string()) @@ -370,18 +390,41 @@ impl FieldEncoder for BlobV2StructuralEncoder { "".to_string(), ), BlobKind::Inline => { - let data_val = data_col.value(i); - let blob_len = data_val.len() as u64; - let position = external_buffers - .add_buffer(LanceBuffer::from(Buffer::from(data_val))); - - ( - BlobKind::Inline as u8, - position, - blob_len, - 0, - "".to_string(), - ) + // ref_id dedup: only first occurrence appends to + // the out-of-line buffer; later rows reuse (pos, size). 
+ if ref_id > 0 { + if let Some(&(pos, size)) = + self.ref_dedup_tmp_map.get(&ref_id) + { + (BlobKind::Inline as u8, pos, size, 0, "".to_string()) + } else { + let data_val = data_col.value(i); + let blob_len = data_val.len() as u64; + let position = external_buffers + .add_buffer(LanceBuffer::from(Buffer::from(data_val))); + self.ref_dedup_tmp_map + .insert(ref_id, (position, blob_len)); + ( + BlobKind::Inline as u8, + position, + blob_len, + 0, + "".to_string(), + ) + } + } else { + let data_val = data_col.value(i); + let blob_len = data_val.len() as u64; + let position = external_buffers + .add_buffer(LanceBuffer::from(Buffer::from(data_val))); + ( + BlobKind::Inline as u8, + position, + blob_len, + 0, + "".to_string(), + ) + } } } }; @@ -391,6 +434,7 @@ impl FieldEncoder for BlobV2StructuralEncoder { size_builder.append_value(size_value); blob_id_builder.append_value(blob_id_value); uri_builder.append_value(uri_value); + ref_id_builder.append_value(ref_id); } let children: Vec = vec![ Arc::new(kind_builder.finish()), @@ -398,6 +442,7 @@ impl FieldEncoder for BlobV2StructuralEncoder { Arc::new(size_builder.finish()), Arc::new(blob_id_builder.finish()), Arc::new(uri_builder.finish()), + Arc::new(ref_id_builder.finish()), ]; let descriptor_array = Arc::new(StructArray::try_new( diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index a828f3fc33..5104943e31 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -213,6 +213,20 @@ pub struct BlobPreprocessor { external_blob_mode: ExternalBlobMode, source_store_registry: Arc, source_store_params: ObjectStoreParams, + // Cache for ref_id sharing on Packed / Dedicated sidecar paths. + // Inline dedup lives in the encoder (only it knows buffer positions). + ref_id_sidecar_cache: HashMap, +} + +/// Cached sidecar reference for ref_id deduplication. +/// +/// First row with a given ref_id writes the blob to a sidecar (Packed or +/// Dedicated). 
Subsequent rows with the same ref_id reuse the cached coordinates +/// instead of writing again. +#[derive(Clone, Copy)] +enum SidecarRef { + Dedicated { blob_id: u32, size: u64 }, + Packed { blob_id: u32, position: u64, size: u64 }, } /// A logical slice of an external blob that can be materialized or streamed into Lance-managed @@ -346,6 +360,7 @@ impl BlobPreprocessor { external_base_resolver, allow_external_blob_outside_bases, external_blob_mode, + ref_id_sidecar_cache: HashMap::new(), source_store_registry, source_store_params, } @@ -486,6 +501,9 @@ impl BlobPreprocessor { let size_col = struct_arr .column_by_name("size") .map(|col| col.as_primitive::()); + let ref_id_col = struct_arr + .column_by_name("ref_id") + .map(|col| col.as_primitive::()); let mut data_builder = LargeBinaryBuilder::with_capacity(struct_arr.len(), 0); let mut uri_builder = StringBuilder::with_capacity(struct_arr.len(), 0); @@ -496,6 +514,8 @@ impl BlobPreprocessor { let mut kind_builder = PrimitiveBuilder::::with_capacity(struct_arr.len()); let mut position_builder = PrimitiveBuilder::::with_capacity(struct_arr.len()); + let mut ref_id_builder = + PrimitiveBuilder::::with_capacity(struct_arr.len()); let struct_nulls = struct_arr.nulls(); @@ -507,6 +527,7 @@ impl BlobPreprocessor { blob_size_builder.append_null(); kind_builder.append_null(); position_builder.append_null(); + ref_id_builder.append_null(); continue; } @@ -521,6 +542,41 @@ impl BlobPreprocessor { .map(|col| !col.is_null(i)) .unwrap_or(false); let data_len = if has_data { data_col.value(i).len() } else { 0 }; + // 0 (or null) means no sharing; non-zero values participate in dedup. + let ref_id = ref_id_col + .as_ref() + .filter(|col| !col.is_null(i)) + .map(|col| col.value(i)) + .unwrap_or(0); + + // Early cache hit: reuse a previously-written sidecar blob. 
+ if ref_id > 0 { + if let Some(cached) = self.ref_id_sidecar_cache.get(&ref_id).copied() { + match cached { + SidecarRef::Dedicated { blob_id, size } => { + kind_builder.append_value(BlobKind::Dedicated as u8); + data_builder.append_null(); + uri_builder.append_null(); + blob_id_builder.append_value(blob_id); + blob_size_builder.append_value(size); + position_builder.append_null(); + ref_id_builder.append_value(ref_id); + continue; + } + SidecarRef::Packed { blob_id, position, size } => { + kind_builder.append_value(BlobKind::Packed as u8); + data_builder.append_null(); + uri_builder.append_null(); + blob_id_builder.append_value(blob_id); + blob_size_builder.append_value(size); + position_builder.append_value(position); + ref_id_builder.append_value(ref_id); + continue; + } + } + } + // Not cached: falls through to normal write path; then cached below. + } let dedicated_threshold = self.dedicated_thresholds[idx]; if has_data && data_len > dedicated_threshold { @@ -534,6 +590,13 @@ impl BlobPreprocessor { blob_id_builder.append_value(blob_id); blob_size_builder.append_value(data_len as u64); position_builder.append_null(); + ref_id_builder.append_value(ref_id); + if ref_id > 0 { + self.ref_id_sidecar_cache.insert( + ref_id, + SidecarRef::Dedicated { blob_id, size: data_len as u64 }, + ); + } continue; } @@ -548,6 +611,17 @@ impl BlobPreprocessor { blob_id_builder.append_value(pack_blob_id); blob_size_builder.append_value(data_len as u64); position_builder.append_value(position); + ref_id_builder.append_value(ref_id); + if ref_id > 0 { + self.ref_id_sidecar_cache.insert( + ref_id, + SidecarRef::Packed { + blob_id: pack_blob_id, + position, + size: data_len as u64, + }, + ); + } continue; } @@ -583,6 +657,13 @@ impl BlobPreprocessor { blob_id_builder.append_value(blob_id); blob_size_builder.append_value(data_len); position_builder.append_null(); + ref_id_builder.append_value(ref_id); + if ref_id > 0 { + self.ref_id_sidecar_cache.insert( + ref_id, + 
SidecarRef::Dedicated { blob_id, size: data_len }, + ); + } continue; } @@ -597,6 +678,17 @@ impl BlobPreprocessor { blob_id_builder.append_value(pack_blob_id); blob_size_builder.append_value(data_len); position_builder.append_value(position); + ref_id_builder.append_value(ref_id); + if ref_id > 0 { + self.ref_id_sidecar_cache.insert( + ref_id, + SidecarRef::Packed { + blob_id: pack_blob_id, + position, + size: data_len, + }, + ); + } continue; } @@ -608,6 +700,7 @@ impl BlobPreprocessor { blob_id_builder.append_null(); blob_size_builder.append_null(); position_builder.append_null(); + ref_id_builder.append_value(ref_id); continue; } @@ -629,6 +722,7 @@ impl BlobPreprocessor { blob_size_builder.append_null(); position_builder.append_null(); } + ref_id_builder.append_value(ref_id); continue; } @@ -640,6 +734,7 @@ impl BlobPreprocessor { blob_id_builder.append_null(); blob_size_builder.append_null(); position_builder.append_null(); + ref_id_builder.append_value(ref_id); } else { data_builder.append_null(); uri_builder.append_null(); @@ -647,6 +742,7 @@ impl BlobPreprocessor { blob_size_builder.append_null(); kind_builder.append_null(); position_builder.append_null(); + ref_id_builder.append_null(); } } @@ -657,6 +753,7 @@ impl BlobPreprocessor { arrow_schema::Field::new("blob_id", ArrowDataType::UInt32, true), arrow_schema::Field::new("blob_size", ArrowDataType::UInt64, true), arrow_schema::Field::new("position", ArrowDataType::UInt64, true), + arrow_schema::Field::new("ref_id", ArrowDataType::UInt32, true), ]; let struct_array = arrow_array::StructArray::try_new( @@ -668,6 +765,7 @@ impl BlobPreprocessor { Arc::new(blob_id_builder.finish()), Arc::new(blob_size_builder.finish()), Arc::new(position_builder.finish()), + Arc::new(ref_id_builder.finish()), ], struct_nulls.cloned(), )?; @@ -1769,17 +1867,19 @@ fn blob_version_from_descriptions(descriptions: &StructArray) -> Result= 5 && fields[0].name() == "kind" && fields[1].name() == "position" && fields[2].name() == 
"size" && fields[3].name() == "blob_id" && fields[4].name() == "blob_uri" + && (fields.len() == 5 || (fields.len() == 6 && fields[5].name() == "ref_id")) { return Ok(BlobVersion::V2); } Err(Error::invalid_input_source(format!( - "Unrecognized blob descriptions schema: expected v1 (position,size) or v2 (kind,position,size,blob_id,blob_uri) but got {:?}", + "Unrecognized blob descriptions schema: expected v1 (position,size) or v2 (kind,position,size,blob_id,blob_uri[,ref_id]) but got {:?}", fields.iter().map(|f| f.name().as_str()).collect::>(), ) .into()))