Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
303 changes: 303 additions & 0 deletions rust/lance/src/dataset/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1959,6 +1959,31 @@ impl Transaction {
Operation::Merge { fragments, .. } => {
final_fragments.extend(fragments.clone());

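// Refresh last_updated_at_version_meta for fragments that were rewritten by
// this merge, i.e. whose data files differ from the existing fragment with
// the same id, or that have no existing counterpart.
// Carry-forward fragments that were not touched must not be bumped.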
if next_row_id.is_some() {
let new_version = current_manifest.map(|m| m.version + 1).unwrap_or(1);
let old_frag_files: std::collections::HashMap<u64, &Vec<lance_table::format::DataFile>> =
maybe_existing_fragments?
.iter()
.map(|f| (f.id, &f.files))
.collect();

for fragment in final_fragments.iter_mut() {
let is_rewritten = match old_frag_files.get(&fragment.id) {
Some(old_files) => &fragment.files != *old_files,
None => true,
};

if is_rewritten {
lance_table::rowids::version::refresh_row_latest_update_meta_for_full_frag_rewrite_cols(
fragment,
new_version,
)?;
}
}
}

// Some fields that have indices may have been removed, so we should
// remove those indices as well.
Self::retain_relevant_indices(&mut final_indices, &schema, &final_fragments)
Expand Down Expand Up @@ -4527,4 +4552,282 @@ mod tests {
// cause strict validation of the new STABLE fragments.
validate_operation(Some(&legacy_manifest), &operation).unwrap();
}

// Test helper: wraps `fragments` in a `Manifest` (V2_0 storage format, empty
// map as the last constructor argument), stores `version` and `next_row_id`
// verbatim, and sets `FLAG_STABLE_ROW_IDS` in both the reader and the writer
// feature flags.
//
// Panics if `schema` cannot be converted into a `LanceSchema`.
fn manifest_with_stable_row_ids(
    schema: &ArrowSchema,
    fragments: Vec<Fragment>,
    version: u64,
    next_row_id: u64,
) -> Manifest {
    let lance_schema = LanceSchema::try_from(schema).unwrap();
    let storage_format = DataStorageFormat::new(LanceFileVersion::V2_0);

    let mut stable = Manifest::new(
        lance_schema,
        Arc::new(fragments),
        storage_format,
        HashMap::new(),
    );

    // Advertise stable row IDs to both readers and writers.
    stable.writer_feature_flags |= FLAG_STABLE_ROW_IDS;
    stable.reader_feature_flags |= FLAG_STABLE_ROW_IDS;

    stable.next_row_id = next_row_id;
    stable.version = version;
    stable
}

// Test helper: creates a `DataFile` for `path` covering the given `fields`.
// The third constructor argument is an empty vector, the file version comes
// from the `lance_file::format` MAJOR_VERSION / MINOR_VERSION constants, and
// the two trailing optional arguments are left as `None`.
fn test_data_file(path: &str, fields: Vec<i32>) -> DataFile {
    let major_version = lance_file::format::MAJOR_VERSION as u32;
    let minor_version = lance_file::format::MINOR_VERSION as u32;

    DataFile::new(
        path,
        fields,
        Vec::new(),
        major_version,
        minor_version,
        None,
        None,
    )
}

// Test helper: builds a fragment that carries stable row ID metadata.
//
// - `id` / `physical_rows` / `files` are stored on the fragment as given.
// - Row IDs are the contiguous range
//   `row_id_start..row_id_start + physical_rows`, serialized with
//   `write_row_ids` and stored inline.
// - `last_updated_at_version_meta` and `created_at_version_meta` are both set
//   to whatever `build_version_meta` returns for `last_updated_version`.
fn fragment_with_row_ids(
    id: u64,
    physical_rows: usize,
    row_id_start: u64,
    files: Vec<DataFile>,
    last_updated_version: u64,
) -> Fragment {
    let row_id_seq = RowIdSequence::from(row_id_start..row_id_start + physical_rows as u64);

    // Build the fragment once, without version metadata, so that
    // `build_version_meta` can inspect it. This avoids constructing a
    // throwaway copy (and cloning `files` and the serialized row IDs).
    let mut fragment = Fragment {
        id,
        physical_rows: Some(physical_rows),
        row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&row_id_seq))),
        files,
        deletion_file: None,
        last_updated_at_version_meta: None,
        created_at_version_meta: None,
    };

    let version_meta =
        lance_table::rowids::version::build_version_meta(&fragment, last_updated_version);

    // Creation and last-update metadata start out identical.
    fragment.last_updated_at_version_meta = version_meta.clone();
    fragment.created_at_version_meta = version_meta;
    fragment
}

// Test helper: returns the single version shared by every entry of the
// fragment's `last_updated_at_version_meta` sequence.
//
// Panics if the metadata is absent, the sequence cannot be loaded, the
// sequence is empty, or its entries are not all equal.
fn get_last_updated_version(frag: &Fragment) -> u64 {
    let sequence = frag
        .last_updated_at_version_meta
        .as_ref()
        .expect("expected last_updated_at_version_meta to be set")
        .load_sequence()
        .unwrap();
    let versions = sequence.versions().collect::<Vec<u64>>();

    assert!(
        !versions.is_empty(),
        "version sequence should not be empty for fragment {}",
        frag.id
    );

    // Safe to index: emptiness was ruled out just above.
    let first = versions[0];
    assert!(
        versions.iter().all(|v| *v == first),
        "expected uniform version sequence, got {:?}",
        versions
    );
    first
}

/// Scenario: a dataset at version 5 holds two fragments. A Merge operation
/// adds a new data file to fragment 0 and passes fragment 1 through with its
/// file list unchanged.
/// Expected: fragment 0 reports last_updated = 6 (the new version) while
/// fragment 1 still reports last_updated = 5.
#[test]
fn test_merge_refreshes_last_updated_for_rewritten_fragments() {
    let base_schema = ArrowSchema::new(vec![
        ArrowField::new("id", DataType::Int32, false),
        ArrowField::new("name", DataType::Utf8, true),
    ]);

    let file_a = test_data_file("data_a.lance", vec![0, 1]);
    let file_b = test_data_file("data_b.lance", vec![0, 1]);

    // Both fragments were last updated at version 5, the manifest's version.
    let manifest = manifest_with_stable_row_ids(
        &base_schema,
        vec![
            fragment_with_row_ids(0, 10, 0, vec![file_a.clone()], 5),
            fragment_with_row_ids(1, 10, 10, vec![file_b], 5),
        ],
        5,
        20,
    );

    // Fragment 0 gains a data file for the new column; fragment 1 is an
    // untouched copy of what the manifest already holds.
    let added_column = test_data_file("new_col_frag0.lance", vec![2]);
    let mut rewritten = manifest.fragments[0].clone();
    rewritten.files = vec![file_a, added_column];
    let carried_forward = manifest.fragments[1].clone();

    let widened_schema = ArrowSchema::new(vec![
        ArrowField::new("id", DataType::Int32, false),
        ArrowField::new("name", DataType::Utf8, true),
        ArrowField::new("score", DataType::Int32, true),
    ]);

    let merge = Operation::Merge {
        fragments: vec![rewritten, carried_forward],
        schema: LanceSchema::try_from(&widened_schema).unwrap(),
    };
    let transaction = Transaction::new(manifest.version, merge, None);

    let config = ManifestWriteConfig {
        use_stable_row_ids: true,
        ..Default::default()
    };

    let result_manifest = transaction
        .build_manifest(Some(&manifest), vec![], "txn_merge", &config)
        .unwrap()
        .0;

    assert_eq!(result_manifest.fragments.len(), 2);

    let rewritten_version = get_last_updated_version(&result_manifest.fragments[0]);
    assert_eq!(
        rewritten_version, 6,
        "Rewritten fragment should have last_updated=6, got {}",
        rewritten_version
    );

    let carried_version = get_last_updated_version(&result_manifest.fragments[1]);
    assert_eq!(
        carried_version, 5,
        "Carry-forward fragment should have last_updated=5, got {}",
        carried_version
    );
}

/// Scenario: every fragment in a version-1 dataset gains a new data file in
/// the Merge (the shape an add-columns style merge produces).
/// Expected: every fragment reports last_updated = 2, the new version.
#[test]
fn test_merge_refreshes_all_fragments_when_all_rewritten() {
    let base_schema = ArrowSchema::new(vec![ArrowField::new("id", DataType::Int32, false)]);

    let file_a = test_data_file("data_a.lance", vec![0]);
    let file_b = test_data_file("data_b.lance", vec![0]);

    let manifest = manifest_with_stable_row_ids(
        &base_schema,
        vec![
            fragment_with_row_ids(0, 5, 0, vec![file_a.clone()], 1),
            fragment_with_row_ids(1, 5, 5, vec![file_b.clone()], 1),
        ],
        1,
        10,
    );

    // Each fragment keeps its original file and gains one for the new column.
    let new_col_a = test_data_file("new_col_a.lance", vec![1]);
    let new_col_b = test_data_file("new_col_b.lance", vec![1]);

    let mut rewritten_first = manifest.fragments[0].clone();
    rewritten_first.files = vec![file_a, new_col_a];
    let mut rewritten_second = manifest.fragments[1].clone();
    rewritten_second.files = vec![file_b, new_col_b];

    let widened_schema = ArrowSchema::new(vec![
        ArrowField::new("id", DataType::Int32, false),
        ArrowField::new("score", DataType::Int32, true),
    ]);

    let merge = Operation::Merge {
        fragments: vec![rewritten_first, rewritten_second],
        schema: LanceSchema::try_from(&widened_schema).unwrap(),
    };
    let transaction = Transaction::new(manifest.version, merge, None);

    let config = ManifestWriteConfig {
        use_stable_row_ids: true,
        ..Default::default()
    };

    let result_manifest = transaction
        .build_manifest(Some(&manifest), vec![], "txn_merge_all", &config)
        .unwrap()
        .0;

    assert_eq!(result_manifest.fragments.len(), 2);

    let observed_versions = result_manifest
        .fragments
        .iter()
        .map(get_last_updated_version)
        .enumerate();
    for (i, v) in observed_versions {
        assert_eq!(
            v, 2,
            "Fragment {} should have last_updated=2, got {}",
            i, v
        );
    }
}

/// With a default write config and a manifest built without setting the
/// stable row ID flags, a Merge must leave `last_updated_at_version_meta`
/// unset on the resulting fragment.
#[test]
fn test_merge_without_stable_row_ids_does_not_set_version_meta() {
    let base_schema = ArrowSchema::new(vec![ArrowField::new("id", DataType::Int32, false)]);

    let file_a = test_data_file("data_a.lance", vec![0]);

    // A plain fragment: no row ID metadata and no version metadata.
    let plain_fragment = Fragment {
        id: 0,
        physical_rows: Some(10),
        row_id_meta: None,
        files: vec![file_a.clone()],
        deletion_file: None,
        last_updated_at_version_meta: None,
        created_at_version_meta: None,
    };

    let mut manifest = Manifest::new(
        LanceSchema::try_from(&base_schema).unwrap(),
        Arc::new(vec![plain_fragment]),
        DataStorageFormat::new(LanceFileVersion::V2_0),
        HashMap::new(),
    );
    manifest.version = 1;

    // The merged fragment keeps its original file and gains a new-column file.
    let new_col = test_data_file("new_col.lance", vec![1]);
    let mut merged = manifest.fragments[0].clone();
    merged.files = vec![file_a, new_col];

    let widened_schema = ArrowSchema::new(vec![
        ArrowField::new("id", DataType::Int32, false),
        ArrowField::new("score", DataType::Int32, true),
    ]);

    let merge = Operation::Merge {
        fragments: vec![merged],
        schema: LanceSchema::try_from(&widened_schema).unwrap(),
    };
    let transaction = Transaction::new(manifest.version, merge, None);

    let default_config = ManifestWriteConfig::default();
    let result_manifest = transaction
        .build_manifest(Some(&manifest), vec![], "txn_no_stable", &default_config)
        .unwrap()
        .0;

    let version_meta = &result_manifest.fragments[0].last_updated_at_version_meta;
    assert!(
        version_meta.is_none(),
        "Without stable row IDs, version meta should not be set"
    );
}
}
Loading