From c71f2a1578d2f50a2788081707aaa978f927e442 Mon Sep 17 00:00:00 2001 From: Beinan Wang Date: Mon, 1 Jun 2026 13:54:11 -0700 Subject: [PATCH] feat: add distributed ZONEMAP index support and query optimization Enables the ZONEMAP scalar index type to be utilized for optimized point lookup scans in `daft-lance` and Daft. Added test cases to cover creating, querying, and verifying ZONEMAP indices. Co-Authored-By: Beinan Wang --- daft_lance/lance_scan.py | 15 ++-- tests/io/lancedb/test_lancedb_scalar_index.py | 74 +++++++++++++++++++ 2 files changed, 82 insertions(+), 7 deletions(-) diff --git a/daft_lance/lance_scan.py b/daft_lance/lance_scan.py index 5fd3878..de3349f 100644 --- a/daft_lance/lance_scan.py +++ b/daft_lance/lance_scan.py @@ -409,7 +409,7 @@ def _compute_limit_pushdown_with_filter(self, pushdowns: PyPushdowns) -> int | N return pushdowns.limit def _should_use_index_for_point_lookup(self) -> bool: - """Use index-driven scan only when all point-lookup columns have BTREE. + """Use index-driven scan only when all point-lookup columns have BTREE or ZONEMAP. Otherwise fall back to fragment enumeration. Passing fragment_ids=None signals index-driven scan; factory omits fragments so Lance selects them using indices. @@ -439,16 +439,17 @@ def _should_use_index_for_point_lookup(self) -> bool: if not indices: return False - # Decision: point-lookup uses index only if each column in the predicate has a BTREE index. + # Decision: point-lookup uses index only if each column in the predicate has a BTREE or ZONEMAP index. # Rationale: avoid partial/non-exact indices (e.g., bitmap/bloom) and Lance lacks composite-prefix semantics. - btree_indexed_columns: set[str] = set() + scalar_indexed_columns: set[str] = set() for index in indices: - if str(index.index_type or "").upper() != "BTREE": + idx_type = str(index.index_type or "").upper() + if idx_type not in ("BTREE", "ZONEMAP"): continue for field in index.field_names or (): - btree_indexed_columns.add(field) - # Use index-driven scan only if every point-lookup column has a BTREE index. - if point_column_set and point_column_set.issubset(btree_indexed_columns): + scalar_indexed_columns.add(field) + # Use index-driven scan only if every point-lookup column has a BTREE or ZONEMAP index. + if point_column_set and point_column_set.issubset(scalar_indexed_columns): return True return False diff --git a/tests/io/lancedb/test_lancedb_scalar_index.py b/tests/io/lancedb/test_lancedb_scalar_index.py index 6548350..4de1836 100644 --- a/tests/io/lancedb/test_lancedb_scalar_index.py +++ b/tests/io/lancedb/test_lancedb_scalar_index.py @@ -522,3 +522,77 @@ def test_build_distributed_index_no_fragments(self, temp_dir): updated_dataset = lance.dataset(path) indices = updated_dataset.list_indices() assert len(indices) == 0, f"Expected no indices for empty dataset, got {len(indices)}" + + def test_build_distributed_index_zonemap_type(self, temp_dir): + """Test building ZONEMAP index on numeric column (falls back to single-threaded).""" + data = { + "id": [1, 2, 3, 4, 5, 6, 7, 8], + "price": [10.5, 20.75, 30.0, 40.25, 50.5, 60.75, 70.0, 80.25], + "name": ["item1", "item2", "item3", "item4", "item5", "item6", "item7", "item8"], + } + dataset = daft.from_pydict(data) + path = Path(temp_dir) / "zonemap_test.lance" + dataset.write_lance(uri=path, max_rows_per_file=2) + + # ZONEMAP is not supported by merge_index_metadata, so it falls back + # to single-threaded creation via Lance's create_scalar_index. + create_scalar_index( + uri=path, + column="price", + index_type="ZONEMAP", + name="price_zonemap_index", + ) + + updated_dataset = lance.dataset(path) + indices = updated_dataset.list_indices() + assert len(indices) > 0, "No indices found after building" + index_names = [idx["name"] for idx in indices] + assert "price_zonemap_index" in index_names, f"ZONEMAP index not found in {index_names}" + + # Test that we can query using the index + results = updated_dataset.scanner( + filter="price > 30.0", + columns=["id", "price", "name"], + ).to_table() + assert results.num_rows > 0, "No results found for ZONEMAP index query" + + def test_build_distributed_index_zonemap_integer_column(self, temp_dir): + """Test building ZONEMAP index on integer column (falls back to single-threaded).""" + data = { + "id": [1, 2, 3, 4, 5, 6, 7, 8], + "score": [100, 200, 300, 400, 500, 600, 700, 800], + } + dataset = daft.from_pydict(data) + path = Path(temp_dir) / "zonemap_int_test.lance" + dataset.write_lance(uri=path, max_rows_per_file=2) + + create_scalar_index( + uri=path, + column="score", + index_type="ZONEMAP", + name="score_zonemap_index", + ) + + updated_dataset = lance.dataset(path) + indices = updated_dataset.list_indices() + index_names = [idx["name"] for idx in indices] + assert "score_zonemap_index" in index_names, f"ZONEMAP index not found in {index_names}" + + def test_build_distributed_index_zonemap_invalid_string_column(self, temp_dir): + """Test that ZONEMAP index rejects string columns via Lance.""" + data = { + "id": [1, 2, 3, 4], + "text": ["a", "b", "c", "d"], + } + dataset = daft.from_pydict(data) + path = Path(temp_dir) / "zonemap_string_test.lance" + dataset.write_lance(uri=path, max_rows_per_file=2) + + # ZONEMAP falls back to single-threaded Lance, which will reject + # unsupported column types at the Lance level. + with pytest.raises(Exception): + create_scalar_index( + uri=path, + column="text", + index_type="ZONEMAP", + )