[WIP][FLINK-39519] Allocate pre-filter source buffers from heap in filtering mode #28073

Draft

1996fanrui wants to merge 14 commits into apache:master from 1996fanrui:39519/spilling

Conversation

@1996fanrui
Member

What is the purpose of the change

(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)

Brief change log

(for example:)

  • The TaskInfo is stored in the blob store on job creation time as a persistent artifact
  • Deployments RPC transmits only the blob storage reference
  • TaskManagers retrieve the TaskInfo from the blob cache

Verifying this change

Please make sure both new and modified tests in this PR follow the conventions for tests defined in our code quality guide.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (100MB)
  • Extended integration test for recovery after master (JobManager) failure
  • Added test that validates that TaskInfo is transferred only once across recoveries
  • Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

… heap fallback from requestBufferBlocking()

RecoveredInputChannel now exposes two buffer-request methods:
- requestBuffer() — non-blocking, returns null when the pool is exhausted.
  Used by the post-filter output path (OutputWriter P1/P3) to attempt pool
  allocation without blocking.
- requestBufferBlocking() — blocking, waits until a pool buffer is
  available. In filtering mode the previous heap-buffer fallback
  (allocateUnpooledSegment + FreeingBufferRecycler) is removed: the
  deadlock it guarded against is already prevented by allocating the
  pre-filter buffer from a dedicated heap segment in
  InputChannelRecoveredStateHandler, so post-filter requests can safely
  block until the task thread recycles a pool buffer.

Non-filtering mode is unchanged.
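The two request styles above can be illustrated with a minimal toy pool (this is a sketch with assumed names, not Flink's actual BufferManager API): the non-blocking variant returns null on exhaustion, while the blocking variant waits until the task thread recycles a buffer.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class TwoRequestStyles {
    private final BlockingQueue<byte[]> pool;

    public TwoRequestStyles(int capacity, int bufferSize) {
        pool = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            pool.add(new byte[bufferSize]);
        }
    }

    /** Non-blocking: returns null when the pool is exhausted. */
    public byte[] requestBuffer() {
        return pool.poll();
    }

    /** Blocking: waits until a buffer is recycled; no heap fallback. */
    public byte[] requestBufferBlocking() throws InterruptedException {
        return pool.take();
    }

    /** Recycle a buffer back into the pool, waking any blocked requester. */
    public void recycle(byte[] buffer) {
        pool.add(buffer);
    }

    public static void main(String[] args) throws Exception {
        TwoRequestStyles p = new TwoRequestStyles(1, 32);
        byte[] b = p.requestBuffer();          // drains the only buffer
        System.out.println(p.requestBuffer()); // null: pool exhausted
        Thread recycler = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException ignored) {
            }
            p.recycle(b); // task thread hands the buffer back
        });
        recycler.start();
        // Blocks until the recycler thread returns the buffer.
        System.out.println(p.requestBufferBlocking().length);
        recycler.join();
    }
}
```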

Note on commit ordering: this commit belongs to FLINK-39524 (Integration)
because removing the heap fallback is only safe once OutputWriter provides
the disk-spilling replacement for the post-filter path. It is kept at this
position in development for reviewability and will be reordered to sit
adjacent to the Integration commit before merge.

Tests:
- testRequestBufferNonBlockingAndBlockingHasNoHeapFallback: drains both
  exclusive buffers (via requestBuffer()) and the gate's floating pool,
  then verifies requestBufferBlocking() blocks (does not fall back to
  heap) until a buffer is recycled.

Introduce the on-disk spill pipeline that backs the new recovered-buffer
flow:

- FilteredSpillFile: append/read the per-task spill file used to offload
  filtered recovery buffers when the in-flight buffer pool is exhausted.
- RecoveredBufferStore (interface + impl + EmptyRecoveredBufferStore):
  per-channel store that owns ready in-memory buffers and coordinates with
  the cross-channel drain pipeline. EmptyRecoveredBufferStore is the
  no-spill default for channels that never recover state.
- RecoveredBufferStoreCoordinator: cross-channel hook surface invoked by
  per-channel stores on checkpoint start/stop, channel release, and
  drain-head queries.
- EntryPosition: lexicographic (fileIndex, offset) cursor used by the
  drain bundle; END is the past-the-last sentinel.

Tests: FilteredSpillFileTest, RecoveredBufferStoreTest, EntryPositionTest.
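The lexicographic (fileIndex, offset) ordering with a past-the-last END sentinel can be sketched as follows (field and class names here are assumptions for illustration, not the PR's actual code):

```java
public class EntryPositionSketch implements Comparable<EntryPositionSketch> {
    /** Past-the-last sentinel: compares greater than every real position. */
    public static final EntryPositionSketch END =
            new EntryPositionSketch(Integer.MAX_VALUE, Long.MAX_VALUE);

    final int fileIndex; // which spill file
    final long offset;   // byte offset within that file

    EntryPositionSketch(int fileIndex, long offset) {
        this.fileIndex = fileIndex;
        this.offset = offset;
    }

    // Lexicographic order: compare fileIndex first, then offset.
    @Override
    public int compareTo(EntryPositionSketch o) {
        int c = Integer.compare(fileIndex, o.fileIndex);
        return c != 0 ? c : Long.compare(offset, o.offset);
    }

    public static void main(String[] args) {
        EntryPositionSketch a = new EntryPositionSketch(0, 500);
        EntryPositionSketch b = new EntryPositionSketch(1, 0);
        System.out.println(a.compareTo(b) < 0);   // earlier file wins over larger offset
        System.out.println(b.compareTo(END) < 0); // every real position precedes END
    }
}
```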
…ds to buffer, disk, or drain

Introduce the per-channel filtered-record dispatcher that decides whether
a recovered buffer goes to the in-memory ready queue, the on-disk spill
file, or is drained directly during phase-2:

- BufferRequester: small abstraction so the dispatcher can request
  buffers from a buffer pool without depending on the full input gate.
- FilteredBufferDispatcher (interface) + FilteredBufferDispatcherImpl:
  enqueue-or-spill logic; close splits into drainPendingSpill +
  resource release so a checkpoint abort never blocks on disk I/O.
- TestBufferPool / FilteredBufferDispatcherTest: stress the buffer
  ownership, ready-buffer back-pressure, and drain-vs-close ordering.
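The enqueue-or-spill decision can be sketched as below; this is a simplified model under assumed names, with an in-memory deque standing in for the real spill file:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class EnqueueOrSpillSketch {
    private final Deque<byte[]> readyQueue = new ArrayDeque<>();
    private final Deque<byte[]> spillFile = new ArrayDeque<>(); // stand-in for disk
    private int availablePoolBuffers;

    EnqueueOrSpillSketch(int poolBuffers) {
        this.availablePoolBuffers = poolBuffers;
    }

    /** Returns true if kept in memory, false if spilled to disk. */
    boolean dispatch(byte[] recovered) {
        if (availablePoolBuffers > 0) {
            availablePoolBuffers--;
            readyQueue.add(recovered); // in-flight pool has room: keep it ready
            return true;
        }
        spillFile.add(recovered); // pool exhausted: offload to the spill file
        return false;
    }

    public static void main(String[] args) {
        EnqueueOrSpillSketch d = new EnqueueOrSpillSketch(2);
        System.out.println(d.dispatch(new byte[8])); // fits in pool
        System.out.println(d.dispatch(new byte[8])); // fits in pool
        System.out.println(d.dispatch(new byte[8])); // pool exhausted: spilled
    }
}
```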

…Store in input channels

Wire the input-side gate so each channel pulls recovered buffers from a
RecoveredBufferStore before resuming network traffic:

- RecoveredChannelStateHandler: split lifecycle into finishRecovery() +
  close(), so the recovery business step no longer sits inside resource
  release. close() becomes a pure no-op when finishRecovery has already
  fired.
- LocalInputChannel / RemoteInputChannel / Local-/Remote-/Unknown-
  RecoveredInputChannel: buffer-pool conversion, priority-event
  pre-emption, hasPendingPriorityEvent guarding, and the release-race
  handling in pollPendingPriorityEvent.
- ChannelStatePersister / SingleInputGate: cross-channel notify path
  for data availability after recovered-channel conversion.
- Test updates across InputChannelBuilder, RemoteInputChannelTest,
  LocalInputChannelTest, ChannelStatePersisterTest, RecoveredInputChannelTest,
  CreditBasedPartitionRequestClientHandlerTest, PartitionRequestRegistrationTest,
  UnionInputGateTest, SingleInputGateBenchmarkFactory,
  ChannelStateChunkReaderTest (fake handler implements finishRecovery),
  ResultSubpartitionRecoveredStateHandlerTest (AT-FRCV output half).
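The finishRecovery()/close() split described above follows a once-only release pattern; a minimal sketch (shape and names assumed, resources reduced to a counter):

```java
public class SplitLifecycleSketch {
    private boolean recoveryFinished;
    private int releaseCount;

    void finishRecovery() {
        // ... recovery business step: flush buffers, complete futures ...
        recoveryFinished = true;
        releaseResources();
    }

    void close() {
        if (recoveryFinished) {
            return; // pure no-op: finishRecovery already released everything
        }
        releaseResources(); // abort path: recovery never completed
    }

    private void releaseResources() {
        releaseCount++;
    }

    public static void main(String[] args) {
        SplitLifecycleSketch h = new SplitLifecycleSketch();
        h.finishRecovery();
        h.close();
        System.out.println(h.releaseCount); // resources released exactly once
    }
}
```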

…tData for spilled recovery data

Extend the channel-state writer with a streaming addInputData variant so
the dispatcher can hand over a spill-file InputStream (instead of an
already-materialised Buffer iterator) without round-tripping through the
network buffer pool:

- ChannelStateWriter / ChannelStateWriterImpl / ChannelStateWriteRequest:
  new addInputData(checkpointId, channelInfo, startSeq, inputStream,
  totalBytes) entry point.
- ChannelStateCheckpointWriter: chunk the InputStream straight into the
  checkpoint output, preserving sequence numbering and EOF accounting.
- MockChannelStateWriter / writer-side tests: cover the new path.
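The core of the streaming path is a chunking loop from the spill-file InputStream into the checkpoint output; a self-contained sketch (chunk size and names are assumptions), with the running byte count serving the EOF accounting:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class StreamChunkSketch {
    static long copyInChunks(InputStream in, OutputStream out, int chunkSize)
            throws IOException {
        byte[] chunk = new byte[chunkSize];
        long total = 0;
        int read;
        while ((read = in.read(chunk)) != -1) {
            out.write(chunk, 0, read); // each read becomes one checkpoint chunk
            total += read;
        }
        return total; // caller checks this against the declared totalBytes
    }

    public static void main(String[] args) throws IOException {
        byte[] data = new byte[10_000];
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        long copied = copyInChunks(new ByteArrayInputStream(data), sink, 4096);
        System.out.println(copied);      // all bytes accounted for
        System.out.println(sink.size()); // sink received the same count
    }
}
```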

… recovery flow

Plug FilteredBufferDispatcher into the gate-side filter pipeline so the
recovery flow now flushes -> finishRecovery -> drainPendingSpill -> close,
matching the long-term contract documented in close_drain_separation:

- ChannelStateFilteringHandler / SequentialChannelStateReaderImpl: drive
  dispatcher drainPendingSpill before close, so phase-2 disk drain is an
  explicit step instead of a side-effect of cleanup.
- GateFilterHandlerTest / GateFilterHandlerBufferOwnershipTest /
  InputChannelRecoveredStateHandlerTest: cover the new sequence and the
  buffer-ownership invariants across drain and close.

… filter pipeline

Trim verbose JavaDocs and inline comments across the recovery / filter /
dispatcher / store path. Pure documentation cleanup — no code logic changes.

- Remove redundant @param/@throws that just restate parameter names.
- Drop step-by-step narration and section banners that mirror code structure.
- Strip references to requirements/<issue>/... paths and process notes.
- Keep non-obvious "why" comments: lock-order rationale, AB-BA risks,
  capture-then-fire-outside protocol, drainHead visibility / FIFO invariants.
- Apply same pattern to the corresponding tests.

Reformat javadoc and method signatures previously missed by spotless on
files in the unaligned-recovery filter pipeline. Pure formatting; no
behavior change.

google-java-format 1.24.0 (used by spotless) calls Modifier.valueOf on
identifier tokens, expecting Modifier.SEALED to exist. That constant was
added to javax.lang.model.element.Modifier only in Java 17; on Java 11
any source file that uses 'sealed' as a field or method name fails the
formatter. Rename FilteredSpillFile.Reader's state field and helpers to
'frozen' / 'freeze()' / 'isFrozen()' to keep the formatter usable on the
Java 11 toolchain. Callers and surrounding comments updated accordingly.

NetworkBufferPool reported "not empty after destroying all LocalBufferPools"
on every shutdown of UnalignedCheckpointRescaleITCase (~3 leaks per test).
RecoveredInputChannel#releaseAllResources stopped releasing the store after
1b27759 (which removed the call to fix the conversion-path data-loss
bug). The abort path — SingleInputGate#close before conversion runs —
relies on the same method, so any buffer parked in store.readyBuffers at
shutdown leaked.

gate.close and convertRecoveredInputChannels both run under requestLock,
so they are mutually exclusive. From close's view storeTransferred is
either fully true (conversion ran) or fully false (it didn't); release the
store only in the latter case.

Renamed the existing converted/markConverted to storeTransferred/
markStoreTransferred so the flag's name matches what releaseAllResources
actually checks. Removed the unused private releaseAfterDrain.
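The mutual-exclusion argument reduces to both methods sharing one lock, so close() observes the flag as either fully set or fully unset; a minimal sketch under assumed names:

```java
public class AbortPathReleaseSketch {
    private final Object requestLock = new Object();
    private boolean storeTransferred;
    private boolean storeReleased;

    void convertRecoveredInputChannels() {
        synchronized (requestLock) {
            storeTransferred = true; // store ownership moved to the new channel
        }
    }

    void close() {
        synchronized (requestLock) {
            if (!storeTransferred) {
                storeReleased = true; // abort path: nobody else will release it
            }
        }
    }

    public static void main(String[] args) {
        AbortPathReleaseSketch abortOnly = new AbortPathReleaseSketch();
        abortOnly.close();
        System.out.println(abortOnly.storeReleased); // conversion never ran: released

        AbortPathReleaseSketch converted = new AbortPathReleaseSketch();
        converted.convertRecoveredInputChannels();
        converted.close();
        System.out.println(converted.storeReleased); // conversion ran: not released
    }
}
```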

…created

Symptom
-------
After the abort-path store-release fix (42e548d), running
UnalignedCheckpointRescaleITCase still left ~17 "NetworkBufferPool is
not empty after destroying all LocalBufferPools" warnings per loop
iteration (50 tests). The leak counter `numTotalRequiredBuffers > 0`
indicates RecoveredInputChannel#bufferManager exclusive segments were
never returned to the global pool.

Root cause
----------
RecoveredInputChannel relies on a drainDone+storeTransferred rendezvous
to release its BufferManager (FLINK-39519, 1b27759). On the dispatcher
path, FilteredBufferDispatcher#close fires markDrainDone for every channel
in its channelMap. But ChannelStateFilteringHandler#createFromContext
returns null when a task has no virtual channels for rescaling (e.g. an
upscale-source case where downstream operators keep their parallelism),
so SequentialChannelStateReaderImpl#readInputData skips dispatcher
creation. Without a dispatcher, nothing fires markDrainDone — the
rendezvous never completes, releaseAllBuffers never runs, and the
exclusive segments leak when the test shuts the cluster down.

There is also a race on the dispatcher==null path itself:
finishRecovery() completes bufferFilteringCompleteFuture, which can
trigger a mailbox-driven SingleInputGate#convertRecoveredInputChannels
to replace `channels[i]` with a physical channel before our cleanup
runs. If we look up RecoveredInputChannels via inputGates *after*
finishRecovery, the converted ones look like physical channels and we
miss markDrainDone on them.

Fix
---
On the dispatcher==null path, snapshot the list of RecoveredInputChannel
references before calling stateHandler.finishRecovery(), then iterate
the snapshot to call markDrainDone after finishRecovery. The snapshot
mirrors what FilteredBufferDispatcher does internally with channelMap;
both paths now hold immutable references that are immune to the
mailbox-driven swap.
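The snapshot-then-notify shape of the fix can be sketched as follows (types simplified, names assumed): capture the recovered-channel references before finishRecovery() can trigger the swap, then fire markDrainDone on the snapshot.

```java
import java.util.ArrayList;
import java.util.List;

public class SnapshotBeforeFinishSketch {
    interface Channel {}

    static class RecoveredChannel implements Channel {
        boolean drainDone;

        void markDrainDone() {
            drainDone = true;
        }
    }

    public static void main(String[] args) {
        List<Channel> gateChannels = new ArrayList<>();
        RecoveredChannel rc = new RecoveredChannel();
        gateChannels.add(rc);

        // 1) Snapshot the recovered references before finishRecovery().
        List<RecoveredChannel> snapshot = new ArrayList<>();
        for (Channel c : gateChannels) {
            if (c instanceof RecoveredChannel) {
                snapshot.add((RecoveredChannel) c);
            }
        }

        // 2) finishRecovery() may swap the gate's slot to a physical channel.
        gateChannels.set(0, new Channel() {});

        // 3) The snapshot still holds the recovered reference.
        for (RecoveredChannel c : snapshot) {
            c.markDrainDone();
        }
        System.out.println(rc.drainDone); // rendezvous completes despite the swap
    }
}
```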

Verification
------------
UnalignedCheckpointRescaleITCase + UnalignedCheckpointRescaleWithMixedExchangesITCase loop:
  before this commit:  ~17 leaks per iteration (down from 184 after the
                       prior abort-path fix)
  after this commit:   0 leaks per iteration

The 1 occasional test failure ("Graph is in globally terminal state")
hits a different parametrized case each run and is independent of the
leak fix.

@1996fanrui 1996fanrui changed the title [WIP] [WIP][FLINK-39519] Allocate pre-filter source buffers from heap in filtering mode Apr 29, 2026
@1996fanrui 1996fanrui marked this pull request as draft April 29, 2026 23:02
@flinkbot
Collaborator

flinkbot commented Apr 29, 2026

CI report:

Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

…nnel lock

Replace FilteredBufferDispatcherImpl's instance monitor B with lock-free
primitives (ConcurrentHashMap / AtomicInteger / AtomicLong / volatile /
AtomicReference<CheckpointWindow>) so the per-channel store lock can call
back into the coordinator without forming an AB-BA cycle.

With B gone, restore the master-style wide synchronized(recoveredStore)
around RemoteInputChannel and LocalInputChannel checkpointStarted /
checkpointStopped. ChannelStatePersister now holds RecoveredBufferStore
as a constructor field and asserts Thread.holdsLock(store) on every
state-touching method, eliminating the data-loss race window where
startPersisting was called outside the lock.

FilteredSpillFile.Reader.entries becomes ConcurrentLinkedDeque so the
recovery thread's drainPendingSpill and the task thread's onChannelReleased
no longer race on the underlying ArrayDeque. Per-checkpoint coordination
state is bundled into an immutable CheckpointWindow published via CAS;
snapshot ownership is handed off via an AtomicBoolean drainTriggered guard
so stop / drain / close cannot double-close pinned Reader fds.

Replace the AtomicReference<CheckpointWindow> + CAS retry + drainTriggered
double-claim machinery with a single explicit lock object
(dispatcherLock) and the SMALL → BIG ordering rule. Per-checkpoint
fields go back to plain HashMap / HashSet / long, recovery-thread
fields stay volatile, Reader.entries stays ConcurrentLinkedDeque
(load-bearing for the cross-lock-domain mutation between
drainPendingSpill and onChannelReleased).

The three onChannel* methods and close()'s phase 2 take dispatcherLock
and start with `if (closed) return;`. close() does flushCache outside
the lock (only fires on the abort path, which has no concurrent task
threads) and the deletes/cleanup inside, which closes the
close-vs-snapshot NoSuchFileException race that the previous fully
lock-free implementation introduced: a concurrent
onChannelCheckpointStarted either pins its FileChannels first (POSIX
keeps the file alive after unlink) or sees closed=true and bails before
opening the file.
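The close-vs-checkpoint-start guard described above reduces to a shared lock plus a closed flag; a minimal sketch (lock and field names are assumptions, resource pinning reduced to a boolean):

```java
public class ClosedGuardSketch {
    private final Object dispatcherLock = new Object();
    private boolean closed;
    private boolean resourcePinned;

    void onChannelCheckpointStarted() {
        synchronized (dispatcherLock) {
            if (closed) {
                return; // too late: cleanup ran, bail before opening files
            }
            resourcePinned = true; // e.g. pin a FileChannel, keeping data alive
        }
    }

    void close() {
        // flushCache() would run here, outside the lock (abort path only).
        synchronized (dispatcherLock) {
            closed = true;
            // deletes / cleanup run here, mutually exclusive with the pin
        }
    }

    public static void main(String[] args) {
        ClosedGuardSketch d = new ClosedGuardSketch();
        d.close();
        d.onChannelCheckpointStarted();
        System.out.println(d.resourcePinned); // late start saw closed=true and bailed
    }
}
```

A concurrent start that wins the lock first would pin its FileChannels before the deletes run, which (per the POSIX note above) keeps the file contents readable after unlink.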

Also trim redundant javadoc / inline comments per the project
commenting policy: javadocs describe contracts, in-code comments
explain non-obvious "why", code is left to explain "what" and "how".
Net change vs the previous no-lock commit afb903b71b1: -90 lines
across the four touched files.