Hide WorkerThread implementation details #6362
| Filename | Overview |
|---|---|
| dali/pipeline/util/worker_thread.h | Reduced to a pure abstract interface + factory declaration; all private state, mutex, queue, thread, and NVML members removed; copyright year updated to 2026. |
| dali/pipeline/util/worker_thread_internal.h | New internal header declaring WorkerThreadImpl (final, DLL_PUBLIC) and detail::Barrier; NVML header guarded inside NVML_ENABLED; all members correctly use trailing-underscore convention. |
| dali/pipeline/util/worker_thread.cc | New implementation file; CheckForErrors() now correctly throws queued worker failures instead of silently discarding them; NVML handle initialized only when set_affinity=true. |
| dali/pipeline/util/worker_thread_test.cc | Tests migrated to factory pattern and CPU_ONLY_DEVICE_ID sentinel; new CheckForErrorsErrorHandling test validates that queued errors are now surfaced via throw. |
| dali/pipeline/executor/async_pipelined_executor.h | Switches to WorkerThreadImpl direct storage; CheckForErrors() now properly sets exec_error_, calls SignalStop(), and notifies mixed/gpu condition variables before rethrowing. |
| dali/pipeline/executor/async_separated_pipelined_executor.h | Parallel change to async_pipelined_executor.h; CheckForErrors() uses exec_error_+SignalStop() only (no mixed/gpu CVs — correctly absent from this class); WorkerThreadImpl stored directly. |
| dali/pipeline/operator/builtin/input_operator.h | sync_worker_ moved from WorkerThread value-member to unique_ptr initialized via CreateWorkerThread factory; all access sites updated to arrow syntax. |
| dali/operators/video/legacy/reader/video_loader.h | thread_file_reader_ migrated from WorkerThread value to unique_ptr; destructor continues to call ForceStop()+Shutdown() before unique_ptr teardown, preserving correct cleanup order. |
Sequence Diagram
sequenceDiagram
participant Caller as RunCPU / RunMixed / RunGPU
participant Exec as AsyncPipelinedExecutor
participant WTImpl as WorkerThreadImpl
participant Thread as Worker Thread
Caller->>Exec: CheckForErrors()
Exec->>WTImpl: cpu_thread_.CheckForErrors()
alt errors_ non-empty
WTImpl-->>WTImpl: "pop error, running_=false, notify cv_"
WTImpl--xExec: throw runtime_error
Exec-->>Exec: "exec_error_=true, SignalStop()"
Exec-->>Thread: mixed_work_cv_.notify_all() / gpu_work_cv_.notify_all()
Exec--xCaller: rethrow
else no errors
WTImpl-->>Exec: (no-op)
Exec->>WTImpl: mixed/gpu_thread_.CheckForErrors()
WTImpl-->>Exec: (no-op)
Exec-->>Caller: return
end
Caller->>WTImpl: DoWork(lambda)
WTImpl-->>Thread: wake via cv_.notify_one()
Thread->>Thread: execute lambda
alt success
Thread-->>WTImpl: "work_complete_=true, completed_.notify_one()"
else exception
Thread-->>WTImpl: "errors_.push(), running_=false, completed_.notify_one()"
end
Reviews (12): Last reviewed commit: "Expose WorkerThreadImpl to async executo..."
75c1662 to 738ce61
@greptile review
!build
CI MESSAGE: [52151479]: BUILD STARTED
CI MESSAGE: [52151479]: BUILD FAILED
738ce61 to b1db590
!build
CI MESSAGE: [52186371]: BUILD STARTED
b1db590 to 85d1da9
CI MESSAGE: [52186371]: BUILD PASSED
70e1930 to 663957b
@greptile review
5fbb2bd to aa1fc18
!build
  }

- WorkerThread cpu_thread_, mixed_thread_, gpu_thread_;
+ std::unique_ptr<WorkerThread> cpu_thread_, mixed_thread_, gpu_thread_;
The particular executor implementations are not part of the public interface and could therefore use the implementation directly.
Now that the internals have been moved to the .cc file, would you like them in an internal header that can be included directly here?
Reworked, please check again.
CI MESSAGE: [52208284]: BUILD STARTED
Make WorkerThread a virtual interface and add a factory that returns the private final implementation. Update InputOperator, async pipelined executors, and the legacy video loader to store WorkerThread through the interface, keeping private worker state out of public and template-visible types.

Keep OldThreadPool and NewThreadPool as direct concrete implementations instead of adding pImpl storage inside the thread pools. ThreadPool is already the virtual interface used by operators, so the concrete pool classes keep direct access to their synchronization, queue, thread, and NVML-dependent storage.

Make WorkerThread::CheckForErrors throw queued worker failures instead of discarding them. Async pipelined executors route those failures through their stop-signaling paths before rethrowing, so dependent stages are woken consistently.

Avoid initializing NVML when worker CPU affinity is not requested. Keep WorkerThread GTests CPU-only so they validate thread/error handling without requiring a loaded CUDA driver.

Signed-off-by: Janusz Lisiecki <jlisiecki@nvidia.com>
aa1fc18 to dfaf9ba
Move the private WorkerThread implementation declaration into an internal header so async executor classes can store WorkerThreadImpl directly. Keep the public WorkerThread factory for external users while avoiding heap indirection in executor worker storage.

Signed-off-by: Janusz Lisiecki <jlisiecki@nvidia.com>
bd37af0 to 72835f4
Hide WorkerThread implementation details
Makes WorkerThread a virtual interface with a factory that returns the private
implementation.
Moves WorkerThreadImpl and its helper state into an internal header so async
pipelined executors can include it and store WorkerThreadImpl directly,
without exposing the implementation through the public WorkerThread API.
Updates public/template-visible users such as InputOperator and the legacy
video loader to store WorkerThread through the interface and factory, keeping
private WorkerThread layout out of external operator plugin headers.
Keeps OldThreadPool and NewThreadPool as direct concrete implementations.
ThreadPool and ThreadPoolBase already provide the virtual interface used by
operators, so the concrete pools do not add internal pImpl storage.
Makes WorkerThread::CheckForErrors surface queued worker failures and routes
async executor worker errors through their stop-signaling paths.
Category:
Refactoring (Redesign of existing code that doesn't affect functionality)
Description:
This PR moves WorkerThread's private state out of the public WorkerThread header
behind a stable interface and factory.
WorkerThread is used by public/template-visible DALI types. Keeping its private
state in public headers makes external operator plugins depend on DALI-private
build flags, including NVML_ENABLED, because those flags can change class
layout.
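
The layout problem described above can be illustrated with a minimal sketch. The two structs below are hypothetical and are not DALI types: they stand in for the same class as seen by a plugin built without a private flag and by a library built with it. The member names and the `LayoutsDisagree` helper are assumptions made for illustration only.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical view of a class as seen by a plugin built WITHOUT the flag.
struct WorkerViewNoFlag {
  bool running;
  int device_id;
};

// Hypothetical view of the same class as built by the library WITH the flag:
// the flag adds a handle member in the middle of the object.
struct WorkerViewWithFlag {
  bool running;
  void* handle;  // present only when the (hypothetical) flag is defined
  int device_id;
};

// True when the two builds disagree on object size or on where device_id lives.
constexpr bool LayoutsDisagree() {
  return sizeof(WorkerViewNoFlag) != sizeof(WorkerViewWithFlag) ||
         offsetof(WorkerViewNoFlag, device_id) !=
             offsetof(WorkerViewWithFlag, device_id);
}

// Runtime check that the mismatch really exists on this platform.
void CheckLayoutMismatch() { assert(LayoutsDisagree()); }
```

If both views were compiled into one program under the same class name, code on either side would read `device_id` from the wrong offset. Keeping such members out of the public header avoids the dependence on the flag entirely.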
CreateWorkerThread() now returns the private WorkerThread implementation, and
the public WorkerThread header exposes only the interface. Public-header users
such as InputOperator and the legacy video loader store std::unique_ptr<WorkerThread>.
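
As a rough sketch of the interface-plus-factory arrangement described here, the example below uses hypothetical names (`Worker`, `WorkerImpl`, `CreateWorker`, `CompletedCount`, `RunTwoTasks`) rather than the actual DALI declarations, and runs work inline instead of on a real thread to stay short.

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Stand-in for the public header: only the abstract interface and the
// factory declaration are visible to users. All names are hypothetical.
class Worker {
 public:
  virtual ~Worker() = default;
  virtual void DoWork(std::function<void()> work) = 0;
  virtual int CompletedCount() const = 0;
};

std::unique_ptr<Worker> CreateWorker();

// Stand-in for the implementation file: private state lives here only.
namespace {
class WorkerImpl final : public Worker {
 public:
  void DoWork(std::function<void()> work) override {
    work();  // executed inline here; a real worker would hand off to a thread
    ++completed_;
  }
  int CompletedCount() const override { return completed_; }

 private:
  int completed_ = 0;  // private state, invisible to header users
};
}  // namespace

std::unique_ptr<Worker> CreateWorker() {
  return std::make_unique<WorkerImpl>();
}

// Usage: callers hold the interface pointer and never name WorkerImpl.
int RunTwoTasks() {
  std::unique_ptr<Worker> worker = CreateWorker();
  int sum = 0;
  worker->DoWork([&sum] { sum += 1; });
  worker->DoWork([&sum] { sum += 2; });
  assert(worker->CompletedCount() == 2);
  return sum;
}
```

Because callers only ever see `Worker`, adding or removing members in `WorkerImpl` does not change anything a user of the header compiles against.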
Async pipelined executors are built inside DALI and need direct concrete storage,
so they include worker_thread_internal.h and store WorkerThreadImpl members
directly.
OldThreadPool and NewThreadPool intentionally remain direct concrete
implementations. Operators already use the ThreadPool/ThreadPoolBase virtual
interfaces, so adding pImpl inside the concrete thread pools would add an extra
pointer chase without improving that boundary.
WorkerThread::CheckForErrors() now throws queued worker failures instead of
discarding them. The async pipelined executors catch those failures, mark the
executor as failed, signal stop, notify dependent stage condition variables
where needed, and then rethrow.
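
The catch, flag, signal, notify, rethrow sequence can be sketched as follows. This is an illustrative approximation, not the DALI executor code: `ErrorQueue`, `Executor`, `stage_cv_`, and `DemoErrorPath` are hypothetical names, and the worker is reduced to a queue of error messages.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical worker reduced to its error queue.
class ErrorQueue {
 public:
  void Push(std::string msg) {
    std::lock_guard<std::mutex> lock(mutex_);
    errors_.push(std::move(msg));
  }

  // Throws the oldest queued error instead of discarding it.
  void CheckForErrors() {
    std::string msg;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      if (errors_.empty()) return;
      msg = std::move(errors_.front());
      errors_.pop();
    }
    throw std::runtime_error(msg);
  }

 private:
  std::mutex mutex_;
  std::queue<std::string> errors_;
};

// Hypothetical executor that routes worker errors through its stop path.
class Executor {
 public:
  ErrorQueue& worker() { return worker_; }
  bool failed() const { return exec_error_; }
  bool stopped() const { return stop_signaled_; }

  void CheckForErrors() {
    try {
      worker_.CheckForErrors();
    } catch (...) {
      exec_error_ = true;      // mark the executor as failed
      SignalStop();            // ask all stages to stop
      stage_cv_.notify_all();  // wake any stage waiting for work
      throw;                   // surface the original error to the caller
    }
  }

 private:
  void SignalStop() { stop_signaled_ = true; }

  ErrorQueue worker_;
  std::condition_variable stage_cv_;
  bool exec_error_ = false;
  bool stop_signaled_ = false;
};

// Usage: returns true when the error is surfaced and the stop path has run.
bool DemoErrorPath() {
  Executor exec;
  exec.CheckForErrors();  // nothing queued: returns normally
  exec.worker().Push("decode failed");
  try {
    exec.CheckForErrors();
  } catch (const std::runtime_error& e) {
    return exec.failed() && exec.stopped() &&
           std::string(e.what()) == "decode failed";
  }
  return false;
}
```

Setting the failure flag and notifying before the rethrow means that a stage blocked on the condition variable gets a chance to observe the stop request even if the caller does not handle the exception immediately.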
WorkerThread also avoids initializing NVML unless CPU affinity setup is
requested, and its GTests use the CPU-only device sentinel so they validate
thread/error handling without requiring a loaded CUDA driver.
Additional information:
Affected modules and functionalities:
Key points relevant for the review:
NVML-dependent members.
std::unique_ptr.
use internal pImpl.
nvml.h, or <condition_variable> directly.
executor stop path before being rethrown.
Tests:
Focused validation:
Issue #6361 repro validation:
observed after refreshing headers and rebuilding backend_impl.
custom fixed NextBatchSize()/no-op Advance() and default prefetching, it can
report input depletion after consuming the queued batch. A queue-aware variant
using InputOperator's base NextBatchSize()/Advance() and prefetch_queue_depth=1
passed two feed_input + run iterations.
Checklist
Documentation
DALI team only
Requirements
REQ IDs: N/A
JIRA TASK: N/A