From 69f8c2f7e626be5548d7469e4792e7c81b7d0039 Mon Sep 17 00:00:00 2001 From: Sania Parveen Date: Thu, 21 May 2026 18:04:09 +0000 Subject: [PATCH 1/3] Instead of memoizing Backoff constructed instance, memoize the builder config instead --- .../client/grpc/GrpcWindmillStreamFactory.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java index 244d2ad3fa14..a60b98a8412b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java @@ -1,4 +1,4 @@ -/* + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -117,13 +117,12 @@ private GrpcWindmillStreamFactory( this.streamingRpcBatchLimit = streamingRpcBatchLimit; this.windmillMessagesBetweenIsReadyChecks = windmillMessagesBetweenIsReadyChecks; // Configure backoff to retry calls forever, with a maximum sane retry interval. - this.grpcBackOff = + Supplier backoffConfig = Suppliers.memoize( - () -> - FluentBackoff.DEFAULT - .withInitialBackoff(MIN_BACKOFF) - .withMaxBackoff(maxBackOffSupplier.get()) - .backoff()); + () -> FluentBackoff.DEFAULT + .withInitialBackoff(MIN_BACKOFF) + .withMaxBackoff(maxBackOffSupplier.get())); + this.grpcBackOff = () -> backoffConfig.get().backoff(); this.streamRegistry = ConcurrentHashMap.newKeySet(); this.sendKeyedGetDataRequests = sendKeyedGetDataRequests; this.requestBatchedGetWorkResponse = requestBatchedGetWorkResponse; From dd5434ee13ac67c391da46c9f783f45c21382d7d Mon Sep 17 00:00:00 2001 From: Sania Parveen Date: Thu, 21 May 2026 18:24:58 +0000 Subject: [PATCH 2/3] fix indendation --- .../windmill/client/grpc/GrpcWindmillStreamFactory.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java index a60b98a8412b..52d419b1c255 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java @@ -119,9 +119,10 @@ private GrpcWindmillStreamFactory( // Configure backoff to retry calls forever, with a maximum sane retry interval. Supplier backoffConfig = Suppliers.memoize( - () -> FluentBackoff.DEFAULT - .withInitialBackoff(MIN_BACKOFF) - .withMaxBackoff(maxBackOffSupplier.get())); + () -> + FluentBackoff.DEFAULT + .withInitialBackoff(MIN_BACKOFF) + .withMaxBackoff(maxBackOffSupplier.get())); this.grpcBackOff = () -> backoffConfig.get().backoff(); this.streamRegistry = ConcurrentHashMap.newKeySet(); this.sendKeyedGetDataRequests = sendKeyedGetDataRequests; From 47756cccc40be1d7898b8ef906b5fc4c2631c430 Mon Sep 17 00:00:00 2001 From: Sania Parveen Date: Thu, 21 May 2026 18:31:37 +0000 Subject: [PATCH 3/3] indendation fix --- .../worker/windmill/client/grpc/GrpcWindmillStreamFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java index 52d419b1c255..0184b88d53cd 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java @@ -1,4 +1,4 @@ - /* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information