From 24cf1ac3fa058327181796b411048cd7afaeacf4 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Sat, 18 Apr 2026 20:00:03 -0700 Subject: [PATCH] avoid checkpointing when replaying in large result mode --- .../amazon/lambda/durable/MapIntegrationTest.java | 2 ++ .../lambda/durable/operation/MapOperation.java | 13 ++++++++++--- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/MapIntegrationTest.java b/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/MapIntegrationTest.java index d85bfea4a..84e0f09a0 100644 --- a/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/MapIntegrationTest.java +++ b/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/MapIntegrationTest.java @@ -862,12 +862,14 @@ void testMapWithLargeResult_replayChildren() { var result1 = runner.runUntilComplete("test"); assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus()); + assertEquals(202, result1.getHistoryEvents().size()); var firstRunCount = executionCount.get(); assertTrue(firstRunCount >= 100); // Replay — large result path: replayChildren=true, children replay from cache var result2 = runner.run("test"); assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus()); + assertEquals(202, result2.getHistoryEvents().size()); assertEquals(firstRunCount, executionCount.get(), "Map functions should not re-execute on replay"); } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/MapOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/MapOperation.java index 9f47463c3..7f26116d7 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/MapOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/MapOperation.java @@ -5,6 +5,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import 
software.amazon.awssdk.services.lambda.model.ContextOptions; import software.amazon.awssdk.services.lambda.model.Operation; import software.amazon.awssdk.services.lambda.model.OperationAction; @@ -42,7 +43,8 @@ public class MapOperation extends ConcurrencyOperation> { private final DurableContext.MapFunction function; private final TypeToken itemResultType; private final SerDes serDes; - private boolean replayFromPayload; + private final AtomicBoolean replayFromPayload = new AtomicBoolean(false); + private final AtomicBoolean replayForLargeResult = new AtomicBoolean(false); private volatile MapResult cachedResult; public MapOperation( @@ -133,10 +135,11 @@ protected void replay(Operation existing) { if (existing.contextDetails() != null && Boolean.TRUE.equals(existing.contextDetails().replayChildren())) { // Large result: re-execute children to reconstruct MapResult + replayForLargeResult.set(true); executeItems(); } else { // Small result: MapResult is in the payload, skip child replay - replayFromPayload = true; + replayFromPayload.set(true); markAlreadyCompleted(); } } @@ -181,6 +184,10 @@ protected void handleCompletion(ConcurrencyCompletionStatus concurrencyCompletio } this.cachedResult = new MapResult<>(resultItems, concurrencyCompletionStatus); + if (replayForLargeResult.get()) { + markAlreadyCompleted(); + return; + } var serialized = serializeResult(cachedResult); var serializedBytes = serialized.getBytes(java.nio.charset.StandardCharsets.UTF_8); @@ -205,7 +212,7 @@ public MapResult get() { if (items.isEmpty()) { return MapResult.empty(); } - if (replayFromPayload) { + if (replayFromPayload.get()) { // Small result replay: deserialize MapResult directly from checkpoint payload var op = waitForOperationCompletion(); var result = (op.contextDetails() != null) ? op.contextDetails().result() : null;