[Bug][runtime] Fix memory leak in DurableExecutionManager.checkpointIdToSeqNums #645

@weiqingy

Search before asking

  • I searched in the issues and found nothing similar.

Description

Summary

DurableExecutionManager.recordCheckpointSequenceNumbers() unconditionally adds an entry to checkpointIdToSeqNums for every checkpoint, but notifyCheckpointComplete() removes entries only when actionStateStore != null. When durable execution is disabled (actionStateStore == null), nothing ever removes these entries, so the map grows for as long as the job runs: a memory leak.

Root Cause

// Always records — no null guard
void recordCheckpointSequenceNumbers(long checkpointId, Map<Object, Long> seqNums) {
    checkpointIdToSeqNums.put(checkpointId, seqNums);
}

// Only cleans up when actionStateStore is non-null
void notifyCheckpointComplete(long checkpointId) {
    if (actionStateStore != null) {
        ...
        checkpointIdToSeqNums.remove(checkpointId);  // never reached when actionStateStore == null
    }
}

Fix

Either call checkpointIdToSeqNums.remove(checkpointId) unconditionally in notifyCheckpointComplete(), or return early from recordCheckpointSequenceNumbers() when actionStateStore == null so that no entries are recorded in the first place.
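A minimal sketch of the first option (unconditional removal), written as a standalone model rather than a patch against the real class. DurableExecutionManagerSketch, the nested ActionStateStore interface, its pruneState method, and trackedCheckpointCount are all hypothetical stand-ins introduced for illustration; the store logic elided in the snippet above is not reproduced here.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified stand-in for DurableExecutionManager; NOT the real class. */
class DurableExecutionManagerSketch {

    /** Hypothetical placeholder for the real ActionStateStore type. */
    interface ActionStateStore {
        // Assumed method name, used only to mark where store cleanup would go.
        void pruneState(Map<Object, Long> seqNums);
    }

    // null when durable execution is disabled
    private final ActionStateStore actionStateStore;
    private final Map<Long, Map<Object, Long>> checkpointIdToSeqNums = new HashMap<>();

    DurableExecutionManagerSketch(ActionStateStore actionStateStore) {
        this.actionStateStore = actionStateStore;
    }

    void recordCheckpointSequenceNumbers(long checkpointId, Map<Object, Long> seqNums) {
        checkpointIdToSeqNums.put(checkpointId, seqNums);
    }

    void notifyCheckpointComplete(long checkpointId) {
        // Remove unconditionally so the entry is released even without a store.
        Map<Object, Long> seqNums = checkpointIdToSeqNums.remove(checkpointId);
        if (actionStateStore != null && seqNums != null) {
            actionStateStore.pruneState(seqNums);
        }
    }

    /** Test helper (hypothetical): number of checkpoints still tracked. */
    int trackedCheckpointCount() {
        return checkpointIdToSeqNums.size();
    }
}
```

With this shape the map entry is dropped on every completion notification, and the null check guards only the store-specific work. The second option (an early return in recordCheckpointSequenceNumbers) would avoid the allocation entirely, at the cost of a second null check to keep in sync.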

How to reproduce

This is a structural code bug — no specific input data is needed to trigger it.

  1. Configure an agent job with durable execution disabled (i.e., do not set an ActionStateStore; this is the default).
  2. Enable Flink checkpointing (e.g., env.enableCheckpointing(60_000)).
  3. Run the job and allow several checkpoints to complete.

Expected: checkpointIdToSeqNums stays empty (no entries needed when durable execution is off).
Actual: Each checkpoint adds an entry via recordCheckpointSequenceNumbers() that is never removed, because notifyCheckpointComplete() skips cleanup when actionStateStore == null. The map grows without bound for the lifetime of the job.
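The growth can also be illustrated without a Flink cluster. The following is a rough model of the record/complete cycle described above, assuming one record call and one completion notification per checkpoint. CheckpointLeakDemo and entriesAfter are hypothetical names, and a plain Object stands in for the store.

```java
import java.util.HashMap;
import java.util.Map;

class CheckpointLeakDemo {

    /**
     * Simulates `checkpoints` record/complete cycles using the current
     * (buggy) cleanup condition and returns how many entries remain.
     */
    static int entriesAfter(int checkpoints, Object actionStateStore) {
        Map<Long, Map<Object, Long>> checkpointIdToSeqNums = new HashMap<>();
        for (long id = 1; id <= checkpoints; id++) {
            // recordCheckpointSequenceNumbers: always records
            checkpointIdToSeqNums.put(id, new HashMap<>());
            // notifyCheckpointComplete: cleans up only when a store is set
            if (actionStateStore != null) {
                checkpointIdToSeqNums.remove(id);
            }
        }
        return checkpointIdToSeqNums.size();
    }

    public static void main(String[] args) {
        System.out.println(entriesAfter(100, null));         // prints 100
        System.out.println(entriesAfter(100, new Object())); // prints 0
    }
}
```

In the model, the number of retained entries equals the number of checkpoints whenever the store is null, which matches the "Actual" behavior reported above.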

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Labels: bug, priority/major