diff --git a/client-kotlin/src/main/kotlin/dev/restate/client/kotlin/ingress.kt b/client-kotlin/src/main/kotlin/dev/restate/client/kotlin/ingress.kt index cfd2b7254..7ae31b227 100644 --- a/client-kotlin/src/main/kotlin/dev/restate/client/kotlin/ingress.kt +++ b/client-kotlin/src/main/kotlin/dev/restate/client/kotlin/ingress.kt @@ -271,6 +271,84 @@ val Response.response: Res val SendResponse.sendStatus: SendResponse.SendStatus get() = this.sendStatus() +/** + * Scope client communication, to send requests to services, virtual objects and workflows within a + * scope. Requires Restate >= 1.7. + * + * Example usage: + * ```kotlin + * val greeter = client.scope("my-scope").service() + * val response = greeter.greet("Alice") + * ``` + * + * @param scopeKey the scope key to prepend to all invocation targets + * @return a scoped client + */ +@org.jetbrains.annotations.ApiStatus.Experimental +fun Client.scope(scopeKey: String): ScopedKotlinClient = ScopedKotlinClient(this, scopeKey) + +/** + * Scope client communication, to send requests to services, virtual objects and workflows within a + * scope. Requires Restate >= 1.7. + * + * Obtain an instance via [Client.scope]. + */ +@org.jetbrains.annotations.ApiStatus.Experimental +class ScopedKotlinClient +@PublishedApi +internal constructor( + @PublishedApi internal val client: Client, + @PublishedApi internal val scopeKey: String, +) { + /** @see Client.service */ + @org.jetbrains.annotations.ApiStatus.Experimental + inline fun service(): SVC { + return service(client, SVC::class.java, scopeKey) + } + + /** @see Client.virtualObject */ + @org.jetbrains.annotations.ApiStatus.Experimental + inline fun virtualObject(key: String): SVC { + return virtualObject(client, SVC::class.java, key, scopeKey) + } + + /** @see Client.workflow */ + @org.jetbrains.annotations.ApiStatus.Experimental + inline fun workflow(key: String): SVC { + return workflow(client, SVC::class.java, key, scopeKey) + } + + /** @see Client.toService */ + @org.jetbrains.annotations.ApiStatus.Experimental + inline fun toService(): KClientRequestBuilder { + ReflectionUtils.mustHaveServiceAnnotation(SVC::class.java) + require(ReflectionUtils.isKotlinClass(SVC::class.java)) { + "Using Java classes with Kotlin's API is not supported" + } + return KClientRequestBuilder(client, SVC::class.java, null, scopeKey) + } + + /** @see Client.toVirtualObject */ + @org.jetbrains.annotations.ApiStatus.Experimental + inline fun toVirtualObject(key: String): KClientRequestBuilder { + ReflectionUtils.mustHaveVirtualObjectAnnotation(SVC::class.java) + require(ReflectionUtils.isKotlinClass(SVC::class.java)) { + "Using Java classes with Kotlin's API is not supported" + } + return KClientRequestBuilder(client, SVC::class.java, key, scopeKey) + } + + /** @see Client.toWorkflow */ + @org.jetbrains.annotations.ApiStatus.Experimental + inline fun toWorkflow(key: String): KClientRequestBuilder { + ReflectionUtils.mustHaveWorkflowAnnotation(SVC::class.java) + require(ReflectionUtils.isKotlinClass(SVC::class.java)) { + "Using Java classes with Kotlin's API is not supported" + } + return KClientRequestBuilder(client, SVC::class.java, key, scopeKey) + } +} + /** * Create a proxy client for a Restate service. * @@ -332,7 +410,7 @@ inline fun Client.workflow(key: String): SVC { * @return a proxy that intercepts method calls and executes them via the client */ @PublishedApi -internal fun service(client: Client, clazz: Class): SVC { +internal fun service(client: Client, clazz: Class, scope: String? = null): SVC { ReflectionUtils.mustHaveServiceAnnotation(clazz) require(ReflectionUtils.isKotlinClass(clazz)) { "Using Java classes with Kotlin's API is not supported" @@ -340,7 +418,7 @@ internal fun service(client: Client, clazz: Class): SVC { val serviceName = ReflectionUtils.extractServiceName(clazz) return ProxySupport.createProxy(clazz) { invocation -> - val request = invocation.captureInvocation(serviceName, null).toRequest() + val request = invocation.captureInvocation(serviceName, null, scope).toRequest() @Suppress("UNCHECKED_CAST") val continuation = invocation.arguments.last() as Continuation // Start a coroutine that calls the client and resumes the continuation @@ -359,7 +437,12 @@ internal fun service(client: Client, clazz: Class): SVC { * @return a proxy that intercepts method calls and executes them via the client */ @PublishedApi -internal fun virtualObject(client: Client, clazz: Class, key: String): SVC { +internal fun virtualObject( + client: Client, + clazz: Class, + key: String, + scope: String? = null, +): SVC { ReflectionUtils.mustHaveVirtualObjectAnnotation(clazz) require(ReflectionUtils.isKotlinClass(clazz)) { "Using Java classes with Kotlin's API is not supported" @@ -367,7 +450,7 @@ internal fun virtualObject(client: Client, clazz: Class, key: S val serviceName = ReflectionUtils.extractServiceName(clazz) return ProxySupport.createProxy(clazz) { invocation -> - val request = invocation.captureInvocation(serviceName, key).toRequest() + val request = invocation.captureInvocation(serviceName, key, scope).toRequest() @Suppress("UNCHECKED_CAST") val continuation = invocation.arguments.last() as Continuation // Start a coroutine that calls the client and resumes the continuation @@ -386,7 +469,12 @@ internal fun virtualObject(client: Client, clazz: Class, key: S * @return a proxy that intercepts method calls and executes them via the client */ @PublishedApi -internal fun workflow(client: Client, clazz: Class, key: String): SVC { +internal fun workflow( + client: Client, + clazz: Class, + key: String, + scope: String? = null, +): SVC { ReflectionUtils.mustHaveWorkflowAnnotation(clazz) require(ReflectionUtils.isKotlinClass(clazz)) { "Using Java classes with Kotlin's API is not supported" @@ -394,7 +482,7 @@ internal fun workflow(client: Client, clazz: Class, key: String val serviceName = ReflectionUtils.extractServiceName(clazz) return ProxySupport.createProxy(clazz) { invocation -> - val request = invocation.captureInvocation(serviceName, key).toRequest() + val request = invocation.captureInvocation(serviceName, key, scope).toRequest() @Suppress("UNCHECKED_CAST") val continuation = invocation.arguments.last() as Continuation // Start a coroutine that calls the client and resumes the continuation @@ -418,6 +506,7 @@ internal constructor( private val client: Client, private val clazz: Class, private val key: String?, + private val scope: String? = null, ) { /** * Create a request by invoking a method on the target. @@ -432,7 +521,7 @@ internal constructor( suspend fun request(block: suspend SVC.() -> Res): KClientRequest { return KClientRequestImpl( client, - RequestCaptureProxy(clazz, key).capture(block as suspend SVC.() -> Any?).toRequest(), + RequestCaptureProxy(clazz, key, scope).capture(block as suspend SVC.() -> Any?).toRequest(), ) as KClientRequest } diff --git a/client/src/main/java/dev/restate/client/Client.java b/client/src/main/java/dev/restate/client/Client.java index 3fa5cee01..f1fbc80c9 100644 --- a/client/src/main/java/dev/restate/client/Client.java +++ b/client/src/main/java/dev/restate/client/Client.java @@ -531,6 +531,25 @@ default Response> getOutput() throws IngressException { } } + /** + * EXPERIMENTAL API: Scope client communication, to send requests to services, virtual + * objects and workflows within a scope. Requires Restate >= 1.7. + * + *
{@code
+   * Client client = Client.connect("http://localhost:8080");
+   *
+   * var greeterProxy = client.scope("my-scope").service(Greeter.class);
+   * GreetingResponse output = greeterProxy.greet(new Greeting("Alice"));
+   * }
+ * + * @param scopeKey the scope key to prepend to all invocation targets + * @return a scoped client + */ + @org.jetbrains.annotations.ApiStatus.Experimental + default ScopedClient scope(String scopeKey) { + return new ScopedClient(this, scopeKey); + } + /** * EXPERIMENTAL API: Simple API to invoke a Restate service from the ingress. * diff --git a/client/src/main/java/dev/restate/client/ClientServiceHandleImpl.java b/client/src/main/java/dev/restate/client/ClientServiceHandleImpl.java index 6fc122376..1a39274a8 100644 --- a/client/src/main/java/dev/restate/client/ClientServiceHandleImpl.java +++ b/client/src/main/java/dev/restate/client/ClientServiceHandleImpl.java @@ -29,14 +29,21 @@ final class ClientServiceHandleImpl implements ClientServiceHandle { private final Class clazz; private final String serviceName; private final @Nullable String key; + private final @Nullable String scope; private MethodInfoCollector methodInfoCollector; ClientServiceHandleImpl(Client innerClient, Class clazz, @Nullable String key) { + this(innerClient, clazz, key, null); + } + + ClientServiceHandleImpl( + Client innerClient, Class clazz, @Nullable String key, @Nullable String scope) { this.innerClient = innerClient; this.clazz = clazz; this.serviceName = ReflectionUtils.extractServiceName(clazz); this.key = key; + this.scope = scope; } @SuppressWarnings("unchecked") @@ -46,6 +53,7 @@ public CompletableFuture> callAsync( MethodInfo methodInfo = getMethodInfoCollector().resolve(s, input); return innerClient.callAsync( toRequest( + scope, serviceName, key, methodInfo.getHandlerName(), @@ -62,6 +70,7 @@ public CompletableFuture> callAsync( MethodInfo methodInfo = getMethodInfoCollector().resolve(s, input); return innerClient.callAsync( toRequest( + scope, serviceName, key, methodInfo.getHandlerName(), @@ -78,6 +87,7 @@ public CompletableFuture> callAsync( MethodInfo methodInfo = getMethodInfoCollector().resolve(s); return innerClient.callAsync( toRequest( + scope, serviceName, key, methodInfo.getHandlerName(), @@ -93,6 +103,7 @@ public CompletableFuture> callAsync( MethodInfo methodInfo = getMethodInfoCollector().resolve(s); return innerClient.callAsync( toRequest( + scope, serviceName, key, methodInfo.getHandlerName(), @@ -109,6 +120,7 @@ public CompletableFuture> sendAsync( MethodInfo methodInfo = getMethodInfoCollector().resolve(s, input); return innerClient.sendAsync( toRequest( + scope, serviceName, key, methodInfo.getHandlerName(), @@ -126,6 +138,7 @@ public CompletableFuture> sendAsync( MethodInfo methodInfo = getMethodInfoCollector().resolve(s, input); return innerClient.sendAsync( toRequest( + scope, serviceName, key, methodInfo.getHandlerName(), @@ -143,6 +156,7 @@ public CompletableFuture> sendAsync( MethodInfo methodInfo = getMethodInfoCollector().resolve(s); return innerClient.sendAsync( toRequest( + scope, serviceName, key, methodInfo.getHandlerName(), @@ -159,6 +173,7 @@ public CompletableFuture> sendAsync( MethodInfo methodInfo = getMethodInfoCollector().resolve(s); return innerClient.sendAsync( toRequest( + scope, serviceName, key, methodInfo.getHandlerName(), diff --git a/client/src/main/java/dev/restate/client/ScopedClient.java b/client/src/main/java/dev/restate/client/ScopedClient.java new file mode 100644 index 000000000..034430f57 --- /dev/null +++ b/client/src/main/java/dev/restate/client/ScopedClient.java @@ -0,0 +1,151 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate Java SDK, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-java/blob/main/LICENSE +package dev.restate.client; + +import dev.restate.common.Request; +import dev.restate.common.Target; +import dev.restate.common.reflections.MethodInfo; +import dev.restate.common.reflections.ProxySupport; +import dev.restate.common.reflections.ReflectionUtils; +import dev.restate.serde.TypeTag; + +/** + * Scope client communication, to send requests to services, virtual objects and workflows within a + * scope. Requires Restate >= 1.7. + * + *

Obtain an instance via {@link Client#scope(String)}. + */ +@org.jetbrains.annotations.ApiStatus.Experimental +public final class ScopedClient { + + private final Client client; + private final String scope; + + ScopedClient(Client client, String scope) { + this.client = client; + this.scope = scope; + } + + /** + * @see Client#service(Class) + */ + @org.jetbrains.annotations.ApiStatus.Experimental + public SVC service(Class clazz) { + ReflectionUtils.mustHaveServiceAnnotation(clazz); + if (ReflectionUtils.isKotlinClass(clazz)) { + throw new IllegalArgumentException("Using Kotlin classes with Java's API is not supported"); + } + var serviceName = ReflectionUtils.extractServiceName(clazz); + return ProxySupport.createProxy( + clazz, + invocation -> { + var methodInfo = MethodInfo.fromMethod(invocation.getMethod()); + + //noinspection unchecked + return client + .call( + Request.of( + Target.virtualObject(scope, serviceName, null, methodInfo.getHandlerName()), + (TypeTag) methodInfo.getInputType(), + (TypeTag) methodInfo.getOutputType(), + invocation.getArguments().length == 0 ? null : invocation.getArguments()[0])) + .response(); + }); + } + + /** + * @see Client#serviceHandle(Class) + */ + @org.jetbrains.annotations.ApiStatus.Experimental + public ClientServiceHandle serviceHandle(Class clazz) { + ReflectionUtils.mustHaveServiceAnnotation(clazz); + if (ReflectionUtils.isKotlinClass(clazz)) { + throw new IllegalArgumentException("Using Kotlin classes with Java's API is not supported"); + } + return new ClientServiceHandleImpl<>(client, clazz, null, scope); + } + + /** + * @see Client#virtualObject(Class, String) + */ + @org.jetbrains.annotations.ApiStatus.Experimental + public SVC virtualObject(Class clazz, String key) { + ReflectionUtils.mustHaveVirtualObjectAnnotation(clazz); + if (ReflectionUtils.isKotlinClass(clazz)) { + throw new IllegalArgumentException("Using Kotlin classes with Java's API is not supported"); + } + var serviceName = ReflectionUtils.extractServiceName(clazz); + return ProxySupport.createProxy( + clazz, + invocation -> { + var methodInfo = MethodInfo.fromMethod(invocation.getMethod()); + + //noinspection unchecked + return client + .call( + Request.of( + Target.virtualObject(scope, serviceName, key, methodInfo.getHandlerName()), + (TypeTag) methodInfo.getInputType(), + (TypeTag) methodInfo.getOutputType(), + invocation.getArguments().length == 0 ? null : invocation.getArguments()[0])) + .response(); + }); + } + + /** + * @see Client#virtualObjectHandle(Class, String) + */ + @org.jetbrains.annotations.ApiStatus.Experimental + public ClientServiceHandle virtualObjectHandle(Class clazz, String key) { + ReflectionUtils.mustHaveVirtualObjectAnnotation(clazz); + if (ReflectionUtils.isKotlinClass(clazz)) { + throw new IllegalArgumentException("Using Kotlin classes with Java's API is not supported"); + } + return new ClientServiceHandleImpl<>(client, clazz, key, scope); + } + + /** + * @see Client#workflow(Class, String) + */ + @org.jetbrains.annotations.ApiStatus.Experimental + public SVC workflow(Class clazz, String key) { + ReflectionUtils.mustHaveWorkflowAnnotation(clazz); + if (ReflectionUtils.isKotlinClass(clazz)) { + throw new IllegalArgumentException("Using Kotlin classes with Java's API is not supported"); + } + var serviceName = ReflectionUtils.extractServiceName(clazz); + return ProxySupport.createProxy( + clazz, + invocation -> { + var methodInfo = MethodInfo.fromMethod(invocation.getMethod()); + + //noinspection unchecked + return client + .call( + Request.of( + Target.virtualObject(scope, serviceName, key, methodInfo.getHandlerName()), + (TypeTag) methodInfo.getInputType(), + (TypeTag) methodInfo.getOutputType(), + invocation.getArguments().length == 0 ? null : invocation.getArguments()[0])) + .response(); + }); + } + + /** + * @see Client#workflowHandle(Class, String) + */ + @org.jetbrains.annotations.ApiStatus.Experimental + public ClientServiceHandle workflowHandle(Class clazz, String key) { + ReflectionUtils.mustHaveWorkflowAnnotation(clazz); + if (ReflectionUtils.isKotlinClass(clazz)) { + throw new IllegalArgumentException("Using Kotlin classes with Java's API is not supported"); + } + return new ClientServiceHandleImpl<>(client, clazz, key, scope); + } +} diff --git a/client/src/main/java/dev/restate/client/base/BaseClient.java b/client/src/main/java/dev/restate/client/base/BaseClient.java index d9742a450..a3fdcffcc 100644 --- a/client/src/main/java/dev/restate/client/base/BaseClient.java +++ b/client/src/main/java/dev/restate/client/base/BaseClient.java @@ -64,7 +64,7 @@ public CompletableFuture> callAsync(Request r Serde reqSerde = this.serdeFactory.create(request.getRequestTypeTag()); Serde resSerde = this.serdeFactory.create(request.getResponseTypeTag()); - URI requestUri = toRequestURI(request.getTarget(), false, null); + URI requestUri = toRequestURI(request.getTarget(), false, null, request.getLimitKey()); Stream> headersStream = Stream.concat( baseOptions.headers().entrySet().stream(), @@ -92,7 +92,7 @@ public CompletableFuture> sendAsync( Request request, @Nullable Duration delay) { Serde reqSerde = this.serdeFactory.create(request.getRequestTypeTag()); - URI requestUri = toRequestURI(request.getTarget(), true, delay); + URI requestUri = toRequestURI(request.getTarget(), true, delay, request.getLimitKey()); Stream> headersStream = Stream.concat( baseOptions.headers().entrySet().stream(), @@ -435,13 +435,29 @@ private String targetToURI(Target target) { return builder.toString(); } - private URI toRequestURI(Target target, boolean isSend, @Nullable Duration delay) { + private URI toRequestURI( + Target target, boolean isSend, @Nullable Duration delay, @Nullable String limitKey) { StringBuilder builder = new StringBuilder(targetToURI(target)); if (isSend) { builder.append("/send"); } + String separator = "?"; + if (target.getScope() != null) { + builder + .append(separator) + .append("scope=") + .append(URLEncoder.encode(target.getScope(), StandardCharsets.UTF_8)); + separator = "&"; + } + if (limitKey != null) { + builder + .append(separator) + .append("limitKey=") + .append(URLEncoder.encode(limitKey, StandardCharsets.UTF_8)); + separator = "&"; + } if (delay != null && !delay.isZero() && !delay.isNegative()) { - builder.append("?delay=").append(delay); + builder.append(separator).append("delay=").append(delay); } return this.baseUri.resolve(builder.toString()); diff --git a/common-kotlin/src/main/kotlin/dev/restate/common/reflection/kotlin/RequestCaptureProxy.kt b/common-kotlin/src/main/kotlin/dev/restate/common/reflection/kotlin/RequestCaptureProxy.kt index c50ad3d10..dee7a53e7 100644 --- a/common-kotlin/src/main/kotlin/dev/restate/common/reflection/kotlin/RequestCaptureProxy.kt +++ b/common-kotlin/src/main/kotlin/dev/restate/common/reflection/kotlin/RequestCaptureProxy.kt @@ -23,7 +23,11 @@ import dev.restate.common.reflections.ReflectionUtils * @property serviceName the resolved service name * @property key the virtual object/workflow key (null for stateless services) */ -class RequestCaptureProxy(private val clazz: Class, private val key: String?) { +class RequestCaptureProxy( + private val clazz: Class, + private val key: String?, + private val scope: String? = null, +) { private val serviceName: String = ReflectionUtils.extractServiceName(clazz) @@ -36,7 +40,7 @@ class RequestCaptureProxy(private val clazz: Class, private val suspend fun capture(block: suspend SVC.() -> Any?): CapturedInvocation { val proxy = ProxySupport.createProxy(clazz) { invocation -> - throw invocation.captureInvocation(serviceName, key) + throw invocation.captureInvocation(serviceName, key, scope) } try { diff --git a/common-kotlin/src/main/kotlin/dev/restate/common/reflection/kotlin/reflections.kt b/common-kotlin/src/main/kotlin/dev/restate/common/reflection/kotlin/reflections.kt index 0b30577d6..291217a0e 100644 --- a/common-kotlin/src/main/kotlin/dev/restate/common/reflection/kotlin/reflections.kt +++ b/common-kotlin/src/main/kotlin/dev/restate/common/reflection/kotlin/reflections.kt @@ -46,6 +46,7 @@ data class CapturedInvocation( fun ProxyFactory.MethodInvocation.captureInvocation( serviceName: String, key: String?, + scope: String? = null, ): CapturedInvocation { val handlerInfo = ReflectionUtils.mustHaveHandlerAnnotation(method) val handlerName = handlerInfo.name @@ -73,12 +74,7 @@ fun ProxyFactory.MethodInvocation.captureInvocation( kFunction.findAnnotation(), ) - val target = - if (key != null) { - Target.virtualObject(serviceName, key, handlerName) - } else { - Target.service(serviceName, handlerName) - } + val target = Target.virtualObject(scope, serviceName, key, handlerName) // For suspend functions, arguments are: [input?, continuation] // Extract the input (first argument, excluding continuation) diff --git a/common/src/main/java/dev/restate/common/InvocationOptions.java b/common/src/main/java/dev/restate/common/InvocationOptions.java index ab4d58094..23019c3ac 100644 --- a/common/src/main/java/dev/restate/common/InvocationOptions.java +++ b/common/src/main/java/dev/restate/common/InvocationOptions.java @@ -15,14 +15,18 @@ public class InvocationOptions { - public static final InvocationOptions DEFAULT = new InvocationOptions(null, null); + public static final InvocationOptions DEFAULT = new InvocationOptions(null, null, null); private final @Nullable String idempotencyKey; + private final @Nullable String limitKey; private final @Nullable LinkedHashMap headers; InvocationOptions( - @Nullable String idempotencyKey, @Nullable LinkedHashMap headers) { + @Nullable String idempotencyKey, + @Nullable String limitKey, + @Nullable LinkedHashMap headers) { this.idempotencyKey = idempotencyKey; + this.limitKey = limitKey; this.headers = headers; } @@ -30,6 +34,17 @@ public class InvocationOptions { return idempotencyKey; } + /** + * Returns the limit key for this invocation. + * + *

The limit key is used to limit the concurrency of invocations sharing the same key within a + * scope. It must be used together with a scoped invocation target (see {@link + * Target#scoped(String)}), and requires Restate >= 1.7. + */ + public @Nullable String getLimitKey() { + return limitKey; + } + public @Nullable Map getHeaders() { return headers; } @@ -38,12 +53,13 @@ public class InvocationOptions { public boolean equals(Object o) { if (!(o instanceof InvocationOptions that)) return false; return Objects.equals(getIdempotencyKey(), that.getIdempotencyKey()) + && Objects.equals(getLimitKey(), that.getLimitKey()) && Objects.equals(getHeaders(), that.getHeaders()); } @Override public int hashCode() { - return Objects.hash(getIdempotencyKey(), getHeaders()); + return Objects.hash(getIdempotencyKey(), getLimitKey(), getHeaders()); } @Override @@ -52,34 +68,52 @@ public String toString() { + "idempotencyKey='" + idempotencyKey + '\'' + + ", limitKey='" + + limitKey + + '\'' + ", headers=" + headers + '}'; } public static Builder builder() { - return new Builder(null, null); + return new Builder(null, null, null); } public static Builder idempotencyKey(String idempotencyKey) { - return new Builder(null, null).idempotencyKey(idempotencyKey); + return new Builder(null, null, null).idempotencyKey(idempotencyKey); + } + + /** + * Create a builder with the given limit key. + * + *

The limit key is used to limit the concurrency of invocations sharing the same key within a + * scope. It must be used together with a scoped invocation target (see {@link + * Target#scoped(String)}), and requires Restate >= 1.7. + */ + public static Builder limitKey(String limitKey) { + return new Builder(null, null, null).limitKey(limitKey); } public static Builder header(String key, String value) { - return new Builder(null, null).header(key, value); + return new Builder(null, null, null).header(key, value); } public static Builder headers(Map newHeaders) { - return new Builder(null, null).headers(newHeaders); + return new Builder(null, null, null).headers(newHeaders); } public static final class Builder { @Nullable private String idempotencyKey; + @Nullable private String limitKey; @Nullable private LinkedHashMap headers; private Builder( - @Nullable String idempotencyKey, @Nullable LinkedHashMap headers) { + @Nullable String idempotencyKey, + @Nullable String limitKey, + @Nullable LinkedHashMap headers) { this.idempotencyKey = idempotencyKey; + this.limitKey = limitKey; this.headers = headers; } @@ -92,6 +126,21 @@ public Builder idempotencyKey(String idempotencyKey) { return this; } + /** + * Set the limit key for this invocation. + * + *

The limit key is used to limit the concurrency of invocations sharing the same key within + * a scope. It must be used together with a scoped invocation target (see {@link + * Target#scoped(String)}), and requires Restate >= 1.7. + * + * @param limitKey Limit key to attach in the request. + * @return this instance, so the builder can be used fluently. + */ + public Builder limitKey(String limitKey) { + this.limitKey = limitKey; + return this; + } + /** * Append this header to the list of configured headers. * @@ -132,6 +181,17 @@ public void setIdempotencyKey(@Nullable String idempotencyKey) { idempotencyKey(idempotencyKey); } + public @Nullable String getLimitKey() { + return limitKey; + } + + /** + * @param limitKey Limit key to attach in the request. + */ + public void setLimitKey(@Nullable String limitKey) { + this.limitKey = limitKey; + } + public @Nullable Map getHeaders() { return headers; } @@ -148,11 +208,11 @@ public void setHeaders(@Nullable Map headers) { * @return build the request */ public InvocationOptions build() { - return new InvocationOptions(this.idempotencyKey, this.headers); + return new InvocationOptions(this.idempotencyKey, this.limitKey, this.headers); } } public Builder toBuilder() { - return new Builder(this.idempotencyKey, this.headers); + return new Builder(this.idempotencyKey, this.limitKey, this.headers); } } diff --git a/common/src/main/java/dev/restate/common/Request.java b/common/src/main/java/dev/restate/common/Request.java index 4a503e8bf..821eaa36f 100644 --- a/common/src/main/java/dev/restate/common/Request.java +++ b/common/src/main/java/dev/restate/common/Request.java @@ -73,6 +73,11 @@ static RequestBuilder of(Target target, byte[] request) { */ @Nullable String getIdempotencyKey(); + /** + * @return the limit key + */ + @Nullable String getLimitKey(); + /** * @return the request headers */ diff --git a/common/src/main/java/dev/restate/common/RequestBuilder.java b/common/src/main/java/dev/restate/common/RequestBuilder.java index ae9643d41..0fa8b5af1 100644 --- a/common/src/main/java/dev/restate/common/RequestBuilder.java +++ b/common/src/main/java/dev/restate/common/RequestBuilder.java @@ -25,6 +25,17 @@ public interface RequestBuilder extends Request { */ RequestBuilder setIdempotencyKey(@Nullable String idempotencyKey); + /** + * @param limitKey Limit key to attach in the request. + * @return this instance, so the builder can be used fluently. + */ + RequestBuilder limitKey(@Nullable String limitKey); + + /** + * @param limitKey Limit key to attach in the request. + */ + RequestBuilder setLimitKey(@Nullable String limitKey); + /** * Append this header to the list of configured headers. * diff --git a/common/src/main/java/dev/restate/common/RequestImpl.java b/common/src/main/java/dev/restate/common/RequestImpl.java index 41faf3b4a..80fb7decb 100644 --- a/common/src/main/java/dev/restate/common/RequestImpl.java +++ b/common/src/main/java/dev/restate/common/RequestImpl.java @@ -21,6 +21,7 @@ final class RequestImpl implements WorkflowRequest { private final TypeTag resTypeTag; private final Req request; @Nullable private final String idempotencyKey; + @Nullable private final String limitKey; @Nullable private final LinkedHashMap headers; RequestImpl( @@ -29,12 +30,14 @@ final class RequestImpl implements WorkflowRequest { TypeTag resTypeTag, Req request, @Nullable String idempotencyKey, + @Nullable String limitKey, @Nullable LinkedHashMap headers) { this.target = target; this.reqTypeTag = reqTypeTag; this.resTypeTag = resTypeTag; this.request = request; this.idempotencyKey = idempotencyKey; + this.limitKey = limitKey; this.headers = headers; } @@ -63,6 +66,11 @@ public Req getRequest() { return idempotencyKey; } + @Override + public @Nullable String getLimitKey() { + return limitKey; + } + @Override public Map getHeaders() { if (this.headers == null) { @@ -77,6 +85,7 @@ static final class Builder implements WorkflowRequestBuilder private final TypeTag resTypeTag; private final Req request; @Nullable private String idempotencyKey; + @Nullable private String limitKey; @Nullable private LinkedHashMap headers; Builder( @@ -85,12 +94,14 @@ static final class Builder implements WorkflowRequestBuilder TypeTag resTypeTag, Req request, @Nullable String idempotencyKey, + @Nullable String limitKey, @Nullable LinkedHashMap headers) { this.target = target; this.reqTypeTag = reqTypeTag; this.resTypeTag = resTypeTag; this.request = request; this.idempotencyKey = idempotencyKey; + this.limitKey = limitKey; this.headers = headers; } @@ -177,6 +188,22 @@ public Builder setIdempotencyKey(@Nullable String idempotencyKey) { return idempotencyKey(idempotencyKey); } + @Override + public Builder limitKey(@Nullable String limitKey) { + this.limitKey = limitKey; + return this; + } + + @Override + public @Nullable String getLimitKey() { + return limitKey; + } + + @Override + public Builder setLimitKey(@Nullable String limitKey) { + return limitKey(limitKey); + } + @Override public @Nullable Map getHeaders() { return headers; @@ -206,6 +233,7 @@ public RequestImpl build() { this.resTypeTag, this.request, this.idempotencyKey, + this.limitKey, this.headers); } } @@ -218,6 +246,7 @@ public Builder toBuilder() { this.resTypeTag, this.request, this.idempotencyKey, + this.limitKey, this.headers); } @@ -229,12 +258,13 @@ public boolean equals(Object o) { && Objects.equals(resTypeTag, that.getResponseTypeTag()) && Objects.equals(request, that.getRequest()) && Objects.equals(idempotencyKey, that.getIdempotencyKey()) + && Objects.equals(limitKey, that.getLimitKey()) && Objects.equals(headers, that.getHeaders()); } @Override public int hashCode() { - return Objects.hash(target, reqTypeTag, resTypeTag, request, idempotencyKey, headers); + return Objects.hash(target, reqTypeTag, resTypeTag, request, idempotencyKey, limitKey, headers); } @Override @@ -251,6 +281,9 @@ public String toString() { + ", idempotencyKey='" + idempotencyKey + '\'' + + ", limitKey='" + + limitKey + + '\'' + ", headers=" + headers + '}'; diff --git a/common/src/main/java/dev/restate/common/Target.java b/common/src/main/java/dev/restate/common/Target.java index cdbd70e3a..45eb21cac 100644 --- a/common/src/main/java/dev/restate/common/Target.java +++ b/common/src/main/java/dev/restate/common/Target.java @@ -14,26 +14,52 @@ /** Represents an invocation target. */ public final class Target { + private final @Nullable String scope; private final String service; private final String handler; - private final String key; + private final @Nullable String key; - private Target(String service, String handler, String key) { + private Target(@Nullable String scope, String service, String handler, @Nullable String key) { + this.scope = scope; this.service = service; this.handler = handler; this.key = key; } public static Target virtualObject(String name, String key, String handler) { - return new Target(name, handler, key); + return new Target(null, name, handler, key); + } + + public static Target virtualObject(String scope, String name, String key, String handler) { + return new Target(scope, name, handler, key); } public static Target workflow(String name, String key, String handler) { - return new Target(name, handler, key); + return new Target(null, name, handler, key); + } + + public static Target workflow(String scope, String name, String key, String handler) { + return new Target(scope, name, handler, key); } public static Target service(String name, String handler) { - return new Target(name, handler, null); + return new Target(null, name, handler, null); + } + + public static Target service(String scope, String name, String handler) { + return new Target(scope, name, handler, null); + } + + /** Returns a new Target with the given scope. Requires Restate >= 1.7. */ + public Target scoped(String scope) { + return new Target(scope, this.service, this.handler, this.key); + } + + /** + * @return the scope. Null if no scope is set. + */ + public @Nullable String getScope() { + return scope; } public String getService() { @@ -56,21 +82,28 @@ public boolean equals(Object object) { if (this == object) return true; if (object == null || getClass() != object.getClass()) return false; Target target = (Target) object; - return Objects.equals(service, target.service) + return Objects.equals(scope, target.scope) + && Objects.equals(service, target.service) && Objects.equals(handler, target.handler) && Objects.equals(key, target.key); } @Override public int hashCode() { - return Objects.hash(service, handler, key); + return Objects.hash(scope, service, handler, key); } @Override public String toString() { - if (key == null) { - return service + "/" + handler; + StringBuilder sb = new StringBuilder(); + if (scope != null) { + sb.append(scope).append("/"); + } + sb.append(service).append("/"); + if (key != null) { + sb.append(key).append("/"); } - return service + "/" + key + "/" + handler; + sb.append(handler); + return sb.toString(); } } diff --git a/common/src/main/java/dev/restate/common/reflections/RestateUtils.java b/common/src/main/java/dev/restate/common/reflections/RestateUtils.java index 4f5786389..e03d9f1aa 100644 --- a/common/src/main/java/dev/restate/common/reflections/RestateUtils.java +++ b/common/src/main/java/dev/restate/common/reflections/RestateUtils.java @@ -19,6 +19,7 @@ public final class RestateUtils { + @Deprecated public static Request toRequest( String serviceName, @Nullable String key, @@ -27,11 +28,23 @@ public static Request toRequest( TypeTag resTypeTag, Req request, @Nullable InvocationOptions options) { - var builder = - Request.of( - Target.virtualObject(serviceName, key, handlerName), reqTypeTag, resTypeTag, request); + return toRequest(null, serviceName, key, handlerName, reqTypeTag, resTypeTag, request, options); + } + + public static Request toRequest( + @Nullable String scope, + String serviceName, + @Nullable String key, + String handlerName, + TypeTag reqTypeTag, + TypeTag resTypeTag, + Req request, + @Nullable InvocationOptions options) { + Target target = Target.virtualObject(scope, serviceName, key, handlerName); + var builder = Request.of(target, reqTypeTag, resTypeTag, request); if (options != null) { builder.setIdempotencyKey(options.getIdempotencyKey()); + builder.setLimitKey(options.getLimitKey()); if (options.getHeaders() != null) { builder.setHeaders(options.getHeaders()); } diff --git a/examples/src/main/java/my/restate/sdk/examples/ConcurrencyLimitExample.java b/examples/src/main/java/my/restate/sdk/examples/ConcurrencyLimitExample.java new file mode 100644 index 000000000..b37505a32 --- /dev/null +++ b/examples/src/main/java/my/restate/sdk/examples/ConcurrencyLimitExample.java @@ -0,0 +1,89 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate Java SDK, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-java/blob/main/LICENSE +package my.restate.sdk.examples; + +import dev.restate.sdk.Restate; +import dev.restate.sdk.annotation.Handler; +import dev.restate.sdk.annotation.Service; +import dev.restate.sdk.endpoint.Endpoint; +import dev.restate.sdk.http.vertx.RestateHttpServer; + +public class ConcurrencyLimitExample { + + // --- Amazon Merchant Service: a third-party API wrapper --- + + @Service + public static class AmazonMerchantService { + + public record CheckoutRequest(String orderId, String productId, int quantity) {} + + public record CheckoutResponse(String confirmationId) {} + + @Handler + public CheckoutResponse checkout(CheckoutRequest req) { + // In a real app, this would call the Amazon Merchant API + // using the user-provided API key from the scope. + return new CheckoutResponse("conf-" + req.orderId); + } + } + + // --- Order Processor: uses scoped calls to rate-limit per API key --- + + @Service + public static class OrderProcessor { + + public record ProcessOrderRequest(String orderId, String amazonApiKey) {} + + /** + * Process an order by calling AmazonMerchantService within a scope keyed by the user's Amazon + * API key. + * + *

The scope + configured rate limit rules ensure that calls sharing the same API key are + * rate-limited (e.g. 10 requests every 2 hours), preventing us from exceeding the third-party + * API quota. + * + *

Rate limit rules are configured externally in Restate: + * + *

{@code
+     * // Default rule: on any scope, rate limit AmazonMerchantService to 10 req / 2h
+     * {
+     *   "scope": { "any": true },
+     *   "match": { "service": "AmazonMerchantService" },
+     *   "limit": {
+     *     "rateLimit": { "count": 10, "interval": { "hours": 2 } }
+     *   }
+     * }
+     *
+     * // Override for a specific API key: increase limit to 100 req / 2h
+     * {
+     *   "scope": { "equals": "amz-api-key-123" },
+     *   "match": { "service": "AmazonMerchantService" },
+     *   "limit": {
+     *     "rateLimit": { "count": 100, "interval": { "hours": 2 } }
+     *   }
+     * }
+     * }
+ */ + @Handler + public String processOrder(ProcessOrderRequest req) { + // Scope the call by the user's Amazon API key. + // Restate enforces the rate limit rules configured above. + var response = + Restate.scope(req.amazonApiKey) + .service(AmazonMerchantService.class) + .checkout(new AmazonMerchantService.CheckoutRequest(req.orderId, "product-42", 1)); + + return "Order " + req.orderId + " confirmed: " + response.confirmationId(); + } + } + + public static void main(String[] args) { + RestateHttpServer.listen(Endpoint.bind(new AmazonMerchantService()).bind(new OrderProcessor())); + } +} diff --git a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ContextImpl.kt b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ContextImpl.kt index f5b911821..707bf30e3 100644 --- a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ContextImpl.kt +++ b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ContextImpl.kt @@ -76,6 +76,7 @@ internal constructor( request.getTarget(), resolveAndSerialize(request.getRequestTypeTag(), request.getRequest()), request.getIdempotencyKey(), + request.getLimitKey(), request.getHeaders()?.entries, ) .await() @@ -99,6 +100,7 @@ internal constructor( request.getTarget(), resolveAndSerialize(request.getRequestTypeTag(), request.getRequest()), request.getIdempotencyKey(), + request.getLimitKey(), request.getHeaders()?.entries, delay?.toJavaDuration(), ) diff --git a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt index 45b8830f3..9b4a5361c 100644 --- a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt +++ b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt @@ -1404,6 +1404,57 @@ private class KRequestImpl(private val request: Request) : } } +/** + * Scope service-to-service communication, to send requests to services, virtual objects and + * workflows within a scope. Requires Restate >= 1.7. + * + * Example usage: + * ```kotlin + * @Handler + * suspend fun myHandler(): String { + * val greeter = scope("my-scope").service() + * val response = greeter.greet("Alice") + * return "Got: $response" + * } + * ``` + * + * @param scopeKey the scope key to prepend to all invocation targets + * @return a scoped context + */ +@org.jetbrains.annotations.ApiStatus.Experimental +fun scope(scopeKey: String): KScope = KScope(scopeKey) + +/** + * Scope service-to-service communication, to send requests to services, virtual objects and + * workflows within a scope. Requires Restate >= 1.7. + * + * Obtain an instance via [scope]. + */ +@org.jetbrains.annotations.ApiStatus.Experimental +class KScope +@PublishedApi +internal constructor( + @PublishedApi internal val scopeKey: String, +) { + /** @see service */ + @org.jetbrains.annotations.ApiStatus.Experimental + suspend inline fun service(): SVC { + return service(SVC::class.java, scopeKey) + } + + /** @see virtualObject */ + @org.jetbrains.annotations.ApiStatus.Experimental + suspend inline fun virtualObject(key: String): SVC { + return virtualObject(SVC::class.java, key, scopeKey) + } + + /** @see workflow */ + @org.jetbrains.annotations.ApiStatus.Experimental + suspend inline fun workflow(key: String): SVC { + return workflow(SVC::class.java, key, scopeKey) + } +} + /** * Create a proxy client for a Restate service. * @@ -1462,7 +1513,7 @@ suspend inline fun workflow(key: String): SVC { } @PublishedApi -internal fun service(clazz: Class): SVC { +internal fun service(clazz: Class, scope: String? = null): SVC { ReflectionUtils.mustHaveServiceAnnotation(clazz) require(ReflectionUtils.isKotlinClass(clazz)) { "Using Java classes with Kotlin's API is not supported" @@ -1470,7 +1521,7 @@ internal fun service(clazz: Class): SVC { val serviceName = ReflectionUtils.extractServiceName(clazz) return ProxySupport.createProxy(clazz) { invocation -> - val request = invocation.captureInvocation(serviceName, null).toRequest() + val request = invocation.captureInvocation(serviceName, null, scope).toRequest() // Last argument is the continuation for suspend functions @Suppress("UNCHECKED_CAST") val continuation = invocation.arguments.last() as Continuation @@ -1483,7 +1534,7 @@ internal fun service(clazz: Class): SVC { } @PublishedApi -internal fun virtualObject(clazz: Class, key: String): SVC { +internal fun virtualObject(clazz: Class, key: String, scope: String? = null): SVC { ReflectionUtils.mustHaveVirtualObjectAnnotation(clazz) require(ReflectionUtils.isKotlinClass(clazz)) { "Using Java classes with Kotlin's API is not supported" @@ -1491,7 +1542,7 @@ internal fun virtualObject(clazz: Class, key: String): SVC { val serviceName = ReflectionUtils.extractServiceName(clazz) return ProxySupport.createProxy(clazz) { invocation -> - val request = invocation.captureInvocation(serviceName, key).toRequest() + val request = invocation.captureInvocation(serviceName, key, scope).toRequest() // Last argument is the continuation for suspend functions @Suppress("UNCHECKED_CAST") val continuation = invocation.arguments.last() as Continuation @@ -1504,7 +1555,7 @@ internal fun virtualObject(clazz: Class, key: String): SVC { } @PublishedApi -internal fun workflow(clazz: Class, key: String): SVC { +internal fun workflow(clazz: Class, key: String, scope: String? = null): SVC { ReflectionUtils.mustHaveWorkflowAnnotation(clazz) require(ReflectionUtils.isKotlinClass(clazz)) { "Using Java classes with Kotlin's API is not supported" @@ -1512,7 +1563,7 @@ internal fun workflow(clazz: Class, key: String): SVC { val serviceName = ReflectionUtils.extractServiceName(clazz) return ProxySupport.createProxy(clazz) { invocation -> - val request = invocation.captureInvocation(serviceName, key).toRequest() + val request = invocation.captureInvocation(serviceName, key, scope).toRequest() // Last argument is the continuation for suspend functions @Suppress("UNCHECKED_CAST") val continuation = invocation.arguments.last() as Continuation diff --git a/sdk-api/src/main/java/dev/restate/sdk/ContextImpl.java b/sdk-api/src/main/java/dev/restate/sdk/ContextImpl.java index 6cf71824a..035a55924 100644 --- a/sdk-api/src/main/java/dev/restate/sdk/ContextImpl.java +++ b/sdk-api/src/main/java/dev/restate/sdk/ContextImpl.java @@ -103,6 +103,7 @@ public CallDurableFuture call(Request request) { request.getTarget(), input, request.getIdempotencyKey(), + request.getLimitKey(), request.getHeaders() == null ? Collections.emptyList() : request.getHeaders().entrySet())); @@ -134,6 +135,7 @@ public InvocationHandle send( request.getTarget(), input, request.getIdempotencyKey(), + request.getLimitKey(), request.getHeaders() == null ? Collections.emptyList() : request.getHeaders().entrySet(), diff --git a/sdk-api/src/main/java/dev/restate/sdk/Restate.java b/sdk-api/src/main/java/dev/restate/sdk/Restate.java index acfee597f..2ad9942be 100644 --- a/sdk-api/src/main/java/dev/restate/sdk/Restate.java +++ b/sdk-api/src/main/java/dev/restate/sdk/Restate.java @@ -80,6 +80,23 @@ */ @org.jetbrains.annotations.ApiStatus.Experimental public final class Restate { + /** + * EXPERIMENTAL API: Scope service-to-service communication, to send requests to services, + * virtual objects and workflows within a scope. Requires Restate >= 1.7. + * + *
{@code
+   * var greeterProxy = Restate.scope("my-scope").service(Greeter.class);
+   * GreetingResponse response = greeterProxy.greet(new Greeting("Alice"));
+   * }
+ * + * @param scopeKey the scope key to prepend to all invocation targets + * @return a scoped entry point + */ + @org.jetbrains.annotations.ApiStatus.Experimental + public static Scope scope(String scopeKey) { + return new Scope(scopeKey); + } + /** * @see Context#request() */ diff --git a/sdk-api/src/main/java/dev/restate/sdk/Scope.java b/sdk-api/src/main/java/dev/restate/sdk/Scope.java new file mode 100644 index 000000000..df38af81b --- /dev/null +++ b/sdk-api/src/main/java/dev/restate/sdk/Scope.java @@ -0,0 +1,131 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate Java SDK, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-java/blob/main/LICENSE +package dev.restate.sdk; + +import dev.restate.common.Request; +import dev.restate.common.Target; +import dev.restate.common.reflections.MethodInfo; +import dev.restate.common.reflections.ProxySupport; +import dev.restate.common.reflections.ReflectionUtils; +import dev.restate.serde.TypeTag; + +/** + * Scope service-to-service communication, to send requests to services, virtual objects and + * workflows within a scope. Requires Restate >= 1.7. + * + *

Obtain an instance via {@link Restate#scope(String)}. + */ +@org.jetbrains.annotations.ApiStatus.Experimental +public final class Scope { + + private final String scope; + + Scope(String scope) { + this.scope = scope; + } + + /** + * @see Restate#service(Class) + */ + @org.jetbrains.annotations.ApiStatus.Experimental + public SVC service(Class clazz) { + ReflectionUtils.mustHaveServiceAnnotation(clazz); + String serviceName = ReflectionUtils.extractServiceName(clazz); + return ProxySupport.createProxy( + clazz, + invocation -> { + var methodInfo = MethodInfo.fromMethod(invocation.getMethod()); + + //noinspection unchecked + return Context.current() + .call( + Request.of( + Target.virtualObject(scope, serviceName, null, methodInfo.getHandlerName()), + (TypeTag) methodInfo.getInputType(), + (TypeTag) methodInfo.getOutputType(), + invocation.getArguments().length == 0 ? null : invocation.getArguments()[0])) + .await(); + }); + } + + /** + * @see Restate#serviceHandle(Class) + */ + @org.jetbrains.annotations.ApiStatus.Experimental + public ServiceHandle serviceHandle(Class clazz) { + ReflectionUtils.mustHaveServiceAnnotation(clazz); + return new ServiceHandleImpl<>(clazz, null, scope); + } + + /** + * @see Restate#virtualObject(Class, String) + */ + @org.jetbrains.annotations.ApiStatus.Experimental + public SVC virtualObject(Class clazz, String key) { + ReflectionUtils.mustHaveVirtualObjectAnnotation(clazz); + String serviceName = ReflectionUtils.extractServiceName(clazz); + return ProxySupport.createProxy( + clazz, + invocation -> { + var methodInfo = MethodInfo.fromMethod(invocation.getMethod()); + + //noinspection unchecked + return Context.current() + .call( + Request.of( + Target.virtualObject(scope, serviceName, key, methodInfo.getHandlerName()), + (TypeTag) methodInfo.getInputType(), + (TypeTag) methodInfo.getOutputType(), + invocation.getArguments().length == 0 ? null : invocation.getArguments()[0])) + .await(); + }); + } + + /** + * @see Restate#virtualObjectHandle(Class, String) + */ + @org.jetbrains.annotations.ApiStatus.Experimental + public ServiceHandle virtualObjectHandle(Class clazz, String key) { + ReflectionUtils.mustHaveVirtualObjectAnnotation(clazz); + return new ServiceHandleImpl<>(clazz, key, scope); + } + + /** + * @see Restate#workflow(Class, String) + */ + @org.jetbrains.annotations.ApiStatus.Experimental + public SVC workflow(Class clazz, String key) { + ReflectionUtils.mustHaveWorkflowAnnotation(clazz); + String serviceName = ReflectionUtils.extractServiceName(clazz); + return ProxySupport.createProxy( + clazz, + invocation -> { + var methodInfo = MethodInfo.fromMethod(invocation.getMethod()); + + //noinspection unchecked + return Context.current() + .call( + Request.of( + Target.virtualObject(scope, serviceName, key, methodInfo.getHandlerName()), + (TypeTag) methodInfo.getInputType(), + (TypeTag) methodInfo.getOutputType(), + invocation.getArguments().length == 0 ? null : invocation.getArguments()[0])) + .await(); + }); + } + + /** + * @see Restate#workflowHandle(Class, String) + */ + @org.jetbrains.annotations.ApiStatus.Experimental + public ServiceHandle workflowHandle(Class clazz, String key) { + ReflectionUtils.mustHaveWorkflowAnnotation(clazz); + return new ServiceHandleImpl<>(clazz, key, scope); + } +} diff --git a/sdk-api/src/main/java/dev/restate/sdk/ServiceHandleImpl.java b/sdk-api/src/main/java/dev/restate/sdk/ServiceHandleImpl.java index f4d8eccdd..99e219aaa 100644 --- a/sdk-api/src/main/java/dev/restate/sdk/ServiceHandleImpl.java +++ b/sdk-api/src/main/java/dev/restate/sdk/ServiceHandleImpl.java @@ -26,14 +26,20 @@ final class ServiceHandleImpl implements ServiceHandle { private final Class clazz; private final String serviceName; private final @Nullable String key; + private final @Nullable String scope; // To use call/send private MethodInfoCollector methodInfoCollector; ServiceHandleImpl(Class clazz, @Nullable String key) { + this(clazz, key, null); + } + + ServiceHandleImpl(Class clazz, @Nullable String key, @Nullable String scope) { this.clazz = clazz; this.serviceName = ReflectionUtils.extractServiceName(clazz); this.key = key; + this.scope = scope; } @SuppressWarnings("unchecked") @@ -44,6 +50,7 @@ public DurableFuture call( return Context.current() .call( toRequest( + scope, serviceName, key, methodInfo.getHandlerName(), @@ -61,6 +68,7 @@ public DurableFuture call( return Context.current() .call( toRequest( + scope, serviceName, key, methodInfo.getHandlerName(), @@ -77,6 +85,7 @@ public DurableFuture call(Function methodReference, InvocationOpt return Context.current() .call( toRequest( + scope, serviceName, key, methodInfo.getHandlerName(), @@ -92,6 +101,7 @@ public DurableFuture call(Consumer methodReference, InvocationOptions return Context.current() .call( toRequest( + scope, serviceName, key, methodInfo.getHandlerName(), @@ -109,6 +119,7 @@ public InvocationHandle send( return Context.current() .send( toRequest( + scope, serviceName, key, methodInfo.getHandlerName(), @@ -127,6 +138,7 @@ public InvocationHandle send( return Context.current() .send( toRequest( + scope, serviceName, key, methodInfo.getHandlerName(), @@ -145,6 +157,7 @@ public InvocationHandle send( return Context.current() .send( toRequest( + scope, serviceName, key, methodInfo.getHandlerName(), @@ -162,6 +175,7 @@ public InvocationHandle send( return Context.current() .send( toRequest( + scope, serviceName, key, methodInfo.getHandlerName(), diff --git a/sdk-common/src/main/java/dev/restate/sdk/endpoint/definition/HandlerContext.java b/sdk-common/src/main/java/dev/restate/sdk/endpoint/definition/HandlerContext.java index ba564e4b2..b44d0b93c 100644 --- a/sdk-common/src/main/java/dev/restate/sdk/endpoint/definition/HandlerContext.java +++ b/sdk-common/src/main/java/dev/restate/sdk/endpoint/definition/HandlerContext.java @@ -61,16 +61,37 @@ public interface HandlerContext { record CallResult( AsyncResult invocationIdAsyncResult, AsyncResult callAsyncResult) {} + @Deprecated + default CompletableFuture call( + Target target, + Slice parameter, + @Nullable String idempotencyKey, + @Nullable Collection> headers) { + return call(target, parameter, idempotencyKey, null, headers); + } + CompletableFuture call( Target target, Slice parameter, @Nullable String idempotencyKey, + @Nullable String limitKey, @Nullable Collection> headers); + @Deprecated + default CompletableFuture> send( + Target target, + Slice parameter, + @Nullable String idempotencyKey, + @Nullable Collection> headers, + @Nullable Duration delay) { + return send(target, parameter, idempotencyKey, null, headers, delay); + } + CompletableFuture> send( Target target, Slice parameter, @Nullable String idempotencyKey, + @Nullable String limitKey, @Nullable Collection> headers, @Nullable Duration delay); diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/ExecutorSwitchingHandlerContextImpl.java b/sdk-core/src/main/java/dev/restate/sdk/core/ExecutorSwitchingHandlerContextImpl.java index 257e951a6..10271bfc7 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/ExecutorSwitchingHandlerContextImpl.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/ExecutorSwitchingHandlerContextImpl.java @@ -84,9 +84,10 @@ public CompletableFuture call( Target target, Slice parameter, @Nullable String idempotencyKey, + @Nullable String limitKey, @Nullable Collection> headers) { return CompletableFuture.supplyAsync( - () -> super.call(target, parameter, idempotencyKey, headers), coreExecutor) + () -> super.call(target, parameter, idempotencyKey, limitKey, headers), coreExecutor) .thenCompose(Function.identity()); } @@ -95,10 +96,12 @@ public CompletableFuture> send( Target target, Slice parameter, @Nullable String idempotencyKey, + @Nullable String limitKey, @Nullable Collection> headers, @Nullable Duration delay) { return CompletableFuture.supplyAsync( - () -> super.send(target, parameter, idempotencyKey, headers, delay), coreExecutor) + () -> super.send(target, parameter, idempotencyKey, limitKey, headers, delay), + coreExecutor) .thenCompose(Function.identity()); } diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/HandlerContextImpl.java b/sdk-core/src/main/java/dev/restate/sdk/core/HandlerContextImpl.java index 57906ef86..375866e48 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/HandlerContextImpl.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/HandlerContextImpl.java @@ -215,11 +215,12 @@ public CompletableFuture call( Target target, Slice parameter, @Nullable String idempotencyKey, + @Nullable String limitKey, @Nullable Collection> headers) { return catchExceptions( () -> { StateMachine.CallHandle callHandle = - this.stateMachine.call(target, parameter, idempotencyKey, headers); + this.stateMachine.call(target, parameter, idempotencyKey, limitKey, headers); AsyncResultInternal invocationIdAsyncResult = AsyncResults.single(this, callHandle.invocationIdHandle(), invocationIdCompleter()); @@ -238,12 +239,13 @@ public CompletableFuture> send( Target target, Slice parameter, @Nullable String idempotencyKey, + @Nullable String limitKey, @Nullable Collection> headers, @Nullable Duration delay) { return catchExceptions( () -> { int sendHandle = - this.stateMachine.send(target, parameter, idempotencyKey, headers, delay); + this.stateMachine.send(target, parameter, idempotencyKey, limitKey, headers, delay); return AsyncResults.single(this, sendHandle, invocationIdCompleter()); }); diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/ServiceProtocol.java b/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/ServiceProtocol.java index 214fb0e2b..0afb476eb 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/ServiceProtocol.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/ServiceProtocol.java @@ -15,7 +15,7 @@ public class ServiceProtocol { public static final Protocol.ServiceProtocolVersion MIN_SERVICE_PROTOCOL_VERSION = Protocol.ServiceProtocolVersion.V5; public static final Protocol.ServiceProtocolVersion MAX_SERVICE_PROTOCOL_VERSION = - Protocol.ServiceProtocolVersion.V6; + Protocol.ServiceProtocolVersion.V7; static final String CONTENT_TYPE = "content-type"; @@ -43,6 +43,9 @@ static Protocol.ServiceProtocolVersion parseServiceProtocolVersion(String versio if (version.equals("application/vnd.restate.invocation.v6")) { return Protocol.ServiceProtocolVersion.V6; } + if (version.equals("application/vnd.restate.invocation.v7")) { + return Protocol.ServiceProtocolVersion.V7; + } return Protocol.ServiceProtocolVersion.SERVICE_PROTOCOL_VERSION_UNSPECIFIED; } @@ -65,6 +68,9 @@ static String serviceProtocolVersionToHeaderValue(Protocol.ServiceProtocolVersio if (Objects.requireNonNull(version) == Protocol.ServiceProtocolVersion.V6) { return "application/vnd.restate.invocation.v6"; } + if (Objects.requireNonNull(version) == Protocol.ServiceProtocolVersion.V7) { + return "application/vnd.restate.invocation.v7"; + } throw new IllegalArgumentException( String.format("Service protocol version '%s' has no header value", version.getNumber())); } diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachine.java b/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachine.java index 14d810c11..8bbf97bd8 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachine.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachine.java @@ -95,12 +95,14 @@ CallHandle call( Target target, Slice payload, @Nullable String idempotencyKey, + @Nullable String limitKey, @Nullable Collection> headers); int send( Target target, Slice payload, @Nullable String idempotencyKey, + @Nullable String limitKey, @Nullable Collection> headers, @Nullable Duration delay); diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachineImpl.java b/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachineImpl.java index 354daa266..bd84300e3 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachineImpl.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachineImpl.java @@ -278,11 +278,14 @@ public CallHandle call( Target target, Slice payload, @Nullable String idempotencyKey, + @Nullable String limitKey, @Nullable Collection> headers) { LOG.debug("Executing 'Call {}'", target); if (idempotencyKey != null && idempotencyKey.isBlank()) { throw ProtocolException.idempotencyKeyIsEmpty(); } + verifyScopeFeatureSupport(target); + verifyLimitKeyFeatureSupport(limitKey); var invocationIdCompletionId = this.stateContext.getJournal().nextCompletionNotificationId(); var callCompletionId = this.stateContext.getJournal().nextCompletionNotificationId(); @@ -297,9 +300,15 @@ public CallHandle call( if (target.getKey() != null) { callCommandBuilder.setKey(target.getKey()); } + if (target.getScope() != null) { + callCommandBuilder.setScope(target.getScope()); + } if (idempotencyKey != null) { callCommandBuilder.setIdempotencyKey(idempotencyKey); } + if (limitKey != null) { + callCommandBuilder.setLimitKey(limitKey); + } if (headers != null) { for (var header : headers) { callCommandBuilder.addHeaders( @@ -327,6 +336,7 @@ public int send( Target target, Slice payload, @Nullable String idempotencyKey, + @Nullable String limitKey, @Nullable Collection> headers, @Nullable Duration delay) { if (delay != null && !delay.isZero()) { @@ -337,6 +347,8 @@ public int send( if (idempotencyKey != null && idempotencyKey.isBlank()) { throw ProtocolException.idempotencyKeyIsEmpty(); } + verifyScopeFeatureSupport(target); + verifyLimitKeyFeatureSupport(limitKey); var invocationIdCompletionId = this.stateContext.getJournal().nextCompletionNotificationId(); @@ -349,9 +361,15 @@ public int send( if (target.getKey() != null) { sendCommandBuilder.setKey(target.getKey()); } + if (target.getScope() != null) { + sendCommandBuilder.setScope(target.getScope()); + } if (idempotencyKey != null) { sendCommandBuilder.setIdempotencyKey(idempotencyKey); } + if (limitKey != null) { + sendCommandBuilder.setLimitKey(limitKey); + } if (headers != null) { for (var header : headers) { sendCommandBuilder.addHeaders( @@ -684,4 +702,26 @@ private void verifyErrorMetadataFeatureSupport(TerminalException exception) { stateContext.getNegotiatedProtocolVersion()); } } + + private void verifyLimitKeyFeatureSupport(@Nullable String limitKey) { + if (limitKey != null + && stateContext.getNegotiatedProtocolVersion().getNumber() + < Protocol.ServiceProtocolVersion.V7.getNumber()) { + throw ProtocolException.unsupportedFeature( + "limit key", + Protocol.ServiceProtocolVersion.V7, + stateContext.getNegotiatedProtocolVersion()); + } + } + + private void verifyScopeFeatureSupport(Target target) { + if (target.getScope() != null + && stateContext.getNegotiatedProtocolVersion().getNumber() + < Protocol.ServiceProtocolVersion.V7.getNumber()) { + throw ProtocolException.unsupportedFeature( + "invocation target scope", + Protocol.ServiceProtocolVersion.V7, + stateContext.getNegotiatedProtocolVersion()); + } + } } diff --git a/sdk-core/src/main/service-protocol/dev/restate/service/protocol.proto b/sdk-core/src/main/service-protocol/dev/restate/service/protocol.proto index 0a6696533..3cd0e16ad 100644 --- a/sdk-core/src/main/service-protocol/dev/restate/service/protocol.proto +++ b/sdk-core/src/main/service-protocol/dev/restate/service/protocol.proto @@ -36,6 +36,12 @@ enum ServiceProtocolVersion { // * StartMessage.random_seed // * Failure.metadata V6 = 6; + // Added: + // * CallCommandMessage.scope + // * OneWayCallCommandMessage.scope + // * CallCommandMessage.limit_key + // * OneWayCallCommandMessage.limit_key + V7 = 7; } // --- Core frames --- @@ -419,6 +425,12 @@ message CallCommandMessage { // If present, it must be non empty. optional string idempotency_key = 6; + // Optional scope for the invocation target. + optional string scope = 7; + + // Optional limit key for the invocation. + optional string limit_key = 8; + uint32 invocation_id_notification_idx = 10; uint32 result_completion_id = 11; string name = 12; @@ -471,6 +483,12 @@ message OneWayCallCommandMessage { // If present, it must be non empty. optional string idempotency_key = 7; + // Optional scope for the invocation target. + optional string scope = 8; + + // Optional limit key for the invocation. + optional string limit_key = 9; + uint32 invocation_id_notification_idx = 10; string name = 12; } diff --git a/sdk-core/src/test/java/dev/restate/sdk/core/statemachine/ProtoUtils.java b/sdk-core/src/test/java/dev/restate/sdk/core/statemachine/ProtoUtils.java index 6a68958fe..12af0be1a 100644 --- a/sdk-core/src/test/java/dev/restate/sdk/core/statemachine/ProtoUtils.java +++ b/sdk-core/src/test/java/dev/restate/sdk/core/statemachine/ProtoUtils.java @@ -247,6 +247,9 @@ public static Protocol.CallCommandMessage.Builder callCmd( if (target.getKey() != null) { builder.setKey(target.getKey()); } + if (target.getScope() != null) { + builder.setScope(target.getScope()); + } builder .setInvocationIdNotificationIdx(invocationIdCompletionId) .setResultCompletionId(resultCompletionId); @@ -288,6 +291,9 @@ public static Protocol.OneWayCallCommandMessage.Builder oneWayCallCmd( if (target.getKey() != null) { builder.setKey(target.getKey()); } + if (target.getScope() != null) { + builder.setScope(target.getScope()); + } if (idempotencyKey != null) { builder.setIdempotencyKey(idempotencyKey); } diff --git a/sdk-fake-api/src/main/java/dev/restate/sdk/fake/FakeHandlerContext.java b/sdk-fake-api/src/main/java/dev/restate/sdk/fake/FakeHandlerContext.java index 750874140..fb6d01890 100644 --- a/sdk-fake-api/src/main/java/dev/restate/sdk/fake/FakeHandlerContext.java +++ b/sdk-fake-api/src/main/java/dev/restate/sdk/fake/FakeHandlerContext.java @@ -138,7 +138,11 @@ public CompletableFuture> timer(Duration duration, String s) { @Override public CompletableFuture call( - Target target, Slice slice, String s, Collection> collection) { + Target target, + Slice slice, + String s, + String limitKey, + Collection> collection) { throw new UnsupportedOperationException( "FakeHandlerContext doesn't currently support mocking this operation"); } @@ -148,6 +152,7 @@ public CompletableFuture> send( Target target, Slice slice, String s, + String limitKey, Collection> collection, Duration duration) { throw new UnsupportedOperationException(