From c7d1708d475a0e5c37a5a799464340bf87cbc50b Mon Sep 17 00:00:00 2001 From: Shangqing Yang Date: Tue, 21 Apr 2026 18:30:22 -0400 Subject: [PATCH] fix: ensure _row_last_updated_at_version is correctly refreshed for rewritten fragments in Operation::Merge --- rust/lance/src/dataset/transaction.rs | 303 ++++++++++++++++++++++++++ 1 file changed, 303 insertions(+) diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index 62f30696234..6a29c313067 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -1959,6 +1959,31 @@ impl Transaction { Operation::Merge { fragments, .. } => { final_fragments.extend(fragments.clone()); + // Refresh last_updated_at_version_meta for fragments that were rewritten. + // Carry-forward fragments that were not touched must not be bumped. + if next_row_id.is_some() { + let new_version = current_manifest.map(|m| m.version + 1).unwrap_or(1); + let old_frag_files: std::collections::HashMap<u64, &Vec<DataFile>> = + maybe_existing_fragments? + .iter() + .map(|f| (f.id, &f.files)) + .collect(); + + for fragment in final_fragments.iter_mut() { + let is_rewritten = match old_frag_files.get(&fragment.id) { + Some(old_files) => &fragment.files != *old_files, + None => true, + }; + + if is_rewritten { + lance_table::rowids::version::refresh_row_latest_update_meta_for_full_frag_rewrite_cols( + fragment, + new_version, + )?; + } + } + } + // Some fields that have indices may have been removed, so we should // remove those indices as well. Self::retain_relevant_indices(&mut final_indices, &schema, &final_fragments) @@ -4527,4 +4552,282 @@ mod tests { // cause strict validation of the new STABLE fragments. validate_operation(Some(&legacy_manifest), &operation).unwrap(); } + + // Helper function to create a manifest with stable row IDs enabled and the given fragments. 
+ fn manifest_with_stable_row_ids( + schema: &ArrowSchema, + fragments: Vec<Fragment>, + version: u64, + next_row_id: u64, + ) -> Manifest { + let mut manifest = Manifest::new( + LanceSchema::try_from(schema).unwrap(), + Arc::new(fragments), + DataStorageFormat::new(LanceFileVersion::V2_0), + HashMap::new(), + ); + manifest.version = version; + manifest.next_row_id = next_row_id; + manifest.reader_feature_flags |= FLAG_STABLE_ROW_IDS; + manifest.writer_feature_flags |= FLAG_STABLE_ROW_IDS; + manifest + } + + // Helper function to build a simple DataFile for testing purposes. + fn test_data_file(path: &str, fields: Vec<i32>) -> DataFile { + DataFile::new( + path, + fields, + vec![], + lance_file::format::MAJOR_VERSION as u32, + lance_file::format::MINOR_VERSION as u32, + None, + None, + ) + } + + // Helper function to build a fragment with stable row ID metadata. + fn fragment_with_row_ids( + id: u64, + physical_rows: usize, + row_id_start: u64, + files: Vec<DataFile>, + last_updated_version: u64, + ) -> Fragment { + let row_id_seq = RowIdSequence::from(row_id_start..row_id_start + physical_rows as u64); + let serialized = write_row_ids(&row_id_seq); + + let version_meta = lance_table::rowids::version::build_version_meta( + &Fragment { + id, + physical_rows: Some(physical_rows), + row_id_meta: Some(RowIdMeta::Inline(serialized.clone())), + files: files.clone(), + deletion_file: None, + last_updated_at_version_meta: None, + created_at_version_meta: None, + }, + last_updated_version, + ); + + Fragment { + id, + physical_rows: Some(physical_rows), + row_id_meta: Some(RowIdMeta::Inline(serialized)), + files, + deletion_file: None, + last_updated_at_version_meta: version_meta.clone(), + created_at_version_meta: version_meta, + } + } + + // Helper function to extract the uniform version from a fragment's last_updated_at_version_meta. 
+ fn get_last_updated_version(frag: &Fragment) -> u64 { + let meta = frag + .last_updated_at_version_meta + .as_ref() + .expect("expected last_updated_at_version_meta to be set"); + let seq = meta.load_sequence().unwrap(); + let versions: Vec<u64> = seq.versions().collect(); + assert!( + !versions.is_empty(), + "version sequence should not be empty for fragment {}", + frag.id + ); + assert!( + versions.iter().all(|&v| v == versions[0]), + "expected uniform version sequence, got {:?}", + versions + ); + versions[0] + } + + /// Simulate: dataset at version 5 has two fragments. + /// A Merge operation rewrites frag0 (adds a new data file) but + /// carries forward frag1 unchanged. + /// Expected: frag0.last_updated = 6 (new version), frag1.last_updated = 5 (unchanged). + #[test] + fn test_merge_refreshes_last_updated_for_rewritten_fragments() { + let schema = ArrowSchema::new(vec![ + ArrowField::new("id", DataType::Int32, false), + ArrowField::new("name", DataType::Utf8, true), + ]); + + let original_file_a = test_data_file("data_a.lance", vec![0, 1]); + let original_file_b = test_data_file("data_b.lance", vec![0, 1]); + + let frag0 = fragment_with_row_ids(0, 10, 0, vec![original_file_a.clone()], 5); + let frag1 = fragment_with_row_ids(1, 10, 10, vec![original_file_b.clone()], 5); + + let manifest = manifest_with_stable_row_ids(&schema, vec![frag0, frag1], 5, 20); + + let new_column_file = test_data_file("new_col_frag0.lance", vec![2]); + let merged_frag0 = Fragment { + files: vec![original_file_a, new_column_file], + ..manifest.fragments[0].clone() + }; + let merged_frag1 = manifest.fragments[1].clone(); + + let new_schema = ArrowSchema::new(vec![ + ArrowField::new("id", DataType::Int32, false), + ArrowField::new("name", DataType::Utf8, true), + ArrowField::new("score", DataType::Int32, true), + ]); + + let transaction = Transaction::new( + manifest.version, + Operation::Merge { + fragments: vec![merged_frag0, merged_frag1], + schema: 
LanceSchema::try_from(&new_schema).unwrap(), + }, + None, + ); + + let config = ManifestWriteConfig { + use_stable_row_ids: true, + ..Default::default() + }; + + let (result_manifest, _) = transaction + .build_manifest(Some(&manifest), vec![], "txn_merge", &config) + .unwrap(); + + assert_eq!(result_manifest.fragments.len(), 2); + + let frag0_updated = get_last_updated_version(&result_manifest.fragments[0]); + assert_eq!( + frag0_updated, 6, + "Rewritten fragment should have last_updated=6, got {}", + frag0_updated + ); + + let frag1_updated = get_last_updated_version(&result_manifest.fragments[1]); + assert_eq!( + frag1_updated, 5, + "Carry-forward fragment should have last_updated=5, got {}", + frag1_updated + ); + } + + /// Simulate: all fragments are rewritten (typical add_columns via Dataset::merge). + /// Expected: all fragments get last_updated = new version. + #[test] + fn test_merge_refreshes_all_fragments_when_all_rewritten() { + let schema = ArrowSchema::new(vec![ + ArrowField::new("id", DataType::Int32, false), + ]); + + let file_a = test_data_file("data_a.lance", vec![0]); + let file_b = test_data_file("data_b.lance", vec![0]); + + let frag0 = fragment_with_row_ids(0, 5, 0, vec![file_a.clone()], 1); + let frag1 = fragment_with_row_ids(1, 5, 5, vec![file_b.clone()], 1); + + let manifest = manifest_with_stable_row_ids(&schema, vec![frag0, frag1], 1, 10); + + let new_col_a = test_data_file("new_col_a.lance", vec![1]); + let new_col_b = test_data_file("new_col_b.lance", vec![1]); + + let merged_frag0 = Fragment { + files: vec![file_a, new_col_a], + ..manifest.fragments[0].clone() + }; + let merged_frag1 = Fragment { + files: vec![file_b, new_col_b], + ..manifest.fragments[1].clone() + }; + + let new_schema = ArrowSchema::new(vec![ + ArrowField::new("id", DataType::Int32, false), + ArrowField::new("score", DataType::Int32, true), + ]); + + let transaction = Transaction::new( + manifest.version, + Operation::Merge { + fragments: vec![merged_frag0, 
merged_frag1], + schema: LanceSchema::try_from(&new_schema).unwrap(), + }, + None, + ); + + let config = ManifestWriteConfig { + use_stable_row_ids: true, + ..Default::default() + }; + + let (result_manifest, _) = transaction + .build_manifest(Some(&manifest), vec![], "txn_merge_all", &config) + .unwrap(); + + assert_eq!(result_manifest.fragments.len(), 2); + + for (i, frag) in result_manifest.fragments.iter().enumerate() { + let v = get_last_updated_version(frag); + assert_eq!( + v, 2, + "Fragment {} should have last_updated=2, got {}", + i, v + ); + } + } + + /// When stable row IDs are NOT enabled, Merge should not touch version metadata. + #[test] + fn test_merge_without_stable_row_ids_does_not_set_version_meta() { + let schema = ArrowSchema::new(vec![ + ArrowField::new("id", DataType::Int32, false), + ]); + + let file_a = test_data_file("data_a.lance", vec![0]); + + let frag0 = Fragment { + id: 0, + physical_rows: Some(10), + row_id_meta: None, + files: vec![file_a.clone()], + deletion_file: None, + last_updated_at_version_meta: None, + created_at_version_meta: None, + }; + + let mut manifest = Manifest::new( + LanceSchema::try_from(&schema).unwrap(), + Arc::new(vec![frag0]), + DataStorageFormat::new(LanceFileVersion::V2_0), + HashMap::new(), + ); + manifest.version = 1; + + let new_col = test_data_file("new_col.lance", vec![1]); + let merged_frag0 = Fragment { + files: vec![file_a, new_col], + ..manifest.fragments[0].clone() + }; + + let new_schema = ArrowSchema::new(vec![ + ArrowField::new("id", DataType::Int32, false), + ArrowField::new("score", DataType::Int32, true), + ]); + + let transaction = Transaction::new( + manifest.version, + Operation::Merge { + fragments: vec![merged_frag0], + schema: LanceSchema::try_from(&new_schema).unwrap(), + }, + None, + ); + + let (result_manifest, _) = transaction + .build_manifest(Some(&manifest), vec![], "txn_no_stable", &Default::default()) + .unwrap(); + + assert!( + result_manifest.fragments[0] + 
.last_updated_at_version_meta + .is_none(), + "Without stable row IDs, version meta should not be set" + ); + } }