Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -862,12 +862,14 @@ void testMapWithLargeResult_replayChildren() {

var result1 = runner.runUntilComplete("test");
assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus());
assertEquals(202, result1.getHistoryEvents().size());
var firstRunCount = executionCount.get();
assertTrue(firstRunCount >= 100);

// Replay — large result path: replayChildren=true, children replay from cache
var result2 = runner.run("test");
assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus());
assertEquals(202, result2.getHistoryEvents().size());
assertEquals(firstRunCount, executionCount.get(), "Map functions should not re-execute on replay");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import software.amazon.awssdk.services.lambda.model.ContextOptions;
import software.amazon.awssdk.services.lambda.model.Operation;
import software.amazon.awssdk.services.lambda.model.OperationAction;
Expand Down Expand Up @@ -42,7 +43,8 @@ public class MapOperation<I, O> extends ConcurrencyOperation<MapResult<O>> {
private final DurableContext.MapFunction<I, O> function;
private final TypeToken<O> itemResultType;
private final SerDes serDes;
private boolean replayFromPayload;
private final AtomicBoolean replayFromPayload = new AtomicBoolean(false);
private final AtomicBoolean replayForLargeResult = new AtomicBoolean(false);
private volatile MapResult<O> cachedResult;

public MapOperation(
Expand Down Expand Up @@ -133,10 +135,11 @@ protected void replay(Operation existing) {
if (existing.contextDetails() != null
&& Boolean.TRUE.equals(existing.contextDetails().replayChildren())) {
// Large result: re-execute children to reconstruct MapResult
replayForLargeResult.set(true);
executeItems();
} else {
// Small result: MapResult is in the payload, skip child replay
replayFromPayload = true;
replayFromPayload.set(true);
markAlreadyCompleted();
}
}
Expand Down Expand Up @@ -181,6 +184,10 @@ protected void handleCompletion(ConcurrencyCompletionStatus concurrencyCompletio
}

this.cachedResult = new MapResult<>(resultItems, concurrencyCompletionStatus);
if (replayForLargeResult.get()) {
markAlreadyCompleted();
return;
}
var serialized = serializeResult(cachedResult);
var serializedBytes = serialized.getBytes(java.nio.charset.StandardCharsets.UTF_8);

Expand All @@ -205,7 +212,7 @@ public MapResult<O> get() {
if (items.isEmpty()) {
return MapResult.empty();
}
if (replayFromPayload) {
if (replayFromPayload.get()) {
// Small result replay: deserialize MapResult directly from checkpoint payload
var op = waitForOperationCompletion();
var result = (op.contextDetails() != null) ? op.contextDetails().result() : null;
Expand Down
Loading