From 24545db01015eedaac354a4ade864034e63a4c5a Mon Sep 17 00:00:00 2001
From: Sebastian Baunsgaard
Date: Sun, 19 Apr 2026 16:07:44 +0000
Subject: [PATCH] Core: Coalesce consecutive position deletes into range inserts

Add PositionDeleteRangeConsumer that coalesces runs of consecutive
positions into a single delete(start, end) call, and use it from
Deletes.toPositionIndex() so sorted position delete files are inserted
into the bitmap as ranges instead of one position at a time.
---
 .../org/apache/iceberg/deletes/Deletes.java   |   2 +-
 .../deletes/PositionDeleteRangeConsumer.java  | 118 ++++++++
 .../TestPositionDeleteRangeConsumer.java      | 275 ++++++++++++++++++
 3 files changed, 394 insertions(+), 1 deletion(-)
 create mode 100644 core/src/main/java/org/apache/iceberg/deletes/PositionDeleteRangeConsumer.java
 create mode 100644 core/src/test/java/org/apache/iceberg/deletes/TestPositionDeleteRangeConsumer.java

diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
index 46df91982ab7..353d50d13ed6 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
@@ -177,7 +177,7 @@ private static PositionDeleteIndex toPositionIndex(
       CloseableIterable<Long> posDeletes, List<DeleteFile> files) {
     try (CloseableIterable<Long> deletes = posDeletes) {
       PositionDeleteIndex positionDeleteIndex = new BitmapPositionDeleteIndex(files);
-      deletes.forEach(positionDeleteIndex::delete);
+      PositionDeleteRangeConsumer.forEach(deletes, positionDeleteIndex);
       return positionDeleteIndex;
     } catch (IOException e) {
       throw new UncheckedIOException("Failed to close position delete source", e);
diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteRangeConsumer.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteRangeConsumer.java
new file mode 100644
index 000000000000..9975986acf70
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteRangeConsumer.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.deletes;
+
+import java.util.Iterator;
+
+/**
+ * Coalesces consecutive position deletes into range deletes.
+ *
+ * <p>Consecutive positions (e.g. 3, 4, 5, 6) are accumulated and flushed as a single {@link
+ * PositionDeleteIndex#delete(long, long)} range call instead of individual point deletes.
+ *
+ * <p>The first {@code SNIFF_SIZE} positions are processed on the coalesce path while counting
+ * boundaries -- adjacent pairs where {@code pos[i] - pos[i-1] != 1}. If the observed boundary
+ * density exceeds {@code BOUNDARY_THRESHOLD_PERCENT}, the remaining stream switches to a plain
+ * per-position loop equivalent to the original {@code delete(pos)} behavior, avoiding the extra
+ * compare-and-emit overhead on inputs where coalescing cannot amortize. Inputs shorter than the
+ * sniff window skip the check and coalesce directly. The sniff is a prefix heuristic:
+ * misclassifying an input with a dense head and a sparse tail (or vice versa) only costs
+ * throughput, not correctness -- both paths produce identical index contents.
+ */
+final class PositionDeleteRangeConsumer {
+
+  // Number of prefix positions inspected to estimate boundary density.
+  private static final int SNIFF_SIZE = 1024;
+
+  // Boundary density threshold, expressed as a percentage of sniff comparisons. Inputs above
+  // this threshold switch to per-position delete for the tail. Calibrated so FULL / MEDIUM /
+  // SHORT / SPARSE_95 (<=20% boundaries) stay on the coalesce path and SPARSE_50 / SPARSE_5 /
+  // NONE (>=50% boundaries) switch to per-position delete.
+  private static final int BOUNDARY_THRESHOLD_PERCENT = 30;
+
+  private PositionDeleteRangeConsumer() {}
+
+  static void forEach(Iterable<Long> positions, PositionDeleteIndex target) {
+    Iterator<Long> it = positions.iterator();
+    if (it.hasNext() && !coalesceOrEscape(target, it)) {
+      naiveTail(target, it);
+    }
+  }
+
+  // Runs the coalesce loop with a prefix-boundary check. The first SNIFF_SIZE positions are
+  // processed on the coalesce path while counting boundaries; if the observed density exceeds
+  // the threshold, flushes the pending range and returns false so the caller can
+  // drain the remaining stream per-position. Otherwise processes the entire input, flushes the
+  // trailing range, and returns true. Caller must ensure the iterator has at least one element.
+  private static boolean coalesceOrEscape(PositionDeleteIndex target, Iterator<Long> it) {
+    long rangeStart = it.next();
+    long lastPosition = rangeStart;
+    int processed = 1;
+    int boundaries = 0;
+
+    while (processed < SNIFF_SIZE && it.hasNext()) {
+      long pos = it.next();
+      if (pos - lastPosition != 1) {
+        boundaries++;
+        emit(target, rangeStart, lastPosition);
+        rangeStart = pos;
+      }
+
+      lastPosition = pos;
+      processed++;
+    }
+
+    if (processed == SNIFF_SIZE
+        && boundaries * 100 > (SNIFF_SIZE - 1) * BOUNDARY_THRESHOLD_PERCENT) {
+      // adversarial prefix -- flush any pending range and let the caller drain the rest
+      emit(target, rangeStart, lastPosition);
+      return false;
+    }
+
+    while (it.hasNext()) {
+      long pos = it.next();
+      if (pos - lastPosition != 1) {
+        emit(target, rangeStart, lastPosition);
+        rangeStart = pos;
+      }
+
+      lastPosition = pos;
+    }
+
+    emit(target, rangeStart, lastPosition);
+    return true;
+  }
+
+  // Tight per-position loop for the remaining iterator. Split out so the coalesce frame stays
+  // small enough for the JIT to inline aggressively into the caller.
+  private static void naiveTail(PositionDeleteIndex target, Iterator<Long> tail) {
+    while (tail.hasNext()) {
+      target.delete(tail.next());
+    }
+  }
+
+  // dispatches to the cheaper single-position delete when the range is one element
+  private static void emit(PositionDeleteIndex target, long rangeStart, long lastPosition) {
+    if (rangeStart == lastPosition) {
+      target.delete(rangeStart);
+    } else {
+      target.delete(rangeStart, lastPosition + 1);
+    }
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/deletes/TestPositionDeleteRangeConsumer.java b/core/src/test/java/org/apache/iceberg/deletes/TestPositionDeleteRangeConsumer.java
new file mode 100644
index 000000000000..fea93eb678d5
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/deletes/TestPositionDeleteRangeConsumer.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.deletes;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.junit.jupiter.api.Test;
+
+class TestPositionDeleteRangeConsumer {
+
+  @Test
+  void emptyInput() {
+    BitmapPositionDeleteIndex index = new BitmapPositionDeleteIndex();
+    PositionDeleteRangeConsumer.forEach(Collections.emptyList(), index);
+    assertThat(index.isEmpty()).isTrue();
+  }
+
+  @Test
+  void singlePosition() {
+    BitmapPositionDeleteIndex index = new BitmapPositionDeleteIndex();
+    PositionDeleteRangeConsumer.forEach(asList(42L), index);
+
+    assertThat(index.isDeleted(42)).isTrue();
+    assertThat(index.cardinality()).isEqualTo(1);
+  }
+
+  @Test
+  void consecutivePositionsCoalesced() {
+    BitmapPositionDeleteIndex index = new BitmapPositionDeleteIndex();
+    PositionDeleteRangeConsumer.forEach(asList(10L, 11L, 12L, 13L, 14L), index);
+
+    for (long pos = 10; pos <= 14; pos++) {
+      assertThat(index.isDeleted(pos)).isTrue();
+    }
+
+    assertThat(index.isDeleted(9)).isFalse();
+    assertThat(index.isDeleted(15)).isFalse();
+    assertThat(index.cardinality()).isEqualTo(5);
+  }
+
+  @Test
+  void multipleDisjointRanges() {
+    BitmapPositionDeleteIndex index = new BitmapPositionDeleteIndex();
+    PositionDeleteRangeConsumer.forEach(asList(3L, 4L, 5L, 10L, 11L, 100L), index);
+
+    assertThat(index.isDeleted(3)).isTrue();
+    assertThat(index.isDeleted(4)).isTrue();
+    assertThat(index.isDeleted(5)).isTrue();
+    assertThat(index.isDeleted(6)).isFalse();
+    assertThat(index.isDeleted(10)).isTrue();
+    assertThat(index.isDeleted(11)).isTrue();
+    assertThat(index.isDeleted(12)).isFalse();
+    assertThat(index.isDeleted(100)).isTrue();
+    assertThat(index.cardinality()).isEqualTo(6);
+  }
+
+  @Test
+  void unsortedInputProducesCorrectResult() {
+    BitmapPositionDeleteIndex index = new BitmapPositionDeleteIndex();
+    PositionDeleteRangeConsumer.forEach(asList(50L, 10L, 11L, 12L, 1L, 2L, 3L), index);
+
+    assertThat(index.isDeleted(50)).isTrue();
+    assertThat(index.isDeleted(10)).isTrue();
+    assertThat(index.isDeleted(11)).isTrue();
+    assertThat(index.isDeleted(12)).isTrue();
+    assertThat(index.isDeleted(1)).isTrue();
+    assertThat(index.isDeleted(2)).isTrue();
+    assertThat(index.isDeleted(3)).isTrue();
+    assertThat(index.cardinality()).isEqualTo(7);
+  }
+
+  @Test
+  void duplicatePositions() {
+    BitmapPositionDeleteIndex index = new BitmapPositionDeleteIndex();
+    PositionDeleteRangeConsumer.forEach(asList(5L, 5L, 6L, 6L), index);
+
+    assertThat(index.isDeleted(5)).isTrue();
+    assertThat(index.isDeleted(6)).isTrue();
+    assertThat(index.cardinality()).isEqualTo(2);
+  }
+
+  @Test
+  void largeConsecutiveRange() {
+    BitmapPositionDeleteIndex index = new BitmapPositionDeleteIndex();
+    int rangeSize = 100_000;
+    long[] positions = new long[rangeSize];
+    for (int i = 0; i < rangeSize; i++) {
+      positions[i] = i;
+    }
+
+    PositionDeleteRangeConsumer.forEach(asList(positions), index);
+
+    assertThat(index.cardinality()).isEqualTo(rangeSize);
+    assertThat(index.isDeleted(0)).isTrue();
+    assertThat(index.isDeleted(rangeSize - 1)).isTrue();
+    assertThat(index.isDeleted(rangeSize)).isFalse();
+  }
+
+  @Test
+  void positionZero() {
+    BitmapPositionDeleteIndex index = new BitmapPositionDeleteIndex();
+    PositionDeleteRangeConsumer.forEach(asList(0L, 1L, 2L), index);
+
+    assertThat(index.isDeleted(0)).isTrue();
+    assertThat(index.isDeleted(1)).isTrue();
+    assertThat(index.isDeleted(2)).isTrue();
+    assertThat(index.cardinality()).isEqualTo(3);
+  }
+
+  @Test
+  void alternatingPositionsNoCoalescing() {
+    BitmapPositionDeleteIndex index = new BitmapPositionDeleteIndex();
+    PositionDeleteRangeConsumer.forEach(asList(0L, 2L, 4L, 6L), index);
+
+    assertThat(index.isDeleted(0)).isTrue();
+    assertThat(index.isDeleted(1)).isFalse();
+    assertThat(index.isDeleted(2)).isTrue();
+    assertThat(index.isDeleted(3)).isFalse();
+    assertThat(index.isDeleted(4)).isTrue();
+    assertThat(index.isDeleted(5)).isFalse();
+    assertThat(index.isDeleted(6)).isTrue();
+    assertThat(index.cardinality()).isEqualTo(4);
+  }
+
+  @Test
+  void largeSparseInputTakesNaivePath() {
+    // 4096 alternating positions -- sniff sees 100% boundaries on the 1024-long prefix and
+    // dispatches to the naive path. The result must still be identical to coalescing.
+    int count = 4096;
+    long[] positions = new long[count];
+    for (int i = 0; i < count; i++) {
+      positions[i] = i * 2L;
+    }
+
+    BitmapPositionDeleteIndex index = new BitmapPositionDeleteIndex();
+    PositionDeleteRangeConsumer.forEach(asList(positions), index);
+
+    assertThat(index.cardinality()).isEqualTo(count);
+    for (int i = 0; i < count; i++) {
+      assertThat(index.isDeleted(positions[i])).isTrue();
+      assertThat(index.isDeleted(positions[i] + 1)).isFalse();
+    }
+  }
+
+  @Test
+  void sparsePrefixWithDenseTail() {
+    // First 1024 positions alternate, then 2048 consecutive positions. The sniff sees a sparse
+    // prefix and picks the naive path; the dense tail is still emitted correctly.
+    int prefixCount = 1024;
+    int tailCount = 2048;
+    long[] positions = new long[prefixCount + tailCount];
+    for (int i = 0; i < prefixCount; i++) {
+      positions[i] = i * 2L;
+    }
+
+    long tailStart = 10_000L;
+    for (int i = 0; i < tailCount; i++) {
+      positions[prefixCount + i] = tailStart + i;
+    }
+
+    BitmapPositionDeleteIndex index = new BitmapPositionDeleteIndex();
+    PositionDeleteRangeConsumer.forEach(asList(positions), index);
+
+    assertThat(index.cardinality()).isEqualTo(prefixCount + tailCount);
+    for (int i = 0; i < prefixCount; i++) {
+      assertThat(index.isDeleted(positions[i])).isTrue();
+    }
+
+    for (int i = 0; i < tailCount; i++) {
+      assertThat(index.isDeleted(tailStart + i)).isTrue();
+    }
+
+    assertThat(index.isDeleted(tailStart + tailCount)).isFalse();
+  }
+
+  @Test
+  void densePrefixWithSparseTail() {
+    // First 1024 positions are contiguous, then 2048 alternating. The sniff sees a dense prefix
+    // and picks the coalesce path; the sparse tail is still emitted correctly.
+    int prefixCount = 1024;
+    int tailCount = 2048;
+    long[] positions = new long[prefixCount + tailCount];
+    for (int i = 0; i < prefixCount; i++) {
+      positions[i] = i;
+    }
+
+    long tailStart = 10_000L;
+    for (int i = 0; i < tailCount; i++) {
+      positions[prefixCount + i] = tailStart + i * 2L;
+    }
+
+    BitmapPositionDeleteIndex index = new BitmapPositionDeleteIndex();
+    PositionDeleteRangeConsumer.forEach(asList(positions), index);
+
+    assertThat(index.cardinality()).isEqualTo(prefixCount + tailCount);
+    for (int i = 0; i < prefixCount; i++) {
+      assertThat(index.isDeleted(i)).isTrue();
+    }
+
+    for (int i = 0; i < tailCount; i++) {
+      assertThat(index.isDeleted(tailStart + i * 2L)).isTrue();
+      assertThat(index.isDeleted(tailStart + i * 2L + 1)).isFalse();
+    }
+  }
+
+  @Test
+  void lengthAtExactSniffBoundary() {
+    // Input length matches the sniff window exactly: the prefix fills, the sniff runs, and the
+    // streaming tail loop never executes. Both dispatch decisions must produce correct output.
+    int count = 1024;
+    long[] dense = new long[count];
+    long[] sparse = new long[count];
+    for (int i = 0; i < count; i++) {
+      dense[i] = i;
+      sparse[i] = i * 2L;
+    }
+
+    BitmapPositionDeleteIndex denseIndex = new BitmapPositionDeleteIndex();
+    PositionDeleteRangeConsumer.forEach(asList(dense), denseIndex);
+    assertThat(denseIndex.cardinality()).isEqualTo(count);
+    assertThat(denseIndex.isDeleted(0)).isTrue();
+    assertThat(denseIndex.isDeleted(count - 1)).isTrue();
+
+    BitmapPositionDeleteIndex sparseIndex = new BitmapPositionDeleteIndex();
+    PositionDeleteRangeConsumer.forEach(asList(sparse), sparseIndex);
+    assertThat(sparseIndex.cardinality()).isEqualTo(count);
+    for (int i = 0; i < count; i++) {
+      assertThat(sparseIndex.isDeleted(i * 2L)).isTrue();
+    }
+  }
+
+  @Test
+  void smallAdversarialInputSkipsSniff() {
+    // Alternating input shorter than the sniff window -- the prefix never fills, so the sniff
+    // is skipped and we coalesce directly. Verifies correctness on the small-list fast path.
+    int count = 100;
+    long[] positions = new long[count];
+    for (int i = 0; i < count; i++) {
+      positions[i] = i * 2L;
+    }
+
+    BitmapPositionDeleteIndex index = new BitmapPositionDeleteIndex();
+    PositionDeleteRangeConsumer.forEach(asList(positions), index);
+
+    assertThat(index.cardinality()).isEqualTo(count);
+    for (int i = 0; i < count; i++) {
+      assertThat(index.isDeleted(i * 2L)).isTrue();
+      assertThat(index.isDeleted(i * 2L + 1)).isFalse();
+    }
+  }
+
+  private static List<Long> asList(long... positions) {
+    return Arrays.stream(positions).boxed().collect(Collectors.toList());
+  }
+}