Skip to content

Hide WorkerThread implementation details#6362

Open
JanuszL wants to merge 2 commits into
NVIDIA:mainfrom
JanuszL:fix-thread-pool-private-impl
Open

Hide WorkerThread implementation details#6362
JanuszL wants to merge 2 commits into
NVIDIA:mainfrom
JanuszL:fix-thread-pool-private-impl

Conversation

@JanuszL
Copy link
Copy Markdown
Contributor

@JanuszL JanuszL commented May 21, 2026

Hide WorkerThread implementation details

  • Makes WorkerThread a virtual interface with a factory that returns the private
    implementation.

  • Moves WorkerThreadImpl and its helper state into an internal header so async
    pipelined executors can include it and store WorkerThreadImpl directly,
    without exposing the implementation through the public WorkerThread API.

  • Updates public/template-visible users such as InputOperator and the legacy
    video loader to store WorkerThread through the interface and factory, keeping
    private WorkerThread layout out of external operator plugin headers.

  • Keeps OldThreadPool and NewThreadPool as direct concrete implementations.
    ThreadPool and ThreadPoolBase already provide the virtual interface used by
    operators, so the concrete pools do not add internal pImpl storage.

  • Makes WorkerThread::CheckForErrors surface queued worker failures and routes
    async executor worker errors through their stop-signaling paths.

Category:

Refactoring (Redesign of existing code that doesn't affect functionality)

Description:

This PR moves WorkerThread's private state out of the public WorkerThread header
behind a stable interface and factory.

WorkerThread is used by public/template-visible DALI types. Keeping its private
state in public headers makes external operator plugins depend on DALI-private
build flags, including NVML_ENABLED, because those flags can change class
layout.

CreateWorkerThread() now returns the private WorkerThread implementation, and
the public WorkerThread header exposes only the interface. Public-header users
such as InputOperator and the legacy video loader store std::unique_ptr.
Async pipelined executors are built inside DALI and need direct concrete storage,
so they include worker_thread_internal.h and store WorkerThreadImpl members
directly.

OldThreadPool and NewThreadPool intentionally remain direct concrete
implementations. Operators already use the ThreadPool/ThreadPoolBase virtual
interfaces, so adding pImpl inside the concrete thread pools would add an extra
pointer chase without improving that boundary.

WorkerThread::CheckForErrors() now throws queued worker failures instead of
discarding them. The async pipelined executors catch those failures, mark the
executor as failed, signal stop, notify dependent stage condition variables
where needed, and then rethrow.

WorkerThread also avoids initializing NVML unless CPU affinity setup is
requested, and its GTests use the CPU-only device sentinel so they validate
thread/error handling without requiring a loaded CUDA driver.

Additional information:

Affected modules and functionalities:

  • dali/pipeline/util/worker_thread.*
  • dali/pipeline/util/worker_thread_internal.h
  • dali/pipeline/util/thread_pool.*
  • dali/pipeline/util/new_thread_pool.*
  • Async pipelined executor worker storage and worker-error propagation
  • InputOperator and legacy video-loader worker storage
  • Direct include cleanup in input operator, C API, and video reader code

Key points relevant for the review:

  • WorkerThread public headers no longer include NVML headers or expose
    NVML-dependent members.
  • WorkerThreadImpl is final and declared in an internal DALI header.
  • Public-header users depend on the WorkerThread interface and factory.
  • Async pipelined executors store WorkerThreadImpl directly rather than through
    std::unique_ptr.
  • OldThreadPool and NewThreadPool keep direct implementation storage and do not
    use internal pImpl.
  • Call sites that previously relied on transitive includes now include nvtx.h,
    nvml.h, or <condition_variable> directly.
  • Worker-thread failures surfaced by CheckForErrors() now trigger the async
    executor stop path before being rethrown.

Tests:

  • Existing tests apply
  • New tests added
    • Python tests
    • GTests
    • Benchmark
    • Other
  • N/A

Focused validation:

cmake --build compile/dali --target dali_test -j$(nproc)
compile/dali/dali/python/nvidia/dali/test/dali_test.bin --gtest_filter=ThreadPool.*:WorkerThread.*

cmake --build compile/dali --target pipeline/executor/async_pipelined_executor.o
cmake --build compile/dali --target pipeline/executor/async_separated_pipelined_executor.o
cmake --build compile/dali --target dali
cmake --build compile/dali --target install_headers
cmake --build compile --target dali_python_3.12
compile/dali/python/nvidia/dali/test/dali_test.bin --gtest_filter=WorkerThread.*
compile/dali/python/nvidia/dali/test/dali_test.bin --gtest_filter=ExecutorTest/3.TestRunBasicGraph:ExecutorTest/4.TestRunBasicGraph

Issue #6361 repro validation:

  • Rebuilt the external feed-bug plugin against the refreshed staged DALI headers.
  • The original AccessOrder::wait segfault / invalid stream path is no longer
    observed after refreshing headers and rebuilding backend_impl.
  • The original repro still needs to account for executor2 lookahead: with its
    custom fixed NextBatchSize()/no-op Advance() and default prefetching, it can
    report input depletion after consuming the queued batch. A queue-aware variant
    using InputOperator's base NextBatchSize()/Advance() and prefetch_queue_depth=1
    passed two feed_input + run iterations.

Checklist

Documentation

  • Existing documentation applies
  • Documentation updated
    • Docstring
    • Doxygen
    • RST
    • Jupyter
    • Other
  • N/A

DALI team only

Requirements

  • Implements new requirements
  • Affects existing requirements
  • N/A

REQ IDs: N/A

JIRA TASK: N/A

@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented May 21, 2026

Greptile Summary

This PR hides WorkerThread's private state behind a pure virtual interface and a CreateWorkerThread() factory, resolving issue #6361 where external operator plugins depended on DALI-private build flags (including NVML_ENABLED) through WorkerThread's layout.

  • WorkerThreadImpl (concrete, final) is declared in the new internal header worker_thread_internal.h; async pipelined executors include it directly and store members by value, while external-facing types (InputOperator, legacy VideoLoader) use std::unique_ptr<WorkerThread> via the factory.
  • WorkerThread::CheckForErrors() now correctly surfaces queued worker failures by throwing instead of silently discarding them; the async executors catch, set exec_error_, signal stop and dependent condition variables, then rethrow.
  • NVML is no longer initialized unconditionally for every GPU-bound worker thread — the nvml_handle_ is now created only when set_affinity=true, avoiding unnecessary driver load for the common set_affinity=false callers (InputOperator, VideoLoader).

Confidence Score: 5/5

This PR is safe to merge — it is a well-scoped refactoring with no new correctness issues introduced.

The change is mechanically straightforward: interface extraction, factory introduction, and moving implementation into a .cc file. The only behavioral delta is that CheckForErrors() now throws instead of being a silent no-op, which is a deliberate bug fix validated by a new gtest. RAII ownership is correct throughout — unique_ptr destructors call Shutdown() via the virtual destructor chain. The NVML initialization guard change is conservative and correct.

No files require special attention.

Important Files Changed

Filename Overview
dali/pipeline/util/worker_thread.h Reduced to a pure abstract interface + factory declaration; all private state, mutex, queue, thread, and NVML members removed; copyright year updated to 2026.
dali/pipeline/util/worker_thread_internal.h New internal header declaring WorkerThreadImpl (final, DLL_PUBLIC) and detail::Barrier; NVML header guarded inside NVML_ENABLED; all members correctly use trailing-underscore convention.
dali/pipeline/util/worker_thread.cc New implementation file; CheckForErrors() now correctly throws queued worker failures instead of silently discarding them; NVML handle initialized only when set_affinity=true.
dali/pipeline/util/worker_thread_test.cc Tests migrated to factory pattern and CPU_ONLY_DEVICE_ID sentinel; new CheckForErrorsErrorHandling test validates that queued errors are now surfaced via throw.
dali/pipeline/executor/async_pipelined_executor.h Switches to WorkerThreadImpl direct storage; CheckForErrors() now properly sets exec_error_, calls SignalStop(), and notifies mixed/gpu condition variables before rethrowing.
dali/pipeline/executor/async_separated_pipelined_executor.h Parallel change to async_pipelined_executor.h; CheckForErrors() uses exec_error_+SignalStop() only (no mixed/gpu CVs — correctly absent from this class); WorkerThreadImpl stored directly.
dali/pipeline/operator/builtin/input_operator.h sync_worker_ moved from WorkerThread value-member to unique_ptr initialized via CreateWorkerThread factory; all access sites updated to arrow syntax.
dali/operators/video/legacy/reader/video_loader.h thread_file_reader_ migrated from WorkerThread value to unique_ptr; destructor continues to call ForceStop()+Shutdown() before unique_ptr teardown, preserving correct cleanup order.

Sequence Diagram

sequenceDiagram
    participant Caller as RunCPU / RunMixed / RunGPU
    participant Exec as AsyncPipelinedExecutor
    participant WTImpl as WorkerThreadImpl
    participant Thread as Worker Thread

    Caller->>Exec: CheckForErrors()
    Exec->>WTImpl: cpu_thread_.CheckForErrors()
    alt errors_ non-empty
        WTImpl-->>WTImpl: "pop error, running_=false, notify cv_"
        WTImpl--xExec: throw runtime_error
        Exec-->>Exec: "exec_error_=true, SignalStop()"
        Exec-->>Thread: mixed_work_cv_.notify_all() / gpu_work_cv_.notify_all()
        Exec--xCaller: rethrow
    else no errors
        WTImpl-->>Exec: (no-op)
        Exec->>WTImpl: mixed/gpu_thread_.CheckForErrors()
        WTImpl-->>Exec: (no-op)
        Exec-->>Caller: return
    end

    Caller->>WTImpl: DoWork(lambda)
    WTImpl-->>Thread: wake via cv_.notify_one()
    Thread->>Thread: execute lambda
    alt success
        Thread-->>WTImpl: "work_complete_=true, completed_.notify_one()"
    else exception
        Thread-->>WTImpl: "errors_.push(), running_=false, completed_.notify_one()"
    end
Loading

Reviews (12): Last reviewed commit: "Expose WorkerThreadImpl to async executo..." | Re-trigger Greptile

Comment thread dali/pipeline/util/worker_thread.cc Outdated
Comment thread dali/pipeline/util/new_thread_pool.cc Outdated
@JanuszL JanuszL force-pushed the fix-thread-pool-private-impl branch from 75c1662 to 738ce61 Compare May 21, 2026 22:33
@JanuszL
Copy link
Copy Markdown
Contributor Author

JanuszL commented May 21, 2026

@greptile review

@JanuszL
Copy link
Copy Markdown
Contributor Author

JanuszL commented May 21, 2026

!build

@dali-automaton
Copy link
Copy Markdown
Collaborator

CI MESSAGE: [52151479]: BUILD STARTED

@dali-automaton
Copy link
Copy Markdown
Collaborator

CI MESSAGE: [52151479]: BUILD FAILED

@JanuszL JanuszL force-pushed the fix-thread-pool-private-impl branch from 738ce61 to b1db590 Compare May 22, 2026 04:53
@JanuszL
Copy link
Copy Markdown
Contributor Author

JanuszL commented May 22, 2026

!build

@dali-automaton
Copy link
Copy Markdown
Collaborator

CI MESSAGE: [52186371]: BUILD STARTED

Comment thread dali/pipeline/util/worker_thread.cc Outdated
@JanuszL JanuszL force-pushed the fix-thread-pool-private-impl branch from b1db590 to 85d1da9 Compare May 22, 2026 06:18
@dali-automaton
Copy link
Copy Markdown
Collaborator

CI MESSAGE: [52186371]: BUILD PASSED

@JanuszL JanuszL force-pushed the fix-thread-pool-private-impl branch 3 times, most recently from 70e1930 to 663957b Compare May 22, 2026 08:22
@JanuszL
Copy link
Copy Markdown
Contributor Author

JanuszL commented May 22, 2026

@greptile review

@JanuszL JanuszL force-pushed the fix-thread-pool-private-impl branch 2 times, most recently from 5fbb2bd to aa1fc18 Compare May 22, 2026 08:41
@JanuszL
Copy link
Copy Markdown
Contributor Author

JanuszL commented May 22, 2026

!build

}

WorkerThread cpu_thread_, mixed_thread_, gpu_thread_;
std::unique_ptr<WorkerThread> cpu_thread_, mixed_thread_, gpu_thread_;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The particular executor implementations are not a part of a public interface and therefore could use the implementation directly.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now the internal are moved to cc file, would you like to have them in internal header which can be included directly here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reworked, please check again.

@dali-automaton
Copy link
Copy Markdown
Collaborator

CI MESSAGE: [52208284]: BUILD STARTED

Make WorkerThread a virtual interface and add a factory that returns the
private final implementation. Update InputOperator, async pipelined
executors, and the legacy video loader to store WorkerThread through the
interface, keeping private worker state out of public and template-visible
types.

Keep OldThreadPool and NewThreadPool as direct concrete implementations
instead of adding pImpl storage inside the thread pools. ThreadPool is
already the virtual interface used by operators, so the concrete pool
classes keep direct access to their synchronization, queue, thread, and
NVML-dependent storage.

Make WorkerThread::CheckForErrors throw queued worker failures instead of
discarding them. Async pipelined executors route those failures through
their stop-signaling paths before rethrowing, so dependent stages are
woken consistently.

Avoid initializing NVML when worker CPU affinity is not requested. Keep
WorkerThread GTests CPU-only so they validate thread/error handling
without requiring a loaded CUDA driver.

Signed-off-by: Janusz Lisiecki <jlisiecki@nvidia.com>
@JanuszL JanuszL changed the title Hide thread pool implementation details Hide WorkerThread implementation details May 22, 2026
@JanuszL JanuszL force-pushed the fix-thread-pool-private-impl branch from aa1fc18 to dfaf9ba Compare May 22, 2026 09:19
Move the private WorkerThread implementation declaration into
an internal header so async executor classes can store
WorkerThreadImpl directly. Keep the public WorkerThread
factory for external users while avoiding heap indirection
in executor worker storage.

Signed-off-by: Janusz Lisiecki <jlisiecki@nvidia.com>
@JanuszL JanuszL force-pushed the fix-thread-pool-private-impl branch from bd37af0 to 72835f4 Compare May 22, 2026 16:06
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants