[WIP][FLINK-39519] Allocate pre-filter source buffers from heap in filtering mode #28073
Draft
1996fanrui wants to merge 14 commits into apache:master
… heap fallback from requestBufferBlocking()

RecoveredInputChannel now exposes two buffer-request methods:
- requestBuffer(): non-blocking, returns null when the pool is exhausted. Used by the post-filter output path (OutputWriter P1/P3) to attempt pool allocation without blocking.
- requestBufferBlocking(): blocking, waits until a pool buffer is available.

In filtering mode the previous heap-buffer fallback (allocateUnpooledSegment + FreeingBufferRecycler) is removed: the deadlock it guarded against is already prevented by allocating the pre-filter buffer from a dedicated heap segment in InputChannelRecoveredStateHandler, so post-filter requests can safely block until the task thread recycles a pool buffer. Non-filtering mode is unchanged.

Note on commit ordering: this commit belongs to FLINK-39524 (Integration) because removing the heap fallback is only safe once OutputWriter provides the disk-spilling replacement for the post-filter path. It is kept at this position in development for reviewability and will be reordered to sit adjacent to the Integration commit before merge.

Tests:
- testRequestBufferNonBlockingAndBlockingHasNoHeapFallback: drains both exclusive buffers (via requestBuffer()) and the gate's floating pool, then verifies requestBufferBlocking() blocks (does not fall back to heap) until a buffer is recycled.
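
The contract of the two request methods can be illustrated with a minimal sketch. `SketchBufferPool` is a hypothetical stand-in built on a `BlockingQueue`, not Flink's buffer pool or `RecoveredInputChannel`; it only shows the difference between returning null on exhaustion and waiting for a recycled buffer without any heap fallback.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical fixed-size pool; an illustration only, not Flink's BufferPool. */
final class SketchBufferPool {
    private final BlockingQueue<byte[]> free;

    SketchBufferPool(int numBuffers, int bufferSize) {
        free = new ArrayBlockingQueue<>(numBuffers);
        for (int i = 0; i < numBuffers; i++) {
            free.add(new byte[bufferSize]);
        }
    }

    /** Non-blocking: returns null when the pool is exhausted. */
    byte[] requestBuffer() {
        return free.poll();
    }

    /** Blocking: waits for a recycled buffer and never allocates a new one. */
    byte[] requestBufferBlocking() throws InterruptedException {
        return free.take();
    }

    /** Returns a buffer to the pool, which unblocks one waiting requester if any. */
    void recycle(byte[] buffer) {
        free.add(buffer);
    }
}
```

As in the test described above, a caller that drains the pool with `requestBuffer()` and then calls `requestBufferBlocking()` stays parked until some other thread calls `recycle`.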
Introduce the on-disk spill pipeline that backs the new recovered-buffer
flow:
- FilteredSpillFile: append/read the per-task spill file used to offload
filtered recovery buffers when the in-flight buffer pool is exhausted.
- RecoveredBufferStore (interface + impl + Empty variant): per-channel store
that owns ready in-memory buffers and coordinates with the cross-channel
drain pipeline. EmptyRecoveredBufferStore is the no-spill default for
channels that never recover state.
- RecoveredBufferStoreCoordinator: cross-channel hook surface invoked by
per-channel stores on checkpoint start/stop, channel release, and
drain-head queries.
- EntryPosition: lexicographic (fileIndex, offset) cursor used by the
drain bundle; END is the past-the-last sentinel.
Tests: FilteredSpillFileTest, RecoveredBufferStoreTest, EntryPositionTest.
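
A lexicographic (fileIndex, offset) cursor with a past-the-last sentinel might look like the sketch below. `SketchEntryPosition` is a hypothetical class; the field types (`int`, `long`) and the choice of maximum values for END are assumptions, not taken from the PR.

```java
import java.util.Comparator;
import java.util.Objects;

/** Hypothetical stand-in for the cursor described above; field types are assumed. */
final class SketchEntryPosition implements Comparable<SketchEntryPosition> {
    /** Past-the-last sentinel; assumed never to collide with a real position. */
    static final SketchEntryPosition END =
            new SketchEntryPosition(Integer.MAX_VALUE, Long.MAX_VALUE);

    // Lexicographic order: compare fileIndex first, then offset within the same file.
    private static final Comparator<SketchEntryPosition> ORDER =
            Comparator.comparingInt((SketchEntryPosition p) -> p.fileIndex)
                    .thenComparingLong(p -> p.offset);

    final int fileIndex;
    final long offset;

    SketchEntryPosition(int fileIndex, long offset) {
        this.fileIndex = fileIndex;
        this.offset = offset;
    }

    @Override
    public int compareTo(SketchEntryPosition other) {
        return ORDER.compare(this, other);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SketchEntryPosition)) {
            return false;
        }
        SketchEntryPosition that = (SketchEntryPosition) o;
        return fileIndex == that.fileIndex && offset == that.offset;
    }

    @Override
    public int hashCode() {
        return Objects.hash(fileIndex, offset);
    }
}
```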
…ds to buffer, disk, or drain

Introduce the per-channel filtered-record dispatcher that decides whether a recovered buffer goes to the in-memory ready queue, the on-disk spill file, or is drained directly during phase-2:
- BufferRequester: small abstraction so the dispatcher can request buffers from a buffer pool without depending on the full input gate.
- FilteredBufferDispatcher (interface) + FilteredBufferDispatcherImpl: enqueue-or-spill logic; close splits into drainPendingSpill + resource release so a checkpoint abort never blocks on disk I/O.
- TestBufferPool / FilteredBufferDispatcherTest: stress the buffer ownership, ready-buffer back-pressure, and drain-vs-close ordering.
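
The enqueue-or-spill decision can be sketched roughly as follows. Everything here is hypothetical: `SketchFilteredDispatcher` models records as strings, the buffer pool as a `BooleanSupplier`, and the spill file as an in-memory queue. The rule that a record must spill once earlier records have spilled is an assumption made to keep FIFO order in this sketch; the PR does not spell out that rule.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.BooleanSupplier;

/** Hypothetical enqueue-or-spill sketch; not FilteredBufferDispatcherImpl. */
final class SketchFilteredDispatcher {
    enum Destination { READY_QUEUE, SPILL_FILE }

    // Returns true when a pool buffer could be obtained without blocking.
    private final BooleanSupplier tryRequestBuffer;
    final Queue<String> readyQueue = new ArrayDeque<>();
    // In-memory stand-in for the on-disk spill file.
    final Queue<String> spillFile = new ArrayDeque<>();

    SketchFilteredDispatcher(BooleanSupplier tryRequestBuffer) {
        this.tryRequestBuffer = tryRequestBuffer;
    }

    Destination dispatch(String record) {
        // Assumption: once anything has spilled, later records spill too,
        // so records are never reordered relative to the spilled ones.
        if (spillFile.isEmpty() && tryRequestBuffer.getAsBoolean()) {
            readyQueue.add(record);
            return Destination.READY_QUEUE;
        }
        spillFile.add(record);
        return Destination.SPILL_FILE;
    }
}
```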
…Store in input channels

Wire the input-side gate so each channel pulls recovered buffers from a RecoveredBufferStore before resuming network traffic:
- RecoveredChannelStateHandler: split lifecycle into finishRecovery() + close(), so the recovery business step no longer sits inside resource release. close() becomes a pure no-op when finishRecovery has already fired.
- LocalInputChannel / RemoteInputChannel / Local-/Remote-/Unknown-RecoveredInputChannel: buffer-pool conversion, priority-event pre-emption, hasPendingPriorityEvent guarding, and the release-race handling in pollPendingPriorityEvent.
- ChannelStatePersister / SingleInputGate: cross-channel notify path for data availability after recovered-channel conversion.
- Test updates across InputChannelBuilder, RemoteInputChannelTest, LocalInputChannelTest, ChannelStatePersisterTest, RecoveredInputChannelTest, CreditBasedPartitionRequestClientHandlerTest, PartitionRequestRegistrationTest, UnionInputGateTest, SingleInputGateBenchmarkFactory, ChannelStateChunkReaderTest (fake handler implements finishRecovery), ResultSubpartitionRecoveredStateHandlerTest (AT-FRCV output half).
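
The finishRecovery() / close() split can be pictured with a small sketch. `SketchStateHandler` and its counters are hypothetical; in particular, what close() does when finishRecovery() never ran (an abort-path cleanup here) is an assumption made for illustration.

```java
/** Hypothetical lifecycle sketch; not RecoveredChannelStateHandler. */
final class SketchStateHandler implements AutoCloseable {
    private boolean recoveryFinished;
    private boolean closed;
    // Counters stand in for real side effects so the behavior is observable.
    int recoveryStepsRun;
    int abortCleanupsRun;

    /** The recovery business step; idempotent and skipped after close(). */
    void finishRecovery() {
        if (recoveryFinished || closed) {
            return;
        }
        recoveryFinished = true;
        recoveryStepsRun++;
    }

    /** Pure resource release; a no-op when finishRecovery() has already fired. */
    @Override
    public void close() {
        if (closed) {
            return;
        }
        closed = true;
        if (!recoveryFinished) {
            // Assumption: only the abort path still owns resources here.
            abortCleanupsRun++;
        }
    }
}
```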
…tData for spilled recovery data

Extend the channel-state writer with a streaming addInputData variant so the dispatcher can hand over a spill-file InputStream (instead of an already-materialised Buffer iterator) without round-tripping through the network buffer pool:
- ChannelStateWriter / ChannelStateWriterImpl / ChannelStateWriteRequest: new addInputData(checkpointId, channelInfo, startSeq, inputStream, totalBytes) entry point.
- ChannelStateCheckpointWriter: chunk the InputStream straight into the checkpoint output, preserving sequence numbering and EOF accounting.
- MockChannelStateWriter / writer-side tests: cover the new path.
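
Chunking a stream of known length into an output can be sketched as below. `SketchStreamChunker.copyExactly` is a hypothetical helper that uses only `java.io`; it covers the byte and EOF accounting but not the sequence numbering or the actual ChannelStateCheckpointWriter format.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/** Hypothetical helper; illustrates chunked copying with EOF accounting only. */
final class SketchStreamChunker {
    private SketchStreamChunker() {}

    /** Copies exactly totalBytes from in to out using a reusable chunk buffer. */
    static long copyExactly(InputStream in, OutputStream out, long totalBytes, int chunkSize)
            throws IOException {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        byte[] chunk = new byte[chunkSize];
        long remaining = totalBytes;
        while (remaining > 0) {
            int toRead = (int) Math.min(chunk.length, remaining);
            int read = in.read(chunk, 0, toRead);
            if (read < 0) {
                // The stream ended before the announced length was reached.
                throw new EOFException(
                        "expected " + totalBytes + " bytes, missing " + remaining);
            }
            out.write(chunk, 0, read);
            remaining -= read;
        }
        return totalBytes;
    }
}
```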
… recovery flow

Plug FilteredBufferDispatcher into the gate-side filter pipeline so the recovery flow now flushes -> finishRecovery -> drainPendingSpill -> close, matching the long-term contract documented in close_drain_separation:
- ChannelStateFilteringHandler / SequentialChannelStateReaderImpl: drive dispatcher drainPendingSpill before close, so phase-2 disk drain is an explicit step instead of a side-effect of cleanup.
- GateFilterHandlerTest / GateFilterHandlerBufferOwnershipTest / InputChannelRecoveredStateHandlerTest: cover the new sequence and the buffer-ownership invariants across drain and close.
… filter pipeline

Trim verbose JavaDocs and inline comments across the recovery / filter / dispatcher / store path. Pure documentation cleanup, no code logic changes.
- Remove redundant @param/@throws that just restate parameter names.
- Drop step-by-step narration and section banners that mirror code structure.
- Strip references to requirements/<issue>/... paths and process notes.
- Keep non-obvious "why" comments: lock-order rationale, AB-BA risks, capture-then-fire-outside protocol, drainHead visibility / FIFO invariants.
- Apply same pattern to the corresponding tests.
Reformat javadoc and method signatures previously missed by spotless on files in the unaligned-recovery filter pipeline. Pure formatting; no behavior change.
google-java-format 1.24.0 (used by spotless) calls Modifier.valueOf on identifier tokens, expecting Modifier.SEALED to exist. That constant was added to javax.lang.model.element.Modifier only in Java 17; on Java 11 any source file that uses 'sealed' as a field or method name fails the formatter. Rename FilteredSpillFile.Reader's state field and helpers to 'frozen' / 'freeze()' / 'isFrozen()' to keep the formatter usable on the Java 11 toolchain. Callers and surrounding comments updated accordingly.
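
The failure mode is ordinary `Enum.valueOf` behavior: a name with no matching constant raises `IllegalArgumentException`. The probe below is a hypothetical helper, not google-java-format code; upper-casing the identifier is an assumption about how a token would be mapped to a constant name. Whether `isModifierName("sealed")` returns true depends on the JDK the probe runs on, as described above.

```java
import java.util.Locale;
import javax.lang.model.element.Modifier;

/** Hypothetical probe; shows Enum.valueOf failing for an unknown constant name. */
final class SketchModifierProbe {
    private SketchModifierProbe() {}

    /** Returns true if the running JDK defines a Modifier constant for this identifier. */
    static boolean isModifierName(String identifier) {
        try {
            // Assumption: the identifier is mapped to a constant name by upper-casing it.
            Modifier.valueOf(identifier.toUpperCase(Locale.ROOT));
            return true;
        } catch (IllegalArgumentException e) {
            // No such enum constant on this JDK.
            return false;
        }
    }
}
```

With this probe, `isModifierName("public")` is true on any JDK, while `isModifierName("frozen")` is false, which is why the rename sidesteps the lookup problem.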
NetworkBufferPool reported "not empty after destroying all LocalBufferPools" on every shutdown of UnalignedCheckpointRescaleITCase (~3 leaks per test).

RecoveredInputChannel#releaseAllResources stopped releasing the store after 1b27759 (which removed the call to fix the conversion-path data-loss bug). The abort path (SingleInputGate#close before conversion runs) relies on the same method, so any buffer parked in store.readyBuffers at shutdown leaked.

gate.close and convertRecoveredInputChannels both run under requestLock, so they are mutually exclusive. From close's view storeTransferred is either fully true (conversion ran) or fully false (it didn't); release the store only in the latter case.

Renamed the existing converted/markConverted to storeTransferred/markStoreTransferred so the flag's name matches what releaseAllResources actually checks. Removed the unused private releaseAfterDrain.
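
The mutual exclusion argument can be sketched with one lock and one flag. `SketchGate` is a hypothetical model that folds the gate and its channel into a single class; the buffer deque stands in for store.readyBuffers and the counter stands in for returning segments to the pool.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical model of close vs. conversion under one lock; not SingleInputGate. */
final class SketchGate {
    private final Object requestLock = new Object();
    // Stand-in for store.readyBuffers; guarded by requestLock.
    private final Deque<byte[]> readyBuffers = new ArrayDeque<>();
    // Guarded by requestLock: true once conversion has taken over the store.
    private boolean storeTransferred;
    int buffersReleasedOnClose;

    void parkRecoveredBuffer(byte[] buffer) {
        synchronized (requestLock) {
            readyBuffers.add(buffer);
        }
    }

    /** Conversion path: ownership of the parked buffers moves to the caller. */
    Deque<byte[]> convertRecoveredInputChannels() {
        synchronized (requestLock) {
            storeTransferred = true;
            Deque<byte[]> handedOver = new ArrayDeque<>(readyBuffers);
            readyBuffers.clear();
            return handedOver;
        }
    }

    /** Abort path: release the store only if conversion never ran. */
    void close() {
        synchronized (requestLock) {
            if (!storeTransferred) {
                buffersReleasedOnClose += readyBuffers.size();
                readyBuffers.clear();
            }
        }
    }
}
```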
…created

Symptom
-------
After the abort-path store-release fix (42e548d), running UnalignedCheckpointRescaleITCase still left ~17 "NetworkBufferPool is not empty after destroying all LocalBufferPools" warnings per loop iteration (50 tests). The leak counter `numTotalRequiredBuffers > 0` indicates RecoveredInputChannel#bufferManager exclusive segments were never returned to the global pool.

Root cause
----------
RecoveredInputChannel relies on a drainDone+storeTransferred rendezvous to release its BufferManager (FLINK-39519, 1b27759). On the dispatcher path, FilteredBufferDispatcher#close fires markDrainDone for every channel in its channelMap. But ChannelStateFilteringHandler#createFromContext returns null when a task has no virtual channels for rescaling (e.g. an upscale-source case where downstream operators keep their parallelism), so SequentialChannelStateReaderImpl#readInputData skips dispatcher creation. Without a dispatcher, nothing fires markDrainDone: the rendezvous never completes, releaseAllBuffers never runs, and the exclusive segments leak when the test shuts the cluster down.

There is also a race on the dispatcher==null path itself: finishRecovery() completes bufferFilteringCompleteFuture, which can trigger a mailbox-driven SingleInputGate#convertRecoveredInputChannels to replace `channels[i]` with a physical channel before our cleanup runs. If we look up RecoveredInputChannels via inputGates *after* finishRecovery, the converted ones look like physical channels and we miss markDrainDone on them.

Fix
---
On the dispatcher==null path, snapshot the list of RecoveredInputChannel references before calling stateHandler.finishRecovery(), then iterate the snapshot to call markDrainDone after finishRecovery. The snapshot mirrors what FilteredBufferDispatcher does internally with channelMap; both paths now hold immutable references that are immune to the mailbox-driven swap.

Verification
------------
UnalignedCheckpointRescaleITCase + UnalignedCheckpointRescaleWithMixedExchangesITCase loop:
- before this commit: ~17 leaks per iteration (down from 184 after the prior abort-path fix)
- after this commit: 0 leaks per iteration

The 1 occasional test failure ("Graph is in globally terminal state") hits a different parametrized case each run and is independent of the leak fix.
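
The snapshot-then-mark pattern and the two-flag rendezvous can be sketched together. All names below are hypothetical: `SketchRecoveredChannel` models the rendezvous with two booleans, the `Object[]` array stands in for the gate's channel slots, and the `Runnable` stands in for stateHandler.finishRecovery(), which may swap slots as a side effect.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical channel: buffers are released once both flags are set. */
final class SketchRecoveredChannel {
    private boolean drainDone;
    private boolean storeTransferred;
    private boolean buffersReleased;

    synchronized void markDrainDone() {
        drainDone = true;
        releaseIfBothDone();
    }

    synchronized void markStoreTransferred() {
        storeTransferred = true;
        releaseIfBothDone();
    }

    synchronized boolean isBuffersReleased() {
        return buffersReleased;
    }

    private void releaseIfBothDone() {
        if (drainDone && storeTransferred) {
            buffersReleased = true; // stand-in for returning exclusive segments
        }
    }
}

/** Hypothetical recovery step for the path without a dispatcher. */
final class SketchRecoveryWithoutDispatcher {
    private SketchRecoveryWithoutDispatcher() {}

    static void finish(Object[] channelSlots, Runnable finishRecovery) {
        // Snapshot first: finishRecovery may replace slots with physical channels.
        List<SketchRecoveredChannel> snapshot = new ArrayList<>();
        for (Object slot : channelSlots) {
            if (slot instanceof SketchRecoveredChannel) {
                snapshot.add((SketchRecoveredChannel) slot);
            }
        }
        finishRecovery.run();
        // Iterate the snapshot, not channelSlots, so swapped channels are not missed.
        for (SketchRecoveredChannel channel : snapshot) {
            channel.markDrainDone();
        }
    }
}
```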
…nnel lock

Replace FilteredBufferDispatcherImpl's instance monitor B with lock-free primitives (ConcurrentHashMap / AtomicInteger / AtomicLong / volatile / AtomicReference<CheckpointWindow>) so the per-channel store lock can call back into the coordinator without forming an AB-BA cycle.

With B gone, restore the master-style wide synchronized(recoveredStore) around RemoteInputChannel and LocalInputChannel checkpointStarted / checkpointStopped. ChannelStatePersister now holds RecoveredBufferStore as a constructor field and asserts Thread.holdsLock(store) on every state-touching method, eliminating the data-loss race window where startPersisting was called outside the lock.

FilteredSpillFile.Reader.entries becomes ConcurrentLinkedDeque so the recovery thread's drainPendingSpill and the task thread's onChannelReleased no longer race on the underlying ArrayDeque.

Per-checkpoint coordination state is bundled into an immutable CheckpointWindow published via CAS; snapshot ownership is handed off via an AtomicBoolean drainTriggered guard so stop / drain / close cannot double-close pinned Reader fds.
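
The single-claim guard on snapshot ownership can be illustrated in isolation. `SketchSnapshotHandle` is hypothetical and covers only the `AtomicBoolean` hand-off, not the CheckpointWindow publication; the counter stands in for closing the pinned Reader file descriptors.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical single-claim guard; not the PR's CheckpointWindow. */
final class SketchSnapshotHandle {
    private final AtomicBoolean drainTriggered = new AtomicBoolean(false);
    // Stand-in for closing pinned readers; must end up at exactly 1.
    final AtomicInteger closeCount = new AtomicInteger();

    /** Called from stop / drain / close; only the first caller performs the close. */
    boolean claimAndClose() {
        if (!drainTriggered.compareAndSet(false, true)) {
            return false; // another caller already owns the snapshot
        }
        closeCount.incrementAndGet();
        return true;
    }
}
```

Because `compareAndSet(false, true)` succeeds for exactly one caller, concurrent stop, drain, and close calls cannot both run the close step.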
Replace the AtomicReference<CheckpointWindow> + CAS retry + drainTriggered double-claim machinery with a single explicit lock object (dispatcherLock) and the SMALL → BIG ordering rule. Per-checkpoint fields go back to plain HashMap / HashSet / long, recovery-thread fields stay volatile, Reader.entries stays ConcurrentLinkedDeque (load-bearing for the cross-lock-domain mutation between drainPendingSpill and onChannelReleased).

The three onChannel* methods and close()'s phase 2 take dispatcherLock and start with `if (closed) return;`. close() does flushCache outside the lock (only fires on the abort path, which has no concurrent task threads) and the deletes/cleanup inside, which closes the close-vs-snapshot NoSuchFileException race that the previous fully lock-free implementation introduced: a concurrent onChannelCheckpointStarted either pins its FileChannels first (POSIX keeps the file alive after unlink) or sees closed=true and bails before opening the file.

Also trim redundant javadoc / inline comments per the project commenting policy: javadocs describe contracts, in-code comments explain non-obvious "why", code is left to explain "what" and "how".

Net change vs the previous no-lock commit afb903b71b1: -90 lines across the four touched files.
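
The lock-plus-closed-check shape can be sketched as follows. `SketchLockedDispatcher` is hypothetical: the set of checkpoint ids stands in for pinned FileChannels, the counter stands in for flushCache, and the boolean return value exists only to make the bail-out observable (the methods described above are not stated to return anything).

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical single-lock sketch; not FilteredBufferDispatcherImpl. */
final class SketchLockedDispatcher {
    private final Object dispatcherLock = new Object();
    // Both fields are guarded by dispatcherLock.
    private boolean closed;
    private final Set<Long> pinnedCheckpoints = new HashSet<>();
    int flushes;

    /** Returns false if the dispatcher was already closed (illustration only). */
    boolean onChannelCheckpointStarted(long checkpointId) {
        synchronized (dispatcherLock) {
            if (closed) {
                return false; // bail before touching any file
            }
            pinnedCheckpoints.add(checkpointId); // stand-in for pinning file channels
            return true;
        }
    }

    void close() {
        // Stand-in for flushCache, deliberately run outside the lock.
        flushes++;
        synchronized (dispatcherLock) {
            if (closed) {
                return;
            }
            closed = true;
            pinnedCheckpoints.clear(); // stand-in for deletes and cleanup
        }
    }

    int pinnedCount() {
        synchronized (dispatcherLock) {
            return pinnedCheckpoints.size();
        }
    }
}
```

Since the callback and the cleanup both run under `dispatcherLock`, a callback either completes its pinning before cleanup starts or observes `closed` and returns early.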
ad14255 to 6d16324