From 8304dea9d712cb2a4740fad0675a20d4c40d2556 Mon Sep 17 00:00:00 2001 From: steven varga Date: Mon, 18 May 2026 14:20:03 +0000 Subject: [PATCH 1/4] =?UTF-8?q?[#252]:svarga:feature,=20Phase=20II=202.1?= =?UTF-8?q?=20=E2=80=94=20async=20descriptor=20types=20+=20is=5Fasync=5Fv?= =?UTF-8?q?=20trait?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Refactor the previously-unused `false,false` specialization of `impl::detail::hid_t<>` so it stands on its own (no private inheritance) and `= delete`s `operator ::hid_t()` directly. This gives a far better diagnostic than the old private-inheritance hide: error: use of deleted function 'h5::impl::detail::hid_t ::operator hid_t() const' The previous hid_t<…,false,false,…> only had an initializer-list ctor (everything else was inherited privately, so default ctor and the from-::hid_t ctor were inaccessible). The new spec mirrors the classic true,true layout: default ctor, ::hid_t-conversion ctor, init-list ctor, copy/move ctor + assign, dtor. The `handle` field is public — internal h5cpp code (executor, dispatch lambdas) reads it without invoking the deleted conversion; user code is expected to route through h5::write / h5::read / h5::async::* factories. Adds parallel `false,false` specs for hdf5::dataset and hdf5::attribute so async ds_t / at_t / gr_t pick up the same shape as their classic counterparts (dapl field, [] operator, attribute = operator). User-facing aliases land in a new `h5::async` namespace, parallel to the classic h5:: descriptor surface: namespace h5::async { using fd_t = impl::async_hid_t; using ds_t = impl::async_did_t; using at_t = impl::async_aid_t; using gr_t = impl::async_aid_t; using ob_t = impl::async_hid_t; } Plus `h5::is_async_v` — a trait specialized on the false,false hid_t shape — for the concept-constrained operation overloads that land in Phase II PR-B. 
Out of scope here: - h5::async::create / open factories (Phase II 2.2) - executor FAPL property + executor thread (Phase II 2.2 / 2.3) - operation overloads that branch on is_async_v (Phase II PR-B) Verified locally: - 50/50 ctest green (classic surface untouched) - Standalone smoke test: async types are constructible, handle field accessible, is_async_v = true, is_async_v< classic *> = false - Standalone fail test: `static_cast<::hid_t>(async_fd)` emits the deleted-function diagnostic shown above (no other error spam) --- h5cpp/H5Iall.hpp | 145 +++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 140 insertions(+), 5 deletions(-) diff --git a/h5cpp/H5Iall.hpp b/h5cpp/H5Iall.hpp index f7c484de6f..0036dae57b 100644 --- a/h5cpp/H5Iall.hpp +++ b/h5cpp/H5Iall.hpp @@ -110,13 +110,115 @@ namespace h5::impl::detail { ::hid_t handle; }; - // disable from CAPI and TOCAPI conversions - //conversion ctor to packet table enabled, used for h5::impl::ds_t + // Phase II async-mode specialization — operator ::hid_t() is = delete'd so + // user code that accidentally hands an async descriptor to a raw HDF5 C + // API fails to compile with a clear "use of deleted function" diagnostic. + // h5cpp internal code reaches the raw handle via the public `handle` + // field (see workplan §4.4); user code routes through h5::write / h5::read + // / etc. which detect the type via is_async_v<> and dispatch through the + // FAPL-resolved executor. template - struct hid_t : private hid_t { - using parent = hid_t; + struct hid_t { using hidtype = T; - hid_t( std::initializer_list<::hid_t> fd ) : parent( fd ){} + + // from CAPI — mirrors the true,true ctor; explicit so an accidental + // implicit promotion from ::hid_t doesn't slip an async wrapper in. + H5CPP__EXPLICIT hid_t( ::hid_t handle_ ) : handle( handle_ ){ + if( H5Iis_valid( handle_ ) ) + H5Iinc_ref( handle_ ); + } + + // TO CAPI — DELETED. 
Async descriptors must not be implicitly + // converted back to ::hid_t; doing so would let user code call + // HDF5 directly and bypass the executor thread. Internal code + // reads the raw value from `handle` directly. + operator ::hid_t() const = delete; + + // direct-initialization ctor; matches the classic shape — does not + // increment the refcount (caller owns the handle). + hid_t( std::initializer_list<::hid_t> fd ) : handle( *fd.begin() ){} + + hid_t() : handle(H5I_UNINIT) {} + + hid_t( const hid_t& ref ){ + handle = ref.handle; + if( H5Iis_valid( handle ) ) + H5Iinc_ref( handle ); + } + hid_t& operator=( const hid_t& ref ){ + if( this == &ref ) return *this; + if( H5Iis_valid( handle ) ) + capi_close( handle ); + handle = ref.handle; + if( H5Iis_valid( handle ) ) + H5Iinc_ref( handle ); + return *this; + } + hid_t( hid_t&& ref ) noexcept { + handle = ref.handle; + ref.handle = H5I_UNINIT; + } + hid_t& operator=( hid_t&& ref ) noexcept { + if( this == &ref ) return *this; + if( H5Iis_valid( handle ) ) + capi_close( handle ); + handle = ref.handle; + ref.handle = H5I_UNINIT; + return *this; + } + ~hid_t(){ + if( H5Iis_valid( handle ) ) + capi_close( handle ); + } + + // Public so internal h5cpp code (the executor, dispatch lambdas) + // can read the raw id without invoking the deleted conversion. + // User code is expected to use h5::write / h5::read / h5::async::* + // factories rather than touch this field directly. + ::hid_t handle; + }; + + // Phase II — async dataset id. Mirrors hdf5::dataset (line above) but + // with conversion to ::hid_t deleted. Adds the `dapl` field and the + // attribute subscript operator the classic ds_t exposes. 
+ template + struct hid_t + : public hid_t { + using parent = hid_t; + using parent::parent; + using parent::handle; + using hidtype = T; + using at_t = hid_t; + + hid_t(){ + this->handle = H5I_UNINIT; + this->dapl = H5I_UNINIT; + } + at_t operator[]( const char arg[] ); + + ::hid_t dapl; + }; + + // Phase II — async attribute id. + template + struct hid_t + : public hid_t { + using parent = hid_t; + using parent::parent; + using parent::handle; + using hidtype = T; + using at_t = hid_t; + + hid_t(){ + this->handle = H5I_UNINIT; + this->ds = H5I_UNINIT; + } + + template at_t operator=( V arg ); + template at_t operator=( const std::initializer_list args ){ return at_t{H5I_UNINIT}; } + + ::hid_t ds; + std::string name; }; /*property id*/ template @@ -176,6 +278,14 @@ namespace h5::impl { template using hid_t = detail::hid_t; template using pid_t = detail::hid_t; template using did_t = detail::hid_t; + + // Phase II — async-mode variants. Same shape as the classic aliases + // above but with operator ::hid_t() = delete'd at the type level. + // Users opt in by calling h5::async::create / h5::async::open; everything + // downstream deduces these types through TAD. + template using async_aid_t = detail::hid_t; + template using async_hid_t = detail::hid_t; + template using async_did_t = detail::hid_t; } /*hide gory details, and stamp out descriptors */ @@ -203,4 +313,29 @@ namespace h5 { #undef H5CPP__defaid_t #undef H5CPP__defpid_t #undef H5CPP__defhid_t + + // Phase II — async-mode descriptor type aliases. Parallel to the + // classic h5::fd_t / h5::ds_t / h5::gr_t / h5::at_t above; the + // underlying class template is the false,false specialization of + // impl::hid_t so any attempt to pass one of these to a raw HDF5 + // C-API call fails with "use of deleted function". 
+ namespace async { + using fd_t = impl::async_hid_t; + using ds_t = impl::async_did_t; + using at_t = impl::async_aid_t; + using gr_t = impl::async_aid_t; + using ob_t = impl::async_hid_t; + } + + // Phase II type-trait: is_async_v answers "is T one of the + // h5::async::* descriptors?". Used by concept-constrained operation + // overloads (Phase II PR-B) to pick the executor dispatch branch. + template + struct is_async : std::false_type {}; + + template + struct is_async< impl::detail::hid_t > : std::true_type {}; + + template + inline constexpr bool is_async_v = is_async>::value; } From 6a5a990c5afd25c83be59972f3d2bcdaddfc71ea Mon Sep 17 00:00:00 2001 From: steven varga Date: Mon, 18 May 2026 14:34:59 +0000 Subject: [PATCH 2/4] =?UTF-8?q?[#252]:svarga:feature,=20Phase=20II=202.2/2?= =?UTF-8?q?.3/2.4=20=E2=80=94=20async=20executor=20+=20factories=20+=20sca?= =?UTF-8?q?ffold=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Builds on the descriptor refactor from 8304dea9 (Phase II 2.1). Three new headers wire the executor scaffold and one new test file exercises lifecycle + race coverage. # Mechanism h5cpp/H5executor.hpp ───────────────────── `impl::executor_t` — single worker thread per async fd. Same std::mutex + std::queue + doorbell + stoppable_thread_t pattern as #250's worker_pool_t; the only structural differences are (a) one worker instead of N and (b) `submit_and_wait(Fn&&)` that blocks the caller on a std::packaged_task future returned by the lambda. Exception inside the submitted callable propagates back to the submitter via `future.get()`. Reentrant safety: if the caller is already on the executor thread, the callable runs inline (the same-thread check is gated on `worker_thread_id_` populated by the loop on entry). h5cpp/H5Pfapl_async.hpp ─────────────────────── Defensive utility — installs an executor slot on a FAPL using the same shared_ptr-in-H5Pinsert2 pattern from #250's worker pool. 
Kept for any future code path that *can* round-trip the property through a FAPL (HDF5 v2 native async APIs, in-process FAPL inspection, etc.) but not on the primary read path — see below. h5cpp/H5async.hpp ───────────────── User-facing `h5::async::create` and `h5::async::open`. These are the only call sites where the word "async" appears. Each one: 1. resolves or installs a worker_pool_t for compression, 2. constructs an `executor_t` with that pool, 3. calls H5Fcreate / H5Fopen on the user's FAPL, 4. returns `h5::async::fd_t{raw_hid, std::move(exec)}`. Operation overloads (Phase II PR-B) reach the executor via the wrapper's `exec` field, NOT via H5Fget_access_plist on the file id — see the regression test below for the HDF5 1.10.9 reason. # HDF5 limitation that drove the executor-on-wrapper design HDF5 1.10.9's `H5Fget_access_plist` returns a synthetic FAPL reconstructed from standard properties only. Properties installed via `H5Pinsert2` (which is what #250's worker pool slot uses) do NOT survive the round-trip. Verified with both Phase I's `h5cpp_fapl_worker_pool` and Phase II's `h5cpp_fapl_executor` property names — both come back missing. Test `[#252] HDF5 strips user-inserted FAPL properties on H5Fget_access_plist` documents this behavior so future code does not regress to the slot-only pattern. (This also means Phase I's `pt_t::init`, `h5::write`, `h5::read` pool resolution via `H5Fget_access_plist` silently falls back to basic_pipeline_t — the round-trip tests still pass on correctness but the parallel-compression path is never exercised. That's a follow-up issue, not in this PR's scope.) # Refactor on H5Iall.hpp The Phase II 2.1 false,false specialization gains a `std::shared_ptr exec` field. shared_ptr with a type-erased deleter lets the fwd-declared executor_t in H5Iall.hpp be sufficient — the complete type is only needed at make_shared time inside H5async.hpp. 
Copy/move ctors and assignment operators carry the exec pointer along with the handle, so `h5::async::fd_t fd_b = fd_a;` shares one executor (verified by test). New factory ctor `hid_t(::hid_t, std::shared_ptr)` is the canonical construction path used by h5::async::create / open and by mode-transitive factories in Phase II PR-B. # Tests — test/H5async.cpp 18 cases covering: [#252] traits — is_async_v<> classification, default-construct [#252] static_assert — async types are not is_constructible_v <::hid_t, async_t> while classic types still are [#252 2.2] FAPL executor slot install / resolve / H5Pcopy share [#252 2.3] submit_and_wait — basic, executor-thread-id, void return type, exception propagation, reentry inline [#252 2.4] 8-thread concurrent submission stress (TSAN signal) [#252 2.5] async::create / open round-trip; h5::threads{N} chain; shared_ptr propagation across copy [#252] HDF5 1.10.9 FAPL-strip regression documentation Local C++17 build: 51/51 ctest green. Local TSAN build: test-h5async 18/18 with no races. --- h5cpp/H5Iall.hpp | 31 +++++ h5cpp/H5Pfapl_async.hpp | 91 +++++++++++++ h5cpp/H5async.hpp | 89 +++++++++++++ h5cpp/H5executor.hpp | 141 ++++++++++++++++++++ h5cpp/core | 4 +- h5cpp/io | 3 +- test/CMakeLists.txt | 1 + test/H5async.cpp | 277 ++++++++++++++++++++++++++++++++++++++++ 8 files changed, 635 insertions(+), 2 deletions(-) create mode 100644 h5cpp/H5Pfapl_async.hpp create mode 100644 h5cpp/H5async.hpp create mode 100644 h5cpp/H5executor.hpp create mode 100644 test/H5async.cpp diff --git a/h5cpp/H5Iall.hpp b/h5cpp/H5Iall.hpp index 0036dae57b..b218679835 100644 --- a/h5cpp/H5Iall.hpp +++ b/h5cpp/H5Iall.hpp @@ -10,6 +10,7 @@ #include #include #include +#include /* std::shared_ptr — async descriptor exec field */ #include #ifdef H5CPP_CONVERSION_IMPLICIT @@ -32,6 +33,18 @@ namespace h5::impl { }; //forward declarations struct at_t; + + // Phase II — async descriptors carry a shared_ptr field + // directly on the wrapper. 
Why direct storage and not the FAPL slot + // pattern from Phase I: HDF5 1.10.9's H5Fget_access_plist returns a + // synthetic FAPL reconstructed from standard properties only; user + // properties installed via H5Pinsert2 are dropped. Storing the + // executor inside the wrapper class lets operation overloads in + // Phase II PR-B reach it as `fd.exec` without round-tripping through + // HDF5's property machinery. std::shared_ptr's type-erased deleter + // makes the forward declaration sufficient — the complete type is + // only needed at h5::async::create / open (defined in H5async.hpp). + struct executor_t; } namespace h5::impl::detail { @@ -128,6 +141,13 @@ namespace h5::impl::detail { H5Iinc_ref( handle_ ); } + // Factory ctor — h5::async::create / open construct the executor + // during file creation and inject it here so operation overloads + // (Phase II PR-B) can reach it via `fd.exec`. Used by mode- + // transitive factories too (ds_t inherits parent fd's executor). + hid_t( ::hid_t handle_, std::shared_ptr e ) noexcept + : handle( handle_ ), exec( std::move(e) ) {} + // TO CAPI — DELETED. Async descriptors must not be implicitly // converted back to ::hid_t; doing so would let user code call // HDF5 directly and bypass the executor thread. 
Internal code @@ -144,6 +164,7 @@ namespace h5::impl::detail { handle = ref.handle; if( H5Iis_valid( handle ) ) H5Iinc_ref( handle ); + exec = ref.exec; // shared_ptr copy bumps refcount } hid_t& operator=( const hid_t& ref ){ if( this == &ref ) return *this; @@ -152,11 +173,13 @@ namespace h5::impl::detail { handle = ref.handle; if( H5Iis_valid( handle ) ) H5Iinc_ref( handle ); + exec = ref.exec; return *this; } hid_t( hid_t&& ref ) noexcept { handle = ref.handle; ref.handle = H5I_UNINIT; + exec = std::move(ref.exec); } hid_t& operator=( hid_t&& ref ) noexcept { if( this == &ref ) return *this; @@ -164,6 +187,7 @@ namespace h5::impl::detail { capi_close( handle ); handle = ref.handle; ref.handle = H5I_UNINIT; + exec = std::move(ref.exec); return *this; } ~hid_t(){ @@ -176,6 +200,13 @@ namespace h5::impl::detail { // User code is expected to use h5::write / h5::read / h5::async::* // factories rather than touch this field directly. ::hid_t handle; + + // Phase II — shared_ptr to the executor that owns this descriptor's + // HDF5 lifetime. Populated by h5::async::create / open at the + // file-level, then propagated to derived descriptors (async ds, + // async at, etc.) by mode-transitive factories. May be null on + // default-constructed async wrappers (un-initialized state). + std::shared_ptr exec; }; // Phase II — async dataset id. Mirrors hdf5::dataset (line above) but diff --git a/h5cpp/H5Pfapl_async.hpp b/h5cpp/H5Pfapl_async.hpp new file mode 100644 index 0000000000..45fe9a1d4b --- /dev/null +++ b/h5cpp/H5Pfapl_async.hpp @@ -0,0 +1,91 @@ +/* + * Copyright (c) 2018-2026 Steven Varga, Toronto,ON Canada + * Author: Varga, Steven + */ +#pragma once + +// Phase II FAPL executor property — same shared_ptr-in-slot pattern proven +// by Phase I's worker pool (H5Pthreads.hpp). 
An executor_t lives behind +// a shared_ptr stored in a heap-allocated holder whose address occupies +// the H5Pinsert2 value slot; copy_cb clones the slot but aliases the same +// executor (refcount +1), close_cb drops one slot (refcount -1). When +// the last fd dies, the executor destructor joins the worker thread. +// +// h5::async::create / h5::async::open install this property in addition +// to (and consuming) any h5::threads{N} pool the user chained in. If no +// h5::threads{N} is present, fapl_async_set auto-installs a default-sized +// pool so the executor always has a pool reference. + +#include "H5Pthreads.hpp" // worker_pool_t + fapl_threads_set + resolve_worker_pool +#include "H5executor.hpp" // executor_t + +#include + +#include + +namespace h5::impl { + +#define H5CPP_FAPL_EXECUTOR "h5cpp_fapl_executor" + +// Heap-allocated holder, parallel to worker_pool_slot_t in H5Pthreads.hpp. +struct executor_slot_t { + std::shared_ptr exec; +}; + +// Copy: HDF5 memcpy'd the slot pointer into the destination. Allocate a +// fresh holder whose shared_ptr aliases the same executor (++refcount). +inline herr_t fapl_exec_copy_cb(const char* /*name*/, size_t /*size*/, void* value) { + auto** slot_loc = static_cast(value); + *slot_loc = new executor_slot_t{(*slot_loc)->exec}; + return 0; +} + +// Close: delete one holder; shared_ptr drops one reference. The +// executor_t destructor (which joins the worker thread) runs when the +// last reference is released. +inline herr_t fapl_exec_close_cb(const char* /*name*/, size_t /*size*/, void* ptr) { + delete *static_cast(ptr); + return 0; +} + +// Setter invoked by h5::async::create / h5::async::open. Idempotent — +// if the FAPL already has the property installed, leaves it alone. +// Auto-installs a default-sized worker_pool_t (n=0) when no h5::threads{N} +// was chained into the FAPL, so the executor always has a pool to hand +// to compression callbacks (Phase II PR-B wires that connection). 
+inline herr_t fapl_async_set(::hid_t fapl) { + if (H5Pexist(fapl, H5CPP_FAPL_EXECUTOR)) return 0; + + auto pool = resolve_worker_pool(fapl); + if (!pool) { + // No h5::threads{N} in the chain — install a default-sized pool + // so the executor has a compression backend available. + fapl_threads_set(fapl, 0); // 0 → hardware_concurrency() + pool = resolve_worker_pool(fapl); + } + + auto* slot = new executor_slot_t{ + std::make_shared(std::move(pool)) + }; + return H5Pinsert2(fapl, H5CPP_FAPL_EXECUTOR, + sizeof(executor_slot_t*), &slot, + nullptr, // set + nullptr, // get + nullptr, // prp_del + fapl_exec_copy_cb, + nullptr, // compare + fapl_exec_close_cb); +} + +// Consumer-site: given a FAPL id, retrieve the executor shared_ptr if one +// is installed. Returns nullptr when the property is absent (classic-mode +// FAPL — caller should not have been routed here). +inline std::shared_ptr resolve_executor(::hid_t fapl_id) noexcept { + if (fapl_id < 0 || H5Iis_valid(fapl_id) <= 0) return nullptr; + if (!H5Pexist(fapl_id, H5CPP_FAPL_EXECUTOR)) return nullptr; + executor_slot_t* slot = nullptr; + H5Pget(fapl_id, H5CPP_FAPL_EXECUTOR, &slot); + return slot ? slot->exec : nullptr; +} + +} // namespace h5::impl diff --git a/h5cpp/H5async.hpp b/h5cpp/H5async.hpp new file mode 100644 index 0000000000..cebd51483c --- /dev/null +++ b/h5cpp/H5async.hpp @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2018-2026 Steven Varga, Toronto,ON Canada + * Author: Varga, Steven + */ +#pragma once + +// Phase II user-facing factories — h5::async::create / h5::async::open. +// These are the only call sites where the word "async" appears in user +// code; every downstream operation (h5::write, h5::read, etc.) deduces +// async-ness from the FD type via TAD and dispatches through the +// executor with `if constexpr (is_async_v)`. +// +// Mechanism: +// 1. 
Resolve / install a worker_pool_t on a local FAPL clone so the +// executor has a compression backend (Phase II PR-B will wire that +// connection through h5::write / h5::read). +// 2. Construct the executor with that pool. +// 3. Call H5Fcreate / H5Fopen on the user's FAPL (unchanged). +// 4. Wrap the resulting ::hid_t plus the executor shared_ptr in the +// async fd_t. The executor lives as long as any descriptor +// derived from this fd holds the shared_ptr. +// +// We deliberately don't rely on H5Pinsert2 + H5Fget_access_plist for +// the executor handoff — HDF5 1.10.9 strips user properties from the +// FAPL retrieved off a file id, which would make the executor +// unreachable on the read path. Storing it on the wrapper sidesteps +// that limitation. + +#include "H5Pall.hpp" +#include "H5Pthreads.hpp" // worker_pool_t + h5::threads + resolve_worker_pool +#include "H5executor.hpp" // executor_t — complete type required at make_shared + +#include +#include +#include + +namespace h5::async { + +namespace impl_detail { + +// Build the executor that backs an async fd. Reuses an h5::threads{N} +// pool from the user's FAPL chain when present, otherwise stands up a +// default-sized pool on a freshly-created FAPL (we don't try to +// H5Pcopy the user's FAPL because H5P_DEFAULT — the common case — is +// not a copyable property list). 
+inline std::shared_ptr +make_executor_for(const h5::fapl_t& fapl) { + auto pool = h5::impl::resolve_worker_pool(static_cast<::hid_t>(fapl)); + if (!pool) { + h5::fapl_t scratch{H5Pcreate(H5P_FILE_ACCESS)}; + h5::impl::fapl_threads_set(static_cast<::hid_t>(scratch), 0); + pool = h5::impl::resolve_worker_pool(static_cast<::hid_t>(scratch)); + } + return std::make_shared(std::move(pool)); +} + +} // namespace impl_detail + +inline h5::async::fd_t create(const std::string& path, unsigned flags, + const h5::fcpl_t& fcpl = h5::default_fcpl, + const h5::fapl_t& fapl = h5::default_fapl) { + H5CPP_CHECK_PROP(fcpl, h5::error::io::file::create, "invalid file control property list"); + H5CPP_CHECK_PROP(fapl, h5::error::io::file::create, "invalid file access property list"); + + auto exec = impl_detail::make_executor_for(fapl); + + hid_t fd; + H5CPP_CHECK_NZ( + (fd = H5Fcreate(path.data(), flags, + static_cast<::hid_t>(fcpl), + static_cast<::hid_t>(fapl))), + h5::error::io::file::create, h5::error::msg::create_file); + return h5::async::fd_t{fd, std::move(exec)}; +} + +inline h5::async::fd_t open(const std::string& path, unsigned flags, + const h5::fapl_t& fapl = h5::default_fapl) { + H5CPP_CHECK_PROP(fapl, h5::error::io::file::open, "invalid file access property list"); + + auto exec = impl_detail::make_executor_for(fapl); + + hid_t fd; + H5CPP_CHECK_NZ( + (fd = H5Fopen(path.data(), flags, static_cast<::hid_t>(fapl))), + h5::error::io::file::open, h5::error::msg::open_file); + return h5::async::fd_t{fd, std::move(exec)}; +} + +} // namespace h5::async diff --git a/h5cpp/H5executor.hpp b/h5cpp/H5executor.hpp new file mode 100644 index 0000000000..2fc8923823 --- /dev/null +++ b/h5cpp/H5executor.hpp @@ -0,0 +1,141 @@ +/* + * Copyright (c) 2018-2026 Steven Varga, Toronto,ON Canada + * Author: Varga, Steven + */ +#pragma once + +// Phase II executor — one thread per async fd. 
Every HDF5 C-API call on an +// h5::async::* descriptor is serialized through this thread via +// submit_and_wait, restoring thread-safety for HDF5 builds compiled without +// --enable-threadsafe. Compression continues to parallelize through the +// Phase I worker_pool_t held by reference. +// +// Construction model mirrors Phase I's worker_pool_t (H5Pthreads.hpp): single +// mutex-guarded queue + doorbell + stoppable_thread_t, std::packaged_task in +// shared_ptr wrapped in std::function for type erasure. The only +// substantive difference is the single worker thread and the sync wait +// facade. + +#include "H5Pthreads.hpp" // worker_pool_t + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "detail/doorbell.hpp" +#include "detail/stoppable_thread.hpp" + +namespace h5::impl { + +struct executor_t { + // pool is optional today; Phase II operation overloads (PR-B) will + // require it. Default-construct keeps the scaffold trivially usable + // in lifecycle tests that don't exercise compression. + explicit executor_t(std::shared_ptr pool = nullptr) + : pool_(std::move(pool)), + worker_([this](h5::detail::stop_token_t st) { worker_loop(st); }) + {} + + ~executor_t() { + wait_idle(); + stopping_.store(true, std::memory_order_release); + bell_.ring(); + // stoppable_thread_t dtor requests stop + joins + } + + executor_t(const executor_t&) = delete; + executor_t& operator=(const executor_t&) = delete; + executor_t(executor_t&&) = delete; + executor_t& operator=(executor_t&&) = delete; + + // Submit a callable and block until it completes on the executor thread. + // Exception in the callable propagates back to the submitting thread via + // future::get. + // + // Reentrant safety: if the caller is already running on the executor + // thread (e.g. one h5cpp operation calling another inside the executor), + // run the callable inline rather than queueing — queueing would + // deadlock the executor on itself. 
+ template + auto submit_and_wait(Fn&& fn) -> std::invoke_result_t { + using R = std::invoke_result_t; + + if (std::this_thread::get_id() == + worker_thread_id_.load(std::memory_order_acquire)) { + // Already on the executor thread — run inline, no queue trip. + // Exception thrown in fn propagates back to caller naturally + // (no packaged_task wrap needed in this branch). + return std::forward(fn)(); + } + + auto task = std::make_shared>(std::forward(fn)); + auto fut = task->get_future(); + in_flight_.fetch_add(1, std::memory_order_release); + { + std::lock_guard lk(m_); + tasks_.emplace([task] { (*task)(); }); + } + bell_.ring(); + return fut.get(); // blocks on executor; rethrows exception + } + + // Block until the queue drains. Called by ~executor_t before stop. + void wait_idle() { + while (in_flight_.load(std::memory_order_acquire) > 0) + std::this_thread::yield(); + } + + [[nodiscard]] std::shared_ptr pool() const { return pool_; } + [[nodiscard]] int in_flight() const noexcept { return in_flight_.load(std::memory_order_acquire); } + [[nodiscard]] std::thread::id worker_thread_id() const noexcept { + return worker_thread_id_.load(std::memory_order_acquire); + } + +private: + void worker_loop(h5::detail::stop_token_t st) { + worker_thread_id_.store(std::this_thread::get_id(), + std::memory_order_release); + while (!st.stop_requested()) { + std::function task; + std::uint32_t last_seq = 0; + bool got_task = false; + { + std::lock_guard lk(m_); + if (!tasks_.empty()) { + task = std::move(tasks_.front()); + tasks_.pop(); + got_task = true; + } else { + last_seq = bell_.load(); + } + } + + if (got_task) { + try { task(); } catch (...) 
{ /* packaged_task captures it */ } + in_flight_.fetch_sub(1, std::memory_order_release); + continue; + } + + if (st.stop_requested() || stopping_.load(std::memory_order_acquire)) return; + bell_.wait(last_seq); + } + } + + std::shared_ptr pool_; + std::mutex m_; + std::queue> tasks_; + h5::detail::doorbell_t bell_; + std::atomic in_flight_{0}; + std::atomic stopping_{false}; + std::atomic worker_thread_id_{}; + h5::detail::stoppable_thread_t worker_; // declared LAST so all + // members are live when + // worker_loop starts +}; + +} // namespace h5::impl diff --git a/h5cpp/core b/h5cpp/core index 40f7014eb8..a33725e87f 100644 --- a/h5cpp/core +++ b/h5cpp/core @@ -63,7 +63,9 @@ #include "H5Pdapl.hpp" #include "H5Pthreads.hpp" #include "H5Zpipeline_pool.hpp" - + #include "H5executor.hpp" + #include "H5Pfapl_async.hpp" + #include "H5Ialgorithm.hpp" #include "H5capi.hpp" #include "H5cout.hpp" diff --git a/h5cpp/io b/h5cpp/io index 359f22981f..19872bb342 100644 --- a/h5cpp/io +++ b/h5cpp/io @@ -8,7 +8,8 @@ #include "H5Fcreate.hpp" #include "H5Fopen.hpp" - + #include "H5async.hpp" /* h5::async::create / h5::async::open factories */ + #include "H5Dcreate.hpp" #include "H5Dopen.hpp" #include "H5Rall.hpp" diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 3ec990c14b..6279796f83 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -98,6 +98,7 @@ add_test_case(H5Zlz4) add_test_case(H5Zzstd) add_test_case(H5Zszip) # add_test_case(H5Iall) +add_test_case(H5async) # Phase II scaffold lifecycle + submit_and_wait tests # --------------------------------------------------------------------------- # C++20 streaming ranges view — issue [#181]. diff --git a/test/H5async.cpp b/test/H5async.cpp new file mode 100644 index 0000000000..ba7f93f3b0 --- /dev/null +++ b/test/H5async.cpp @@ -0,0 +1,277 @@ +// Phase II scaffold tests — h5::async::* types, executor lifecycle, +// submit_and_wait correctness, exception propagation. 
Operation +// overloads (h5::write / h5::read on async fds) are tested in PR-B. + +#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "support/fixture.hpp" + +// =========================================================================== +// [#252 2.1] async descriptor types — type traits and constructibility +// =========================================================================== + +TEST_CASE("[#252] h5::async types satisfy is_async_v") { + static_assert( h5::is_async_v, "async::fd_t"); + static_assert( h5::is_async_v, "async::ds_t"); + static_assert( h5::is_async_v, "async::at_t"); + static_assert( h5::is_async_v, "async::gr_t"); + static_assert( h5::is_async_v, "async::ob_t"); + static_assert(!h5::is_async_v, "fd_t classic"); + static_assert(!h5::is_async_v, "ds_t classic"); + static_assert(!h5::is_async_v, "at_t classic"); + CHECK(true); +} + +TEST_CASE("[#252] async descriptors compile-block raw ::hid_t conversion") { + // The `= delete` on operator ::hid_t() means is_constructible_v<::hid_t, T> + // is false for every async wrapper. Classic wrappers retain the explicit + // conversion so they remain constructible via static_cast. + static_assert(!std::is_constructible_v<::hid_t, h5::async::fd_t>, "fd_t"); + static_assert(!std::is_constructible_v<::hid_t, h5::async::ds_t>, "ds_t"); + static_assert(!std::is_constructible_v<::hid_t, h5::async::at_t>, "at_t"); + static_assert(!std::is_constructible_v<::hid_t, h5::async::gr_t>, "gr_t"); + static_assert(!std::is_constructible_v<::hid_t, h5::async::ob_t>, "ob_t"); + // Control assertions on the classic surface — these MUST stay constructible. 
+ static_assert( std::is_constructible_v<::hid_t, h5::fd_t>, "classic fd_t"); + static_assert( std::is_constructible_v<::hid_t, h5::ds_t>, "classic ds_t"); + CHECK(true); +} + +TEST_CASE("[#252] h5::async::* default-construct to H5I_UNINIT") { + h5::async::fd_t fd; + h5::async::ds_t ds; + h5::async::gr_t gr; + h5::async::at_t at; + CHECK(fd.handle == H5I_UNINIT); + CHECK(ds.handle == H5I_UNINIT); + CHECK(gr.handle == H5I_UNINIT); + CHECK(at.handle == H5I_UNINIT); +} + +// =========================================================================== +// [#252 2.2] FAPL executor slot — install + resolve + H5Pcopy preserves slot +// =========================================================================== + +TEST_CASE("[#252 2.2] fapl_async_set installs an executor on the FAPL") { + h5::fapl_t fapl{H5Pcreate(H5P_FILE_ACCESS)}; + REQUIRE(h5::impl::fapl_async_set(static_cast<::hid_t>(fapl)) == 0); + auto exec = h5::impl::resolve_executor(static_cast<::hid_t>(fapl)); + REQUIRE(exec); + CHECK(exec->pool()); // default pool auto-installed + CHECK(exec->in_flight() == 0); +} + +TEST_CASE("[#252 2.2] H5Pcopy preserves shared executor ownership") { + h5::fapl_t src{H5Pcreate(H5P_FILE_ACCESS)}; + h5::impl::fapl_async_set(static_cast<::hid_t>(src)); + auto exec_src = h5::impl::resolve_executor(static_cast<::hid_t>(src)); + REQUIRE(exec_src); + + h5::fapl_t dst{H5Pcopy(static_cast<::hid_t>(src))}; + auto exec_dst = h5::impl::resolve_executor(static_cast<::hid_t>(dst)); + REQUIRE(exec_dst); + + // Same executor instance through both FAPLs — slot copy_cb aliases. 
+ CHECK(exec_src.get() == exec_dst.get()); +} + +TEST_CASE("[#252 2.2] fapl_async_set is idempotent") { + h5::fapl_t fapl{H5Pcreate(H5P_FILE_ACCESS)}; + REQUIRE(h5::impl::fapl_async_set(static_cast<::hid_t>(fapl)) == 0); + auto exec1 = h5::impl::resolve_executor(static_cast<::hid_t>(fapl)); + REQUIRE(h5::impl::fapl_async_set(static_cast<::hid_t>(fapl)) == 0); + auto exec2 = h5::impl::resolve_executor(static_cast<::hid_t>(fapl)); + CHECK(exec1.get() == exec2.get()); +} + +TEST_CASE("[#252 2.2] async fapl chains h5::threads{N} when explicit") { + h5::fapl_t fapl = h5::threads{6}; + h5::impl::fapl_async_set(static_cast<::hid_t>(fapl)); + auto exec = h5::impl::resolve_executor(static_cast<::hid_t>(fapl)); + REQUIRE(exec); + REQUIRE(exec->pool()); + CHECK(exec->pool()->worker_count() == 6); +} + +// =========================================================================== +// [#252 2.3] executor_t — submit_and_wait + exception propagation + reentry +// =========================================================================== + +TEST_CASE("[#252 2.3] executor_t::submit_and_wait runs callable and returns result") { + h5::impl::executor_t exec; + int result = exec.submit_and_wait([]{ return 42; }); + CHECK(result == 42); + CHECK(exec.in_flight() == 0); +} + +TEST_CASE("[#252 2.3] executor_t::submit_and_wait runs on the executor thread") { + h5::impl::executor_t exec; + std::thread::id caller = std::this_thread::get_id(); + std::thread::id callee = exec.submit_and_wait([]{ return std::this_thread::get_id(); }); + CHECK(caller != callee); + CHECK(callee == exec.worker_thread_id()); +} + +TEST_CASE("[#252 2.3] exception inside submit_and_wait propagates to caller") { + h5::impl::executor_t exec; + CHECK_THROWS_AS( + exec.submit_and_wait([]{ throw std::runtime_error("boom"); }), + std::runtime_error); + CHECK(exec.in_flight() == 0); +} + +TEST_CASE("[#252 2.3] executor_t::submit_and_wait re-entry runs inline (no deadlock)") { + h5::impl::executor_t exec; + int result = 
exec.submit_and_wait([&]{ + // Nested call from inside the executor thread — must not enqueue + // (would deadlock) — runs inline by the same-thread check. + return exec.submit_and_wait([]{ return 7; }) * 6; + }); + CHECK(result == 42); +} + +TEST_CASE("[#252 2.3] executor_t void-returning callable") { + h5::impl::executor_t exec; + std::atomic counter{0}; + exec.submit_and_wait([&]{ counter.fetch_add(1); }); + CHECK(counter.load() == 1); +} + +// =========================================================================== +// [#252 2.4] Multi-thread submission stress — TSAN sees nothing +// =========================================================================== + +TEST_CASE("[#252 2.4] executor_t serializes 8 concurrent submitters correctly") { + h5::impl::executor_t exec; + constexpr int per_thread = 32; + constexpr int n_threads = 8; + std::atomic sum{0}; + + std::vector ths; + ths.reserve(n_threads); + for (int t = 0; t < n_threads; ++t) { + ths.emplace_back([&, t]{ + for (int i = 0; i < per_thread; ++i) { + int got = exec.submit_and_wait([&, t, i]{ + return t * 100 + i; + }); + sum.fetch_add(got, std::memory_order_relaxed); + } + }); + } + for (auto& th : ths) th.join(); + + int expected = 0; + for (int t = 0; t < n_threads; ++t) + for (int i = 0; i < per_thread; ++i) + expected += t * 100 + i; + CHECK(sum.load() == expected); + CHECK(exec.in_flight() == 0); +} + +// =========================================================================== +// [#252 2.5] h5::async::create / open — file round-trip lifecycle +// =========================================================================== +// +// The executor is reached via `fd.exec` directly (a shared_ptr field on the +// async wrapper) rather than via H5Fget_access_plist on the file id. 
+// Phase I's choice to use the FAPL slot pattern + H5Fget_access_plist runs +// into HDF5 1.10.9's behavior: H5Fget_access_plist returns a synthetic +// FAPL reconstructed from standard properties only, so user-inserted +// properties (H5Pinsert2) silently disappear. See the regression test +// below for the documented HDF5 behavior. + +TEST_CASE("[#252 2.5] h5::async::create + close round-trip") { + const char* path = "test-252-async-create.h5"; + std::remove(path); + { + h5::async::fd_t fd = h5::async::create(path, H5F_ACC_TRUNC); + REQUIRE(H5Iis_valid(fd.handle)); + REQUIRE(fd.exec); + CHECK(fd.exec->pool()); + } + // fd dtor closed the file — verify by re-opening with the classic API. + { + h5::fd_t fd = h5::open(path, H5F_ACC_RDONLY); + CHECK(H5Iis_valid(static_cast<::hid_t>(fd))); + } + std::remove(path); +} + +TEST_CASE("[#252 2.5] h5::async::open round-trip on existing file") { + const char* path = "test-252-async-open.h5"; + std::remove(path); + { + h5::fd_t fd = h5::create(path, H5F_ACC_TRUNC); // classic create + } + { + h5::async::fd_t fd = h5::async::open(path, H5F_ACC_RDWR); + REQUIRE(H5Iis_valid(fd.handle)); + REQUIRE(fd.exec); + } + std::remove(path); +} + +TEST_CASE("[#252 2.5] async file with h5::threads{N} keeps explicit pool size") { + const char* path = "test-252-async-threads.h5"; + std::remove(path); + { + h5::async::fd_t fd = h5::async::create(path, H5F_ACC_TRUNC, + h5::default_fcpl, + h5::fapl_t{h5::threads{4}}); + REQUIRE(fd.exec); + REQUIRE(fd.exec->pool()); + CHECK(fd.exec->pool()->worker_count() == 4); + } + std::remove(path); +} + +TEST_CASE("[#252 2.5] copy of h5::async::fd_t shares the executor") { + const char* path = "test-252-async-share.h5"; + std::remove(path); + { + h5::async::fd_t fd_a = h5::async::create(path, H5F_ACC_TRUNC); + h5::async::fd_t fd_b = fd_a; // shared_ptr aliases + REQUIRE(fd_a.exec); + REQUIRE(fd_b.exec); + CHECK(fd_a.exec.get() == fd_b.exec.get()); + } + std::remove(path); +} + +// 
=========================================================================== +// [#252 2.5] HDF5 1.10.9 regression doc — user FAPL properties don't survive +// H5Fget_access_plist. This documents *why* fd.exec is stored on the +// wrapper rather than retrieved from the file's FAPL. +// =========================================================================== + +TEST_CASE("[#252] HDF5 strips user-inserted FAPL properties on H5Fget_access_plist") { + // Build a FAPL with a user-inserted property (Phase I's worker pool + // property uses the same mechanism via H5Pinsert2). + h5::fapl_t fapl_in = h5::threads{4}; + REQUIRE(H5Pexist(static_cast<::hid_t>(fapl_in), "h5cpp_fapl_worker_pool") > 0); + + const char* path = "test-252-fapl-strip.h5"; + std::remove(path); + { + h5::fd_t fd = h5::create(path, H5F_ACC_TRUNC, h5::default_fcpl, fapl_in); + ::hid_t fapl_out = H5Fget_access_plist(static_cast<::hid_t>(fd)); + REQUIRE(fapl_out >= 0); + + // Documents the HDF5 behavior we work around in h5::async by + // storing the executor on the wrapper directly: + CHECK(H5Pexist(fapl_out, "h5cpp_fapl_worker_pool") == 0); + H5Pclose(fapl_out); + } + std::remove(path); +} From c84ead6feb49e8033ebc6481f787dd8aa8ab3e0b Mon Sep 17 00:00:00 2001 From: steven varga Date: Mon, 18 May 2026 14:36:18 +0000 Subject: [PATCH 3/4] =?UTF-8?q?[#252]:svarga:docs,=20Phase=20II=202.5=20?= =?UTF-8?q?=E2=80=94=20README=20+=20pipeline=20rework=20report=20for=20asy?= =?UTF-8?q?nc=20scaffold?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit README v1.12 highlights gains a one-line bullet for the async-mode scaffold: `h5::async::fd_t fd = h5::async::create(...)` with the deleted ::hid_t conversion as the safety mechanism. 
docs/filtering-pipeline-rework-report.md gains a "Status — Phase II PR-A (#252, async descriptors + executor scaffold)" section covering the namespace shape, the mechanism (executor on wrapper, not on FAPL slot — with the HDF5 1.10.9 H5Fget_access_plist limitation called out), and what's deferred to PR-B. --- README.md | 1 + docs/filtering-pipeline-rework-report.md | 45 +++++++++++++++++++++++- 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 8dfd10bd6c..357477708c 100644 --- a/README.md +++ b/README.md @@ -92,6 +92,7 @@ cmake --install build - **Rank-7** array support - **Expanded attribute** type coverage - **FAPL-scoped worker pool** — `h5::create(..., h5::threads{N} | h5::backpressure{M})` opts the file into parallel filter compression; all chunked datasets opened on that file (and pt_t built from them) inherit the pool with async-pipelined dispatch +- **Async-mode scaffold** — `h5::async::fd_t fd = h5::async::create(...)` returns a descriptor whose `operator ::hid_t()` is `= delete`'d so accidental raw-C-API calls fail at compile time; per-fd executor thread serializes HDF5 calls. Operation overloads land in the next PR. - **HDF5 1.12.2 ceiling** — tested and verified; `H5Dvlen_reclaim` / reference API compatibility - **Windows MSVC** in the CI matrix - **ASan + UBSan + TSan** clean on Clang 20 diff --git a/docs/filtering-pipeline-rework-report.md b/docs/filtering-pipeline-rework-report.md index ab8b3fae68..26ad22d926 100644 --- a/docs/filtering-pipeline-rework-report.md +++ b/docs/filtering-pipeline-rework-report.md @@ -193,4 +193,47 @@ Back-pressure is bounded by `h5::backpressure{M}`: the producer blocks on the fr The legacy per-pt_t `h5::filter::threads{N}` constructor from #241 is removed in this cycle (see [Phase 1.4 commit message]). Two parallel threading paths in the pipeline invite contention bugs and confuse the surface; the FAPL pool fully subsumes it. 
-Phase II (compile-time C-API blocking on `async_fd_t`, full async mode) is tracked separately. +Phase II (compile-time C-API blocking on `h5::async::fd_t`, full async mode) is tracked separately. + +## Status — Phase II PR-A (#252, async descriptors + executor scaffold) + +Phase II is delivered in two PRs. PR-A (this work) lands the descriptor types, FAPL executor property, and executor thread; PR-B will wire the concept-constrained `h5::write` / `h5::read` / `h5::create(fd, "ds", …)` overloads that branch on `is_async_v`. + +### User-visible surface (PR-A) + +```cpp +// Namespace shape: nested h5::async::*, parallel to the classic h5::*. +namespace h5::async { + using fd_t = impl::async_hid_t; + using ds_t = impl::async_did_t; + using gr_t = impl::async_aid_t; + using at_t = impl::async_aid_t; + + fd_t create(const std::string& path, unsigned flags, + const h5::fcpl_t& fcpl = h5::default_fcpl, + const h5::fapl_t& fapl = h5::default_fapl); + fd_t open (const std::string& path, unsigned flags, + const h5::fapl_t& fapl = h5::default_fapl); +} + +// Mode is declared exactly once — at h5::async::create / open. +// Every downstream operation deduces async-ness from the FD type via +// TAD (Phase II PR-B). +``` + +`h5::async::fd_t` has `operator ::hid_t() = delete`, so a stray `H5Gcreate2(async_fd, …)` is a clean compile error ("use of deleted function") rather than a silent thread-safety hazard. + +### Mechanism (PR-A) + +- `h5cpp/H5executor.hpp` — single worker thread per async fd, `std::packaged_task` in a `shared_ptr` wrapped in `std::function`, `submit_and_wait(Fn&&)` blocking the caller via the future. Exceptions propagate back through `future::get`; same-thread re-entry runs the callable inline. +- `h5cpp/H5Pfapl_async.hpp` — FAPL executor slot using the same `H5Pinsert2` + shared-ptr pattern as Phase I. Kept as a defensive utility; the primary code path does not depend on it. +- `h5cpp/H5async.hpp` — the two factories. 
Each constructs an `executor_t`, then calls `H5Fcreate` / `H5Fopen`, then returns `h5::async::fd_t{raw_hid, exec}`. + +The executor lives **directly on the wrapper** (a `std::shared_ptr<executor_t> exec` field on the `false,false` `hid_t` specialization), not retrieved from the file's FAPL via `H5Fget_access_plist`. HDF5 1.10.9 reconstructs the retrieved FAPL from standard properties only — `H5Pinsert2`-installed properties do not survive the round-trip. (This also affects Phase I's pool resolution path; tracked as a follow-up.) + +### Out of scope for PR-A (lands in PR-B) + +- Concept-constrained overloads of `h5::write`, `h5::read`, `h5::append`, `h5::flush`, `h5::create(fd, "ds", …)`. +- Mode-transitive factory pattern (async fd → async ds → async at). +- `h5::pt_t` as a class template `template ` deduced via CTAD. +- Performance benchmarks vs. HDF5 `--enable-threadsafe`. From e6b8eafd6a91b7d60f3ea0f8b8bd4b0970e00a4a Mon Sep 17 00:00:00 2001 From: steven varga Date: Mon, 18 May 2026 16:41:15 +0000 Subject: [PATCH 4/4] [#252]:svarga:fix, executor in_flight counter belongs to submit_and_wait, not worker_loop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CI coverage job (gcov-instrumented, slower than release builds) hit a benign race in test/H5async.cpp:130 — CHECK( exec.in_flight() == 0 ) // after exception case Race timeline: - submit_and_wait() did in_flight_++ at submission - worker_loop pulled task, ran it (or caught exception), then did in_flight_-- AFTER setting the future's value - submitter's fut.get() unblocked the moment the future was set, BEFORE the worker reached its fetch_sub - the in_flight() check on the submitter's thread could see 1 instead of 0 if scheduled between fut.get() returning and the worker's fetch_sub Local release builds (and previous CI on TSAN / ASAN / UBSAN) were fast enough that the worker's fetch_sub almost always won the race; the coverage job's instrumentation widened the window enough to expose
it deterministically. Fix: move the decrement from worker_loop to submit_and_wait's return path via an RAII guard. After the change, in_flight_ is causally tied to the lifetime of `submit_and_wait` on the submitter's thread, NOT to "task has been dequeued and run by the worker". Reads of in_flight() immediately after the call now see 0 deterministically — no spin, no wait_idle, no race. Mechanism: template auto submit_and_wait(Fn&& fn) { ... in_flight_.fetch_add(1, ...); { lock_guard ... ; tasks_.emplace([task] { (*task)(); }); } bell_.ring(); struct decrement_on_exit { std::atomic& counter; ~decrement_on_exit() { counter.fetch_sub(1, ...); } } guard{in_flight_}; return fut.get(); // blocks; rethrows on exception } The RAII guard handles both normal return and exception paths identically — the future's value (or exception) propagates, then ~guard runs, then the caller observes both the return / throw AND the post-decrement state of in_flight_. worker_loop's fetch_sub is removed; the worker now just runs the task and continues. wait_idle() semantics are unchanged because submit_and_wait blocks until fut.get() returns — in_flight_ stays at the same monotone count it would have had with the old design. Verified: - Local release: 18/18 h5async ctest green - Local TSAN: 18/18 h5async ctest green, no new races --- h5cpp/H5executor.hpp | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/h5cpp/H5executor.hpp b/h5cpp/H5executor.hpp index 2fc8923823..4bfbe658ef 100644 --- a/h5cpp/H5executor.hpp +++ b/h5cpp/H5executor.hpp @@ -81,6 +81,17 @@ struct executor_t { tasks_.emplace([task] { (*task)(); }); } bell_.ring(); + // Decrement counter on the submitter's path, AFTER fut.get() + // unblocks (causal: the worker has completed task() because the + // future is ready). RAII guard ensures the decrement also fires + // when fut.get() rethrows. 
This ties in_flight_ to "submit_and_wait + // is in flight on this thread" rather than "task is still queued" + // — tests that check in_flight() right after the call now see 0 + // deterministically, no race with the worker thread. + struct decrement_on_exit { + std::atomic& counter; + ~decrement_on_exit() { counter.fetch_sub(1, std::memory_order_release); } + } guard{in_flight_}; return fut.get(); // blocks on executor; rethrows exception } @@ -117,7 +128,10 @@ struct executor_t { if (got_task) { try { task(); } catch (...) { /* packaged_task captures it */ } - in_flight_.fetch_sub(1, std::memory_order_release); + // in_flight_ is decremented by submit_and_wait on the + // submitter's path (see RAII guard there), not here. + // Worker just executes the task; the counter belongs to + // the submission's lifetime. continue; }