Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions java/lance-jni/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ fn convert_to_java_operation_inner<'local>(
fields_for_preserving_frag_bitmap,
update_mode,
inserted_rows_filter: _,
..
} => {
let removed_ids: Vec<JLance<i64>> = removed_fragment_ids
.iter()
Expand Down Expand Up @@ -1222,6 +1223,7 @@ fn convert_to_rust_operation(
fields_for_preserving_frag_bitmap,
update_mode,
inserted_rows_filter: None,
table_metadata_updates: None,
}
}
"DataReplacement" => {
Expand Down
6 changes: 6 additions & 0 deletions protos/transaction.proto
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue(blocking): all updates to the protobuf need to first be voted on by the PMC, since they are a format change. https://lance.org/community/voting/

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @wjones127, thanks for the voting pointer.

After re-reading #5960 I'm fully sold on the Action direction — our
append+cursor use case here is just one UserOperation with AddFragments + UpdateTableMetadata, strictly cleaner than the field this PR introduces. Any rough timeline, and is there a piece I could pick up? UpdateTableMetadata feels like a natural first action.

One thought while #5960 is being shaped: would a non-format-changing overlay work as an interim? Reserve a prefix in transaction_properties (e.g. __table_metadata__.*) and have build_manifest() promote those entries into manifest.table_metadata — atomicity still rides on the manifest CAS, proto stays unchanged.

Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,12 @@ message Transaction {
// Filter for checking existence of keys in newly inserted rows, used for conflict detection.
// Only tracks keys from INSERT operations during merge insert, not updates.
optional KeyExistenceFilter inserted_rows = 8;
// Atomic updates to `manifest.table_metadata`, applied alongside the row
// changes in this Update commit. Intended for user-constructed Update
// transactions that bundle a small durable slot (e.g. a consumer cursor)
// with the data they produce, avoiding a separate UpdateConfig round-trip.
// Lance's internal write paths (`update`, `merge_insert`) leave this unset.
optional UpdateMap table_metadata_updates = 9;
}

// The mode of update operation
Expand Down
6 changes: 6 additions & 0 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -4966,6 +4966,11 @@ class Update(BaseOperation):
fields_for_preserving_frag_bitmap: list[int]
The fields that used to judge whether to preserve the new frag's id into
the frag bitmap of the specified indices.
table_metadata_updates: Optional[UpdateMap]
Atomic updates to the manifest's ``table_metadata`` applied
alongside the row changes in this commit. Lance's internal Update
writers leave this ``None``; set it to bundle a small durable slot
(e.g. a consumer cursor) with the data you produce.
"""

removed_fragment_ids: List[int] = dataclasses.field(default_factory=list)
Expand All @@ -4978,6 +4983,7 @@ class Update(BaseOperation):
default_factory=list
)
update_mode: str = ""
table_metadata_updates: Optional["LanceOperation.UpdateMap"] = None

def __post_init__(self):
LanceOperation._validate_fragments(self.updated_fragments)
Expand Down
7 changes: 7 additions & 0 deletions python/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,9 @@ impl FromPyObject<'_> for PyLance<Operation> {
.ok()
.map(|py_mode| py_mode.0);

let table_metadata_updates =
extract_update_map(&ob.getattr("table_metadata_updates")?)?;

let op = Operation::Update {
removed_fragment_ids,
updated_fragments,
Expand All @@ -294,6 +297,7 @@ impl FromPyObject<'_> for PyLance<Operation> {
fields_for_preserving_frag_bitmap,
update_mode,
inserted_rows_filter: None,
table_metadata_updates,
};
Ok(Self(op))
}
Expand Down Expand Up @@ -445,6 +449,7 @@ impl<'py> IntoPyObject<'py> for PyLance<&Operation> {
fields_modified,
fields_for_preserving_frag_bitmap,
update_mode,
table_metadata_updates,
..
} => {
let removed_fragment_ids = removed_fragment_ids.into_pyobject(py)?;
Expand All @@ -465,13 +470,15 @@ impl<'py> IntoPyObject<'py> for PyLance<&Operation> {
let cls = namespace
.getattr("Update")
.expect("Failed to get Update class");
let table_metadata_updates = export_update_map(py, table_metadata_updates)?;
cls.call1((
removed_fragment_ids,
updated_fragments,
new_fragments,
fields_modified,
fields_for_preserving_frag_bitmap,
update_mode,
table_metadata_updates,
))
}
Operation::DataReplacement { replacements } => {
Expand Down
2 changes: 2 additions & 0 deletions rust/lance/src/dataset/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2863,6 +2863,7 @@ mod tests {
fields_for_preserving_frag_bitmap: vec![],
update_mode: Some(UpdateMode::RewriteColumns),
inserted_rows_filter: None,
table_metadata_updates: None,
};
let mut dataset1 = Dataset::commit(
test_uri,
Expand Down Expand Up @@ -2936,6 +2937,7 @@ mod tests {
fields_for_preserving_frag_bitmap: vec![],
update_mode: Some(UpdateMode::RewriteColumns),
inserted_rows_filter: None,
table_metadata_updates: None,
};
let dataset2 = Dataset::commit(
test_uri,
Expand Down
42 changes: 42 additions & 0 deletions rust/lance/src/dataset/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,22 @@ pub struct UpdateMap {
pub replace: bool,
}

impl UpdateMap {
/// Any `replace: true` wipes the target, so it conflicts with everything;
/// otherwise two maps conflict when they touch the same key.
pub(crate) fn conflicts_with(&self, other: &Self) -> bool {
if self.replace || other.replace {
return true;
}
let self_keys: std::collections::HashSet<&str> =
self.update_entries.iter().map(|e| e.key.as_str()).collect();
other
.update_entries
.iter()
.any(|e| self_keys.contains(e.key.as_str()))
}
}

/// An operation on a dataset.
#[derive(Debug, Clone, DeepSizeOf)]
pub enum Operation {
Expand Down Expand Up @@ -222,6 +238,12 @@ pub enum Operation {
/// Optional filter for detecting conflicts on inserted row keys.
/// Only tracks keys from INSERT operations during merge insert, not updates.
inserted_rows_filter: Option<KeyExistenceFilter>,
/// Atomic updates to `manifest.table_metadata`, applied alongside the
/// row changes in this commit. Lance's internal write paths
/// (`merge_insert`, `update`) leave this `None`; user-constructed
/// Update commits may set it to bundle a small durable slot (e.g. a
/// consumer cursor) with the data they produce.
table_metadata_updates: Option<UpdateMap>,
},

/// Project to a new schema. This only changes the schema, not the data.
Expand Down Expand Up @@ -421,6 +443,7 @@ impl PartialEq for Operation {
fields_for_preserving_frag_bitmap: a_fields_for_preserving_frag_bitmap,
update_mode: a_update_mode,
inserted_rows_filter: a_inserted_rows_filter,
table_metadata_updates: a_table_metadata_updates,
},
Self::Update {
removed_fragment_ids: b_removed,
Expand All @@ -431,6 +454,7 @@ impl PartialEq for Operation {
fields_for_preserving_frag_bitmap: b_fields_for_preserving_frag_bitmap,
update_mode: b_update_mode,
inserted_rows_filter: b_inserted_rows_filter,
table_metadata_updates: b_table_metadata_updates,
},
) => {
compare_vec(a_removed, b_removed)
Expand All @@ -444,6 +468,7 @@ impl PartialEq for Operation {
)
&& a_update_mode == b_update_mode
&& a_inserted_rows_filter == b_inserted_rows_filter
&& a_table_metadata_updates == b_table_metadata_updates
}
(Self::Project { schema: a }, Self::Project { schema: b }) => a == b,
(
Expand Down Expand Up @@ -1219,6 +1244,9 @@ impl Operation {
}

pub(crate) fn modifies_same_metadata(&self, other: &Self) -> bool {
// `table_metadata_updates` collisions are handled centrally in
// `TransactionRebase::check_txn`, since both `Update` and
// `UpdateConfig` can carry them and must be compared across kinds.
match (self, other) {
(
Self::UpdateConfig {
Expand Down Expand Up @@ -2220,6 +2248,14 @@ impl Transaction {
}
}
}
Operation::Update {
table_metadata_updates: Some(table_metadata_updates),
..
} => {
let mut table_metadata = manifest.table_metadata.clone();
apply_update_map(&mut table_metadata, table_metadata_updates);
manifest.table_metadata = table_metadata;
}
_ => {}
}

Expand Down Expand Up @@ -2857,6 +2893,7 @@ impl TryFrom<pb::Transaction> for Transaction {
fields_for_preserving_frag_bitmap,
update_mode,
inserted_rows,
table_metadata_updates,
})) => Operation::Update {
removed_fragment_ids,
updated_fragments: updated_fragments
Expand All @@ -2881,6 +2918,7 @@ impl TryFrom<pb::Transaction> for Transaction {
inserted_rows_filter: inserted_rows
.map(|ik| KeyExistenceFilter::try_from(&ik))
.transpose()?,
table_metadata_updates: table_metadata_updates.as_ref().map(UpdateMap::from),
},
Some(pb::transaction::Operation::Project(pb::transaction::Project { schema })) => {
Operation::Project {
Expand Down Expand Up @@ -3183,6 +3221,7 @@ impl From<&Transaction> for pb::Transaction {
fields_for_preserving_frag_bitmap,
update_mode,
inserted_rows_filter,
table_metadata_updates,
} => pb::transaction::Operation::Update(pb::transaction::Update {
removed_fragment_ids: removed_fragment_ids.clone(),
updated_fragments: updated_fragments
Expand All @@ -3196,6 +3235,9 @@ impl From<&Transaction> for pb::Transaction {
.map(pb::MergedGeneration::from)
.collect(),
fields_for_preserving_frag_bitmap: fields_for_preserving_frag_bitmap.clone(),
table_metadata_updates: table_metadata_updates
.as_ref()
.map(pb::transaction::UpdateMap::from),
update_mode: update_mode
.as_ref()
.map(|mode| match mode {
Expand Down
1 change: 1 addition & 0 deletions rust/lance/src/dataset/write/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,7 @@ mod tests {
fields_for_preserving_frag_bitmap: vec![],
update_mode: None,
inserted_rows_filter: None,
table_metadata_updates: None,
},
read_version: 1,
tag: None,
Expand Down
2 changes: 2 additions & 0 deletions rust/lance/src/dataset/write/merge_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1649,6 +1649,7 @@ impl MergeInsertJob {
fields_for_preserving_frag_bitmap: vec![], // in-place update do not affect preserving frag bitmap
update_mode: Some(RewriteColumns),
inserted_rows_filter: None, // not implemented for v1
table_metadata_updates: None,
};
// We have rewritten the fragments, not just the deletion files, so
// we can't use affected rows here.
Expand Down Expand Up @@ -1723,6 +1724,7 @@ impl MergeInsertJob {
.collect(),
update_mode: Some(RewriteRows),
inserted_rows_filter: None, // not implemented for v1
table_metadata_updates: None,
};

let affected_rows = Some(RowAddrTreeMap::from(removed_row_addrs));
Expand Down
1 change: 1 addition & 0 deletions rust/lance/src/dataset/write/merge_insert/exec/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ impl ExecutionPlan for DeleteOnlyMergeInsertExec {
.collect(),
update_mode: None,
inserted_rows_filter: None, // Delete-only operations don't insert rows
table_metadata_updates: None,
};

let transaction = Transaction::new(dataset.manifest.version, operation, None);
Expand Down
1 change: 1 addition & 0 deletions rust/lance/src/dataset/write/merge_insert/exec/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,7 @@ impl ExecutionPlan for FullSchemaMergeInsertExec {
.collect(),
update_mode: Some(RewriteRows),
inserted_rows_filter: inserted_rows_filter.clone(),
table_metadata_updates: None,
};

// Step 5: Create and store the transaction
Expand Down
1 change: 1 addition & 0 deletions rust/lance/src/dataset/write/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ impl UpdateJob {
fields_for_preserving_frag_bitmap,
update_mode: Some(RewriteRows),
inserted_rows_filter: None,
table_metadata_updates: None,
};

let transaction = Transaction::new(dataset.manifest.version, operation, None);
Expand Down
Loading
Loading