From 4896eab9083c99d20f98bf54ce72ecac89213bb2 Mon Sep 17 00:00:00 2001 From: fecet Date: Mon, 23 Mar 2026 18:07:05 +0800 Subject: [PATCH] feat: bundle table_metadata_updates with Operation::Update Let Operation::Update commits atomically carry an UpdateMap that's merged into manifest.table_metadata alongside the row changes. Streaming writers (consumer merge_insert, broker-style fold) can record a cursor/offset slot in the same commit as the data they produce, avoiding a separate UpdateConfig round-trip. - proto: Transaction.Update.table_metadata_updates = 9 - conflict_resolver: central key-level check runs before per-op checks so a row conflict on concurrent Updates does not mask an incompatible metadata collision; also handles the cross-kind pairing with Operation::UpdateConfig - Python: LanceOperation.Update.table_metadata_updates - Java JNI: compile-compat passthrough (no Java API surface yet) --- java/lance-jni/src/transaction.rs | 2 + protos/transaction.proto | 6 + python/python/lance/dataset.py | 6 + python/src/transaction.rs | 7 + rust/lance/src/dataset/fragment.rs | 2 + rust/lance/src/dataset/transaction.rs | 42 +++++ rust/lance/src/dataset/write/commit.rs | 1 + rust/lance/src/dataset/write/merge_insert.rs | 2 + .../dataset/write/merge_insert/exec/delete.rs | 1 + .../dataset/write/merge_insert/exec/write.rs | 1 + rust/lance/src/dataset/write/update.rs | 1 + rust/lance/src/io/commit/conflict_resolver.rs | 157 +++++++++++++++++- 12 files changed, 226 insertions(+), 2 deletions(-) diff --git a/java/lance-jni/src/transaction.rs b/java/lance-jni/src/transaction.rs index d46140020f0..27b48e5974f 100644 --- a/java/lance-jni/src/transaction.rs +++ b/java/lance-jni/src/transaction.rs @@ -433,6 +433,7 @@ fn convert_to_java_operation_inner<'local>( fields_for_preserving_frag_bitmap, update_mode, inserted_rows_filter: _, + .. } => { let removed_ids: Vec> = removed_fragment_ids .iter() @@ -1222,6 +1223,7 @@ fn convert_to_rust_operation( fields_for_preserving_frag_bitmap, update_mode, inserted_rows_filter: None, + table_metadata_updates: None, } } "DataReplacement" => { diff --git a/protos/transaction.proto b/protos/transaction.proto index 06268feb252..87a8d59396c 100644 --- a/protos/transaction.proto +++ b/protos/transaction.proto @@ -248,6 +248,12 @@ message Transaction { // Filter for checking existence of keys in newly inserted rows, used for conflict detection. // Only tracks keys from INSERT operations during merge insert, not updates. optional KeyExistenceFilter inserted_rows = 8; + // Atomic updates to `manifest.table_metadata`, applied alongside the row + // changes in this Update commit. Intended for user-constructed Update + // transactions that bundle a small durable slot (e.g. a consumer cursor) + // with the data they produce, avoiding a separate UpdateConfig round-trip. + // Lance's internal write paths (`update`, `merge_insert`) leave this unset. + optional UpdateMap table_metadata_updates = 9; } // The mode of update operation diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 79641ab4438..ae5dd8d54c8 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -4966,6 +4966,11 @@ class Update(BaseOperation): fields_for_preserving_frag_bitmap: list[int] The fields that used to judge whether to preserve the new frag's id into the frag bitmap of the specified indices. + table_metadata_updates: Optional[UpdateMap] + Atomic updates to the manifest's ``table_metadata`` applied + alongside the row changes in this commit. Lance's internal Update + writers leave this ``None``; set it to bundle a small durable slot + (e.g. a consumer cursor) with the data you produce. """ removed_fragment_ids: List[int] = dataclasses.field(default_factory=list) @@ -4978,6 +4983,7 @@ class Update(BaseOperation): default_factory=list ) update_mode: str = "" + table_metadata_updates: Optional["LanceOperation.UpdateMap"] = None def __post_init__(self): LanceOperation._validate_fragments(self.updated_fragments) diff --git a/python/src/transaction.rs b/python/src/transaction.rs index eae5b49a15d..56ea0b195b7 100644 --- a/python/src/transaction.rs +++ b/python/src/transaction.rs @@ -285,6 +285,9 @@ impl FromPyObject<'_> for PyLance { .ok() .map(|py_mode| py_mode.0); + let table_metadata_updates = + extract_update_map(&ob.getattr("table_metadata_updates")?)?; + let op = Operation::Update { removed_fragment_ids, updated_fragments, @@ -294,6 +297,7 @@ impl FromPyObject<'_> for PyLance { fields_for_preserving_frag_bitmap, update_mode, inserted_rows_filter: None, + table_metadata_updates, }; Ok(Self(op)) } @@ -445,6 +449,7 @@ impl<'py> IntoPyObject<'py> for PyLance<&Operation> { fields_modified, fields_for_preserving_frag_bitmap, update_mode, + table_metadata_updates, .. } => { let removed_fragment_ids = removed_fragment_ids.into_pyobject(py)?; @@ -465,6 +470,7 @@ impl<'py> IntoPyObject<'py> for PyLance<&Operation> { let cls = namespace .getattr("Update") .expect("Failed to get Update class"); + let table_metadata_updates = export_update_map(py, table_metadata_updates)?; cls.call1(( removed_fragment_ids, updated_fragments, @@ -472,6 +478,7 @@ impl<'py> IntoPyObject<'py> for PyLance<&Operation> { fields_modified, fields_for_preserving_frag_bitmap, update_mode, + table_metadata_updates, )) } Operation::DataReplacement { replacements } => { diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index 29b33a75018..61254436cb3 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -2863,6 +2863,7 @@ mod tests { fields_for_preserving_frag_bitmap: vec![], update_mode: Some(UpdateMode::RewriteColumns), inserted_rows_filter: None, + table_metadata_updates: None, }; let mut dataset1 = Dataset::commit( test_uri, @@ -2936,6 +2937,7 @@ mod tests { fields_for_preserving_frag_bitmap: vec![], update_mode: Some(UpdateMode::RewriteColumns), inserted_rows_filter: None, + table_metadata_updates: None, }; let dataset2 = Dataset::commit( test_uri, diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index 62f30696234..1739de54c14 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -108,6 +108,22 @@ pub struct UpdateMap { pub replace: bool, } +impl UpdateMap { + /// Any `replace: true` wipes the target, so it conflicts with everything; + /// otherwise two maps conflict when they touch the same key. + pub(crate) fn conflicts_with(&self, other: &Self) -> bool { + if self.replace || other.replace { + return true; + } + let self_keys: std::collections::HashSet<&str> = + self.update_entries.iter().map(|e| e.key.as_str()).collect(); + other + .update_entries + .iter() + .any(|e| self_keys.contains(e.key.as_str())) + } +} + /// An operation on a dataset. #[derive(Debug, Clone, DeepSizeOf)] pub enum Operation { @@ -222,6 +238,12 @@ pub enum Operation { /// Optional filter for detecting conflicts on inserted row keys. /// Only tracks keys from INSERT operations during merge insert, not updates. inserted_rows_filter: Option, + /// Atomic updates to `manifest.table_metadata`, applied alongside the + /// row changes in this commit. Lance's internal write paths + /// (`merge_insert`, `update`) leave this `None`; user-constructed + /// Update commits may set it to bundle a small durable slot (e.g. a + /// consumer cursor) with the data they produce. + table_metadata_updates: Option, }, /// Project to a new schema. This only changes the schema, not the data. @@ -421,6 +443,7 @@ impl PartialEq for Operation { fields_for_preserving_frag_bitmap: a_fields_for_preserving_frag_bitmap, update_mode: a_update_mode, inserted_rows_filter: a_inserted_rows_filter, + table_metadata_updates: a_table_metadata_updates, }, Self::Update { removed_fragment_ids: b_removed, @@ -431,6 +454,7 @@ impl PartialEq for Operation { fields_for_preserving_frag_bitmap: b_fields_for_preserving_frag_bitmap, update_mode: b_update_mode, inserted_rows_filter: b_inserted_rows_filter, + table_metadata_updates: b_table_metadata_updates, }, ) => { compare_vec(a_removed, b_removed) @@ -444,6 +468,7 @@ impl PartialEq for Operation { ) && a_update_mode == b_update_mode && a_inserted_rows_filter == b_inserted_rows_filter + && a_table_metadata_updates == b_table_metadata_updates } (Self::Project { schema: a }, Self::Project { schema: b }) => a == b, ( @@ -1219,6 +1244,9 @@ impl Operation { } pub(crate) fn modifies_same_metadata(&self, other: &Self) -> bool { + // `table_metadata_updates` collisions are handled centrally in + // `TransactionRebase::check_txn`, since both `Update` and + // `UpdateConfig` can carry them and must be compared across kinds. match (self, other) { ( Self::UpdateConfig { @@ -2220,6 +2248,14 @@ impl Transaction { } } } + Operation::Update { + table_metadata_updates: Some(table_metadata_updates), + .. + } => { + let mut table_metadata = manifest.table_metadata.clone(); + apply_update_map(&mut table_metadata, table_metadata_updates); + manifest.table_metadata = table_metadata; + } _ => {} } @@ -2857,6 +2893,7 @@ impl TryFrom for Transaction { fields_for_preserving_frag_bitmap, update_mode, inserted_rows, + table_metadata_updates, })) => Operation::Update { removed_fragment_ids, updated_fragments: updated_fragments @@ -2881,6 +2918,7 @@ impl TryFrom for Transaction { inserted_rows_filter: inserted_rows .map(|ik| KeyExistenceFilter::try_from(&ik)) .transpose()?, + table_metadata_updates: table_metadata_updates.as_ref().map(UpdateMap::from), }, Some(pb::transaction::Operation::Project(pb::transaction::Project { schema })) => { Operation::Project { @@ -3183,6 +3221,7 @@ impl From<&Transaction> for pb::Transaction { fields_for_preserving_frag_bitmap, update_mode, inserted_rows_filter, + table_metadata_updates, } => pb::transaction::Operation::Update(pb::transaction::Update { removed_fragment_ids: removed_fragment_ids.clone(), updated_fragments: updated_fragments @@ -3196,6 +3235,9 @@ impl From<&Transaction> for pb::Transaction { .map(pb::MergedGeneration::from) .collect(), fields_for_preserving_frag_bitmap: fields_for_preserving_frag_bitmap.clone(), + table_metadata_updates: table_metadata_updates + .as_ref() + .map(pb::transaction::UpdateMap::from), update_mode: update_mode .as_ref() .map(|mode| match mode { diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs index 08a4f8f8f02..03401f06685 100644 --- a/rust/lance/src/dataset/write/commit.rs +++ b/rust/lance/src/dataset/write/commit.rs @@ -755,6 +755,7 @@ mod tests { fields_for_preserving_frag_bitmap: vec![], update_mode: None, inserted_rows_filter: None, + table_metadata_updates: None, }, read_version: 1, tag: None, diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index 3654fcc24a8..b3338aa33dd 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -1649,6 +1649,7 @@ impl MergeInsertJob { fields_for_preserving_frag_bitmap: vec![], // in-place update do not affect preserving frag bitmap update_mode: Some(RewriteColumns), inserted_rows_filter: None, // not implemented for v1 + table_metadata_updates: None, }; // We have rewritten the fragments, not just the deletion files, so // we can't use affected rows here. @@ -1723,6 +1724,7 @@ impl MergeInsertJob { .collect(), update_mode: Some(RewriteRows), inserted_rows_filter: None, // not implemented for v1 + table_metadata_updates: None, }; let affected_rows = Some(RowAddrTreeMap::from(removed_row_addrs)); diff --git a/rust/lance/src/dataset/write/merge_insert/exec/delete.rs b/rust/lance/src/dataset/write/merge_insert/exec/delete.rs index 1302aeb69d9..4c94c0f407c 100644 --- a/rust/lance/src/dataset/write/merge_insert/exec/delete.rs +++ b/rust/lance/src/dataset/write/merge_insert/exec/delete.rs @@ -284,6 +284,7 @@ impl ExecutionPlan for DeleteOnlyMergeInsertExec { .collect(), update_mode: None, inserted_rows_filter: None, // Delete-only operations don't insert rows + table_metadata_updates: None, }; let transaction = Transaction::new(dataset.manifest.version, operation, None); diff --git a/rust/lance/src/dataset/write/merge_insert/exec/write.rs b/rust/lance/src/dataset/write/merge_insert/exec/write.rs index 703d4e1f6c8..f7bdc56cf61 100644 --- a/rust/lance/src/dataset/write/merge_insert/exec/write.rs +++ b/rust/lance/src/dataset/write/merge_insert/exec/write.rs @@ -928,6 +928,7 @@ impl ExecutionPlan for FullSchemaMergeInsertExec { .collect(), update_mode: Some(RewriteRows), inserted_rows_filter: inserted_rows_filter.clone(), + table_metadata_updates: None, }; // Step 5: Create and store the transaction diff --git a/rust/lance/src/dataset/write/update.rs b/rust/lance/src/dataset/write/update.rs index ec34000642d..92dfe5deb3a 100644 --- a/rust/lance/src/dataset/write/update.rs +++ b/rust/lance/src/dataset/write/update.rs @@ -377,6 +377,7 @@ impl UpdateJob { fields_for_preserving_frag_bitmap, update_mode: Some(RewriteRows), inserted_rows_filter: None, + table_metadata_updates: None, }; let transaction = Transaction::new(dataset.manifest.version, operation, None); diff --git a/rust/lance/src/io/commit/conflict_resolver.rs b/rust/lance/src/io/commit/conflict_resolver.rs index ec03ba596db..2edc26fab3a 100644 --- a/rust/lance/src/io/commit/conflict_resolver.rs +++ b/rust/lance/src/io/commit/conflict_resolver.rs @@ -7,7 +7,7 @@ use crate::index::mem_wal::{load_mem_wal_index_details, new_mem_wal_index_meta}; use crate::io::deletion::read_dataset_deletion_file; use crate::{ Dataset, - dataset::transaction::{Operation, Transaction}, + dataset::transaction::{Operation, Transaction, UpdateMap}, }; use futures::{StreamExt, TryStreamExt}; use lance_core::utils::mask::RowSetOps; @@ -192,6 +192,19 @@ impl<'a> TransactionRebase<'a> { /// return Ok(()). pub fn check_txn(&mut self, other_transaction: &Transaction, other_version: u64) -> Result<()> { let op = &self.transaction.operation; + // Run metadata-key conflict detection before per-op checks: those may + // early-return `RetryableCommitConflict` on row/fragment conflicts, + // and a retry on a `table_metadata` collision would silently + // rebase-overwrite the loser's slot instead of surfacing an + // incompatible conflict. + if let Some(self_updates) = operation_table_metadata_updates(&self.transaction.operation) + && let Some(other_updates) = + operation_table_metadata_updates(&other_transaction.operation) + && self_updates.conflicts_with(other_updates) + { + return Err(self.incompatible_conflict_err(other_transaction, other_version)); + } + match op { Operation::Delete { .. } => self.check_delete_txn(other_transaction, other_version), Operation::Update { .. } => self.check_update_txn(other_transaction, other_version), @@ -1677,6 +1690,23 @@ fn wrong_operation_err(op: &Operation) -> Error { Error::internal(format!("function called against a wrong operation: {}", op)) } +/// Both `Operation::UpdateConfig` and `Operation::Update` can write to +/// `manifest.table_metadata`; the conflict resolver compares their maps +/// across either pairing. +fn operation_table_metadata_updates(operation: &Operation) -> Option<&UpdateMap> { + match operation { + Operation::UpdateConfig { + table_metadata_updates, + .. + } => table_metadata_updates.as_ref(), + Operation::Update { + table_metadata_updates, + .. + } => table_metadata_updates.as_ref(), + _ => None, + } +} + #[cfg(test)] mod tests { use std::{num::NonZero, sync::Arc}; @@ -1692,7 +1722,9 @@ mod tests { use lance_table::io::deletion::{deletion_file_path, read_deletion_file}; use super::*; - use crate::dataset::transaction::{DataReplacementGroup, RewriteGroup}; + use crate::dataset::transaction::{ + DataReplacementGroup, RewriteGroup, UpdateMap, UpdateMapEntry, + }; use crate::dataset::write::WriteMode; use crate::session::caches::DeletionFileKey; use crate::{ @@ -1700,6 +1732,7 @@ mod tests { io, }; use lance_table::format::DataFile; + use rstest::rstest; async fn test_dataset(num_rows: usize, num_fragments: usize) -> Dataset { let write_params = WriteParams { @@ -1786,6 +1819,7 @@ mod tests { fields_for_preserving_frag_bitmap: vec![], update_mode: None, inserted_rows_filter: None, + table_metadata_updates: None, }; let transaction = Transaction::new_from_version(1, operation); let other_operations = [ @@ -1798,6 +1832,7 @@ mod tests { fields_for_preserving_frag_bitmap: vec![], update_mode: None, inserted_rows_filter: None, + table_metadata_updates: None, }, Operation::Delete { deleted_fragment_ids: vec![3], @@ -1813,6 +1848,7 @@ mod tests { fields_for_preserving_frag_bitmap: vec![], update_mode: None, inserted_rows_filter: None, + table_metadata_updates: None, }, ]; let other_transactions = other_operations.map(|op| Transaction::new_from_version(2, op)); @@ -1915,6 +1951,7 @@ mod tests { fields_for_preserving_frag_bitmap: vec![], update_mode: None, inserted_rows_filter: None, + table_metadata_updates: None, }, Operation::Delete { updated_fragments: vec![apply_deletion(&[1], &mut fragment, &dataset).await], @@ -1930,6 +1967,7 @@ mod tests { fields_for_preserving_frag_bitmap: vec![], update_mode: None, inserted_rows_filter: None, + table_metadata_updates: None, }, ]; let transactions = @@ -2052,6 +2090,7 @@ mod tests { fields_for_preserving_frag_bitmap: vec![], update_mode: None, inserted_rows_filter: None, + table_metadata_updates: None, }, ), ( @@ -2065,6 +2104,7 @@ mod tests { fields_for_preserving_frag_bitmap: vec![], update_mode: None, inserted_rows_filter: None, + table_metadata_updates: None, }, ), ( @@ -2225,6 +2265,7 @@ mod tests { fields_for_preserving_frag_bitmap: vec![], update_mode: None, inserted_rows_filter: None, + table_metadata_updates: None, }, create_update_config_for_test( Some(HashMap::from_iter(vec![( @@ -2431,6 +2472,7 @@ mod tests { fields_for_preserving_frag_bitmap: vec![], update_mode: None, inserted_rows_filter: None, + table_metadata_updates: None, }, [ Compatible, // append @@ -2953,6 +2995,7 @@ mod tests { fields_for_preserving_frag_bitmap: vec![], update_mode: None, inserted_rows_filter: None, + table_metadata_updates: None, }, ]; @@ -3606,4 +3649,114 @@ mod tests { assert_eq!(dataset_v2.count_rows(None).await.unwrap(), 5); } + + /// Build a tx that writes `key` to `table_metadata` either via an + /// `Operation::Update`-bundled slot (rows + cursor atomic) or via a + /// dedicated `Operation::UpdateConfig` op. + fn metadata_txn(on_update: bool, key: &str, replace: bool) -> Transaction { + let map = UpdateMap { + update_entries: vec![UpdateMapEntry::from((key, "v"))], + replace, + }; + let op = if on_update { + Operation::Update { + removed_fragment_ids: vec![], + updated_fragments: vec![], + new_fragments: vec![], + fields_modified: vec![], + merged_generations: vec![], + fields_for_preserving_frag_bitmap: vec![], + update_mode: None, + inserted_rows_filter: None, + table_metadata_updates: Some(map), + } + } else { + Operation::UpdateConfig { + config_updates: None, + table_metadata_updates: Some(map), + schema_metadata_updates: None, + field_metadata_updates: HashMap::new(), + } + }; + Transaction::new_from_version(1, op) + } + + /// Cross-kind `table_metadata_updates` collisions: `Operation::Update` + /// bundled slot vs. `Operation::UpdateConfig`, and Update-vs-Update, must + /// be surfaced by the centralized check in `check_txn`. + #[rstest] + #[case::update_disjoint(true, true, "c0", "c1", false, true)] + #[case::update_overlap(true, true, "c0", "c0", false, false)] + #[case::update_replace(true, true, "c0", "c1", true, false)] + #[case::cross_disjoint(true, false, "c0", "c1", false, true)] + #[case::cross_overlap(true, false, "c0", "c0", false, false)] + #[tokio::test] + async fn test_table_metadata_conflict( + #[case] lhs_on_update: bool, + #[case] rhs_on_update: bool, + #[case] key1: &str, + #[case] key2: &str, + #[case] replace1: bool, + #[case] expect_ok: bool, + ) { + let dataset = test_dataset(5, 1).await; + let txn1 = metadata_txn(lhs_on_update, key1, replace1); + let txn2 = metadata_txn(rhs_on_update, key2, false); + + let mut rebase = TransactionRebase::try_new(&dataset, txn1, None) + .await + .unwrap(); + let result = rebase.check_txn(&txn2, 2); + if expect_ok { + assert!(result.is_ok(), "expected ok, got {:?}", result); + } else { + assert!( + matches!(result, Err(Error::IncompatibleTransaction { .. })), + "expected IncompatibleTransaction, got {:?}", + result + ); + } + } + + /// Combined: row-level conflict on the same fragment plus a clashing + /// `table_metadata` key on two `Operation::Update` commits must surface as + /// `IncompatibleTransaction`. Otherwise `check_update_txn` early-returns + /// `RetryableCommitConflict` on the row conflict and the retry silently + /// rebase-overwrites the metadata slot. + #[tokio::test] + async fn test_metadata_conflict_beats_row_retry() { + let dataset = test_dataset(5, 1).await; + let map = |k: &str| UpdateMap { + update_entries: vec![UpdateMapEntry::from((k, "v"))], + replace: false, + }; + let mk = |cursor: &str| { + Transaction::new_from_version( + 1, + Operation::Update { + // Same fragment on both sides forces a row-level conflict + // through `check_update_txn`. + removed_fragment_ids: vec![0], + updated_fragments: vec![], + new_fragments: vec![], + fields_modified: vec![], + merged_generations: vec![], + fields_for_preserving_frag_bitmap: vec![], + update_mode: None, + inserted_rows_filter: None, + table_metadata_updates: Some(map(cursor)), + }, + ) + }; + + let mut rebase = TransactionRebase::try_new(&dataset, mk("c0"), None) + .await + .unwrap(); + let result = rebase.check_txn(&mk("c0"), 2); + assert!( + matches!(result, Err(Error::IncompatibleTransaction { .. })), + "expected IncompatibleTransaction, got {:?}", + result + ); + } }