Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions protos/transaction.proto
Original file line number Diff line number Diff line change
Expand Up @@ -316,10 +316,26 @@ message Transaction {
repeated MergedGeneration merged_generations = 1;
}

// Assigns a base path to all data files within a fragment.
// Used as part of UpdateBases to atomically register new base paths and
// reassign existing fragment data files in a single commit.
message FragmentBaseAssignment {
// The ID of the fragment whose data files should be reassigned.
uint64 fragment_id = 1;
// The base path ID to assign to this fragment's data files.
// When absent, the fragment's data files are assigned to the dataset root
// (any existing base_id is cleared).
optional uint32 base_id = 2;
}

// An operation that updates base paths in the dataset.
message UpdateBases {
// The new base paths to add to the manifest.
repeated BasePath new_bases = 1;
// Optional per-fragment base reassignments applied atomically with the new
// base registrations. Each entry reassigns all data files in the named
// fragment to the specified base. Fragments not listed are unchanged.
repeated FragmentBaseAssignment fragment_assignments = 2;
}

// The operation of this transaction.
Expand Down
4 changes: 2 additions & 2 deletions rust/lance-table/src/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ pub use fragment::*;
pub use index::{IndexFile, IndexMetadata, index_metadata_codec, list_index_files_with_sizes};

pub use manifest::{
BasePath, DETACHED_VERSION_MASK, DataStorageFormat, Manifest, SelfDescribingFileReader,
WriterVersion, is_detached_version,
BasePath, DETACHED_VERSION_MASK, DataStorageFormat, FragmentBaseAssignment, Manifest,
SelfDescribingFileReader, WriterVersion, is_detached_version,
};
pub use transaction::Transaction;

Expand Down
33 changes: 33 additions & 0 deletions rust/lance-table/src/format/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,39 @@ impl From<BasePath> for pb::BasePath {
}
}

/// Assigns a base path to all data files within a fragment.
///
/// Used as part of `UpdateBases` to atomically register new base paths and
/// reassign existing fragment data files in a single commit. No data is moved;
/// the caller is responsible for ensuring the data already exists at the
/// target base location.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct FragmentBaseAssignment {
/// ID of the fragment whose data files should be reassigned.
pub fragment_id: u64,
/// The base path ID to assign. `None` clears any existing base_id,
/// causing the fragment's data files to resolve against the dataset root.
pub base_id: Option<u32>,
}

impl From<pb::transaction::FragmentBaseAssignment> for FragmentBaseAssignment {
fn from(p: pb::transaction::FragmentBaseAssignment) -> Self {
Self {
fragment_id: p.fragment_id,
base_id: p.base_id,
}
}
}

impl From<FragmentBaseAssignment> for pb::transaction::FragmentBaseAssignment {
fn from(a: FragmentBaseAssignment) -> Self {
Self {
fragment_id: a.fragment_id,
base_id: a.base_id,
}
}
}

impl TryFrom<pb::Manifest> for Manifest {
type Error = Error;

Expand Down
1 change: 1 addition & 0 deletions rust/lance-tools/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ path = "src/main.rs"

[dependencies]
clap = { workspace = true, features = ["derive"] }
lance = { workspace = true, default-features = false }
lance-core.workspace = true
lance-file.workspace = true
lance-io.workspace = true
Expand Down
38 changes: 38 additions & 0 deletions rust/lance-tools/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub struct LanceToolsArgs {
pub enum LanceToolsCommand {
/// Commands for interacting with Lance files.
File(LanceFileArgs),
/// Commands for interacting with Lance tables.
Table(LanceTableArgs),
}

#[derive(Parser, Debug)]
Expand All @@ -41,12 +43,48 @@ pub struct LanceFileMetaArgs {
pub(crate) source: String,
}

#[derive(Parser, Debug)]
pub struct LanceTableArgs {
#[command(subcommand)]
pub(crate) command: LanceTableCommand,
}

#[derive(Subcommand, Debug)]
pub enum LanceTableCommand {
/// Convert a single-base Lance table into a multi-base table.
///
/// The caller must have already copied the full dataset directory to each
/// additional base URI (e.g. with `azcopy` or `gsutil rsync`) before
/// running this command. Only metadata is updated; no data is moved.
///
/// Example:
/// lance-tools table to-multi-base \
/// --source az://container1/mydata \
/// --additional-base az://container2/mydata \
/// --additional-base az://container3/mydata
ToMultiBase(LanceTableToMultiBaseArgs),
}

#[derive(Args, Debug)]
pub struct LanceTableToMultiBaseArgs {
/// URI of the existing (source) Lance dataset.
#[arg(short = 's', long, value_name = "source")]
pub source: String,

/// URI of an additional copy of the dataset. Specify once per copy.
#[arg(long = "additional-base", value_name = "URI")]
pub additional_base: Vec<String>,
}

impl LanceToolsArgs {
pub async fn run(&self, writer: impl std::io::Write) -> Result<()> {
match &self.command {
LanceToolsCommand::File(args) => match &args.command {
LanceFileCommand::Meta(args) => crate::meta::show_file_meta(writer, args).await,
},
LanceToolsCommand::Table(args) => match &args.command {
LanceTableCommand::ToMultiBase(args) => crate::table::to_multi_base(args).await,
},
}
}
}
1 change: 1 addition & 0 deletions rust/lance-tools/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@

pub mod cli;
pub mod meta;
pub mod table;
pub mod util;
24 changes: 24 additions & 0 deletions rust/lance-tools/src/table.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use crate::cli::LanceTableToMultiBaseArgs;
use lance::Dataset;
use lance_core::Result;
use std::sync::Arc;

pub(crate) async fn to_multi_base(args: &LanceTableToMultiBaseArgs) -> Result<()> {
let dataset = Arc::new(Dataset::open(&args.source).await?);
let result = dataset
.to_multi_base(args.additional_base.clone(), None)
.await?;

let n_bases = result.manifest.base_paths.len();
let n_frags = result.fragments().len();

// Print a brief summary to stdout.
println!(
"Converted '{}' to multi-base: {} fragments distributed across {} additional base(s).",
args.source, n_frags, n_bases,
);
Ok(())
}
92 changes: 91 additions & 1 deletion rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1633,7 +1633,10 @@
new_bases: Vec<lance_table::format::BasePath>,
transaction_properties: Option<HashMap<String, String>>,
) -> Result<Self> {
let operation = Operation::UpdateBases { new_bases };
let operation = Operation::UpdateBases {
new_bases,
fragment_assignments: vec![],
};

let transaction = TransactionBuilder::new(self.manifest.version, operation)
.transaction_properties(transaction_properties.map(Arc::new))
Expand All @@ -1646,6 +1649,93 @@
Ok(new_dataset)
}

/// Convert a single-base dataset into a multi-base dataset.
///
/// This is a **metadata-only** operation. The caller must have already
/// copied the dataset directory to each additional URI (e.g. with `azcopy`
/// or `gsutil rsync`) before calling this method.
///
/// Existing fragments are distributed round-robin across the original base
/// and each additional base, giving each base approximately
/// `total_fragments / (1 + additional_uris.len())` fragments.
///
/// # Arguments
/// * `additional_uris` – Full URIs of the additional dataset copies, one per
/// new base (e.g. `"az://container2/path"`, `"/local/copy"`).
/// * `transaction_properties` – Optional key-value metadata forwarded to the
/// commit transaction.
///
/// # Returns
/// A new `Dataset` handle pointing at the updated manifest.
pub async fn to_multi_base(
self: &Arc<Self>,
additional_uris: Vec<String>,
transaction_properties: Option<HashMap<String, String>>,
) -> Result<Self> {
if additional_uris.is_empty() {
return Err(Error::invalid_input(
"to_multi_base requires at least one additional URI".to_string(),
));
}

// Choose IDs for the new bases, starting above the current maximum.
let start_id: u32 = self
.manifest
.base_paths
.keys()
.max()
.map(|&m| m + 1)
.unwrap_or(1);

let new_bases: Vec<lance_table::format::BasePath> = additional_uris
.iter()
.enumerate()
.map(|(i, uri)| {
lance_table::format::BasePath::new(
start_id + i as u32,
uri.clone(),
None,
true, // is_dataset_root: data lives under <uri>/data/
)
})
.collect();

// Distribute fragments round-robin.
// slot 0 → keep default base (no base_id change)
// slot 1 → new base with id `start_id`
// slot 2 → new base with id `start_id + 1`
// …
let n_slots = (additional_uris.len() + 1) as u64;
let fragment_assignments: Vec<lance_table::format::FragmentBaseAssignment> = self
.fragments()
.iter()
.filter_map(|frag| {
let slot = frag.id % n_slots;
if slot == 0 {
None // stays on the original/default base
} else {
Some(lance_table::format::FragmentBaseAssignment {
fragment_id: frag.id,
base_id: Some(start_id + (slot - 1) as u32),
})
}
})
.collect();

let operation = Operation::UpdateBases {
new_bases,
fragment_assignments,
};

let transaction = TransactionBuilder::new(self.manifest.version, operation)
.transaction_properties(transaction_properties.map(Arc::new))

Check warning on line 1731 in rust/lance/src/dataset.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance/src/dataset.rs
.build();

CommitBuilder::new(self.clone())
.execute(transaction)
.await
}

pub async fn count_deleted_rows(&self) -> Result<usize> {
futures::stream::iter(self.get_fragments())
.map(|f| async move { f.count_deletions().await })
Expand Down
Loading
Loading