diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java index 244d2ad3fa14..0184b88d53cd 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java @@ -117,13 +117,13 @@ private GrpcWindmillStreamFactory( this.streamingRpcBatchLimit = streamingRpcBatchLimit; this.windmillMessagesBetweenIsReadyChecks = windmillMessagesBetweenIsReadyChecks; // Configure backoff to retry calls forever, with a maximum sane retry interval. - this.grpcBackOff = + Supplier backoffConfig = Suppliers.memoize( () -> FluentBackoff.DEFAULT .withInitialBackoff(MIN_BACKOFF) - .withMaxBackoff(maxBackOffSupplier.get()) - .backoff()); + .withMaxBackoff(maxBackOffSupplier.get())); + this.grpcBackOff = () -> backoffConfig.get().backoff(); this.streamRegistry = ConcurrentHashMap.newKeySet(); this.sendKeyedGetDataRequests = sendKeyedGetDataRequests; this.requestBatchedGetWorkResponse = requestBatchedGetWorkResponse;