[runtime] Refactor ActionExecutionOperator into focused manager classes#546
weiqingy wants to merge 14 commits into apache:main
Move 7 Flink state fields, 2 constants, and state management methods into a package-private OperatorStateManager class. The operator delegates all state access through the manager. Moved fields: actionTasksKState, pendingInputEventsKState, currentProcessingKeysOpState, sequenceNumberKState, sensoryMemState, shortTermMemState, jobIdentifier. Part of apache#545.
Move action state persistence, recovery markers, checkpoint maps, and durable/continuation context maps into a package-private DurableExecutionManager class. The manager implements ActionStatePersister (moved from the operator). Moved fields: actionStateStore, recoveryMarkerOpState, checkpointIdToSeqNums, actionTaskDurableContexts, continuationContexts, pythonAwaitableRefs. Test reflection accesses to actionStateStore updated to use @VisibleForTesting getter chain. Part of apache#545.
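The switch from reflection to a getter chain can be pictured roughly as below. This is a minimal sketch, not the PR's code: the `VisibleForTesting` annotation is a local stand-in declared inside the sketch, and `InMemoryStore`, the constructors, and `getDurableExecutionManager()` are hypothetical names chosen for illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

final class GetterChainSketch {

    // Local stand-in for the real @VisibleForTesting annotation (assumption).
    @Retention(RetentionPolicy.SOURCE)
    @Target(ElementType.METHOD)
    @interface VisibleForTesting {}

    // Hypothetical, empty store interface; the real one has persistence methods.
    interface ActionStateStore {}

    static final class InMemoryStore implements ActionStateStore {}

    static final class DurableExecutionManager {
        private final ActionStateStore actionStateStore;

        DurableExecutionManager(ActionStateStore actionStateStore) {
            this.actionStateStore = actionStateStore;
        }

        // Package-private accessor: tests read the store without reflection.
        @VisibleForTesting
        ActionStateStore getActionStateStore() {
            return actionStateStore;
        }
    }

    static final class Operator {
        private final DurableExecutionManager durableExecutionManager;

        Operator(ActionStateStore store) {
            this.durableExecutionManager = new DurableExecutionManager(store);
        }

        // First link of the chain: operator -> manager -> store.
        @VisibleForTesting
        DurableExecutionManager getDurableExecutionManager() {
            return durableExecutionManager;
        }
    }
}
```

A test would then call `operator.getDurableExecutionManager().getActionStateStore()`, which keeps compiling when a private field is renamed, whereas a reflective lookup by field name fails only at run time.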
…idgeManager
Extract the remaining 3 manager classes from ActionExecutionOperator:
- ActionTaskContextManager: runner context creation, memory contexts, continuation executor
- EventRouter: event wrapping/routing, notification, watermarks, logging
- PythonBridgeManager: Python env, interpreter, executor, resource adapters
Test reflection for eventLogger updated to use @VisibleForTesting getter. Part of apache#545.
Move all 7 state fields, 2 constants, and state methods from ActionExecutionOperator into OperatorStateManager. All state access now goes through stateManager delegation calls. Add transferContexts() to ActionTaskContextManager to encapsulate the context transfer logic for unfinished async action tasks. Operator shrinks from 700 to 562 lines. Part of apache#545.
Hey @Sxnan, could you please take a look at this PR when you have a chance? Thank you!
OperatorStateManager was created in both initializeState() and open(), causing the instance with jobIdentifier (set in initializeState) to be overwritten by a fresh instance in open(). This passed null jobIdentifier to PythonBridgeManager, causing VectorStoreLongTermMemory validation failure and visit_count memory scoping issues in e2e tests. Fix: create stateManager only in initializeState() (which runs first in Flink lifecycle), then initialize state descriptors in open(). Part of apache#545.
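The fix described above can be sketched as follows. The classes are simplified, hypothetical stand-ins with no Flink dependency, and they mirror the code as of this commit, when `jobIdentifier` still lived in the state manager. The point is only that the manager is constructed in one place and reused in `open()`.

```java
import java.util.Objects;

final class LifecycleSketch {

    /** Hypothetical stand-in for OperatorStateManager. */
    static final class StateManager {
        private String jobIdentifier;
        private boolean descriptorsInitialized;

        void initJobIdentifier(String id) {
            this.jobIdentifier = Objects.requireNonNull(id, "id");
        }

        void initDescriptors() {
            this.descriptorsInitialized = true;
        }

        String getJobIdentifier() {
            return jobIdentifier;
        }

        boolean isDescriptorsInitialized() {
            return descriptorsInitialized;
        }
    }

    /** Hypothetical stand-in for the operator's two lifecycle hooks. */
    static final class Operator {
        private StateManager stateManager;

        // Runs first: the only place the manager is constructed.
        void initializeState(String restoredOrFreshId) {
            stateManager = new StateManager();
            stateManager.initJobIdentifier(restoredOrFreshId);
        }

        // Runs second: reuses the existing instance instead of replacing it,
        // so the identifier set in initializeState() survives.
        void open() {
            if (stateManager == null) {
                throw new IllegalStateException("initializeState() must run before open()");
            }
            stateManager.initDescriptors();
        }

        StateManager stateManager() {
            return stateManager;
        }
    }
}
```

The explicit null check in `open()` is an addition for illustration; it turns a wrong call order into an immediate failure rather than a null identifier surfacing later in a downstream component.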
Hi, @weiqingy, LGTM, only one comment. Please take a look at your convenience @xintongsong.
actionTask.setRunnerContext(context);
}

Resource getResource(String name, ResourceType type, ResourceCache resourceCache) {
Looks like this method is never invoked.
Good catch — removed in 300a0fa. The method was dead code (callers go through RunnerContextImpl.getResource or ResourceCache directly), so I deleted it along with the now-unused Resource/ResourceType imports.
Addresses review feedback on PR apache#546: the getResource method was never invoked. Callers go through RunnerContextImpl.getResource or directly through ResourceCache. Also removes the now-unused Resource/ResourceType imports.
LGTM. Please take a look at your convenience @xintongsong.
@weiqingy Thanks for working on this. The refactor looks good to me in general. I just left a few comments.
In addition to the inline comments, I think we need more JavaDocs to explicitly describe the contract of each component interfaces, and unit tests to verify them. Currently, they are a bit vague relying on the names and implementations to understand.
getRuntimeContext().getJobInfo().getJobId(),
metricGroup,
this::checkMailboxThread,
stateManager.getJobIdentifier());
It's a bit counterintuitive that the job id is fetched from the state manager. I think it should be maintained in the operator.
}
}

ActionState maybeGetActionState(Object key, long sequenceNum, Action action, Event event)
I'd suggest to return an Optional or mark @Nullable for methods that may return null.
private transient ResourceCache resourceCache;

private final Boolean inputIsJava;

private final Map<ActionTask, ContinuationContext> continuationContexts;
private final Map<ActionTask, String> pythonAwaitableRefs;
These 2 maps might belong to ActionTaskContextManager. They are not related to durable execution. In this class, they are only initialized and then exposed through one-line getters/setters, all of which are called by ActionTaskContextManager.
This also violates the "no manager-to-manager references" design goal.
Address xintongsong's review feedback (apache#546 comments #6, #7): the continuationContexts and pythonAwaitableRefs maps are runner-context concerns, not durable-execution concerns, and their placement in DurableExecutionManager forced ActionTaskContextManager to hold a manager-to-manager reference forbidden by 545-DESIGN.md.
Move both maps and their accessors into ActionTaskContextManager. actionTaskDurableContexts stays in DEM since DEM consumes it via setupDurableExecutionContext. Drop the DEM parameter from createAndSetRunnerContext (no longer needed); transferContexts keeps the DEM parameter for putDurableContext only.
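A rough sketch of the "no manager-to-manager references" rule follows, assuming string task ids and string contexts in place of the real `ActionTask` and context types. Method names other than `transferContexts` and `putDurableContext` are hypothetical. The collaborator arrives as a method parameter and is never stored in a field, so the operator remains the only object holding both managers.

```java
import java.util.HashMap;
import java.util.Map;

final class MediatorSketch {

    /** Hypothetical stand-in for DurableExecutionManager (DEM). */
    static final class DurableManager {
        private final Map<String, String> durableContexts = new HashMap<>();

        void putDurableContext(String taskId, String context) {
            durableContexts.put(taskId, context);
        }

        String getDurableContext(String taskId) {
            return durableContexts.get(taskId);
        }
    }

    /** Hypothetical stand-in for ActionTaskContextManager. */
    static final class ContextManager {
        // Owned here after the move; no field refers to another manager.
        private final Map<String, String> continuationContexts = new HashMap<>();

        void putContinuationContext(String taskId, String context) {
            continuationContexts.put(taskId, context);
        }

        String getContinuationContext(String taskId) {
            return continuationContexts.get(taskId);
        }

        // The DEM is passed in for this one call only.
        void transferContexts(String fromTask, String toTask, DurableManager durable) {
            String continuation = continuationContexts.remove(fromTask);
            if (continuation != null) {
                continuationContexts.put(toTask, continuation);
            }
            String durableContext = durable.getDurableContext(fromTask);
            if (durableContext != null) {
                durable.putDurableContext(toTask, durableContext);
            }
        }
    }

    /** The operator mediates: it alone holds both managers. */
    static final class Operator {
        final DurableManager durable = new DurableManager();
        final ContextManager contexts = new ContextManager();

        void reschedule(String fromTask, String toTask) {
            contexts.transferContexts(fromTask, toTask, durable);
        }
    }
}
```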
Address xintongsong's review feedback (apache#546 comments #3, #5):
#5: the `inputIsJava` field on the operator was never read; the constructor parameter is forwarded directly to EventRouter. Remove the field; keep the constructor parameter.
#3: `jobIdentifier` is a runtime identity, not a Flink state descriptor. OperatorStateManager exists to own keyed/operator state; identity belongs on the operator. Move the `jobIdentifier` field, its initialization (formerly `OperatorStateManager.initJobIdentifier`), and the accessor into ActionExecutionOperator. Inline the init body into the operator's `initializeState()`. Preserve the savepoint state descriptor name `"identifier_state"` exactly to retain compatibility with existing savepoints.
Address xintongsong's review feedback (apache#546 comment #4): "I'd suggest to return an Optional or mark @Nullable for methods that may return null." Apply javax.annotation.Nullable across all 5 managers (consistent with the existing @Nullable ActionStateStore parameter style; Optional would be inconsistent with the codebase). 16 methods total: getter/poll methods backed by Map.get/Map.remove/poll-style state operations, plus fields that are null until a separate lifecycle phase initializes them (e.g., Python bridge components in no-Python jobs, lazily-set state descriptors).
Address xintongsong's review feedback (apache#546 top-level): "we need more JavaDocs to explicitly describe the contract of each component interfaces... Currently they are a bit vague relying on the names and implementations to understand."
Add class-level Javadoc to all 5 managers (OperatorStateManager, DurableExecutionManager, ActionTaskContextManager, EventRouter, PythonBridgeManager) covering responsibility, owned state, lifecycle (when constructed, when init/open methods run, when close runs), and the design constraint that managers do not hold references to one another; cross-cutting data flows via method parameters with the operator as mediator.
Add method-level Javadoc to contract-defining methods only: state initialization, sequence-number semantics, poll semantics, snapshot and recovery hooks, durable-mode no-op behavior, Java vs Python branching, watermark draining, and the close ordering inside the Python bridge.
Address xintongsong's review feedback (apache#546 top-level): "unit tests to verify them. Currently they are a bit vague relying on the names and implementations to understand."
Add focused contract tests covering the core promise of each manager:
- DurableExecutionManagerTest (2): no-store mode is a silent no-op for every maybe* method; an injected store correctly receives a finished task's persisted result with isCompleted() set.
- ActionTaskContextManagerTest (2): per-task maps remain isolated under put/get/remove cycles; createOrGetRunnerContext throws IllegalStateException with a helpful message when asked for a Python context but pythonRunnerContext is null.
- EventRouterTest (2): wrapToInputEvent passes Java input through unchanged; getActionsTriggeredBy dispatches by event-class name.
- PythonBridgeManagerTest (1): open() is a no-op when the agent plan contains neither Python actions nor Python resources.
OperatorStateManager is left to integration coverage from ActionExecutionOperatorTest (21 cases); a dedicated harness-based test is tracked as follow-up. Comprehensive coverage (snapshot/recovery, listener notification, watermark draining, full memory-context lifecycle) is also follow-up.
Thanks for reviewing, @xintongsong. Pushed
PTAL when you have a moment.
wenjin272 left a comment
Hi, @weiqingy. I left a few minor comments, mainly regarding code style and doc.
Additionally, during my review I found a bug that predates the refactoring. I noticed that you plan to add integration tests in a separate PR; we can fix this issue and add test verification in that PR.
for (Map.Entry<Object, Long> entry : keyToSeqNum.entrySet()) {
actionStateStore.pruneState(entry.getKey(), entry.getValue());
}
checkpointIdToSeqNums.remove(checkpointId);
It appears that checkpointIdToSeqNums is only cleaned up when actionStateStore is not null, but there is no check for whether actionStateStore is null in recordCheckpointSequenceNumbers. This seems likely to cause a memory leak.
I checked, and this issue existed before the refactoring. We can open a separate PR to fix it.
Good catch — confirmed, checkpointIdToSeqNums entries will accumulate without cleanup when durable execution is disabled. This is pre-existing and orthogonal to the current refactoring, so we'll track it in a separate PR as suggested. Thanks for the thorough review.
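For the follow-up PR, one possible shape of the fix is sketched below. It is an assumption, not the agreed solution: the class is a simplified stand-in, the method signatures are guessed from the snippet under review, and `trackedCheckpoints()` is a hypothetical helper added only so the behavior can be observed. The idea is to skip recording when no store is configured, and to remove the map entry on completion regardless of whether a store exists.

```java
import java.util.HashMap;
import java.util.Map;

final class CheckpointCleanupSketch {

    /** Hypothetical reduction of the store to the one call used here. */
    interface ActionStateStore {
        void pruneState(Object key, long seqNum);
    }

    static final class DurableManager {
        // Null when durable execution is disabled.
        private final ActionStateStore actionStateStore;
        private final Map<Long, Map<Object, Long>> checkpointIdToSeqNums = new HashMap<>();

        DurableManager(ActionStateStore actionStateStore) {
            this.actionStateStore = actionStateStore;
        }

        void recordCheckpointSequenceNumbers(long checkpointId, Map<Object, Long> keyToSeqNum) {
            if (actionStateStore == null) {
                return; // nothing would ever prune this entry, so do not keep it
            }
            checkpointIdToSeqNums.put(checkpointId, new HashMap<>(keyToSeqNum));
        }

        void notifyCheckpointComplete(long checkpointId) {
            // Remove unconditionally so an entry can never outlive its checkpoint.
            Map<Object, Long> keyToSeqNum = checkpointIdToSeqNums.remove(checkpointId);
            if (actionStateStore == null || keyToSeqNum == null) {
                return;
            }
            for (Map.Entry<Object, Long> entry : keyToSeqNum.entrySet()) {
                actionStateStore.pruneState(entry.getKey(), entry.getValue());
            }
        }

        // Hypothetical test hook.
        int trackedCheckpoints() {
            return checkpointIdToSeqNums.size();
        }
    }
}
```

Either guard alone would stop the growth; applying both keeps the map bounded even if one of the two call sites changes later.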
}

@Nullable
SegmentedQueue getKeySegmentQueue() {
Is the @Nullable annotation here necessary?
For methods like maybeGetActionState and pollNextPendingInputEvent, I think the Nullable annotation is appropriate: they may return null when called, and the caller performs a check. But the values behind methods like getKeySegmentQueue and getSensoryMemState are initialized in the open method of ActionExecutionOperator, and all accesses occur after open.
Removed @Nullable from getKeySegmentQueue(), getReusedStreamRecord() (EventRouter), getSensoryMemState(), and getShortTermMemState() (OperatorStateManager) in 3699f14. These fields are unconditionally initialized in open() before any caller can reach them. Kept @Nullable where the return value genuinely can be null: getEventLogger() (config-dependent) and the pollNext* methods (poll from list-state returns null when empty).
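The distinction drawn in this thread can be illustrated with a small sketch. The `Nullable` annotation here is a local stand-in for `javax.annotation.Nullable`, the queue and memory types are simplified placeholders, and the fail-fast check in the getter is an illustrative addition rather than something the PR is known to contain.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayDeque;
import java.util.Deque;

final class NullableContractSketch {

    // Local stand-in for javax.annotation.Nullable so the sketch compiles alone.
    @Retention(RetentionPolicy.CLASS)
    @Target({ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER})
    @interface Nullable {}

    /** Hypothetical, simplified state manager. */
    static final class StateManager {
        private final Deque<String> pendingInputEvents = new ArrayDeque<>();
        private StringBuilder sensoryMemState; // assigned once in open()

        void open() {
            sensoryMemState = new StringBuilder();
        }

        void addPendingInputEvent(String event) {
            pendingInputEvents.addLast(event);
        }

        // Genuinely nullable: an empty queue is a normal outcome callers must handle.
        @Nullable
        String pollNextPendingInputEvent() {
            return pendingInputEvents.pollFirst();
        }

        // Not annotated: the lifecycle guarantees open() ran before any caller.
        StringBuilder getSensoryMemState() {
            if (sensoryMemState == null) {
                throw new IllegalStateException("open() has not run yet");
            }
            return sensoryMemState;
        }
    }
}
```

Annotating only the first kind of method keeps null checks at call sites meaningful: a caller that sees `@Nullable` knows null is a normal outcome, not a lifecycle violation.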
@@ -121,54 +72,25 @@
 * and the resulting output event is collected for further processing.
 */
public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT>
They are initialized in the Open method of ActionExecutionOperator, and all calls occur after Open.
getKeySegmentQueue() and getReusedStreamRecord() in EventRouter are initialized unconditionally in open() before any data flows through. Similarly, getSensoryMemState() and getShortTermMemState() in OperatorStateManager are initialized before callers access them. @Nullable on these methods is misleading; retain it only where the return value genuinely can be null at call time (getEventLogger, pollNextActionTask, pollNextPendingInputEvent). Addresses wenjin272's review comments r3165417289 + r3165421647.
Created separate issues to follow up on the items deferred from this PR:
Will follow up on them separately.
Linked issue: #545
Purpose of change
Decomposes ActionExecutionOperator (1,166 lines) into 5 package-private manager classes, each owning a single concern:
- OperatorStateManager
- DurableExecutionManager (implements ActionStatePersister)
- ActionTaskContextManager
- EventRouter
- PythonBridgeManager
The operator shrinks from 1,166 to 562 lines (52% reduction) of pure coordination logic.
Key design decisions:
- sequenceNumberKState placed in OperatorStateManager (not DurableExecutionManager) because it's used in non-durable mode
- ActionStatePersister interface moved from the operator to DurableExecutionManager
- ResourceCache stays operator-owned; it is closed first, before the Python stack
Backward compatibility:
- ActionExecutionOperatorFactory constructor signature preserved
Tests
- ActionExecutionOperatorTest tests pass as regression gates
- ./tools/lint.sh -c: formatting compliance verified
- ./tools/ut.sh -j: full Java test suite passes
- ./tools/build.sh -j: full Java build passes
Test reflection accesses to actionStateStore (6 tests) and eventLogger (1 test) updated to use @VisibleForTesting getter chains instead of reflection on private fields.
No new tests added. This is a pure structural refactoring: every code path flows through ActionExecutionOperator, which the existing integration tests exercise via KeyedOneInputStreamOperatorTestHarness. The managers are package-private and not independently consumable APIs.
API
No public API changes. All new classes are package-private.
Documentation
- doc-needed
- doc-not-needed
- doc-included