Skip to content

[WIP][SPARK-56410][SQL][CORE] Add bounded k-way merge support in UnsafeExternalSorter to reduce OOM risk#55275

Draft
ivoson wants to merge 1 commit intoapache:masterfrom
ivoson:SPARK-56410-k-way-merge
Draft

[WIP][SPARK-56410][SQL][CORE] Add bounded k-way merge support in UnsafeExternalSorter to reduce OOM risk#55275
ivoson wants to merge 1 commit intoapache:masterfrom
ivoson:SPARK-56410-k-way-merge

Conversation

@ivoson
Copy link
Copy Markdown
Contributor

@ivoson ivoson commented Apr 9, 2026

What changes were proposed in this pull request?

Added bounded multi-round k-way merge to UnsafeExternalSorter to prevent OOM during sort-merge when there are many spill files. Previously, getSortedIterator() opened all spill readers simultaneously (~3MB per reader), causing OOM with hundreds of spills.

  • New UnsafeSorterBoundedSpillMerger class that merges spill files in rounds of at most K files (configurable merge factor, default 64), writing intermediate results to temp spill files, then merging those until the count fits in a single final round.
  • New internal config spark.unsafe.sorter.spill.merge.factor (default 64, set -1 to disable) that controls the maximum number of concurrent spill readers during merge. At 64 readers x 3MB = ~192MB, well within typical executor heap sizes.
  • Added merge observability logging — logs spill count, merge factor, and round information at merge time to aid future debugging.

How It Works

For example with 680 spills:

                    BEFORE (Current Behavior - OOM)
                    ================================

  680 spill files opened ALL AT ONCE
  +------+ +------+ +------+ +------+       +------+
  |Spill | |Spill | |Spill | |Spill |  ...  |Spill |
  |  1   | |  2   | |  3   | |  4   |       | 680  |
  +--+---+ +--+---+ +--+---+ +--+---+       +--+---+
     |        |        |        |               |
     |  3MB   |  3MB   |  3MB   |  3MB    3MB   |
     v        v        v        v               v
  +-----------------------------------------------------+
  |          PriorityQueue (680 readers)                 |
  |          680 x 3MB = ~2 GB buffers                  |
  |                   OOM!                               |
  +-----------------------------------------------------+


                    AFTER (Bounded Merge - Safe)
                    ============================

  Example: 680 spill files, merge factor K = 64

  --- Round 1: merge groups of 64 ----------------------

  Group 1 (64 files)    Group 2 (64 files)       Group 11 (remaining)
  +--++--+    +--+     +--++--+    +---+         +--++--+  +---+
  |S1||S2|... |S64|    |S65||S66|...|S128|  ...  |  ||  |..|680|
  +-++ +-+    +-++     +-++-+-+    +-+-+         +-++-++  +-+-+
    |   |       |        |   |       |             |   |     |
    v   v       v        v   v       v             v   v     v
  +--------------+     +--------------+          +--------------+
  | Merge (<=64  |     | Merge (<=64  |          | Merge (<=64  |
  |  readers)    |     |  readers)    |          |  readers)    |
  | ~192MB max   |     | ~192MB max   |          | ~192MB max   |
  +------+-------+     +------+-------+          +------+-------+
         |                    |                         |
         v                    v                         v
     +--------+          +--------+                +--------+
     |Temp    |          |Temp    |                |Temp    |
     |File 1  |          |File 2  |       ...      |File 11 |
     +--------+          +--------+                +--------+

  --- Round 2 (Final): 11 files <= 64, merge directly ---

     +--------+ +--------+           +--------+  +---------+
     |Temp    | |Temp    |    ...    |Temp    |  |In-Memory|
     |File 1  | |File 2  |           |File 11 |  |  Data   |
     +---+----+ +---+----+           +---+----+  +----+----+
         |          |                    |             |
         v          v                    v             v
     +------------------------------------------------------+
     |        Final PriorityQueue (<= 12 readers)           |
     |         12 x 3MB = ~36 MB  Safe!                     |
     +------------------------+-----------------------------+
                              |
                              v
                      +---------------+
                      | Sorted Output |
                      +---------------+


  --- Memory Comparison ------------------------------------

  Before:  680 readers x 3MB = ~2,040 MB  ->  OOM
  After:    64 readers x 3MB =   ~192 MB  ->  Safe

Why are the changes needed?

When UnsafeExternalSorter accumulates a large number of spill files, the merge phase opens all spill readers simultaneously. Each UnsafeSorterSpillReader allocates ~3MB of buffers (1MB NioBufferedFileInputStream + 1MB ReadAheadInputStream + 1MB record byte array). With hundreds of spills, this means ~GB+ of merge buffers alone, causing OOM even when the executor has sufficient heap for normal operation.

Does this PR introduce any user-facing change?

No

How was this patch tested?

UTs added.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-opus-4-6)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant