Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.common;

import java.util.Map;
import java.util.Objects;

/** When thrown in a Restate service method, it will complete the invocation with an error. */
public class TerminalException extends RuntimeException {

Expand All @@ -17,6 +20,7 @@ public class TerminalException extends RuntimeException {
public static final int INTERNAL_SERVER_ERROR_CODE = 500;

private final int code;
private final Map<String, String> metadata;

public TerminalException() {
this(INTERNAL_SERVER_ERROR_CODE);
Expand All @@ -34,16 +38,37 @@ public TerminalException(int code) {
* @param message error message
*/
public TerminalException(int code, String message) {
super(message);
this.code = code;
this(code, message, null);
}

/**
* Like {@link #TerminalException(int, String)}, with code {@link #INTERNAL_SERVER_ERROR_CODE}.
*/
public TerminalException(String message) {
this(INTERNAL_SERVER_ERROR_CODE, message, null);
}

/**
* Create a new {@link TerminalException}.
*
* @param code HTTP response status code
* @param message error message
* @param metadata error metadata (supported only from Restate > 1.6)
*/
public TerminalException(int code, String message, Map<String, String> metadata) {
super(message);
this.code = INTERNAL_SERVER_ERROR_CODE;
this.code = code;
this.metadata = Objects.requireNonNullElse(metadata, Map.of());
}

/**
* Create a new {@link TerminalException}.
*
* @param message error message
* @param metadata error metadata (supported only from Restate > 1.6)
*/
public TerminalException(String message, Map<String, String> metadata) {
this(INTERNAL_SERVER_ERROR_CODE, message, metadata);
}

/**
Expand All @@ -52,4 +77,11 @@ public TerminalException(String message) {
public int getCode() {
return code;
}

/**
* @return the error metadata
*/
public Map<String, String> getMetadata() {
return metadata;
}
}
17 changes: 17 additions & 0 deletions sdk-core/src/main/java/dev/restate/sdk/core/ProtocolException.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import com.google.protobuf.MessageLite;
import dev.restate.sdk.common.TerminalException;
import dev.restate.sdk.core.generated.protocol.Protocol;
import dev.restate.sdk.core.statemachine.NotificationId;

public class ProtocolException extends RuntimeException {
Expand All @@ -20,6 +21,7 @@ public class ProtocolException extends RuntimeException {
public static final int INTERNAL_CODE = 500;
public static final int JOURNAL_MISMATCH_CODE = 570;
static final int PROTOCOL_VIOLATION_CODE = 571;
static final int UNSUPPORTED_FEATURE = 573;

private final int code;

Expand Down Expand Up @@ -129,4 +131,19 @@ public static ProtocolException idempotencyKeyIsEmpty() {
public static ProtocolException unauthorized(Throwable e) {
return new ProtocolException("Unauthorized", UNAUTHORIZED_CODE, e);
}

public static ProtocolException unsupportedFeature(
String featureName,
Protocol.ServiceProtocolVersion requiredVersion,
Protocol.ServiceProtocolVersion negotiatedVersion) {
return new ProtocolException(
"Current service protocol version does not support "
+ featureName
+ ". "
+ "Negotiated version: "
+ negotiatedVersion.getNumber()
+ ", minimum required: "
+ requiredVersion.getNumber(),
UNSUPPORTED_FEATURE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ public void completeAwakeable(String awakeableId, Slice value) {
@Override
public void completeAwakeable(String awakeableId, TerminalException exception) {
LOG.debug("Executing 'Complete awakeable {} with failure'", awakeableId);
verifyErrorMetadataFeatureSupport(exception);
completeAwakeable(awakeableId, builder -> builder.setFailure(toProtocolFailure(exception)));
}

Expand Down Expand Up @@ -455,6 +456,7 @@ public void completeSignal(
"Executing 'Complete signal {} to invocation {} with failure'",
signalName,
targetInvocationId);
verifyErrorMetadataFeatureSupport(exception);
this.completeSignal(
targetInvocationId,
signalName,
Expand Down Expand Up @@ -520,6 +522,7 @@ public int promiseComplete(String key, Slice value) {
@Override
public int promiseComplete(String key, TerminalException exception) {
LOG.debug("Executing 'Complete promise {} with failure'", key);
verifyErrorMetadataFeatureSupport(exception);
return this.promiseComplete(
key, builder -> builder.setCompletionFailure(toProtocolFailure(exception)));
}
Expand Down Expand Up @@ -567,6 +570,9 @@ public void proposeRunCompletion(
Duration attemptDuration,
@Nullable RetryPolicy retryPolicy) {
LOG.debug("Executing 'Run completed with failure'");
if (exception instanceof TerminalException) {
verifyErrorMetadataFeatureSupport((TerminalException) exception);
}
try {
this.stateContext
.getCurrentState()
Expand Down Expand Up @@ -639,6 +645,7 @@ public void writeOutput(Slice value) {
@Override
public void writeOutput(TerminalException exception) {
LOG.debug("Executing 'Write invocation output with failure'");
verifyErrorMetadataFeatureSupport(exception);
this.stateContext
.getCurrentState()
.processNonCompletableCommand(
Expand Down Expand Up @@ -666,4 +673,15 @@ private void cancelInputSubscription() {
this.inputSubscription = null;
}
}

private void verifyErrorMetadataFeatureSupport(TerminalException exception) {
if (!exception.getMetadata().isEmpty()
&& stateContext.getNegotiatedProtocolVersion().getNumber()
< Protocol.ServiceProtocolVersion.V6.getNumber()) {
throw ProtocolException.unsupportedFeature(
"terminal error metadata",
Protocol.ServiceProtocolVersion.V6,
stateContext.getNegotiatedProtocolVersion());
}
}
}
29 changes: 25 additions & 4 deletions sdk-core/src/main/java/dev/restate/sdk/core/statemachine/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,48 @@
import dev.restate.sdk.core.generated.protocol.Protocol;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class Util {

static Protocol.Failure toProtocolFailure(int code, String message) {
static Protocol.Failure toProtocolFailure(
int code, String message, Map<String, String> metadata) {
Protocol.Failure.Builder builder = Protocol.Failure.newBuilder().setCode(code);
if (message != null) {
builder.setMessage(message);
}
if (metadata != null) {
for (Map.Entry<String, String> entry : metadata.entrySet()) {
builder.addMetadata(
Protocol.FailureMetadata.newBuilder()
.setKey(entry.getKey())
.setValue(entry.getValue()));
}
}
return builder.build();
}

static Protocol.Failure toProtocolFailure(Throwable throwable) {
if (throwable instanceof TerminalException) {
return toProtocolFailure(((TerminalException) throwable).getCode(), throwable.getMessage());
return toProtocolFailure(
((TerminalException) throwable).getCode(),
throwable.getMessage(),
((TerminalException) throwable).getMetadata());
}
return toProtocolFailure(TerminalException.INTERNAL_SERVER_ERROR_CODE, throwable.toString());
return toProtocolFailure(
TerminalException.INTERNAL_SERVER_ERROR_CODE, throwable.toString(), Map.of());
}

static TerminalException toRestateException(Protocol.Failure failure) {
return new TerminalException(failure.getCode(), failure.getMessage());
return new TerminalException(
failure.getCode(),
failure.getMessage(),
failure.getMetadataList().stream()
.collect(
Collectors.toMap(
Protocol.FailureMetadata::getKey, Protocol.FailureMetadata::getValue)));
}

/** NOTE! This method rewinds the buffer!!! */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ enum ServiceProtocolVersion {
V5 = 5;
// Added:
// * StartMessage.random_seed
// * Failure.metadata
V6 = 6;
}

Expand Down Expand Up @@ -633,6 +634,14 @@ message Failure {
uint32 code = 1;
// Contains a concise error message, e.g. Throwable#getMessage() in Java.
string message = 2;

// Error metadata
repeated FailureMetadata metadata = 3;
}

message FailureMetadata {
string key = 1;
string value = 2;
}

message Header {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ public static Protocol.SendSignalCommandMessage sendCancelSignal(String targetIn
}

public static Protocol.Failure failure(int code, String message) {
return Util.toProtocolFailure(code, message);
return Util.toProtocolFailure(code, message, Map.of());
}

public static Protocol.Failure failure(Throwable throwable) {
Expand Down
Loading