From 4753a8bea6de1ad5db9a0c5262ad0c9de01a86c4 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Fri, 22 May 2026 00:26:00 +0000 Subject: [PATCH 1/4] [Dataflow Streaming] Prepare BoundedQueueExecutor for MultiKey bundles --- .../worker/streaming/ExecutableWork.java | 18 ++-- .../dataflow/worker/streaming/WorkResult.java | 32 +++++++ .../worker/util/BoundedQueueExecutor.java | 59 ++++++++---- .../dataflow/worker/util/ExceptionUtils.java | 41 +++++++++ .../processing/StreamingCommitFinalizer.java | 2 +- .../processing/StreamingWorkScheduler.java | 17 ++-- .../worker/StreamingDataflowWorkerTest.java | 6 +- .../worker/streaming/ActiveWorkStateTest.java | 4 +- .../streaming/ComputationStateCacheTest.java | 2 +- .../worker/util/BoundedQueueExecutorTest.java | 90 +++++++++++++------ .../failures/WorkFailureProcessorTest.java | 6 +- .../work/refresh/ActiveWorkRefresherTest.java | 6 +- 12 files changed, 222 insertions(+), 61 deletions(-) create mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WorkResult.java create mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ExceptionUtils.java diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutableWork.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutableWork.java index db279f066630..9f29a6496f77 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutableWork.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutableWork.java @@ -18,24 +18,28 @@ package org.apache.beam.runners.dataflow.worker.streaming; import com.google.auto.value.AutoValue; -import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.beam.runners.dataflow.worker.util.ExceptionUtils; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; /** {@link Work} instance and a processing function used to process the work. */ @AutoValue -public abstract class ExecutableWork implements Runnable { +public abstract class ExecutableWork { - public static ExecutableWork create(Work work, Consumer executeWorkFn) { + public static ExecutableWork create(Work work, Function executeWorkFn) { return new AutoValue_ExecutableWork(work, executeWorkFn); } public abstract Work work(); - public abstract Consumer executeWorkFn(); + public abstract Function executeWorkFn(); - @Override - public void run() { - executeWorkFn().accept(work()); + public WorkResult run() { + try { + return executeWorkFn().apply(work()); + } catch (Throwable t) { + throw ExceptionUtils.propagate(t); + } } public final WorkId id() { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WorkResult.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WorkResult.java new file mode 100644 index 000000000000..07db0e98aca4 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WorkResult.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.streaming; + +import com.google.auto.value.AutoValue; + +/** The result of executing an {@link ExecutableWork}. */ +@AutoValue +public abstract class WorkResult { + public static WorkResult create(int itemsProcessed, long bytesProcessed) { + return new AutoValue_WorkResult(itemsProcessed, bytesProcessed); + } + + public abstract int itemsProcessed(); + + public abstract long bytesProcessed(); +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java index 9079c3cc69b8..5a2a7584bc9b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java @@ -24,6 +24,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.concurrent.GuardedBy; +import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork; +import org.apache.beam.runners.dataflow.worker.streaming.WorkResult; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor.Guard; @@ -38,7 +40,7 @@ public class BoundedQueueExecutor { // Used to guard elementsOutstanding and bytesOutstanding. private final Monitor monitor; - private final ConcurrentLinkedQueue decrementQueue = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue decrementQueue = new ConcurrentLinkedQueue<>(); private final Object decrementQueueDrainLock = new Object(); private final AtomicBoolean isDecrementBatchPending = new AtomicBoolean(false); private int elementsOutstanding = 0; @@ -106,7 +108,7 @@ protected void afterExecute(Runnable r, Throwable t) { // Before adding a Work to the queue, check that there are enough bytes of space or no other // outstanding elements of work. - public void execute(Runnable work, long workBytes) { + public void execute(ExecutableWork work, long workBytes) { monitor.enterWhenUninterruptibly( new Guard(monitor) { @Override @@ -119,12 +121,17 @@ public boolean isSatisfied() { executeMonitorHeld(work, workBytes); } - // Forcibly add something to the queue, ignoring the length limit. - public void forceExecute(Runnable work, long workBytes) { + public void forceExecute(ExecutableWork work, long workBytes) { monitor.enter(); executeMonitorHeld(work, workBytes); } + /** Forcibly execute a Runnable callback with 0 bytes of size. */ + public void forceExecute(Runnable work) { + monitor.enter(); + executeMonitorHeld(work); + } + // Set the maximum/core pool size of the executor. public synchronized void setMaximumPoolSize(int maximumPoolSize, int maximumElementsOutstanding) { // For ThreadPoolExecutor, the maximum pool size should always greater than or equal to core @@ -221,32 +228,54 @@ public String summaryHtml() { } } - private void executeMonitorHeld(Runnable work, long workBytes) { + private void executeMonitorHeld(ExecutableWork work, long workBytes) { bytesOutstanding += workBytes; ++elementsOutstanding; monitor.leave(); + executor.execute( + () -> { + // Any execution exception thrown by work.run() propagates uncaught, triggering + // the default JVM UncaughtExceptionHandler which immediately crashes/terminates + // the JVM. Since the process exits immediately, reclaiming resource budgets in + // this JVM is unnecessary. Furthermore, since a failed execution does not return + // a WorkResult, we do not have a good/accurate fallback value to decrement. + WorkResult result = work.run(); + decrementCounters(result); + }); + } + + private void executeMonitorHeld(Runnable work) { + ++elementsOutstanding; + monitor.leave(); + try { executor.execute( () -> { try { work.run(); } finally { - decrementCounters(workBytes); + // Commit finalizer callbacks catch and swallow all exceptions downstream + // to keep the worker alive (so the JVM does not crash). Therefore, to + // prevent elements outstanding capacity leaks under swallowed failures, + // we must guarantee decrementing element counts in the finally block. + decrementCounters(WorkResult.create(1, 0L)); } }); - } catch (RuntimeException e) { - // If the execute() call threw an exception, decrement counters here. - decrementCounters(workBytes); - throw e; + } catch (Throwable e) { + // Since finalizer rejections are caught and swallowed downstream, we must + // decrement elements outstanding immediately on task submission failure to + // prevent permanent capacity leaks in the running JVM. + decrementCounters(WorkResult.create(1, 0L)); + throw ExceptionUtils.propagate(e); } } - private void decrementCounters(long workBytes) { + private void decrementCounters(WorkResult result) { // All threads queue decrements and one thread grabs the monitor and updates // counters. We do this to reduce contention on monitor which is locked by // GetWork thread - decrementQueue.add(workBytes); + decrementQueue.add(result); boolean submittedToExistingBatch = isDecrementBatchPending.getAndSet(true); if (submittedToExistingBatch) { // There is already a thread about to drain the decrement queue @@ -265,12 +294,12 @@ private void decrementCounters(long workBytes) { long bytesToDecrement = 0; int elementsToDecrement = 0; while (true) { - Long pollResult = decrementQueue.poll(); + WorkResult pollResult = decrementQueue.poll(); if (pollResult == null) { break; } - bytesToDecrement += pollResult; - ++elementsToDecrement; + bytesToDecrement += pollResult.bytesProcessed(); + elementsToDecrement += pollResult.itemsProcessed(); } if (elementsToDecrement == 0) { return; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ExceptionUtils.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ExceptionUtils.java new file mode 100644 index 000000000000..4bbdbb3216ca --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ExceptionUtils.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.util; + +import org.apache.beam.sdk.annotations.Internal; + +/** Utility methods for simplifying work with exceptions and throwables. */ +@Internal +public final class ExceptionUtils { + + private ExceptionUtils() {} + + /** + * Propagates {@code throwable} as-is if it is an instance of {@link RuntimeException} or {@link + * Error}, or else as a last resort wraps it in a {@code RuntimeException} and then propagates. + */ + public static RuntimeException propagate(Throwable throwable) { + if (throwable instanceof RuntimeException) { + throw (RuntimeException) throwable; + } else if (throwable instanceof Error) { + throw (Error) throwable; + } else { + throw new RuntimeException(throwable); + } + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java index 5a66545ab335..22573bf1ced2 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java @@ -156,7 +156,7 @@ public void finalizeCommits(Iterable finalizeIds) { } for (Runnable callback : callbacksToExecute) { try { - finalizationExecutor.forceExecute(callback, 0); + finalizationExecutor.forceExecute(callback); } catch (OutOfMemoryError oom) { throw oom; } catch (Throwable t) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java index 1428037d9ca0..65a1325cd40e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java @@ -42,11 +42,13 @@ import org.apache.beam.runners.dataflow.worker.streaming.StageInfo; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.streaming.WorkResult; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingCounters; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcherFactory; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; +import org.apache.beam.runners.dataflow.worker.util.ExceptionUtils; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution; import org.apache.beam.runners.dataflow.worker.windmill.client.commits.Commit; @@ -233,15 +235,15 @@ public void queueAppliedFinalizeIds(ImmutableList appliedFinalizeIds) { * * @implNote This will block the calling thread during execution of user DoFns. */ - private void processWork( + private WorkResult processWork( ComputationState computationState, Work work, ImmutableList getWorkStreamLatencies) { work.recordGetWorkStreamLatencies(getWorkStreamLatencies); - processWork(computationState, work); + return processWork(computationState, work); } - private void processWork(ComputationState computationState, Work work) { + private WorkResult processWork(ComputationState computationState, Work work) { Windmill.WorkItem workItem = work.getWorkItem(); String computationId = computationState.getComputationId(); ByteString key = workItem.getKey(); @@ -258,7 +260,7 @@ private void processWork(ComputationState computationState, Work work) { outputBuilder.setSourceStateUpdates(Windmill.SourceState.newBuilder().setOnlyFinalize(true)); work.setState(Work.State.COMMIT_QUEUED); work.queueCommit(outputBuilder.build(), computationState); - return; + return WorkResult.create(1, work.getSerializedWorkItemSize()); } long processingStartTimeNanos = System.nanoTime(); @@ -284,6 +286,7 @@ private void processWork(ComputationState computationState, Work work) { work.queueCommit(validatedCommitRequest, computationState); recordProcessingStats(commitRequest, workItem, executeWorkResult); LOG.debug("Processing done for work token: {}", workItem.getWorkToken()); + return WorkResult.create(1, work.getSerializedWorkItemSize()); } catch (Throwable t) { // OutOfMemoryError that are caught will be rethrown and trigger jvm termination. try { @@ -294,10 +297,14 @@ private void processWork(ComputationState computationState, Work work) { invalidWork -> computationState.completeWorkAndScheduleNextWorkForKey( invalidWork.getShardedKey(), invalidWork.id())); + // Failure successfully processed/invalidated/rescheduled. Return failure WorkResult to + // release budget cleanly. + return WorkResult.create(1, work.getSerializedWorkItemSize()); } catch (OutOfMemoryError oom) { throw oom; } catch (Throwable t2) { - throw new RuntimeException(t2); + LOG.warn("Failed to process work failure safely for work {}", work.id(), t2); + throw ExceptionUtils.propagate(t2); } } finally { // Update total processing time counters. Updating in finally clause ensures that diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index d8a1d1b90d47..593883343bd4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -106,6 +106,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.streaming.WorkResult; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandleImpl; import org.apache.beam.runners.dataflow.worker.streaming.harness.FanOutStreamingEngineWorkerHarness; @@ -373,7 +374,10 @@ private static ExecutableWork createMockWork( computationId, new FakeGetDataClient(), ignored -> {}, mock(HeartbeatSender.class)), false, Instant::now), - processWorkFn); + work -> { + processWorkFn.accept(work); + return WorkResult.create(1, work.getSerializedWorkItemSize()); + }); } private byte[] intervalWindowBytes(IntervalWindow window) throws Exception { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java index 865ae2612803..4942e9610fd6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java @@ -73,7 +73,7 @@ private static ExecutableWork createWork(Windmill.WorkItem workItem) { createWorkProcessingContext(), false, Instant::now), - ignored -> {}); + work -> WorkResult.create(1, work.getSerializedWorkItemSize())); } private static ExecutableWork expiredWork(Windmill.WorkItem workItem) { @@ -85,7 +85,7 @@ private static ExecutableWork expiredWork(Windmill.WorkItem workItem) { createWorkProcessingContext(), false, () -> Instant.EPOCH), - ignored -> {}); + work -> WorkResult.create(1, work.getSerializedWorkItemSize())); } private static Work.ProcessingContext createWorkProcessingContext() { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java index 1c8b8fca131d..d9ad1157ee4c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java @@ -77,7 +77,7 @@ private static ExecutableWork createWork(ShardedKey shardedKey, long workToken, mock(HeartbeatSender.class)), false, Instant::now), - ignored -> {}); + work -> WorkResult.create(1, work.getSerializedWorkItemSize())); } @Before diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java index d7ea039bb809..bdadcd2b9e8c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java @@ -32,6 +32,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.streaming.WorkResult; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem; import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.FakeGetDataClient; import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender; @@ -85,18 +86,22 @@ private static ExecutableWork createWork(Consumer executeWorkFn) { mock(HeartbeatSender.class)), false, Instant::now), - executeWorkFn); + work -> { + executeWorkFn.accept(work); + return WorkResult.create(1, work.getSerializedWorkItemSize()); + }); } - private Runnable createSleepProcessWorkFn(CountDownLatch start, CountDownLatch stop) { - return () -> { - start.countDown(); - try { - stop.await(); - } catch (Exception e) { - throw new RuntimeException(e); - } - }; + private ExecutableWork createSleepProcessWork(CountDownLatch start, CountDownLatch stop) { + return createWork( + ignored -> { + start.countDown(); + try { + stop.await(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); } @Before @@ -123,9 +128,9 @@ public void testScheduleWorkWhenExceedMaximumPoolSize() throws Exception { CountDownLatch processStop2 = new CountDownLatch(1); CountDownLatch processStart3 = new CountDownLatch(1); CountDownLatch processStop3 = new CountDownLatch(1); - Runnable m1 = createSleepProcessWorkFn(processStart1, processStop1); - Runnable m2 = createSleepProcessWorkFn(processStart2, processStop2); - Runnable m3 = createSleepProcessWorkFn(processStart3, processStop3); + ExecutableWork m1 = createSleepProcessWork(processStart1, processStop1); + ExecutableWork m2 = createSleepProcessWork(processStart2, processStop2); + ExecutableWork m3 = createSleepProcessWork(processStart3, processStop3); executor.execute(m1, 1); processStart1.await(); @@ -152,8 +157,8 @@ public void testScheduleWorkWhenExceedMaximumBytesOutstanding() throws Exception CountDownLatch processStop1 = new CountDownLatch(1); CountDownLatch processStart2 = new CountDownLatch(1); CountDownLatch processStop2 = new CountDownLatch(1); - Runnable m1 = createSleepProcessWorkFn(processStart1, processStop1); - Runnable m2 = createSleepProcessWorkFn(processStart2, processStop2); + ExecutableWork m1 = createSleepProcessWork(processStart1, processStop1); + ExecutableWork m2 = createSleepProcessWork(processStart2, processStop2); executor.execute(m1, 10000000); processStart1.await(); @@ -187,9 +192,9 @@ public void testOverrideMaximumPoolSize() throws Exception { CountDownLatch processStart2 = new CountDownLatch(1); CountDownLatch processStart3 = new CountDownLatch(1); CountDownLatch stop = new CountDownLatch(1); - Runnable m1 = createSleepProcessWorkFn(processStart1, stop); - Runnable m2 = createSleepProcessWorkFn(processStart2, stop); - Runnable m3 = createSleepProcessWorkFn(processStart3, stop); + ExecutableWork m1 = createSleepProcessWork(processStart1, stop); + ExecutableWork m2 = createSleepProcessWork(processStart2, stop); + ExecutableWork m3 = createSleepProcessWork(processStart3, stop); // Initial state. assertEquals(0, executor.activeCount()); @@ -225,9 +230,9 @@ public void testRecordTotalTimeMaxActiveThreadsUsed() throws Exception { CountDownLatch processStart2 = new CountDownLatch(1); CountDownLatch processStart3 = new CountDownLatch(1); CountDownLatch stop = new CountDownLatch(1); - Runnable m1 = createSleepProcessWorkFn(processStart1, stop); - Runnable m2 = createSleepProcessWorkFn(processStart2, stop); - Runnable m3 = createSleepProcessWorkFn(processStart3, stop); + ExecutableWork m1 = createSleepProcessWork(processStart1, stop); + ExecutableWork m2 = createSleepProcessWork(processStart2, stop); + ExecutableWork m3 = createSleepProcessWork(processStart3, stop); // Initial state. assertEquals(0, executor.activeCount()); @@ -264,9 +269,9 @@ public void testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumPoolSizeIsIncrease CountDownLatch processStart2 = new CountDownLatch(1); CountDownLatch processStart3 = new CountDownLatch(1); CountDownLatch stop = new CountDownLatch(1); - Runnable m1 = createSleepProcessWorkFn(processStart1, stop); - Runnable m2 = createSleepProcessWorkFn(processStart2, stop); - Runnable m3 = createSleepProcessWorkFn(processStart3, stop); + ExecutableWork m1 = createSleepProcessWork(processStart1, stop); + ExecutableWork m2 = createSleepProcessWork(processStart2, stop); + ExecutableWork m3 = createSleepProcessWork(processStart3, stop); // Initial state. assertEquals(0, executor.activeCount()); @@ -308,9 +313,9 @@ public void testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumPoolSizeIsReduced( CountDownLatch processStop2 = new CountDownLatch(1); CountDownLatch processStart3 = new CountDownLatch(1); CountDownLatch processStop3 = new CountDownLatch(1); - Runnable m1 = createSleepProcessWorkFn(processStart1, processStop1); - Runnable m2 = createSleepProcessWorkFn(processStart2, processStop2); - Runnable m3 = createSleepProcessWorkFn(processStart3, processStop3); + ExecutableWork m1 = createSleepProcessWork(processStart1, processStop1); + ExecutableWork m2 = createSleepProcessWork(processStart2, processStop2); + ExecutableWork m3 = createSleepProcessWork(processStart3, processStop3); // Initial state. assertEquals(0, executor.activeCount()); @@ -351,6 +356,37 @@ public void testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumPoolSizeIsReduced( executor.shutdown(); } + @Test + public void testRunnableExceptionPropagationDecrementsCounters() throws Exception { + CountDownLatch processStart = new CountDownLatch(1); + CountDownLatch processStop = new CountDownLatch(1); + + Runnable work = + () -> { + processStart.countDown(); + try { + processStop.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + throw new RuntimeException("Simulated finalizer processing exception"); + }; + + executor.forceExecute(work); + processStart.await(); + + assertEquals(1, executor.elementsOutstanding()); + + processStop.countDown(); + + // Wait until outstanding elements are released + while (executor.elementsOutstanding() != 0) { + Thread.sleep(10); + } + + assertEquals(0, executor.elementsOutstanding()); + } + @Test public void testRenderSummaryHtml() { String expectedSummaryHtml = diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java index 68a11895fa12..291be6f0bf9b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java @@ -33,6 +33,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.streaming.WorkResult; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem; @@ -98,7 +99,10 @@ private static ExecutableWork createWork(Supplier clock, Consumer mock(HeartbeatSender.class)), false, clock), - processWorkFn); + work -> { + processWorkFn.accept(work); + return WorkResult.create(1, work.getSerializedWorkItemSize()); + }); } private static ExecutableWork createWork(Consumer processWorkFn) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java index 054db878c869..e9dc42de8aa6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java @@ -46,6 +46,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.streaming.WorkResult; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem; @@ -137,7 +138,10 @@ private ExecutableWork createOldWork( "computationId", new FakeGetDataClient(), ignored -> {}, heartbeatSender), false, ActiveWorkRefresherTest::aLongTimeAgo), - processWork); + work -> { + processWork.accept(work); + return WorkResult.create(1, work.getSerializedWorkItemSize()); + }); } @Test From 0726da4e690b5e284b619812eeb1b4af0fe2d020 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Fri, 22 May 2026 22:32:05 +0000 Subject: [PATCH 2/4] [Dataflow Streaming] Refactor BoundedQueueExecutor with handles --- ...va => BoundedQueueExecutorWorkHandle.java} | 18 +- .../worker/streaming/ExecutableWork.java | 11 +- .../worker/util/BoundedQueueExecutor.java | 158 ++++++++++++++---- .../processing/StreamingWorkScheduler.java | 23 ++- .../worker/StreamingDataflowWorkerTest.java | 4 +- .../worker/streaming/ActiveWorkStateTest.java | 4 +- .../streaming/ComputationStateCacheTest.java | 2 +- .../worker/util/BoundedQueueExecutorTest.java | 145 +++++++++++++++- .../failures/WorkFailureProcessorTest.java | 4 +- .../work/refresh/ActiveWorkRefresherTest.java | 4 +- 10 files changed, 295 insertions(+), 78 deletions(-) rename runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/{WorkResult.java => BoundedQueueExecutorWorkHandle.java} (69%) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WorkResult.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/BoundedQueueExecutorWorkHandle.java similarity index 69% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WorkResult.java rename to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/BoundedQueueExecutorWorkHandle.java index 07db0e98aca4..1ca534966947 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WorkResult.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/BoundedQueueExecutorWorkHandle.java @@ -17,16 +17,8 @@ */ package org.apache.beam.runners.dataflow.worker.streaming; -import com.google.auto.value.AutoValue; - -/** The result of executing an {@link ExecutableWork}. */ -@AutoValue -public abstract class WorkResult { - public static WorkResult create(int itemsProcessed, long bytesProcessed) { - return new AutoValue_WorkResult(itemsProcessed, bytesProcessed); - } - - public abstract int itemsProcessed(); - - public abstract long bytesProcessed(); -} +/** + * A handle to use when requesting pulling more work from @BoundedQueueExecutor + * via @BoundedQueueExecutor.pollWork + */ +public interface BoundedQueueExecutorWorkHandle {} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutableWork.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutableWork.java index 9f29a6496f77..161c16106373 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutableWork.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutableWork.java @@ -18,7 +18,7 @@ package org.apache.beam.runners.dataflow.worker.streaming; import com.google.auto.value.AutoValue; -import java.util.function.Function; +import java.util.function.BiConsumer; import org.apache.beam.runners.dataflow.worker.util.ExceptionUtils; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; @@ -26,17 +26,18 @@ @AutoValue public abstract class ExecutableWork { - public static ExecutableWork create(Work work, Function executeWorkFn) { + public static ExecutableWork create( + Work work, BiConsumer executeWorkFn) { return new AutoValue_ExecutableWork(work, executeWorkFn); } public abstract Work work(); - public abstract Function executeWorkFn(); + public abstract BiConsumer executeWorkFn(); - public WorkResult run() { + public void run(BoundedQueueExecutorWorkHandle handle) { try { - return executeWorkFn().apply(work()); + executeWorkFn().accept(work(), handle); } catch (Throwable t) { throw ExceptionUtils.propagate(t); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java index 5a2a7584bc9b..386b9e0a436a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.dataflow.worker.util; +import java.util.Optional; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; @@ -24,8 +25,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.concurrent.GuardedBy; +import org.apache.beam.runners.dataflow.worker.streaming.BoundedQueueExecutorWorkHandle; import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork; -import org.apache.beam.runners.dataflow.worker.streaming.WorkResult; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor.Guard; @@ -40,7 +43,19 @@ public class BoundedQueueExecutor { // Used to guard elementsOutstanding and bytesOutstanding. private final Monitor monitor; - private final ConcurrentLinkedQueue decrementQueue = new ConcurrentLinkedQueue<>(); + + private static class Budget { + + final int elements; + final long bytes; + + Budget(int elements, long bytes) { + this.elements = elements; + this.bytes = bytes; + } + } + + private final ConcurrentLinkedQueue decrementQueue = new ConcurrentLinkedQueue<>(); private final Object decrementQueueDrainLock = new Object(); private final AtomicBoolean isDecrementBatchPending = new AtomicBoolean(false); private int elementsOutstanding = 0; @@ -121,15 +136,16 @@ public boolean isSatisfied() { executeMonitorHeld(work, workBytes); } + // Forcibly add ExecutableWork to the queue, ignoring the limits. public void forceExecute(ExecutableWork work, long workBytes) { monitor.enter(); executeMonitorHeld(work, workBytes); } /** Forcibly execute a Runnable callback with 0 bytes of size. */ - public void forceExecute(Runnable work) { + public void forceExecute(Runnable runnable) { monitor.enter(); - executeMonitorHeld(work); + executeMonitorHeld(runnable); } // Set the maximum/core pool size of the executor. @@ -228,21 +244,84 @@ public String summaryHtml() { } } + class BoundedQueueExecutorWorkHandleImpl + implements BoundedQueueExecutorWorkHandle, AutoCloseable { + + private int elements; + private long bytes; + private boolean closed = false; + + private BoundedQueueExecutorWorkHandleImpl(int elements, long bytes) { + this.elements = elements; + this.bytes = bytes; + } + + public synchronized void addBudget(int elements, long bytes) { + Preconditions.checkState(!closed, "Cannot add budget to a closed WorkBudgetHandle"); + this.elements += elements; + this.bytes += bytes; + } + + public synchronized void cancel() { + this.closed = true; + } + + @Override + public synchronized void close() { + Preconditions.checkArgument(!closed); + closed = true; + decrementCounters(this.elements, this.bytes); + } + } + + private static class QueuedWork implements Runnable { + + private final ExecutableWork work; + private final BoundedQueueExecutorWorkHandleImpl handle; + private final long workBytes; + + public QueuedWork( + ExecutableWork work, BoundedQueueExecutorWorkHandleImpl handle, long workBytes) { + this.work = work; + this.handle = handle; + this.workBytes = workBytes; + } + + public void cancelHandle() { + handle.cancel(); + } + + public ExecutableWork getWork() { + return work; + } + + public long getWorkBytes() { + return workBytes; + } + + @Override + public void run() { + Preconditions.checkArgument(!handle.closed); + try { + work.run(handle); + } finally { + handle.close(); + } + } + } + private void executeMonitorHeld(ExecutableWork work, long workBytes) { - bytesOutstanding += workBytes; ++elementsOutstanding; + bytesOutstanding += workBytes; monitor.leave(); - - executor.execute( - () -> { - // Any execution exception thrown by work.run() propagates uncaught, triggering - // the default JVM UncaughtExceptionHandler which immediately crashes/terminates - // the JVM. Since the process exits immediately, reclaiming resource budgets in - // this JVM is unnecessary. Furthermore, since a failed execution does not return - // a WorkResult, we do not have a good/accurate fallback value to decrement. - WorkResult result = work.run(); - decrementCounters(result); - }); + BoundedQueueExecutorWorkHandleImpl handle = + new BoundedQueueExecutorWorkHandleImpl(1, workBytes); + try { + executor.execute(new QueuedWork(work, handle, workBytes)); + } catch (Throwable e) { + handle.close(); + throw ExceptionUtils.propagate(e); + } } private void executeMonitorHeld(Runnable work) { @@ -255,27 +334,40 @@ private void executeMonitorHeld(Runnable work) { try { work.run(); } finally { - // Commit finalizer callbacks catch and swallow all exceptions downstream - // to keep the worker alive (so the JVM does not crash). Therefore, to - // prevent elements outstanding capacity leaks under swallowed failures, - // we must guarantee decrementing element counts in the finally block. - decrementCounters(WorkResult.create(1, 0L)); + decrementCounters(1, 0L); } }); } catch (Throwable e) { - // Since finalizer rejections are caught and swallowed downstream, we must - // decrement elements outstanding immediately on task submission failure to - // prevent permanent capacity leaks in the running JVM. - decrementCounters(WorkResult.create(1, 0L)); + decrementCounters(1, 0L); throw ExceptionUtils.propagate(e); } } - private void decrementCounters(WorkResult result) { - // All threads queue decrements and one thread grabs the monitor and updates - // counters. We do this to reduce contention on monitor which is locked by - // GetWork thread - decrementQueue.add(result); + @VisibleForTesting + BoundedQueueExecutorWorkHandleImpl createEmptyBudgetHandle() { + return new BoundedQueueExecutorWorkHandleImpl(0, 0L); + } + + public Optional pollWork(BoundedQueueExecutorWorkHandle handle) { + BoundedQueueExecutorWorkHandleImpl internalHandle = (BoundedQueueExecutorWorkHandleImpl) handle; + while (true) { + Runnable runnable = executor.getQueue().poll(); + if (runnable == null) { + return Optional.empty(); + } + if (runnable instanceof QueuedWork) { + QueuedWork queuedWork = (QueuedWork) runnable; + queuedWork.cancelHandle(); + internalHandle.addBudget(1, queuedWork.getWorkBytes()); + return Optional.of(queuedWork.getWork()); + } + // Pop and execute standard callbacks immediately on the calling thread to drain the queue + runnable.run(); + } + } + + private void decrementCounters(int elements, long bytes) { + decrementQueue.add(new Budget(elements, bytes)); boolean submittedToExistingBatch = isDecrementBatchPending.getAndSet(true); if (submittedToExistingBatch) { // There is already a thread about to drain the decrement queue @@ -294,12 +386,12 @@ private void decrementCounters(WorkResult result) { long bytesToDecrement = 0; int elementsToDecrement = 0; while (true) { - WorkResult pollResult = decrementQueue.poll(); + Budget pollResult = decrementQueue.poll(); if (pollResult == null) { break; } - bytesToDecrement += pollResult.bytesProcessed(); - elementsToDecrement += pollResult.itemsProcessed(); + bytesToDecrement += pollResult.bytes; + elementsToDecrement += pollResult.elements; } if (elementsToDecrement == 0) { return; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java index 65a1325cd40e..0c3289d4cf58 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java @@ -35,6 +35,7 @@ import org.apache.beam.runners.dataflow.worker.ReaderCache; import org.apache.beam.runners.dataflow.worker.WorkItemCancelledException; import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingMDC; +import org.apache.beam.runners.dataflow.worker.streaming.BoundedQueueExecutorWorkHandle; import org.apache.beam.runners.dataflow.worker.streaming.ComputationState; import org.apache.beam.runners.dataflow.worker.streaming.ComputationWorkExecutor; import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork; @@ -42,7 +43,6 @@ import org.apache.beam.runners.dataflow.worker.streaming.StageInfo; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; -import org.apache.beam.runners.dataflow.worker.streaming.WorkResult; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingCounters; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher; @@ -220,7 +220,7 @@ public void scheduleWork( ExecutableWork.create( Work.create( workItem, serializedWorkItemSize, watermarks, processingContext, drainMode, clock), - work -> processWork(computationState, work, getWorkStreamLatencies))); + (work, handle) -> processWork(computationState, work, getWorkStreamLatencies, handle))); } /** Adds any applied finalize ids to the commit finalizer to have their callbacks executed. */ @@ -234,16 +234,19 @@ public void queueAppliedFinalizeIds(ImmutableList appliedFinalizeIds) { * internally if processing fails due to uncaught {@link Exception}(s). * * @implNote This will block the calling thread during execution of user DoFns. + * @param handle handled to pass to BoundedQueueExecutor.pollWork, currently unused */ - private WorkResult processWork( + private void processWork( ComputationState computationState, Work work, - ImmutableList getWorkStreamLatencies) { + ImmutableList getWorkStreamLatencies, + BoundedQueueExecutorWorkHandle handle) { work.recordGetWorkStreamLatencies(getWorkStreamLatencies); - return processWork(computationState, work); + processWork(computationState, work, handle); } - private WorkResult processWork(ComputationState computationState, Work work) { + private void processWork( + ComputationState computationState, Work work, BoundedQueueExecutorWorkHandle unusedHandle) { Windmill.WorkItem workItem = work.getWorkItem(); String computationId = computationState.getComputationId(); ByteString key = workItem.getKey(); @@ -260,7 +263,7 @@ private WorkResult processWork(ComputationState computationState, Work work) { outputBuilder.setSourceStateUpdates(Windmill.SourceState.newBuilder().setOnlyFinalize(true)); work.setState(Work.State.COMMIT_QUEUED); work.queueCommit(outputBuilder.build(), computationState); - return WorkResult.create(1, work.getSerializedWorkItemSize()); + return; } long processingStartTimeNanos = System.nanoTime(); @@ -286,20 +289,16 @@ private WorkResult processWork(ComputationState computationState, Work work) { work.queueCommit(validatedCommitRequest, computationState); recordProcessingStats(commitRequest, workItem, executeWorkResult); LOG.debug("Processing done for work token: {}", workItem.getWorkToken()); - return WorkResult.create(1, work.getSerializedWorkItemSize()); } catch (Throwable t) { // OutOfMemoryError that are caught will be rethrown and trigger jvm termination. try { workFailureProcessor.logAndProcessFailure( computationId, - ExecutableWork.create(work, retry -> processWork(computationState, retry)), + ExecutableWork.create(work, (retry, h) -> processWork(computationState, retry, h)), t, invalidWork -> computationState.completeWorkAndScheduleNextWorkForKey( invalidWork.getShardedKey(), invalidWork.id())); - // Failure successfully processed/invalidated/rescheduled. Return failure WorkResult to - // release budget cleanly. - return WorkResult.create(1, work.getSerializedWorkItemSize()); } catch (OutOfMemoryError oom) { throw oom; } catch (Throwable t2) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index 593883343bd4..4d39e5d83f66 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -106,7 +106,6 @@ import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; -import org.apache.beam.runners.dataflow.worker.streaming.WorkResult; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandleImpl; import org.apache.beam.runners.dataflow.worker.streaming.harness.FanOutStreamingEngineWorkerHarness; @@ -374,9 +373,8 @@ private static ExecutableWork createMockWork( computationId, new FakeGetDataClient(), ignored -> {}, mock(HeartbeatSender.class)), false, Instant::now), - work -> { + (work, handle) -> { processWorkFn.accept(work); - return WorkResult.create(1, work.getSerializedWorkItemSize()); }); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java index 4942e9610fd6..0f14efdd0c0b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java @@ -73,7 +73,7 @@ private static ExecutableWork createWork(Windmill.WorkItem workItem) { createWorkProcessingContext(), false, Instant::now), - work -> WorkResult.create(1, work.getSerializedWorkItemSize())); + (work, handle) -> {}); } private static ExecutableWork expiredWork(Windmill.WorkItem workItem) { @@ -85,7 +85,7 @@ private static ExecutableWork expiredWork(Windmill.WorkItem workItem) { createWorkProcessingContext(), false, () -> Instant.EPOCH), - work -> WorkResult.create(1, work.getSerializedWorkItemSize())); + (work, handle) -> {}); } private static Work.ProcessingContext createWorkProcessingContext() { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java index d9ad1157ee4c..30ad97140e1e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java @@ -77,7 +77,7 @@ private static ExecutableWork createWork(ShardedKey shardedKey, long workToken, mock(HeartbeatSender.class)), false, Instant::now), - work -> WorkResult.create(1, work.getSerializedWorkItemSize())); + (work, handle) -> {}); } @Before diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java index bdadcd2b9e8c..577d72da9e37 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java @@ -26,13 +26,14 @@ import java.util.Arrays; import java.util.Collection; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; -import org.apache.beam.runners.dataflow.worker.streaming.WorkResult; +import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.BoundedQueueExecutorWorkHandleImpl; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem; import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.FakeGetDataClient; import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender; @@ -86,9 +87,8 @@ private static ExecutableWork createWork(Consumer executeWorkFn) { mock(HeartbeatSender.class)), false, Instant::now), - work -> { + (work, handle) -> { executeWorkFn.accept(work); - return WorkResult.create(1, work.getSerializedWorkItemSize()); }); } @@ -387,6 +387,145 @@ public void testRunnableExceptionPropagationDecrementsCounters() throws Exceptio assertEquals(0, executor.elementsOutstanding()); } + @Test + public void testPollWorkAndInlineBatchExecution() throws Exception { + BoundedQueueExecutor testExecutor = + new BoundedQueueExecutor( + 1, + DEFAULT_THREAD_EXPIRATION_SEC, + TimeUnit.SECONDS, + 10, + MAXIMUM_BYTES_OUTSTANDING, + new ThreadFactoryBuilder() + .setNameFormat("testPollWorkAndInlineBatchExecution-%d") + .setDaemon(true) + .build(), + useFairMonitor); + + CountDownLatch blockerStart = new CountDownLatch(1); + CountDownLatch blockerStop = new CountDownLatch(1); + ExecutableWork blockerWork = createSleepProcessWork(blockerStart, blockerStop); + + CountDownLatch start1 = new CountDownLatch(1); + CountDownLatch stop1 = new CountDownLatch(1); + ExecutableWork m1 = createSleepProcessWork(start1, stop1); + + CountDownLatch start2 = new CountDownLatch(1); + CountDownLatch stop2 = new CountDownLatch(1); + ExecutableWork m2 = createSleepProcessWork(start2, stop2); + + // 1. Occupy the single worker thread with blocker work so subsequent tasks remain queued. + testExecutor.execute(blockerWork, 0); + blockerStart.await(); + assertEquals(1, testExecutor.elementsOutstanding()); + assertEquals(0, testExecutor.bytesOutstanding()); + + // 2. Enqueue tasks to stay in the queue. + testExecutor.execute(m1, 1000); + testExecutor.execute(m2, 2000); + + assertEquals(3, testExecutor.elementsOutstanding()); + assertEquals(3000, testExecutor.bytesOutstanding()); + + // 3. Create the batch handle. + try (BoundedQueueExecutorWorkHandleImpl batchHandle = testExecutor.createEmptyBudgetHandle()) { + // 4. Poll tasks inline. + Optional polled1 = testExecutor.pollWork(batchHandle); + assertTrue(polled1.isPresent()); + assertEquals(m1, polled1.get()); + + Optional polled2 = testExecutor.pollWork(batchHandle); + assertTrue(polled2.isPresent()); + assertEquals(m2, polled2.get()); + + // Queue should now be empty. + Optional polled3 = testExecutor.pollWork(batchHandle); + assertFalse(polled3.isPresent()); + + // 5. Run polled tasks inline. + start1.countDown(); + stop1.countDown(); + polled1.get().run(batchHandle); + + start2.countDown(); + stop2.countDown(); + polled2.get().run(batchHandle); + + // Outstanding counts should NOT yet be decremented. + assertEquals(3, testExecutor.elementsOutstanding()); + assertEquals(3000, testExecutor.bytesOutstanding()); + } + + // 6. Upon close, outstanding counts should immediately reflect the batch decrement in one shot. + // Only the blocker task (0 bytes, 1 element) should remain outstanding. + while (testExecutor.elementsOutstanding() != 1) { + Thread.sleep(10); + } + assertEquals(1, testExecutor.elementsOutstanding()); + assertEquals(0, testExecutor.bytesOutstanding()); + + // Clean up blocker. + blockerStop.countDown(); + testExecutor.shutdown(); + } + + @Test + public void testPollWorkAndInlineBatchExecutionWithException() throws Exception { + BoundedQueueExecutor testExecutor = + new BoundedQueueExecutor( + 1, + DEFAULT_THREAD_EXPIRATION_SEC, + TimeUnit.SECONDS, + 10, + MAXIMUM_BYTES_OUTSTANDING, + new ThreadFactoryBuilder() + .setNameFormat("testPollWorkAndInlineBatchExecutionWithException-%d") + .setDaemon(true) + .build(), + useFairMonitor); + + CountDownLatch blockerStart = new CountDownLatch(1); + CountDownLatch blockerStop = new CountDownLatch(1); + ExecutableWork blockerWork = createSleepProcessWork(blockerStart, blockerStop); + + // Occupy all worker threads + testExecutor.execute(blockerWork, 0); + blockerStart.await(); + + ExecutableWork inlineWork1 = + createWork( + ignored -> { + throw new RuntimeException("Simulated inline execution exception"); + }); + + long size1 = inlineWork1.work().getSerializedWorkItemSize(); + testExecutor.execute(inlineWork1, size1); + + long outstandingBytesBefore = testExecutor.bytesOutstanding(); + int outstandingElementsBefore = testExecutor.elementsOutstanding(); + + try { + try (BoundedQueueExecutorWorkHandleImpl batchHandle = + testExecutor.createEmptyBudgetHandle()) { + Optional polled1 = testExecutor.pollWork(batchHandle); + assertTrue(polled1.isPresent()); + polled1.get().run(batchHandle); + } + } catch (RuntimeException e) { + assertEquals("Simulated inline execution exception", e.getMessage()); + } + + // Outstanding elements must still be released cleanly by try-with-resources close! + while (testExecutor.elementsOutstanding() != outstandingElementsBefore - 1) { + Thread.sleep(10); + } + assertEquals(outstandingElementsBefore - 1, testExecutor.elementsOutstanding()); + assertEquals(outstandingBytesBefore - size1, testExecutor.bytesOutstanding()); + + blockerStop.countDown(); + testExecutor.shutdown(); + } + @Test public void testRenderSummaryHtml() { String expectedSummaryHtml = diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java index 291be6f0bf9b..51bd4816b031 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java @@ -33,7 +33,6 @@ import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; -import org.apache.beam.runners.dataflow.worker.streaming.WorkResult; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem; @@ -99,9 +98,8 @@ private static ExecutableWork createWork(Supplier clock, Consumer mock(HeartbeatSender.class)), false, clock), - work -> { + (work, handle) -> { processWorkFn.accept(work); - return WorkResult.create(1, work.getSerializedWorkItemSize()); }); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java index e9dc42de8aa6..88a82c6f76b6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java @@ -46,7 +46,6 @@ import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; -import org.apache.beam.runners.dataflow.worker.streaming.WorkResult; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem; @@ -138,9 +137,8 @@ private ExecutableWork createOldWork( "computationId", new FakeGetDataClient(), ignored -> {}, heartbeatSender), false, ActiveWorkRefresherTest::aLongTimeAgo), - work -> { + (work, handle) -> { processWork.accept(work); - return WorkResult.create(1, work.getSerializedWorkItemSize()); }); } From 32fa605bcf57c63ec91c434afae75396555566aa Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Fri, 22 May 2026 22:44:31 +0000 Subject: [PATCH 3/4] doc fix --- .../runners/dataflow/worker/util/BoundedQueueExecutor.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java index 386b9e0a436a..f22d1d8b570e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java @@ -348,6 +348,13 @@ BoundedQueueExecutorWorkHandleImpl createEmptyBudgetHandle() { return new BoundedQueueExecutorWorkHandleImpl(0, 0L); } + /** + * Poll additional work to be executed inline inside with the current execute(ExecutableWork work, + * long workBytes) call. It is the responsibility of the caller to execute or discard the returned + * ExecutableWork. Budget for the returned work is released when the execute() call finishes. + * + * @param handle the handle that was passed to ExecutableWork.executeWorkFn + */ public Optional pollWork(BoundedQueueExecutorWorkHandle handle) { BoundedQueueExecutorWorkHandleImpl internalHandle = (BoundedQueueExecutorWorkHandleImpl) handle; while (true) { From d6e6ff3feb18825ecde82a81a97e992db6061c94 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Fri, 22 May 2026 22:52:58 +0000 Subject: [PATCH 4/4] fix comments --- .../runners/dataflow/worker/util/BoundedQueueExecutor.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java index f22d1d8b570e..c64ec9bbd568 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java @@ -268,7 +268,7 @@ public synchronized void cancel() { @Override public synchronized void close() { - Preconditions.checkArgument(!closed); + if (closed) return; closed = true; decrementCounters(this.elements, this.bytes); } @@ -356,6 +356,7 @@ BoundedQueueExecutorWorkHandleImpl createEmptyBudgetHandle() { * @param handle the handle that was passed to ExecutableWork.executeWorkFn */ public Optional pollWork(BoundedQueueExecutorWorkHandle handle) { + Preconditions.checkArgument(handle instanceof BoundedQueueExecutorWorkHandleImpl); BoundedQueueExecutorWorkHandleImpl internalHandle = (BoundedQueueExecutorWorkHandleImpl) handle; while (true) { Runnable runnable = executor.getQueue().poll();