Search before asking
Description
Summary
DurableExecutionManager.recordCheckpointSequenceNumbers() unconditionally adds entries to checkpointIdToSeqNums, but notifyCheckpointComplete() only cleans them up when actionStateStore != null. When durable execution is disabled (actionStateStore == null), checkpoint entries accumulate indefinitely — a memory leak.
Root Cause
// Always records: no null guard
void recordCheckpointSequenceNumbers(long checkpointId, Map<Object, Long> seqNums) {
    checkpointIdToSeqNums.put(checkpointId, seqNums);
}

// Only cleans up when actionStateStore is non-null
void notifyCheckpointComplete(long checkpointId) {
    if (actionStateStore != null) {
        ...
        checkpointIdToSeqNums.remove(checkpointId); // never reached when actionStateStore == null
    }
}
Fix
Add a checkpointIdToSeqNums.remove(checkpointId) call unconditionally in notifyCheckpointComplete, or guard recordCheckpointSequenceNumbers with an early return when actionStateStore == null.
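A minimal sketch of the two options above, written against a simplified stand-in class rather than the real DurableExecutionManager. The class name DurableExecutionManagerSketch, the Object-typed actionStateStore field, and the pendingCheckpointCount() helper are assumptions made for illustration; only the two method names and the checkpointIdToSeqNums map come from this issue. The sketch applies both options together, although either one alone should be enough to stop the growth.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for DurableExecutionManager; models only what this issue describes.
class DurableExecutionManagerSketch {
    // Placeholder for the real ActionStateStore; null means durable execution is disabled.
    private final Object actionStateStore;
    private final Map<Long, Map<Object, Long>> checkpointIdToSeqNums = new HashMap<>();

    DurableExecutionManagerSketch(Object actionStateStore) {
        this.actionStateStore = actionStateStore;
    }

    void recordCheckpointSequenceNumbers(long checkpointId, Map<Object, Long> seqNums) {
        // Option 2: nothing will ever consume the entry, so do not record it.
        if (actionStateStore == null) {
            return;
        }
        checkpointIdToSeqNums.put(checkpointId, seqNums);
    }

    void notifyCheckpointComplete(long checkpointId) {
        // Option 1: remove the entry regardless of whether a store is configured.
        Map<Object, Long> seqNums = checkpointIdToSeqNums.remove(checkpointId);
        if (actionStateStore != null && seqNums != null) {
            // Store-specific cleanup (elided as "..." in the issue) would go here.
        }
    }

    // Illustration-only helper for inspecting how many entries are retained.
    int pendingCheckpointCount() {
        return checkpointIdToSeqNums.size();
    }
}
```

With a null store, recording and then completing any number of checkpoints leaves pendingCheckpointCount() at 0. With a non-null store, each entry lives only until its checkpoint completes.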
Notes
How to reproduce
This is a structural code bug — no specific input data is needed to trigger it.
- Configure an agent job with durable execution disabled (i.e., do not set ActionStateStore, which is the default).
- Enable Flink checkpointing (e.g., env.enableCheckpointing(60_000)).
- Run the job and allow several checkpoints to complete.
Expected: checkpointIdToSeqNums stays empty (no entries needed when durable execution is off).
Actual: Each completed checkpoint adds an entry via recordCheckpointSequenceNumbers() that is never removed, because notifyCheckpointComplete() skips cleanup when actionStateStore == null. The map grows unbounded for the lifetime of the job.
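The growth can also be illustrated without a Flink cluster. The sketch below copies the shape of the two methods from the Root Cause section into a hypothetical LeakReproductionSketch class and drives them in a loop. The class name, the retainedEntries() helper, and the simulate() driver are assumptions for illustration, and the loop only approximates the record-then-complete sequence of a real job.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the leaky behaviour; not the real DurableExecutionManager.
class LeakReproductionSketch {
    // Durable execution disabled: no store configured.
    private final Object actionStateStore = null;
    private final Map<Long, Map<Object, Long>> checkpointIdToSeqNums = new HashMap<>();

    // Same shape as in the issue: always records.
    void recordCheckpointSequenceNumbers(long checkpointId, Map<Object, Long> seqNums) {
        checkpointIdToSeqNums.put(checkpointId, seqNums);
    }

    // Same shape as in the issue: cleanup only happens when a store exists.
    void notifyCheckpointComplete(long checkpointId) {
        if (actionStateStore != null) {
            checkpointIdToSeqNums.remove(checkpointId);
        }
    }

    int retainedEntries() {
        return checkpointIdToSeqNums.size();
    }

    // Records and completes the given number of checkpoints, then reports what is still held.
    static int simulate(int checkpoints) {
        LeakReproductionSketch manager = new LeakReproductionSketch();
        for (long id = 1; id <= checkpoints; id++) {
            manager.recordCheckpointSequenceNumbers(id, new HashMap<>());
            manager.notifyCheckpointComplete(id);
        }
        return manager.retainedEntries();
    }
}
```

In this model, simulate(n) returns n: one retained entry per completed checkpoint, which matches the "Actual" behaviour described above.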
Version and environment
Are you willing to submit a PR?
Related PR: "[runtime] Refactor ActionExecutionOperator into focused manager classes" (#546). This issue was surfaced during review of that PR by @wenjin272.