From 2bb570e72c94de67b95db89ae8baeb6562339001 Mon Sep 17 00:00:00 2001 From: Beinan Wang Date: Fri, 5 Jun 2026 14:33:52 -0700 Subject: [PATCH 1/2] feat: integrate MemWAL with deterministic sharding in daft-lance Integrates Lance's log-structured ingestion framework (MemWAL) with daft-lance to enable high-throughput parallel writes. When enabled, each write task shards records deterministically by writing to a unique Memory Write-Ahead Log region. Post-write, a distributed compaction phase commits WAL records to standard Copy-on-Write (COW) fragments. Co-Authored-By: Beinan Wang --- daft_lance/lance_data_sink.py | 87 ++++++ pyproject.toml | 2 +- tests/io/lancedb/test_mem_wal_writes.py | 334 ++++++++++++++++++++++++ 3 files changed, 422 insertions(+), 1 deletion(-) create mode 100644 tests/io/lancedb/test_mem_wal_writes.py diff --git a/daft_lance/lance_data_sink.py b/daft_lance/lance_data_sink.py index 37f7b55..2c82424 100644 --- a/daft_lance/lance_data_sink.py +++ b/daft_lance/lance_data_sink.py @@ -1,6 +1,8 @@ from __future__ import annotations +import logging import pathlib +import uuid import warnings from itertools import chain from typing import TYPE_CHECKING, Literal @@ -30,6 +32,8 @@ from daft.daft import IOConfig +logger = logging.getLogger(__name__) + class LanceDataSink(DataSink[list[FragmentMetadata]]): """WriteSink for writing data to a Lance dataset.""" @@ -49,6 +53,8 @@ def __init__( use_legacy_format: bool | None = None, enable_stable_row_ids: bool = False, storage_options: dict[str, str] | None = None, + use_mem_wal: bool = False, + compact_after_write: bool = True, ) -> None: self._reject_unsupported_modes(mode, use_legacy_format) if not isinstance(uri, (str, pathlib.Path)): @@ -72,6 +78,11 @@ def __init__( self._pyarrow_schema = self._normalize_schema(schema) self._init_blob_policy(blob_columns) + self._use_mem_wal = use_mem_wal + self._compact_after_write = compact_after_write + self._mem_wal_total_rows: int = 0 + self._mem_wal_total_bytes: int = 0 + self._version: int = 0 self._table_schema: pa.Schema | None = None existing = self._absorb_existing_dataset() @@ -227,8 +238,45 @@ def _write_arrow_table(self, table: pa.Table) -> WriteResult[list[FragmentMetada ) return WriteResult(result=fragments, bytes_written=bytes_written, rows_written=wrapped.num_rows) + def _ensure_mem_wal_dataset(self) -> lance.LanceDataset: + try: + ds = lance.dataset(self._table_uri, storage_options=self._storage_options) + except (ValueError, FileNotFoundError, OSError): + ds = None + + if ds is None: + ds = lance.write_dataset( + pa.table({f.name: pa.array([], type=f.type) for f in self._effective_pyarrow_schema}, + schema=self._effective_pyarrow_schema), + self._table_uri, + mode="create", + storage_options=self._storage_options, + data_storage_version=self._data_storage_version, + use_legacy_format=self._use_legacy_format, + ) + + details = ds.mem_wal_index_details() + if details is None or details.get("num_shards", -1) < 0: + ds.initialize_mem_wal(unsharded=True) + + return ds + + def _write_arrow_table_mem_wal(self, table: pa.Table, ds: lance.LanceDataset) -> WriteResult[list[FragmentMetadata]]: + shard_id = str(uuid.uuid4()) + with ds.mem_wal_writer(shard_id) as writer: + writer.put(table) + stats = writer.stats() + bytes_written = stats.get("wal_flush_bytes", 0) + return WriteResult(result=[], bytes_written=bytes_written, rows_written=table.num_rows) + def write(self, micropartitions: Iterator[MicroPartition]) -> Iterator[WriteResult[list[FragmentMetadata]]]: """Writes fragments from the given micropartitions.""" + if self._use_mem_wal: + yield from self._write_mem_wal(micropartitions) + else: + yield from self._write_cow(micropartitions) + + def _write_cow(self, micropartitions: Iterator[MicroPartition]) -> Iterator[WriteResult[list[FragmentMetadata]]]: buffer = _LanceFragmentBuffer( max_rows=self._max_rows_per_file, max_bytes=self._max_bytes_per_file, @@ -251,8 +299,23 @@ def write(self, micropartitions: Iterator[MicroPartition]) -> Iterator[WriteResu if buffer.has_rows(): yield self._write_arrow_table(buffer.drain()) + def _write_mem_wal(self, micropartitions: Iterator[MicroPartition]) -> Iterator[WriteResult[list[FragmentMetadata]]]: + ds = self._ensure_mem_wal_dataset() + for micropartition in micropartitions: + arrow_table = self._prepare_arrow_table(micropartition.to_arrow()) + wrapped = self._blob.wrap_table(arrow_table) + result = self._write_arrow_table_mem_wal(wrapped, ds) + self._mem_wal_total_rows += wrapped.num_rows + self._mem_wal_total_bytes += result.bytes_written + yield result + def finalize(self, write_results: list[WriteResult[list[FragmentMetadata]]]) -> MicroPartition: """Commits the fragments to the Lance dataset. Returns a DataFrame with the stats of the dataset.""" + if self._use_mem_wal: + return self._finalize_mem_wal(write_results) + return self._finalize_cow(write_results) + + def _finalize_cow(self, write_results: list[WriteResult[list[FragmentMetadata]]]) -> MicroPartition: fragments = list(chain.from_iterable(write_result.result for write_result in write_results)) operation: lance.LanceOperation.BaseOperation @@ -278,6 +341,30 @@ def finalize(self, write_results: list[WriteResult[list[FragmentMetadata]]]) -> ) return stats_dict + def _finalize_mem_wal(self, write_results: list[WriteResult[list[FragmentMetadata]]]) -> MicroPartition: + dataset = lance.dataset(self._table_uri, storage_options=self._storage_options) + + if self._compact_after_write: + logger.info( + "MemWAL write complete (%d rows, %d bytes). Running compaction.", + self._mem_wal_total_rows, + self._mem_wal_total_bytes, + ) + from daft_lance.lance_compaction import compact_files_internal + + compact_files_internal(dataset) + dataset = lance.dataset(self._table_uri, storage_options=self._storage_options) + + stats = dataset.stats.dataset_stats() + return MicroPartition.from_pydict( + { + "num_fragments": pa.array([stats["num_fragments"]], type=pa.int64()), + "num_deleted_rows": pa.array([stats["num_deleted_rows"]], type=pa.int64()), + "num_small_files": pa.array([stats["num_small_files"]], type=pa.int64()), + "version": pa.array([dataset.version], type=pa.int64()), + } + ) + class _LanceFragmentBuffer: """Accumulates pyarrow tables until a row-count or byte-size threshold is hit.""" diff --git a/pyproject.toml b/pyproject.toml index c2dcdc4..35779b3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,7 +8,7 @@ readme = "README.md" dependencies = [ "lance-namespace>=0.6.0", "lance-namespace-urllib3-client>=0.6.0", - "pylance>=6.0.0" + "pylance>=7.0.0" ] [dependency-groups] diff --git a/tests/io/lancedb/test_mem_wal_writes.py b/tests/io/lancedb/test_mem_wal_writes.py new file mode 100644 index 0000000..f0db7e8 --- /dev/null +++ b/tests/io/lancedb/test_mem_wal_writes.py @@ -0,0 +1,334 @@ +from __future__ import annotations + +import os + +import lance +import pyarrow as pa +import pytest + +import daft +from daft_lance.lance_data_sink import LanceDataSink + + +@pytest.fixture(scope="function") +def lance_dataset_path(tmp_path_factory): + tmp_dir = tmp_path_factory.mktemp("lance_mem_wal") + yield str(tmp_dir) + + +data_simple = { + "id": [1, 2, 3], + "value": [10.0, 20.0, 30.0], +} + + +class TestMemWalSinkConstruction: + def test_defaults(self, lance_dataset_path): + schema = pa.schema([("id", pa.int64()), ("value", pa.float64())]) + sink = LanceDataSink(uri=lance_dataset_path, schema=schema, mode="create") + assert sink._use_mem_wal is False + assert sink._compact_after_write is True + + def test_use_mem_wal_flag(self, lance_dataset_path): + schema = pa.schema([("id", pa.int64()), ("value", pa.float64())]) + sink = LanceDataSink( + uri=lance_dataset_path, schema=schema, mode="create", use_mem_wal=True + ) + assert sink._use_mem_wal is True + assert sink._compact_after_write is True + + def test_compact_after_write_flag(self, lance_dataset_path): + schema = pa.schema([("id", pa.int64()), ("value", pa.float64())]) + sink = LanceDataSink( + uri=lance_dataset_path, + schema=schema, + mode="create", + use_mem_wal=True, + compact_after_write=False, + ) + assert sink._use_mem_wal is True + assert sink._compact_after_write is False + + +class TestMemWalCreatePath: + def test_create_writes_to_wal_directory(self, lance_dataset_path): + df = daft.from_pydict(data_simple) + df.write_lance(lance_dataset_path, mode="create", use_mem_wal=True) + + mem_wal_dir = os.path.join(lance_dataset_path, "_mem_wal") + assert os.path.isdir(mem_wal_dir) + + def test_create_dataset_exists_after_write(self, lance_dataset_path): + df = daft.from_pydict(data_simple) + df.write_lance(lance_dataset_path, mode="create", use_mem_wal=True) + + ds = lance.dataset(lance_dataset_path) + assert ds is not None + assert ds.schema.names == ["id", "value"] + + def test_create_mem_wal_initialized(self, lance_dataset_path): + df = daft.from_pydict(data_simple) + df.write_lance(lance_dataset_path, mode="create", use_mem_wal=True) + + ds = lance.dataset(lance_dataset_path) + details = ds.mem_wal_index_details() + assert details is not None + + def test_create_returns_stats(self, lance_dataset_path): + df = daft.from_pydict(data_simple) + result = df.write_lance( + lance_dataset_path, + mode="create", + use_mem_wal=True, + compact_after_write=False, + ) + stats = result.to_pydict() + assert "num_fragments" in stats + assert "version" in stats + assert stats["version"][0] >= 1 + + +class TestMemWalAppendPath: + def test_append_to_existing_dataset(self, lance_dataset_path): + df1 = daft.from_pydict({"id": [1, 2], "value": [10.0, 20.0]}) + df1.write_lance(lance_dataset_path, mode="create") + + df2 = daft.from_pydict({"id": [3, 4], "value": [30.0, 40.0]}) + df2.write_lance( + lance_dataset_path, + mode="append", + use_mem_wal=True, + compact_after_write=False, + ) + + ds = lance.dataset(lance_dataset_path) + assert ds.mem_wal_index_details() is not None + + def test_append_initializes_mem_wal_on_existing(self, lance_dataset_path): + df1 = daft.from_pydict(data_simple) + df1.write_lance(lance_dataset_path, mode="create") + + ds_before = lance.dataset(lance_dataset_path) + assert ds_before.mem_wal_index_details() is None + + df2 = daft.from_pydict(data_simple) + df2.write_lance( + lance_dataset_path, + mode="append", + use_mem_wal=True, + compact_after_write=False, + ) + + ds_after = lance.dataset(lance_dataset_path) + assert ds_after.mem_wal_index_details() is not None + + +class TestMemWalDeterministicSharding: + def test_each_micropartition_gets_unique_shard(self, lance_dataset_path): + df = daft.from_pydict( + {"id": list(range(100)), "value": [float(x) for x in range(100)]} + ).repartition(4) + df.write_lance( + lance_dataset_path, + mode="create", + use_mem_wal=True, + compact_after_write=False, + ) + + mem_wal_dir = os.path.join(lance_dataset_path, "_mem_wal") + assert os.path.isdir(mem_wal_dir) + shard_dirs = [ + d + for d in os.listdir(mem_wal_dir) + if os.path.isdir(os.path.join(mem_wal_dir, d)) + ] + assert len(shard_dirs) >= 1 + + def test_parallel_shards_no_contention(self, lance_dataset_path): + df = daft.from_pydict( + {"id": list(range(1000)), "value": [float(x) for x in range(1000)]} + ).repartition(8) + result = df.write_lance( + lance_dataset_path, + mode="create", + use_mem_wal=True, + compact_after_write=False, + ) + stats = result.to_pydict() + assert stats["version"][0] >= 1 + + +class TestMemWalCompactAfterWrite: + def test_compact_after_write_true(self, lance_dataset_path): + df = daft.from_pydict(data_simple) + result = df.write_lance( + lance_dataset_path, mode="create", use_mem_wal=True, compact_after_write=True + ) + stats = result.to_pydict() + assert "num_fragments" in stats + assert "version" in stats + + def test_compact_after_write_false(self, lance_dataset_path): + df = daft.from_pydict(data_simple) + result = df.write_lance( + lance_dataset_path, + mode="create", + use_mem_wal=True, + compact_after_write=False, + ) + stats = result.to_pydict() + assert "num_fragments" in stats + + def test_cow_data_visible_after_standard_write(self, lance_dataset_path): + df = daft.from_pydict(data_simple) + df.write_lance(lance_dataset_path, mode="create") + + ds = lance.dataset(lance_dataset_path) + assert ds.count_rows() == 3 + loaded = ds.to_table().to_pydict() + assert sorted(loaded["id"]) == [1, 2, 3] + + +class TestMemWalMultipleWrites: + def test_multiple_sequential_writes(self, lance_dataset_path): + df1 = daft.from_pydict({"id": [1, 2], "value": [10.0, 20.0]}) + df1.write_lance( + lance_dataset_path, + mode="create", + use_mem_wal=True, + compact_after_write=False, + ) + + df2 = daft.from_pydict({"id": [3, 4], "value": [30.0, 40.0]}) + df2.write_lance( + lance_dataset_path, + mode="append", + use_mem_wal=True, + compact_after_write=False, + ) + + ds = lance.dataset(lance_dataset_path) + details = ds.mem_wal_index_details() + assert details is not None + + +class TestMemWalEnsureDataset: + def test_ensure_creates_dataset_if_missing(self, lance_dataset_path): + target = os.path.join(lance_dataset_path, "new_ds") + schema = pa.schema([("a", pa.int64())]) + sink = LanceDataSink( + uri=target, schema=schema, mode="create", use_mem_wal=True + ) + ds = sink._ensure_mem_wal_dataset() + assert ds is not None + assert ds.mem_wal_index_details() is not None + + def test_ensure_reuses_existing_dataset(self, lance_dataset_path): + schema = pa.schema([("a", pa.int64())]) + lance.write_dataset(pa.table({"a": [1]}, schema=schema), lance_dataset_path) + + sink = LanceDataSink( + uri=lance_dataset_path, schema=schema, mode="append", use_mem_wal=True + ) + ds = sink._ensure_mem_wal_dataset() + assert ds is not None + assert ds.mem_wal_index_details() is not None + + def test_ensure_idempotent_initialization(self, lance_dataset_path): + schema = pa.schema([("a", pa.int64())]) + lance.write_dataset(pa.table({"a": [1]}, schema=schema), lance_dataset_path) + + ds = lance.dataset(lance_dataset_path) + ds.initialize_mem_wal(unsharded=True) + + sink = LanceDataSink( + uri=lance_dataset_path, schema=schema, mode="append", use_mem_wal=True + ) + ds2 = sink._ensure_mem_wal_dataset() + assert ds2.mem_wal_index_details() is not None + + +class TestMemWalWriteResult: + def test_write_result_has_empty_fragments(self, lance_dataset_path): + from daft.recordbatch import MicroPartition + + schema = pa.schema([("a", pa.int64())]) + sink = LanceDataSink( + uri=lance_dataset_path, schema=schema, mode="create", use_mem_wal=True + ) + mp = MicroPartition.from_pydict({"a": [1, 2, 3]}) + results = list(sink.write(iter([mp]))) + assert len(results) == 1 + assert results[0].result == [] + assert results[0].rows_written == 3 + assert results[0].bytes_written >= 0 + + def test_finalize_mem_wal_returns_stats(self, lance_dataset_path): + from daft.io.sink import WriteResult + from daft.recordbatch import MicroPartition + + schema = pa.schema([("a", pa.int64())]) + sink = LanceDataSink( + uri=lance_dataset_path, + schema=schema, + mode="create", + use_mem_wal=True, + compact_after_write=False, + ) + mp = MicroPartition.from_pydict({"a": [1, 2, 3]}) + results = list(sink.write(iter([mp]))) + stats_mp = sink.finalize(results) + stats = stats_mp.to_pydict() + assert "num_fragments" in stats + assert "num_deleted_rows" in stats + assert "num_small_files" in stats + assert "version" in stats + + +class TestMemWalSchemaPreservation: + def test_schema_types_preserved(self, lance_dataset_path): + schema = pa.schema( + [ + pa.field("int_col", pa.int64()), + pa.field("float_col", pa.float64()), + pa.field("str_col", pa.large_string()), + ] + ) + df = daft.from_pydict( + {"int_col": [1, 2], "float_col": [1.5, 2.5], "str_col": ["a", "b"]} + ) + df.write_lance( + lance_dataset_path, + schema=schema, + mode="create", + use_mem_wal=True, + compact_after_write=False, + ) + + ds = lance.dataset(lance_dataset_path) + ds_schema = ds.schema + assert ds_schema.field("int_col").type == pa.int64() + assert ds_schema.field("float_col").type == pa.float64() + assert ds_schema.field("str_col").type == pa.large_string() + + +class TestMemWalCowFallback: + def test_cow_path_unaffected(self, lance_dataset_path): + df = daft.from_pydict(data_simple) + df.write_lance(lance_dataset_path, mode="create", use_mem_wal=False) + + ds = lance.dataset(lance_dataset_path) + assert ds.count_rows() == 3 + loaded = ds.to_table().to_pydict() + assert sorted(loaded["id"]) == [1, 2, 3] + + mem_wal_dir = os.path.join(lance_dataset_path, "_mem_wal") + assert not os.path.exists(mem_wal_dir) + + def test_default_is_cow(self, lance_dataset_path): + df = daft.from_pydict(data_simple) + df.write_lance(lance_dataset_path, mode="create") + + ds = lance.dataset(lance_dataset_path) + assert ds.count_rows() == 3 + assert ds.mem_wal_index_details() is None From 60fa6aa2df612a56c99ce4be06a0f7d74868b3bb Mon Sep 17 00:00:00 2001 From: Beinan Wang Date: Fri, 5 Jun 2026 14:49:02 -0700 Subject: [PATCH 2/2] style: run pre-commit to fix formatting and unused imports Addresses linting/styling issues reported by CI pre-commit checks: - Removes unused WriteResult import in test_mem_wal_writes.py - Re-formats multi-line arrays and parameters using black/ruff style guidelines - Formats uv.lock metadata Co-Authored-By: Beinan Wang --- daft_lance/lance_data_sink.py | 14 +++++--- tests/io/lancedb/test_mem_wal_writes.py | 43 ++++++------------------- uv.lock | 28 ++++++++-------- 3 files changed, 34 insertions(+), 51 deletions(-) diff --git a/daft_lance/lance_data_sink.py b/daft_lance/lance_data_sink.py index 2c82424..62c36bb 100644 --- a/daft_lance/lance_data_sink.py +++ b/daft_lance/lance_data_sink.py @@ -246,8 +246,10 @@ def _ensure_mem_wal_dataset(self) -> lance.LanceDataset: if ds is None: ds = lance.write_dataset( - pa.table({f.name: pa.array([], type=f.type) for f in self._effective_pyarrow_schema}, - schema=self._effective_pyarrow_schema), + pa.table( + {f.name: pa.array([], type=f.type) for f in self._effective_pyarrow_schema}, + schema=self._effective_pyarrow_schema, + ), self._table_uri, mode="create", storage_options=self._storage_options, @@ -261,7 +263,9 @@ def _ensure_mem_wal_dataset(self) -> lance.LanceDataset: return ds - def _write_arrow_table_mem_wal(self, table: pa.Table, ds: lance.LanceDataset) -> WriteResult[list[FragmentMetadata]]: + def _write_arrow_table_mem_wal( + self, table: pa.Table, ds: lance.LanceDataset + ) -> WriteResult[list[FragmentMetadata]]: shard_id = str(uuid.uuid4()) with ds.mem_wal_writer(shard_id) as writer: writer.put(table) @@ -299,7 +303,9 @@ def _write_cow(self, micropartitions: Iterator[MicroPartition]) -> Iterator[Writ if buffer.has_rows(): yield self._write_arrow_table(buffer.drain()) - def _write_mem_wal(self, micropartitions: Iterator[MicroPartition]) -> Iterator[WriteResult[list[FragmentMetadata]]]: + def _write_mem_wal( + self, micropartitions: Iterator[MicroPartition] + ) -> Iterator[WriteResult[list[FragmentMetadata]]]: ds = self._ensure_mem_wal_dataset() for micropartition in micropartitions: arrow_table = self._prepare_arrow_table(micropartition.to_arrow()) diff --git a/tests/io/lancedb/test_mem_wal_writes.py b/tests/io/lancedb/test_mem_wal_writes.py index f0db7e8..dd51b80 100644 --- a/tests/io/lancedb/test_mem_wal_writes.py +++ b/tests/io/lancedb/test_mem_wal_writes.py @@ -31,9 +31,7 @@ def test_defaults(self, lance_dataset_path): def test_use_mem_wal_flag(self, lance_dataset_path): schema = pa.schema([("id", pa.int64()), ("value", pa.float64())]) - sink = LanceDataSink( - uri=lance_dataset_path, schema=schema, mode="create", use_mem_wal=True - ) + sink = LanceDataSink(uri=lance_dataset_path, schema=schema, mode="create", use_mem_wal=True) assert sink._use_mem_wal is True assert sink._compact_after_write is True @@ -125,9 +123,7 @@ def test_append_initializes_mem_wal_on_existing(self, lance_dataset_path): class TestMemWalDeterministicSharding: def test_each_micropartition_gets_unique_shard(self, lance_dataset_path): - df = daft.from_pydict( - {"id": list(range(100)), "value": [float(x) for x in range(100)]} - ).repartition(4) + df = daft.from_pydict({"id": list(range(100)), "value": [float(x) for x in range(100)]}).repartition(4) df.write_lance( lance_dataset_path, mode="create", @@ -137,17 +133,11 @@ def test_each_micropartition_gets_unique_shard(self, lance_dataset_path): mem_wal_dir = os.path.join(lance_dataset_path, "_mem_wal") assert os.path.isdir(mem_wal_dir) - shard_dirs = [ - d - for d in os.listdir(mem_wal_dir) - if os.path.isdir(os.path.join(mem_wal_dir, d)) - ] + shard_dirs = [d for d in os.listdir(mem_wal_dir) if os.path.isdir(os.path.join(mem_wal_dir, d))] assert len(shard_dirs) >= 1 def test_parallel_shards_no_contention(self, lance_dataset_path): - df = daft.from_pydict( - {"id": list(range(1000)), "value": [float(x) for x in range(1000)]} - ).repartition(8) + df = daft.from_pydict({"id": list(range(1000)), "value": [float(x) for x in range(1000)]}).repartition(8) result = df.write_lance( lance_dataset_path, mode="create", @@ -161,9 +151,7 @@ def test_parallel_shards_no_contention(self, lance_dataset_path): class TestMemWalCompactAfterWrite: def test_compact_after_write_true(self, lance_dataset_path): df = daft.from_pydict(data_simple) - result = df.write_lance( - lance_dataset_path, mode="create", use_mem_wal=True, compact_after_write=True - ) + result = df.write_lance(lance_dataset_path, mode="create", use_mem_wal=True, compact_after_write=True) stats = result.to_pydict() assert "num_fragments" in stats assert "version" in stats @@ -216,9 +204,7 @@ class TestMemWalEnsureDataset: def test_ensure_creates_dataset_if_missing(self, lance_dataset_path): target = os.path.join(lance_dataset_path, "new_ds") schema = pa.schema([("a", pa.int64())]) - sink = LanceDataSink( - uri=target, schema=schema, mode="create", use_mem_wal=True - ) + sink = LanceDataSink(uri=target, schema=schema, mode="create", use_mem_wal=True) ds = sink._ensure_mem_wal_dataset() assert ds is not None assert ds.mem_wal_index_details() is not None @@ -227,9 +213,7 @@ def test_ensure_reuses_existing_dataset(self, lance_dataset_path): schema = pa.schema([("a", pa.int64())]) lance.write_dataset(pa.table({"a": [1]}, schema=schema), lance_dataset_path) - sink = LanceDataSink( - uri=lance_dataset_path, schema=schema, mode="append", use_mem_wal=True - ) + sink = LanceDataSink(uri=lance_dataset_path, schema=schema, mode="append", use_mem_wal=True) ds = sink._ensure_mem_wal_dataset() assert ds is not None assert ds.mem_wal_index_details() is not None @@ -241,9 +225,7 @@ def test_ensure_idempotent_initialization(self, lance_dataset_path): ds = lance.dataset(lance_dataset_path) ds.initialize_mem_wal(unsharded=True) - sink = LanceDataSink( - uri=lance_dataset_path, schema=schema, mode="append", use_mem_wal=True - ) + sink = LanceDataSink(uri=lance_dataset_path, schema=schema, mode="append", use_mem_wal=True) ds2 = sink._ensure_mem_wal_dataset() assert ds2.mem_wal_index_details() is not None @@ -253,9 +235,7 @@ def test_write_result_has_empty_fragments(self, lance_dataset_path): from daft.recordbatch import MicroPartition schema = pa.schema([("a", pa.int64())]) - sink = LanceDataSink( - uri=lance_dataset_path, schema=schema, mode="create", use_mem_wal=True - ) + sink = LanceDataSink(uri=lance_dataset_path, schema=schema, mode="create", use_mem_wal=True) mp = MicroPartition.from_pydict({"a": [1, 2, 3]}) results = list(sink.write(iter([mp]))) assert len(results) == 1 @@ -264,7 +244,6 @@ def test_write_result_has_empty_fragments(self, lance_dataset_path): assert results[0].bytes_written >= 0 def test_finalize_mem_wal_returns_stats(self, lance_dataset_path): - from daft.io.sink import WriteResult from daft.recordbatch import MicroPartition schema = pa.schema([("a", pa.int64())]) @@ -294,9 +273,7 @@ def test_schema_types_preserved(self, lance_dataset_path): pa.field("str_col", pa.large_string()), ] ) - df = daft.from_pydict( - {"int_col": [1, 2], "float_col": [1.5, 2.5], "str_col": ["a", "b"]} - ) + df = daft.from_pydict({"int_col": [1, 2], "float_col": [1.5, 2.5], "str_col": ["a", "b"]}) df.write_lance( lance_dataset_path, schema=schema, diff --git a/uv.lock b/uv.lock index 8544765..c7b50fb 100644 --- a/uv.lock +++ b/uv.lock @@ -135,7 +135,7 @@ lint = [ requires-dist = [ {name = "lance-namespace", specifier = ">=0.6.0"}, {name = "lance-namespace-urllib3-client", specifier = ">=0.6.0"}, - {name = "pylance", specifier = ">=6.0.0"} + {name = "pylance", specifier = ">=7.0.0"} ] [package.metadata.requires-dev] @@ -194,19 +194,19 @@ wheels = [ [[package]] name = "lance-namespace" -version = "0.7.6" +version = "0.7.7" source = {registry = "https://pypi.org/simple"} dependencies = [ {name = "lance-namespace-urllib3-client"} ] -sdist = {url = "https://files.pythonhosted.org/packages/2b/da/134670003173881bed44af656badffd91e0b2e0232c083eeacc5923d7335/lance_namespace-0.7.6.tar.gz", hash = "sha256:4e12094005d105ef1b44346c9d7feda4a0f733b127dab90c1a5ffbf7cd433770", size = 10686, upload-time = "2026-05-05T18:26:38.885Z"} +sdist = {url = "https://files.pythonhosted.org/packages/06/5c/9822af615fc1bd3ee1073994696c739aecde377be32435ec3303aed1bc5d/lance_namespace-0.7.7.tar.gz", hash = "sha256:d00b525f2e26993a6c61668e798bca6c808605ab8a79f29f86a1a1af92d91ae2", size = 10754, upload-time = "2026-05-20T17:32:59.45Z"} wheels = [ - {url = "https://files.pythonhosted.org/packages/83/88/44463a5f41f7077b2ea641f2afded72eaceb6a6a1b4a55c11b22318fed74/lance_namespace-0.7.6-py3-none-any.whl", hash = "sha256:c94a1b8a6aab127e55a20cbf44d927ae3a9b7d435656d2130dccf84ccf7c9999", size = 12519, upload-time = "2026-05-05T18:26:36.425Z"} + {url = "https://files.pythonhosted.org/packages/11/43/186acc1156da20c351db196e2b6241b2453b16dc1b4cc8e0a626667ca471/lance_namespace-0.7.7-py3-none-any.whl", hash = "sha256:477a7ca6b5e1f673a2c9ba52f42d6e8e3ff7c27a601392a21eb90fba98d0309b", size = 12581, upload-time = "2026-05-20T17:32:57.389Z"} ] [[package]] name = "lance-namespace-urllib3-client" -version = "0.7.6" +version = "0.7.7" source = {registry = "https://pypi.org/simple"} dependencies = [ {name = "pydantic"}, @@ -214,9 +214,9 @@ dependencies = [ {name = "typing-extensions"}, {name = "urllib3"} ] -sdist = {url = "https://files.pythonhosted.org/packages/01/44/024aae184c08b3800482cd9b832d534249e25de145af732d4e4c8dff38a8/lance_namespace_urllib3_client-0.7.6.tar.gz", hash = "sha256:15ae7f0d8d56fa34d837f7f6ec5c80a327a905e89ccfed05f7b409d6fe704cdf", size = 195551, upload-time = "2026-05-05T18:26:37.808Z"} +sdist = {url = "https://files.pythonhosted.org/packages/07/95/38ab81ccc1e09beeecd8ddfc61b8bc73831dc5053db1e3f9021f64a4896b/lance_namespace_urllib3_client-0.7.7.tar.gz", hash = "sha256:4d8c066628c17c6a10cf643b51a7f7ae1bfb8a614d9cc54a5af38a4ba2b4b102", size = 202930, upload-time = "2026-05-20T17:32:58.308Z"} wheels = [ - {url = "https://files.pythonhosted.org/packages/00/50/60c983cc8180772c82370dfad2104b7e788aaacc3bf9a84e8b42bb1ae6a7/lance_namespace_urllib3_client-0.7.6-py3-none-any.whl", hash = "sha256:fb884d8afff8af3aae04a3270624694a189d7ea79225dd349e6c555a1a1d6b52", size = 324603, upload-time = "2026-05-05T18:26:39.718Z"} + {url = "https://files.pythonhosted.org/packages/35/96/5483e48e40433b1d078183c15a92c99e59a156041b0260e7f18ee34e7c08/lance_namespace_urllib3_client-0.7.7-py3-none-any.whl", hash = "sha256:9221c3e00fd89f0c811953d94b32d2ea527765280460a174f5872dc8a74c0ed6", size = 334767, upload-time = "2026-05-20T17:32:55.883Z"} ] [[package]] @@ -899,7 +899,7 @@ wheels = [ [[package]] name = "pylance" -version = "6.0.0" +version = "7.0.0" source = {registry = "https://pypi.org/simple"} dependencies = [ {name = "lance-namespace"}, @@ -908,12 +908,12 @@ dependencies = [ {name = "pyarrow"} ] wheels = [ - {url = "https://files.pythonhosted.org/packages/16/b1/cb6aeb437cdf91cce19046a16306bafa497bfbe77afe042c8fbd20bdffb5/pylance-6.0.0-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:5e2a5bd8f585c28b0a1d3a96eb004402b6145e7f1b91fa762820b2793da1e965", size = 56430685, upload-time = "2026-05-11T18:23:19.099Z"}, - {url = "https://files.pythonhosted.org/packages/e1/71/c708942aba9572e937de63db473d6b386d1cade1874a62ceda24d6c8e8e4/pylance-6.0.0-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7a09adbf14f55565a0be257776648825b18093ab44bec4ca41e2fd57828b1519", size = 59481518, upload-time = "2026-05-11T18:23:26.435Z"}, - {url = "https://files.pythonhosted.org/packages/f3/58/2ea559e1c28564a00aa51fb707000358568cd8f61de49ec4f2e32dca4686/pylance-6.0.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:78877b76ae2182ea17987e2b5ca939ca4fce8fe1f7984b42ca4a4b7c06ecb49c", size = 63945086, upload-time = "2026-05-11T18:35:30.09Z"}, - {url = "https://files.pythonhosted.org/packages/4e/52/fb0bae5175157e2ec9e0d5a0470bb24d9f4d66bf9ec70cd7bb5a123e0576/pylance-6.0.0-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:ff09200172e1680be6e364a791e6561f553611019daa375c00ea8b0a84dfbb55", size = 59472473, upload-time = "2026-05-11T18:23:25.737Z"}, - {url = "https://files.pythonhosted.org/packages/00/0b/cd86cb7544ae815cf33195220cb83a958e87cb92717753e5a36bc128c9f0/pylance-6.0.0-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:7d225554a9c999dc14fdfa217f8255a81a05f6a5420fa245ba443b1e22ead879", size = 63927567, upload-time = "2026-05-11T18:35:37.481Z"}, - {url = "https://files.pythonhosted.org/packages/20/16/a00db2cdd7e59b95a803f130f1f29e84f02526100fa4e77b071bf89a8847/pylance-6.0.0-cp39-abi3-win_amd64.whl", hash = "sha256:588e15b7c7562d2c10cc7a153fef15ac29f76e41ca33d38e41c5b4f55203c1c8", size = 68653887, upload-time = "2026-05-11T18:38:02.927Z"} + {url = "https://files.pythonhosted.org/packages/ac/ad/2f64921bf346e7075aef24a72595db44821724a3d89a9a92dd24e79632aa/pylance-7.0.0-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:98422021975be76e72b1572f41b8c9abb3bee5bdc9bfa5e9ce731110a65ed4d1", size = 62134146, upload-time = "2026-05-27T21:59:37.459Z"}, + {url = "https://files.pythonhosted.org/packages/73/1c/c5a01bee0160b55d9a98895cbd33091d038f0a0995b121ab72e629008d02/pylance-7.0.0-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4bec86ee5b6fbd8bfc493e653f0a1fba0303cfe5492b9b46fc25ab908edc7183", size = 65373684, upload-time = "2026-05-27T22:04:01.584Z"}, + {url = "https://files.pythonhosted.org/packages/eb/da/1fe8b8f7dbfe734d76af76acc994fc360a0d0c79a4874ef69f5a72a58fe3/pylance-7.0.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:881491432c53184e52f8d1db8d5f872f39a03f36fb104bec77b33d379519d8b5", size = 69458555, upload-time = "2026-05-27T22:16:50.567Z"}, + {url = "https://files.pythonhosted.org/packages/76/f0/dd505cf3fd0226ab9d94759acd713125af1d3bfacfd80bbd52e3b9f89509/pylance-7.0.0-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:18453999e7fff4f76b16d6b7882c9df0628bd142ff95e2461bd7dd5ee3fe0af3", size = 65394430, upload-time = "2026-05-27T22:05:30.923Z"}, + {url = "https://files.pythonhosted.org/packages/17/ba/2357b81034f28eb00790e258ed140289a6a887a7468ca9df6349fd186b27/pylance-7.0.0-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:04a58051d408c60fe76d41a220dcaf8fea8fb6d1aa0ca78a709b60bc3cc8d19a", size = 69473470, upload-time = "2026-05-27T22:17:18.935Z"}, + {url = "https://files.pythonhosted.org/packages/1f/ec/5c00b6303a67d787f9475141832cbdc513d674ac3dcaeef8a7b169905e65/pylance-7.0.0-cp39-abi3-win_amd64.whl", hash = "sha256:467d4864af047eaab4e1370e2f1e88e2c6f507c079874421116cb41d78bc3629", size = 74792863, upload-time = "2026-05-27T22:19:23.875Z"} ] [[package]]