diff --git a/java/lance-jni/src/fragment.rs b/java/lance-jni/src/fragment.rs index 6ef10ee2f7d..0d2f2955724 100644 --- a/java/lance-jni/src/fragment.rs +++ b/java/lance-jni/src/fragment.rs @@ -8,12 +8,15 @@ use arrow_schema::DataType; use jni::objects::{JIntArray, JValue, JValueGen}; use jni::{ JNIEnv, - objects::{JObject, JString}, - sys::{jint, jlong}, + objects::{JClass, JLongArray, JObject, JString}, + sys::{jint, jlong, jstring}, }; use lance::datatypes::Schema; -use lance::table::format::{DataFile, DeletionFile, DeletionFileType, Fragment, RowIdMeta}; +use lance::table::format::{ + DataFile, DeletionFile, DeletionFileType, Fragment, RowDatasetVersionMeta, RowIdMeta, +}; use lance_io::utils::CachedFileSize; +use lance_table::rowids::{RowIdSequence, write_row_ids}; use std::iter::once; use lance::dataset::fragment::FileFragment; @@ -496,6 +499,32 @@ fn inner_update_column<'local>( result.into_java(env) } +#[unsafe(no_mangle)] +pub extern "system" fn Java_org_lance_fragment_RowIdMeta_nativeEncodeRowIds( + mut env: JNIEnv, + _cls: JClass, + row_ids: JLongArray, +) -> jstring { + ok_or_throw_with_return!( + env, + inner_encode_row_ids(&mut env, &row_ids) + .and_then(|json| env.new_string(json).map_err(Error::from)), + std::ptr::null_mut() + ) + .into_raw() +} + +fn inner_encode_row_ids(env: &mut JNIEnv, row_ids: &JLongArray) -> Result { + let len = env.get_array_length(row_ids)?; + let mut buf: Vec = vec![0; len as usize]; + env.get_long_array_region(row_ids, 0, buf.as_mut_slice())?; + let ids: Vec = buf.into_iter().map(|x| x as u64).collect(); + let seq = RowIdSequence::from(ids.as_slice()); + let meta = RowIdMeta::Inline(write_row_ids(&seq)); + let json = serde_json::to_string(&meta)?; + Ok(json) +} + const DATA_FILE_CLASS: &str = "org/lance/fragment/DataFile"; const DATA_FILE_CONSTRUCTOR_SIG: &str = "(Ljava/lang/String;[I[IIILjava/lang/Long;Ljava/lang/Integer;)V"; @@ -504,9 +533,11 @@ const DELETE_FILE_CONSTRUCTOR_SIG: &str = "(JJLjava/lang/Long;Lorg/lance/fragment/DeletionFileType;Ljava/lang/Integer;)V"; const DELETE_FILE_TYPE_CLASS: &str = "org/lance/fragment/DeletionFileType"; const FRAGMENT_METADATA_CLASS: &str = "org/lance/FragmentMetadata"; -const FRAGMENT_METADATA_CONSTRUCTOR_SIG: &str = "(ILjava/util/List;Ljava/lang/Long;Lorg/lance/fragment/DeletionFile;Lorg/lance/fragment/RowIdMeta;)V"; +const FRAGMENT_METADATA_CONSTRUCTOR_SIG: &str = "(ILjava/util/List;Ljava/lang/Long;Lorg/lance/fragment/DeletionFile;Lorg/lance/fragment/RowIdMeta;Lorg/lance/fragment/VersionMeta;Lorg/lance/fragment/VersionMeta;)V"; const ROW_ID_META_CLASS: &str = "org/lance/fragment/RowIdMeta"; const ROW_ID_META_CONSTRUCTOR_SIG: &str = "(Ljava/lang/String;)V"; +const VERSION_META_CLASS: &str = "org/lance/fragment/VersionMeta"; +const VERSION_META_CONSTRUCTOR_SIG: &str = "(Ljava/lang/String;)V"; const FRAGMENT_MERGE_RESULT_CLASS: &str = "org/lance/fragment/FragmentMergeResult"; const FRAGMENT_MERGE_RESULT_CONSTRUCTOR_SIG: &str = "(Lorg/lance/FragmentMetadata;Lorg/lance/schema/LanceSchema;)V"; @@ -621,6 +652,18 @@ impl IntoJava for &RowIdMeta { } } +impl IntoJava for &RowDatasetVersionMeta { + fn into_java<'a>(self, env: &mut JNIEnv<'a>) -> Result> { + let json_str = serde_json::to_string(self)?; + let json = env.new_string(json_str)?.into(); + Ok(env.new_object( + VERSION_META_CLASS, + VERSION_META_CONSTRUCTOR_SIG, + &[JValueGen::Object(&json)], + )?) + } +} + impl IntoJava for &Fragment { fn into_java<'local>(self, env: &mut JNIEnv<'local>) -> Result> { let files = self.files.clone(); @@ -634,6 +677,14 @@ impl IntoJava for &Fragment { Some(m) => m.into_java(env)?, None => JObject::null(), }; + let created_at = match &self.created_at_version_meta { + Some(m) => m.into_java(env)?, + None => JObject::null(), + }; + let last_updated_at = match &self.last_updated_at_version_meta { + Some(m) => m.into_java(env)?, + None => JObject::null(), + }; env.new_object( FRAGMENT_METADATA_CLASS, @@ -644,6 +695,8 @@ impl IntoJava for &Fragment { JValueGen::Object(physical_rows), JValueGen::Object(&deletion_file), JValueGen::Object(&row_id_meta), + JValueGen::Object(&created_at), + JValueGen::Object(&last_updated_at), ], ) .map_err(|e| { @@ -663,6 +716,38 @@ impl FromJObjectWithEnv for JObject<'_> { } } +impl FromJObjectWithEnv for JObject<'_> { + fn extract_object(&self, env: &mut JNIEnv<'_>) -> Result { + let metadata = env + .call_method(self, "getMetadata", "()Ljava/lang/String;", &[])? + .l()?; + let s: String = env.get_string(&JString::from(metadata))?.into(); + let meta: RowDatasetVersionMeta = serde_json::from_str(&s)?; + Ok(meta) + } +} + +/// Extract an optional field from a Java object by calling a getter method. +/// Returns `None` if the getter returns null, otherwise deserializes the JObject. +fn extract_nullable_field( + env: &mut JNIEnv<'_>, + obj: &JObject<'_>, + method: &str, + class: &str, +) -> Result> +where + for<'a> JObject<'a>: FromJObjectWithEnv, +{ + let result = env + .call_method(obj, method, format!("()L{};", class), &[])? + .l()?; + if result.is_null() { + Ok(None) + } else { + Ok(Some(result.extract_object(env)?)) + } +} + impl FromJObjectWithEnv for JObject<'_> { fn extract_object(&self, env: &mut JNIEnv<'_>) -> Result { let id = env.call_method(self, "getId", "()I", &[])?.i()? as u64; @@ -675,41 +760,23 @@ impl FromJObjectWithEnv for JObject<'_> { for f in file_objs { files.push(f.extract_object(env)?); } - let deletion_file = env - .call_method( - self, - "getDeletionFile", - format!("()L{};", DELETE_FILE_CLASS), - &[], - )? - .l()?; - let deletion_file = if deletion_file.is_null() { - None - } else { - Some(deletion_file.extract_object(env)?) - }; - let row_id_meta = env - .call_method( - self, - "getRowIdMeta", - format!("()L{};", ROW_ID_META_CLASS), - &[], - )? - .l()?; - let row_id_meta = if row_id_meta.is_null() { - None - } else { - Some(row_id_meta.extract_object(env)?) - }; + let deletion_file = + extract_nullable_field(env, self, "getDeletionFile", DELETE_FILE_CLASS)?; + let row_id_meta = extract_nullable_field(env, self, "getRowIdMeta", ROW_ID_META_CLASS)?; + let created_at_version_meta = + extract_nullable_field(env, self, "getCreatedAtVersionMeta", VERSION_META_CLASS)?; + let last_updated_at_version_meta = + extract_nullable_field(env, self, "getLastUpdatedAtVersionMeta", VERSION_META_CLASS)?; + Ok(Fragment { id, files, deletion_file, physical_rows: Some(physical_rows), row_id_meta, - created_at_version_meta: None, - last_updated_at_version_meta: None, + created_at_version_meta, + last_updated_at_version_meta, }) } } diff --git a/java/src/main/java/org/lance/FragmentMetadata.java b/java/src/main/java/org/lance/FragmentMetadata.java index 8bc701b6351..e7fa5cf3c30 100644 --- a/java/src/main/java/org/lance/FragmentMetadata.java +++ b/java/src/main/java/org/lance/FragmentMetadata.java @@ -16,6 +16,7 @@ import org.lance.fragment.DataFile; import org.lance.fragment.DeletionFile; import org.lance.fragment.RowIdMeta; +import org.lance.fragment.VersionMeta; import com.google.common.base.MoreObjects; @@ -31,6 +32,8 @@ public class FragmentMetadata implements Serializable { private final long physicalRows; private final DeletionFile deletionFile; private final RowIdMeta rowIdMeta; + private final VersionMeta createdAtVersionMeta; + private final VersionMeta lastUpdatedAtVersionMeta; public FragmentMetadata( int id, @@ -38,11 +41,24 @@ public FragmentMetadata( Long physicalRows, DeletionFile deletionFile, RowIdMeta rowIdMeta) { + this(id, files, physicalRows, deletionFile, rowIdMeta, null, null); + } + + public FragmentMetadata( + int id, + List files, + Long physicalRows, + DeletionFile deletionFile, + RowIdMeta rowIdMeta, + VersionMeta createdAtVersionMeta, + VersionMeta lastUpdatedAtVersionMeta) { this.id = id; this.files = files; this.physicalRows = physicalRows; this.deletionFile = deletionFile; this.rowIdMeta = rowIdMeta; + this.createdAtVersionMeta = createdAtVersionMeta; + this.lastUpdatedAtVersionMeta = lastUpdatedAtVersionMeta; } public int getId() { @@ -80,6 +96,14 @@ public RowIdMeta getRowIdMeta() { return rowIdMeta; } + public VersionMeta getCreatedAtVersionMeta() { + return createdAtVersionMeta; + } + + public VersionMeta getLastUpdatedAtVersionMeta() { + return lastUpdatedAtVersionMeta; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -93,7 +117,21 @@ public boolean equals(Object o) { && physicalRows == that.physicalRows && Objects.equals(this.files, that.files) && Objects.equals(deletionFile, that.deletionFile) - && Objects.equals(rowIdMeta, that.rowIdMeta); + && Objects.equals(rowIdMeta, that.rowIdMeta) + && Objects.equals(createdAtVersionMeta, that.createdAtVersionMeta) + && Objects.equals(lastUpdatedAtVersionMeta, that.lastUpdatedAtVersionMeta); + } + + @Override + public int hashCode() { + return Objects.hash( + id, + physicalRows, + files, + deletionFile, + rowIdMeta, + createdAtVersionMeta, + lastUpdatedAtVersionMeta); } @Override @@ -104,6 +142,8 @@ public String toString() { .add("files", files) .add("deletionFile", deletionFile) .add("rowIdMeta", rowIdMeta) + .add("createdAtVersionMeta", createdAtVersionMeta) + .add("lastUpdatedAtVersionMeta", lastUpdatedAtVersionMeta) .toString(); } } diff --git a/java/src/main/java/org/lance/fragment/RowIdMeta.java b/java/src/main/java/org/lance/fragment/RowIdMeta.java index f9c2fa8b4ac..49e41a3192b 100644 --- a/java/src/main/java/org/lance/fragment/RowIdMeta.java +++ b/java/src/main/java/org/lance/fragment/RowIdMeta.java @@ -13,6 +13,8 @@ */ package org.lance.fragment; +import org.lance.JniLoader; + import com.google.common.base.MoreObjects; import java.io.Serializable; @@ -21,12 +23,30 @@ public class RowIdMeta implements Serializable { private static final long serialVersionUID = -6532828695072614148L; + static { + JniLoader.ensureLoaded(); + } + private final String metadata; public RowIdMeta(String metadata) { this.metadata = metadata; } + /** + * Creates a RowIdMeta from an array of stable row IDs by delegating to the Rust {@code + * write_row_ids} encoder via JNI. The returned metadata is a JSON string wrapping the + * protobuf-encoded RowIdSequence, matching the format expected by lance-core. + * + * @param rowIds stable row IDs to encode + * @return RowIdMeta containing the serialized inline representation + */ + public static RowIdMeta fromRowIds(long[] rowIds) { + return new RowIdMeta(nativeEncodeRowIds(rowIds)); + } + + private static native String nativeEncodeRowIds(long[] rowIds); + public String getMetadata() { return metadata; } @@ -43,6 +63,11 @@ public boolean equals(Object obj) { return Objects.equals(metadata, that.metadata); } + @Override + public int hashCode() { + return Objects.hash(metadata); + } + @Override public String toString() { return MoreObjects.toStringHelper(this).add("metadata", metadata).toString(); diff --git a/java/src/main/java/org/lance/fragment/VersionMeta.java b/java/src/main/java/org/lance/fragment/VersionMeta.java new file mode 100644 index 00000000000..53f5a5468e3 --- /dev/null +++ b/java/src/main/java/org/lance/fragment/VersionMeta.java @@ -0,0 +1,62 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance.fragment; + +import com.google.common.base.MoreObjects; + +import java.io.Serializable; +import java.util.Objects; + +/** + * Metadata for per-row dataset version sequences (created_at / last_updated_at). Wraps the + * JSON-serialized Rust RowDatasetVersionMeta enum. + * + *

Structurally identical to {@link RowIdMeta} — kept separate because the two map to distinct + * Rust types with different serialization formats and evolution paths. + */ +public class VersionMeta implements Serializable { + private static final long serialVersionUID = 1L; + + private final String metadata; + + public VersionMeta(String metadata) { + this.metadata = metadata; + } + + public String getMetadata() { + return metadata; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + VersionMeta that = (VersionMeta) obj; + return Objects.equals(metadata, that.metadata); + } + + @Override + public int hashCode() { + return Objects.hash(metadata); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("metadata", metadata).toString(); + } +} diff --git a/java/src/test/java/org/lance/fragment/RowIdMetaTest.java b/java/src/test/java/org/lance/fragment/RowIdMetaTest.java new file mode 100644 index 00000000000..ddedd706341 --- /dev/null +++ b/java/src/test/java/org/lance/fragment/RowIdMetaTest.java @@ -0,0 +1,91 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance.fragment; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +class RowIdMetaTest { + + @Test + void testFromRowIdsSingleRow() { + RowIdMeta meta = RowIdMeta.fromRowIds(new long[] {42}); + String json = meta.getMetadata(); + assertTrue(json.startsWith("{\"Inline\":[")); + assertTrue(json.endsWith("]}")); + } + + @Test + void testFromRowIdsMultipleRows() { + RowIdMeta meta = RowIdMeta.fromRowIds(new long[] {0, 1, 2, 100, Long.MAX_VALUE}); + assertNotNull(meta); + String json = meta.getMetadata(); + assertFalse(json.isEmpty()); + assertTrue(json.startsWith("{\"Inline\":[")); + assertTrue(json.endsWith("]}")); + } + + @Test + void testFromRowIdsEmpty() { + RowIdMeta meta = RowIdMeta.fromRowIds(new long[] {}); + String json = meta.getMetadata(); + assertTrue(json.startsWith("{\"Inline\":[")); + assertTrue(json.endsWith("]}")); + } + + @Test + void testFromRowIdsRoundTrip() { + long[] ids = {10, 20, 30}; + RowIdMeta first = RowIdMeta.fromRowIds(ids); + RowIdMeta second = RowIdMeta.fromRowIds(ids); + assertEquals(first, second); + } + + @Test + void testFromRowIdsDeterministic() { + long[] ids = {10, 20, 30}; + String a = RowIdMeta.fromRowIds(ids).getMetadata(); + String b = RowIdMeta.fromRowIds(ids).getMetadata(); + assertEquals(a, b); + } + + @Test + void testEquals() { + RowIdMeta a = new RowIdMeta("test"); + RowIdMeta b = new RowIdMeta("test"); + RowIdMeta c = new RowIdMeta("other"); + + assertEquals(a, b); + assertNotEquals(a, c); + assertNotEquals(a, null); + assertNotEquals(a, "test"); + assertEquals(a, a); + } + + @Test + void testHashCodeConsistency() { + RowIdMeta a = new RowIdMeta("test"); + RowIdMeta b = new RowIdMeta("test"); + assertEquals(a.hashCode(), b.hashCode()); + } + + @Test + void testToString() { + RowIdMeta meta = new RowIdMeta("someMetadata"); + String str = meta.toString(); + assertTrue(str.contains("RowIdMeta")); + assertTrue(str.contains("someMetadata")); + } +} diff --git a/java/src/test/java/org/lance/fragment/VersionMetaTest.java b/java/src/test/java/org/lance/fragment/VersionMetaTest.java new file mode 100644 index 00000000000..038d24980d2 --- /dev/null +++ b/java/src/test/java/org/lance/fragment/VersionMetaTest.java @@ -0,0 +1,80 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance.fragment; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class VersionMetaTest { + + @Test + void testConstructorAndGetter() { + VersionMeta meta = new VersionMeta("test"); + assertEquals("test", meta.getMetadata()); + } + + @Test + void testNullMetadata() { + VersionMeta meta = new VersionMeta(null); + assertNull(meta.getMetadata()); + } + + @Test + void testEquals() { + VersionMeta a = new VersionMeta("x"); + VersionMeta b = new VersionMeta("x"); + assertEquals(a, b); + assertEquals(b, a); + + VersionMeta c = new VersionMeta("y"); + assertNotEquals(a, c); + + assertFalse(a.equals(null)); + + assertFalse(a.equals("x")); + } + + @Test + void testHashCode() { + VersionMeta a = new VersionMeta("same"); + VersionMeta b = new VersionMeta("same"); + assertEquals(a.hashCode(), b.hashCode()); + } + + @Test + void testHashCodeWithNull() { + VersionMeta meta = new VersionMeta(null); + meta.hashCode(); + } + + @Test + void testToString() { + VersionMeta meta = new VersionMeta("hello"); + String s = meta.toString(); + assertTrue(s.contains("VersionMeta")); + assertTrue(s.contains("hello")); + } + + @Test + void testJsonMetadataPreservation() { + String json = "{\"Inline\":[10,20,30]}"; + VersionMeta meta = new VersionMeta(json); + assertEquals(json, meta.getMetadata()); + } +} diff --git a/rust/lance-table/src/rowids/segment.rs b/rust/lance-table/src/rowids/segment.rs index 1c494f20f09..716b5ef2e23 100644 --- a/rust/lance-table/src/rowids/segment.rs +++ b/rust/lance-table/src/rowids/segment.rs @@ -153,7 +153,7 @@ impl U64Segment { let n_holes = stats.n_holes(); let total_slots = stats.max - stats.min + 1; - let range_with_holes = 24 + 4 * n_holes as usize; + let range_with_holes = 24usize.saturating_add(4usize.saturating_mul(n_holes as usize)); let range_with_bitmap = 24 + (total_slots as f64 / 8.0).ceil() as usize; let sorted_array = 24 + 2 * stats.count as usize; diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 8a7a9cf3636..8c0c7c3e105 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -3235,9 +3235,13 @@ pub(crate) async fn write_manifest_file( mut transaction: Option<&Transaction>, ) -> std::result::Result { if config.auto_set_feature_flags { + // build_manifest may have already set FLAG_STABLE_ROW_IDS on the manifest. + // Preserve it here so this second apply_feature_flags call does not clear it + // when config.use_stable_row_ids is false (the ManifestWriteConfig default). + let use_stable_row_ids = config.use_stable_row_ids || manifest.uses_stable_row_ids(); apply_feature_flags( manifest, - config.use_stable_row_ids, + use_stable_row_ids, config.disable_transaction_file, )?; } diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index 62f30696234..ce71a0a73d1 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -28,13 +28,13 @@ use lance_table::rowids::read_row_ids; use lance_table::{ format::{ BasePath, DataFile, DataStorageFormat, Fragment, IndexFile, IndexMetadata, Manifest, - RowIdMeta, pb, + RowDatasetVersionMeta, RowDatasetVersionRun, RowDatasetVersionSequence, RowIdMeta, pb, }, io::{ commit::CommitHandler, manifest::{read_manifest, read_manifest_indexes}, }, - rowids::{RowIdSequence, write_row_ids}, + rowids::{RowIdSequence, segment::U64Segment, version::build_version_meta, write_row_ids}, }; use object_store::path::Path; use roaring::RoaringBitmap; @@ -45,6 +45,130 @@ use std::{ }; use uuid::Uuid; +/// Fallback version for rows whose original creation version cannot be determined. +/// Version 1 is the initial dataset version in the Lance format. +const UNKNOWN_CREATED_AT_VERSION: u64 = 1; + +/// Look up the `created_at` version for a single row ID in `row_id_to_source`. +/// +/// Walks the original fragment's `created_at_version_meta` to find the version +/// at the given offset. Returns [`UNKNOWN_CREATED_AT_VERSION`] for any failure +/// (missing metadata, decode error, out-of-range offset, unmapped row ID). +fn resolve_created_at_version( + row_id: u64, + row_id_to_source: &HashMap, +) -> u64 { + let Some((orig_frag, row_offset)) = row_id_to_source.get(&row_id) else { + return UNKNOWN_CREATED_AT_VERSION; + }; + let Some(created_meta) = &orig_frag.created_at_version_meta else { + return UNKNOWN_CREATED_AT_VERSION; + }; + match created_meta.load_sequence() { + Ok(seq) => seq + .versions() + .nth(*row_offset) + .unwrap_or(UNKNOWN_CREATED_AT_VERSION), + Err(_) => UNKNOWN_CREATED_AT_VERSION, + } +} + +/// For each new fragment produced by an update, set `created_at_version_meta` +/// (preserved from the original rows) and `last_updated_at_version_meta`. +fn resolve_update_version_metadata( + existing_fragments: &[Fragment], + new_fragments: &mut [Fragment], + new_version: u64, +) -> Result<()> { + // ~24 bytes per entry (u64 key + &Fragment pointer + usize offset + hash metadata). + // For a 1M-row dataset this is ~24 MB — acceptable for a transient transaction structure. + let mut row_id_to_source: HashMap = HashMap::new(); + // Stable row IDs must be globally unique among *live* rows, but after a rewrite-style + // update the same stable ID can appear twice in `existing_fragments`: once in an older + // fragment's inline `row_id_meta` at the original row offset (rows may be soft-deleted + // via a deletion vector) and again in a newer fragment holding rewritten data. For + // `created_at` we need the mapping from the original fragment/offset; that is always the + // first occurrence when fragments are processed in ascending `id` order. + let mut sorted_frags: Vec<&Fragment> = existing_fragments.iter().collect(); + sorted_frags.sort_by_key(|f| f.id); + for frag in sorted_frags { + if let Some(RowIdMeta::Inline(data)) = &frag.row_id_meta + && let Ok(seq) = read_row_ids(data) + { + for (offset, rid) in seq.iter().enumerate() { + row_id_to_source.entry(rid).or_insert((frag, offset)); + } + } + } + + for fragment in new_fragments.iter_mut() { + let row_ids = match &fragment.row_id_meta { + Some(RowIdMeta::Inline(data)) => read_row_ids(data).ok(), + Some(RowIdMeta::External(_)) => { + log::warn!( + "Fragment {} has external row ID metadata; \ + version tracking will use defaults", + fragment.id, + ); + None + } + None => None, + }; + + if let Some(row_ids) = row_ids { + let physical_rows = fragment.physical_rows.unwrap_or(0); + let created_at_versions: Vec = row_ids + .iter() + .map(|rid| resolve_created_at_version(rid, &row_id_to_source)) + .collect(); + debug_assert_eq!(created_at_versions.len(), physical_rows); + + let runs = encode_version_runs(&created_at_versions); + let created_at_seq = RowDatasetVersionSequence { runs }; + fragment.created_at_version_meta = Some( + RowDatasetVersionMeta::from_sequence(&created_at_seq).map_err(|e| { + Error::internal(format!( + "Failed to create created_at version metadata: {}", + e + )) + })?, + ); + + fragment.last_updated_at_version_meta = build_version_meta(fragment, new_version); + } else { + let version_meta = build_version_meta(fragment, new_version); + fragment.last_updated_at_version_meta = version_meta.clone(); + fragment.created_at_version_meta = version_meta; + } + } + Ok(()) +} + +/// Run-length encode a sequence of per-row versions into [`RowDatasetVersionRun`]s. +fn encode_version_runs(versions: &[u64]) -> Vec { + if versions.is_empty() { + return Vec::new(); + } + let mut runs = Vec::new(); + let mut current_version = versions[0]; + let mut run_start = 0u64; + for (i, &version) in versions.iter().enumerate().skip(1) { + if version != current_version { + runs.push(RowDatasetVersionRun { + span: U64Segment::Range(run_start..i as u64), + version: current_version, + }); + current_version = version; + run_start = i as u64; + } + } + runs.push(RowDatasetVersionRun { + span: U64Segment::Range(run_start..versions.len() as u64), + version: current_version, + }); + runs +} + /// A change to a dataset that can be retried /// /// This contains enough information to be able to build the next manifest, @@ -1634,8 +1758,7 @@ impl Transaction { // Add version metadata for all new fragments let new_version = current_manifest.map(|m| m.version + 1).unwrap_or(1); for fragment in new_fragments.iter_mut() { - let version_meta = - lance_table::rowids::version::build_version_meta(fragment, new_version); + let version_meta = build_version_meta(fragment, new_version); fragment.last_updated_at_version_meta = version_meta.clone(); fragment.created_at_version_meta = version_meta; } @@ -1714,131 +1837,13 @@ impl Transaction { Self::assign_row_ids(next_row_id, new_fragments.as_mut_slice())?; } - // Set version metadata for newly created fragments (updated rows) - // Preserve created_at from original fragments, set last_updated to new version if next_row_id.is_some() { let new_version = current_manifest.map(|m| m.version + 1).unwrap_or(1); - - // Build a map of original fragment ID -> original fragment for lookup - let original_frags_map: std::collections::HashMap = - existing_fragments.iter().map(|f| (f.id, f)).collect(); - - for fragment in new_fragments.iter_mut() { - // For update operations with RewriteRows mode: - // - Rows are deleted from old fragments and rewritten to new fragments - // - last_updated_at should be the current version (when update happened) - // - created_at should be preserved from the original fragment - - // Read row IDs from this fragment to find original fragments - let row_ids = if let Some(row_id_meta) = &fragment.row_id_meta { - match row_id_meta { - lance_table::format::RowIdMeta::Inline(data) => { - lance_table::rowids::read_row_ids(data).ok() - } - lance_table::format::RowIdMeta::External(_) => None, - } - } else { - None - }; - - if let Some(row_ids) = row_ids { - // Extract created_at version for each row from original fragments - let physical_rows = fragment.physical_rows.unwrap_or(0); - let mut created_at_versions = Vec::with_capacity(physical_rows); - - for row_id in row_ids.iter() { - // Row ID format: upper 32 bits = fragment ID, lower 32 bits = row offset - let orig_frag_id = row_id >> 32; - let row_offset = (row_id & 0xFFFFFFFF) as usize; - - // Look up the original fragment - if let Some(orig_frag) = original_frags_map.get(&orig_frag_id) { - // Get created_at version from original fragment's metadata - let created_version = if let Some(created_meta) = - &orig_frag.created_at_version_meta - { - // Load and index into the version sequence - match created_meta.load_sequence() { - Ok(seq) => { - let versions: Vec = seq.versions().collect(); - versions.get(row_offset).copied().unwrap_or(1) - } - Err(_e) => { - 1 // Default to version 1 on error - } - } - } else { - // No metadata on original fragment, default to version 1 - 1 - }; - created_at_versions.push(created_version); - } else { - // Original fragment not found, default to version 1 - created_at_versions.push(1); - } - } - - // Build version metadata from the collected versions - // Compress into runs: consecutive identical versions become one run - let mut runs = Vec::new(); - if !created_at_versions.is_empty() { - let mut current_version = created_at_versions[0]; - let mut run_start = 0u64; - - for (i, &version) in created_at_versions.iter().enumerate().skip(1) - { - if version != current_version { - // End current run, start new one - runs.push(lance_table::format::RowDatasetVersionRun { - span: lance_table::rowids::segment::U64Segment::Range( - run_start..i as u64, - ), - version: current_version, - }); - current_version = version; - run_start = i as u64; - } - } - // Add final run - runs.push(lance_table::format::RowDatasetVersionRun { - span: lance_table::rowids::segment::U64Segment::Range( - run_start..created_at_versions.len() as u64, - ), - version: current_version, - }); - } - - let created_at_seq = - lance_table::format::RowDatasetVersionSequence { runs }; - fragment.created_at_version_meta = Some( - lance_table::format::RowDatasetVersionMeta::from_sequence( - &created_at_seq, - ) - .map_err(|e| { - Error::internal(format!( - "Failed to create created_at version metadata: {}", - e - )) - })?, - ); - - // Set last_updated_at to the new version for all rows - let last_updated_meta = - lance_table::rowids::version::build_version_meta( - fragment, - new_version, - ); - fragment.last_updated_at_version_meta = last_updated_meta; - } else { - // Fallback: can't read row IDs, set both to new version - let version_meta = lance_table::rowids::version::build_version_meta( - fragment, - new_version, - ); - fragment.last_updated_at_version_meta = version_meta.clone(); - fragment.created_at_version_meta = version_meta; - } - } + resolve_update_version_metadata( + existing_fragments, + new_fragments.as_mut_slice(), + new_version, + )?; } if config.use_stable_row_ids @@ -1892,8 +1897,7 @@ impl Transaction { // Add version metadata for all new fragments let new_version = current_manifest.map(|m| m.version + 1).unwrap_or(1); for fragment in new_fragments.iter_mut() { - let version_meta = - lance_table::rowids::version::build_version_meta(fragment, new_version); + let version_meta = build_version_meta(fragment, new_version); fragment.last_updated_at_version_meta = version_meta.clone(); fragment.created_at_version_meta = version_meta; } @@ -2172,9 +2176,16 @@ impl Transaction { manifest.tag.clone_from(&self.tag); if config.auto_set_feature_flags { + // Internal operations (e.g. CreateIndex) use ManifestWriteConfig::default() + // which has use_stable_row_ids = false. Without inheriting from the previous + // manifest, apply_feature_flags would clear FLAG_STABLE_ROW_IDS. + let inherited = current_manifest + .map(|m| m.uses_stable_row_ids()) + .unwrap_or(false); + let use_stable_row_ids = config.use_stable_row_ids || inherited; apply_feature_flags( &mut manifest, - config.use_stable_row_ids, + use_stable_row_ids, config.disable_transaction_file, )?; } @@ -3489,6 +3500,11 @@ mod tests { use chrono::Utc; use lance_core::datatypes::Schema as LanceSchema; use lance_io::utils::CachedFileSize; + use lance_table::format::{ + RowDatasetVersionMeta, RowDatasetVersionRun, RowDatasetVersionSequence, RowIdMeta, + }; + use lance_table::rowids::segment::U64Segment; + use lance_table::rowids::write_row_ids; use std::sync::Arc; use uuid::Uuid; @@ -4527,4 +4543,365 @@ mod tests { // cause strict validation of the new STABLE fragments. validate_operation(Some(&legacy_manifest), &operation).unwrap(); } + + /// Existing fragments use id >= 1 to avoid collision with `Fragment::new(0)` + /// used by `sample_manifest`. New (updated) fragments use id = 10. + fn make_stable_row_id_manifest(fragments: Vec) -> Manifest { + let schema = ArrowSchema::new(vec![ArrowField::new("id", DataType::Int32, false)]); + let mut manifest = Manifest::new( + LanceSchema::try_from(&schema).unwrap(), + Arc::new(fragments), + DataStorageFormat::new(LanceFileVersion::V2_0), + HashMap::new(), + ); + manifest.reader_feature_flags = FLAG_STABLE_ROW_IDS; + manifest.next_row_id = 1000; + manifest.version = 4; + manifest + } + + fn update_txn(new_fragments: Vec) -> Transaction { + Transaction::new( + 4, + Operation::Update { + removed_fragment_ids: vec![], + updated_fragments: vec![], + new_fragments, + fields_modified: vec![], + merged_generations: vec![], + fields_for_preserving_frag_bitmap: vec![], + update_mode: None, + inserted_rows_filter: None, + }, + None, + ) + } + + fn created_at_versions(manifest: &Manifest, frag_id: u64) -> Vec { + let frag = manifest.fragments.iter().find(|f| f.id == frag_id).unwrap(); + let seq = frag + .created_at_version_meta + .as_ref() + .unwrap() + .load_sequence() + .unwrap(); + seq.versions().collect() + } + + fn last_updated_at_versions(manifest: &Manifest, frag_id: u64) -> Vec { + let frag = manifest.fragments.iter().find(|f| f.id == frag_id).unwrap(); + let seq = frag + .last_updated_at_version_meta + .as_ref() + .unwrap() + .load_sequence() + .unwrap(); + seq.versions().collect() + } + + #[test] + fn test_update_version_tracking_preserves_created_at() { + let existing_seq = RowIdSequence::from([100u64, 101, 102].as_slice()); + let created_at_seq = RowDatasetVersionSequence { + runs: vec![RowDatasetVersionRun { + span: U64Segment::Range(0..3), + version: 5, + }], + }; + let existing_fragment = Fragment { + id: 1, + files: vec![], + deletion_file: None, + row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&existing_seq))), + physical_rows: Some(3), + created_at_version_meta: Some( + RowDatasetVersionMeta::from_sequence(&created_at_seq).unwrap(), + ), + last_updated_at_version_meta: None, + }; + + let new_seq = RowIdSequence::from([100u64, 102].as_slice()); + let new_fragment = Fragment { + id: 10, + files: vec![], + deletion_file: None, + row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&new_seq))), + physical_rows: Some(2), + created_at_version_meta: None, + last_updated_at_version_meta: None, + }; + + let manifest = make_stable_row_id_manifest(vec![existing_fragment]); + let (result, _) = update_txn(vec![new_fragment]) + .build_manifest( + Some(&manifest), + vec![], + "txn", + &ManifestWriteConfig::default(), + ) + .unwrap(); + + assert_eq!(created_at_versions(&result, 10), vec![5, 5]); + assert_eq!(last_updated_at_versions(&result, 10), vec![5, 5]); + } + + #[test] + fn test_update_version_tracking_mixed_origins() { + let frag_a_seq = RowIdSequence::from([10u64, 11].as_slice()); + let frag_a_created = RowDatasetVersionSequence { + runs: vec![RowDatasetVersionRun { + span: U64Segment::Range(0..2), + version: 2, + }], + }; + let frag_b_seq = RowIdSequence::from([20u64, 21, 22].as_slice()); + let frag_b_created = RowDatasetVersionSequence { + runs: vec![RowDatasetVersionRun { + span: U64Segment::Range(0..3), + version: 3, + }], + }; + + let manifest = make_stable_row_id_manifest(vec![ + Fragment { + id: 1, + files: vec![], + deletion_file: None, + row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&frag_a_seq))), + physical_rows: Some(2), + created_at_version_meta: Some( + RowDatasetVersionMeta::from_sequence(&frag_a_created).unwrap(), + ), + last_updated_at_version_meta: None, + }, + Fragment { + id: 2, + files: vec![], + deletion_file: None, + row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&frag_b_seq))), + physical_rows: Some(3), + created_at_version_meta: Some( + RowDatasetVersionMeta::from_sequence(&frag_b_created).unwrap(), + ), + last_updated_at_version_meta: None, + }, + ]); + + // New fragment has rows from both original fragments: row 11 from frag_a, row 20 from frag_b + let new_seq = RowIdSequence::from([11u64, 20].as_slice()); + let new_fragment = Fragment { + id: 10, + files: vec![], + deletion_file: None, + row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&new_seq))), + physical_rows: Some(2), + created_at_version_meta: None, + last_updated_at_version_meta: None, + }; + + let (result, _) = update_txn(vec![new_fragment]) + .build_manifest( + Some(&manifest), + vec![], + "txn", + &ManifestWriteConfig::default(), + ) + .unwrap(); + + // Row 11 came from frag_a (offset 1, version 2), row 20 came from frag_b (offset 0, version 3) + assert_eq!(created_at_versions(&result, 10), vec![2, 3]); + assert_eq!(last_updated_at_versions(&result, 10), vec![5, 5]); + } + + #[test] + fn test_update_version_tracking_unknown_row_id_defaults_to_1() { + let existing_seq = RowIdSequence::from([10u64, 11].as_slice()); + let existing_created = RowDatasetVersionSequence { + runs: vec![RowDatasetVersionRun { + span: U64Segment::Range(0..2), + version: 5, + }], + }; + let existing_fragment = Fragment { + id: 1, + files: vec![], + deletion_file: None, + row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&existing_seq))), + physical_rows: Some(2), + created_at_version_meta: Some( + RowDatasetVersionMeta::from_sequence(&existing_created).unwrap(), + ), + last_updated_at_version_meta: None, + }; + + // New fragment has row 10 (known) and row 999 (unknown — freshly inserted) + let new_seq = RowIdSequence::from([10u64, 999].as_slice()); + let new_fragment = Fragment { + id: 10, + files: vec![], + deletion_file: None, + row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&new_seq))), + physical_rows: Some(2), + created_at_version_meta: None, + last_updated_at_version_meta: None, + }; + + let manifest = make_stable_row_id_manifest(vec![existing_fragment]); + let (result, _) = update_txn(vec![new_fragment]) + .build_manifest( + Some(&manifest), + vec![], + "txn", + &ManifestWriteConfig::default(), + ) + .unwrap(); + + // Row 10: offset 0 in frag 1 → version 5. Row 999: unknown → default 1 + assert_eq!(created_at_versions(&result, 10), vec![5, 1]); + assert_eq!(last_updated_at_versions(&result, 10), vec![5, 5]); + } + + #[test] + fn test_update_version_tracking_source_fragment_no_created_at_defaults_to_1() { + // Source fragment has row_id_meta but no created_at_version_meta. + // The row IS found in the lookup, but the version defaults to 1. + let existing_seq = RowIdSequence::from([50u64, 51].as_slice()); + let existing_fragment = Fragment { + id: 1, + files: vec![], + deletion_file: None, + row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&existing_seq))), + physical_rows: Some(2), + created_at_version_meta: None, + last_updated_at_version_meta: None, + }; + + let new_seq = RowIdSequence::from([50u64].as_slice()); + let new_fragment = Fragment { + id: 10, + files: vec![], + deletion_file: None, + row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&new_seq))), + physical_rows: Some(1), + created_at_version_meta: None, + last_updated_at_version_meta: None, + }; + + let manifest = make_stable_row_id_manifest(vec![existing_fragment]); + let (result, _) = update_txn(vec![new_fragment]) + .build_manifest( + Some(&manifest), + vec![], + "txn", + &ManifestWriteConfig::default(), + ) + .unwrap(); + + // Row 50 is found in source but source has no created_at_version_meta → default 1 + assert_eq!(created_at_versions(&result, 10), vec![1]); + assert_eq!(last_updated_at_versions(&result, 10), vec![5]); + } + + #[test] + fn test_update_version_tracking_no_row_id_meta_fallback() { + let existing_seq = RowIdSequence::from([10u64, 11].as_slice()); + let existing_fragment = Fragment { + id: 1, + files: vec![], + deletion_file: None, + row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&existing_seq))), + physical_rows: Some(2), + created_at_version_meta: None, + last_updated_at_version_meta: None, + }; + + let new_fragment = Fragment { + id: 10, + files: vec![], + deletion_file: None, + row_id_meta: None, + physical_rows: Some(3), + created_at_version_meta: None, + last_updated_at_version_meta: None, + }; + + let manifest = make_stable_row_id_manifest(vec![existing_fragment]); + let (result, _) = update_txn(vec![new_fragment]) + .build_manifest( + Some(&manifest), + vec![], + "txn", + &ManifestWriteConfig::default(), + ) + .unwrap(); + + // Fragment starts with no row_id_meta → assign_row_ids gives it fresh IDs → + // those IDs aren't found in existing fragments → created_at defaults to 1 + assert_eq!(created_at_versions(&result, 10), vec![1, 1, 1]); + assert_eq!(last_updated_at_versions(&result, 10), vec![5, 5, 5]); + } + + #[test] + fn test_update_version_tracking_corrupt_created_at_defaults_to_1() { + let existing_seq = RowIdSequence::from([10u64, 11].as_slice()); + let existing_fragment = Fragment { + id: 1, + files: vec![], + deletion_file: None, + row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&existing_seq))), + physical_rows: Some(2), + created_at_version_meta: Some(RowDatasetVersionMeta::Inline(Arc::from( + vec![0xFFu8; 8].as_slice(), + ))), + last_updated_at_version_meta: None, + }; + + let new_seq = RowIdSequence::from([10u64].as_slice()); + let new_fragment = Fragment { + id: 10, + files: vec![], + deletion_file: None, + row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&new_seq))), + physical_rows: Some(1), + created_at_version_meta: None, + last_updated_at_version_meta: None, + }; + + let manifest = make_stable_row_id_manifest(vec![existing_fragment]); + let (result, _) = update_txn(vec![new_fragment]) + .build_manifest( + Some(&manifest), + vec![], + "txn", + &ManifestWriteConfig::default(), + ) + .unwrap(); + + // Corrupt metadata causes decode to fail → falls back to UNKNOWN_CREATED_AT_VERSION (1) + assert_eq!(created_at_versions(&result, 10), vec![1]); + assert_eq!(last_updated_at_versions(&result, 10), vec![5]); + } + + #[test] + fn test_encode_version_runs_empty() { + let runs = encode_version_runs(&[]); + assert!(runs.is_empty()); + } + + #[test] + fn test_encode_version_runs_single_run() { + let runs = encode_version_runs(&[3, 3, 3]); + assert_eq!(runs.len(), 1); + assert_eq!(runs[0].version, 3); + } + + #[test] + fn test_encode_version_runs_alternating() { + let runs = encode_version_runs(&[1, 2, 1, 2]); + assert_eq!(runs.len(), 4); + assert_eq!(runs[0].version, 1); + assert_eq!(runs[1].version, 2); + assert_eq!(runs[2].version, 1); + assert_eq!(runs[3].version, 2); + } }