Skip to content

[runtime] Refactor ActionExecutionOperator into focused manager classes#546

Open
weiqingy wants to merge 14 commits intoapache:mainfrom
weiqingy:issue_545
Open

[runtime] Refactor ActionExecutionOperator into focused manager classes#546
weiqingy wants to merge 14 commits intoapache:mainfrom
weiqingy:issue_545

Conversation

@weiqingy
Copy link
Copy Markdown
Collaborator

@weiqingy weiqingy commented Feb 22, 2026

Linked issue: #545

Purpose of change

Decomposes ActionExecutionOperator (1,166 lines) into 5 package-private manager classes, each owning a single concern:

New Class Lines Responsibility
OperatorStateManager 225 Flink state handles (7 fields), sequence numbers, key-group filtering
DurableExecutionManager 285 ActionStateStore, persistence, recovery, checkpoint pruning (implements ActionStatePersister)
ActionTaskContextManager 215 Runner context creation, memory contexts, continuation executor, context transfer
EventRouter 175 Event wrapping/routing, notification (logger + listeners), watermark management
PythonBridgeManager 177 Python interpreter, action executor, resource adapters lifecycle

The operator shrinks from 1,166 to 562 lines (52% reduction) of pure coordination logic.

Key design decisions:

  • No manager-to-manager references — all cross-cutting dependencies flow through method parameters with the operator as mediator
  • sequenceNumberKState placed in OperatorStateManager (not DurableExecutionManager) because it's used in non-durable mode
  • ActionStatePersister interface moved from the operator to DurableExecutionManager
  • ResourceCache stays operator-owned — closed first before Python stack
  • Close order preserved: resourceCache → contextManager → pythonBridge → eventRouter → durableExecManager → super

Backward compatibility:

  • All new classes are package-private — no public API changes
  • Flink state descriptor names/types unchanged — savepoint compatible
  • ActionExecutionOperatorFactory constructor signature preserved

Tests

  • All 21 existing ActionExecutionOperatorTest tests pass as regression gates
  • ./tools/lint.sh -c — formatting compliance verified
  • ./tools/ut.sh -j — full Java test suite passes
  • ./tools/build.sh -j — full Java build passes

Test reflection accesses to actionStateStore (6 tests) and eventLogger (1 test) updated to use @VisibleForTesting getter chains instead of reflection on private fields.

No new tests added. This is a pure structural refactoring — every code path flows through ActionExecutionOperator which the existing integration tests exercise via KeyedOneInputStreamOperatorTestHarness. The managers are package-private and not independently consumable APIs.

API

No public API changes. All new classes are package-private.

Documentation

  • doc-needed
  • doc-not-needed
  • doc-included

@github-actions github-actions Bot added priority/major Default priority of the PR or issue. fixVersion/0.3.0 The feature or bug should be implemented/fixed in the 0.3.0 version. doc-not-needed Your PR changes do not impact docs labels Feb 22, 2026
@xintongsong xintongsong requested a review from Sxnan February 24, 2026 09:32
weiqingy added 3 commits April 5, 2026 22:31
Move 7 Flink state fields, 2 constants, and state management methods
into a package-private OperatorStateManager class. The operator
delegates all state access through the manager.

Moved fields: actionTasksKState, pendingInputEventsKState,
currentProcessingKeysOpState, sequenceNumberKState, sensoryMemState,
shortTermMemState, jobIdentifier.

Part of apache#545.
Move action state persistence, recovery markers, checkpoint maps, and
durable/continuation context maps into a package-private
DurableExecutionManager class. The manager implements ActionStatePersister
(moved from the operator).

Moved fields: actionStateStore, recoveryMarkerOpState,
checkpointIdToSeqNums, actionTaskDurableContexts, continuationContexts,
pythonAwaitableRefs.

Test reflection accesses to actionStateStore updated to use
@VisibleForTesting getter chain.

Part of apache#545.
…idgeManager

Extract the remaining 3 manager classes from ActionExecutionOperator:

- ActionTaskContextManager: runner context creation, memory contexts,
  continuation executor
- EventRouter: event wrapping/routing, notification, watermarks, logging
- PythonBridgeManager: Python env, interpreter, executor, resource adapters

Test reflection for eventLogger updated to use @VisibleForTesting getter.

Part of apache#545.
@github-actions github-actions Bot added doc-not-needed Your PR changes do not impact docs and removed doc-not-needed Your PR changes do not impact docs labels Apr 6, 2026
Move all 7 state fields, 2 constants, and state methods from
ActionExecutionOperator into OperatorStateManager. All state access
now goes through stateManager delegation calls.

Add transferContexts() to ActionTaskContextManager to encapsulate
the context transfer logic for unfinished async action tasks.

Operator shrinks from 700 to 562 lines.

Part of apache#545.
@github-actions github-actions Bot added doc-not-needed Your PR changes do not impact docs and removed doc-not-needed Your PR changes do not impact docs labels Apr 6, 2026
@weiqingy
Copy link
Copy Markdown
Collaborator Author

weiqingy commented Apr 6, 2026

Hey @Sxnan Could you please take a look at this PR when you have a chance? Thank you!

OperatorStateManager was created in both initializeState() and open(),
causing the instance with jobIdentifier (set in initializeState) to be
overwritten by a fresh instance in open(). This passed null jobIdentifier
to PythonBridgeManager, causing VectorStoreLongTermMemory validation
failure and visit_count memory scoping issues in e2e tests.

Fix: create stateManager only in initializeState() (which runs first
in Flink lifecycle), then initialize state descriptors in open().

Part of apache#545.
Copy link
Copy Markdown
Collaborator

@wenjin272 wenjin272 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @weiqingy, LGTM, only one comment. Please take a look at your convenience @xintongsong.

actionTask.setRunnerContext(context);
}

Resource getResource(String name, ResourceType type, ResourceCache resourceCache) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this method is never invoked.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — removed in 300a0fa. The method was dead code (callers go through RunnerContextImpl.getResource or ResourceCache directly), so I deleted it along with the now-unused Resource/ResourceType imports.

Addresses review feedback on PR apache#546: the getResource method was never
invoked. Callers go through RunnerContextImpl.getResource or directly
through ResourceCache. Also removes the now-unused Resource/ResourceType
imports.
@wenjin272
Copy link
Copy Markdown
Collaborator

LGTM. Please take a look at your convenience @xintongsong.

Copy link
Copy Markdown
Contributor

@xintongsong xintongsong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@weiqingy Thanks for working on this. The refactor looks good to me in general. I just left a few comments.

In addition to the inline comments, I think we need more JavaDocs to explicitly describe the contract of each component interfaces, and unit tests to verify them. Currently, they are a bit vague relying on the names and implementations to understand.

getRuntimeContext().getJobInfo().getJobId(),
metricGroup,
this::checkMailboxThread,
stateManager.getJobIdentifier());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit against intuitive that job id is fetched from the state manager. I think it should be maintained in the operator.

}
}

ActionState maybeGetActionState(Object key, long sequenceNum, Action action, Event event)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest to return an Optional or mark @Nullable for methods that may return null.


private transient ResourceCache resourceCache;

private final Boolean inputIsJava;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never used.

Comment on lines +60 to +61
private final Map<ActionTask, ContinuationContext> continuationContexts;
private final Map<ActionTask, String> pythonAwaitableRefs;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These 2 maps might belong to ActionTaskContextManager. They are not related to durable execution. In this class, they are only initialized, and used in some one-line getter/setters which are all used by ActionTaskContextManager.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also leads to the violation of "no manager-to-manager references" design goal.

Address xintongsong's review feedback (apache#546 comments apache#6, apache#7): the
continuationContexts and pythonAwaitableRefs maps are runner-context
concerns, not durable-execution concerns, and their placement in
DurableExecutionManager forced ActionTaskContextManager to hold a
manager-to-manager reference forbidden by 545-DESIGN.md.

Move both maps and their accessors into ActionTaskContextManager.
actionTaskDurableContexts stays in DEM since DEM consumes it via
setupDurableExecutionContext. Drop the DEM parameter from
createAndSetRunnerContext (no longer needed); transferContexts keeps
the DEM parameter for putDurableContext only.
Address xintongsong's review feedback (apache#546 comments apache#3, apache#5):

apache#5 — `inputIsJava` field on the operator was never read; the constructor
parameter is forwarded directly to EventRouter. Remove the field; keep
the constructor parameter.

apache#3 — `jobIdentifier` is a runtime identity, not a Flink state descriptor.
OperatorStateManager exists to own keyed/operator state; identity belongs
on the operator. Move the `jobIdentifier` field, its initialization
(formerly `OperatorStateManager.initJobIdentifier`), and the accessor
into ActionExecutionOperator. Inline the init body into the operator's
`initializeState()`. Preserve the savepoint state descriptor name
`"identifier_state"` exactly to retain compatibility with existing
savepoints.
Address xintongsong's review feedback (apache#546 comment apache#4): "I'd suggest
to return an Optional or mark @nullable for methods that may return
null."

Apply javax.annotation.Nullable across all 5 managers (consistent with
the existing @nullable ActionStateStore parameter style — Optional
would be inconsistent with the codebase). 16 methods total: getter/poll
methods backed by Map.get/Map.remove/poll-style state operations, plus
fields that are null until a separate lifecycle phase initializes them
(e.g., Python bridge components in no-Python jobs, lazily-set state
descriptors).
Address xintongsong's review feedback (apache#546 top-level): "we need more
JavaDocs to explicitly describe the contract of each component
interfaces... Currently they are a bit vague relying on the names and
implementations to understand."

Add class-level Javadoc to all 5 managers (OperatorStateManager,
DurableExecutionManager, ActionTaskContextManager, EventRouter,
PythonBridgeManager) covering responsibility, owned state, lifecycle
(when constructed, when init/open methods run, when close runs), and
the design constraint that managers do not hold references to one
another — cross-cutting data flows via method parameters with the
operator as mediator.

Add method-level Javadoc to contract-defining methods only — state
initialization, sequence-number semantics, poll semantics, snapshot
and recovery hooks, durable-mode no-op behavior, Java vs Python
branching, watermark draining, and the close ordering inside the
Python bridge.
Address xintongsong's review feedback (apache#546 top-level): "unit tests to
verify them. Currently they are a bit vague relying on the names and
implementations to understand."

Add focused contract tests covering the core promise of each manager:

- DurableExecutionManagerTest (2): no-store mode is a silent no-op for
  every maybe* method; an injected store correctly receives a finished
  task's persisted result with isCompleted() set.
- ActionTaskContextManagerTest (2): per-task maps remain isolated under
  put/get/remove cycles; createOrGetRunnerContext throws
  IllegalStateException with a helpful message when asked for a Python
  context but pythonRunnerContext is null.
- EventRouterTest (2): wrapToInputEvent passes Java input through
  unchanged; getActionsTriggeredBy dispatches by event-class name.
- PythonBridgeManagerTest (1): open() is a no-op when the agent plan
  contains neither Python actions nor Python resources.

OperatorStateManager is left to integration coverage from
ActionExecutionOperatorTest (21 cases) — a dedicated harness-based
test is tracked as follow-up. Comprehensive coverage (snapshot/recovery,
listener notification, watermark draining, full memory-context
lifecycle) is also follow-up.
@weiqingy
Copy link
Copy Markdown
Collaborator Author

weiqingy commented Apr 29, 2026

Thanks for reviewing, @xintongsong . Pushed 300a0fa..e2e4e03 to address all the comments:

  • "job id should be maintained in the operator" — moved jobIdentifier from OperatorStateManager onto ActionExecutionOperator. Savepoint name unchanged. → 1139b9c
  • "@Nullable for methods that may return null" — applied @Nullable (consistent with the existing @Nullable ActionStateStore) across all 5 managers, 16 methods. → 85df684
  • "inputIsJava never used" — removed the field; constructor param is forwarded directly to EventRouter. → 1139b9c
  • "these 2 maps belong to ActionTaskContextManager / no manager-to-manager references" — moved continuationContexts and pythonAwaitableRefs to ActionTaskContextManager. actionTaskDurableContexts stays in DurableExecutionManager (consumed there). createAndSetRunnerContext no longer takes DurableExecutionManager. → ab0c581
  • "more JavaDocs to describe the contract of each component" — added class-level Javadoc on each manager covering responsibilities, owned state, lifecycle, and the no-cross-reference constraint. → 6991b99
  • "unit tests to verify them" — added focused contract tests per manager (4 files, 7 tests). Heavier coverage (Flink-harness OperatorStateManager tests, full snapshot/recovery, watermark draining, listener notification) tracked as a follow-up to keep this PR reviewable. → e2e4e03

PTAL when you have a moment.

Copy link
Copy Markdown
Collaborator

@wenjin272 wenjin272 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @weiqingy. I left a few minor comments, mainly regarding code style and doc.

Additionally, I found a bug that existed before the refactoring during my review. I noticed that you plan to add integration tests in a separate PR; we can fix this issue and add test verification in that PR.

for (Map.Entry<Object, Long> entry : keyToSeqNum.entrySet()) {
actionStateStore.pruneState(entry.getKey(), entry.getValue());
}
checkpointIdToSeqNums.remove(checkpointId);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears that checkpointIdToSeqNums is only cleaned up when actionStateStore is not null, but there is no check for whether actionStateStore is null in recordCheckpointSequenceNumbers. This seems likely to cause a memory leak.

I checked, and this issue existed before the refactoring. We can open a separate PR to fix it.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — confirmed, checkpointIdToSeqNums entries will accumulate without cleanup when durable execution is disabled. This is pre-existing and orthogonal to the current refactoring, so we'll track it in a separate PR as suggested. Thanks for the thorough review.

}

@Nullable
SegmentedQueue getKeySegmentQueue() {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the @Nullable annotation here necessary?

For methods like maybeGetActionState and pollNextPendingInputEvent, I think the Nullable annotation is appropriate: It may return null when called, and the caller will perform a check. But for methods like getKeySegmentQueue and getSensoryMemState, they are initialized in the open method of ActionExecutionOperator, and all accesses occur after open.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed @Nullable from getKeySegmentQueue(), getReusedStreamRecord() (EventRouter), getSensoryMemState(), and getShortTermMemState() (OperatorStateManager) in 3699f14. These fields are unconditionally initialized in open() before any caller can reach them. Kept @Nullable where the return value genuinely can be null: getEventLogger() (config-dependent) and the pollNext* methods (poll from list-state returns null when empty).

@@ -121,54 +72,25 @@
* and the resulting output event is collected for further processing.
*/
public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT>
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are initialized in the Open method of ActionExecutionOperator, and all calls occur after Open.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the PR.

getKeySegmentQueue() and getReusedStreamRecord() in EventRouter are
initialized unconditionally in open() before any data flows through.
Similarly, getSensoryMemState() and getShortTermMemState() in
OperatorStateManager are initialized before callers access them.
@nullable on these methods is misleading — retain it only where the
return value genuinely can be null at call time (getEventLogger,
pollNextActionTask, pollNextPendingInputEvent).

Addresses wenjin272's review comments r3165417289 + r3165421647.
@weiqingy
Copy link
Copy Markdown
Collaborator Author

weiqingy commented May 1, 2026

Created separate issues to follow up on the items deferred from this PR:

Will follow up on them separately.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc-not-needed Your PR changes do not impact docs fixVersion/0.3.0 The feature or bug should be implemented/fixed in the 0.3.0 version. priority/major Default priority of the PR or issue.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants