Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions python/python/lance/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ class Blob:
uri: Optional[str] = None
position: Optional[int] = None
size: Optional[int] = None
# Rows sharing the same positive ref_id are de-duplicated in storage.
# 0 / None means no sharing.
ref_id: Optional[int] = None

def __post_init__(self) -> None:
if self.data is not None and self.uri is not None:
Expand Down Expand Up @@ -74,6 +77,7 @@ def __init__(self) -> None:
pa.field("uri", pa.utf8(), nullable=True),
pa.field("position", pa.uint64(), nullable=True),
pa.field("size", pa.uint64(), nullable=True),
pa.field("ref_id", pa.uint32(), nullable=True),
]
)
pa.ExtensionType.__init__(self, storage_type, "lance.blob.v2")
Expand Down Expand Up @@ -119,6 +123,7 @@ def from_pylist(cls, values: list[Any]) -> "BlobArray":
uri_values: list[Optional[str]] = []
position_values: list[Optional[int]] = []
size_values: list[Optional[int]] = []
ref_id_values: list[Optional[int]] = []
null_mask: list[bool] = []

for v in values:
Expand All @@ -127,6 +132,7 @@ def from_pylist(cls, values: list[Any]) -> "BlobArray":
uri_values.append(None)
position_values.append(None)
size_values.append(None)
ref_id_values.append(None)
null_mask.append(True)
continue

Expand All @@ -135,6 +141,7 @@ def from_pylist(cls, values: list[Any]) -> "BlobArray":
uri_values.append(v.uri)
position_values.append(v.position)
size_values.append(v.size)
ref_id_values.append(v.ref_id)
null_mask.append(False)
continue

Expand All @@ -145,6 +152,7 @@ def from_pylist(cls, values: list[Any]) -> "BlobArray":
uri_values.append(v)
position_values.append(None)
size_values.append(None)
ref_id_values.append(None)
null_mask.append(False)
continue

Expand All @@ -153,6 +161,7 @@ def from_pylist(cls, values: list[Any]) -> "BlobArray":
uri_values.append(None)
position_values.append(None)
size_values.append(None)
ref_id_values.append(None)
null_mask.append(False)
continue

Expand All @@ -165,10 +174,11 @@ def from_pylist(cls, values: list[Any]) -> "BlobArray":
uri_arr = pa.array(uri_values, type=pa.utf8())
position_arr = pa.array(position_values, type=pa.uint64())
size_arr = pa.array(size_values, type=pa.uint64())
ref_id_arr = pa.array(ref_id_values, type=pa.uint32())
mask_arr = pa.array(null_mask, type=pa.bool_())
storage = pa.StructArray.from_arrays(
[data_arr, uri_arr, position_arr, size_arr],
names=["data", "uri", "position", "size"],
[data_arr, uri_arr, position_arr, size_arr, ref_id_arr],
names=["data", "uri", "position", "size", "ref_id"],
mask=mask_arr,
)
return pa.ExtensionArray.from_storage(BlobType(), storage) # type: ignore[return-value]
Expand Down
97 changes: 97 additions & 0 deletions python/test_ref_id_dedup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#!/usr/bin/env python3
"""Verify ref_id dedup works on all three blob storage paths.

For each size class (Inline, Packed, Dedicated), writes 20 rows that all
share the same ref_id. The physical storage should hold 1 copy, not 20.
"""
import os
import shutil
from pathlib import Path

import lance
import pyarrow as pa
from lance.blob import Blob, blob_array, blob_field

# Scratch directory where each test case writes its dataset (recreated per run).
WORK = Path("./ref_id_dedup_test")
# Rows written per case; all rows share one ref_id, so ideal storage is 1 copy.
N_ROWS = 20


def make_batch(n_rows: int, payload: bytes, ref_id: int) -> pa.RecordBatch:
    """Build a batch of ``n_rows`` rows, each carrying ``payload`` as a blob.

    Every blob is tagged with the same ``ref_id`` so the writer is able to
    de-duplicate the identical bytes in physical storage.
    """
    row_ids = pa.array(range(n_rows), type=pa.int64())
    blobs = blob_array([Blob(data=payload, ref_id=ref_id) for _ in range(n_rows)])
    schema = pa.schema(
        [
            pa.field("row_id", pa.int64()),
            blob_field("blob"),
        ]
    )
    return pa.RecordBatch.from_arrays([row_ids, blobs], schema=schema)


def dataset_size_bytes(ds_path: Path) -> tuple[int, int, int]:
    """Return (main_lance_bytes, sidecar_bytes, sidecar_count).

    Walks the dataset directory recursively, summing ``.lance`` file sizes
    into the main total and ``.blob`` sidecar sizes/counts separately.
    Files with any other extension are ignored.
    """
    main_bytes = 0
    sidecar_bytes = 0
    sidecar_count = 0
    for entry in ds_path.rglob("*"):
        if not entry.is_file():
            continue
        nbytes = entry.stat().st_size
        if entry.suffix == ".blob":
            sidecar_count += 1
            sidecar_bytes += nbytes
        elif entry.suffix == ".lance":
            main_bytes += nbytes
    return main_bytes, sidecar_bytes, sidecar_count


def run_case(label: str, payload_size: int, ref_id: int) -> None:
    """Write N_ROWS identical blobs sharing ``ref_id`` and report dedup results.

    Creates a dataset under ``WORK/label``, measures physical storage versus
    the ideal single copy, verifies the blobs read back correctly, and prints
    a dedup verdict based on storage amplification.
    """
    case_dir = WORK / label
    case_dir.mkdir(parents=True, exist_ok=True)
    ds_path = case_dir / "ds.lance"

    payload = os.urandom(payload_size)
    batch = make_batch(N_ROWS, payload, ref_id)

    # data_storage_version="2.2" selects the blob v2 descriptor layout.
    lance.write_dataset(batch, str(ds_path), mode="create", data_storage_version="2.2")

    main_b, side_b, side_n = dataset_size_bytes(ds_path)
    ideal = payload_size

    print(f"\n=== {label} (payload={payload_size:,}B, ref_id={ref_id}, rows={N_ROWS}) ===")
    print(f"  main .lance: {main_b:>13,} B")
    print(f"  sidecar total: {side_b:>13,} B ({side_n} files)")
    print(f"  ideal (1 copy): {ideal:>13,} B")
    print(f"  naive (20 copies): {ideal * N_ROWS:>11,} B")

    # Read back to verify correctness
    ds = lance.dataset(str(ds_path))
    blobs = ds.take_blobs("blob", indices=list(range(N_ROWS)))
    read = [bytes(b.read()) for b in blobs]
    ok = all(b == payload for b in read)
    # Compare contents directly rather than via hash(): Python's hash() on
    # bytes can collide, which would misreport the unique-content count.
    unique = len(set(read))
    print(f"  read-back OK: {ok} ({unique} unique contents out of {N_ROWS} rows)")

    # Dedup verdict
    total_storage = main_b + side_b
    amp = total_storage / ideal if ideal else 0
    dedup_works = amp < 3.0  # allow some overhead for descriptors, headers, etc
    verdict = "✓ DEDUP" if dedup_works else "✗ DUPLICATED"
    print(f"  amplification: {amp:.2f}x [{verdict}]")


def main() -> None:
    """Rebuild the scratch directory, then exercise each blob size class."""
    if WORK.exists():
        shutil.rmtree(WORK)
    WORK.mkdir(parents=True)

    # (label, payload size in bytes, ref_id) — one case per storage path.
    cases = [
        ("inline_32kb", 32 * 1024, 101),  # Inline path: 32 KB (< 64 KB)
        ("packed_1mb", 1 * 1024 * 1024, 102),  # Packed path: 1 MB (> 64 KB, < 4 MB)
        ("dedicated_6mb", 6 * 1024 * 1024, 103),  # Dedicated path: 6 MB (> 4 MB)
    ]
    for label, payload_size, rid in cases:
        run_case(label, payload_size, ref_id=rid)


# Run all three size-class cases when executed as a script.
if __name__ == "__main__":
    main()
1 change: 1 addition & 0 deletions rust/lance-core/src/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub static BLOB_V2_DESC_FIELDS: LazyLock<Fields> = LazyLock::new(|| {
ArrowField::new("size", DataType::UInt64, false),
ArrowField::new("blob_id", DataType::UInt32, false),
ArrowField::new("blob_uri", DataType::Utf8, false),
ArrowField::new("ref_id", DataType::UInt32, true),
])
});

Expand Down
18 changes: 17 additions & 1 deletion rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,23 @@ impl CoreFieldDecoderStrategy {

return Ok(scheduler);
}
// Maybe a blob descriptions struct?
// Blob v2 extension type: route the 6-field descriptor struct
// through the structural scheduler. We match by type name because
// PyArrow extension metadata may not round-trip through Lance's
// schema serialization.
let type_str = data_type.to_string();
if type_str.contains("lance.blob.v2") || type_str.contains("BlobType>") {
let column_info = column_infos.expect_next()?;
let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
column_info.as_ref(),
self.decompressor_strategy.as_ref(),
self.cache_repetition_index,
field,
)?);
column_infos.next_top_level();
return Ok(scheduler);
}
// Maybe a blob descriptions struct? (Blob v1)
if field.is_blob() {
let column_info = column_infos.peek();
if column_info.page_infos.iter().any(|page| {
Expand Down
73 changes: 59 additions & 14 deletions rust/lance-encoding/src/encodings/logical/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,14 @@ impl FieldEncoder for BlobStructuralEncoder {
}
}

/// Blob v2 structural encoder
/// Blob v2 structural encoder with ref_id deduplication on the Inline path.
pub struct BlobV2StructuralEncoder {
descriptor_encoder: Box<dyn FieldEncoder>,
/// Maps ref_id -> (position, size) for Inline blobs already appended to
/// the out-of-line buffer. Subsequent rows with the same ref_id reuse the
/// cached coordinates instead of re-appending identical bytes.
/// Lives only for the duration of this encoder (single write session).
ref_dedup_tmp_map: HashMap<u32, (u64, u64)>,
}

impl BlobV2StructuralEncoder {
Expand Down Expand Up @@ -258,7 +263,10 @@ impl BlobV2StructuralEncoder {
Arc::new(HashMap::new()),
)?);

Ok(Self { descriptor_encoder })
Ok(Self {
descriptor_encoder,
ref_dedup_tmp_map: HashMap::new(),
})
}
}

Expand Down Expand Up @@ -314,6 +322,11 @@ impl FieldEncoder for BlobV2StructuralEncoder {
Error::invalid_input_source("Blob v2 struct missing `position` field".into())
})?
.as_primitive::<UInt64Type>();
// Optional ref_id column: if present and > 0, dedup Inline bytes by
// sharing the same out-of-line buffer position across rows.
let ref_id_col = struct_arr
.column_by_name("ref_id")
.map(|c| c.as_primitive::<UInt32Type>());

let row_count = struct_arr.len();

Expand All @@ -322,8 +335,15 @@ impl FieldEncoder for BlobV2StructuralEncoder {
let mut size_builder = PrimitiveBuilder::<UInt64Type>::with_capacity(row_count);
let mut blob_id_builder = PrimitiveBuilder::<UInt32Type>::with_capacity(row_count);
let mut uri_builder = StringBuilder::with_capacity(row_count, row_count * 16);
let mut ref_id_builder = PrimitiveBuilder::<UInt32Type>::with_capacity(row_count);

for i in 0..row_count {
let ref_id = ref_id_col
.as_ref()
.filter(|col| !col.is_null(i))
.map(|col| col.value(i))
.unwrap_or(0);

let (kind_value, position_value, size_value, blob_id_value, uri_value) =
if struct_arr.is_null(i) || kind_col.is_null(i) {
(BlobKind::Inline as u8, 0, 0, 0, "".to_string())
Expand Down Expand Up @@ -370,18 +390,41 @@ impl FieldEncoder for BlobV2StructuralEncoder {
"".to_string(),
),
BlobKind::Inline => {
let data_val = data_col.value(i);
let blob_len = data_val.len() as u64;
let position = external_buffers
.add_buffer(LanceBuffer::from(Buffer::from(data_val)));

(
BlobKind::Inline as u8,
position,
blob_len,
0,
"".to_string(),
)
// ref_id dedup: only first occurrence appends to
// the out-of-line buffer; later rows reuse (pos, size).
if ref_id > 0 {
if let Some(&(pos, size)) =
self.ref_dedup_tmp_map.get(&ref_id)
{
(BlobKind::Inline as u8, pos, size, 0, "".to_string())
} else {
let data_val = data_col.value(i);
let blob_len = data_val.len() as u64;
let position = external_buffers
.add_buffer(LanceBuffer::from(Buffer::from(data_val)));
self.ref_dedup_tmp_map
.insert(ref_id, (position, blob_len));
(
BlobKind::Inline as u8,
position,
blob_len,
0,
"".to_string(),
)
}
} else {
let data_val = data_col.value(i);
let blob_len = data_val.len() as u64;
let position = external_buffers
.add_buffer(LanceBuffer::from(Buffer::from(data_val)));
(
BlobKind::Inline as u8,
position,
blob_len,
0,
"".to_string(),
)
}
}
}
};
Expand All @@ -391,13 +434,15 @@ impl FieldEncoder for BlobV2StructuralEncoder {
size_builder.append_value(size_value);
blob_id_builder.append_value(blob_id_value);
uri_builder.append_value(uri_value);
ref_id_builder.append_value(ref_id);
}
let children: Vec<ArrayRef> = vec![
Arc::new(kind_builder.finish()),
Arc::new(position_builder.finish()),
Arc::new(size_builder.finish()),
Arc::new(blob_id_builder.finish()),
Arc::new(uri_builder.finish()),
Arc::new(ref_id_builder.finish()),
];

let descriptor_array = Arc::new(StructArray::try_new(
Expand Down
Loading
Loading