From fad4fbfefcde274c4e78272a2287279fc4f37c80 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Fri, 13 Mar 2026 10:35:06 +0100 Subject: [PATCH] Add TerminalException.metadata field --- .../restate/sdk/common/TerminalException.java | 38 +++++++++++++++++-- .../restate/sdk/core/ProtocolException.java | 17 +++++++++ .../core/statemachine/StateMachineImpl.java | 18 +++++++++ .../restate/sdk/core/statemachine/Util.java | 29 ++++++++++++-- .../dev/restate/service/protocol.proto | 9 +++++ .../sdk/core/statemachine/ProtoUtils.java | 2 +- 6 files changed, 105 insertions(+), 8 deletions(-) diff --git a/sdk-common/src/main/java/dev/restate/sdk/common/TerminalException.java b/sdk-common/src/main/java/dev/restate/sdk/common/TerminalException.java index c9a364eda..e9b5d5e41 100644 --- a/sdk-common/src/main/java/dev/restate/sdk/common/TerminalException.java +++ b/sdk-common/src/main/java/dev/restate/sdk/common/TerminalException.java @@ -8,6 +8,9 @@ // https://github.com/restatedev/sdk-java/blob/main/LICENSE package dev.restate.sdk.common; +import java.util.Map; +import java.util.Objects; + /** When thrown in a Restate service method, it will complete the invocation with an error. */ public class TerminalException extends RuntimeException { @@ -17,6 +20,7 @@ public class TerminalException extends RuntimeException { public static final int INTERNAL_SERVER_ERROR_CODE = 500; private final int code; + private final Map metadata; public TerminalException() { this(INTERNAL_SERVER_ERROR_CODE); @@ -34,16 +38,37 @@ public TerminalException(int code) { * @param message error message */ public TerminalException(int code, String message) { - super(message); - this.code = code; + this(code, message, null); } /** * Like {@link #TerminalException(int, String)}, with code {@link #INTERNAL_SERVER_ERROR_CODE}. */ public TerminalException(String message) { + this(INTERNAL_SERVER_ERROR_CODE, message, null); + } + + /** + * Create a new {@link TerminalException}. + * + * @param code HTTP response status code + * @param message error message + * @param metadata error metadata (supported only from Restate > 1.6) + */ + public TerminalException(int code, String message, Map metadata) { super(message); - this.code = INTERNAL_SERVER_ERROR_CODE; + this.code = code; + this.metadata = Objects.requireNonNullElse(metadata, Map.of()); + } + + /** + * Create a new {@link TerminalException}. + * + * @param message error message + * @param metadata error metadata (supported only from Restate > 1.6) + */ + public TerminalException(String message, Map metadata) { + this(INTERNAL_SERVER_ERROR_CODE, message, metadata); } /** @@ -52,4 +77,11 @@ public TerminalException(String message) { public int getCode() { return code; } + + /** + * @return the error metadata + */ + public Map getMetadata() { + return metadata; + } } diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/ProtocolException.java b/sdk-core/src/main/java/dev/restate/sdk/core/ProtocolException.java index 0b0cb9f79..51a08617d 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/ProtocolException.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/ProtocolException.java @@ -10,6 +10,7 @@ import com.google.protobuf.MessageLite; import dev.restate.sdk.common.TerminalException; +import dev.restate.sdk.core.generated.protocol.Protocol; import dev.restate.sdk.core.statemachine.NotificationId; public class ProtocolException extends RuntimeException { @@ -20,6 +21,7 @@ public class ProtocolException extends RuntimeException { public static final int INTERNAL_CODE = 500; public static final int JOURNAL_MISMATCH_CODE = 570; static final int PROTOCOL_VIOLATION_CODE = 571; + static final int UNSUPPORTED_FEATURE = 573; private final int code; @@ -129,4 +131,19 @@ public static ProtocolException idempotencyKeyIsEmpty() { public static ProtocolException unauthorized(Throwable e) { return new ProtocolException("Unauthorized", UNAUTHORIZED_CODE, e); } + + public static ProtocolException unsupportedFeature( + String featureName, + Protocol.ServiceProtocolVersion requiredVersion, + Protocol.ServiceProtocolVersion negotiatedVersion) { + return new ProtocolException( + "Current service protocol version does not support " + + featureName + + ". " + + "Negotiated version: " + + negotiatedVersion.getNumber() + + ", minimum required: " + + requiredVersion.getNumber(), + UNSUPPORTED_FEATURE); + } } diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachineImpl.java b/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachineImpl.java index 4253a1ba0..354daa266 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachineImpl.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachineImpl.java @@ -411,6 +411,7 @@ public void completeAwakeable(String awakeableId, Slice value) { @Override public void completeAwakeable(String awakeableId, TerminalException exception) { LOG.debug("Executing 'Complete awakeable {} with failure'", awakeableId); + verifyErrorMetadataFeatureSupport(exception); completeAwakeable(awakeableId, builder -> builder.setFailure(toProtocolFailure(exception))); } @@ -455,6 +456,7 @@ public void completeSignal( "Executing 'Complete signal {} to invocation {} with failure'", signalName, targetInvocationId); + verifyErrorMetadataFeatureSupport(exception); this.completeSignal( targetInvocationId, signalName, @@ -520,6 +522,7 @@ public int promiseComplete(String key, Slice value) { @Override public int promiseComplete(String key, TerminalException exception) { LOG.debug("Executing 'Complete promise {} with failure'", key); + verifyErrorMetadataFeatureSupport(exception); return this.promiseComplete( key, builder -> builder.setCompletionFailure(toProtocolFailure(exception))); } @@ -567,6 +570,9 @@ public void proposeRunCompletion( Duration attemptDuration, @Nullable RetryPolicy retryPolicy) { LOG.debug("Executing 'Run completed with failure'"); + if (exception instanceof TerminalException) { + verifyErrorMetadataFeatureSupport((TerminalException) exception); + } try { this.stateContext .getCurrentState() @@ -639,6 +645,7 @@ public void writeOutput(Slice value) { @Override public void writeOutput(TerminalException exception) { LOG.debug("Executing 'Write invocation output with failure'"); + verifyErrorMetadataFeatureSupport(exception); this.stateContext .getCurrentState() .processNonCompletableCommand( @@ -666,4 +673,15 @@ private void cancelInputSubscription() { this.inputSubscription = null; } } + + private void verifyErrorMetadataFeatureSupport(TerminalException exception) { + if (!exception.getMetadata().isEmpty() + && stateContext.getNegotiatedProtocolVersion().getNumber() + < Protocol.ServiceProtocolVersion.V6.getNumber()) { + throw ProtocolException.unsupportedFeature( + "terminal error metadata", + Protocol.ServiceProtocolVersion.V6, + stateContext.getNegotiatedProtocolVersion()); + } + } } diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/Util.java b/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/Util.java index 9d1083bd3..6eab24998 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/Util.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/Util.java @@ -16,27 +16,48 @@ import dev.restate.sdk.core.generated.protocol.Protocol; import java.nio.ByteBuffer; import java.time.Duration; +import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; public class Util { - static Protocol.Failure toProtocolFailure(int code, String message) { + static Protocol.Failure toProtocolFailure( + int code, String message, Map metadata) { Protocol.Failure.Builder builder = Protocol.Failure.newBuilder().setCode(code); if (message != null) { builder.setMessage(message); } + if (metadata != null) { + for (Map.Entry entry : metadata.entrySet()) { + builder.addMetadata( + Protocol.FailureMetadata.newBuilder() + .setKey(entry.getKey()) + .setValue(entry.getValue())); + } + } return builder.build(); } static Protocol.Failure toProtocolFailure(Throwable throwable) { if (throwable instanceof TerminalException) { - return toProtocolFailure(((TerminalException) throwable).getCode(), throwable.getMessage()); + return toProtocolFailure( + ((TerminalException) throwable).getCode(), + throwable.getMessage(), + ((TerminalException) throwable).getMetadata()); } - return toProtocolFailure(TerminalException.INTERNAL_SERVER_ERROR_CODE, throwable.toString()); + return toProtocolFailure( + TerminalException.INTERNAL_SERVER_ERROR_CODE, throwable.toString(), Map.of()); } static TerminalException toRestateException(Protocol.Failure failure) { - return new TerminalException(failure.getCode(), failure.getMessage()); + return new TerminalException( + failure.getCode(), + failure.getMessage(), + failure.getMetadataList().stream() + .collect( + Collectors.toMap( + Protocol.FailureMetadata::getKey, Protocol.FailureMetadata::getValue))); } /** NOTE! This method rewinds the buffer!!! */ diff --git a/sdk-core/src/main/service-protocol/dev/restate/service/protocol.proto b/sdk-core/src/main/service-protocol/dev/restate/service/protocol.proto index 48f62c4fa..0a6696533 100644 --- a/sdk-core/src/main/service-protocol/dev/restate/service/protocol.proto +++ b/sdk-core/src/main/service-protocol/dev/restate/service/protocol.proto @@ -34,6 +34,7 @@ enum ServiceProtocolVersion { V5 = 5; // Added: // * StartMessage.random_seed + // * Failure.metadata V6 = 6; } @@ -633,6 +634,14 @@ message Failure { uint32 code = 1; // Contains a concise error message, e.g. Throwable#getMessage() in Java. string message = 2; + + // Error metadata + repeated FailureMetadata metadata = 3; +} + +message FailureMetadata { + string key = 1; + string value = 2; } message Header { diff --git a/sdk-core/src/test/java/dev/restate/sdk/core/statemachine/ProtoUtils.java b/sdk-core/src/test/java/dev/restate/sdk/core/statemachine/ProtoUtils.java index d7648c60e..6a68958fe 100644 --- a/sdk-core/src/test/java/dev/restate/sdk/core/statemachine/ProtoUtils.java +++ b/sdk-core/src/test/java/dev/restate/sdk/core/statemachine/ProtoUtils.java @@ -456,7 +456,7 @@ public static Protocol.SendSignalCommandMessage sendCancelSignal(String targetIn } public static Protocol.Failure failure(int code, String message) { - return Util.toProtocolFailure(code, message); + return Util.toProtocolFailure(code, message, Map.of()); } public static Protocol.Failure failure(Throwable throwable) {