diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/BoundedQueueExecutorWorkHandle.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/BoundedQueueExecutorWorkHandle.java new file mode 100644 index 000000000000..1ca534966947 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/BoundedQueueExecutorWorkHandle.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.streaming; + +/** + * A handle to use when requesting to pull more work from {@code BoundedQueueExecutor} + * via {@code BoundedQueueExecutor#pollWork}. + */ +public interface BoundedQueueExecutorWorkHandle {} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutableWork.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutableWork.java index db279f066630..161c16106373 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutableWork.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutableWork.java @@ -18,24 +18,29 @@ package org.apache.beam.runners.dataflow.worker.streaming; import com.google.auto.value.AutoValue; -import java.util.function.Consumer; +import java.util.function.BiConsumer; +import org.apache.beam.runners.dataflow.worker.util.ExceptionUtils; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; /** {@link Work} instance and a processing function used to process the work. 
*/ @AutoValue -public abstract class ExecutableWork implements Runnable { +public abstract class ExecutableWork { - public static ExecutableWork create(Work work, Consumer executeWorkFn) { + public static ExecutableWork create( + Work work, BiConsumer executeWorkFn) { return new AutoValue_ExecutableWork(work, executeWorkFn); } public abstract Work work(); - public abstract Consumer executeWorkFn(); + public abstract BiConsumer executeWorkFn(); - @Override - public void run() { - executeWorkFn().accept(work()); + public void run(BoundedQueueExecutorWorkHandle handle) { + try { + executeWorkFn().accept(work(), handle); + } catch (Throwable t) { + throw ExceptionUtils.propagate(t); + } } public final WorkId id() { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java index 9079c3cc69b8..c64ec9bbd568 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.dataflow.worker.util; +import java.util.Optional; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; @@ -24,6 +25,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.concurrent.GuardedBy; +import org.apache.beam.runners.dataflow.worker.streaming.BoundedQueueExecutorWorkHandle; +import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor.Guard; @@ -38,7 +43,19 @@ public class BoundedQueueExecutor { // Used to guard elementsOutstanding and bytesOutstanding. private final Monitor monitor; - private final ConcurrentLinkedQueue decrementQueue = new ConcurrentLinkedQueue<>(); + + private static class Budget { + + final int elements; + final long bytes; + + Budget(int elements, long bytes) { + this.elements = elements; + this.bytes = bytes; + } + } + + private final ConcurrentLinkedQueue decrementQueue = new ConcurrentLinkedQueue<>(); private final Object decrementQueueDrainLock = new Object(); private final AtomicBoolean isDecrementBatchPending = new AtomicBoolean(false); private int elementsOutstanding = 0; @@ -106,7 +123,7 @@ protected void afterExecute(Runnable r, Throwable t) { // Before adding a Work to the queue, check that there are enough bytes of space or no other // outstanding elements of work. - public void execute(Runnable work, long workBytes) { + public void execute(ExecutableWork work, long workBytes) { monitor.enterWhenUninterruptibly( new Guard(monitor) { @Override @@ -119,12 +136,18 @@ public boolean isSatisfied() { executeMonitorHeld(work, workBytes); } - // Forcibly add something to the queue, ignoring the length limit. - public void forceExecute(Runnable work, long workBytes) { + // Forcibly add ExecutableWork to the queue, ignoring the limits. + public void forceExecute(ExecutableWork work, long workBytes) { monitor.enter(); executeMonitorHeld(work, workBytes); } + /** Forcibly execute a Runnable callback with 0 bytes of size. */ + public void forceExecute(Runnable runnable) { + monitor.enter(); + executeMonitorHeld(runnable); + } + // Set the maximum/core pool size of the executor. 
public synchronized void setMaximumPoolSize(int maximumPoolSize, int maximumElementsOutstanding) { // For ThreadPoolExecutor, the maximum pool size should always greater than or equal to core @@ -221,8 +244,87 @@ public String summaryHtml() { } } - private void executeMonitorHeld(Runnable work, long workBytes) { + class BoundedQueueExecutorWorkHandleImpl + implements BoundedQueueExecutorWorkHandle, AutoCloseable { + + private int elements; + private long bytes; + private boolean closed = false; + + private BoundedQueueExecutorWorkHandleImpl(int elements, long bytes) { + this.elements = elements; + this.bytes = bytes; + } + + public synchronized void addBudget(int elements, long bytes) { + Preconditions.checkState(!closed, "Cannot add budget to a closed WorkBudgetHandle"); + this.elements += elements; + this.bytes += bytes; + } + + public synchronized void cancel() { + this.closed = true; + } + + @Override + public synchronized void close() { + if (closed) return; + closed = true; + decrementCounters(this.elements, this.bytes); + } + } + + private static class QueuedWork implements Runnable { + + private final ExecutableWork work; + private final BoundedQueueExecutorWorkHandleImpl handle; + private final long workBytes; + + public QueuedWork( + ExecutableWork work, BoundedQueueExecutorWorkHandleImpl handle, long workBytes) { + this.work = work; + this.handle = handle; + this.workBytes = workBytes; + } + + public void cancelHandle() { + handle.cancel(); + } + + public ExecutableWork getWork() { + return work; + } + + public long getWorkBytes() { + return workBytes; + } + + @Override + public void run() { + Preconditions.checkArgument(!handle.closed); + try { + work.run(handle); + } finally { + handle.close(); + } + } + } + + private void executeMonitorHeld(ExecutableWork work, long workBytes) { + ++elementsOutstanding; bytesOutstanding += workBytes; + monitor.leave(); + BoundedQueueExecutorWorkHandleImpl handle = + new BoundedQueueExecutorWorkHandleImpl(1, 
workBytes); + try { + executor.execute(new QueuedWork(work, handle, workBytes)); + } catch (Throwable e) { + handle.close(); + throw ExceptionUtils.propagate(e); + } + } + + private void executeMonitorHeld(Runnable work) { ++elementsOutstanding; monitor.leave(); @@ -232,21 +334,48 @@ private void executeMonitorHeld(Runnable work, long workBytes) { try { work.run(); } finally { - decrementCounters(workBytes); + decrementCounters(1, 0L); } }); - } catch (RuntimeException e) { - // If the execute() call threw an exception, decrement counters here. - decrementCounters(workBytes); - throw e; + } catch (Throwable e) { + decrementCounters(1, 0L); + throw ExceptionUtils.propagate(e); + } + } + + @VisibleForTesting + BoundedQueueExecutorWorkHandleImpl createEmptyBudgetHandle() { + return new BoundedQueueExecutorWorkHandleImpl(0, 0L); + } + + /** + * Poll additional work to be executed inline inside with the current execute(ExecutableWork work, + * long workBytes) call. It is the responsibility of the caller to execute or discard the returned + * ExecutableWork. Budget for the returned work is released when the execute() call finishes. 
+ * + * @param handle the handle that was passed to ExecutableWork.executeWorkFn + */ + public Optional pollWork(BoundedQueueExecutorWorkHandle handle) { + Preconditions.checkArgument(handle instanceof BoundedQueueExecutorWorkHandleImpl); + BoundedQueueExecutorWorkHandleImpl internalHandle = (BoundedQueueExecutorWorkHandleImpl) handle; + while (true) { + Runnable runnable = executor.getQueue().poll(); + if (runnable == null) { + return Optional.empty(); + } + if (runnable instanceof QueuedWork) { + QueuedWork queuedWork = (QueuedWork) runnable; + queuedWork.cancelHandle(); + internalHandle.addBudget(1, queuedWork.getWorkBytes()); + return Optional.of(queuedWork.getWork()); + } + // Pop and execute standard callbacks immediately on the calling thread to drain the queue + runnable.run(); } } - private void decrementCounters(long workBytes) { - // All threads queue decrements and one thread grabs the monitor and updates - // counters. We do this to reduce contention on monitor which is locked by - // GetWork thread - decrementQueue.add(workBytes); + private void decrementCounters(int elements, long bytes) { + decrementQueue.add(new Budget(elements, bytes)); boolean submittedToExistingBatch = isDecrementBatchPending.getAndSet(true); if (submittedToExistingBatch) { // There is already a thread about to drain the decrement queue @@ -265,12 +394,12 @@ private void decrementCounters(long workBytes) { long bytesToDecrement = 0; int elementsToDecrement = 0; while (true) { - Long pollResult = decrementQueue.poll(); + Budget pollResult = decrementQueue.poll(); if (pollResult == null) { break; } - bytesToDecrement += pollResult; - ++elementsToDecrement; + bytesToDecrement += pollResult.bytes; + elementsToDecrement += pollResult.elements; } if (elementsToDecrement == 0) { return; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ExceptionUtils.java 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ExceptionUtils.java new file mode 100644 index 000000000000..4bbdbb3216ca --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ExceptionUtils.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.util; + +import org.apache.beam.sdk.annotations.Internal; + +/** Utility methods for simplifying work with exceptions and throwables. */ +@Internal +public final class ExceptionUtils { + + private ExceptionUtils() {} + + /** + * Propagates {@code throwable} as-is if it is an instance of {@link RuntimeException} or {@link + * Error}, or else as a last resort wraps it in a {@code RuntimeException} and then propagates. 
+ */ + public static RuntimeException propagate(Throwable throwable) { + if (throwable instanceof RuntimeException) { + throw (RuntimeException) throwable; + } else if (throwable instanceof Error) { + throw (Error) throwable; + } else { + throw new RuntimeException(throwable); + } + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java index 5a66545ab335..22573bf1ced2 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java @@ -156,7 +156,7 @@ public void finalizeCommits(Iterable finalizeIds) { } for (Runnable callback : callbacksToExecute) { try { - finalizationExecutor.forceExecute(callback, 0); + finalizationExecutor.forceExecute(callback); } catch (OutOfMemoryError oom) { throw oom; } catch (Throwable t) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java index 1428037d9ca0..0c3289d4cf58 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java @@ -35,6 +35,7 @@ import org.apache.beam.runners.dataflow.worker.ReaderCache; import 
org.apache.beam.runners.dataflow.worker.WorkItemCancelledException; import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingMDC; +import org.apache.beam.runners.dataflow.worker.streaming.BoundedQueueExecutorWorkHandle; import org.apache.beam.runners.dataflow.worker.streaming.ComputationState; import org.apache.beam.runners.dataflow.worker.streaming.ComputationWorkExecutor; import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork; @@ -47,6 +48,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcherFactory; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; +import org.apache.beam.runners.dataflow.worker.util.ExceptionUtils; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution; import org.apache.beam.runners.dataflow.worker.windmill.client.commits.Commit; @@ -218,7 +220,7 @@ public void scheduleWork( ExecutableWork.create( Work.create( workItem, serializedWorkItemSize, watermarks, processingContext, drainMode, clock), - work -> processWork(computationState, work, getWorkStreamLatencies))); + (work, handle) -> processWork(computationState, work, getWorkStreamLatencies, handle))); } /** Adds any applied finalize ids to the commit finalizer to have their callbacks executed. */ @@ -232,16 +234,19 @@ public void queueAppliedFinalizeIds(ImmutableList appliedFinalizeIds) { * internally if processing fails due to uncaught {@link Exception}(s). * * @implNote This will block the calling thread during execution of user DoFns. 
+ * @param handle handle to pass to BoundedQueueExecutor.pollWork, currently unused */ private void processWork( ComputationState computationState, Work work, - ImmutableList getWorkStreamLatencies) { + ImmutableList getWorkStreamLatencies, + BoundedQueueExecutorWorkHandle handle) { work.recordGetWorkStreamLatencies(getWorkStreamLatencies); - processWork(computationState, work); + processWork(computationState, work, handle); } - private void processWork(ComputationState computationState, Work work) { + private void processWork( + ComputationState computationState, Work work, BoundedQueueExecutorWorkHandle unusedHandle) { Windmill.WorkItem workItem = work.getWorkItem(); String computationId = computationState.getComputationId(); ByteString key = workItem.getKey(); @@ -289,7 +294,7 @@ private void processWork(ComputationState computationState, Work work) { try { workFailureProcessor.logAndProcessFailure( computationId, - ExecutableWork.create(work, retry -> processWork(computationState, retry)), + ExecutableWork.create(work, (retry, h) -> processWork(computationState, retry, h)), t, invalidWork -> computationState.completeWorkAndScheduleNextWorkForKey( @@ -297,7 +302,8 @@ private void processWork(ComputationState computationState, Work work) { } catch (OutOfMemoryError oom) { throw oom; } catch (Throwable t2) { - throw new RuntimeException(t2); + LOG.warn("Failed to process work failure safely for work {}", work.id(), t2); + throw ExceptionUtils.propagate(t2); } } finally { // Update total processing time counters. 
Updating in finally clause ensures that diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index d8a1d1b90d47..4d39e5d83f66 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -373,7 +373,9 @@ private static ExecutableWork createMockWork( computationId, new FakeGetDataClient(), ignored -> {}, mock(HeartbeatSender.class)), false, Instant::now), - processWorkFn); + (work, handle) -> { + processWorkFn.accept(work); + }); } private byte[] intervalWindowBytes(IntervalWindow window) throws Exception { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java index 865ae2612803..0f14efdd0c0b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java @@ -73,7 +73,7 @@ private static ExecutableWork createWork(Windmill.WorkItem workItem) { createWorkProcessingContext(), false, Instant::now), - ignored -> {}); + (work, handle) -> {}); } private static ExecutableWork expiredWork(Windmill.WorkItem workItem) { @@ -85,7 +85,7 @@ private static ExecutableWork expiredWork(Windmill.WorkItem workItem) { createWorkProcessingContext(), false, () -> Instant.EPOCH), - ignored -> {}); + (work, handle) -> {}); } 
private static Work.ProcessingContext createWorkProcessingContext() { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java index 1c8b8fca131d..30ad97140e1e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java @@ -77,7 +77,7 @@ private static ExecutableWork createWork(ShardedKey shardedKey, long workToken, mock(HeartbeatSender.class)), false, Instant::now), - ignored -> {}); + (work, handle) -> {}); } @Before diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java index d7ea039bb809..577d72da9e37 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java @@ -26,12 +26,14 @@ import java.util.Arrays; import java.util.Collection; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; +import 
org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.BoundedQueueExecutorWorkHandleImpl; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem; import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.FakeGetDataClient; import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender; @@ -85,18 +87,21 @@ private static ExecutableWork createWork(Consumer executeWorkFn) { mock(HeartbeatSender.class)), false, Instant::now), - executeWorkFn); + (work, handle) -> { + executeWorkFn.accept(work); + }); } - private Runnable createSleepProcessWorkFn(CountDownLatch start, CountDownLatch stop) { - return () -> { - start.countDown(); - try { - stop.await(); - } catch (Exception e) { - throw new RuntimeException(e); - } - }; + private ExecutableWork createSleepProcessWork(CountDownLatch start, CountDownLatch stop) { + return createWork( + ignored -> { + start.countDown(); + try { + stop.await(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); } @Before @@ -123,9 +128,9 @@ public void testScheduleWorkWhenExceedMaximumPoolSize() throws Exception { CountDownLatch processStop2 = new CountDownLatch(1); CountDownLatch processStart3 = new CountDownLatch(1); CountDownLatch processStop3 = new CountDownLatch(1); - Runnable m1 = createSleepProcessWorkFn(processStart1, processStop1); - Runnable m2 = createSleepProcessWorkFn(processStart2, processStop2); - Runnable m3 = createSleepProcessWorkFn(processStart3, processStop3); + ExecutableWork m1 = createSleepProcessWork(processStart1, processStop1); + ExecutableWork m2 = createSleepProcessWork(processStart2, processStop2); + ExecutableWork m3 = createSleepProcessWork(processStart3, processStop3); executor.execute(m1, 1); processStart1.await(); @@ -152,8 +157,8 @@ public void testScheduleWorkWhenExceedMaximumBytesOutstanding() throws Exception CountDownLatch processStop1 = new CountDownLatch(1); CountDownLatch processStart2 = new CountDownLatch(1); 
CountDownLatch processStop2 = new CountDownLatch(1); - Runnable m1 = createSleepProcessWorkFn(processStart1, processStop1); - Runnable m2 = createSleepProcessWorkFn(processStart2, processStop2); + ExecutableWork m1 = createSleepProcessWork(processStart1, processStop1); + ExecutableWork m2 = createSleepProcessWork(processStart2, processStop2); executor.execute(m1, 10000000); processStart1.await(); @@ -187,9 +192,9 @@ public void testOverrideMaximumPoolSize() throws Exception { CountDownLatch processStart2 = new CountDownLatch(1); CountDownLatch processStart3 = new CountDownLatch(1); CountDownLatch stop = new CountDownLatch(1); - Runnable m1 = createSleepProcessWorkFn(processStart1, stop); - Runnable m2 = createSleepProcessWorkFn(processStart2, stop); - Runnable m3 = createSleepProcessWorkFn(processStart3, stop); + ExecutableWork m1 = createSleepProcessWork(processStart1, stop); + ExecutableWork m2 = createSleepProcessWork(processStart2, stop); + ExecutableWork m3 = createSleepProcessWork(processStart3, stop); // Initial state. assertEquals(0, executor.activeCount()); @@ -225,9 +230,9 @@ public void testRecordTotalTimeMaxActiveThreadsUsed() throws Exception { CountDownLatch processStart2 = new CountDownLatch(1); CountDownLatch processStart3 = new CountDownLatch(1); CountDownLatch stop = new CountDownLatch(1); - Runnable m1 = createSleepProcessWorkFn(processStart1, stop); - Runnable m2 = createSleepProcessWorkFn(processStart2, stop); - Runnable m3 = createSleepProcessWorkFn(processStart3, stop); + ExecutableWork m1 = createSleepProcessWork(processStart1, stop); + ExecutableWork m2 = createSleepProcessWork(processStart2, stop); + ExecutableWork m3 = createSleepProcessWork(processStart3, stop); // Initial state. 
assertEquals(0, executor.activeCount()); @@ -264,9 +269,9 @@ public void testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumPoolSizeIsIncrease CountDownLatch processStart2 = new CountDownLatch(1); CountDownLatch processStart3 = new CountDownLatch(1); CountDownLatch stop = new CountDownLatch(1); - Runnable m1 = createSleepProcessWorkFn(processStart1, stop); - Runnable m2 = createSleepProcessWorkFn(processStart2, stop); - Runnable m3 = createSleepProcessWorkFn(processStart3, stop); + ExecutableWork m1 = createSleepProcessWork(processStart1, stop); + ExecutableWork m2 = createSleepProcessWork(processStart2, stop); + ExecutableWork m3 = createSleepProcessWork(processStart3, stop); // Initial state. assertEquals(0, executor.activeCount()); @@ -308,9 +313,9 @@ public void testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumPoolSizeIsReduced( CountDownLatch processStop2 = new CountDownLatch(1); CountDownLatch processStart3 = new CountDownLatch(1); CountDownLatch processStop3 = new CountDownLatch(1); - Runnable m1 = createSleepProcessWorkFn(processStart1, processStop1); - Runnable m2 = createSleepProcessWorkFn(processStart2, processStop2); - Runnable m3 = createSleepProcessWorkFn(processStart3, processStop3); + ExecutableWork m1 = createSleepProcessWork(processStart1, processStop1); + ExecutableWork m2 = createSleepProcessWork(processStart2, processStop2); + ExecutableWork m3 = createSleepProcessWork(processStart3, processStop3); // Initial state. 
assertEquals(0, executor.activeCount()); @@ -351,6 +356,176 @@ public void testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumPoolSizeIsReduced( executor.shutdown(); } + @Test + public void testRunnableExceptionPropagationDecrementsCounters() throws Exception { + CountDownLatch processStart = new CountDownLatch(1); + CountDownLatch processStop = new CountDownLatch(1); + + Runnable work = + () -> { + processStart.countDown(); + try { + processStop.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + throw new RuntimeException("Simulated finalizer processing exception"); + }; + + executor.forceExecute(work); + processStart.await(); + + assertEquals(1, executor.elementsOutstanding()); + + processStop.countDown(); + + // Wait until outstanding elements are released + while (executor.elementsOutstanding() != 0) { + Thread.sleep(10); + } + + assertEquals(0, executor.elementsOutstanding()); + } + + @Test + public void testPollWorkAndInlineBatchExecution() throws Exception { + BoundedQueueExecutor testExecutor = + new BoundedQueueExecutor( + 1, + DEFAULT_THREAD_EXPIRATION_SEC, + TimeUnit.SECONDS, + 10, + MAXIMUM_BYTES_OUTSTANDING, + new ThreadFactoryBuilder() + .setNameFormat("testPollWorkAndInlineBatchExecution-%d") + .setDaemon(true) + .build(), + useFairMonitor); + + CountDownLatch blockerStart = new CountDownLatch(1); + CountDownLatch blockerStop = new CountDownLatch(1); + ExecutableWork blockerWork = createSleepProcessWork(blockerStart, blockerStop); + + CountDownLatch start1 = new CountDownLatch(1); + CountDownLatch stop1 = new CountDownLatch(1); + ExecutableWork m1 = createSleepProcessWork(start1, stop1); + + CountDownLatch start2 = new CountDownLatch(1); + CountDownLatch stop2 = new CountDownLatch(1); + ExecutableWork m2 = createSleepProcessWork(start2, stop2); + + // 1. Occupy the single worker thread with blocker work so subsequent tasks remain queued. 
+ testExecutor.execute(blockerWork, 0); + blockerStart.await(); + assertEquals(1, testExecutor.elementsOutstanding()); + assertEquals(0, testExecutor.bytesOutstanding()); + + // 2. Enqueue tasks to stay in the queue. + testExecutor.execute(m1, 1000); + testExecutor.execute(m2, 2000); + + assertEquals(3, testExecutor.elementsOutstanding()); + assertEquals(3000, testExecutor.bytesOutstanding()); + + // 3. Create the batch handle. + try (BoundedQueueExecutorWorkHandleImpl batchHandle = testExecutor.createEmptyBudgetHandle()) { + // 4. Poll tasks inline. + Optional polled1 = testExecutor.pollWork(batchHandle); + assertTrue(polled1.isPresent()); + assertEquals(m1, polled1.get()); + + Optional polled2 = testExecutor.pollWork(batchHandle); + assertTrue(polled2.isPresent()); + assertEquals(m2, polled2.get()); + + // Queue should now be empty. + Optional polled3 = testExecutor.pollWork(batchHandle); + assertFalse(polled3.isPresent()); + + // 5. Run polled tasks inline. + start1.countDown(); + stop1.countDown(); + polled1.get().run(batchHandle); + + start2.countDown(); + stop2.countDown(); + polled2.get().run(batchHandle); + + // Outstanding counts should NOT yet be decremented. + assertEquals(3, testExecutor.elementsOutstanding()); + assertEquals(3000, testExecutor.bytesOutstanding()); + } + + // 6. Upon close, outstanding counts should immediately reflect the batch decrement in one shot. + // Only the blocker task (0 bytes, 1 element) should remain outstanding. + while (testExecutor.elementsOutstanding() != 1) { + Thread.sleep(10); + } + assertEquals(1, testExecutor.elementsOutstanding()); + assertEquals(0, testExecutor.bytesOutstanding()); + + // Clean up blocker. 
+ blockerStop.countDown(); + testExecutor.shutdown(); + } + + @Test + public void testPollWorkAndInlineBatchExecutionWithException() throws Exception { + BoundedQueueExecutor testExecutor = + new BoundedQueueExecutor( + 1, + DEFAULT_THREAD_EXPIRATION_SEC, + TimeUnit.SECONDS, + 10, + MAXIMUM_BYTES_OUTSTANDING, + new ThreadFactoryBuilder() + .setNameFormat("testPollWorkAndInlineBatchExecutionWithException-%d") + .setDaemon(true) + .build(), + useFairMonitor); + + CountDownLatch blockerStart = new CountDownLatch(1); + CountDownLatch blockerStop = new CountDownLatch(1); + ExecutableWork blockerWork = createSleepProcessWork(blockerStart, blockerStop); + + // Occupy all worker threads + testExecutor.execute(blockerWork, 0); + blockerStart.await(); + + ExecutableWork inlineWork1 = + createWork( + ignored -> { + throw new RuntimeException("Simulated inline execution exception"); + }); + + long size1 = inlineWork1.work().getSerializedWorkItemSize(); + testExecutor.execute(inlineWork1, size1); + + long outstandingBytesBefore = testExecutor.bytesOutstanding(); + int outstandingElementsBefore = testExecutor.elementsOutstanding(); + + try { + try (BoundedQueueExecutorWorkHandleImpl batchHandle = + testExecutor.createEmptyBudgetHandle()) { + Optional polled1 = testExecutor.pollWork(batchHandle); + assertTrue(polled1.isPresent()); + polled1.get().run(batchHandle); + } + } catch (RuntimeException e) { + assertEquals("Simulated inline execution exception", e.getMessage()); + } + + // Outstanding elements must still be released cleanly by try-with-resources close! 
+ while (testExecutor.elementsOutstanding() != outstandingElementsBefore - 1) { + Thread.sleep(10); + } + assertEquals(outstandingElementsBefore - 1, testExecutor.elementsOutstanding()); + assertEquals(outstandingBytesBefore - size1, testExecutor.bytesOutstanding()); + + blockerStop.countDown(); + testExecutor.shutdown(); + } + @Test public void testRenderSummaryHtml() { String expectedSummaryHtml = diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java index 68a11895fa12..51bd4816b031 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java @@ -98,7 +98,9 @@ private static ExecutableWork createWork(Supplier clock, Consumer mock(HeartbeatSender.class)), false, clock), - processWorkFn); + (work, handle) -> { + processWorkFn.accept(work); + }); } private static ExecutableWork createWork(Consumer processWorkFn) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java index 054db878c869..88a82c6f76b6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java +++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java @@ -137,7 +137,9 @@ private ExecutableWork createOldWork( "computationId", new FakeGetDataClient(), ignored -> {}, heartbeatSender), false, ActiveWorkRefresherTest::aLongTimeAgo), - processWork); + (work, handle) -> { + processWork.accept(work); + }); } @Test