Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 99 additions & 32 deletions java/lance-jni/src/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ use arrow_schema::DataType;
use jni::objects::{JIntArray, JValue, JValueGen};
use jni::{
JNIEnv,
objects::{JObject, JString},
sys::{jint, jlong},
objects::{JClass, JLongArray, JObject, JString},
sys::{jint, jlong, jstring},
};
use lance::datatypes::Schema;
use lance::table::format::{DataFile, DeletionFile, DeletionFileType, Fragment, RowIdMeta};
use lance::table::format::{
DataFile, DeletionFile, DeletionFileType, Fragment, RowDatasetVersionMeta, RowIdMeta,
};
use lance_io::utils::CachedFileSize;
use lance_table::rowids::{RowIdSequence, write_row_ids};
use std::iter::once;

use lance::dataset::fragment::FileFragment;
Expand Down Expand Up @@ -496,6 +499,32 @@ fn inner_update_column<'local>(
result.into_java(env)
}

#[unsafe(no_mangle)]
pub extern "system" fn Java_org_lance_fragment_RowIdMeta_nativeEncodeRowIds(
mut env: JNIEnv,
_cls: JClass,
row_ids: JLongArray,
) -> jstring {
ok_or_throw_with_return!(
env,
inner_encode_row_ids(&mut env, &row_ids)
.and_then(|json| env.new_string(json).map_err(Error::from)),
std::ptr::null_mut()
)
.into_raw()
}

fn inner_encode_row_ids(env: &mut JNIEnv, row_ids: &JLongArray) -> Result<String> {
let len = env.get_array_length(row_ids)?;
let mut buf: Vec<i64> = vec![0; len as usize];
env.get_long_array_region(row_ids, 0, buf.as_mut_slice())?;
let ids: Vec<u64> = buf.into_iter().map(|x| x as u64).collect();
let seq = RowIdSequence::from(ids.as_slice());
let meta = RowIdMeta::Inline(write_row_ids(&seq));
let json = serde_json::to_string(&meta)?;
Ok(json)
}

const DATA_FILE_CLASS: &str = "org/lance/fragment/DataFile";
const DATA_FILE_CONSTRUCTOR_SIG: &str =
"(Ljava/lang/String;[I[IIILjava/lang/Long;Ljava/lang/Integer;)V";
Expand All @@ -504,9 +533,11 @@ const DELETE_FILE_CONSTRUCTOR_SIG: &str =
"(JJLjava/lang/Long;Lorg/lance/fragment/DeletionFileType;Ljava/lang/Integer;)V";
const DELETE_FILE_TYPE_CLASS: &str = "org/lance/fragment/DeletionFileType";
const FRAGMENT_METADATA_CLASS: &str = "org/lance/FragmentMetadata";
const FRAGMENT_METADATA_CONSTRUCTOR_SIG: &str = "(ILjava/util/List;Ljava/lang/Long;Lorg/lance/fragment/DeletionFile;Lorg/lance/fragment/RowIdMeta;)V";
const FRAGMENT_METADATA_CONSTRUCTOR_SIG: &str = "(ILjava/util/List;Ljava/lang/Long;Lorg/lance/fragment/DeletionFile;Lorg/lance/fragment/RowIdMeta;Lorg/lance/fragment/VersionMeta;Lorg/lance/fragment/VersionMeta;)V";
const ROW_ID_META_CLASS: &str = "org/lance/fragment/RowIdMeta";
const ROW_ID_META_CONSTRUCTOR_SIG: &str = "(Ljava/lang/String;)V";
const VERSION_META_CLASS: &str = "org/lance/fragment/VersionMeta";
const VERSION_META_CONSTRUCTOR_SIG: &str = "(Ljava/lang/String;)V";
const FRAGMENT_MERGE_RESULT_CLASS: &str = "org/lance/fragment/FragmentMergeResult";
const FRAGMENT_MERGE_RESULT_CONSTRUCTOR_SIG: &str =
"(Lorg/lance/FragmentMetadata;Lorg/lance/schema/LanceSchema;)V";
Expand Down Expand Up @@ -621,6 +652,18 @@ impl IntoJava for &RowIdMeta {
}
}

impl IntoJava for &RowDatasetVersionMeta {
fn into_java<'a>(self, env: &mut JNIEnv<'a>) -> Result<JObject<'a>> {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit pick... Let's use 'local for the lifetime, similar to the other Fragment.into_java().

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'a is actually the dominant convention across all the other IntoJava impls in this file (DataFile, DeletionFile, RowIdMeta, FragmentMergeResult, etc.). Fragment is actually the outlier. My main concern was about consistency, happy to change it if you think it's the appropriate move

let json_str = serde_json::to_string(self)?;
let json = env.new_string(json_str)?.into();
Ok(env.new_object(
VERSION_META_CLASS,
VERSION_META_CONSTRUCTOR_SIG,
&[JValueGen::Object(&json)],
)?)
}
}

impl IntoJava for &Fragment {
fn into_java<'local>(self, env: &mut JNIEnv<'local>) -> Result<JObject<'local>> {
let files = self.files.clone();
Expand All @@ -634,6 +677,14 @@ impl IntoJava for &Fragment {
Some(m) => m.into_java(env)?,
None => JObject::null(),
};
let created_at = match &self.created_at_version_meta {
Some(m) => m.into_java(env)?,
None => JObject::null(),
};
let last_updated_at = match &self.last_updated_at_version_meta {
Some(m) => m.into_java(env)?,
None => JObject::null(),
};

env.new_object(
FRAGMENT_METADATA_CLASS,
Expand All @@ -644,6 +695,8 @@ impl IntoJava for &Fragment {
JValueGen::Object(physical_rows),
JValueGen::Object(&deletion_file),
JValueGen::Object(&row_id_meta),
JValueGen::Object(&created_at),
JValueGen::Object(&last_updated_at),
],
)
.map_err(|e| {
Expand All @@ -663,6 +716,38 @@ impl FromJObjectWithEnv<RowIdMeta> for JObject<'_> {
}
}

impl FromJObjectWithEnv<RowDatasetVersionMeta> for JObject<'_> {
fn extract_object(&self, env: &mut JNIEnv<'_>) -> Result<RowDatasetVersionMeta> {
let metadata = env
.call_method(self, "getMetadata", "()Ljava/lang/String;", &[])?
.l()?;
let s: String = env.get_string(&JString::from(metadata))?.into();
let meta: RowDatasetVersionMeta = serde_json::from_str(&s)?;
Ok(meta)
}
}

/// Extract an optional field from a Java object by calling a getter method.
/// Returns `None` if the getter returns null, otherwise deserializes the JObject.
fn extract_nullable_field<T>(
env: &mut JNIEnv<'_>,
obj: &JObject<'_>,
method: &str,
class: &str,
) -> Result<Option<T>>
where
for<'a> JObject<'a>: FromJObjectWithEnv<T>,
{
let result = env
.call_method(obj, method, format!("()L{};", class), &[])?
.l()?;
if result.is_null() {
Ok(None)
} else {
Ok(Some(result.extract_object(env)?))
}
}

impl FromJObjectWithEnv<Fragment> for JObject<'_> {
fn extract_object(&self, env: &mut JNIEnv<'_>) -> Result<Fragment> {
let id = env.call_method(self, "getId", "()I", &[])?.i()? as u64;
Expand All @@ -675,41 +760,23 @@ impl FromJObjectWithEnv<Fragment> for JObject<'_> {
for f in file_objs {
files.push(f.extract_object(env)?);
}
let deletion_file = env
.call_method(
self,
"getDeletionFile",
format!("()L{};", DELETE_FILE_CLASS),
&[],
)?
.l()?;
let deletion_file = if deletion_file.is_null() {
None
} else {
Some(deletion_file.extract_object(env)?)
};

let row_id_meta = env
.call_method(
self,
"getRowIdMeta",
format!("()L{};", ROW_ID_META_CLASS),
&[],
)?
.l()?;
let row_id_meta = if row_id_meta.is_null() {
None
} else {
Some(row_id_meta.extract_object(env)?)
};
let deletion_file =
extract_nullable_field(env, self, "getDeletionFile", DELETE_FILE_CLASS)?;
let row_id_meta = extract_nullable_field(env, self, "getRowIdMeta", ROW_ID_META_CLASS)?;
let created_at_version_meta =
extract_nullable_field(env, self, "getCreatedAtVersionMeta", VERSION_META_CLASS)?;
let last_updated_at_version_meta =
extract_nullable_field(env, self, "getLastUpdatedAtVersionMeta", VERSION_META_CLASS)?;

Ok(Fragment {
id,
files,
deletion_file,
physical_rows: Some(physical_rows),
row_id_meta,
created_at_version_meta: None,
last_updated_at_version_meta: None,
created_at_version_meta,
last_updated_at_version_meta,
})
}
}
Expand Down
42 changes: 41 additions & 1 deletion java/src/main/java/org/lance/FragmentMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.lance.fragment.DataFile;
import org.lance.fragment.DeletionFile;
import org.lance.fragment.RowIdMeta;
import org.lance.fragment.VersionMeta;

import com.google.common.base.MoreObjects;

Expand All @@ -31,18 +32,33 @@ public class FragmentMetadata implements Serializable {
private final long physicalRows;
private final DeletionFile deletionFile;
private final RowIdMeta rowIdMeta;
private final VersionMeta createdAtVersionMeta;
private final VersionMeta lastUpdatedAtVersionMeta;

public FragmentMetadata(
int id,
List<DataFile> files,
Long physicalRows,
DeletionFile deletionFile,
RowIdMeta rowIdMeta) {
this(id, files, physicalRows, deletionFile, rowIdMeta, null, null);
}

public FragmentMetadata(
int id,
List<DataFile> files,
Long physicalRows,
DeletionFile deletionFile,
RowIdMeta rowIdMeta,
VersionMeta createdAtVersionMeta,
VersionMeta lastUpdatedAtVersionMeta) {
this.id = id;
this.files = files;
this.physicalRows = physicalRows;
this.deletionFile = deletionFile;
this.rowIdMeta = rowIdMeta;
this.createdAtVersionMeta = createdAtVersionMeta;
this.lastUpdatedAtVersionMeta = lastUpdatedAtVersionMeta;
}

public int getId() {
Expand Down Expand Up @@ -80,6 +96,14 @@ public RowIdMeta getRowIdMeta() {
return rowIdMeta;
}

public VersionMeta getCreatedAtVersionMeta() {
return createdAtVersionMeta;
}

public VersionMeta getLastUpdatedAtVersionMeta() {
return lastUpdatedAtVersionMeta;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -93,7 +117,21 @@ public boolean equals(Object o) {
&& physicalRows == that.physicalRows
&& Objects.equals(this.files, that.files)
&& Objects.equals(deletionFile, that.deletionFile)
&& Objects.equals(rowIdMeta, that.rowIdMeta);
&& Objects.equals(rowIdMeta, that.rowIdMeta)
&& Objects.equals(createdAtVersionMeta, that.createdAtVersionMeta)
&& Objects.equals(lastUpdatedAtVersionMeta, that.lastUpdatedAtVersionMeta);
}

@Override
public int hashCode() {
return Objects.hash(
id,
physicalRows,
files,
deletionFile,
rowIdMeta,
createdAtVersionMeta,
lastUpdatedAtVersionMeta);
}

@Override
Expand All @@ -104,6 +142,8 @@ public String toString() {
.add("files", files)
.add("deletionFile", deletionFile)
.add("rowIdMeta", rowIdMeta)
.add("createdAtVersionMeta", createdAtVersionMeta)
.add("lastUpdatedAtVersionMeta", lastUpdatedAtVersionMeta)
.toString();
}
}
25 changes: 25 additions & 0 deletions java/src/main/java/org/lance/fragment/RowIdMeta.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package org.lance.fragment;

import org.lance.JniLoader;

import com.google.common.base.MoreObjects;

import java.io.Serializable;
Expand All @@ -21,12 +23,30 @@
public class RowIdMeta implements Serializable {
private static final long serialVersionUID = -6532828695072614148L;

static {
JniLoader.ensureLoaded();
}

private final String metadata;

public RowIdMeta(String metadata) {
this.metadata = metadata;
}

/**
* Creates a RowIdMeta from an array of stable row IDs by delegating to the Rust {@code
* write_row_ids} encoder via JNI. The returned metadata is a JSON string wrapping the
* protobuf-encoded RowIdSequence, matching the format expected by lance-core.
*
* @param rowIds stable row IDs to encode
* @return RowIdMeta containing the serialized inline representation
*/
public static RowIdMeta fromRowIds(long[] rowIds) {
return new RowIdMeta(nativeEncodeRowIds(rowIds));
}

private static native String nativeEncodeRowIds(long[] rowIds);

public String getMetadata() {
return metadata;
}
Expand All @@ -43,6 +63,11 @@ public boolean equals(Object obj) {
return Objects.equals(metadata, that.metadata);
}

@Override
public int hashCode() {
return Objects.hash(metadata);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("metadata", metadata).toString();
Expand Down
Loading
Loading