diff --git a/README.md b/README.md index 8dfd10bd6c..357477708c 100644 --- a/README.md +++ b/README.md @@ -92,6 +92,7 @@ cmake --install build - **Rank-7** array support - **Expanded attribute** type coverage - **FAPL-scoped worker pool** — `h5::create(..., h5::threads{N} | h5::backpressure{M})` opts the file into parallel filter compression; all chunked datasets opened on that file (and pt_t built from them) inherit the pool with async-pipelined dispatch +- **Async-mode scaffold** — `h5::async::fd_t fd = h5::async::create(...)` returns a descriptor whose `operator ::hid_t()` is `= delete`'d so accidental raw-C-API calls fail at compile time; per-fd executor thread serializes HDF5 calls. Operation overloads land in the next PR. - **HDF5 1.12.2 ceiling** — tested and verified; `H5Dvlen_reclaim` / reference API compatibility - **Windows MSVC** in the CI matrix - **ASan + UBSan + TSan** clean on Clang 20 diff --git a/docs/filtering-pipeline-rework-report.md b/docs/filtering-pipeline-rework-report.md index ab8b3fae68..26ad22d926 100644 --- a/docs/filtering-pipeline-rework-report.md +++ b/docs/filtering-pipeline-rework-report.md @@ -193,4 +193,47 @@ Back-pressure is bounded by `h5::backpressure{M}`: the producer blocks on the fr The legacy per-pt_t `h5::filter::threads{N}` constructor from #241 is removed in this cycle (see [Phase 1.4 commit message]). Two parallel threading paths in the pipeline invite contention bugs and confuse the surface; the FAPL pool fully subsumes it. -Phase II (compile-time C-API blocking on `async_fd_t`, full async mode) is tracked separately. +Phase II (compile-time C-API blocking on `h5::async::fd_t`, full async mode) is tracked separately. + +## Status — Phase II PR-A (#252, async descriptors + executor scaffold) + +Phase II is delivered in two PRs. PR-A (this work) lands the descriptor types, FAPL executor property, and executor thread; PR-B will wire the concept-constrained `h5::write` / `h5::read` / `h5::create(fd, "ds", …)` overloads that branch on `is_async_v`. + +### User-visible surface (PR-A) + +```cpp +// Namespace shape: nested h5::async::*, parallel to the classic h5::*. +namespace h5::async { + using fd_t = impl::async_hid_t; + using ds_t = impl::async_did_t; + using gr_t = impl::async_aid_t; + using at_t = impl::async_aid_t; + + fd_t create(const std::string& path, unsigned flags, + const h5::fcpl_t& fcpl = h5::default_fcpl, + const h5::fapl_t& fapl = h5::default_fapl); + fd_t open (const std::string& path, unsigned flags, + const h5::fapl_t& fapl = h5::default_fapl); +} + +// Mode is declared exactly once — at h5::async::create / open. +// Every downstream operation deduces async-ness from the FD type via +// TAD (Phase II PR-B). +``` + +`h5::async::fd_t` has `operator ::hid_t() = delete`, so a stray `H5Gcreate2(async_fd, …)` is a clean compile error ("use of deleted function") rather than a silent thread-safety hazard. + +### Mechanism (PR-A) + +- `h5cpp/H5executor.hpp` — single worker thread per async fd, `std::packaged_task` in a `shared_ptr` wrapped in `std::function`, `submit_and_wait(Fn&&)` blocking the caller via the future. Exceptions propagate back through `future::get`; same-thread re-entry runs the callable inline. +- `h5cpp/H5Pfapl_async.hpp` — FAPL executor slot using the same `H5Pinsert2` + shared-ptr pattern as Phase I. Kept as a defensive utility; the primary code path does not depend on it. +- `h5cpp/H5async.hpp` — the two factories. Each constructs an `executor_t`, then calls `H5Fcreate` / `H5Fopen`, then returns `h5::async::fd_t{raw_hid, exec}`. + +The executor lives **directly on the wrapper** (a `std::shared_ptr exec` field on the `false,false` `hid_t` specialization), not retrieved from the file's FAPL via `H5Fget_access_plist`. HDF5 1.10.9 reconstructs the retrieved FAPL from standard properties only — `H5Pinsert2`-installed properties do not survive the round-trip. (This also affects Phase I's pool resolution path; tracked as a follow-up.) + +### Out of scope for PR-A (lands in PR-B) + +- Concept-constrained overloads of `h5::write`, `h5::read`, `h5::append`, `h5::flush`, `h5::create(fd, "ds", …)`. +- Mode-transitive factory pattern (async fd → async ds → async at). +- `h5::pt_t` as a class template `template ` deduced via CTAD. +- Performance benchmarks vs. HDF5 `--enable-threadsafe`. diff --git a/h5cpp/H5Iall.hpp b/h5cpp/H5Iall.hpp index f7c484de6f..b218679835 100644 --- a/h5cpp/H5Iall.hpp +++ b/h5cpp/H5Iall.hpp @@ -10,6 +10,7 @@ #include #include #include +#include /* std::shared_ptr — async descriptor exec field */ #include #ifdef H5CPP_CONVERSION_IMPLICIT @@ -32,6 +33,18 @@ namespace h5::impl { }; //forward declarations struct at_t; + + // Phase II — async descriptors carry a shared_ptr field + // directly on the wrapper. Why direct storage and not the FAPL slot + // pattern from Phase I: HDF5 1.10.9's H5Fget_access_plist returns a + // synthetic FAPL reconstructed from standard properties only; user + // properties installed via H5Pinsert2 are dropped. Storing the + // executor inside the wrapper class lets operation overloads in + // Phase II PR-B reach it as `fd.exec` without round-tripping through + // HDF5's property machinery. std::shared_ptr's type-erased deleter + // makes the forward declaration sufficient — the complete type is + // only needed at h5::async::create / open (defined in H5async.hpp). + struct executor_t; } namespace h5::impl::detail { @@ -110,13 +123,133 @@ namespace h5::impl::detail { ::hid_t handle; }; - // disable from CAPI and TOCAPI conversions - //conversion ctor to packet table enabled, used for h5::impl::ds_t + // Phase II async-mode specialization — operator ::hid_t() is = delete'd so + // user code that accidentally hands an async descriptor to a raw HDF5 C + // API fails to compile with a clear "use of deleted function" diagnostic. + // h5cpp internal code reaches the raw handle via the public `handle` + // field (see workplan §4.4); user code routes through h5::write / h5::read + // / etc. which detect the type via is_async_v<> and dispatch through the + // FAPL-resolved executor. template - struct hid_t : private hid_t { - using parent = hid_t; + struct hid_t { + using hidtype = T; + + // from CAPI — mirrors the true,true ctor; explicit so an accidental + // implicit promotion from ::hid_t doesn't slip an async wrapper in. + H5CPP__EXPLICIT hid_t( ::hid_t handle_ ) : handle( handle_ ){ + if( H5Iis_valid( handle_ ) ) + H5Iinc_ref( handle_ ); + } + + // Factory ctor — h5::async::create / open construct the executor + // during file creation and inject it here so operation overloads + // (Phase II PR-B) can reach it via `fd.exec`. Used by mode- + // transitive factories too (ds_t inherits parent fd's executor). + hid_t( ::hid_t handle_, std::shared_ptr e ) noexcept + : handle( handle_ ), exec( std::move(e) ) {} + + // TO CAPI — DELETED. Async descriptors must not be implicitly + // converted back to ::hid_t; doing so would let user code call + // HDF5 directly and bypass the executor thread. Internal code + // reads the raw value from `handle` directly. + operator ::hid_t() const = delete; + + // direct-initialization ctor; matches the classic shape — does not + // increment the refcount (caller owns the handle). + hid_t( std::initializer_list<::hid_t> fd ) : handle( *fd.begin() ){} + + hid_t() : handle(H5I_UNINIT) {} + + hid_t( const hid_t& ref ){ + handle = ref.handle; + if( H5Iis_valid( handle ) ) + H5Iinc_ref( handle ); + exec = ref.exec; // shared_ptr copy bumps refcount + } + hid_t& operator=( const hid_t& ref ){ + if( this == &ref ) return *this; + if( H5Iis_valid( handle ) ) + capi_close( handle ); + handle = ref.handle; + if( H5Iis_valid( handle ) ) + H5Iinc_ref( handle ); + exec = ref.exec; + return *this; + } + hid_t( hid_t&& ref ) noexcept { + handle = ref.handle; + ref.handle = H5I_UNINIT; + exec = std::move(ref.exec); + } + hid_t& operator=( hid_t&& ref ) noexcept { + if( this == &ref ) return *this; + if( H5Iis_valid( handle ) ) + capi_close( handle ); + handle = ref.handle; + ref.handle = H5I_UNINIT; + exec = std::move(ref.exec); + return *this; + } + ~hid_t(){ + if( H5Iis_valid( handle ) ) + capi_close( handle ); + } + + // Public so internal h5cpp code (the executor, dispatch lambdas) + // can read the raw id without invoking the deleted conversion. + // User code is expected to use h5::write / h5::read / h5::async::* + // factories rather than touch this field directly. + ::hid_t handle; + + // Phase II — shared_ptr to the executor that owns this descriptor's + // HDF5 lifetime. Populated by h5::async::create / open at the + // file-level, then propagated to derived descriptors (async ds, + // async at, etc.) by mode-transitive factories. May be null on + // default-constructed async wrappers (un-initialized state). + std::shared_ptr exec; + }; + + // Phase II — async dataset id. Mirrors hdf5::dataset (line above) but + // with conversion to ::hid_t deleted. Adds the `dapl` field and the + // attribute subscript operator the classic ds_t exposes. + template + struct hid_t + : public hid_t { + using parent = hid_t; + using parent::parent; + using parent::handle; using hidtype = T; - hid_t( std::initializer_list<::hid_t> fd ) : parent( fd ){} + using at_t = hid_t; + + hid_t(){ + this->handle = H5I_UNINIT; + this->dapl = H5I_UNINIT; + } + at_t operator[]( const char arg[] ); + + ::hid_t dapl; + }; + + // Phase II — async attribute id. + template + struct hid_t + : public hid_t { + using parent = hid_t; + using parent::parent; + using parent::handle; + using hidtype = T; + using at_t = hid_t; + + hid_t(){ + this->handle = H5I_UNINIT; + this->ds = H5I_UNINIT; + } + + template at_t operator=( V arg ); + template at_t operator=( const std::initializer_list args ){ return at_t{H5I_UNINIT}; } + + ::hid_t ds; + std::string name; }; /*property id*/ template @@ -176,6 +309,14 @@ namespace h5::impl { template using hid_t = detail::hid_t; template using pid_t = detail::hid_t; template using did_t = detail::hid_t; + + // Phase II — async-mode variants. Same shape as the classic aliases + // above but with operator ::hid_t() = delete'd at the type level. + // Users opt in by calling h5::async::create / h5::async::open; everything + // downstream deduces these types through TAD. + template using async_aid_t = detail::hid_t; + template using async_hid_t = detail::hid_t; + template using async_did_t = detail::hid_t; } /*hide gory details, and stamp out descriptors */ @@ -203,4 +344,29 @@ namespace h5 { #undef H5CPP__defaid_t #undef H5CPP__defpid_t #undef H5CPP__defhid_t + + // Phase II — async-mode descriptor type aliases. Parallel to the + // classic h5::fd_t / h5::ds_t / h5::gr_t / h5::at_t above; the + // underlying class template is the false,false specialization of + // impl::hid_t so any attempt to pass one of these to a raw HDF5 + // C-API call fails with "use of deleted function". + namespace async { + using fd_t = impl::async_hid_t; + using ds_t = impl::async_did_t; + using at_t = impl::async_aid_t; + using gr_t = impl::async_aid_t; + using ob_t = impl::async_hid_t; + } + + // Phase II type-trait: is_async_v answers "is T one of the + // h5::async::* descriptors?". Used by concept-constrained operation + // overloads (Phase II PR-B) to pick the executor dispatch branch. + template + struct is_async : std::false_type {}; + + template + struct is_async< impl::detail::hid_t > : std::true_type {}; + + template + inline constexpr bool is_async_v = is_async>::value; } diff --git a/h5cpp/H5Pfapl_async.hpp b/h5cpp/H5Pfapl_async.hpp new file mode 100644 index 0000000000..45fe9a1d4b --- /dev/null +++ b/h5cpp/H5Pfapl_async.hpp @@ -0,0 +1,91 @@ +/* + * Copyright (c) 2018-2026 Steven Varga, Toronto,ON Canada + * Author: Varga, Steven + */ +#pragma once + +// Phase II FAPL executor property — same shared_ptr-in-slot pattern proven +// by Phase I's worker pool (H5Pthreads.hpp). An executor_t lives behind +// a shared_ptr stored in a heap-allocated holder whose address occupies +// the H5Pinsert2 value slot; copy_cb clones the slot but aliases the same +// executor (refcount +1), close_cb drops one slot (refcount -1). When +// the last fd dies, the executor destructor joins the worker thread. +// +// h5::async::create / h5::async::open install this property in addition +// to (and consuming) any h5::threads{N} pool the user chained in. If no +// h5::threads{N} is present, fapl_async_set auto-installs a default-sized +// pool so the executor always has a pool reference. + +#include "H5Pthreads.hpp" // worker_pool_t + fapl_threads_set + resolve_worker_pool +#include "H5executor.hpp" // executor_t + +#include + +#include + +namespace h5::impl { + +#define H5CPP_FAPL_EXECUTOR "h5cpp_fapl_executor" + +// Heap-allocated holder, parallel to worker_pool_slot_t in H5Pthreads.hpp. +struct executor_slot_t { + std::shared_ptr exec; +}; + +// Copy: HDF5 memcpy'd the slot pointer into the destination. Allocate a +// fresh holder whose shared_ptr aliases the same executor (++refcount). +inline herr_t fapl_exec_copy_cb(const char* /*name*/, size_t /*size*/, void* value) { + auto** slot_loc = static_cast(value); + *slot_loc = new executor_slot_t{(*slot_loc)->exec}; + return 0; +} + +// Close: delete one holder; shared_ptr drops one reference. The +// executor_t destructor (which joins the worker thread) runs when the +// last reference is released. +inline herr_t fapl_exec_close_cb(const char* /*name*/, size_t /*size*/, void* ptr) { + delete *static_cast(ptr); + return 0; +} + +// Setter invoked by h5::async::create / h5::async::open. Idempotent — +// if the FAPL already has the property installed, leaves it alone. +// Auto-installs a default-sized worker_pool_t (n=0) when no h5::threads{N} +// was chained into the FAPL, so the executor always has a pool to hand +// to compression callbacks (Phase II PR-B wires that connection). +inline herr_t fapl_async_set(::hid_t fapl) { + if (H5Pexist(fapl, H5CPP_FAPL_EXECUTOR)) return 0; + + auto pool = resolve_worker_pool(fapl); + if (!pool) { + // No h5::threads{N} in the chain — install a default-sized pool + // so the executor has a compression backend available. + fapl_threads_set(fapl, 0); // 0 → hardware_concurrency() + pool = resolve_worker_pool(fapl); + } + + auto* slot = new executor_slot_t{ + std::make_shared(std::move(pool)) + }; + return H5Pinsert2(fapl, H5CPP_FAPL_EXECUTOR, + sizeof(executor_slot_t*), &slot, + nullptr, // set + nullptr, // get + nullptr, // prp_del + fapl_exec_copy_cb, + nullptr, // compare + fapl_exec_close_cb); +} + +// Consumer-site: given a FAPL id, retrieve the executor shared_ptr if one +// is installed. Returns nullptr when the property is absent (classic-mode +// FAPL — caller should not have been routed here). +inline std::shared_ptr resolve_executor(::hid_t fapl_id) noexcept { + if (fapl_id < 0 || H5Iis_valid(fapl_id) <= 0) return nullptr; + if (!H5Pexist(fapl_id, H5CPP_FAPL_EXECUTOR)) return nullptr; + executor_slot_t* slot = nullptr; + H5Pget(fapl_id, H5CPP_FAPL_EXECUTOR, &slot); + return slot ? slot->exec : nullptr; +} + +} // namespace h5::impl diff --git a/h5cpp/H5async.hpp b/h5cpp/H5async.hpp new file mode 100644 index 0000000000..cebd51483c --- /dev/null +++ b/h5cpp/H5async.hpp @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2018-2026 Steven Varga, Toronto,ON Canada + * Author: Varga, Steven + */ +#pragma once + +// Phase II user-facing factories — h5::async::create / h5::async::open. +// These are the only call sites where the word "async" appears in user +// code; every downstream operation (h5::write, h5::read, etc.) deduces +// async-ness from the FD type via TAD and dispatches through the +// executor with `if constexpr (is_async_v)`. +// +// Mechanism: +// 1. Resolve / install a worker_pool_t on a local FAPL clone so the +// executor has a compression backend (Phase II PR-B will wire that +// connection through h5::write / h5::read). +// 2. Construct the executor with that pool. +// 3. Call H5Fcreate / H5Fopen on the user's FAPL (unchanged). +// 4. Wrap the resulting ::hid_t plus the executor shared_ptr in the +// async fd_t. The executor lives as long as any descriptor +// derived from this fd holds the shared_ptr. +// +// We deliberately don't rely on H5Pinsert2 + H5Fget_access_plist for +// the executor handoff — HDF5 1.10.9 strips user properties from the +// FAPL retrieved off a file id, which would make the executor +// unreachable on the read path. Storing it on the wrapper sidesteps +// that limitation. + +#include "H5Pall.hpp" +#include "H5Pthreads.hpp" // worker_pool_t + h5::threads + resolve_worker_pool +#include "H5executor.hpp" // executor_t — complete type required at make_shared + +#include +#include +#include + +namespace h5::async { + +namespace impl_detail { + +// Build the executor that backs an async fd. Reuses an h5::threads{N} +// pool from the user's FAPL chain when present, otherwise stands up a +// default-sized pool on a freshly-created FAPL (we don't try to +// H5Pcopy the user's FAPL because H5P_DEFAULT — the common case — is +// not a copyable property list). +inline std::shared_ptr +make_executor_for(const h5::fapl_t& fapl) { + auto pool = h5::impl::resolve_worker_pool(static_cast<::hid_t>(fapl)); + if (!pool) { + h5::fapl_t scratch{H5Pcreate(H5P_FILE_ACCESS)}; + h5::impl::fapl_threads_set(static_cast<::hid_t>(scratch), 0); + pool = h5::impl::resolve_worker_pool(static_cast<::hid_t>(scratch)); + } + return std::make_shared(std::move(pool)); +} + +} // namespace impl_detail + +inline h5::async::fd_t create(const std::string& path, unsigned flags, + const h5::fcpl_t& fcpl = h5::default_fcpl, + const h5::fapl_t& fapl = h5::default_fapl) { + H5CPP_CHECK_PROP(fcpl, h5::error::io::file::create, "invalid file control property list"); + H5CPP_CHECK_PROP(fapl, h5::error::io::file::create, "invalid file access property list"); + + auto exec = impl_detail::make_executor_for(fapl); + + hid_t fd; + H5CPP_CHECK_NZ( + (fd = H5Fcreate(path.data(), flags, + static_cast<::hid_t>(fcpl), + static_cast<::hid_t>(fapl))), + h5::error::io::file::create, h5::error::msg::create_file); + return h5::async::fd_t{fd, std::move(exec)}; +} + +inline h5::async::fd_t open(const std::string& path, unsigned flags, + const h5::fapl_t& fapl = h5::default_fapl) { + H5CPP_CHECK_PROP(fapl, h5::error::io::file::open, "invalid file access property list"); + + auto exec = impl_detail::make_executor_for(fapl); + + hid_t fd; + H5CPP_CHECK_NZ( + (fd = H5Fopen(path.data(), flags, static_cast<::hid_t>(fapl))), + h5::error::io::file::open, h5::error::msg::open_file); + return h5::async::fd_t{fd, std::move(exec)}; +} + +} // namespace h5::async diff --git a/h5cpp/H5executor.hpp b/h5cpp/H5executor.hpp new file mode 100644 index 0000000000..4bfbe658ef --- /dev/null +++ b/h5cpp/H5executor.hpp @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2018-2026 Steven Varga, Toronto,ON Canada + * Author: Varga, Steven + */ +#pragma once + +// Phase II executor — one thread per async fd. Every HDF5 C-API call on an +// h5::async::* descriptor is serialized through this thread via +// submit_and_wait, restoring thread-safety for HDF5 builds compiled without +// --enable-threadsafe. Compression continues to parallelize through the +// Phase I worker_pool_t held by reference. +// +// Construction model mirrors Phase I's worker_pool_t (H5Pthreads.hpp): single +// mutex-guarded queue + doorbell + stoppable_thread_t, std::packaged_task in +// shared_ptr wrapped in std::function for type erasure. The only +// substantive difference is the single worker thread and the sync wait +// facade. + +#include "H5Pthreads.hpp" // worker_pool_t + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "detail/doorbell.hpp" +#include "detail/stoppable_thread.hpp" + +namespace h5::impl { + +struct executor_t { + // pool is optional today; Phase II operation overloads (PR-B) will + // require it. Default-construct keeps the scaffold trivially usable + // in lifecycle tests that don't exercise compression. + explicit executor_t(std::shared_ptr pool = nullptr) + : pool_(std::move(pool)), + worker_([this](h5::detail::stop_token_t st) { worker_loop(st); }) + {} + + ~executor_t() { + wait_idle(); + stopping_.store(true, std::memory_order_release); + bell_.ring(); + // stoppable_thread_t dtor requests stop + joins + } + + executor_t(const executor_t&) = delete; + executor_t& operator=(const executor_t&) = delete; + executor_t(executor_t&&) = delete; + executor_t& operator=(executor_t&&) = delete; + + // Submit a callable and block until it completes on the executor thread. + // Exception in the callable propagates back to the submitting thread via + // future::get. + // + // Reentrant safety: if the caller is already running on the executor + // thread (e.g. one h5cpp operation calling another inside the executor), + // run the callable inline rather than queueing — queueing would + // deadlock the executor on itself. + template + auto submit_and_wait(Fn&& fn) -> std::invoke_result_t { + using R = std::invoke_result_t; + + if (std::this_thread::get_id() == + worker_thread_id_.load(std::memory_order_acquire)) { + // Already on the executor thread — run inline, no queue trip. + // Exception thrown in fn propagates back to caller naturally + // (no packaged_task wrap needed in this branch). + return std::forward(fn)(); + } + + auto task = std::make_shared>(std::forward(fn)); + auto fut = task->get_future(); + in_flight_.fetch_add(1, std::memory_order_release); + { + std::lock_guard lk(m_); + tasks_.emplace([task] { (*task)(); }); + } + bell_.ring(); + // Decrement counter on the submitter's path, AFTER fut.get() + // unblocks (causal: the worker has completed task() because the + // future is ready). RAII guard ensures the decrement also fires + // when fut.get() rethrows. This ties in_flight_ to "submit_and_wait + // is in flight on this thread" rather than "task is still queued" + // — tests that check in_flight() right after the call now see 0 + // deterministically, no race with the worker thread. + struct decrement_on_exit { + std::atomic& counter; + ~decrement_on_exit() { counter.fetch_sub(1, std::memory_order_release); } + } guard{in_flight_}; + return fut.get(); // blocks on executor; rethrows exception + } + + // Block until the queue drains. Called by ~executor_t before stop. + void wait_idle() { + while (in_flight_.load(std::memory_order_acquire) > 0) + std::this_thread::yield(); + } + + [[nodiscard]] std::shared_ptr pool() const { return pool_; } + [[nodiscard]] int in_flight() const noexcept { return in_flight_.load(std::memory_order_acquire); } + [[nodiscard]] std::thread::id worker_thread_id() const noexcept { + return worker_thread_id_.load(std::memory_order_acquire); + } + +private: + void worker_loop(h5::detail::stop_token_t st) { + worker_thread_id_.store(std::this_thread::get_id(), + std::memory_order_release); + while (!st.stop_requested()) { + std::function task; + std::uint32_t last_seq = 0; + bool got_task = false; + { + std::lock_guard lk(m_); + if (!tasks_.empty()) { + task = std::move(tasks_.front()); + tasks_.pop(); + got_task = true; + } else { + last_seq = bell_.load(); + } + } + + if (got_task) { + try { task(); } catch (...) { /* packaged_task captures it */ } + // in_flight_ is decremented by submit_and_wait on the + // submitter's path (see RAII guard there), not here. + // Worker just executes the task; the counter belongs to + // the submission's lifetime. + continue; + } + + if (st.stop_requested() || stopping_.load(std::memory_order_acquire)) return; + bell_.wait(last_seq); + } + } + + std::shared_ptr pool_; + std::mutex m_; + std::queue> tasks_; + h5::detail::doorbell_t bell_; + std::atomic in_flight_{0}; + std::atomic stopping_{false}; + std::atomic worker_thread_id_{}; + h5::detail::stoppable_thread_t worker_; // declared LAST so all + // members are live when + // worker_loop starts +}; + +} // namespace h5::impl diff --git a/h5cpp/core b/h5cpp/core index 40f7014eb8..a33725e87f 100644 --- a/h5cpp/core +++ b/h5cpp/core @@ -63,7 +63,9 @@ #include "H5Pdapl.hpp" #include "H5Pthreads.hpp" #include "H5Zpipeline_pool.hpp" - + #include "H5executor.hpp" + #include "H5Pfapl_async.hpp" + #include "H5Ialgorithm.hpp" #include "H5capi.hpp" #include "H5cout.hpp" diff --git a/h5cpp/io b/h5cpp/io index 359f22981f..19872bb342 100644 --- a/h5cpp/io +++ b/h5cpp/io @@ -8,7 +8,8 @@ #include "H5Fcreate.hpp" #include "H5Fopen.hpp" - + #include "H5async.hpp" /* h5::async::create / h5::async::open factories */ + #include "H5Dcreate.hpp" #include "H5Dopen.hpp" #include "H5Rall.hpp" diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 3ec990c14b..6279796f83 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -98,6 +98,7 @@ add_test_case(H5Zlz4) add_test_case(H5Zzstd) add_test_case(H5Zszip) # add_test_case(H5Iall) +add_test_case(H5async) # Phase II scaffold lifecycle + submit_and_wait tests # --------------------------------------------------------------------------- # C++20 streaming ranges view — issue [#181]. diff --git a/test/H5async.cpp b/test/H5async.cpp new file mode 100644 index 0000000000..ba7f93f3b0 --- /dev/null +++ b/test/H5async.cpp @@ -0,0 +1,277 @@ +// Phase II scaffold tests — h5::async::* types, executor lifecycle, +// submit_and_wait correctness, exception propagation. Operation +// overloads (h5::write / h5::read on async fds) are tested in PR-B. + +#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "support/fixture.hpp" + +// =========================================================================== +// [#252 2.1] async descriptor types — type traits and constructibility +// =========================================================================== + +TEST_CASE("[#252] h5::async types satisfy is_async_v") { + static_assert( h5::is_async_v, "async::fd_t"); + static_assert( h5::is_async_v, "async::ds_t"); + static_assert( h5::is_async_v, "async::at_t"); + static_assert( h5::is_async_v, "async::gr_t"); + static_assert( h5::is_async_v, "async::ob_t"); + static_assert(!h5::is_async_v, "fd_t classic"); + static_assert(!h5::is_async_v, "ds_t classic"); + static_assert(!h5::is_async_v, "at_t classic"); + CHECK(true); +} + +TEST_CASE("[#252] async descriptors compile-block raw ::hid_t conversion") { + // The `= delete` on operator ::hid_t() means is_constructible_v<::hid_t, T> + // is false for every async wrapper. Classic wrappers retain the explicit + // conversion so they remain constructible via static_cast. + static_assert(!std::is_constructible_v<::hid_t, h5::async::fd_t>, "fd_t"); + static_assert(!std::is_constructible_v<::hid_t, h5::async::ds_t>, "ds_t"); + static_assert(!std::is_constructible_v<::hid_t, h5::async::at_t>, "at_t"); + static_assert(!std::is_constructible_v<::hid_t, h5::async::gr_t>, "gr_t"); + static_assert(!std::is_constructible_v<::hid_t, h5::async::ob_t>, "ob_t"); + // Control assertions on the classic surface — these MUST stay constructible. + static_assert( std::is_constructible_v<::hid_t, h5::fd_t>, "classic fd_t"); + static_assert( std::is_constructible_v<::hid_t, h5::ds_t>, "classic ds_t"); + CHECK(true); +} + +TEST_CASE("[#252] h5::async::* default-construct to H5I_UNINIT") { + h5::async::fd_t fd; + h5::async::ds_t ds; + h5::async::gr_t gr; + h5::async::at_t at; + CHECK(fd.handle == H5I_UNINIT); + CHECK(ds.handle == H5I_UNINIT); + CHECK(gr.handle == H5I_UNINIT); + CHECK(at.handle == H5I_UNINIT); +} + +// =========================================================================== +// [#252 2.2] FAPL executor slot — install + resolve + H5Pcopy preserves slot +// =========================================================================== + +TEST_CASE("[#252 2.2] fapl_async_set installs an executor on the FAPL") { + h5::fapl_t fapl{H5Pcreate(H5P_FILE_ACCESS)}; + REQUIRE(h5::impl::fapl_async_set(static_cast<::hid_t>(fapl)) == 0); + auto exec = h5::impl::resolve_executor(static_cast<::hid_t>(fapl)); + REQUIRE(exec); + CHECK(exec->pool()); // default pool auto-installed + CHECK(exec->in_flight() == 0); +} + +TEST_CASE("[#252 2.2] H5Pcopy preserves shared executor ownership") { + h5::fapl_t src{H5Pcreate(H5P_FILE_ACCESS)}; + h5::impl::fapl_async_set(static_cast<::hid_t>(src)); + auto exec_src = h5::impl::resolve_executor(static_cast<::hid_t>(src)); + REQUIRE(exec_src); + + h5::fapl_t dst{H5Pcopy(static_cast<::hid_t>(src))}; + auto exec_dst = h5::impl::resolve_executor(static_cast<::hid_t>(dst)); + REQUIRE(exec_dst); + + // Same executor instance through both FAPLs — slot copy_cb aliases. + CHECK(exec_src.get() == exec_dst.get()); +} + +TEST_CASE("[#252 2.2] fapl_async_set is idempotent") { + h5::fapl_t fapl{H5Pcreate(H5P_FILE_ACCESS)}; + REQUIRE(h5::impl::fapl_async_set(static_cast<::hid_t>(fapl)) == 0); + auto exec1 = h5::impl::resolve_executor(static_cast<::hid_t>(fapl)); + REQUIRE(h5::impl::fapl_async_set(static_cast<::hid_t>(fapl)) == 0); + auto exec2 = h5::impl::resolve_executor(static_cast<::hid_t>(fapl)); + CHECK(exec1.get() == exec2.get()); +} + +TEST_CASE("[#252 2.2] async fapl chains h5::threads{N} when explicit") { + h5::fapl_t fapl = h5::threads{6}; + h5::impl::fapl_async_set(static_cast<::hid_t>(fapl)); + auto exec = h5::impl::resolve_executor(static_cast<::hid_t>(fapl)); + REQUIRE(exec); + REQUIRE(exec->pool()); + CHECK(exec->pool()->worker_count() == 6); +} + +// =========================================================================== +// [#252 2.3] executor_t — submit_and_wait + exception propagation + reentry +// =========================================================================== + +TEST_CASE("[#252 2.3] executor_t::submit_and_wait runs callable and returns result") { + h5::impl::executor_t exec; + int result = exec.submit_and_wait([]{ return 42; }); + CHECK(result == 42); + CHECK(exec.in_flight() == 0); +} + +TEST_CASE("[#252 2.3] executor_t::submit_and_wait runs on the executor thread") { + h5::impl::executor_t exec; + std::thread::id caller = std::this_thread::get_id(); + std::thread::id callee = exec.submit_and_wait([]{ return std::this_thread::get_id(); }); + CHECK(caller != callee); + CHECK(callee == exec.worker_thread_id()); +} + +TEST_CASE("[#252 2.3] exception inside submit_and_wait propagates to caller") { + h5::impl::executor_t exec; + CHECK_THROWS_AS( + exec.submit_and_wait([]{ throw std::runtime_error("boom"); }), + std::runtime_error); + CHECK(exec.in_flight() == 0); +} + +TEST_CASE("[#252 2.3] executor_t::submit_and_wait re-entry runs inline (no deadlock)") { + h5::impl::executor_t exec; + int result = exec.submit_and_wait([&]{ + // Nested call from inside the executor thread — must not enqueue + // (would deadlock) — runs inline by the same-thread check. + return exec.submit_and_wait([]{ return 7; }) * 6; + }); + CHECK(result == 42); +} + +TEST_CASE("[#252 2.3] executor_t void-returning callable") { + h5::impl::executor_t exec; + std::atomic counter{0}; + exec.submit_and_wait([&]{ counter.fetch_add(1); }); + CHECK(counter.load() == 1); +} + +// =========================================================================== +// [#252 2.4] Multi-thread submission stress — TSAN sees nothing +// =========================================================================== + +TEST_CASE("[#252 2.4] executor_t serializes 8 concurrent submitters correctly") { + h5::impl::executor_t exec; + constexpr int per_thread = 32; + constexpr int n_threads = 8; + std::atomic sum{0}; + + std::vector ths; + ths.reserve(n_threads); + for (int t = 0; t < n_threads; ++t) { + ths.emplace_back([&, t]{ + for (int i = 0; i < per_thread; ++i) { + int got = exec.submit_and_wait([&, t, i]{ + return t * 100 + i; + }); + sum.fetch_add(got, std::memory_order_relaxed); + } + }); + } + for (auto& th : ths) th.join(); + + int expected = 0; + for (int t = 0; t < n_threads; ++t) + for (int i = 0; i < per_thread; ++i) + expected += t * 100 + i; + CHECK(sum.load() == expected); + CHECK(exec.in_flight() == 0); +} + +// =========================================================================== +// [#252 2.5] h5::async::create / open — file round-trip lifecycle +// =========================================================================== +// +// The executor is reached via `fd.exec` directly (a shared_ptr field on the +// async wrapper) rather than via H5Fget_access_plist on the file id. +// Phase I's choice to use the FAPL slot pattern + H5Fget_access_plist runs +// into HDF5 1.10.9's behavior: H5Fget_access_plist returns a synthetic +// FAPL reconstructed from standard properties only, so user-inserted +// properties (H5Pinsert2) silently disappear. See the regression test +// below for the documented HDF5 behavior. + +TEST_CASE("[#252 2.5] h5::async::create + close round-trip") { + const char* path = "test-252-async-create.h5"; + std::remove(path); + { + h5::async::fd_t fd = h5::async::create(path, H5F_ACC_TRUNC); + REQUIRE(H5Iis_valid(fd.handle)); + REQUIRE(fd.exec); + CHECK(fd.exec->pool()); + } + // fd dtor closed the file — verify by re-opening with the classic API. + { + h5::fd_t fd = h5::open(path, H5F_ACC_RDONLY); + CHECK(H5Iis_valid(static_cast<::hid_t>(fd))); + } + std::remove(path); +} + +TEST_CASE("[#252 2.5] h5::async::open round-trip on existing file") { + const char* path = "test-252-async-open.h5"; + std::remove(path); + { + h5::fd_t fd = h5::create(path, H5F_ACC_TRUNC); // classic create + } + { + h5::async::fd_t fd = h5::async::open(path, H5F_ACC_RDWR); + REQUIRE(H5Iis_valid(fd.handle)); + REQUIRE(fd.exec); + } + std::remove(path); +} + +TEST_CASE("[#252 2.5] async file with h5::threads{N} keeps explicit pool size") { + const char* path = "test-252-async-threads.h5"; + std::remove(path); + { + h5::async::fd_t fd = h5::async::create(path, H5F_ACC_TRUNC, + h5::default_fcpl, + h5::fapl_t{h5::threads{4}}); + REQUIRE(fd.exec); + REQUIRE(fd.exec->pool()); + CHECK(fd.exec->pool()->worker_count() == 4); + } + std::remove(path); +} + +TEST_CASE("[#252 2.5] copy of h5::async::fd_t shares the executor") { + const char* path = "test-252-async-share.h5"; + std::remove(path); + { + h5::async::fd_t fd_a = h5::async::create(path, H5F_ACC_TRUNC); + h5::async::fd_t fd_b = fd_a; // shared_ptr aliases + REQUIRE(fd_a.exec); + REQUIRE(fd_b.exec); + CHECK(fd_a.exec.get() == fd_b.exec.get()); + } + std::remove(path); +} + +// =========================================================================== +// [#252 2.5] HDF5 1.10.9 regression doc — user FAPL properties don't survive +// H5Fget_access_plist. This documents *why* fd.exec is stored on the +// wrapper rather than retrieved from the file's FAPL. +// =========================================================================== + +TEST_CASE("[#252] HDF5 strips user-inserted FAPL properties on H5Fget_access_plist") { + // Build a FAPL with a user-inserted property (Phase I's worker pool + // property uses the same mechanism via H5Pinsert2). + h5::fapl_t fapl_in = h5::threads{4}; + REQUIRE(H5Pexist(static_cast<::hid_t>(fapl_in), "h5cpp_fapl_worker_pool") > 0); + + const char* path = "test-252-fapl-strip.h5"; + std::remove(path); + { + h5::fd_t fd = h5::create(path, H5F_ACC_TRUNC, h5::default_fcpl, fapl_in); + ::hid_t fapl_out = H5Fget_access_plist(static_cast<::hid_t>(fd)); + REQUIRE(fapl_out >= 0); + + // Documents the HDF5 behavior we work around in h5::async by + // storing the executor on the wrapper directly: + CHECK(H5Pexist(fapl_out, "h5cpp_fapl_worker_pool") == 0); + H5Pclose(fapl_out); + } + std::remove(path); +}