@@ -14,12 +14,13 @@ import {
1414 safeDeserialize ,
1515} from "../../errors/serdes-errors/serdes-errors" ;
1616import { OperationInterceptor } from "../../mocks/operation-interceptor" ;
17+ import { waitBeforeContinue } from "../../utils/wait-before-continue/wait-before-continue" ;
1718
1819export const createInvokeHandler = (
1920 context : ExecutionContext ,
2021 checkpoint : ReturnType < typeof createCheckpoint > ,
2122 createStepId : ( ) => string ,
22- _hasRunningOperations : ( ) => boolean ,
23+ hasRunningOperations : ( ) => boolean ,
2324) : {
2425 < I , O > ( funcId : string , input : I , config ?: InvokeConfig < I , O > ) : Promise < O > ;
2526 < I , O > (
@@ -58,88 +59,105 @@ export const createInvokeHandler = (
5859
5960 log ( context . isVerbose , "🔗" , `Invoke ${ name || funcId } (${ stepId } )` ) ;
6061
61- // Check if we have existing step data
62- const stepData = context . getStepData ( stepId ) ;
62+ // Main invoke logic - can be re-executed if step status changes
63+ while ( true ) {
64+ // Check if we have existing step data
65+ const stepData = context . getStepData ( stepId ) ;
6366
64- if ( stepData ?. Status === OperationStatus . SUCCEEDED ) {
65- // Return cached result - no need to check for errors in successful operations
66- const invokeDetails = stepData . InvokeDetails ;
67- return await safeDeserialize (
68- config ?. resultSerdes || defaultSerdes ,
69- invokeDetails ?. Result ,
70- stepId ,
71- name ,
72- context . terminationManager ,
73- context . isVerbose ,
74- context . durableExecutionArn ,
75- ) ;
76- }
67+ if ( stepData ?. Status === OperationStatus . SUCCEEDED ) {
68+ // Return cached result - no need to check for errors in successful operations
69+ const invokeDetails = stepData . InvokeDetails ;
70+ return await safeDeserialize (
71+ config ?. resultSerdes || defaultSerdes ,
72+ invokeDetails ?. Result ,
73+ stepId ,
74+ name ,
75+ context . terminationManager ,
76+ context . isVerbose ,
77+ context . durableExecutionArn ,
78+ ) ;
79+ }
7780
78- if ( stepData ?. Status === OperationStatus . FAILED ) {
79- // Operation failed, throw error
80- const invokeDetails = stepData . InvokeDetails ;
81- const error = new Error (
82- invokeDetails ?. Error ?. ErrorMessage || "Invoke failed" ,
83- ) ;
84- error . name = invokeDetails ?. Error ?. ErrorType || "InvokeError" ;
85- throw error ;
86- }
81+ if ( stepData ?. Status === OperationStatus . FAILED ) {
82+ // Operation failed, throw error
83+ const invokeDetails = stepData . InvokeDetails ;
84+ const error = new Error (
85+ invokeDetails ?. Error ?. ErrorMessage || "Invoke failed" ,
86+ ) ;
87+ error . name = invokeDetails ?. Error ?. ErrorType || "InvokeError" ;
88+ throw error ;
89+ }
8790
88- if ( stepData ?. Status === OperationStatus . STARTED ) {
89- // Operation is still running, terminate and wait for completion
90- // It's a temporary solution until we implement more sopesticated solution
91- log (
92- context . isVerbose ,
93- "⏳" ,
94- `Invoke ${ name || funcId } still in progress, terminating` ,
95- ) ;
96- return terminate ( context , TerminationReason . OPERATION_TERMINATED , stepId ) ;
97- }
91+ if ( stepData ?. Status === OperationStatus . STARTED ) {
92+ // Operation is still running, check for other operations before terminating
93+ if ( hasRunningOperations ( ) ) {
94+ log (
95+ context . isVerbose ,
96+ "⏳" ,
97+ `Invoke ${ name || funcId } still in progress, waiting for other operations` ,
98+ ) ;
99+ await waitBeforeContinue ( {
100+ checkHasRunningOperations : true ,
101+ checkStepStatus : true ,
102+ checkTimer : false ,
103+ stepId,
104+ context,
105+ hasRunningOperations,
106+ } ) ;
107+ continue ; // Re-evaluate status after waiting
108+ }
109+
110+ // No other operations running, safe to terminate
111+ log (
112+ context . isVerbose ,
113+ "⏳" ,
114+ `Invoke ${ name || funcId } still in progress, terminating` ,
115+ ) ;
116+ return terminate ( context , TerminationReason . OPERATION_TERMINATED , stepId ) ;
117+ }
98118
99- // Execute with potential interception (testing)
100- const result = await OperationInterceptor . forExecution (
101- context . durableExecutionArn ,
102- ) . execute ( name , async ( ) : Promise < O > => {
103- // Serialize the input payload
104- const serializedPayload = await safeSerialize (
105- config ?. payloadSerdes || defaultSerdes ,
106- input ,
107- stepId ,
108- name ,
109- context . terminationManager ,
110- context . isVerbose ,
119+ // Execute with potential interception (testing)
120+ await OperationInterceptor . forExecution (
111121 context . durableExecutionArn ,
112- ) ;
122+ ) . execute ( name , async ( ) : Promise < void > => {
123+ // Serialize the input payload
124+ const serializedPayload = await safeSerialize (
125+ config ?. payloadSerdes || defaultSerdes ,
126+ input ,
127+ stepId ,
128+ name ,
129+ context . terminationManager ,
130+ context . isVerbose ,
131+ context . durableExecutionArn ,
132+ ) ;
113133
114- // Create checkpoint for the invoke operation
115- await checkpoint ( stepId , {
116- Id : stepId ,
117- ParentId : context . parentId ,
118- Action : OperationAction . START ,
119- SubType : OperationSubType . INVOKE ,
120- Type : OperationType . INVOKE ,
121- Name : name ,
122- Payload : serializedPayload ,
123- InvokeOptions : {
124- FunctionName : funcId ,
125- ...( config ?. timeoutSeconds && {
126- TimeoutSeconds : config . timeoutSeconds ,
127- } ) ,
128- } ,
129- } ) ;
130-
131- log (
132- context . isVerbose ,
133- "🚀" ,
134- `Invoke ${ name || funcId } started, terminating for async execution` ,
135- ) ;
134+ // Create checkpoint for the invoke operation
135+ await checkpoint ( stepId , {
136+ Id : stepId ,
137+ ParentId : context . parentId ,
138+ Action : OperationAction . START ,
139+ SubType : OperationSubType . INVOKE ,
140+ Type : OperationType . INVOKE ,
141+ Name : name ,
142+ Payload : serializedPayload ,
143+ InvokeOptions : {
144+ FunctionName : funcId ,
145+ ...( config ?. timeoutSeconds && {
146+ TimeoutSeconds : config . timeoutSeconds ,
147+ } ) ,
148+ } ,
149+ } ) ;
136150
137- // Terminate to allow the invoke to execute asynchronously
138- // It's a temporary solution until we implement more sopesticated solution
139- return terminate ( context , TerminationReason . OPERATION_TERMINATED , stepId ) ;
140- } ) ;
151+ log (
152+ context . isVerbose ,
153+ "🚀" ,
154+ `Invoke ${ name || funcId } started, re-checking status` ,
155+ ) ;
156+ } ) ;
141157
142- return result ;
158+ // Continue the loop to re-evaluate status (will hit STARTED case)
159+ continue ;
160+ }
143161 }
144162
145163 return invokeHandler ;
0 commit comments