perf: specialized SemiAntiSortMergeJoinStream#20806
Conversation
|
Thanks @mbutrovich, IMO it is great to start replacing current SMJ implementation with more performant and easier to maintain structure. Even by pieces. Good sign fuzz tests passed. |
| WHERE EXISTS ( | ||
| SELECT 1 FROM t2_sorted | ||
| WHERE t2_sorted.key = t1_sorted.key | ||
| AND t2_sorted.data <> t1_sorted.data |
There was a problem hiding this comment.
I added this benchmark in #18875 but neglected to apply the filter correctly. Now this 90% scenario has the same logic as the 10% and 50% filters above it.
|
One thing to mention: to control the flag |
datafusion/physical-plan/src/joins/semi_anti_sort_merge_join/stream.rs
Outdated
Show resolved
Hide resolved
I'd like to understand the long-term implications of having the config. I added it since we're used to having opt outs in Comet from operators in case it is not compatible with a workload, and it makes it easier to benchmark right now. However, would we then be subject to API health considerations on removing the old SMJ's semi/anti support for X releases/months? If so, I might just propose removing the config and saying "this is the new semi/anti SMJ implementation" as if we had changed code internal to |
| let result = filter | ||
| .expression() | ||
| .evaluate(&filter_batch)? | ||
| .into_array(num_outer_rows)?; |
There was a problem hiding this comment.
this can be another PR but if the expression returned scalar we can use that optimization later instead of converting to array
There was a problem hiding this comment.
I'm trying to think through if the expression ever could return a scalar in this context.
Let me think about it.
|
🤖 Hi @Dandandan, thanks for the request (#20806 (comment)).
Please choose one or more of these with You can also set environment variables on subsequent lines: Unsupported benchmarks: smj. |
|
run benchmark smj |
|
🤖 Criterion benchmark running (GKE) | trigger |
|
Benchmark for this request failed. Last 20 lines of output: Click to expand |
datafusion/physical-plan/src/joins/semi_anti_sort_merge_join/stream.rs
Outdated
Show resolved
Hide resolved
datafusion/physical-plan/src/joins/semi_anti_sort_merge_join/stream.rs
Outdated
Show resolved
Hide resolved
| matched: &mut BooleanBufferBuilder, | ||
| outer_offset: usize, | ||
| outer_group_len: usize, | ||
| mut matched_count: usize, |
There was a problem hiding this comment.
can this be derived from matched ?
There was a problem hiding this comment.
That requires a recounting when we just did it at the call site. I'll add a debug_assert that the arg is correct.
There was a problem hiding this comment.
Thanks @mbutrovich overall it is lgtm, just couple of minors and great that fuzz tests are passing
comphead
left a comment
There was a problem hiding this comment.
Thanks @mbutrovich epic work!
…ins to bitwise stream. Near-unique LEFT and FULL SMJ 20-50x faster (#21184) ## Which issue does this PR close? Partially addresses #20910. Fixes #21197. ## Rationale for this change Sort-merge join with a filter on outer joins (LEFT/RIGHT/FULL) runs `process_filtered_batches()` on every key transition in the Init state. With near-unique keys (1:1 cardinality), this means running the full deferred filtering pipeline (concat + `get_corrected_filter_mask` + `filter_record_batch_by_join_type`) once per row — making filtered LEFT/RIGHT/FULL **55x slower** than INNER for 10M unique keys. Additionally, mark join logic in `MaterializingSortMergeJoinStream` materializes full `(streamed, buffered)` pairs only to discard most of them via `get_corrected_filter_mask()`. Mark joins are structurally identical to semi joins (one output row per outer row with a boolean result) and belong in `BitwiseSortMergeJoinStream`, which avoids pair materialization entirely using a per-outer-batch bitset. ## What changes are included in this PR? Three areas of improvement, building on the specialized semi/anti stream from #20806: **1. Move mark joins to `BitwiseSortMergeJoinStream`** - Match on join type; `emit_outer_batch()` emits all rows with the match bitset as a boolean column (vs semi's filter / anti's invert-and-filter) - Route `LeftMark`/`RightMark` from `SortMergeJoinExec::execute()` to the bitwise stream - Remove all mark-specific logic from `MaterializingSortMergeJoinStream` (`mark_row_as_match`, `is_not_null` column generation, mark arms in filter correction) **2. Batch filter evaluation in `freeze_streamed()`** - Split `freeze_streamed()` into null-joined classification + `freeze_streamed_matched()` for batched materialization - Collect indices across chunks, materialize left/right columns once using tiered Arrow kernels (`slice` → `take` → `interleave`) - Single `RecordBatch` construction and single `expression.evaluate()` per freeze instead of per chunk - Vectorize `append_filter_metadata()` using builder `extend()` instead of per-element loop **3. Batch deferred filtering in Init state** (this is the big win for Q22 and Q23) - Gate `process_filtered_batches()` on accumulated rows >= `batch_size` instead of running on every Init entry - Accumulated data bounded to ~2×batch_size (one from `freeze_dequeuing_buffered`, one accumulating toward next freeze) — does not reintroduce unbounded buffering fixed by PR #20482 - `Exhausted` state flushes any remainder **Cleanup:** - Rename `SortMergeJoinStream` → `MaterializingSortMergeJoinStream` (materializes explicit row pairs for join output) and `SemiAntiMarkSortMergeJoinStream` → `BitwiseSortMergeJoinStream` (tracks matches via boolean bitset) - Consolidate `semi_anti_mark_sort_merge_join/` into `sort_merge_join/` as `bitwise_stream.rs` / `bitwise_tests.rs`; rename `stream.rs` → `materializing_stream.rs` and `tests.rs` → `materializing_tests.rs` - Consolidate `SpillManager` construction into `SortMergeJoinExec::execute()` (shared across both streams); move `peak_mem_used` gauge into `BitwiseSortMergeJoinStream::try_new` - `MaterializingSortMergeJoinStream` now handles only Inner/Left/Right/Full — all semi/anti/mark branching removed - `get_corrected_filter_mask()`: merge identical Left/Right/Full branches; add null-metadata passthrough for already-null-joined rows - `filter_record_batch_by_join_type()`: rewrite from `filter(true) + filter(false) + concat` to `zip()` for in-place null-joining — preserves row ordering and removes `create_null_joined_batch()` entirely; add early return for empty batches - `filter_record_batch_by_join_type()`: use `compute::filter()` directly on `BooleanArray` instead of wrapping in temporary `RecordBatch` ## Benchmarks `cargo run --release --bin dfbench -- smj` | Query | Join Type | Rows | Keys | Filter | Main (ms) | PR (ms) | Speedup | |-------|-----------|------|------|--------|-----------|---------|---------| | Q1 | INNER | 1M×1M | 1:1 | — | 16.3 | 14.4 | 1.1x | | Q2 | INNER | 1M×10M | 1:10 | — | 117.4 | 120.1 | 1.0x | | Q3 | INNER | 1M×1M | 1:100 | — | 74.2 | 66.6 | 1.1x | | Q4 | INNER | 1M×10M | 1:10 | 1% | 17.1 | 15.1 | 1.1x | | Q5 | INNER | 1M×1M | 1:100 | 10% | 18.4 | 14.4 | 1.3x | | Q6 | LEFT | 1M×10M | 1:10 | — | 129.3 | 122.7 | 1.1x | | Q7 | LEFT | 1M×10M | 1:10 | 50% | 150.2 | 142.2 | 1.1x | | Q8 | FULL | 1M×1M | 1:10 | — | 16.6 | 16.7 | 1.0x | | Q9 | FULL | 1M×10M | 1:10 | 10% | 153.5 | 136.2 | 1.1x | | Q10 | LEFT SEMI | 1M×10M | 1:10 | — | 53.1 | 53.1 | 1.0x | | Q11 | LEFT SEMI | 1M×10M | 1:10 | 1% | 15.5 | 14.7 | 1.1x | | Q12 | LEFT SEMI | 1M×10M | 1:10 | 50% | 65.0 | 67.3 | 1.0x | | Q13 | LEFT SEMI | 1M×10M | 1:10 | 90% | 105.7 | 109.8 | 1.0x | | Q14 | LEFT ANTI | 1M×10M | 1:10 | — | 54.3 | 53.9 | 1.0x | | Q15 | LEFT ANTI | 1M×10M | 1:10 | partial | 51.5 | 50.5 | 1.0x | | Q16 | LEFT ANTI | 1M×1M | 1:1 | — | 10.3 | 11.3 | 0.9x | | Q17 | INNER | 1M×50M | 1:50 | 5% | 75.9 | 79.0 | 1.0x | | Q18 | LEFT SEMI | 1M×50M | 1:50 | 2% | 50.2 | 49.0 | 1.0x | | Q19 | LEFT ANTI | 1M×50M | 1:50 | partial | 336.4 | 344.2 | 1.0x | | Q20 | INNER | 1M×10M | 1:100 | GROUP BY | 763.7 | 803.9 | 1.0x | | Q21 | INNER | 10M×10M | 1:1 | 50% | 186.1 | 187.8 | 1.0x | | Q22 | LEFT | 10M×10M | 1:1 | 50% | 10,193.8 | 185.8 | **54.9x** | | Q23 | FULL | 10M×10M | 1:1 | 50% | 10,194.7 | 233.6 | **43.6x** | | Q24 | LEFT MARK | 1M×10M | 1:10 | 1% | FAILS | 15.1 | — | | Q25 | LEFT MARK | 1M×10M | 1:10 | 50% | FAILS | 67.3 | — | | Q26 | LEFT MARK | 1M×10M | 1:10 | 90% | FAILS | 110.0 | — | General workload (Q1-Q20, various join types/cardinalities/selectivities): no regressions. ## Are these changes tested? In addition to existing unit and sqllogictests: - I ran 50 iterations of the fuzz tests (modified to only test against hash join as the baseline because nested loop join takes too long) `cargo test -p datafusion --features extended_tests --test fuzz -- join_fuzz` - One new sqllogictest for #21197 that fails on main - Four new unit tests: three for full join with filter that spills - One new fuzz test to exercise full join with filter that spills - New benchmark queries Q21-Q23: 10M×10M unique keys with 50% join filter for INNER/LEFT/FULL — exercises the degenerate case this PR fixes - New benchmark queries Q24-Q26 duplicated Q11-Q13 but for Mark joins, showing that they have the same performance as other joins (`LeftSemi`) that use this stream ## Are there any user-facing changes? No.
Which issue does this PR close?
Rationale for this change
DataFusion's
SortMergeJoinExechandles semi/anti joins by materializing(outer, inner)row pairs, applying a filter, then deduplicating with a corrected filter mask. Semi/anti joins only need a boolean per outer row — not pairs. The pair-based approach allocates unnecessary intermediate batches and index arrays to materialize output.Recent PRs have improved SMJ performance within the existing pair-based framework — #18875 (BatchCoalescer to reduce concatenation overhead), #20463 (zero-copy slice instead of take for contiguous indices), #20478 (cached row counts to avoid O(n) recalculation) — but the fundamental mismatch remains: semi/anti joins don't need pairs at all. I think we're hitting diminishing returns on filtered semi/anti sort-merge joins (TPC-H Q21) and need a specialized stream.
What changes are included in this PR?
A new
SemiAntiSortMergeJoinStreamused internally bySortMergeJoinExecforLeftSemi,LeftAnti,RightSemi, andRightAntijoins. WhenSortMergeJoinExec::execute()encounters a semi/anti join type, it instantiates this stream instead of the existingSortMergeJoinStream. This is transparent to the rest of the system — no planner changes, no config flags, no new operators.Instead of materializing row pairs, the stream maintains a per-outer-batch bitset (
BooleanBufferBuilder) recording which outer rows have a matching inner row, then emits output viafilter_record_batch.Algorithm: Merge-scan across two sorted inputs. On key match without filter, set matched bits for the outer key group. With filter, buffer the inner key group and evaluate the filter as outer_slice × inner_scalar, OR-ing results into the bitset with
apply_bitwise_binary_op(64 bits per iteration). Short-circuit when all outer rows in the group are matched.Memory management: The inner key group buffer is tracked via
MemoryReservationand spilled to disk (viaSpillManager) when the memory pool limit is exceeded, matching existingSortMergeJoinExecbehavior. Metrics includepeak_mem_used,spill_count,spilled_bytes, andspilled_rows.Benchmark results (best of 3,
dfbench smj):Non-semi/anti queries are unaffected (same stream as before).
Are these changes tested?
PendingStream), batch boundary handling, filtered joins, and spill-to-disk edge casesjoin_fuzz.rs) compareSortMergeJoinExecoutput againstHashJoinExecfor all semi/anti join types, with and without filters, across multiple batch sizes and sort key combinations. Ran 1000+ iterations locally with random seeds.sort_merge_join/tests.rscontinue to exercise semi/anti join types throughSortMergeJoinExec, now hitting the new streamsort_merge_join.sltsqllogic tests pass (the stream change is transparent to the SQL layer)Current operator:
PR #20806:
(metrics and spilling were not hooked up in the version I ported to Comet, but this query does not spill anyway)
Are there any user-facing changes?
No. The new stream is used automatically for semi/anti sort-merge joins. There are no new config flags, no API changes, and no changes to query plans.
Known limitations to address in follow-up PRs:
SortMergeJoinStream(dead code for those join types now).