Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 149 additions & 3 deletions rust/lance-arrow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use arrow_array::{
use arrow_array::{
Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, new_null_array,
};
use arrow_buffer::MutableBuffer;
use arrow_buffer::{MutableBuffer, OffsetBuffer};
use arrow_data::ArrayDataBuilder;
use arrow_schema::{ArrowError, DataType, Field, Fields, IntervalUnit, Schema, SortOptions};
use arrow_select::{interleave::interleave, take::take};
Expand Down Expand Up @@ -1059,6 +1059,20 @@ fn merge_struct_validity(
}
}

/// Rebase a list array's offsets to a zero origin.
///
/// Slicing a `ListArray` can leave offsets whose first entry is non-zero,
/// while `trimmed_values()` hands back a values buffer indexed from zero.
/// Shifting every offset down by the first entry keeps the offsets aligned
/// with the trimmed values buffer. Already-zero-based offsets are returned
/// as a cheap clone with no reallocation.
fn normalize_offsets<O: OffsetSizeTrait>(offsets: &OffsetBuffer<O>) -> OffsetBuffer<O> {
    match offsets.first().copied() {
        Some(base) if base != O::zero() => {
            let shifted: Vec<O> = offsets.iter().map(|&off| off - base).collect();
            OffsetBuffer::new(shifted.into())
        }
        // Either empty (an `OffsetBuffer` is normally non-empty, but be
        // defensive) or already starting at zero: nothing to rebase.
        _ => offsets.clone(),
    }
}

fn merge_list_child_values(
child_field: &Field,
left_values: ArrayRef,
Expand Down Expand Up @@ -1378,9 +1392,10 @@ fn merge_with_schema(
);
let merged_validity =
merge_struct_validity(left_list.nulls(), right_list.nulls());
let normalized_offsets = normalize_offsets(left_list.offsets());
let merged_list = ListArray::new(
child_field.clone(),
left_list.offsets().clone(),
normalized_offsets,
merged_values,
merged_validity,
);
Expand All @@ -1403,9 +1418,10 @@ fn merge_with_schema(
);
let merged_validity =
merge_struct_validity(left_list.nulls(), right_list.nulls());
let normalized_offsets = normalize_offsets(left_list.offsets());
let merged_list = LargeListArray::new(
child_field.clone(),
left_list.offsets().clone(),
normalized_offsets,
merged_values,
merged_validity,
);
Expand Down Expand Up @@ -2556,4 +2572,134 @@ mod tests {
&Int32Array::from(vec![1, 2]) as &dyn Array
);
}

#[test]
fn test_merge_with_schema_sliced_list_with_nonzero_offset() {
    // Regression test for https://github.com/lancedb/lance/issues/6580
    //
    // A sliced ListArray (e.g. the trailing batch of a filtered scan) carries
    // offsets that no longer begin at 0, while `trimmed_values()` returns a
    // values buffer indexed from 0. Merging must rebase the offsets to match.

    // Left side: list<struct{id: i32}> with three rows before slicing:
    //   row 0 -> [{id: 1}, {id: 2}]   (offsets 0..2)
    //   row 1 -> [{id: 3}]            (offsets 2..3)
    //   row 2 -> [{id: 4}, {id: 5}]   (offsets 3..5)
    let id_values = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
    let id_field = Field::new("id", DataType::Int32, false);
    let left_struct = Arc::new(StructArray::new(
        Fields::from(vec![id_field.clone()]),
        vec![id_values as ArrayRef],
        None,
    ));
    let item_field = Arc::new(Field::new(
        "item",
        DataType::Struct(Fields::from(vec![id_field.clone()])),
        true,
    ));
    let unsliced = ListArray::new(
        item_field,
        OffsetBuffer::from_lengths([2, 1, 2]),
        left_struct,
        None,
    );

    // Keep only rows 1..3; the slice's offsets become [2, 3, 5] — a
    // non-zero starting offset.
    let sliced = unsliced.slice(1, 2);
    let sliced = sliced.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(*sliced.offsets().first().unwrap(), 2);

    // Right side: list<struct{name: utf8}> with two rows lining up with
    // the sliced left rows.
    let name_values = Arc::new(StringArray::from(vec!["a", "b", "c"]));
    let name_field = Field::new("name", DataType::Utf8, true);
    let right_struct = Arc::new(StructArray::new(
        Fields::from(vec![name_field.clone()]),
        vec![name_values as ArrayRef],
        None,
    ));
    let right_list = ListArray::new(
        Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![name_field.clone()])),
            true,
        )),
        OffsetBuffer::from_lengths([1, 2]),
        right_struct,
        None,
    );

    // Target schema: the child struct carries both fields.
    let target_schema = Schema::new(vec![Field::new(
        "col",
        DataType::List(Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![id_field, name_field])),
            true,
        ))),
        true,
    )]);

    let left_batch = RecordBatch::try_new(
        Arc::new(Schema::new(vec![Field::new(
            "col",
            sliced.data_type().clone(),
            true,
        )])),
        vec![Arc::new(sliced.clone()) as ArrayRef],
    )
    .unwrap();

    let right_batch = RecordBatch::try_new(
        Arc::new(Schema::new(vec![Field::new(
            "col",
            right_list.data_type().clone(),
            true,
        )])),
        vec![Arc::new(right_list) as ArrayRef],
    )
    .unwrap();

    // Prior to the fix this panicked with "offset past values": offsets
    // [2, 3, 5] indexed past the trimmed values buffer of length 3.
    let merged = left_batch
        .merge_with_schema(&right_batch, &target_schema)
        .unwrap();

    let out_list = merged
        .column(0)
        .as_any()
        .downcast_ref::<ListArray>()
        .unwrap();

    // The offsets must have been rebased to [0, 1, 3].
    assert_eq!(*out_list.offsets().first().unwrap(), 0);
    assert_eq!(out_list.len(), 2);

    // First output row came from original row 1 ([{id: 3}]); second from
    // original row 2 ([{id: 4}, {id: 5}]).
    assert_eq!(out_list.value_length(0), 1);
    assert_eq!(out_list.value_length(1), 2);

    // The merged child struct exposes both columns, correctly aligned.
    let out_struct = out_list.values().as_struct();
    assert_eq!(out_struct.num_columns(), 2);

    let out_ids = out_struct
        .column_by_name("id")
        .unwrap()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(out_ids.values(), &[3, 4, 5]);

    let out_names = out_struct
        .column_by_name("name")
        .unwrap()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(out_names.value(0), "a");
    assert_eq!(out_names.value(1), "b");
    assert_eq!(out_names.value(2), "c");
}
}
Loading