-
Notifications
You must be signed in to change notification settings - Fork 4.6k
[Dataflow Streaming] Prepare BoundedQueueExecutor for MultiKey bundles #38592
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.beam.runners.dataflow.worker.streaming; | ||
|
|
||
| /** | ||
| * A handle to use when requesting pulling more work from @BoundedQueueExecutor | ||
| * via @BoundedQueueExecutor.pollWork | ||
| */ | ||
| public interface BoundedQueueExecutorWorkHandle {} |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,13 +17,18 @@ | |
| */ | ||
| package org.apache.beam.runners.dataflow.worker.util; | ||
|
|
||
| import java.util.Optional; | ||
| import java.util.concurrent.ConcurrentLinkedQueue; | ||
| import java.util.concurrent.LinkedBlockingQueue; | ||
| import java.util.concurrent.ThreadFactory; | ||
| import java.util.concurrent.ThreadPoolExecutor; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import javax.annotation.concurrent.GuardedBy; | ||
| import org.apache.beam.runners.dataflow.worker.streaming.BoundedQueueExecutorWorkHandle; | ||
| import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork; | ||
| import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; | ||
| import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; | ||
| import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor; | ||
| import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor.Guard; | ||
|
|
||
|
|
@@ -38,7 +43,19 @@ public class BoundedQueueExecutor { | |
|
|
||
| // Used to guard elementsOutstanding and bytesOutstanding. | ||
| private final Monitor monitor; | ||
| private final ConcurrentLinkedQueue<Long> decrementQueue = new ConcurrentLinkedQueue<>(); | ||
|
|
||
| private static class Budget { | ||
|
|
||
| final int elements; | ||
| final long bytes; | ||
|
|
||
| Budget(int elements, long bytes) { | ||
| this.elements = elements; | ||
| this.bytes = bytes; | ||
| } | ||
| } | ||
|
|
||
| private final ConcurrentLinkedQueue<Budget> decrementQueue = new ConcurrentLinkedQueue<>(); | ||
| private final Object decrementQueueDrainLock = new Object(); | ||
| private final AtomicBoolean isDecrementBatchPending = new AtomicBoolean(false); | ||
| private int elementsOutstanding = 0; | ||
|
|
@@ -106,7 +123,7 @@ protected void afterExecute(Runnable r, Throwable t) { | |
|
|
||
| // Before adding a Work to the queue, check that there are enough bytes of space or no other | ||
| // outstanding elements of work. | ||
| public void execute(Runnable work, long workBytes) { | ||
| public void execute(ExecutableWork work, long workBytes) { | ||
| monitor.enterWhenUninterruptibly( | ||
| new Guard(monitor) { | ||
| @Override | ||
|
|
@@ -119,12 +136,18 @@ public boolean isSatisfied() { | |
| executeMonitorHeld(work, workBytes); | ||
| } | ||
|
|
||
| // Forcibly add something to the queue, ignoring the length limit. | ||
| public void forceExecute(Runnable work, long workBytes) { | ||
| // Forcibly add ExecutableWork to the queue, ignoring the limits. | ||
| public void forceExecute(ExecutableWork work, long workBytes) { | ||
| monitor.enter(); | ||
| executeMonitorHeld(work, workBytes); | ||
| } | ||
|
|
||
| /** Forcibly execute a Runnable callback with 0 bytes of size. */ | ||
| public void forceExecute(Runnable runnable) { | ||
| monitor.enter(); | ||
| executeMonitorHeld(runnable); | ||
| } | ||
|
|
||
| // Set the maximum/core pool size of the executor. | ||
| public synchronized void setMaximumPoolSize(int maximumPoolSize, int maximumElementsOutstanding) { | ||
| // For ThreadPoolExecutor, the maximum pool size should always greater than or equal to core | ||
|
|
@@ -221,8 +244,87 @@ public String summaryHtml() { | |
| } | ||
| } | ||
|
|
||
| private void executeMonitorHeld(Runnable work, long workBytes) { | ||
| class BoundedQueueExecutorWorkHandleImpl | ||
| implements BoundedQueueExecutorWorkHandle, AutoCloseable { | ||
|
|
||
| private int elements; | ||
| private long bytes; | ||
| private boolean closed = false; | ||
|
|
||
| private BoundedQueueExecutorWorkHandleImpl(int elements, long bytes) { | ||
| this.elements = elements; | ||
| this.bytes = bytes; | ||
| } | ||
|
|
||
| public synchronized void addBudget(int elements, long bytes) { | ||
| Preconditions.checkState(!closed, "Cannot add budget to a closed WorkBudgetHandle"); | ||
| this.elements += elements; | ||
| this.bytes += bytes; | ||
| } | ||
|
|
||
| public synchronized void cancel() { | ||
| this.closed = true; | ||
| } | ||
|
|
||
| @Override | ||
| public synchronized void close() { | ||
| if (closed) return; | ||
| closed = true; | ||
| decrementCounters(this.elements, this.bytes); | ||
| } | ||
|
arunpandianp marked this conversation as resolved.
|
||
| } | ||
|
|
||
| private static class QueuedWork implements Runnable { | ||
|
|
||
| private final ExecutableWork work; | ||
| private final BoundedQueueExecutorWorkHandleImpl handle; | ||
| private final long workBytes; | ||
|
|
||
| public QueuedWork( | ||
| ExecutableWork work, BoundedQueueExecutorWorkHandleImpl handle, long workBytes) { | ||
| this.work = work; | ||
| this.handle = handle; | ||
| this.workBytes = workBytes; | ||
| } | ||
|
|
||
| public void cancelHandle() { | ||
| handle.cancel(); | ||
| } | ||
|
|
||
| public ExecutableWork getWork() { | ||
| return work; | ||
| } | ||
|
|
||
| public long getWorkBytes() { | ||
| return workBytes; | ||
| } | ||
|
|
||
| @Override | ||
| public void run() { | ||
| Preconditions.checkArgument(!handle.closed); | ||
| try { | ||
| work.run(handle); | ||
| } finally { | ||
| handle.close(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private void executeMonitorHeld(ExecutableWork work, long workBytes) { | ||
| ++elementsOutstanding; | ||
| bytesOutstanding += workBytes; | ||
| monitor.leave(); | ||
| BoundedQueueExecutorWorkHandleImpl handle = | ||
| new BoundedQueueExecutorWorkHandleImpl(1, workBytes); | ||
| try { | ||
| executor.execute(new QueuedWork(work, handle, workBytes)); | ||
| } catch (Throwable e) { | ||
| handle.close(); | ||
| throw ExceptionUtils.propagate(e); | ||
| } | ||
| } | ||
|
|
||
| private void executeMonitorHeld(Runnable work) { | ||
| ++elementsOutstanding; | ||
| monitor.leave(); | ||
|
|
||
|
|
@@ -232,21 +334,48 @@ private void executeMonitorHeld(Runnable work, long workBytes) { | |
| try { | ||
| work.run(); | ||
| } finally { | ||
| decrementCounters(workBytes); | ||
| decrementCounters(1, 0L); | ||
| } | ||
| }); | ||
| } catch (RuntimeException e) { | ||
| // If the execute() call threw an exception, decrement counters here. | ||
| decrementCounters(workBytes); | ||
| throw e; | ||
| } catch (Throwable e) { | ||
| decrementCounters(1, 0L); | ||
| throw ExceptionUtils.propagate(e); | ||
| } | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| BoundedQueueExecutorWorkHandleImpl createEmptyBudgetHandle() { | ||
| return new BoundedQueueExecutorWorkHandleImpl(0, 0L); | ||
| } | ||
|
|
||
| /** | ||
| * Poll additional work to be executed inline inside with the current execute(ExecutableWork work, | ||
| * long workBytes) call. It is the responsibility of the caller to execute or discard the returned | ||
| * ExecutableWork. Budget for the returned work is released when the execute() call finishes. | ||
| * | ||
| * @param handle the handle that was passed to ExecutableWork.executeWorkFn | ||
| */ | ||
| public Optional<ExecutableWork> pollWork(BoundedQueueExecutorWorkHandle handle) { | ||
| Preconditions.checkArgument(handle instanceof BoundedQueueExecutorWorkHandleImpl); | ||
| BoundedQueueExecutorWorkHandleImpl internalHandle = (BoundedQueueExecutorWorkHandleImpl) handle; | ||
|
arunpandianp marked this conversation as resolved.
arunpandianp marked this conversation as resolved.
|
||
| while (true) { | ||
| Runnable runnable = executor.getQueue().poll(); | ||
| if (runnable == null) { | ||
| return Optional.empty(); | ||
| } | ||
| if (runnable instanceof QueuedWork) { | ||
| QueuedWork queuedWork = (QueuedWork) runnable; | ||
| queuedWork.cancelHandle(); | ||
| internalHandle.addBudget(1, queuedWork.getWorkBytes()); | ||
| return Optional.of(queuedWork.getWork()); | ||
| } | ||
| // Pop and execute standard callbacks immediately on the calling thread to drain the queue | ||
| runnable.run(); | ||
| } | ||
|
Comment on lines
+361
to
374
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The getter is unused now. Planning to improve the logic before using it. This change is just setting up the Handles and getters. |
||
| } | ||
|
|
||
| private void decrementCounters(long workBytes) { | ||
| // All threads queue decrements and one thread grabs the monitor and updates | ||
| // counters. We do this to reduce contention on monitor which is locked by | ||
| // GetWork thread | ||
| decrementQueue.add(workBytes); | ||
| private void decrementCounters(int elements, long bytes) { | ||
| decrementQueue.add(new Budget(elements, bytes)); | ||
| boolean submittedToExistingBatch = isDecrementBatchPending.getAndSet(true); | ||
| if (submittedToExistingBatch) { | ||
| // There is already a thread about to drain the decrement queue | ||
|
|
@@ -265,12 +394,12 @@ private void decrementCounters(long workBytes) { | |
| long bytesToDecrement = 0; | ||
| int elementsToDecrement = 0; | ||
| while (true) { | ||
| Long pollResult = decrementQueue.poll(); | ||
| Budget pollResult = decrementQueue.poll(); | ||
| if (pollResult == null) { | ||
| break; | ||
| } | ||
| bytesToDecrement += pollResult; | ||
| ++elementsToDecrement; | ||
| bytesToDecrement += pollResult.bytes; | ||
| elementsToDecrement += pollResult.elements; | ||
| } | ||
| if (elementsToDecrement == 0) { | ||
| return; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,41 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.beam.runners.dataflow.worker.util; | ||
|
|
||
| import org.apache.beam.sdk.annotations.Internal; | ||
|
|
||
| /** Utility methods for simplifying work with exceptions and throwables. */ | ||
| @Internal | ||
| public final class ExceptionUtils { | ||
|
|
||
| private ExceptionUtils() {} | ||
|
|
||
| /** | ||
| * Propagates {@code throwable} as-is if it is an instance of {@link RuntimeException} or {@link | ||
| * Error}, or else as a last resort wraps it in a {@code RuntimeException} and then propagates. | ||
| */ | ||
| public static RuntimeException propagate(Throwable throwable) { | ||
| if (throwable instanceof RuntimeException) { | ||
| throw (RuntimeException) throwable; | ||
| } else if (throwable instanceof Error) { | ||
| throw (Error) throwable; | ||
| } else { | ||
| throw new RuntimeException(throwable); | ||
| } | ||
| } | ||
| } |
Uh oh!
There was an error while loading. Please reload this page.