Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ public List<Event> getEvents() {
return List.copyOf(events);
}

/** Returns the operation ID. */
public String getId() {
return operation.id();
}

/** Returns the operation name. */
public String getName() {
return operation.name();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ void processUpdate(OperationUpdate update, Operation operation) {
.eventId(eventId.getAndIncrement())
.eventTimestamp(Instant.now())
.id(update.id())
.name(update.name());
.name(update.name())
.parentId(operation.parentId());

Event event =
switch (update.type()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.config;

import java.util.Objects;
import software.amazon.lambda.durable.serde.SerDes;

/**
Expand All @@ -13,11 +14,12 @@ public class MapConfig {
private final Integer maxConcurrency;
private final CompletionConfig completionConfig;
private final SerDes serDes;
private final NestingType nestingType;

private MapConfig(Builder builder) {
this.maxConcurrency = builder.maxConcurrency == null ? Integer.MAX_VALUE : builder.maxConcurrency;
this.completionConfig =
builder.completionConfig == null ? CompletionConfig.allCompleted() : builder.completionConfig;
this.maxConcurrency = Objects.requireNonNullElse(builder.maxConcurrency, Integer.MAX_VALUE);
this.completionConfig = Objects.requireNonNullElse(builder.completionConfig, CompletionConfig.allCompleted());
this.nestingType = Objects.requireNonNullElse(builder.nestingType, NestingType.NESTED);
this.serDes = builder.serDes;
}

Expand All @@ -36,25 +38,31 @@ public SerDes serDes() {
return serDes;
}

/** @return nesting type, defaults to {@link NestingType#NESTED} */
public NestingType nestingType() {
return nestingType;
}

public static Builder builder() {
return new Builder(null, null, null);
return new Builder();
}

public Builder toBuilder() {
return new Builder(maxConcurrency, completionConfig, serDes);
return new Builder()
.maxConcurrency(maxConcurrency)
.completionConfig(completionConfig)
.serDes(serDes)
.nestingType(nestingType);
}

/** Builder for creating MapConfig instances. */
public static class Builder {
public NestingType nestingType;
private Integer maxConcurrency;
private CompletionConfig completionConfig;
private SerDes serDes;

private Builder(Integer maxConcurrency, CompletionConfig completionConfig, SerDes serDes) {
this.maxConcurrency = maxConcurrency;
this.completionConfig = completionConfig;
this.serDes = serDes;
}
private Builder() {}

public Builder maxConcurrency(Integer maxConcurrency) {
if (maxConcurrency != null && maxConcurrency < 1) {
Expand Down Expand Up @@ -86,6 +94,17 @@ public Builder serDes(SerDes serDes) {
return this;
}

/**
* Sets the nesting type for the map operation.
*
* @param nestingType the nesting type (default: {@link NestingType#NESTED})
* @return this builder for method chaining
*/
public Builder nestingType(NestingType nestingType) {
this.nestingType = nestingType;
return this;
}

public MapConfig build() {
return new MapConfig(this);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.config;

public enum NestingType {
/**
* Create CONTEXT operations for each branch/iteration with full checkpointing. Operations within each
* branch/iteration are wrapped in their own context. - **Observability**: High - each branch/iteration appears as
* separate operation in execution history - **Cost**: Higher - consumes more operations due to CONTEXT creation
* overhead - **Scale**: Lower maximum iterations due to operation limits
*/
NESTED,

/**
* Skip CONTEXT operations for branches/iterations using virtual contexts. Operations execute directly without
* individual context wrapping. - **Observability**: Lower - branches/iterations don't appear as separate operations
* - **Cost**: ~30% lower - reduces operation consumption by skipping CONTEXT overhead - **Scale**: Higher maximum
* iterations possible within operation limits
*/
FLAT,
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.config;

import java.util.Objects;

/**
* Configuration options for parallel operations in durable executions.
*
Expand All @@ -11,22 +13,29 @@
public class ParallelConfig {
private final int maxConcurrency;
private final CompletionConfig completionConfig;
private final NestingType nestingType;

private ParallelConfig(Builder builder) {
this.maxConcurrency = builder.maxConcurrency == null ? Integer.MAX_VALUE : builder.maxConcurrency;
this.completionConfig =
builder.completionConfig == null ? CompletionConfig.allCompleted() : builder.completionConfig;
this.maxConcurrency = Objects.requireNonNullElse(builder.maxConcurrency, Integer.MAX_VALUE);
this.completionConfig = Objects.requireNonNullElseGet(builder.completionConfig, CompletionConfig::allCompleted);
this.nestingType = Objects.requireNonNullElse(builder.nestingType, NestingType.NESTED);
}

/** @return the maximum number of branches running simultaneously, or -1 for unlimited */
public int maxConcurrency() {
return maxConcurrency;
}

/** @return the completion configuration for the parallel operation */
public CompletionConfig completionConfig() {
return completionConfig;
}

/** @return the nesting type for the parallel operation */
public NestingType nestingType() {
return nestingType;
}

/**
* Creates a new builder for ParallelConfig.
*
Expand All @@ -36,10 +45,18 @@ public static Builder builder() {
return new Builder();
}

public Builder toBuilder() {
return new Builder()
.maxConcurrency(maxConcurrency)
.completionConfig(completionConfig)
.nestingType(nestingType);
}

/** Builder for creating ParallelConfig instances. */
public static class Builder {
private Integer maxConcurrency;
private CompletionConfig completionConfig;
private NestingType nestingType;

private Builder() {}

Expand Down Expand Up @@ -71,6 +88,17 @@ public Builder completionConfig(CompletionConfig completionConfig) {
return this;
}

/**
* Sets the nesting type for the parallel operation.
*
* @param nestingType the nesting type (default: {@link NestingType#NESTED})
* @return this builder for method chaining
*/
public Builder nestingType(NestingType nestingType) {
this.nestingType = nestingType;
return this;
}

/**
* Builds the ParallelConfig instance.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public class DurableContextImpl extends BaseContextImpl implements DurableContex
private static final int MAX_WAIT_FOR_CALLBACK_NAME_LENGTH = ParameterValidator.MAX_OPERATION_NAME_LENGTH
- Math.max(WAIT_FOR_CALLBACK_CALLBACK_SUFFIX.length(), WAIT_FOR_CALLBACK_SUBMITTER_SUFFIX.length());
private final OperationIdGenerator operationIdGenerator;
private final DurableContextImpl parentContext;
private final boolean isVirtual;
private volatile DurableLogger logger;

/** Shared initialization — sets all fields. */
Expand All @@ -65,9 +67,13 @@ private DurableContextImpl(
DurableConfig durableConfig,
Context lambdaContext,
String contextId,
String contextName) {
String contextName,
boolean isVirtual,
DurableContextImpl parentContext) {
super(executionManager, durableConfig, lambdaContext, contextId, contextName, ThreadType.CONTEXT);
operationIdGenerator = new OperationIdGenerator(contextId);
this.parentContext = parentContext;
this.isVirtual = isVirtual;
}

/**
Expand All @@ -82,19 +88,26 @@ private DurableContextImpl(
*/
public static DurableContextImpl createRootContext(
ExecutionManager executionManager, DurableConfig durableConfig, Context lambdaContext) {
return new DurableContextImpl(executionManager, durableConfig, lambdaContext, null, null);
return new DurableContextImpl(executionManager, durableConfig, lambdaContext, null, null, false, null);
}

/**
* Creates a child context.
*
* @param childContextId the child context's ID (the CONTEXT operation's operation ID)
* @param childContextName the name of the child context
* @param isVirtual whether the context is virtual
* @return a new DurableContext for the child context
*/
public DurableContextImpl createChildContext(String childContextId, String childContextName) {
public DurableContextImpl createChildContext(String childContextId, String childContextName, boolean isVirtual) {
return new DurableContextImpl(
getExecutionManager(), getDurableConfig(), getLambdaContext(), childContextId, childContextName);
getExecutionManager(),
getDurableConfig(),
getLambdaContext(),
childContextId,
childContextName,
isVirtual,
this);
}

/**
Expand Down Expand Up @@ -387,4 +400,13 @@ public void close() {
private String nextOperationId() {
return operationIdGenerator.nextOperationId();
}

/**
* Get the parent context ID for its child operations, which always points to a non-virtual context
*
* @return the parent of this context if virtual, otherwise this context id
*/
public String getParentId() {
return isVirtual ? parentContext.getContextId() : getContextId();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.model;

import software.amazon.awssdk.services.lambda.model.OperationStatus;
import software.amazon.lambda.durable.util.ExceptionHelper;

public record DeserializedOperationResult<T>(OperationStatus status, T result, Throwable throwable) {
public static <T> DeserializedOperationResult<T> succeeded(T result) {
return new DeserializedOperationResult<>(OperationStatus.SUCCEEDED, result, null);
}

public static <T> DeserializedOperationResult<T> failed(Throwable throwable) {
return new DeserializedOperationResult<>(OperationStatus.FAILED, null, throwable);
}

public T get() {
if (status == OperationStatus.SUCCEEDED) {
return result;
}
ExceptionHelper.sneakyThrow(throwable);
return null;
}
}
Loading
Loading