From 9220329bec7905221c569542612c5265c2766d50 Mon Sep 17 00:00:00 2001 From: ivscheianu Date: Thu, 9 Apr 2026 13:09:52 +0300 Subject: [PATCH 1/9] fix: serialize version metadata through JNI and correct row-ID lookup in Operation::Update --- java/lance-jni/Cargo.lock | 1 + java/lance-jni/src/fragment.rs | 101 ++++-- .../main/java/org/lance/FragmentMetadata.java | 30 +- .../java/org/lance/fragment/RowIdMeta.java | 49 +++ .../java/org/lance/fragment/VersionMeta.java | 59 ++++ .../org/lance/fragment/RowIdMetaTest.java | 137 ++++++++ .../org/lance/fragment/VersionMetaTest.java | 80 +++++ rust/lance/src/dataset/transaction.rs | 329 +++++++++++++++--- 8 files changed, 716 insertions(+), 70 deletions(-) create mode 100644 java/src/main/java/org/lance/fragment/VersionMeta.java create mode 100644 java/src/test/java/org/lance/fragment/RowIdMetaTest.java create mode 100644 java/src/test/java/org/lance/fragment/VersionMetaTest.java diff --git a/java/lance-jni/Cargo.lock b/java/lance-jni/Cargo.lock index 09143a6adf0..5bb30513354 100644 --- a/java/lance-jni/Cargo.lock +++ b/java/lance-jni/Cargo.lock @@ -3469,6 +3469,7 @@ dependencies = [ "arrow-buffer", "arrow-cast", "arrow-data", + "arrow-ipc", "arrow-ord", "arrow-schema", "arrow-select", diff --git a/java/lance-jni/src/fragment.rs b/java/lance-jni/src/fragment.rs index 3b2a7ba6b22..fe2941fabae 100644 --- a/java/lance-jni/src/fragment.rs +++ b/java/lance-jni/src/fragment.rs @@ -12,7 +12,9 @@ use jni::{ sys::{jint, jlong}, }; use lance::datatypes::Schema; -use lance::table::format::{DataFile, DeletionFile, DeletionFileType, Fragment, RowIdMeta}; +use lance::table::format::{ + DataFile, DeletionFile, DeletionFileType, Fragment, RowDatasetVersionMeta, RowIdMeta, +}; use lance_io::utils::CachedFileSize; use std::iter::once; @@ -494,9 +496,11 @@ const DELETE_FILE_CONSTRUCTOR_SIG: &str = "(JJLjava/lang/Long;Lorg/lance/fragment/DeletionFileType;Ljava/lang/Integer;)V"; const DELETE_FILE_TYPE_CLASS: &str = 
"org/lance/fragment/DeletionFileType"; const FRAGMENT_METADATA_CLASS: &str = "org/lance/FragmentMetadata"; -const FRAGMENT_METADATA_CONSTRUCTOR_SIG: &str = "(ILjava/util/List;Ljava/lang/Long;Lorg/lance/fragment/DeletionFile;Lorg/lance/fragment/RowIdMeta;)V"; +const FRAGMENT_METADATA_CONSTRUCTOR_SIG: &str = "(ILjava/util/List;Ljava/lang/Long;Lorg/lance/fragment/DeletionFile;Lorg/lance/fragment/RowIdMeta;Lorg/lance/fragment/VersionMeta;Lorg/lance/fragment/VersionMeta;)V"; const ROW_ID_META_CLASS: &str = "org/lance/fragment/RowIdMeta"; const ROW_ID_META_CONSTRUCTOR_SIG: &str = "(Ljava/lang/String;)V"; +const VERSION_META_CLASS: &str = "org/lance/fragment/VersionMeta"; +const VERSION_META_CONSTRUCTOR_SIG: &str = "(Ljava/lang/String;)V"; const FRAGMENT_MERGE_RESULT_CLASS: &str = "org/lance/fragment/FragmentMergeResult"; const FRAGMENT_MERGE_RESULT_CONSTRUCTOR_SIG: &str = "(Lorg/lance/FragmentMetadata;Lorg/lance/schema/LanceSchema;)V"; @@ -611,6 +615,18 @@ impl IntoJava for &RowIdMeta { } } +impl IntoJava for &RowDatasetVersionMeta { + fn into_java<'a>(self, env: &mut JNIEnv<'a>) -> Result> { + let json_str = serde_json::to_string(self)?; + let json = env.new_string(json_str)?.into(); + Ok(env.new_object( + VERSION_META_CLASS, + VERSION_META_CONSTRUCTOR_SIG, + &[JValueGen::Object(&json)], + )?) 
+ } +} + impl IntoJava for &Fragment { fn into_java<'local>(self, env: &mut JNIEnv<'local>) -> Result> { let files = self.files.clone(); @@ -624,6 +640,14 @@ impl IntoJava for &Fragment { Some(m) => m.into_java(env)?, None => JObject::null(), }; + let created_at = match &self.created_at_version_meta { + Some(m) => m.into_java(env)?, + None => JObject::null(), + }; + let last_updated_at = match &self.last_updated_at_version_meta { + Some(m) => m.into_java(env)?, + None => JObject::null(), + }; env.new_object( FRAGMENT_METADATA_CLASS, @@ -634,6 +658,8 @@ impl IntoJava for &Fragment { JValueGen::Object(physical_rows), JValueGen::Object(&deletion_file), JValueGen::Object(&row_id_meta), + JValueGen::Object(&created_at), + JValueGen::Object(&last_updated_at), ], ) .map_err(|e| { @@ -653,6 +679,38 @@ impl FromJObjectWithEnv for JObject<'_> { } } +impl FromJObjectWithEnv for JObject<'_> { + fn extract_object(&self, env: &mut JNIEnv<'_>) -> Result { + let metadata = env + .call_method(self, "getMetadata", "()Ljava/lang/String;", &[])? + .l()?; + let s: String = env.get_string(&JString::from(metadata))?.into(); + let meta: RowDatasetVersionMeta = serde_json::from_str(&s)?; + Ok(meta) + } +} + +/// Extract an optional field from a Java object by calling a getter method. +/// Returns `None` if the getter returns null, otherwise deserializes the JObject. +fn extract_nullable_field( + env: &mut JNIEnv<'_>, + obj: &JObject<'_>, + method: &str, + class: &str, +) -> Result> +where + for<'a> JObject<'a>: FromJObjectWithEnv, +{ + let result = env + .call_method(obj, method, format!("()L{};", class), &[])? + .l()?; + if result.is_null() { + Ok(None) + } else { + Ok(Some(result.extract_object(env)?)) + } +} + impl FromJObjectWithEnv for JObject<'_> { fn extract_object(&self, env: &mut JNIEnv<'_>) -> Result { let id = env.call_method(self, "getId", "()I", &[])?.i()? 
as u64; @@ -665,41 +723,24 @@ impl FromJObjectWithEnv for JObject<'_> { for f in file_objs { files.push(f.extract_object(env)?); } - let deletion_file = env - .call_method( - self, - "getDeletionFile", - format!("()L{};", DELETE_FILE_CLASS), - &[], - )? - .l()?; - let deletion_file = if deletion_file.is_null() { - None - } else { - Some(deletion_file.extract_object(env)?) - }; - let row_id_meta = env - .call_method( - self, - "getRowIdMeta", - format!("()L{};", ROW_ID_META_CLASS), - &[], - )? - .l()?; - let row_id_meta = if row_id_meta.is_null() { - None - } else { - Some(row_id_meta.extract_object(env)?) - }; + let deletion_file = + extract_nullable_field(env, self, "getDeletionFile", DELETE_FILE_CLASS)?; + let row_id_meta = + extract_nullable_field(env, self, "getRowIdMeta", ROW_ID_META_CLASS)?; + let created_at_version_meta = + extract_nullable_field(env, self, "getCreatedAtVersionMeta", VERSION_META_CLASS)?; + let last_updated_at_version_meta = + extract_nullable_field(env, self, "getLastUpdatedAtVersionMeta", VERSION_META_CLASS)?; + Ok(Fragment { id, files, deletion_file, physical_rows: Some(physical_rows), row_id_meta, - created_at_version_meta: None, - last_updated_at_version_meta: None, + created_at_version_meta, + last_updated_at_version_meta, }) } } diff --git a/java/src/main/java/org/lance/FragmentMetadata.java b/java/src/main/java/org/lance/FragmentMetadata.java index 8bc701b6351..186745cbcca 100644 --- a/java/src/main/java/org/lance/FragmentMetadata.java +++ b/java/src/main/java/org/lance/FragmentMetadata.java @@ -16,6 +16,7 @@ import org.lance.fragment.DataFile; import org.lance.fragment.DeletionFile; import org.lance.fragment.RowIdMeta; +import org.lance.fragment.VersionMeta; import com.google.common.base.MoreObjects; @@ -31,6 +32,8 @@ public class FragmentMetadata implements Serializable { private final long physicalRows; private final DeletionFile deletionFile; private final RowIdMeta rowIdMeta; + private final VersionMeta createdAtVersionMeta; + 
private final VersionMeta lastUpdatedAtVersionMeta; public FragmentMetadata( int id, @@ -38,11 +41,24 @@ public FragmentMetadata( Long physicalRows, DeletionFile deletionFile, RowIdMeta rowIdMeta) { + this(id, files, physicalRows, deletionFile, rowIdMeta, null, null); + } + + public FragmentMetadata( + int id, + List files, + Long physicalRows, + DeletionFile deletionFile, + RowIdMeta rowIdMeta, + VersionMeta createdAtVersionMeta, + VersionMeta lastUpdatedAtVersionMeta) { this.id = id; this.files = files; this.physicalRows = physicalRows; this.deletionFile = deletionFile; this.rowIdMeta = rowIdMeta; + this.createdAtVersionMeta = createdAtVersionMeta; + this.lastUpdatedAtVersionMeta = lastUpdatedAtVersionMeta; } public int getId() { @@ -80,6 +96,14 @@ public RowIdMeta getRowIdMeta() { return rowIdMeta; } + public VersionMeta getCreatedAtVersionMeta() { + return createdAtVersionMeta; + } + + public VersionMeta getLastUpdatedAtVersionMeta() { + return lastUpdatedAtVersionMeta; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -93,7 +117,9 @@ public boolean equals(Object o) { && physicalRows == that.physicalRows && Objects.equals(this.files, that.files) && Objects.equals(deletionFile, that.deletionFile) - && Objects.equals(rowIdMeta, that.rowIdMeta); + && Objects.equals(rowIdMeta, that.rowIdMeta) + && Objects.equals(createdAtVersionMeta, that.createdAtVersionMeta) + && Objects.equals(lastUpdatedAtVersionMeta, that.lastUpdatedAtVersionMeta); } @Override @@ -104,6 +130,8 @@ public String toString() { .add("files", files) .add("deletionFile", deletionFile) .add("rowIdMeta", rowIdMeta) + .add("createdAtVersionMeta", createdAtVersionMeta) + .add("lastUpdatedAtVersionMeta", lastUpdatedAtVersionMeta) .toString(); } } diff --git a/java/src/main/java/org/lance/fragment/RowIdMeta.java b/java/src/main/java/org/lance/fragment/RowIdMeta.java index f9c2fa8b4ac..5a3e90ced3c 100644 --- a/java/src/main/java/org/lance/fragment/RowIdMeta.java +++ 
b/java/src/main/java/org/lance/fragment/RowIdMeta.java @@ -15,6 +15,7 @@ import com.google.common.base.MoreObjects; +import java.io.ByteArrayOutputStream; import java.io.Serializable; import java.util.Objects; @@ -27,6 +28,49 @@ public RowIdMeta(String metadata) { this.metadata = metadata; } + /** + * Creates a RowIdMeta from an array of stable row IDs. Encodes them as an inline RowIdSequence + * protobuf wrapped in the JSON format expected by lance-core. + */ + public static RowIdMeta fromRowIds(long[] rowIds) { + byte[] values = new byte[rowIds.length * 8]; + for (int r = 0; r < rowIds.length; r++) { + long id = rowIds[r]; + int base = r * 8; + for (int i = 0; i < 8; i++) { + values[base + i] = (byte) ((id >>> (8 * i)) & 0xFF); + } + } + // RowIdSequence protobuf nesting: + // segment(1) > encoded(5) > u64array(3) > bytes(2) + byte[] proto = lenDelimited(1, lenDelimited(5, lenDelimited(3, lenDelimited(2, values)))); + StringBuilder sb = new StringBuilder(12 + proto.length * 4); + sb.append("{\"Inline\":["); + for (int i = 0; i < proto.length; i++) { + if (i > 0) sb.append(','); + sb.append(proto[i] & 0xFF); + } + sb.append("]}"); + return new RowIdMeta(sb.toString()); + } + + private static byte[] lenDelimited(int fieldNumber, byte[] data) { + int tag = (fieldNumber << 3) | 2; + ByteArrayOutputStream out = new ByteArrayOutputStream(1 + 5 + data.length); + writeVarint(out, tag); + writeVarint(out, data.length); + out.write(data, 0, data.length); + return out.toByteArray(); + } + + private static void writeVarint(ByteArrayOutputStream out, int value) { + while ((value & ~0x7F) != 0) { + out.write((value & 0x7F) | 0x80); + value >>>= 7; + } + out.write(value); + } + public String getMetadata() { return metadata; } @@ -43,6 +87,11 @@ public boolean equals(Object obj) { return Objects.equals(metadata, that.metadata); } + @Override + public int hashCode() { + return Objects.hash(metadata); + } + @Override public String toString() { return 
MoreObjects.toStringHelper(this).add("metadata", metadata).toString(); diff --git a/java/src/main/java/org/lance/fragment/VersionMeta.java b/java/src/main/java/org/lance/fragment/VersionMeta.java new file mode 100644 index 00000000000..71297a282ef --- /dev/null +++ b/java/src/main/java/org/lance/fragment/VersionMeta.java @@ -0,0 +1,59 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance.fragment; + +import com.google.common.base.MoreObjects; + +import java.io.Serializable; +import java.util.Objects; + +/** + * Metadata for per-row dataset version sequences (created_at / last_updated_at). Wraps the + * JSON-serialized Rust RowDatasetVersionMeta enum. 
+ */ +public class VersionMeta implements Serializable { + private static final long serialVersionUID = 1L; + + private final String metadata; + + public VersionMeta(String metadata) { + this.metadata = metadata; + } + + public String getMetadata() { + return metadata; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + VersionMeta that = (VersionMeta) obj; + return Objects.equals(metadata, that.metadata); + } + + @Override + public int hashCode() { + return Objects.hash(metadata); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("metadata", metadata).toString(); + } +} diff --git a/java/src/test/java/org/lance/fragment/RowIdMetaTest.java b/java/src/test/java/org/lance/fragment/RowIdMetaTest.java new file mode 100644 index 00000000000..b6e9f47ffcc --- /dev/null +++ b/java/src/test/java/org/lance/fragment/RowIdMetaTest.java @@ -0,0 +1,137 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.lance.fragment; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +class RowIdMetaTest { + + @Test + void testFromRowIdsSingleRow() { + RowIdMeta meta = RowIdMeta.fromRowIds(new long[] {42}); + String json = meta.getMetadata(); + assertTrue(json.startsWith("{\"Inline\":[")); + assertTrue(json.endsWith("]}")); + } + + @Test + void testFromRowIdsMultipleRows() { + RowIdMeta meta = RowIdMeta.fromRowIds(new long[] {0, 1, 2, 100, Long.MAX_VALUE}); + assertNotNull(meta); + String json = meta.getMetadata(); + assertFalse(json.isEmpty()); + assertTrue(json.startsWith("{\"Inline\":[")); + assertTrue(json.endsWith("]}")); + } + + @Test + void testFromRowIdsEmpty() { + RowIdMeta meta = RowIdMeta.fromRowIds(new long[] {}); + String json = meta.getMetadata(); + assertTrue(json.startsWith("{\"Inline\":[")); + assertTrue(json.endsWith("]}")); + } + + @Test + void testFromRowIdsRoundTrip() { + long[] ids = {10, 20, 30}; + RowIdMeta first = RowIdMeta.fromRowIds(ids); + RowIdMeta second = RowIdMeta.fromRowIds(ids); + assertEquals(first, second); + } + + @Test + void testFromRowIdsDeterministic() { + long[] ids = {10, 20, 30}; + String a = RowIdMeta.fromRowIds(ids).getMetadata(); + String b = RowIdMeta.fromRowIds(ids).getMetadata(); + assertEquals(a, b); + } + + @Test + void testFromRowIdsProtoStructure() { + long[] rowIds = {1}; + String json = RowIdMeta.fromRowIds(rowIds).getMetadata(); + + int start = json.indexOf('[') + 1; + int end = json.lastIndexOf(']'); + String[] parts = json.substring(start, end).split(","); + byte[] proto = new byte[parts.length]; + for (int i = 0; i < parts.length; i++) { + proto[i] = (byte) Integer.parseInt(parts[i].trim()); + } + + // Outermost: field 1, wire type 2 (length-delimited) → tag byte = 0x0a + assertEquals((byte) 0x0a, proto[0]); + + // Walk 4 nested length-delimited fields to reach the payload + int pos = 0; + for (int level = 0; level < 4; level++) { + int tag = proto[pos++] & 
0xFF; + assertEquals(2, tag & 0x07, "wire type must be 2 (length-delimited) at level " + level); + // decode varint length + int len = 0; + int shift = 0; + while (true) { + int b = proto[pos++] & 0xFF; + len |= (b & 0x7F) << shift; + if ((b & 0x80) == 0) break; + shift += 7; + } + if (level < 3) { + assertEquals( + proto.length - pos, len, "length at level " + level + " should span remaining bytes"); + } else { + // innermost: payload is exactly rowIds.length * 8 bytes + assertEquals(rowIds.length * 8, len); + } + } + + // Verify the last 8 bytes are little-endian encoding of 1 + byte[] expected = {1, 0, 0, 0, 0, 0, 0, 0}; + byte[] actual = new byte[8]; + System.arraycopy(proto, proto.length - 8, actual, 0, 8); + assertArrayEquals(expected, actual); + } + + @Test + void testEquals() { + RowIdMeta a = new RowIdMeta("test"); + RowIdMeta b = new RowIdMeta("test"); + RowIdMeta c = new RowIdMeta("other"); + + assertEquals(a, b); + assertNotEquals(a, c); + assertNotEquals(a, null); + assertNotEquals(a, "test"); + assertEquals(a, a); + } + + @Test + void testHashCodeConsistency() { + RowIdMeta a = new RowIdMeta("test"); + RowIdMeta b = new RowIdMeta("test"); + assertEquals(a.hashCode(), b.hashCode()); + } + + @Test + void testToString() { + RowIdMeta meta = new RowIdMeta("someMetadata"); + String str = meta.toString(); + assertTrue(str.contains("RowIdMeta")); + assertTrue(str.contains("someMetadata")); + } +} diff --git a/java/src/test/java/org/lance/fragment/VersionMetaTest.java b/java/src/test/java/org/lance/fragment/VersionMetaTest.java new file mode 100644 index 00000000000..a851e9fcbd6 --- /dev/null +++ b/java/src/test/java/org/lance/fragment/VersionMetaTest.java @@ -0,0 +1,80 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance.fragment; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class VersionMetaTest { + + @Test + void testConstructorAndGetter() { + VersionMeta meta = new VersionMeta("test"); + assertEquals("test", meta.getMetadata()); + } + + @Test + void testNullMetadata() { + VersionMeta meta = new VersionMeta(null); + assertNull(meta.getMetadata()); + } + + @Test + void testEquals() { + VersionMeta a = new VersionMeta("x"); + VersionMeta b = new VersionMeta("x"); + assertEquals(a, b); + assertEquals(b, a); + + VersionMeta c = new VersionMeta("y"); + assertNotEquals(a, c); + + assertFalse(a.equals(null)); + + assertFalse(a.equals("x")); + } + + @Test + void testHashCode() { + VersionMeta a = new VersionMeta("same"); + VersionMeta b = new VersionMeta("same"); + assertEquals(a.hashCode(), b.hashCode()); + } + + @Test + void testHashCodeWithNull() { + VersionMeta meta = new VersionMeta(null); + meta.hashCode(); + } + + @Test + void testToString() { + VersionMeta meta = new VersionMeta("hello"); + String s = meta.toString(); + assertTrue(s.contains("VersionMeta")); + assertTrue(s.contains("hello")); + } + + @Test + void testJsonMetadataRoundTrip() { + String json = "{\"Inline\":[10,20,30]}"; + VersionMeta meta = new VersionMeta(json); + 
assertEquals(json, meta.getMetadata()); + } +} diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index b5f2e05ff0d..3789ecb0903 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -1719,17 +1719,23 @@ impl Transaction { if next_row_id.is_some() { let new_version = current_manifest.map(|m| m.version + 1).unwrap_or(1); - // Build a map of original fragment ID -> original fragment for lookup - let original_frags_map: std::collections::HashMap = - existing_fragments.iter().map(|f| (f.id, f)).collect(); + // Build a reverse index: stable row ID → (fragment ref, position within fragment) + // by reading each existing fragment's RowIdMeta sequence. + let mut row_id_to_source: std::collections::HashMap = + std::collections::HashMap::new(); + for frag in existing_fragments.iter() { + if let Some(row_id_meta) = &frag.row_id_meta { + if let lance_table::format::RowIdMeta::Inline(data) = row_id_meta { + if let Ok(seq) = lance_table::rowids::read_row_ids(data) { + for (offset, rid) in seq.iter().enumerate() { + row_id_to_source.insert(rid, (frag, offset)); + } + } + } + } + } for fragment in new_fragments.iter_mut() { - // For update operations with RewriteRows mode: - // - Rows are deleted from old fragments and rewritten to new fragments - // - last_updated_at should be the current version (when update happened) - // - created_at should be preserved from the original fragment - - // Read row IDs from this fragment to find original fragments let row_ids = if let Some(row_id_meta) = &fragment.row_id_meta { match row_id_meta { lance_table::format::RowIdMeta::Inline(data) => { @@ -1742,44 +1748,37 @@ impl Transaction { }; if let Some(row_ids) = row_ids { - // Extract created_at version for each row from original fragments let physical_rows = fragment.physical_rows.unwrap_or(0); let mut created_at_versions = Vec::with_capacity(physical_rows); for row_id in row_ids.iter() { - // Row ID format: 
upper 32 bits = fragment ID, lower 32 bits = row offset - let orig_frag_id = row_id >> 32; - let row_offset = (row_id & 0xFFFFFFFF) as usize; - - // Look up the original fragment - if let Some(orig_frag) = original_frags_map.get(&orig_frag_id) { - // Get created_at version from original fragment's metadata - let created_version = if let Some(created_meta) = - &orig_frag.created_at_version_meta + let created_version = + if let Some((orig_frag, row_offset)) = + row_id_to_source.get(&row_id) { - // Load and index into the version sequence - match created_meta.load_sequence() { - Ok(seq) => { - let versions: Vec = seq.versions().collect(); - versions.get(row_offset).copied().unwrap_or(1) - } - Err(_e) => { - 1 // Default to version 1 on error + if let Some(created_meta) = + &orig_frag.created_at_version_meta + { + match created_meta.load_sequence() { + Ok(seq) => { + let versions: Vec = + seq.versions().collect(); + versions + .get(*row_offset) + .copied() + .unwrap_or(1) + } + Err(_) => 1, } + } else { + 1 } } else { - // No metadata on original fragment, default to version 1 1 }; - created_at_versions.push(created_version); - } else { - // Original fragment not found, default to version 1 - created_at_versions.push(1); - } + created_at_versions.push(created_version); } - // Build version metadata from the collected versions - // Compress into runs: consecutive identical versions become one run let mut runs = Vec::new(); if !created_at_versions.is_empty() { let mut current_version = created_at_versions[0]; @@ -1788,7 +1787,6 @@ impl Transaction { for (i, &version) in created_at_versions.iter().enumerate().skip(1) { if version != current_version { - // End current run, start new one runs.push(lance_table::format::RowDatasetVersionRun { span: lance_table::rowids::segment::U64Segment::Range( run_start..i as u64, @@ -1799,7 +1797,6 @@ impl Transaction { run_start = i as u64; } } - // Add final run runs.push(lance_table::format::RowDatasetVersionRun { span: 
lance_table::rowids::segment::U64Segment::Range( run_start..created_at_versions.len() as u64, @@ -1822,7 +1819,6 @@ impl Transaction { })?, ); - // Set last_updated_at to the new version for all rows let last_updated_meta = lance_table::rowids::version::build_version_meta( fragment, @@ -1830,7 +1826,6 @@ impl Transaction { ); fragment.last_updated_at_version_meta = last_updated_meta; } else { - // Fallback: can't read row IDs, set both to new version let version_meta = lance_table::rowids::version::build_version_meta( fragment, new_version, @@ -4527,4 +4522,260 @@ mod tests { // cause strict validation of the new STABLE fragments. validate_operation(Some(&legacy_manifest), &operation).unwrap(); } + + #[test] + fn test_update_version_tracking_preserves_created_at() { + use lance_table::format::{ + RowDatasetVersionMeta, RowDatasetVersionRun, RowDatasetVersionSequence, RowIdMeta, + }; + use lance_table::rowids::segment::U64Segment; + use lance_table::rowids::write_row_ids; + + let existing_row_ids: Vec = vec![100, 101, 102]; + let existing_seq = RowIdSequence::from(existing_row_ids.as_slice()); + let existing_row_id_bytes = write_row_ids(&existing_seq); + + let created_at_seq = RowDatasetVersionSequence { + runs: vec![RowDatasetVersionRun { + span: U64Segment::Range(0..3), + version: 5, + }], + }; + let created_at_meta = RowDatasetVersionMeta::from_sequence(&created_at_seq).unwrap(); + + let existing_fragment = Fragment { + id: 0, + files: vec![], + deletion_file: None, + row_id_meta: Some(RowIdMeta::Inline(existing_row_id_bytes)), + physical_rows: Some(3), + created_at_version_meta: Some(created_at_meta.clone()), + last_updated_at_version_meta: None, + }; + + let new_row_ids: Vec = vec![100, 102]; + let new_seq = RowIdSequence::from(new_row_ids.as_slice()); + let new_row_id_bytes = write_row_ids(&new_seq); + + let new_fragment = Fragment { + id: 10, + files: vec![], + deletion_file: None, + row_id_meta: Some(RowIdMeta::Inline(new_row_id_bytes)), + physical_rows: 
Some(2), + created_at_version_meta: None, + last_updated_at_version_meta: None, + }; + + let existing_fragments = vec![existing_fragment]; + + let mut row_id_to_source: std::collections::HashMap = + std::collections::HashMap::new(); + for frag in existing_fragments.iter() { + if let Some(RowIdMeta::Inline(data)) = &frag.row_id_meta { + if let Ok(seq) = lance_table::rowids::read_row_ids(data) { + for (offset, rid) in seq.iter().enumerate() { + row_id_to_source.insert(rid, (frag, offset)); + } + } + } + } + + let row_ids = if let Some(RowIdMeta::Inline(data)) = &new_fragment.row_id_meta { + lance_table::rowids::read_row_ids(data).ok() + } else { + None + }; + + if let Some(row_ids) = row_ids { + let physical_rows = new_fragment.physical_rows.unwrap_or(0); + let mut created_at_versions = Vec::with_capacity(physical_rows); + + for row_id in row_ids.iter() { + let created_version = + if let Some((orig_frag, row_offset)) = row_id_to_source.get(&row_id) { + if let Some(created_meta) = &orig_frag.created_at_version_meta { + match created_meta.load_sequence() { + Ok(seq) => { + let versions: Vec = seq.versions().collect(); + versions.get(*row_offset).copied().unwrap_or(1) + } + Err(_) => 1, + } + } else { + 1 + } + } else { + 1 + }; + created_at_versions.push(created_version); + } + + assert_eq!(created_at_versions, vec![5, 5]); + } else { + panic!("Expected row IDs to be present"); + } + } + + #[test] + fn test_update_version_tracking_mixed_origins() { + use lance_table::format::{ + RowDatasetVersionMeta, RowDatasetVersionRun, RowDatasetVersionSequence, RowIdMeta, + }; + use lance_table::rowids::segment::U64Segment; + use lance_table::rowids::write_row_ids; + + let frag_a_ids: Vec = vec![10, 11]; + let frag_a_seq = RowIdSequence::from(frag_a_ids.as_slice()); + let frag_a_created = RowDatasetVersionSequence { + runs: vec![RowDatasetVersionRun { + span: U64Segment::Range(0..2), + version: 2, + }], + }; + + let frag_b_ids: Vec = vec![20, 21, 22]; + let frag_b_seq = 
RowIdSequence::from(frag_b_ids.as_slice()); + let frag_b_created = RowDatasetVersionSequence { + runs: vec![RowDatasetVersionRun { + span: U64Segment::Range(0..3), + version: 3, + }], + }; + + let existing_fragments = vec![ + Fragment { + id: 0, + files: vec![], + deletion_file: None, + row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&frag_a_seq))), + physical_rows: Some(2), + created_at_version_meta: Some( + RowDatasetVersionMeta::from_sequence(&frag_a_created).unwrap(), + ), + last_updated_at_version_meta: None, + }, + Fragment { + id: 1, + files: vec![], + deletion_file: None, + row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&frag_b_seq))), + physical_rows: Some(3), + created_at_version_meta: Some( + RowDatasetVersionMeta::from_sequence(&frag_b_created).unwrap(), + ), + last_updated_at_version_meta: None, + }, + ]; + + let mut row_id_to_source: std::collections::HashMap = + std::collections::HashMap::new(); + for frag in existing_fragments.iter() { + if let Some(RowIdMeta::Inline(data)) = &frag.row_id_meta { + if let Ok(seq) = lance_table::rowids::read_row_ids(data) { + for (offset, rid) in seq.iter().enumerate() { + row_id_to_source.insert(rid, (frag, offset)); + } + } + } + } + + // New fragment has rows from both original fragments: row 11 from frag_a, row 20 from frag_b + let new_ids: Vec = vec![11, 20]; + let new_seq = RowIdSequence::from(new_ids.as_slice()); + + let row_ids = lance_table::rowids::read_row_ids(&write_row_ids(&new_seq)).unwrap(); + let mut created_at_versions = Vec::new(); + for row_id in row_ids.iter() { + let created_version = + if let Some((orig_frag, row_offset)) = row_id_to_source.get(&row_id) { + if let Some(created_meta) = &orig_frag.created_at_version_meta { + match created_meta.load_sequence() { + Ok(seq) => { + let versions: Vec = seq.versions().collect(); + versions.get(*row_offset).copied().unwrap_or(1) + } + Err(_) => 1, + } + } else { + 1 + } + } else { + 1 + }; + created_at_versions.push(created_version); + } + + // Row 
11 came from frag_a (offset 1, version 2), row 20 came from frag_b (offset 0, version 3) + assert_eq!(created_at_versions, vec![2, 3]); + } + + #[test] + fn test_update_version_tracking_unknown_row_id_defaults_to_1() { + use lance_table::format::{RowIdMeta, RowDatasetVersionMeta, RowDatasetVersionRun, RowDatasetVersionSequence}; + use lance_table::rowids::segment::U64Segment; + use lance_table::rowids::write_row_ids; + + let existing_ids: Vec = vec![10, 11]; + let existing_seq = RowIdSequence::from(existing_ids.as_slice()); + let existing_created = RowDatasetVersionSequence { + runs: vec![RowDatasetVersionRun { + span: U64Segment::Range(0..2), + version: 5, + }], + }; + + let existing_fragments = vec![Fragment { + id: 0, + files: vec![], + deletion_file: None, + row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&existing_seq))), + physical_rows: Some(2), + created_at_version_meta: Some( + RowDatasetVersionMeta::from_sequence(&existing_created).unwrap(), + ), + last_updated_at_version_meta: None, + }]; + + let mut row_id_to_source: std::collections::HashMap = + std::collections::HashMap::new(); + for frag in existing_fragments.iter() { + if let Some(RowIdMeta::Inline(data)) = &frag.row_id_meta { + if let Ok(seq) = lance_table::rowids::read_row_ids(data) { + for (offset, rid) in seq.iter().enumerate() { + row_id_to_source.insert(rid, (frag, offset)); + } + } + } + } + + // New fragment has row 10 (known) and row 999 (unknown — freshly inserted) + let new_ids: Vec = vec![10, 999]; + let new_seq = RowIdSequence::from(new_ids.as_slice()); + let row_ids = lance_table::rowids::read_row_ids(&write_row_ids(&new_seq)).unwrap(); + + let mut created_at_versions = Vec::new(); + for row_id in row_ids.iter() { + let created_version = + if let Some((orig_frag, row_offset)) = row_id_to_source.get(&row_id) { + if let Some(created_meta) = &orig_frag.created_at_version_meta { + match created_meta.load_sequence() { + Ok(seq) => { + let versions: Vec = seq.versions().collect(); + 
versions.get(*row_offset).copied().unwrap_or(1) + } + Err(_) => 1, + } + } else { + 1 + } + } else { + 1 + }; + created_at_versions.push(created_version); + } + + // Row 10: offset 0 in frag 0 → version 5. Row 999: unknown → default 1 + assert_eq!(created_at_versions, vec![5, 1]); + } } From 7929b1edd69a2912544f07f649be180c982dc92e Mon Sep 17 00:00:00 2001 From: ivscheianu Date: Thu, 9 Apr 2026 13:31:35 +0300 Subject: [PATCH 2/9] chore: format --- java/lance-jni/src/fragment.rs | 3 +-- rust/lance/src/dataset/transaction.rs | 37 ++++++++++++--------------- 2 files changed, 17 insertions(+), 23 deletions(-) diff --git a/java/lance-jni/src/fragment.rs b/java/lance-jni/src/fragment.rs index fe2941fabae..38bf2315726 100644 --- a/java/lance-jni/src/fragment.rs +++ b/java/lance-jni/src/fragment.rs @@ -726,8 +726,7 @@ impl FromJObjectWithEnv for JObject<'_> { let deletion_file = extract_nullable_field(env, self, "getDeletionFile", DELETE_FILE_CLASS)?; - let row_id_meta = - extract_nullable_field(env, self, "getRowIdMeta", ROW_ID_META_CLASS)?; + let row_id_meta = extract_nullable_field(env, self, "getRowIdMeta", ROW_ID_META_CLASS)?; let created_at_version_meta = extract_nullable_field(env, self, "getCreatedAtVersionMeta", VERSION_META_CLASS)?; let last_updated_at_version_meta = diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index 3789ecb0903..cc4c5156b2e 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -1752,30 +1752,23 @@ impl Transaction { let mut created_at_versions = Vec::with_capacity(physical_rows); for row_id in row_ids.iter() { - let created_version = - if let Some((orig_frag, row_offset)) = - row_id_to_source.get(&row_id) - { - if let Some(created_meta) = - &orig_frag.created_at_version_meta - { - match created_meta.load_sequence() { - Ok(seq) => { - let versions: Vec = - seq.versions().collect(); - versions - .get(*row_offset) - .copied() - .unwrap_or(1) - } - Err(_) => 1, 
+ let created_version = if let Some((orig_frag, row_offset)) = + row_id_to_source.get(&row_id) + { + if let Some(created_meta) = &orig_frag.created_at_version_meta { + match created_meta.load_sequence() { + Ok(seq) => { + let versions: Vec = seq.versions().collect(); + versions.get(*row_offset).copied().unwrap_or(1) } - } else { - 1 + Err(_) => 1, } } else { 1 - }; + } + } else { + 1 + }; created_at_versions.push(created_version); } @@ -4712,7 +4705,9 @@ mod tests { #[test] fn test_update_version_tracking_unknown_row_id_defaults_to_1() { - use lance_table::format::{RowIdMeta, RowDatasetVersionMeta, RowDatasetVersionRun, RowDatasetVersionSequence}; + use lance_table::format::{ + RowDatasetVersionMeta, RowDatasetVersionRun, RowDatasetVersionSequence, RowIdMeta, + }; use lance_table::rowids::segment::U64Segment; use lance_table::rowids::write_row_ids; From 49166c4f5e4638678699bebefc50ff854ee83bb7 Mon Sep 17 00:00:00 2001 From: ivscheianu Date: Thu, 9 Apr 2026 13:41:05 +0300 Subject: [PATCH 3/9] chore: fix clippy errors --- rust/lance/src/dataset/transaction.rs | 51 +++++++++++++-------------- 1 file changed, 25 insertions(+), 26 deletions(-) diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index cc4c5156b2e..96b3090bbf3 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -1724,13 +1724,12 @@ impl Transaction { let mut row_id_to_source: std::collections::HashMap = std::collections::HashMap::new(); for frag in existing_fragments.iter() { - if let Some(row_id_meta) = &frag.row_id_meta { - if let lance_table::format::RowIdMeta::Inline(data) = row_id_meta { - if let Ok(seq) = lance_table::rowids::read_row_ids(data) { - for (offset, rid) in seq.iter().enumerate() { - row_id_to_source.insert(rid, (frag, offset)); - } - } + if let Some(lance_table::format::RowIdMeta::Inline(data)) = + &frag.row_id_meta + && let Ok(seq) = lance_table::rowids::read_row_ids(data) + { + for (offset, rid) in 
seq.iter().enumerate() { + row_id_to_source.insert(rid, (frag, offset)); } } } @@ -4542,7 +4541,7 @@ mod tests { deletion_file: None, row_id_meta: Some(RowIdMeta::Inline(existing_row_id_bytes)), physical_rows: Some(3), - created_at_version_meta: Some(created_at_meta.clone()), + created_at_version_meta: Some(created_at_meta), last_updated_at_version_meta: None, }; @@ -4560,16 +4559,16 @@ mod tests { last_updated_at_version_meta: None, }; - let existing_fragments = vec![existing_fragment]; + let existing_fragments = [existing_fragment]; let mut row_id_to_source: std::collections::HashMap = std::collections::HashMap::new(); for frag in existing_fragments.iter() { - if let Some(RowIdMeta::Inline(data)) = &frag.row_id_meta { - if let Ok(seq) = lance_table::rowids::read_row_ids(data) { - for (offset, rid) in seq.iter().enumerate() { - row_id_to_source.insert(rid, (frag, offset)); - } + if let Some(RowIdMeta::Inline(data)) = &frag.row_id_meta + && let Ok(seq) = lance_table::rowids::read_row_ids(data) + { + for (offset, rid) in seq.iter().enumerate() { + row_id_to_source.insert(rid, (frag, offset)); } } } @@ -4636,7 +4635,7 @@ mod tests { }], }; - let existing_fragments = vec![ + let existing_fragments = [ Fragment { id: 0, files: vec![], @@ -4664,11 +4663,11 @@ mod tests { let mut row_id_to_source: std::collections::HashMap = std::collections::HashMap::new(); for frag in existing_fragments.iter() { - if let Some(RowIdMeta::Inline(data)) = &frag.row_id_meta { - if let Ok(seq) = lance_table::rowids::read_row_ids(data) { - for (offset, rid) in seq.iter().enumerate() { - row_id_to_source.insert(rid, (frag, offset)); - } + if let Some(RowIdMeta::Inline(data)) = &frag.row_id_meta + && let Ok(seq) = lance_table::rowids::read_row_ids(data) + { + for (offset, rid) in seq.iter().enumerate() { + row_id_to_source.insert(rid, (frag, offset)); } } } @@ -4720,7 +4719,7 @@ mod tests { }], }; - let existing_fragments = vec![Fragment { + let existing_fragments = [Fragment { id: 0, files: 
vec![], deletion_file: None, @@ -4735,11 +4734,11 @@ mod tests { let mut row_id_to_source: std::collections::HashMap = std::collections::HashMap::new(); for frag in existing_fragments.iter() { - if let Some(RowIdMeta::Inline(data)) = &frag.row_id_meta { - if let Ok(seq) = lance_table::rowids::read_row_ids(data) { - for (offset, rid) in seq.iter().enumerate() { - row_id_to_source.insert(rid, (frag, offset)); - } + if let Some(RowIdMeta::Inline(data)) = &frag.row_id_meta + && let Ok(seq) = lance_table::rowids::read_row_ids(data) + { + for (offset, rid) in seq.iter().enumerate() { + row_id_to_source.insert(rid, (frag, offset)); } } } From 831c48045259ab4b3a980eb76cdda849583ab046 Mon Sep 17 00:00:00 2001 From: ivscheianu Date: Thu, 9 Apr 2026 14:37:30 +0300 Subject: [PATCH 4/9] chore: updated tests --- rust/lance/src/dataset/transaction.rs | 326 +++++++++++++------------- 1 file changed, 157 insertions(+), 169 deletions(-) diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index 96b3090bbf3..4173f04a2a0 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -3476,6 +3476,11 @@ mod tests { use chrono::Utc; use lance_core::datatypes::Schema as LanceSchema; use lance_io::utils::CachedFileSize; + use lance_table::format::{ + RowDatasetVersionMeta, RowDatasetVersionRun, RowDatasetVersionSequence, RowIdMeta, + }; + use lance_table::rowids::segment::U64Segment; + use lance_table::rowids::write_row_ids; use std::sync::Arc; use uuid::Uuid; @@ -4515,119 +4520,103 @@ mod tests { validate_operation(Some(&legacy_manifest), &operation).unwrap(); } - #[test] - fn test_update_version_tracking_preserves_created_at() { - use lance_table::format::{ - RowDatasetVersionMeta, RowDatasetVersionRun, RowDatasetVersionSequence, RowIdMeta, - }; - use lance_table::rowids::segment::U64Segment; - use lance_table::rowids::write_row_ids; + fn make_stable_row_id_manifest(fragments: Vec) -> Manifest { + let schema = 
ArrowSchema::new(vec![ArrowField::new("id", DataType::Int32, false)]); + let mut manifest = Manifest::new( + LanceSchema::try_from(&schema).unwrap(), + Arc::new(fragments), + DataStorageFormat::new(LanceFileVersion::V2_0), + HashMap::new(), + ); + manifest.reader_feature_flags = FLAG_STABLE_ROW_IDS; + manifest.next_row_id = 1000; + manifest.version = 4; + manifest + } - let existing_row_ids: Vec = vec![100, 101, 102]; - let existing_seq = RowIdSequence::from(existing_row_ids.as_slice()); - let existing_row_id_bytes = write_row_ids(&existing_seq); + fn update_txn(new_fragments: Vec) -> Transaction { + Transaction::new( + 4, + Operation::Update { + removed_fragment_ids: vec![], + updated_fragments: vec![], + new_fragments, + fields_modified: vec![], + merged_generations: vec![], + fields_for_preserving_frag_bitmap: vec![], + update_mode: None, + inserted_rows_filter: None, + }, + None, + ) + } + + fn created_at_versions(manifest: &Manifest, frag_id: u64) -> Vec { + let frag = manifest.fragments.iter().find(|f| f.id == frag_id).unwrap(); + let seq = frag + .created_at_version_meta + .as_ref() + .unwrap() + .load_sequence() + .unwrap(); + seq.versions().collect() + } + #[test] + fn test_update_version_tracking_preserves_created_at() { + let existing_seq = RowIdSequence::from([100u64, 101, 102].as_slice()); let created_at_seq = RowDatasetVersionSequence { runs: vec![RowDatasetVersionRun { span: U64Segment::Range(0..3), version: 5, }], }; - let created_at_meta = RowDatasetVersionMeta::from_sequence(&created_at_seq).unwrap(); - let existing_fragment = Fragment { - id: 0, + id: 1, files: vec![], deletion_file: None, - row_id_meta: Some(RowIdMeta::Inline(existing_row_id_bytes)), + row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&existing_seq))), physical_rows: Some(3), - created_at_version_meta: Some(created_at_meta), + created_at_version_meta: Some( + RowDatasetVersionMeta::from_sequence(&created_at_seq).unwrap(), + ), last_updated_at_version_meta: None, }; - let 
new_row_ids: Vec = vec![100, 102]; - let new_seq = RowIdSequence::from(new_row_ids.as_slice()); - let new_row_id_bytes = write_row_ids(&new_seq); - + let new_seq = RowIdSequence::from([100u64, 102].as_slice()); let new_fragment = Fragment { id: 10, files: vec![], deletion_file: None, - row_id_meta: Some(RowIdMeta::Inline(new_row_id_bytes)), + row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&new_seq))), physical_rows: Some(2), created_at_version_meta: None, last_updated_at_version_meta: None, }; - let existing_fragments = [existing_fragment]; - - let mut row_id_to_source: std::collections::HashMap = - std::collections::HashMap::new(); - for frag in existing_fragments.iter() { - if let Some(RowIdMeta::Inline(data)) = &frag.row_id_meta - && let Ok(seq) = lance_table::rowids::read_row_ids(data) - { - for (offset, rid) in seq.iter().enumerate() { - row_id_to_source.insert(rid, (frag, offset)); - } - } - } - - let row_ids = if let Some(RowIdMeta::Inline(data)) = &new_fragment.row_id_meta { - lance_table::rowids::read_row_ids(data).ok() - } else { - None - }; - - if let Some(row_ids) = row_ids { - let physical_rows = new_fragment.physical_rows.unwrap_or(0); - let mut created_at_versions = Vec::with_capacity(physical_rows); - - for row_id in row_ids.iter() { - let created_version = - if let Some((orig_frag, row_offset)) = row_id_to_source.get(&row_id) { - if let Some(created_meta) = &orig_frag.created_at_version_meta { - match created_meta.load_sequence() { - Ok(seq) => { - let versions: Vec = seq.versions().collect(); - versions.get(*row_offset).copied().unwrap_or(1) - } - Err(_) => 1, - } - } else { - 1 - } - } else { - 1 - }; - created_at_versions.push(created_version); - } + let manifest = make_stable_row_id_manifest(vec![existing_fragment]); + let (result, _) = update_txn(vec![new_fragment]) + .build_manifest( + Some(&manifest), + vec![], + "txn", + &ManifestWriteConfig::default(), + ) + .unwrap(); - assert_eq!(created_at_versions, vec![5, 5]); - } else { - 
panic!("Expected row IDs to be present"); - } + assert_eq!(created_at_versions(&result, 10), vec![5, 5]); } #[test] fn test_update_version_tracking_mixed_origins() { - use lance_table::format::{ - RowDatasetVersionMeta, RowDatasetVersionRun, RowDatasetVersionSequence, RowIdMeta, - }; - use lance_table::rowids::segment::U64Segment; - use lance_table::rowids::write_row_ids; - - let frag_a_ids: Vec = vec![10, 11]; - let frag_a_seq = RowIdSequence::from(frag_a_ids.as_slice()); + let frag_a_seq = RowIdSequence::from([10u64, 11].as_slice()); let frag_a_created = RowDatasetVersionSequence { runs: vec![RowDatasetVersionRun { span: U64Segment::Range(0..2), version: 2, }], }; - - let frag_b_ids: Vec = vec![20, 21, 22]; - let frag_b_seq = RowIdSequence::from(frag_b_ids.as_slice()); + let frag_b_seq = RowIdSequence::from([20u64, 21, 22].as_slice()); let frag_b_created = RowDatasetVersionSequence { runs: vec![RowDatasetVersionRun { span: U64Segment::Range(0..3), @@ -4635,9 +4624,9 @@ mod tests { }], }; - let existing_fragments = [ + let manifest = make_stable_row_id_manifest(vec![ Fragment { - id: 0, + id: 1, files: vec![], deletion_file: None, row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&frag_a_seq))), @@ -4648,7 +4637,7 @@ mod tests { last_updated_at_version_meta: None, }, Fragment { - id: 1, + id: 2, files: vec![], deletion_file: None, row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&frag_b_seq))), @@ -4658,69 +4647,44 @@ mod tests { ), last_updated_at_version_meta: None, }, - ]; - - let mut row_id_to_source: std::collections::HashMap = - std::collections::HashMap::new(); - for frag in existing_fragments.iter() { - if let Some(RowIdMeta::Inline(data)) = &frag.row_id_meta - && let Ok(seq) = lance_table::rowids::read_row_ids(data) - { - for (offset, rid) in seq.iter().enumerate() { - row_id_to_source.insert(rid, (frag, offset)); - } - } - } + ]); // New fragment has rows from both original fragments: row 11 from frag_a, row 20 from frag_b - let new_ids: Vec = vec![11, 
20]; - let new_seq = RowIdSequence::from(new_ids.as_slice()); - - let row_ids = lance_table::rowids::read_row_ids(&write_row_ids(&new_seq)).unwrap(); - let mut created_at_versions = Vec::new(); - for row_id in row_ids.iter() { - let created_version = - if let Some((orig_frag, row_offset)) = row_id_to_source.get(&row_id) { - if let Some(created_meta) = &orig_frag.created_at_version_meta { - match created_meta.load_sequence() { - Ok(seq) => { - let versions: Vec = seq.versions().collect(); - versions.get(*row_offset).copied().unwrap_or(1) - } - Err(_) => 1, - } - } else { - 1 - } - } else { - 1 - }; - created_at_versions.push(created_version); - } + let new_seq = RowIdSequence::from([11u64, 20].as_slice()); + let new_fragment = Fragment { + id: 10, + files: vec![], + deletion_file: None, + row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&new_seq))), + physical_rows: Some(2), + created_at_version_meta: None, + last_updated_at_version_meta: None, + }; + + let (result, _) = update_txn(vec![new_fragment]) + .build_manifest( + Some(&manifest), + vec![], + "txn", + &ManifestWriteConfig::default(), + ) + .unwrap(); // Row 11 came from frag_a (offset 1, version 2), row 20 came from frag_b (offset 0, version 3) - assert_eq!(created_at_versions, vec![2, 3]); + assert_eq!(created_at_versions(&result, 10), vec![2, 3]); } #[test] fn test_update_version_tracking_unknown_row_id_defaults_to_1() { - use lance_table::format::{ - RowDatasetVersionMeta, RowDatasetVersionRun, RowDatasetVersionSequence, RowIdMeta, - }; - use lance_table::rowids::segment::U64Segment; - use lance_table::rowids::write_row_ids; - - let existing_ids: Vec = vec![10, 11]; - let existing_seq = RowIdSequence::from(existing_ids.as_slice()); + let existing_seq = RowIdSequence::from([10u64, 11].as_slice()); let existing_created = RowDatasetVersionSequence { runs: vec![RowDatasetVersionRun { span: U64Segment::Range(0..2), version: 5, }], }; - - let existing_fragments = [Fragment { - id: 0, + let existing_fragment = 
Fragment { + id: 1, files: vec![], deletion_file: None, row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&existing_seq))), @@ -4729,47 +4693,71 @@ mod tests { RowDatasetVersionMeta::from_sequence(&existing_created).unwrap(), ), last_updated_at_version_meta: None, - }]; - - let mut row_id_to_source: std::collections::HashMap = - std::collections::HashMap::new(); - for frag in existing_fragments.iter() { - if let Some(RowIdMeta::Inline(data)) = &frag.row_id_meta - && let Ok(seq) = lance_table::rowids::read_row_ids(data) - { - for (offset, rid) in seq.iter().enumerate() { - row_id_to_source.insert(rid, (frag, offset)); - } - } - } + }; // New fragment has row 10 (known) and row 999 (unknown — freshly inserted) - let new_ids: Vec = vec![10, 999]; - let new_seq = RowIdSequence::from(new_ids.as_slice()); - let row_ids = lance_table::rowids::read_row_ids(&write_row_ids(&new_seq)).unwrap(); - - let mut created_at_versions = Vec::new(); - for row_id in row_ids.iter() { - let created_version = - if let Some((orig_frag, row_offset)) = row_id_to_source.get(&row_id) { - if let Some(created_meta) = &orig_frag.created_at_version_meta { - match created_meta.load_sequence() { - Ok(seq) => { - let versions: Vec = seq.versions().collect(); - versions.get(*row_offset).copied().unwrap_or(1) - } - Err(_) => 1, - } - } else { - 1 - } - } else { - 1 - }; - created_at_versions.push(created_version); - } + let new_seq = RowIdSequence::from([10u64, 999].as_slice()); + let new_fragment = Fragment { + id: 10, + files: vec![], + deletion_file: None, + row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&new_seq))), + physical_rows: Some(2), + created_at_version_meta: None, + last_updated_at_version_meta: None, + }; + + let manifest = make_stable_row_id_manifest(vec![existing_fragment]); + let (result, _) = update_txn(vec![new_fragment]) + .build_manifest( + Some(&manifest), + vec![], + "txn", + &ManifestWriteConfig::default(), + ) + .unwrap(); + + // Row 10: offset 0 in frag 1 → version 5. 
Row 999: unknown → default 1 + assert_eq!(created_at_versions(&result, 10), vec![5, 1]); + } + + #[test] + fn test_update_version_tracking_source_fragment_no_created_at_defaults_to_1() { + // Source fragment has row_id_meta but no created_at_version_meta. + // The row IS found in the lookup, but the version defaults to 1. + let existing_seq = RowIdSequence::from([50u64, 51].as_slice()); + let existing_fragment = Fragment { + id: 1, + files: vec![], + deletion_file: None, + row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&existing_seq))), + physical_rows: Some(2), + created_at_version_meta: None, + last_updated_at_version_meta: None, + }; + + let new_seq = RowIdSequence::from([50u64].as_slice()); + let new_fragment = Fragment { + id: 10, + files: vec![], + deletion_file: None, + row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&new_seq))), + physical_rows: Some(1), + created_at_version_meta: None, + last_updated_at_version_meta: None, + }; + + let manifest = make_stable_row_id_manifest(vec![existing_fragment]); + let (result, _) = update_txn(vec![new_fragment]) + .build_manifest( + Some(&manifest), + vec![], + "txn", + &ManifestWriteConfig::default(), + ) + .unwrap(); - // Row 10: offset 0 in frag 0 → version 5. 
Row 999: unknown → default 1 - assert_eq!(created_at_versions, vec![5, 1]); + // Row 50 is found in source but source has no created_at_version_meta → default 1 + assert_eq!(created_at_versions(&result, 10), vec![1]); } } From 09350b8f1477ec61ab1d1d125e00ab5deaf248e4 Mon Sep 17 00:00:00 2001 From: ivscheianu Date: Tue, 14 Apr 2026 14:47:51 +0300 Subject: [PATCH 5/9] chore: partially refactor the code --- java/lance-jni/src/fragment.rs | 31 +- .../main/java/org/lance/FragmentMetadata.java | 12 + .../java/org/lance/fragment/RowIdMeta.java | 52 +-- .../java/org/lance/fragment/VersionMeta.java | 3 + .../org/lance/fragment/RowIdMetaTest.java | 46 --- .../org/lance/fragment/VersionMetaTest.java | 2 +- rust/lance/src/dataset/transaction.rs | 366 ++++++++++++------ 7 files changed, 309 insertions(+), 203 deletions(-) diff --git a/java/lance-jni/src/fragment.rs b/java/lance-jni/src/fragment.rs index 51caa2202c9..eb7889d4616 100644 --- a/java/lance-jni/src/fragment.rs +++ b/java/lance-jni/src/fragment.rs @@ -8,14 +8,15 @@ use arrow_schema::DataType; use jni::objects::{JIntArray, JValue, JValueGen}; use jni::{ JNIEnv, - objects::{JObject, JString}, - sys::{jint, jlong}, + objects::{JClass, JLongArray, JObject, JString}, + sys::{jint, jlong, jstring}, }; use lance::datatypes::Schema; use lance::table::format::{ DataFile, DeletionFile, DeletionFileType, Fragment, RowDatasetVersionMeta, RowIdMeta, }; use lance_io::utils::CachedFileSize; +use lance_table::rowids::{RowIdSequence, write_row_ids}; use std::iter::once; use lance::dataset::fragment::FileFragment; @@ -488,6 +489,32 @@ fn inner_update_column<'local>( result.into_java(env) } +#[unsafe(no_mangle)] +pub extern "system" fn Java_org_lance_fragment_RowIdMeta_nativeEncodeRowIds( + mut env: JNIEnv, + _cls: JClass, + row_ids: JLongArray, +) -> jstring { + ok_or_throw_with_return!( + env, + inner_encode_row_ids(&mut env, &row_ids) + .and_then(|json| env.new_string(json).map_err(Error::from)), + std::ptr::null_mut() + ) + 
.into_raw() +} + +fn inner_encode_row_ids(env: &mut JNIEnv, row_ids: &JLongArray) -> Result { + let len = env.get_array_length(row_ids)?; + let mut buf: Vec = vec![0; len as usize]; + env.get_long_array_region(row_ids, 0, buf.as_mut_slice())?; + let ids: Vec = buf.into_iter().map(|x| x as u64).collect(); + let seq = RowIdSequence::from(ids.as_slice()); + let meta = RowIdMeta::Inline(write_row_ids(&seq)); + let json = serde_json::to_string(&meta)?; + Ok(json) +} + const DATA_FILE_CLASS: &str = "org/lance/fragment/DataFile"; const DATA_FILE_CONSTRUCTOR_SIG: &str = "(Ljava/lang/String;[I[IIILjava/lang/Long;Ljava/lang/Integer;)V"; diff --git a/java/src/main/java/org/lance/FragmentMetadata.java b/java/src/main/java/org/lance/FragmentMetadata.java index 186745cbcca..e7fa5cf3c30 100644 --- a/java/src/main/java/org/lance/FragmentMetadata.java +++ b/java/src/main/java/org/lance/FragmentMetadata.java @@ -122,6 +122,18 @@ public boolean equals(Object o) { && Objects.equals(lastUpdatedAtVersionMeta, that.lastUpdatedAtVersionMeta); } + @Override + public int hashCode() { + return Objects.hash( + id, + physicalRows, + files, + deletionFile, + rowIdMeta, + createdAtVersionMeta, + lastUpdatedAtVersionMeta); + } + @Override public String toString() { return MoreObjects.toStringHelper(this) diff --git a/java/src/main/java/org/lance/fragment/RowIdMeta.java b/java/src/main/java/org/lance/fragment/RowIdMeta.java index 5a3e90ced3c..49e41a3192b 100644 --- a/java/src/main/java/org/lance/fragment/RowIdMeta.java +++ b/java/src/main/java/org/lance/fragment/RowIdMeta.java @@ -13,15 +13,20 @@ */ package org.lance.fragment; +import org.lance.JniLoader; + import com.google.common.base.MoreObjects; -import java.io.ByteArrayOutputStream; import java.io.Serializable; import java.util.Objects; public class RowIdMeta implements Serializable { private static final long serialVersionUID = -6532828695072614148L; + static { + JniLoader.ensureLoaded(); + } + private final String metadata; public 
RowIdMeta(String metadata) { @@ -29,47 +34,18 @@ public RowIdMeta(String metadata) { } /** - * Creates a RowIdMeta from an array of stable row IDs. Encodes them as an inline RowIdSequence - * protobuf wrapped in the JSON format expected by lance-core. + * Creates a RowIdMeta from an array of stable row IDs by delegating to the Rust {@code + * write_row_ids} encoder via JNI. The returned metadata is a JSON string wrapping the + * protobuf-encoded RowIdSequence, matching the format expected by lance-core. + * + * @param rowIds stable row IDs to encode + * @return RowIdMeta containing the serialized inline representation */ public static RowIdMeta fromRowIds(long[] rowIds) { - byte[] values = new byte[rowIds.length * 8]; - for (int r = 0; r < rowIds.length; r++) { - long id = rowIds[r]; - int base = r * 8; - for (int i = 0; i < 8; i++) { - values[base + i] = (byte) ((id >>> (8 * i)) & 0xFF); - } - } - // RowIdSequence protobuf nesting: - // segment(1) > encoded(5) > u64array(3) > bytes(2) - byte[] proto = lenDelimited(1, lenDelimited(5, lenDelimited(3, lenDelimited(2, values)))); - StringBuilder sb = new StringBuilder(12 + proto.length * 4); - sb.append("{\"Inline\":["); - for (int i = 0; i < proto.length; i++) { - if (i > 0) sb.append(','); - sb.append(proto[i] & 0xFF); - } - sb.append("]}"); - return new RowIdMeta(sb.toString()); + return new RowIdMeta(nativeEncodeRowIds(rowIds)); } - private static byte[] lenDelimited(int fieldNumber, byte[] data) { - int tag = (fieldNumber << 3) | 2; - ByteArrayOutputStream out = new ByteArrayOutputStream(1 + 5 + data.length); - writeVarint(out, tag); - writeVarint(out, data.length); - out.write(data, 0, data.length); - return out.toByteArray(); - } - - private static void writeVarint(ByteArrayOutputStream out, int value) { - while ((value & ~0x7F) != 0) { - out.write((value & 0x7F) | 0x80); - value >>>= 7; - } - out.write(value); - } + private static native String nativeEncodeRowIds(long[] rowIds); public String getMetadata() { 
return metadata; diff --git a/java/src/main/java/org/lance/fragment/VersionMeta.java b/java/src/main/java/org/lance/fragment/VersionMeta.java index 71297a282ef..53f5a5468e3 100644 --- a/java/src/main/java/org/lance/fragment/VersionMeta.java +++ b/java/src/main/java/org/lance/fragment/VersionMeta.java @@ -21,6 +21,9 @@ /** * Metadata for per-row dataset version sequences (created_at / last_updated_at). Wraps the * JSON-serialized Rust RowDatasetVersionMeta enum. + * + *

Structurally identical to {@link RowIdMeta} — kept separate because the two map to distinct + * Rust types with different serialization formats and evolution paths. */ public class VersionMeta implements Serializable { private static final long serialVersionUID = 1L; diff --git a/java/src/test/java/org/lance/fragment/RowIdMetaTest.java b/java/src/test/java/org/lance/fragment/RowIdMetaTest.java index b6e9f47ffcc..ddedd706341 100644 --- a/java/src/test/java/org/lance/fragment/RowIdMetaTest.java +++ b/java/src/test/java/org/lance/fragment/RowIdMetaTest.java @@ -61,52 +61,6 @@ void testFromRowIdsDeterministic() { assertEquals(a, b); } - @Test - void testFromRowIdsProtoStructure() { - long[] rowIds = {1}; - String json = RowIdMeta.fromRowIds(rowIds).getMetadata(); - - int start = json.indexOf('[') + 1; - int end = json.lastIndexOf(']'); - String[] parts = json.substring(start, end).split(","); - byte[] proto = new byte[parts.length]; - for (int i = 0; i < parts.length; i++) { - proto[i] = (byte) Integer.parseInt(parts[i].trim()); - } - - // Outermost: field 1, wire type 2 (length-delimited) → tag byte = 0x0a - assertEquals((byte) 0x0a, proto[0]); - - // Walk 4 nested length-delimited fields to reach the payload - int pos = 0; - for (int level = 0; level < 4; level++) { - int tag = proto[pos++] & 0xFF; - assertEquals(2, tag & 0x07, "wire type must be 2 (length-delimited) at level " + level); - // decode varint length - int len = 0; - int shift = 0; - while (true) { - int b = proto[pos++] & 0xFF; - len |= (b & 0x7F) << shift; - if ((b & 0x80) == 0) break; - shift += 7; - } - if (level < 3) { - assertEquals( - proto.length - pos, len, "length at level " + level + " should span remaining bytes"); - } else { - // innermost: payload is exactly rowIds.length * 8 bytes - assertEquals(rowIds.length * 8, len); - } - } - - // Verify the last 8 bytes are little-endian encoding of 1 - byte[] expected = {1, 0, 0, 0, 0, 0, 0, 0}; - byte[] actual = new byte[8]; - 
System.arraycopy(proto, proto.length - 8, actual, 0, 8); - assertArrayEquals(expected, actual); - } - @Test void testEquals() { RowIdMeta a = new RowIdMeta("test"); diff --git a/java/src/test/java/org/lance/fragment/VersionMetaTest.java b/java/src/test/java/org/lance/fragment/VersionMetaTest.java index a851e9fcbd6..038d24980d2 100644 --- a/java/src/test/java/org/lance/fragment/VersionMetaTest.java +++ b/java/src/test/java/org/lance/fragment/VersionMetaTest.java @@ -72,7 +72,7 @@ void testToString() { } @Test - void testJsonMetadataRoundTrip() { + void testJsonMetadataPreservation() { String json = "{\"Inline\":[10,20,30]}"; VersionMeta meta = new VersionMeta(json); assertEquals(json, meta.getMetadata()); diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index 3bcf231837c..bfda6062e9d 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -28,13 +28,13 @@ use lance_table::rowids::read_row_ids; use lance_table::{ format::{ BasePath, DataFile, DataStorageFormat, Fragment, IndexFile, IndexMetadata, Manifest, - RowIdMeta, pb, + RowDatasetVersionMeta, RowDatasetVersionRun, RowDatasetVersionSequence, RowIdMeta, pb, }, io::{ commit::CommitHandler, manifest::{read_manifest, read_manifest_indexes}, }, - rowids::{RowIdSequence, write_row_ids}, + rowids::{RowIdSequence, segment::U64Segment, version::build_version_meta, write_row_ids}, }; use object_store::path::Path; use roaring::RoaringBitmap; @@ -45,6 +45,127 @@ use std::{ }; use uuid::Uuid; +/// Fallback version for rows whose original creation version cannot be determined. +/// Version 1 is the initial dataset version in the Lance format. +const UNKNOWN_CREATED_AT_VERSION: u64 = 1; + +/// Look up the `created_at` version for a single row ID in `row_id_to_source`. +/// +/// Walks the original fragment's `created_at_version_meta` to find the version +/// at the given offset. 
Returns [`UNKNOWN_CREATED_AT_VERSION`] for any failure +/// (missing metadata, decode error, out-of-range offset, unmapped row ID). +fn resolve_created_at_version( + row_id: u64, + row_id_to_source: &HashMap, +) -> u64 { + let Some((orig_frag, row_offset)) = row_id_to_source.get(&row_id) else { + return UNKNOWN_CREATED_AT_VERSION; + }; + let Some(created_meta) = &orig_frag.created_at_version_meta else { + return UNKNOWN_CREATED_AT_VERSION; + }; + match created_meta.load_sequence() { + Ok(seq) => seq + .versions() + .nth(*row_offset) + .unwrap_or(UNKNOWN_CREATED_AT_VERSION), + Err(_) => UNKNOWN_CREATED_AT_VERSION, + } +} + +/// For each new fragment produced by an update, set `created_at_version_meta` +/// (preserved from the original rows) and `last_updated_at_version_meta`. +fn resolve_update_version_metadata( + existing_fragments: &[Fragment], + new_fragments: &mut [Fragment], + new_version: u64, +) -> Result<()> { + // ~24 bytes per entry (u64 key + &Fragment pointer + usize offset + hash metadata). + // For a 1M-row dataset this is ~24 MB — acceptable for a transient transaction structure. 
+ let mut row_id_to_source: HashMap = HashMap::new(); + for frag in existing_fragments.iter() { + if let Some(RowIdMeta::Inline(data)) = &frag.row_id_meta + && let Ok(seq) = read_row_ids(data) + { + for (offset, rid) in seq.iter().enumerate() { + let prev = row_id_to_source.insert(rid, (frag, offset)); + debug_assert!( + prev.is_none(), + "duplicate row ID {rid} in fragment {}", + frag.id, + ); + } + } + } + + for fragment in new_fragments.iter_mut() { + let row_ids = match &fragment.row_id_meta { + Some(RowIdMeta::Inline(data)) => read_row_ids(data).ok(), + Some(RowIdMeta::External(_)) => { + log::warn!( + "Fragment {} has external row ID metadata; \ + version tracking will use defaults", + fragment.id, + ); + None + } + None => None, + }; + + if let Some(row_ids) = row_ids { + let physical_rows = fragment.physical_rows.unwrap_or(0); + let created_at_versions: Vec = row_ids + .iter() + .map(|rid| resolve_created_at_version(rid, &row_id_to_source)) + .collect(); + debug_assert_eq!(created_at_versions.len(), physical_rows); + + let runs = encode_version_runs(&created_at_versions); + let created_at_seq = RowDatasetVersionSequence { runs }; + fragment.created_at_version_meta = Some( + RowDatasetVersionMeta::from_sequence(&created_at_seq).map_err(|e| { + Error::internal(format!( + "Failed to create created_at version metadata: {}", + e + )) + })?, + ); + + fragment.last_updated_at_version_meta = build_version_meta(fragment, new_version); + } else { + let version_meta = build_version_meta(fragment, new_version); + fragment.last_updated_at_version_meta = version_meta.clone(); + fragment.created_at_version_meta = version_meta; + } + } + Ok(()) +} + +/// Run-length encode a sequence of per-row versions into [`RowDatasetVersionRun`]s. 
+fn encode_version_runs(versions: &[u64]) -> Vec { + if versions.is_empty() { + return Vec::new(); + } + let mut runs = Vec::new(); + let mut current_version = versions[0]; + let mut run_start = 0u64; + for (i, &version) in versions.iter().enumerate().skip(1) { + if version != current_version { + runs.push(RowDatasetVersionRun { + span: U64Segment::Range(run_start..i as u64), + version: current_version, + }); + current_version = version; + run_start = i as u64; + } + } + runs.push(RowDatasetVersionRun { + span: U64Segment::Range(run_start..versions.len() as u64), + version: current_version, + }); + runs +} + /// A change to a dataset that can be retried /// /// This contains enough information to be able to build the next manifest, @@ -1634,8 +1755,7 @@ impl Transaction { // Add version metadata for all new fragments let new_version = current_manifest.map(|m| m.version + 1).unwrap_or(1); for fragment in new_fragments.iter_mut() { - let version_meta = - lance_table::rowids::version::build_version_meta(fragment, new_version); + let version_meta = build_version_meta(fragment, new_version); fragment.last_updated_at_version_meta = version_meta.clone(); fragment.created_at_version_meta = version_meta; } @@ -1714,118 +1834,13 @@ impl Transaction { Self::assign_row_ids(next_row_id, new_fragments.as_mut_slice())?; } - // Set version metadata for newly created fragments (updated rows) - // Preserve created_at from original fragments, set last_updated to new version if next_row_id.is_some() { let new_version = current_manifest.map(|m| m.version + 1).unwrap_or(1); - - // Build a reverse index: stable row ID → (fragment ref, position within fragment) - // by reading each existing fragment's RowIdMeta sequence. 
- let mut row_id_to_source: std::collections::HashMap = - std::collections::HashMap::new(); - for frag in existing_fragments.iter() { - if let Some(lance_table::format::RowIdMeta::Inline(data)) = - &frag.row_id_meta - && let Ok(seq) = lance_table::rowids::read_row_ids(data) - { - for (offset, rid) in seq.iter().enumerate() { - row_id_to_source.insert(rid, (frag, offset)); - } - } - } - - for fragment in new_fragments.iter_mut() { - let row_ids = if let Some(row_id_meta) = &fragment.row_id_meta { - match row_id_meta { - lance_table::format::RowIdMeta::Inline(data) => { - lance_table::rowids::read_row_ids(data).ok() - } - lance_table::format::RowIdMeta::External(_) => None, - } - } else { - None - }; - - if let Some(row_ids) = row_ids { - let physical_rows = fragment.physical_rows.unwrap_or(0); - let mut created_at_versions = Vec::with_capacity(physical_rows); - - for row_id in row_ids.iter() { - let created_version = if let Some((orig_frag, row_offset)) = - row_id_to_source.get(&row_id) - { - if let Some(created_meta) = &orig_frag.created_at_version_meta { - match created_meta.load_sequence() { - Ok(seq) => { - let versions: Vec = seq.versions().collect(); - versions.get(*row_offset).copied().unwrap_or(1) - } - Err(_) => 1, - } - } else { - 1 - } - } else { - 1 - }; - created_at_versions.push(created_version); - } - - let mut runs = Vec::new(); - if !created_at_versions.is_empty() { - let mut current_version = created_at_versions[0]; - let mut run_start = 0u64; - - for (i, &version) in created_at_versions.iter().enumerate().skip(1) - { - if version != current_version { - runs.push(lance_table::format::RowDatasetVersionRun { - span: lance_table::rowids::segment::U64Segment::Range( - run_start..i as u64, - ), - version: current_version, - }); - current_version = version; - run_start = i as u64; - } - } - runs.push(lance_table::format::RowDatasetVersionRun { - span: lance_table::rowids::segment::U64Segment::Range( - run_start..created_at_versions.len() as u64, - ), - 
version: current_version, - }); - } - - let created_at_seq = - lance_table::format::RowDatasetVersionSequence { runs }; - fragment.created_at_version_meta = Some( - lance_table::format::RowDatasetVersionMeta::from_sequence( - &created_at_seq, - ) - .map_err(|e| { - Error::internal(format!( - "Failed to create created_at version metadata: {}", - e - )) - })?, - ); - - let last_updated_meta = - lance_table::rowids::version::build_version_meta( - fragment, - new_version, - ); - fragment.last_updated_at_version_meta = last_updated_meta; - } else { - let version_meta = lance_table::rowids::version::build_version_meta( - fragment, - new_version, - ); - fragment.last_updated_at_version_meta = version_meta.clone(); - fragment.created_at_version_meta = version_meta; - } - } + resolve_update_version_metadata( + existing_fragments, + new_fragments.as_mut_slice(), + new_version, + )?; } if config.use_stable_row_ids @@ -1879,8 +1894,7 @@ impl Transaction { // Add version metadata for all new fragments let new_version = current_manifest.map(|m| m.version + 1).unwrap_or(1); for fragment in new_fragments.iter_mut() { - let version_meta = - lance_table::rowids::version::build_version_meta(fragment, new_version); + let version_meta = build_version_meta(fragment, new_version); fragment.last_updated_at_version_meta = version_meta.clone(); fragment.created_at_version_meta = version_meta; } @@ -4520,6 +4534,8 @@ mod tests { validate_operation(Some(&legacy_manifest), &operation).unwrap(); } + /// Existing fragments use id >= 1 to avoid collision with `Fragment::new(0)` + /// used by `sample_manifest`. New (updated) fragments use id = 10. 
     fn make_stable_row_id_manifest(fragments: Vec<Fragment>) -> Manifest {
         let schema = ArrowSchema::new(vec![ArrowField::new("id", DataType::Int32, false)]);
         let mut manifest = Manifest::new(
@@ -4562,6 +4578,17 @@ mod tests {
         seq.versions().collect()
     }
 
+    fn last_updated_at_versions(manifest: &Manifest, frag_id: u64) -> Vec<u64> {
+        let frag = manifest.fragments.iter().find(|f| f.id == frag_id).unwrap();
+        let seq = frag
+            .last_updated_at_version_meta
+            .as_ref()
+            .unwrap()
+            .load_sequence()
+            .unwrap();
+        seq.versions().collect()
+    }
+
     #[test]
     fn test_update_version_tracking_preserves_created_at() {
         let existing_seq = RowIdSequence::from([100u64, 101, 102].as_slice());
@@ -4605,6 +4632,7 @@
             .unwrap();
 
         assert_eq!(created_at_versions(&result, 10), vec![5, 5]);
+        assert_eq!(last_updated_at_versions(&result, 10), vec![5, 5]);
     }
 
     #[test]
@@ -4672,6 +4700,7 @@
         // Row 11 came from frag_a (offset 1, version 2), row 20 came from frag_b (offset 0, version 3)
         assert_eq!(created_at_versions(&result, 10), vec![2, 3]);
+        assert_eq!(last_updated_at_versions(&result, 10), vec![5, 5]);
     }
 
     #[test]
@@ -4719,6 +4748,7 @@
         // Row 10: offset 0 in frag 1 → version 5.
Row 999: unknown → default 1 assert_eq!(created_at_versions(&result, 10), vec![5, 1]); + assert_eq!(last_updated_at_versions(&result, 10), vec![5, 5]); } #[test] @@ -4759,5 +4789,109 @@ mod tests { // Row 50 is found in source but source has no created_at_version_meta → default 1 assert_eq!(created_at_versions(&result, 10), vec![1]); + assert_eq!(last_updated_at_versions(&result, 10), vec![5]); + } + + #[test] + fn test_update_version_tracking_no_row_id_meta_fallback() { + let existing_seq = RowIdSequence::from([10u64, 11].as_slice()); + let existing_fragment = Fragment { + id: 1, + files: vec![], + deletion_file: None, + row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&existing_seq))), + physical_rows: Some(2), + created_at_version_meta: None, + last_updated_at_version_meta: None, + }; + + let new_fragment = Fragment { + id: 10, + files: vec![], + deletion_file: None, + row_id_meta: None, + physical_rows: Some(3), + created_at_version_meta: None, + last_updated_at_version_meta: None, + }; + + let manifest = make_stable_row_id_manifest(vec![existing_fragment]); + let (result, _) = update_txn(vec![new_fragment]) + .build_manifest( + Some(&manifest), + vec![], + "txn", + &ManifestWriteConfig::default(), + ) + .unwrap(); + + // Fragment starts with no row_id_meta → assign_row_ids gives it fresh IDs → + // those IDs aren't found in existing fragments → created_at defaults to 1 + assert_eq!(created_at_versions(&result, 10), vec![1, 1, 1]); + assert_eq!(last_updated_at_versions(&result, 10), vec![5, 5, 5]); + } + + #[test] + fn test_update_version_tracking_corrupt_created_at_defaults_to_1() { + let existing_seq = RowIdSequence::from([10u64, 11].as_slice()); + let existing_fragment = Fragment { + id: 1, + files: vec![], + deletion_file: None, + row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&existing_seq))), + physical_rows: Some(2), + created_at_version_meta: Some(RowDatasetVersionMeta::Inline(Arc::from( + vec![0xFFu8; 8].as_slice(), + ))), + 
last_updated_at_version_meta: None, + }; + + let new_seq = RowIdSequence::from([10u64].as_slice()); + let new_fragment = Fragment { + id: 10, + files: vec![], + deletion_file: None, + row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&new_seq))), + physical_rows: Some(1), + created_at_version_meta: None, + last_updated_at_version_meta: None, + }; + + let manifest = make_stable_row_id_manifest(vec![existing_fragment]); + let (result, _) = update_txn(vec![new_fragment]) + .build_manifest( + Some(&manifest), + vec![], + "txn", + &ManifestWriteConfig::default(), + ) + .unwrap(); + + // Corrupt metadata causes decode to fail → falls back to UNKNOWN_CREATED_AT_VERSION (1) + assert_eq!(created_at_versions(&result, 10), vec![1]); + assert_eq!(last_updated_at_versions(&result, 10), vec![5]); + } + + #[test] + fn test_encode_version_runs_empty() { + let runs = encode_version_runs(&[]); + assert!(runs.is_empty()); + } + + #[test] + fn test_encode_version_runs_single_run() { + let runs = encode_version_runs(&[3, 3, 3]); + assert_eq!(runs.len(), 1); + assert_eq!(runs[0].version, 3); + } + + #[test] + fn test_encode_version_runs_alternating() { + let runs = encode_version_runs(&[1, 2, 1, 2]); + assert_eq!(runs.len(), 4); + assert_eq!(runs[0].version, 1); + assert_eq!(runs[1].version, 2); + assert_eq!(runs[2].version, 1); + assert_eq!(runs[3].version, 2); } } From f2ec31493c1c6f127bfae10a28a6b808def14a6b Mon Sep 17 00:00:00 2001 From: ivscheianu Date: Tue, 14 Apr 2026 15:29:57 +0300 Subject: [PATCH 6/9] fix: saturating arithmetic in sorted_sequence_sizes to prevent overflow crash --- rust/lance-table/src/rowids/segment.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance-table/src/rowids/segment.rs b/rust/lance-table/src/rowids/segment.rs index 1c494f20f09..716b5ef2e23 100644 --- a/rust/lance-table/src/rowids/segment.rs +++ b/rust/lance-table/src/rowids/segment.rs @@ -153,7 +153,7 @@ impl U64Segment { let n_holes = stats.n_holes(); let total_slots = 
stats.max - stats.min + 1; - let range_with_holes = 24 + 4 * n_holes as usize; + let range_with_holes = 24usize.saturating_add(4usize.saturating_mul(n_holes as usize)); let range_with_bitmap = 24 + (total_slots as f64 / 8.0).ceil() as usize; let sorted_array = 24 + 2 * stats.count as usize; From 23ab50851283910393d84990dcd944ef08fc5da8 Mon Sep 17 00:00:00 2001 From: ivscheianu Date: Tue, 14 Apr 2026 16:45:52 +0300 Subject: [PATCH 7/9] fix: updated implementation, fixed corner cases --- rust/lance/src/dataset.rs | 10 +++++----- rust/lance/src/dataset/transaction.rs | 27 +++++++++++++++++++-------- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index b45aee6b814..7889374241b 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -3177,11 +3177,11 @@ pub(crate) async fn write_manifest_file( mut transaction: Option<&Transaction>, ) -> std::result::Result { if config.auto_set_feature_flags { - apply_feature_flags( - manifest, - config.use_stable_row_ids, - config.disable_transaction_file, - )?; + // build_manifest may have already set FLAG_STABLE_ROW_IDS on the manifest. + // Preserve it here so this second apply_feature_flags call does not clear it + // when config.use_stable_row_ids is false (the ManifestWriteConfig default). + let stable_row_ids = config.use_stable_row_ids || manifest.uses_stable_row_ids(); + apply_feature_flags(manifest, stable_row_ids, config.disable_transaction_file)?; } manifest.set_timestamp(timestamp_to_nanos(config.timestamp)); diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index bfda6062e9d..4a97ee98188 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -83,17 +83,21 @@ fn resolve_update_version_metadata( // ~24 bytes per entry (u64 key + &Fragment pointer + usize offset + hash metadata). 
     // For a 1M-row dataset this is ~24 MB — acceptable for a transient transaction structure.
     let mut row_id_to_source: HashMap<u64, (&Fragment, usize)> = HashMap::new();
-    for frag in existing_fragments.iter() {
+    // Stable row IDs must be globally unique among *live* rows, but after a rewrite-style
+    // update the same stable ID can appear twice in `existing_fragments`: once in an older
+    // fragment's inline `row_id_meta` at the original row offset (rows may be soft-deleted
+    // via a deletion vector) and again in a newer fragment holding rewritten data. For
+    // `created_at` we need the mapping from the original fragment/offset; that is always the
+    // first occurrence when fragments are processed in ascending `id` order.
+    let mut frag_indices: Vec<usize> = (0..existing_fragments.len()).collect();
+    frag_indices.sort_by_key(|&i| existing_fragments[i].id);
+    for &i in &frag_indices {
+        let frag = &existing_fragments[i];
         if let Some(RowIdMeta::Inline(data)) = &frag.row_id_meta
             && let Ok(seq) = read_row_ids(data)
         {
             for (offset, rid) in seq.iter().enumerate() {
-                let prev = row_id_to_source.insert(rid, (frag, offset));
-                debug_assert!(
-                    prev.is_none(),
-                    "duplicate row ID {rid} in fragment {}",
-                    frag.id,
-                );
+                row_id_to_source.entry(rid).or_insert((frag, offset));
             }
         }
     }
@@ -2173,9 +2177,16 @@ impl Transaction {
         manifest.tag.clone_from(&self.tag);
 
         if config.auto_set_feature_flags {
+            // Internal operations (e.g. CreateIndex) use ManifestWriteConfig::default()
+            // which has use_stable_row_ids = false. Without inheriting from the previous
+            // manifest, apply_feature_flags would clear FLAG_STABLE_ROW_IDS.
+ let inherited = current_manifest + .map(|m| m.uses_stable_row_ids()) + .unwrap_or(false); + let stable_row_ids = config.use_stable_row_ids || inherited; apply_feature_flags( &mut manifest, - config.use_stable_row_ids, + stable_row_ids, config.disable_transaction_file, )?; } From 2a49ff9119f71a6c6c9ad021e53bfb415da3e06b Mon Sep 17 00:00:00 2001 From: ivscheianu Date: Wed, 22 Apr 2026 08:53:41 +0300 Subject: [PATCH 8/9] chore: addressed comments --- rust/lance/src/dataset.rs | 4 ++-- rust/lance/src/dataset/transaction.rs | 11 +++++------ 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 05176fbc628..7b2e508319d 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -3238,8 +3238,8 @@ pub(crate) async fn write_manifest_file( // build_manifest may have already set FLAG_STABLE_ROW_IDS on the manifest. // Preserve it here so this second apply_feature_flags call does not clear it // when config.use_stable_row_ids is false (the ManifestWriteConfig default). - let stable_row_ids = config.use_stable_row_ids || manifest.uses_stable_row_ids(); - apply_feature_flags(manifest, stable_row_ids, config.disable_transaction_file)?; + let use_stable_row_ids = config.use_stable_row_ids || manifest.uses_stable_row_ids(); + apply_feature_flags(manifest, use_stable_row_ids, config.disable_transaction_file)?; } manifest.set_timestamp(timestamp_to_nanos(config.timestamp)); diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index 4a97ee98188..ce71a0a73d1 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -89,10 +89,9 @@ fn resolve_update_version_metadata( // via a deletion vector) and again in a newer fragment holding rewritten data. For // `created_at` we need the mapping from the original fragment/offset; that is always the // first occurrence when fragments are processed in ascending `id` order. 
-    let mut frag_indices: Vec<usize> = (0..existing_fragments.len()).collect();
-    frag_indices.sort_by_key(|&i| existing_fragments[i].id);
-    for &i in &frag_indices {
-        let frag = &existing_fragments[i];
+    let mut sorted_frags: Vec<&Fragment> = existing_fragments.iter().collect();
+    sorted_frags.sort_by_key(|f| f.id);
+    for frag in sorted_frags {
         if let Some(RowIdMeta::Inline(data)) = &frag.row_id_meta
             && let Ok(seq) = read_row_ids(data)
         {
@@ -2183,10 +2182,10 @@
             let inherited = current_manifest
                 .map(|m| m.uses_stable_row_ids())
                 .unwrap_or(false);
-            let stable_row_ids = config.use_stable_row_ids || inherited;
+            let use_stable_row_ids = config.use_stable_row_ids || inherited;
             apply_feature_flags(
                 &mut manifest,
-                stable_row_ids,
+                use_stable_row_ids,
                 config.disable_transaction_file,
             )?;
         }

From 04ad74f24446e7104aa9cd51effb4b8750145dbf Mon Sep 17 00:00:00 2001
From: ivscheianu
Date: Wed, 22 Apr 2026 09:04:45 +0300
Subject: [PATCH 9/9] chore: format

---
 rust/lance/src/dataset.rs | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs
index 7b2e508319d..8c0c7c3e105 100644
--- a/rust/lance/src/dataset.rs
+++ b/rust/lance/src/dataset.rs
@@ -3239,7 +3239,11 @@ pub(crate) async fn write_manifest_file(
         // Preserve it here so this second apply_feature_flags call does not clear it
         // when config.use_stable_row_ids is false (the ManifestWriteConfig default).
         let use_stable_row_ids = config.use_stable_row_ids || manifest.uses_stable_row_ids();
-        apply_feature_flags(manifest, use_stable_row_ids, config.disable_transaction_file)?;
+        apply_feature_flags(
+            manifest,
+            use_stable_row_ids,
+            config.disable_transaction_file,
+        )?;
     }
 
     manifest.set_timestamp(timestamp_to_nanos(config.timestamp));