Skip to content

Commit 5e8ae84

Browse files
committed
fix(sdk): enhance invoke handler to wait for running operations (#88)
fix(sdk): enhance invoke handler to wait for running operations before terminating - Wrap invoke logic in while loop to allow re-evaluation of step status - Check for running operations before terminating STARTED invokes - Use waitBeforeContinue utility when other operations are running - Update unit tests to cover new waiting behavior and while loop logic - Add test for scenario where invoke waits for other operations - Remove outdated intercepted execution test that didn't fit new pattern
1 parent 27aa8b1 commit 5e8ae84

File tree

2 files changed

+156
-99
lines changed

2 files changed

+156
-99
lines changed

packages/aws-durable-execution-sdk-js/src/handlers/invoke-handler/invoke-handler.test.ts

Lines changed: 63 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,15 @@ jest.mock("../../utils/termination-helper");
1414
jest.mock("../../mocks/operation-interceptor");
1515
jest.mock("../../utils/logger/logger");
1616
jest.mock("../../errors/serdes-errors/serdes-errors");
17+
jest.mock("../../utils/wait-before-continue/wait-before-continue");
1718

1819
import { terminate } from "../../utils/termination-helper";
1920
import { log } from "../../utils/logger/logger";
2021
import {
2122
safeSerialize,
2223
safeDeserialize,
2324
} from "../../errors/serdes-errors/serdes-errors";
25+
import { waitBeforeContinue } from "../../utils/wait-before-continue/wait-before-continue";
2426

2527
const mockTerminate = terminate as jest.MockedFunction<typeof terminate>;
2628
const mockLog = log as jest.MockedFunction<typeof log>;
@@ -30,6 +32,9 @@ const mockSafeSerialize = safeSerialize as jest.MockedFunction<
3032
const mockSafeDeserialize = safeDeserialize as jest.MockedFunction<
3133
typeof safeDeserialize
3234
>;
35+
const mockWaitBeforeContinue = waitBeforeContinue as jest.MockedFunction<
36+
typeof waitBeforeContinue
37+
>;
3338

3439
describe("InvokeHandler", () => {
3540
let mockContext: ExecutionContext;
@@ -237,6 +242,7 @@ describe("InvokeHandler", () => {
237242
});
238243

239244
mockContext.getStepData = mockGetStepData;
245+
mockHasRunningOperations.mockReturnValue(false); // No other operations running
240246

241247
const invokeHandler = createInvokeHandler(
242248
mockContext,
@@ -261,7 +267,49 @@ describe("InvokeHandler", () => {
261267
);
262268
});
263269

270+
it("should wait when operation is in progress and other operations are running", async () => {
271+
const mockGetStepData = jest.fn()
272+
.mockReturnValueOnce({ Status: OperationStatus.STARTED })
273+
.mockReturnValueOnce({ Status: OperationStatus.SUCCEEDED, InvokeDetails: { Result: '{"result":"success"}' } });
274+
275+
mockContext.getStepData = mockGetStepData;
276+
mockHasRunningOperations.mockReturnValue(true); // Other operations running
277+
mockWaitBeforeContinue.mockResolvedValue({ reason: "status" });
278+
mockSafeDeserialize.mockResolvedValue({ result: "success" });
279+
280+
const invokeHandler = createInvokeHandler(
281+
mockContext,
282+
mockCheckpointFn,
283+
mockCreateStepId,
284+
mockHasRunningOperations,
285+
);
286+
287+
const result = await invokeHandler("test-function", { test: "data" });
288+
289+
expect(result).toEqual({ result: "success" });
290+
expect(mockLog).toHaveBeenCalledWith(
291+
true,
292+
"⏳",
293+
"Invoke test-function still in progress, waiting for other operations",
294+
);
295+
expect(mockWaitBeforeContinue).toHaveBeenCalledWith({
296+
checkHasRunningOperations: true,
297+
checkStepStatus: true,
298+
checkTimer: false,
299+
stepId: "test-step-1",
300+
context: mockContext,
301+
hasRunningOperations: mockHasRunningOperations,
302+
});
303+
});
304+
264305
it("should create checkpoint and terminate for new invoke without name", async () => {
306+
const mockGetStepData = jest.fn()
307+
.mockReturnValueOnce(undefined) // First call - no step data
308+
.mockReturnValueOnce({ Status: OperationStatus.STARTED }); // After checkpoint
309+
310+
mockContext.getStepData = mockGetStepData;
311+
mockHasRunningOperations.mockReturnValue(false); // No other operations running
312+
265313
mockOperationInterceptor.execute.mockImplementation(
266314
async (name: any, fn: any) => {
267315
return await fn();
@@ -305,11 +353,18 @@ describe("InvokeHandler", () => {
305353
expect(mockLog).toHaveBeenCalledWith(
306354
true,
307355
"🚀",
308-
"Invoke test-function started, terminating for async execution",
356+
"Invoke test-function started, re-checking status",
309357
);
310358
});
311359

312360
it("should create checkpoint and terminate for new invoke with name", async () => {
361+
const mockGetStepData = jest.fn()
362+
.mockReturnValueOnce(undefined) // First call - no step data
363+
.mockReturnValueOnce({ Status: OperationStatus.STARTED }); // After checkpoint
364+
365+
mockContext.getStepData = mockGetStepData;
366+
mockHasRunningOperations.mockReturnValue(false); // No other operations running
367+
313368
mockOperationInterceptor.execute.mockImplementation(
314369
async (name: any, fn: any) => {
315370
return await fn();
@@ -342,6 +397,13 @@ describe("InvokeHandler", () => {
342397
});
343398

344399
it("should handle invoke with options", async () => {
400+
const mockGetStepData = jest.fn()
401+
.mockReturnValueOnce(undefined) // First call - no step data
402+
.mockReturnValueOnce({ Status: OperationStatus.STARTED }); // After checkpoint
403+
404+
mockContext.getStepData = mockGetStepData;
405+
mockHasRunningOperations.mockReturnValue(false); // No other operations running
406+
345407
mockOperationInterceptor.execute.mockImplementation(
346408
async (name: any, fn: any) => {
347409
return await fn();
@@ -381,28 +443,5 @@ describe("InvokeHandler", () => {
381443
},
382444
});
383445
});
384-
385-
it("should handle intercepted execution", async () => {
386-
const interceptedResult = { intercepted: true };
387-
mockOperationInterceptor.execute.mockResolvedValue(interceptedResult);
388-
389-
const invokeHandler = createInvokeHandler(
390-
mockContext,
391-
mockCheckpointFn,
392-
mockCreateStepId,
393-
mockHasRunningOperations,
394-
);
395-
396-
const result = await invokeHandler("test-function", { test: "data" });
397-
398-
expect(result).toBe(interceptedResult);
399-
expect(OperationInterceptor.forExecution).toHaveBeenCalledWith(
400-
"test-arn",
401-
);
402-
expect(mockOperationInterceptor.execute).toHaveBeenCalledWith(
403-
undefined,
404-
expect.any(Function),
405-
);
406-
});
407446
});
408447
});

packages/aws-durable-execution-sdk-js/src/handlers/invoke-handler/invoke-handler.ts

Lines changed: 93 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,13 @@ import {
1414
safeDeserialize,
1515
} from "../../errors/serdes-errors/serdes-errors";
1616
import { OperationInterceptor } from "../../mocks/operation-interceptor";
17+
import { waitBeforeContinue } from "../../utils/wait-before-continue/wait-before-continue";
1718

1819
export const createInvokeHandler = (
1920
context: ExecutionContext,
2021
checkpoint: ReturnType<typeof createCheckpoint>,
2122
createStepId: () => string,
22-
_hasRunningOperations: () => boolean,
23+
hasRunningOperations: () => boolean,
2324
): {
2425
<I, O>(funcId: string, input: I, config?: InvokeConfig<I, O>): Promise<O>;
2526
<I, O>(
@@ -58,88 +59,105 @@ export const createInvokeHandler = (
5859

5960
log(context.isVerbose, "🔗", `Invoke ${name || funcId} (${stepId})`);
6061

61-
// Check if we have existing step data
62-
const stepData = context.getStepData(stepId);
62+
// Main invoke logic - can be re-executed if step status changes
63+
while (true) {
64+
// Check if we have existing step data
65+
const stepData = context.getStepData(stepId);
6366

64-
if (stepData?.Status === OperationStatus.SUCCEEDED) {
65-
// Return cached result - no need to check for errors in successful operations
66-
const invokeDetails = stepData.InvokeDetails;
67-
return await safeDeserialize(
68-
config?.resultSerdes || defaultSerdes,
69-
invokeDetails?.Result,
70-
stepId,
71-
name,
72-
context.terminationManager,
73-
context.isVerbose,
74-
context.durableExecutionArn,
75-
);
76-
}
67+
if (stepData?.Status === OperationStatus.SUCCEEDED) {
68+
// Return cached result - no need to check for errors in successful operations
69+
const invokeDetails = stepData.InvokeDetails;
70+
return await safeDeserialize(
71+
config?.resultSerdes || defaultSerdes,
72+
invokeDetails?.Result,
73+
stepId,
74+
name,
75+
context.terminationManager,
76+
context.isVerbose,
77+
context.durableExecutionArn,
78+
);
79+
}
7780

78-
if (stepData?.Status === OperationStatus.FAILED) {
79-
// Operation failed, throw error
80-
const invokeDetails = stepData.InvokeDetails;
81-
const error = new Error(
82-
invokeDetails?.Error?.ErrorMessage || "Invoke failed",
83-
);
84-
error.name = invokeDetails?.Error?.ErrorType || "InvokeError";
85-
throw error;
86-
}
81+
if (stepData?.Status === OperationStatus.FAILED) {
82+
// Operation failed, throw error
83+
const invokeDetails = stepData.InvokeDetails;
84+
const error = new Error(
85+
invokeDetails?.Error?.ErrorMessage || "Invoke failed",
86+
);
87+
error.name = invokeDetails?.Error?.ErrorType || "InvokeError";
88+
throw error;
89+
}
8790

88-
if (stepData?.Status === OperationStatus.STARTED) {
89-
// Operation is still running, terminate and wait for completion
90-
// It's a temporary solution until we implement more sopesticated solution
91-
log(
92-
context.isVerbose,
93-
"⏳",
94-
`Invoke ${name || funcId} still in progress, terminating`,
95-
);
96-
return terminate(context, TerminationReason.OPERATION_TERMINATED, stepId);
97-
}
91+
if (stepData?.Status === OperationStatus.STARTED) {
92+
// Operation is still running, check for other operations before terminating
93+
if (hasRunningOperations()) {
94+
log(
95+
context.isVerbose,
96+
"⏳",
97+
`Invoke ${name || funcId} still in progress, waiting for other operations`,
98+
);
99+
await waitBeforeContinue({
100+
checkHasRunningOperations: true,
101+
checkStepStatus: true,
102+
checkTimer: false,
103+
stepId,
104+
context,
105+
hasRunningOperations,
106+
});
107+
continue; // Re-evaluate status after waiting
108+
}
109+
110+
// No other operations running, safe to terminate
111+
log(
112+
context.isVerbose,
113+
"⏳",
114+
`Invoke ${name || funcId} still in progress, terminating`,
115+
);
116+
return terminate(context, TerminationReason.OPERATION_TERMINATED, stepId);
117+
}
98118

99-
// Execute with potential interception (testing)
100-
const result = await OperationInterceptor.forExecution(
101-
context.durableExecutionArn,
102-
).execute(name, async (): Promise<O> => {
103-
// Serialize the input payload
104-
const serializedPayload = await safeSerialize(
105-
config?.payloadSerdes || defaultSerdes,
106-
input,
107-
stepId,
108-
name,
109-
context.terminationManager,
110-
context.isVerbose,
119+
// Execute with potential interception (testing)
120+
await OperationInterceptor.forExecution(
111121
context.durableExecutionArn,
112-
);
122+
).execute(name, async (): Promise<void> => {
123+
// Serialize the input payload
124+
const serializedPayload = await safeSerialize(
125+
config?.payloadSerdes || defaultSerdes,
126+
input,
127+
stepId,
128+
name,
129+
context.terminationManager,
130+
context.isVerbose,
131+
context.durableExecutionArn,
132+
);
113133

114-
// Create checkpoint for the invoke operation
115-
await checkpoint(stepId, {
116-
Id: stepId,
117-
ParentId: context.parentId,
118-
Action: OperationAction.START,
119-
SubType: OperationSubType.INVOKE,
120-
Type: OperationType.INVOKE,
121-
Name: name,
122-
Payload: serializedPayload,
123-
InvokeOptions: {
124-
FunctionName: funcId,
125-
...(config?.timeoutSeconds && {
126-
TimeoutSeconds: config.timeoutSeconds,
127-
}),
128-
},
129-
});
130-
131-
log(
132-
context.isVerbose,
133-
"🚀",
134-
`Invoke ${name || funcId} started, terminating for async execution`,
135-
);
134+
// Create checkpoint for the invoke operation
135+
await checkpoint(stepId, {
136+
Id: stepId,
137+
ParentId: context.parentId,
138+
Action: OperationAction.START,
139+
SubType: OperationSubType.INVOKE,
140+
Type: OperationType.INVOKE,
141+
Name: name,
142+
Payload: serializedPayload,
143+
InvokeOptions: {
144+
FunctionName: funcId,
145+
...(config?.timeoutSeconds && {
146+
TimeoutSeconds: config.timeoutSeconds,
147+
}),
148+
},
149+
});
136150

137-
// Terminate to allow the invoke to execute asynchronously
138-
// It's a temporary solution until we implement more sopesticated solution
139-
return terminate(context, TerminationReason.OPERATION_TERMINATED, stepId);
140-
});
151+
log(
152+
context.isVerbose,
153+
"🚀",
154+
`Invoke ${name || funcId} started, re-checking status`,
155+
);
156+
});
141157

142-
return result;
158+
// Continue the loop to re-evaluate status (will hit STARTED case)
159+
continue;
160+
}
143161
}
144162

145163
return invokeHandler;

0 commit comments

Comments
 (0)