diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6c1660f610..2e5973da06 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -499,10 +499,86 @@ jobs: name: status-ubsan path: badge-status/status-ubsan.json + tsan: + name: tsan / ubuntu-24.04 / clang-20 + runs-on: ubuntu-24.04 + timeout-minutes: 45 + + concurrency: + group: tsan-${{ github.ref }} + cancel-in-progress: true + + steps: + - name: Checkout + uses: actions/checkout@v6 + with: + fetch-depth: 0 + + - name: Install dependencies + shell: bash + run: | + set -euxo pipefail + sudo apt-get update + sudo apt-get install -y cmake ninja-build wget gnupg lsb-release software-properties-common libhdf5-dev + wget https://apt.llvm.org/llvm.sh + chmod +x llvm.sh + sudo ./llvm.sh 20 all + echo "CC=$(command -v clang-20)" >> "$GITHUB_ENV" + echo "CXX=$(command -v clang++-20)" >> "$GITHUB_ENV" + + - name: Configure + shell: bash + run: | + set -euxo pipefail + cmake -S . -B build -G Ninja \ + -DCMAKE_C_COMPILER="$CC" \ + -DCMAKE_CXX_COMPILER="$CXX" \ + -DCMAKE_CXX_STANDARD=17 \ + -DCMAKE_BUILD_TYPE=Debug \ + -DCMAKE_C_FLAGS="-fsanitize=thread -fno-omit-frame-pointer -O1 -g" \ + -DCMAKE_CXX_FLAGS="-fsanitize=thread -fno-omit-frame-pointer -O1 -g" \ + -DCMAKE_EXE_LINKER_FLAGS="-fsanitize=thread" \ + -DH5CPP_BUILD_TESTS=ON + + - name: Build + shell: bash + run: cmake --build build --parallel + + - name: Test + shell: bash + env: + # halt_on_error=1: TSAN exits on the first data race so CI fails loud. + # second_deadlock_stack=1: print full stack of the lock that completed + # the cycle (default prints only one stack for the offending pair). + TSAN_OPTIONS: "halt_on_error=1:second_deadlock_stack=1" + run: ctest --test-dir build --output-on-failure + + - name: Record Badge Status + if: always() + shell: bash + run: | + set -euxo pipefail + mkdir -p badge-status + cat < badge-status/status-tsan.json + { + "os": "ubuntu-24.04", + "compiler": "clang-20", + "label": "TSan", + "status": "${{ job.status }}" + } + EOF + + - name: Upload Status Artifact + if: always() + uses: actions/upload-artifact@v7 + with: + name: status-tsan + path: badge-status/status-tsan.json + badge: name: Generate SVG Badges if: always() - needs: [build, asan, ubsan] + needs: [build, asan, ubsan, tsan] runs-on: ubuntu-24.04 steps: @@ -648,8 +724,8 @@ jobs: -DCMAKE_CXX_STANDARD=20 \ -DCMAKE_C_COMPILER=gcc-14 \ -DCMAKE_CXX_COMPILER=g++-14 \ - -DCMAKE_C_FLAGS="--coverage -O0 -g" \ - -DCMAKE_CXX_FLAGS="--coverage -O0 -g" \ + -DCMAKE_C_FLAGS="--coverage -fprofile-update=atomic -O0 -g" \ + -DCMAKE_CXX_FLAGS="--coverage -fprofile-update=atomic -O0 -g" \ -DH5CPP_BUILD_TESTS=ON \ -DH5CPP_BUILD_EXAMPLES=ON diff --git a/.github/workflows/package.yml b/.github/workflows/package.yml index 6643627c76..062174cf67 100644 --- a/.github/workflows/package.yml +++ b/.github/workflows/package.yml @@ -86,10 +86,12 @@ jobs: - name: Configure run: | + HDF5_PREFIX="$(brew --prefix hdf5)" cmake -B build \ -DCMAKE_BUILD_TYPE=Release \ -DCMAKE_INSTALL_PREFIX=/usr/local \ - -DHDF5_ROOT=$(brew --prefix hdf5) \ + -DHDF5_ROOT="$HDF5_PREFIX" \ + -DCMAKE_PREFIX_PATH="$HDF5_PREFIX" \ -DH5CPP_BUILD_EXAMPLES=OFF \ -DH5CPP_BUILD_TESTS=OFF \ -DH5CPP_BUILD_BENCH=OFF @@ -107,21 +109,100 @@ jobs: if-no-files-found: error # ── Windows (NSIS .exe) ─────────────────────────────────────────────────────── + # No Chocolatey 'hdf5' package exists; mirror the build-from-source approach + # from ci.yml. zlib is built fresh each run (~30s); HDF5 install prefix is + # cached between runs. package-windows: name: windows / x64 / NSIS runs-on: windows-latest + env: + HDF5_VERSION: 1.12.2 + HDF5_CACHE_VERSION: v3 steps: - uses: actions/checkout@v4 - - name: Install HDF5 - run: choco install hdf5 -y --no-progress + - name: Build zlib from source + shell: powershell + run: | + $ErrorActionPreference = "Stop" + $zlib_version = "1.3.1" + $zlib_archive = "$env:RUNNER_TEMP\zlib-$zlib_version.tar.gz" + $zlib_source = "$env:RUNNER_TEMP\zlib-$zlib_version" + $zlib_build = "$env:RUNNER_TEMP\zlib-build" + $zlib_prefix = "$env:RUNNER_TEMP\zlib-install" + + Invoke-WebRequest ` + -Uri "https://github.com/madler/zlib/archive/refs/tags/v$zlib_version.tar.gz" ` + -OutFile $zlib_archive + tar -xzf $zlib_archive -C $env:RUNNER_TEMP + + cmake -S $zlib_source -B $zlib_build ` + -G "Visual Studio 17 2022" -A x64 ` + -DCMAKE_INSTALL_PREFIX="$zlib_prefix" + cmake --build $zlib_build --parallel --config Release + cmake --install $zlib_build --config Release + + - name: Restore HDF5 cache + id: cache-hdf5 + uses: actions/cache/restore@v5 + with: + path: ${{ runner.temp }}\hdf5-${{ env.HDF5_VERSION }}-install + key: hdf5-windows-vs2022-${{ env.HDF5_VERSION }}-pkg-${{ env.HDF5_CACHE_VERSION }} + + - name: Build HDF5 from source + if: steps.cache-hdf5.outputs.cache-hit != 'true' + shell: powershell + run: | + $ErrorActionPreference = "Stop" + $hdf5_version = "${{ env.HDF5_VERSION }}" + $hdf5_archive = "$env:RUNNER_TEMP\hdf5-$hdf5_version.tar.gz" + $hdf5_source = "$env:RUNNER_TEMP\hdf5-$hdf5_version" + $hdf5_build = "$env:RUNNER_TEMP\hdf5-$hdf5_version-build" + $hdf5_prefix = "$env:RUNNER_TEMP\hdf5-$hdf5_version-install" + + Invoke-WebRequest ` + -Uri "https://support.hdfgroup.org/ftp/HDF5/releases/hdf5-1.12/hdf5-$hdf5_version/src/hdf5-$hdf5_version.tar.gz" ` + -OutFile $hdf5_archive + tar -xzf $hdf5_archive -C $env:RUNNER_TEMP + + cmake -S $hdf5_source -B $hdf5_build ` + -G "Visual Studio 17 2022" -A x64 ` + -DCMAKE_INSTALL_PREFIX="$hdf5_prefix" ` + -DHDF_CFG_NAME=Release ` + -DBUILD_SHARED_LIBS=ON ` + -DBUILD_TESTING=OFF ` + -DHDF5_BUILD_TOOLS=OFF ` + -DHDF5_BUILD_UTILS=OFF ` + -DHDF5_BUILD_EXAMPLES=OFF ` + -DHDF5_BUILD_CPP_LIB=OFF ` + -DHDF5_BUILD_HL_LIB=OFF ` + -DHDF5_BUILD_FORTRAN=OFF ` + -DHDF5_ENABLE_Z_LIB_SUPPORT=ON ` + -DHDF5_ENABLE_SZIP_SUPPORT=OFF ` + -DZLIB_USE_EXTERNAL=OFF ` + -DZLIB_INCLUDE_DIR="$env:RUNNER_TEMP/zlib-install/include" ` + -DZLIB_LIBRARY="$env:RUNNER_TEMP/zlib-install/lib/zlib.lib" + + cmake --build $hdf5_build --parallel --config Release + cmake --install $hdf5_build --config Release + + - name: Save HDF5 cache + if: steps.cache-hdf5.outputs.cache-hit != 'true' + uses: actions/cache/save@v5 + with: + path: ${{ runner.temp }}\hdf5-${{ env.HDF5_VERSION }}-install + key: ${{ steps.cache-hdf5.outputs.cache-primary-key }} - name: Configure shell: pwsh run: | + $hdf5_prefix = "$env:RUNNER_TEMP/hdf5-${{ env.HDF5_VERSION }}-install" + $zlib_prefix = "$env:RUNNER_TEMP/zlib-install" cmake -B build ` -DCMAKE_BUILD_TYPE=Release ` + -DHDF5_ROOT="$hdf5_prefix" ` + -DCMAKE_PREFIX_PATH="$hdf5_prefix;$zlib_prefix" ` -DH5CPP_BUILD_EXAMPLES=OFF ` -DH5CPP_BUILD_TESTS=OFF ` -DH5CPP_BUILD_BENCH=OFF ` diff --git a/CMakeLists.txt b/CMakeLists.txt index e4dc8f70be..f25c2ff48e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -12,7 +12,24 @@ if(APPLE) set(CMAKE_OSX_DEPLOYMENT_TARGET "14.4" CACHE STRING "Minimum macOS deployment version") endif() -project(libh5cpp-dev VERSION 1.10.4.6 LANGUAGES CXX C) +# Derive package version from the latest git tag before project(), so CPack +# embeds it as PROJECT_VERSION rather than a stale hardcoded number. Falls +# back to a placeholder when building from a tarball without git history. +find_package(Git QUIET) +set(H5CPP_PROJECT_VERSION "1.12.0") +if(GIT_FOUND AND EXISTS "${CMAKE_SOURCE_DIR}/.git") + execute_process( + COMMAND ${GIT_EXECUTABLE} describe --tags --abbrev=0 --match "v[0-9]*" + WORKING_DIRECTORY ${CMAKE_SOURCE_DIR} + OUTPUT_VARIABLE H5CPP_GIT_TAG + OUTPUT_STRIP_TRAILING_WHITESPACE + ERROR_QUIET) + if(H5CPP_GIT_TAG MATCHES "^v([0-9]+(\\.[0-9]+)*)$") + set(H5CPP_PROJECT_VERSION "${CMAKE_MATCH_1}") + endif() +endif() + +project(libh5cpp-dev VERSION ${H5CPP_PROJECT_VERSION} LANGUAGES CXX C) # ─── Standard Settings ──────────────────────────────────────────────────────────── set(CMAKE_CXX_STANDARD 20) @@ -90,9 +107,22 @@ find_package(Threads REQUIRED QUIET) # ─── HDF5 ───────────────────────────────────────────────────────────────────────── find_package(HDF5 REQUIRED COMPONENTS C) -if(HDF5_VERSION VERSION_LESS ${H5CPP_BASE_VERSION}) +# HDF5 floor is decoupled from h5cpp package version. Previously the check +# compared HDF5_VERSION against PROJECT_VERSION, which coupled package +# versioning to HDF5 minimums by coincidence. After #247 made +# PROJECT_VERSION track the git tag, the comparison stopped being meaningful. +# Pin the floor explicitly. +# +# 1.10.4 matches the prior implicit floor (the stale project(VERSION 1.10.4.6) +# line that #247 removed). The CI matrix runs Ubuntu 22.04 with system +# HDF5 1.10.7 (floor coverage restored by #235); raising the floor above +# 1.10.4 would break that matrix entry. When dropping 1.10.x coverage in +# a future cohort, bump this constant deliberately and remove the 22.04 +# matrix entry in the same commit. +set(H5CPP_HDF5_FLOOR "1.10.4") +if(HDF5_VERSION VERSION_LESS ${H5CPP_HDF5_FLOOR}) message(FATAL_ERROR - "-- !!! H5CPP examples require HDF5 v${H5CPP_BASE_VERSION} or greater !!!" + "-- !!! H5CPP requires HDF5 v${H5CPP_HDF5_FLOOR} or greater (found ${HDF5_VERSION}) !!!" ) else() message(STATUS @@ -382,6 +412,14 @@ set(CPACK_NSIS_ENABLE_UNINSTALL_BEFORE_INSTALL ON) # productbuild (macOS .pkg) set(CPACK_PRODUCTBUILD_IDENTIFIER "org.h5cpp.h5cpp") +# Override the default per-generator filename so that amd64 / arm64 / x86_64 / +# aarch64 / darwin-arm64 / windows-amd64 produce distinct names when collected +# into a single release upload directory. Without this, multi-arch CI matrix +# builds emit identical filenames and overwrite each other on the Release page. +set(CPACK_PACKAGE_FILE_NAME + "${CPACK_PACKAGE_NAME}-v${PROJECT_VERSION}-${CMAKE_SYSTEM_NAME}-${CMAKE_SYSTEM_PROCESSOR}") +string(TOLOWER "${CPACK_PACKAGE_FILE_NAME}" CPACK_PACKAGE_FILE_NAME) + include(CPack) # ─── Developer convenience targets ─────────────────────────────────────────── diff --git a/README.md b/README.md index 10089cf7a1..8dfd10bd6c 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,6 @@ [![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.20123216.svg)](https://doi.org/10.5281/zenodo.20123216) [![GitHub release](https://img.shields.io/github/v/release/vargalabs/h5cpp.svg)](https://github.com/vargalabs/h5cpp/releases) [![Documentation](https://img.shields.io/badge/docs-stable-blue)](https://vargalabs.github.io/h5cpp) -[![Downloads](https://img.shields.io/github/downloads/vargalabs/h5cpp/total)](https://github.com/vargalabs/h5cpp/releases) # H5CPP — High-Performance [HDF5][hdf5] for Modern C++ @@ -92,10 +91,10 @@ cmake --install build - **`std::float16_t`** (C++23 IEEE 754 half-precision) - **Rank-7** array support - **Expanded attribute** type coverage -- **Threaded I/O pipeline** for filter chains +- **FAPL-scoped worker pool** — `h5::create(..., h5::threads{N} | h5::backpressure{M})` opts the file into parallel filter compression; all chunked datasets opened on that file (and pt_t built from them) inherit the pool with async-pipelined dispatch - **HDF5 1.12.2 ceiling** — tested and verified; `H5Dvlen_reclaim` / reference API compatibility - **Windows MSVC** in the CI matrix -- **ASan + UBSan** clean on Clang 20 +- **ASan + UBSan + TSan** clean on Clang 20 ## Documentation diff --git a/docs/filtering-pipeline-rework-report.md b/docs/filtering-pipeline-rework-report.md index 75d4f8d080..ab8b3fae68 100644 --- a/docs/filtering-pipeline-rework-report.md +++ b/docs/filtering-pipeline-rework-report.md @@ -30,7 +30,7 @@ The current implementation is an experimental skeleton rather than a production | Multi-filter read | Throws for more than one filter | Reverse-order decode through the complete filter plan | | Buffer sizing | Uses chunk-sized scratch buffers | Encoded buffers must allow compression expansion | | Filter mask | Partial handling | Preserve HDF5 chunk filter-mask semantics | -| Threading | `threaded_pipeline_t` is a placeholder | Worker-local state and bounded chunk scheduling | +| Threading | `threaded_pipeline_t` is a placeholder | Worker-local state and bounded chunk scheduling (delivered in #250 as FAPL-scoped `pool_pipeline_t`) | | Portability | Linux path is the only recently verified path | Linux, macOS, and Windows allocation/build behavior | Focused baseline probes confirmed two important failures: @@ -175,3 +175,22 @@ Threading should initially use C++17 standard library primitives. Avoid platform Start with correctness, not SIMD. The highest-value first milestone is a serial `filter_plan` that can round-trip standard HDF5 filters and reject unsupported filters explicitly. Once that foundation is correct, SIMD and multithreading become execution-policy improvements rather than a risky rewrite. The strategic direction is to make H5CPP's filtering chain a modern CPU execution engine while preserving HDF5-compatible metadata and file interoperability. + +## Status — Phase I (#250, FAPL worker pool) + +Phase I of the threading workplan is delivered on PR #251. The design and trade-offs are summarised in `tasks/h5cpp-fapl-multithreading-workplan.md`; the user-visible surface is one line in the file's FAPL: + +```cpp +h5::fd_t fd = h5::create( + "data.h5", H5F_ACC_TRUNC, + h5::default_fcpl, + h5::threads{N} | h5::backpressure{M}); // M default = 8 × N +``` + +When `h5::threads{N}` is installed, the FAPL allocates a `worker_pool_t` and parks a `shared_ptr<>` to it inside an `H5Pinsert2` slot. Every dataset created/opened on that file inherits the pool via `H5Fget_access_plist`. When a dataset's DAPL has `h5::high_throughput`, `h5::write` and `h5::read` construct a local `pool_pipeline_t` that submits per-chunk compression closures to the pool and drains in submission order; `H5Dwrite_chunk` still runs on the calling thread. `pt_t` resolves the same pool in `init()` and uses `pool_pipeline_t` as a variant alternative. + +Back-pressure is bounded by `h5::backpressure{M}`: the producer blocks on the front future once the in-flight deque hits `M`. Default is 8 × worker count. + +The legacy per-pt_t `h5::filter::threads{N}` constructor from #241 is removed in this cycle (see [Phase 1.4 commit message]). Two parallel threading paths in the pipeline invite contention bugs and confuse the surface; the FAPL pool fully subsumes it. + +Phase II (compile-time C-API blocking on `async_fd_t`, full async mode) is tracked separately. diff --git a/h5cpp/H5Dappend.hpp b/h5cpp/H5Dappend.hpp index 82e78fdb47..a1e4b7fb08 100644 --- a/h5cpp/H5Dappend.hpp +++ b/h5cpp/H5Dappend.hpp @@ -7,10 +7,13 @@ #include "H5capi.hpp" #include "H5Tmeta.hpp" #include "H5cout.hpp" -#include "H5Zpipeline_threaded.hpp" #include #include #include +#include +#include +#include +#include #include #include #include @@ -22,17 +25,22 @@ namespace h5 { std::ostream& operator<<(std::ostream& os, const h5::pt_t& pt); namespace h5::impl { - // pt_t::pipeline selects between the synchronous basic pipeline (default, - // bytewise-identical to pre-241 behavior) and the parallel threaded pipeline. - // Both alternatives are indirect-owned through unique_ptr so that the - // variant remains move-assignable regardless of the underlying pipeline's - // move semantics (threaded_pipeline_t deletes moves because it owns - // std::jthread workers and atomics; basic_pipeline_t inherits a manually- - // written move-assign from pipeline_t that suppresses the - // implicit move-ctor needed by variant assignment). + // pt_t::pipeline selects between two pipeline implementations: + // + // basic_pipeline_t — synchronous filter chain on the calling + // thread; default when the file's FAPL has + // no h5::threads{N} pool installed. + // pool_pipeline_t — FAPL-scoped shared worker pool, async- + // pipelined dispatch with back-pressure. + // Selected when init() resolves a pool + // from the file's FAPL. + // + // Both are indirect-owned through unique_ptr so the variant + // remains move-assignable regardless of the underlying pipeline's + // move semantics. using pt_pipeline_t = std::variant< std::unique_ptr, - std::unique_ptr + std::unique_ptr >; } @@ -40,10 +48,9 @@ namespace h5::impl { namespace h5 { struct pt_t { pt_t(); - pt_t( const h5::ds_t& handle ); // conversion ctor — synchronous pipeline - pt_t( const h5::ds_t& handle, h5::filter::threads workers ); // threaded pipeline - // deep copy with own cache memory — always uses the synchronous pipeline, - // since the threaded pipeline owns workers that cannot be duplicated. + pt_t( const h5::ds_t& handle ); // FAPL-aware ctor: pool when h5::threads{N} is set, basic otherwise + // deep copy with own cache memory — re-runs init(), so the copy + // resolves its own pipeline from the dataset's file FAPL. pt_t( const h5::pt_t& pt ) : h5::pt_t(pt.ds) { }; ~pt_t(); @@ -115,6 +122,11 @@ namespace h5 { chunk_dims[H5CPP_MAX_RANK], count[H5CPP_MAX_RANK]; size_t block_size,element_size,N,n,rank; void *ptr, *fill_value; + + // Phase 1.3.3 — chunk dispatch is uniform across all variant + // alternatives via visit_pipeline + write_chunk. pool_pipeline_t + // holds the pool reference, in-flight deque, and back-pressure + // logic internally; pt_t no longer needs per-instance pool fields. }; } @@ -128,7 +140,9 @@ inline h5::pt_t::pt_t() : count[i] = 1, offset[i] = 0; } -// conversion ctor — synchronous pipeline (default) +// FAPL-aware conversion ctor — init() resolves pool from the dataset's +// FAPL and swaps the variant to pool_pipeline_t when h5::threads{N} is +// installed. Otherwise the default basic_pipeline_t stays active. inline h5::pt_t::pt_t( const h5::ds_t& handle ) : pt_t() { /*default ctor has an invalid state -- skip initialization */ @@ -136,16 +150,6 @@ h5::pt_t::pt_t( const h5::ds_t& handle ) : pt_t() { init(handle); } -// conversion ctor — threaded pipeline with N compression workers -inline -h5::pt_t::pt_t( const h5::ds_t& handle, h5::filter::threads workers ) : pt_t() { - if( !is_valid(handle) ) return; - auto threaded = std::make_unique(); - threaded->set_worker_count(workers.n); - pipeline.emplace>(std::move(threaded)); - init(handle); -} - inline h5::pt_t::~pt_t(){ /*default ctor has an invalid state -- skip flushing cache */ @@ -170,6 +174,21 @@ void h5::pt_t::init( const h5::ds_t& handle ){ H5Pset_chunk_cache(dapl, 0, 0, H5D_CHUNK_CACHE_W0_DEFAULT); ds = h5::ds_t{H5Dopen2(fid, dname.data(), dapl)}; H5Pclose(dapl); + + // Phase 1.3.3 — resolve the file's FAPL pool while we still hold + // a live fid. When the FAPL has h5::threads{N} installed, swap + // the variant from basic_pipeline_t (default) to pool_pipeline_t + // constructed with the pool + back-pressure cap. When no pool + // is present, the default basic_pipeline_t stays — synchronous + // behavior, identical to pre-Phase-I. + hid_t fapl = H5Fget_access_plist(fid); + if (auto pool = impl::resolve_worker_pool(fapl)) { + const unsigned cap = impl::resolve_backpressure(fapl, pool->worker_count()); + pipeline.emplace>( + std::make_unique(std::move(pool), cap)); + } + H5Pclose(fapl); + H5Fclose(fid); dt = h5::dt_t{H5Dget_type(static_cast(ds))}; h5::sp_t file_space = h5::get_space( handle ); @@ -194,6 +213,7 @@ void h5::pt_t::init( const h5::ds_t& handle ){ throw h5::error::io::packet_table::misc( H5CPP_ERROR_MSG("CTOR: unable to create handle from dataset...")); } } + template inline std::enable_if_t< h5::meta::is_scalar::value, void> h5::pt_t::append( const T* ptr ) try { //PTR: write directly chunk size from provided buffer/ptr @@ -308,28 +328,37 @@ void> h5::pt_t::append( const T& ref ) try { inline void h5::pt_t::flush(){ - if( n == 0 ) return; - *offset = *current_dims; - *current_dims += *chunk_dims; - h5::set_extent(ds, current_dims); - - if( H5Tis_variable_str(this->dt)) { - hsize_t block = 1, count = n; - h5::sp_t mem_space{H5Screate_simple(static_cast(rank), &count, nullptr )}; - h5::sp_t file_space{H5Dget_space( static_cast<::hid_t>(ds) )}; - h5::select_all( mem_space ); - H5Sselect_hyperslab( static_cast(file_space), H5S_SELECT_SET, offset, nullptr, &block, &count); - - H5Dwrite( static_cast( ds ), - dt, mem_space, file_space, static_cast(dxpl), ptr); - } else { - // the remainder of last chunk must be set to fill_value; arbitrary type size supported - for(hsize_t i=0; i<(N-n); i++) - for(size_t j=0; j < element_size; j++) - static_cast( ptr )[(n + i) * element_size + j] = static_cast( fill_value )[ j ]; - visit_pipeline([&](auto& p){ p.write_chunk(offset, block_size, ptr); }); + if( n != 0 ) { + *offset = *current_dims; + *current_dims += *chunk_dims; + h5::set_extent(ds, current_dims); + + if( H5Tis_variable_str(this->dt)) { + hsize_t block = 1, count = n; + h5::sp_t mem_space{H5Screate_simple(static_cast(rank), &count, nullptr )}; + h5::sp_t file_space{H5Dget_space( static_cast<::hid_t>(ds) )}; + h5::select_all( mem_space ); + H5Sselect_hyperslab( static_cast(file_space), H5S_SELECT_SET, offset, nullptr, &block, &count); + + H5Dwrite( static_cast( ds ), + dt, mem_space, file_space, static_cast(dxpl), ptr); + } else { + // the remainder of last chunk must be set to fill_value; arbitrary type size supported + for(hsize_t i=0; i<(N-n); i++) + for(size_t j=0; j < element_size; j++) + static_cast( ptr )[(n + i) * element_size + j] = static_cast( fill_value )[ j ]; + visit_pipeline([&](auto& p){ p.write_chunk(offset, block_size, ptr); }); + } + n = 0; } - n = 0; + // Pool path: drain in-flight chunks so flush() honors the "data + // on disk after this returns" contract. basic_pipeline_t writes + // inline; the visit is a no-op for that alternative. + std::visit([](auto& p) { + using T = std::decay_t; + if constexpr (std::is_same_v) + p->drain(); + }, pipeline); } inline void h5::pt_t::reset() { diff --git a/h5cpp/H5Dread.hpp b/h5cpp/H5Dread.hpp index 9fcca983ba..d80fdfff8f 100644 --- a/h5cpp/H5Dread.hpp +++ b/h5cpp/H5Dread.hpp @@ -74,9 +74,32 @@ namespace h5 { return layout == H5D_CHUNKED; }(); if( use_pipeline ){ - h5::impl::pipeline_t* filters; - H5Pget(dapl, H5CPP_DAPL_HIGH_THROUGHPUT, &filters); - filters->read(ds, offset, stride, block, count, dxpl, ptr); + // Phase 1.3.3 — if the file's FAPL has h5::threads{N}, route + // reads through a local pool_pipeline_t. Currently pool_pipeline_t:: + // read_chunk_impl is synchronous (parallel decompress is Phase 1.5+), + // so the FAPL-pool branch is semantically equivalent to the DAPL + // path today; the structure is in place for the read-ahead + // optimization to land later without changing call sites. + hid_t fid = H5Iget_file_id(static_cast(ds)); + hid_t fapl = H5Fget_access_plist(fid); + auto pool = h5::impl::resolve_worker_pool(fapl); + if (pool) { + const unsigned cap = h5::impl::resolve_backpressure( + fapl, pool->worker_count()); + h5::impl::pool_pipeline_t pipe(std::move(pool), cap); + h5::dcpl_t dcpl{H5Dget_create_plist(static_cast(ds))}; + hid_t type_id = H5Dget_type(static_cast(ds)); + size_t elem_sz = H5Tget_size(type_id); + H5Tclose(type_id); + pipe.set_cache(dcpl, elem_sz); + pipe.read(ds, offset, stride, block, count, dxpl, ptr); + } else { + h5::impl::pipeline_t* filters; + H5Pget(dapl, H5CPP_DAPL_HIGH_THROUGHPUT, &filters); + filters->read(ds, offset, stride, block, count, dxpl, ptr); + } + H5Pclose(fapl); + H5Fclose(fid); }else{ h5::sp_t mem_space = h5::create_simple( size ); h5::select_all( mem_space ); diff --git a/h5cpp/H5Dwrite.hpp b/h5cpp/H5Dwrite.hpp index 1f760ac424..9fe9e97763 100644 --- a/h5cpp/H5Dwrite.hpp +++ b/h5cpp/H5Dwrite.hpp @@ -104,10 +104,33 @@ namespace h5 { const h5::block_t& block = arg::get( h5::default_block, args...); const h5::offset_t& offset = arg::get( h5::default_offset, args...); const h5::stride_t& stride = arg::get( h5::default_stride, args...); - - h5::impl::pipeline_t* filters; - H5Pget(dapl, H5CPP_DAPL_HIGH_THROUGHPUT, &filters); - filters->write(ds, offset, stride, block, count, dxpl, ptr); + + // Phase 1.3.3 — if the file's FAPL has h5::threads{N}, route + // compress work through the shared pool via a local + // pool_pipeline_t. Otherwise use the existing DAPL-stored + // basic_pipeline_t pointer for synchronous filter chain. + hid_t fid = H5Iget_file_id(static_cast(ds)); + hid_t fapl = H5Fget_access_plist(fid); + auto pool = h5::impl::resolve_worker_pool(fapl); + if (pool) { + const unsigned cap = h5::impl::resolve_backpressure( + fapl, pool->worker_count()); + h5::impl::pool_pipeline_t pipe(std::move(pool), cap); + // set_cache populates the filter chain from the dataset's DCPL. + h5::dcpl_t dcpl{H5Dget_create_plist(static_cast(ds))}; + hid_t type_id = H5Dget_type(static_cast(ds)); + size_t elem_sz = H5Tget_size(type_id); + H5Tclose(type_id); + pipe.set_cache(dcpl, elem_sz); + pipe.write(ds, offset, stride, block, count, dxpl, ptr); + // pipe destructor drains in_flight before pool refcount drop. + } else { + h5::impl::pipeline_t* filters; + H5Pget(dapl, H5CPP_DAPL_HIGH_THROUGHPUT, &filters); + filters->write(ds, offset, stride, block, count, dxpl, ptr); + } + H5Pclose(fapl); + H5Fclose(fid); } else { h5::sp_t mem_space = h5::create_simple( n_elements ); h5::select_all( mem_space ); diff --git a/h5cpp/H5Pthreads.hpp b/h5cpp/H5Pthreads.hpp new file mode 100644 index 0000000000..f1bfec2995 --- /dev/null +++ b/h5cpp/H5Pthreads.hpp @@ -0,0 +1,301 @@ +/* + * Copyright (c) 2018-2026 Steven Varga, Toronto,ON Canada + * Author: Varga, Steven + */ +#pragma once + +// FAPL-scoped worker pool — opt-in via h5::threads{N}. +// +// User-facing API: +// +// h5::fd_t fd = h5::create("data.h5", H5F_ACC_TRUNC, h5::threads{8}); +// h5::fd_t fd = h5::create("data.h5", H5F_ACC_TRUNC, h5::threads{}); // hw_concurrency +// +// Per-dataset opt-in is the orthogonal h5::high_throughput DAPL flag +// (existing). This FAPL property controls "is there a pool, how many +// workers"; whether any given dataset uses it is the DAPL's call. +// +// Storage mechanism mirrors h5::high_throughput (H5Pdapl.hpp): +// +// - H5Pinsert2 stores a pointer to a worker_pool_slot_t in the FAPL +// skip list. +// - The slot owns a std::shared_ptr. +// - HDF5 internally copies the FAPL during H5Fopen/H5Fcreate; the +// copy callback allocates a fresh slot aliasing the same pool +// (shared_ptr refcount++). Every FAPL copy shares the pool. +// - The close callback drops the slot. Pool is destroyed when the +// last live FAPL copy releases its slot, at which point worker +// std::jthreads receive request_stop() and join cleanly. +// +// This is the shared-ownership variant of the H5Pinsert2 + slot pattern. +// Compare with H5Pdapl.hpp's fresh-allocation-per-copy semantics used by +// the high_throughput pipeline property: that pattern allocates a fresh +// pipeline scratch buffer per copy because pipelines are per-write +// scratch state; this pattern shares one live resource across all +// copies because workers ARE the resource we want shared. See +// tasks/h5cpp-fapl-multithreading-workplan.md §2-§3. +// +// PHASE 1.1 STATUS: lifecycle scaffolding only. worker_pool_t owns N +// std::jthreads whose only job today is to honor std::stop_token on +// shutdown. Phase 1.2 extends with bounded MPMC queues, compress_sync / +// compress_async API, and integration with the filter pipeline. Phase +// 1.3 wires pt_t / h5::write / h5::read consumer sites. + +#include "H5Pall.hpp" +#include "detail/doorbell.hpp" +#include "detail/stoppable_thread.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define H5CPP_FAPL_WORKER_POOL "h5cpp_fapl_worker_pool" +#define H5CPP_FAPL_BACKPRESSURE "h5cpp_fapl_backpressure" + +// Default in-flight chunk cap when h5::threads{N} is set without an +// accompanying h5::backpressure{N}. Resolves to 8 × worker_count, which +// is loose enough to keep all workers fed during steady-state streaming +// and tight enough to bound memory growth (chunk_size × cap bytes per pt_t). +// Override at compile time via -DH5CPP_FAPL_BACKPRESSURE_DEFAULT_FACTOR=K. +#ifndef H5CPP_FAPL_BACKPRESSURE_DEFAULT_FACTOR +#define H5CPP_FAPL_BACKPRESSURE_DEFAULT_FACTOR 8u +#endif + +namespace h5::impl { + +// ─── Pool ──────────────────────────────────────────────────────────────────── +// +// Generic compute pool: workers pull type-erased tasks off a single MPMC-ish +// queue (std::mutex + std::queue + doorbell signal) and execute them. The +// submit(callable) API returns a std::future of the callable's result type, +// using std::packaged_task wrapped in a std::function for storage. +// +// The pool is deliberately HDF5-agnostic at this layer. Consumer sites +// (Phase 1.3 — pt_t, h5::write, h5::read) wrap their HDF5-specific compress +// logic in a closure and submit() it. This keeps the pool reusable for +// Phase II's executor and any future async work. +struct worker_pool_t { + // Pool size is fixed at construction; cannot resize at runtime. + // n == 0 means "use std::thread::hardware_concurrency()". + explicit worker_pool_t(unsigned n) { + const unsigned count = n ? n : std::max(1u, std::thread::hardware_concurrency()); + workers_.reserve(count); + for (unsigned i = 0; i < count; ++i) + workers_.emplace_back([this](h5::detail::stop_token_t st) { worker_loop(st); }); + } + + // Destruction sequence: + // 1. wait_idle() — let all submitted tasks complete (caller can also + // have called this earlier). + // 2. Set stopping_ flag and ring the doorbell so all waiters wake. + // 3. stoppable_thread_t destructors automatically request stop + join. + ~worker_pool_t() { + wait_idle(); + stopping_.store(true, std::memory_order_release); + bell_.ring_all(); + } + + worker_pool_t(const worker_pool_t&) = delete; + worker_pool_t& operator=(const worker_pool_t&) = delete; + worker_pool_t(worker_pool_t&&) = delete; + worker_pool_t& operator=(worker_pool_t&&) = delete; + + // Introspection for tests and scheduling decisions. + [[nodiscard]] unsigned worker_count() const noexcept { + return static_cast(workers_.size()); + } + + // Submit a callable for execution by any worker. Returns a future of + // the callable's result type. Callable must be invocable with no + // arguments; capture state via closure if needed. + // + // Internally uses std::packaged_task to bridge the move-only callable + // into a copyable std::function (wrapped in shared_ptr) so it can sit + // in the queue. + template + auto submit(Fn&& fn) -> std::future> { + using R = std::invoke_result_t; + auto task = std::make_shared>(std::forward(fn)); + auto fut = task->get_future(); + in_flight_.fetch_add(1, std::memory_order_release); + { + std::lock_guard lk(m_); + tasks_.emplace([task] { (*task)(); }); + } + bell_.ring(); + return fut; + } + + // Block until all submitted tasks have completed. Intended for clean + // shutdown or as a synchronization barrier between submission phases. + void wait_idle() { + while (in_flight_.load(std::memory_order_acquire) > 0) + std::this_thread::yield(); + } + + // Coarse approximation of pending+running work. Useful for tests and + // monitoring; not a strict ordering guarantee. + [[nodiscard]] int in_flight() const noexcept { + return in_flight_.load(std::memory_order_acquire); + } + +private: + void worker_loop(h5::detail::stop_token_t st) { + while (!st.stop_requested()) { + std::function task; + std::uint32_t last_seq = 0; + bool got_task = false; + { + std::lock_guard lk(m_); + if (!tasks_.empty()) { + task = std::move(tasks_.front()); + tasks_.pop(); + got_task = true; + } else { + // Snapshot the doorbell sequence WHILE HOLDING the lock. + // submit() will ring the bell only AFTER it has released + // the same lock, so any subsequent ring advances past + // last_seq — bell_.wait(last_seq) below cannot miss it. + last_seq = bell_.load(); + } + } + + if (got_task) { + try { task(); } catch (...) { /* packaged_task captures it */ } + in_flight_.fetch_sub(1, std::memory_order_release); + continue; + } + + // Queue was empty — wait for a ring or for shutdown. + if (st.stop_requested() || + stopping_.load(std::memory_order_acquire)) return; + bell_.wait(last_seq); + } + } + + std::mutex m_; + std::queue> tasks_; + h5::detail::doorbell_t bell_; + std::atomic in_flight_{0}; + std::atomic stopping_{false}; + std::vector workers_; +}; + +// ─── FAPL slot + lifecycle callbacks ───────────────────────────────────────── + +// The heap-allocated holder whose pointer lives in the FAPL skip-list +// value slot. Indirection is necessary because H5Pinsert2 stores raw +// bytes — it cannot run shared_ptr's constructor/destructor for us. +struct worker_pool_slot_t { + std::shared_ptr pool; +}; + +// Copy callback: HDF5 cloned the property bytes (the slot pointer was +// memcpy'd into the destination's value slot). Allocate a NEW slot whose +// shared_ptr aliases the same pool — refcount++ via shared_ptr copy. +// +// This is the contract that makes "every FAPL copy shares one pool" work. +inline herr_t fapl_pool_copy_cb(const char* /*name*/, size_t /*size*/, void* value) { + auto** slot_loc = static_cast(value); + *slot_loc = new worker_pool_slot_t{(*slot_loc)->pool}; + return 0; +} + +// Close callback: drop one slot. shared_ptr inside the slot releases its +// reference; worker_pool_t destructor runs when the last slot is freed +// (refcount reaches 0), which stops and joins the jthreads. +inline herr_t fapl_pool_close_cb(const char* /*name*/, size_t /*size*/, void* ptr) { + delete *static_cast(ptr); + return 0; +} + +// Setter invoked when the user applies h5::threads{N} to an FAPL. +// Idempotent — if a pool property is already installed, leaves it untouched. +// +// n == 0 maps to std::thread::hardware_concurrency() inside worker_pool_t. +inline herr_t fapl_threads_set(::hid_t fapl, unsigned n) { + if (H5Pexist(fapl, H5CPP_FAPL_WORKER_POOL)) return 0; + auto* slot = new worker_pool_slot_t{ + std::make_shared(n) + }; + return H5Pinsert2(fapl, H5CPP_FAPL_WORKER_POOL, + sizeof(worker_pool_slot_t*), &slot, + nullptr, // set + nullptr, // get + nullptr, // prp_del + fapl_pool_copy_cb, + nullptr, // compare + fapl_pool_close_cb); +} + +// Consumer-site helper: given a FAPL id, retrieve the worker pool shared_ptr +// if one is installed. Returns nullptr (= no pool, fall back to synchronous +// pipeline) if the property is absent. Used by pt_t, h5::write, h5::read +// in Phase 1.3. +inline std::shared_ptr resolve_worker_pool(::hid_t fapl_id) noexcept { + if (fapl_id < 0 || H5Iis_valid(fapl_id) <= 0) return nullptr; + if (!H5Pexist(fapl_id, H5CPP_FAPL_WORKER_POOL)) return nullptr; + worker_pool_slot_t* slot = nullptr; + H5Pget(fapl_id, H5CPP_FAPL_WORKER_POOL, &slot); + return slot ? slot->pool : nullptr; +} + +// ─── Back-pressure cap (separate FAPL property) ────────────────────────────── + +// Setter invoked when the user applies h5::backpressure{N} to an FAPL. +// Stores a plain unsigned by memcpy semantics — no lifecycle callbacks +// needed since the value is trivially copyable and owns no heap. +// +// The cap is consumed by pt_t (and Phase 1.3's h5::write/read) when +// queueing work to the pool: write_chunk blocks on drain_completed +// once the in-flight deque reaches the cap. +inline herr_t fapl_backpressure_set(::hid_t fapl, unsigned cap) { + if (H5Pexist(fapl, H5CPP_FAPL_BACKPRESSURE)) return 0; + return H5Pinsert2(fapl, H5CPP_FAPL_BACKPRESSURE, + sizeof(unsigned), &cap, + nullptr, nullptr, nullptr, + nullptr, // copy: memcpy is correct for POD + nullptr, + nullptr); // close: nothing to release +} + +// Consumer-site helper: returns the user-set back-pressure cap, or computes +// the default (H5CPP_FAPL_BACKPRESSURE_DEFAULT_FACTOR × worker_count) when +// no h5::backpressure{N} was applied. Returns 0 only when no pool is +// installed either — caller should already have bailed in that case. +inline unsigned resolve_backpressure(::hid_t fapl_id, + unsigned worker_count) noexcept { + if (fapl_id < 0 || H5Iis_valid(fapl_id) <= 0) return 0; + if (H5Pexist(fapl_id, H5CPP_FAPL_BACKPRESSURE)) { + unsigned cap = 0; + H5Pget(fapl_id, H5CPP_FAPL_BACKPRESSURE, &cap); + if (cap > 0) return cap; + } + return H5CPP_FAPL_BACKPRESSURE_DEFAULT_FACTOR * worker_count; +} + +} // namespace h5::impl + +namespace h5 { +// User-facing tags. Applied to a fapl_t via the property-chain mechanism. +// +// h5::create("data.h5", H5F_ACC_TRUNC, h5::threads{8}) +// h5::create("data.h5", H5F_ACC_TRUNC, h5::threads{}) // hw_concurrency +// h5::create("data.h5", H5F_ACC_TRUNC, h5::threads{8} +// | h5::backpressure{32}) // 8 workers, 32-chunk cap +// +// h5::backpressure{N} without h5::threads{N} is silently a no-op: +// without a pool, there is no queue to bound. Document at user-facing +// level; do not warn at runtime. +using threads = impl::fapl_call, + impl::fapl_threads_set>; +using backpressure = impl::fapl_call, + impl::fapl_backpressure_set>; +} diff --git a/h5cpp/H5Tmeta.hpp b/h5cpp/H5Tmeta.hpp index 8e4b16146c..866d43a116 100644 --- a/h5cpp/H5Tmeta.hpp +++ b/h5cpp/H5Tmeta.hpp @@ -283,23 +283,6 @@ namespace h5::meta { template struct storage_representation : detail_capabilities::storage_representation_impl> {}; template constexpr storage_representation_t storage_representation_v = storage_representation::value; - inline constexpr std::uint32_t metadata_version = 1; - - /** Base class for compiler-emitted reflected field descriptors. */ - template - struct field_descriptor_t { - using owner_type = owner_t; - using field_type = field_t; - }; - - /** Specialize to std::true_type for any struct described by compiler_meta_t. */ - template - struct is_reflected_compound_t : std::false_type {}; - - /** Specialize to provide the field-descriptor tuple for a reflected compound. */ - template - struct compiler_meta_t; - template struct storage_traits_impl_t; template @@ -395,33 +378,6 @@ namespace h5::meta { } }; - template - struct storage_traits_impl_t::value>> { - static_assert(compiler_meta_t::version == metadata_version, - "H5CPP compiler metadata version mismatch"); - static constexpr bool supported = true; - static constexpr bool owns_handle = true; - static hid_t create_type() noexcept { - using fields_t = typename compiler_meta_t::fields_t; - hid_t dt = H5Tcreate(H5T_COMPOUND, sizeof(T)); - insert_fields(dt, - std::make_index_sequence>{}); - return dt; - } - private: - template - static void insert_fields(hid_t dt, std::index_sequence) noexcept { - (insert_field>(dt), ...); - } - template - static void insert_field(hid_t dt) noexcept { - using field_t = typename FieldDesc::field_type; - hid_t field_dt = storage_traits_t::create_type(); - H5Tinsert(dt, FieldDesc::name(), FieldDesc::offset, field_dt); - if constexpr (storage_traits_t::owns_handle) H5Tclose(field_dt); - } - }; - template struct is_transport_contiguous_impl_t : std::false_type {}; template struct is_transport_contiguous_impl_t>> : std::true_type {}; template struct is_transport_contiguous_impl_t::value>> : std::true_type {}; @@ -429,30 +385,15 @@ namespace h5::meta { is_array_like::value && !is_text_like::value>> : is_transport_contiguous_t::type> {}; - template struct is_transport_contiguous_impl_t::value>> { - private: - static_assert(compiler_meta_t::version == metadata_version, - "H5CPP compiler metadata version mismatch"); - using fields_t = typename compiler_meta_t::fields_t; - template - static constexpr bool check_contiguous(std::index_sequence) noexcept { - return (... && is_transport_contiguous_v< - typename std::tuple_element_t::field_type>); - } - public: - static constexpr bool value = check_contiguous(std::make_index_sequence>{}); - }; - // Gap 1: contiguous STL sequence containers (vector, span, linalg types, etc.) // Triggers when T exposes a data() pointer and size(), but is not a C/std::array, - // not text, not arithmetic, and not a reflected compound. + // not text, and not arithmetic. // The element type inferred from data() must be standard-layout and trivial // (prevents nested containers like vector> or vector from matching). template struct is_transport_contiguous_impl_t::value && !is_text_like::value && - !is_reflected_compound_t::value && !std::is_arithmetic_v && has_data_pointer::value && meta::has_size::value && @@ -626,19 +567,6 @@ namespace h5::meta { static constexpr std::size_t bytes(const T&) noexcept { return sizeof(T); } }; - // Reflected compound structs - template - struct access_traits_t::value>> { - using element_t = T; - using pointer_t = const T*; - static constexpr access_t kind = access_t::object; - static constexpr bool is_trivially_packable = is_transport_contiguous_v; - static const T* data(const T& v) noexcept { return &v; } - static T* data(T& v) noexcept { return &v; } - static constexpr std::array size(const T&) noexcept { return {}; } - static constexpr std::size_t bytes(const T&) noexcept { return sizeof(T); } - }; - // Text-like types (std::string, std::string_view, etc.) — handled by HDF5 string types, not raw memcpy template struct access_traits_t>::value && !std::is_array_v && - !is_reflected_compound_t::value && has_data_pointer::value && meta::has_size::value && is_transport_contiguous_v>> { @@ -693,7 +620,6 @@ namespace h5::meta { struct access_traits_t>::value && !std::is_array_v && - !is_reflected_compound_t::value && has_data_pointer::value && meta::has_size::value && compat::is_detected>::value && diff --git a/h5cpp/H5Zpipeline.hpp b/h5cpp/H5Zpipeline.hpp index 5e6e273e84..572b8ddb87 100644 --- a/h5cpp/H5Zpipeline.hpp +++ b/h5cpp/H5Zpipeline.hpp @@ -134,7 +134,6 @@ namespace h5{ namespace impl { void write_chunk_impl( const hsize_t* offset, size_t nbytes, const void* ptr ); void read_chunk_impl( const hsize_t* offset, size_t nbytes, void* ptr ); }; - // threaded_pipeline_t is defined in H5Zpipeline_threaded.hpp struct romio_pipeline_t : public pipeline_t{ void write_chunk_impl( const hsize_t* offset, size_t nbytes, const void* ptr ){ (void)offset; (void)nbytes; (void)ptr; diff --git a/h5cpp/H5Zpipeline_pool.hpp b/h5cpp/H5Zpipeline_pool.hpp new file mode 100644 index 0000000000..37fa12b296 --- /dev/null +++ b/h5cpp/H5Zpipeline_pool.hpp @@ -0,0 +1,249 @@ +/* + * Copyright (c) 2018-2026 Steven Varga, Toronto,ON Canada + * Author: Varga, Steven + */ +#pragma once + +// FAPL-scoped parallel filter pipeline — second CRTP descendant of +// pipeline_t alongside basic_pipeline_t (synchronous). +// +// pool_pipeline_t owns a strong reference to an external worker_pool_t +// (resolved from the file's FAPL) and submits compress closures to it; +// H5Dwrite_chunk runs on the calling thread. +// +// The pipeline reuses pipeline_t<>::write/read (and via that +// split_to_chunk_write/read) for chunk decomposition, so consumer sites +// — pt_t, h5::write, h5::read — never see the pool dispatch directly. +// They just construct a pool_pipeline_t when their file's FAPL has one. + +#include "H5Zpipeline.hpp" +#include "H5Pthreads.hpp" + +#include +#include +#include +#include +#include + +namespace h5::impl { + +struct pool_pipeline_t : public pipeline_t { + pool_pipeline_t() = default; // variant requires default-constructible + // when this is the chosen alternative; pool_ stays null + // and dispatch fast-fails (no pool installed). + + pool_pipeline_t(std::shared_ptr pool, unsigned cap) + : pool_(std::move(pool)), cap_(cap) {} + + ~pool_pipeline_t() { + // Drain any remaining in-flight work in submission order before the + // pipeline (and its shared_ptr to the pool) goes away. Blocks the + // destructor on each front future. + while (!in_flight_.empty()) drain_in_flight(/*blocking=*/true); + } + + pool_pipeline_t(const pool_pipeline_t&) = delete; + pool_pipeline_t& operator=(const pool_pipeline_t&) = delete; + pool_pipeline_t(pool_pipeline_t&&) = delete; + pool_pipeline_t& operator=(pool_pipeline_t&&) = delete; + + // CRTP entry points called by pipeline_t<>::write / pipeline_t<>::read + // via split_to_chunk_write / split_to_chunk_read. + void write_chunk_impl(const hsize_t* offset, std::size_t nbytes, const void* src); + void read_chunk_impl (const hsize_t* offset, std::size_t nbytes, void* dst); + + // Public quiesce — block until every in-flight future has been + // drained. Called by pt_t::flush so users get the expected + // "data on disk after flush returns" semantics for the pool path. + void drain() { + while (!in_flight_.empty()) drain_in_flight(/*blocking=*/true); + } + +private: + // Worker-produced compressed chunk, ready for H5Dwrite_chunk on the + // calling thread. unique_ptr keeps it move-only and + // aligned to operator new[]'s default alignment. + struct result_t { + std::unique_ptr data; + std::size_t nbytes{0}; + std::uint32_t mask{0}; + std::array offset{}; + }; + + void drain_in_flight(bool blocking); + + std::shared_ptr pool_; + unsigned cap_{0}; + std::deque> in_flight_; +}; + +// ─── write path ────────────────────────────────────────────────────────────── + +inline void pool_pipeline_t::drain_in_flight(bool blocking) { + using namespace std::chrono_literals; + if (blocking) { + if (in_flight_.empty()) return; + auto r = in_flight_.front().get(); // blocks + H5Dwrite_chunk(static_cast<::hid_t>(this->ds), static_cast<::hid_t>(this->dxpl), + r.mask, r.offset.data(), r.nbytes, r.data.get()); + in_flight_.pop_front(); + return; + } + while (!in_flight_.empty() && + in_flight_.front().wait_for(0s) == std::future_status::ready) { + auto r = in_flight_.front().get(); + H5Dwrite_chunk(static_cast<::hid_t>(this->ds), static_cast<::hid_t>(this->dxpl), + r.mask, r.offset.data(), r.nbytes, r.data.get()); + in_flight_.pop_front(); + } +} + +inline void pool_pipeline_t::write_chunk_impl(const hsize_t* offset_in, + std::size_t nbytes, + const void* src) { + if (!pool_) { + // No pool was wired in (default-constructed alternative). Fall + // through to the synchronous filter chain identical to + // basic_pipeline_t::write_chunk_impl. This branch shouldn't be + // reached in normal use; it exists to keep the variant safe. + size_t length = nbytes; + void *in = chunk0, *out = chunk1, *tmp = chunk0; + std::uint32_t mask = 0; + switch (tail) { + case 0: + H5Dwrite_chunk(static_cast<::hid_t>(ds), static_cast<::hid_t>(dxpl), + 0, offset_in, nbytes, src); + return; + case 1: + length = filter[0](out, src, nbytes, flags[0], cd_size[0], cd_values[0]); + if (!length) mask = 1u; + [[fallthrough]]; + default: + for (hsize_t j = 1; j < tail; ++j) { + tmp = in; in = out; out = tmp; + length = filter[j](out, in, length, flags[j], cd_size[j], cd_values[j]); + if (!length) mask |= (1u << j); + } + H5Dwrite_chunk(static_cast<::hid_t>(ds), static_cast<::hid_t>(dxpl), + mask, offset_in, length, out); + } + return; + } + + // Snapshot filter chain into a POD captured by value in the closure. + struct filter_chain_t { + filter::call_t filter[H5CPP_MAX_FILTER]; + unsigned flags[H5CPP_MAX_FILTER]; + std::size_t cd_size[H5CPP_MAX_FILTER]; + unsigned cd_values[H5CPP_MAX_FILTER][H5CPP_MAX_FILTER_PARAM]; + hsize_t tail; + }; + filter_chain_t fc{}; + fc.tail = this->tail; + std::memcpy(fc.filter, this->filter, sizeof(fc.filter)); + std::memcpy(fc.flags, this->flags, sizeof(fc.flags)); + std::memcpy(fc.cd_size, this->cd_size, sizeof(fc.cd_size)); + std::memcpy(fc.cd_values, this->cd_values, sizeof(fc.cd_values)); + + // Worker-owned input buffer. + auto raw = std::make_unique(nbytes); + std::memcpy(raw.get(), src, nbytes); + + std::array off{}; + std::copy(offset_in, offset_in + this->rank, off.begin()); + + auto fut = pool_->submit( + [raw = std::move(raw), nbytes, off, fc]() mutable -> result_t { + result_t out; + out.offset = off; + + if (fc.tail == 0) { + out.data = std::move(raw); + out.nbytes = nbytes; + out.mask = 0; + return out; + } + + const std::size_t scratch = filter::filter_scratch_bound(nbytes); + auto wbuf0 = std::make_unique(scratch); + auto wbuf1 = std::make_unique(scratch); + + std::size_t length = nbytes; + std::uint32_t mask = 0; + + length = fc.filter[0](wbuf0.get(), raw.get(), length, + fc.flags[0], fc.cd_size[0], fc.cd_values[0]); + if (!length) mask |= 1u; + + void* in_buf = wbuf0.get(); + void* out_buf = wbuf1.get(); + for (hsize_t j = 1; j < fc.tail; ++j) { + length = fc.filter[j](out_buf, in_buf, length, + fc.flags[j], fc.cd_size[j], fc.cd_values[j]); + if (!length) mask |= (1u << j); + std::swap(in_buf, out_buf); + } + + out.data = std::make_unique(length); + std::memcpy(out.data.get(), in_buf, length); + out.nbytes = length; + out.mask = mask; + return out; + }); + + in_flight_.push_back(std::move(fut)); + + // Opportunistic drain. + drain_in_flight(/*blocking=*/false); + + // Bounded back-pressure: block on the front future when the deque + // reaches the cap. Producer memory ≤ cap × chunk_size per pipeline. + while (in_flight_.size() >= cap_) + drain_in_flight(/*blocking=*/true); +} + +// ─── read path (synchronous for v1) ────────────────────────────────────────── +// +// Phase 1.3.3 keeps read synchronous, identical to basic_pipeline_t::read_chunk_impl. +// Parallel decompression is a deliberate follow-up: it requires read-ahead +// (the read path consumes chunks in order, but the user's buffer slots are +// known up-front, so prefetching can fan out across the pool). Tracked in +// Phase 1.5+ work. + +inline void pool_pipeline_t::read_chunk_impl(const hsize_t* offset_in, + std::size_t nbytes, + void* /*dst*/) { + std::size_t length = nbytes; + std::uint32_t filter_mask; + + if (tail == 0) { +#if H5_VERSION_GE(2,0,0) + std::size_t buf_size = nbytes; + H5Dread_chunk2(ds, dxpl, offset_in, &filter_mask, chunk0, &buf_size); +#else + H5Dread_chunk(ds, dxpl, offset_in, &filter_mask, chunk0); +#endif + return; + } + + void* read_target = (tail % 2 == 1) ? chunk1 : chunk0; +#if H5_VERSION_GE(2,0,0) + std::size_t buf_size = nbytes; + H5Dread_chunk2(ds, dxpl, offset_in, &filter_mask, read_target, &buf_size); +#else + H5Dread_chunk(ds, dxpl, offset_in, &filter_mask, read_target); +#endif + + void* src = read_target; + void* dst = (read_target == chunk0) ? static_cast(chunk1) + : static_cast(chunk0); + for (hsize_t j = tail; j > 0; --j) { + const hsize_t fi = j - 1; + length = filter[fi](dst, src, length, + flags[fi] | H5Z_FLAG_REVERSE, + cd_size[fi], cd_values[fi]); + std::swap(src, dst); + } +} + +} // namespace h5::impl diff --git a/h5cpp/H5Zpipeline_threaded.hpp b/h5cpp/H5Zpipeline_threaded.hpp deleted file mode 100644 index 82e94eec0e..0000000000 --- a/h5cpp/H5Zpipeline_threaded.hpp +++ /dev/null @@ -1,497 +0,0 @@ -/* - * Copyright (c) 2018-2020 Steven Varga, Toronto,ON Canada - * Author: Varga, Steven - */ -#pragma once -#include -#include -#include -#include -#include -#include -#include -#include "H5Zpipeline.hpp" -#if __cplusplus >= 202002L -# include "H5Qall.hpp" -#endif -#include "detail/doorbell.hpp" -#include "detail/stoppable_thread.hpp" - -// Workers handle compression only; H5Dwrite_chunk is always called from the -// main thread. Work items use raw ::hid_t integers (not h5cpp RAII wrappers) -// so no HDF5 API calls occur on worker threads, preserving compatibility with -// non-thread-safe HDF5 builds. - -namespace h5::filter { - // Opt-in tag selecting the threaded filter pipeline. - // h5::filter::threads{} -> std::thread::hardware_concurrency() workers - // h5::filter::threads{N} -> N compression workers - // Passed to pt_t constructors (and h5::write) to switch from the default - // synchronous basic_pipeline_t to the parallel threaded_pipeline_t. - struct threads { - unsigned n; - constexpr threads() noexcept : n(0) {} - constexpr explicit threads(unsigned count) noexcept : n(count) {} - }; -} - -namespace h5::impl { - -namespace detail { - - // Uncompressed chunk dispatched to a worker for filter application. - struct raw_work_t { - std::unique_ptr data; - std::array offset{}; - std::size_t nbytes{0}; - ::hid_t ds_id{H5I_UNINIT}; - ::hid_t dxpl_id{H5I_UNINIT}; - - raw_work_t() = default; - raw_work_t(raw_work_t&&) = default; - raw_work_t& operator=(raw_work_t&&) = default; - raw_work_t(const raw_work_t&) = delete; - raw_work_t& operator=(const raw_work_t&) = delete; - }; - - // Filtered chunk ready for H5Dwrite_chunk, returned to main thread. - struct done_work_t { - aligned_ptr data; // compressed/filtered bytes (aligned allocation) - std::array offset{}; - std::size_t nbytes{0}; // compressed size - std::uint32_t mask{0}; - ::hid_t ds_id{H5I_UNINIT}; - ::hid_t dxpl_id{H5I_UNINIT}; - - done_work_t() = default; - done_work_t(done_work_t&&) = default; - done_work_t& operator=(done_work_t&&) = default; - done_work_t(const done_work_t&) = delete; - done_work_t& operator=(const done_work_t&) = delete; - }; - -} // namespace detail - -#if __cplusplus >= 202002L -// ============================================================================ -// C++20 path: uses bounded lock-free queues from H5Qall.hpp and std::jthread. -// This block is byte-for-byte identical to the original implementation. -// ============================================================================ - -struct threaded_pipeline_t : public pipeline_t { - threaded_pipeline_t() = default; - - ~threaded_pipeline_t() { - // Drain all in-flight chunks (workers compress, main thread writes). - flush(); - // std::jthread dtors call request_stop() then join() automatically. - } - - threaded_pipeline_t(const threaded_pipeline_t&) = delete; - threaded_pipeline_t& operator=(const threaded_pipeline_t&) = delete; - threaded_pipeline_t(threaded_pipeline_t&&) = delete; - threaded_pipeline_t& operator=(threaded_pipeline_t&&) = delete; - - // Runtime override of worker pool size. If non-zero, takes precedence over - // the H5CPP_PIPELINE_WORKERS compile-time default. Must be called before - // the first write_chunk_impl, since workers spawn lazily via std::call_once. - void set_worker_count(unsigned n) noexcept { worker_count_override_ = n; } - - // Drains all in-flight work and calls H5Dwrite_chunk from the calling - // (main) thread. Must be called before reading back written data. - void flush() { - while (in_flight_.load(std::memory_order_acquire) > 0) { - drain_done(); - std::this_thread::yield(); - } - drain_done(); // final sweep after counter hits zero - } - - void write_chunk_impl(const hsize_t* offset, std::size_t nbytes, const void* ptr) { - ensure_workers(); - drain_done(); // opportunistic: write any already-compressed chunks - - detail::raw_work_t work; - work.data = std::make_unique(nbytes); - std::memcpy(work.data.get(), ptr, nbytes); - std::copy(offset, offset + this->rank, work.offset.data()); - work.nbytes = nbytes; - work.ds_id = static_cast<::hid_t>(this->ds); - work.dxpl_id = static_cast<::hid_t>(this->dxpl); - - in_flight_.fetch_add(1, std::memory_order_release); - while (!compress_queue_.push(std::move(work))) { - drain_done(); // relieve back-pressure - std::this_thread::yield(); - } - } - - void read_chunk_impl(const hsize_t* offset, std::size_t nbytes, void* /*ptr*/) { - uint32_t filter_mask; - std::size_t length = nbytes; - if (tail == 0) { -#if H5_VERSION_GE(2,0,0) - std::size_t buf_size = nbytes; - H5Dread_chunk2(ds, dxpl, offset, &filter_mask, chunk0, &buf_size); -#else - H5Dread_chunk(ds, dxpl, offset, &filter_mask, chunk0); -#endif - return; - } - void* read_target = (tail % 2 == 1) ? chunk1 : chunk0; -#if H5_VERSION_GE(2,0,0) - std::size_t buf_size = nbytes; - H5Dread_chunk2(ds, dxpl, offset, &filter_mask, read_target, &buf_size); -#else - H5Dread_chunk(ds, dxpl, offset, &filter_mask, read_target); -#endif - void* src = read_target; - void* dst = (read_target == chunk0) - ? static_cast(chunk1) : static_cast(chunk0); - for (hsize_t j = tail; j > 0; --j) { - const hsize_t fi = j - 1; - length = filter[fi](dst, src, length, - flags[fi] | H5Z_FLAG_REVERSE, cd_size[fi], cd_values[fi]); - void* tmp = src; src = dst; dst = tmp; - } - } - -private: - // Called from main thread only: pop finished compressed chunks and write. - void drain_done() { - detail::done_work_t result; - while (done_queue_.pop(result)) { - H5Dwrite_chunk(result.ds_id, result.dxpl_id, result.mask, - result.offset.data(), result.nbytes, result.data.get()); - in_flight_.fetch_sub(1, std::memory_order_release); - } - } - - void ensure_workers() { - std::call_once(init_flag_, [this] { - constexpr unsigned cfg = static_cast(H5CPP_PIPELINE_WORKERS); - const unsigned n = - (worker_count_override_ > 0) ? worker_count_override_ - : (cfg > 0) ? cfg - : std::max(1u, std::thread::hardware_concurrency()); - workers_.reserve(n); - for (unsigned i = 0; i < n; ++i) - workers_.emplace_back([this](std::stop_token st) { worker_loop(st); }); - }); - } - - void worker_loop(std::stop_token st) { - const std::size_t scratch = filter::filter_scratch_bound(block_size); - aligned_ptr wbuf0 = make_aligned(H5CPP_MEM_ALIGNMENT, scratch); - aligned_ptr wbuf1 = make_aligned(H5CPP_MEM_ALIGNMENT, scratch); - - detail::raw_work_t work; - while (compress_queue_.wait_pop(work, st)) { - detail::done_work_t result = compress(work, wbuf0, wbuf1); - while (!done_queue_.push(std::move(result))) - std::this_thread::yield(); - } - } - - detail::done_work_t compress(const detail::raw_work_t& work, - aligned_ptr& wbuf0, aligned_ptr& wbuf1) - { - detail::done_work_t out; - out.offset = work.offset; - out.ds_id = work.ds_id; - out.dxpl_id = work.dxpl_id; - - std::size_t length = work.nbytes; - - if (tail == 0) { - // No filters: copy raw bytes into an aligned buffer. - const std::size_t sz = filter::filter_scratch_bound(length); - out.data = make_aligned(H5CPP_MEM_ALIGNMENT, sz); - std::memcpy(out.data.get(), work.data.get(), length); - out.nbytes = length; - out.mask = 0; - return out; - } - - // First filter: work.data → wbuf0. - length = filter[0](wbuf0.get(), work.data.get(), length, - flags[0], cd_size[0], cd_values[0]); - if (!length) out.mask |= 1u; - - // Subsequent filters: ping-pong between wbuf0 and wbuf1. - void* src = wbuf0.get(); - void* dst = wbuf1.get(); - for (hsize_t j = 1; j < tail; ++j) { - length = filter[j](dst, src, length, flags[j], cd_size[j], cd_values[j]); - if (!length) out.mask |= (1u << j); - std::swap(src, dst); - } - - // Copy filtered result into fresh aligned buffer so scratch is reusable. - const std::size_t sz = filter::filter_scratch_bound(length); - out.data = make_aligned(H5CPP_MEM_ALIGNMENT, sz); - std::memcpy(out.data.get(), src, length); - out.nbytes = length; - return out; - } - - // compress_queue_: main thread pushes raw chunks; workers pop and compress. - bounded::spmc::queue_t compress_queue_; - // done_queue_: workers push compressed chunks; main thread pops and writes. - bounded::mpsc::queue_t done_queue_; - - std::atomic in_flight_{0}; - std::vector workers_; - std::once_flag init_flag_; - unsigned worker_count_override_{0}; -}; - -#else -// ============================================================================ -// C++17 fallback path: mutex-based queues + h5::detail::stoppable_thread_t. -// ============================================================================ - -namespace detail { - - // Simple mutex-protected FIFO queue for C++17, signalled via doorbell_t. - // Replaces bounded::spmc and bounded::mpsc for the C++17 build. - template - class c17_queue_t { - public: - c17_queue_t() = default; - c17_queue_t(const c17_queue_t&) = delete; - c17_queue_t& operator=(const c17_queue_t&) = delete; - - // Non-blocking push — always succeeds (unbounded). - // Returns true for API compatibility with bounded queues. - bool push(T&& v) { - { - std::lock_guard lk(m_); - q_.push(std::move(v)); - } - bell_.ring(); - return true; - } - - // Non-blocking pop. Returns true if an item was retrieved. - bool pop(T& out) { - std::lock_guard lk(m_); - if (q_.empty()) return false; - out = std::move(q_.front()); - q_.pop(); - return true; - } - - // Blocking pop: waits until an item is available or stop is requested. - // Returns true if an item was retrieved, false if stop was requested. - bool wait_pop(T& out, h5::detail::stop_token_t st) { - while (!st.stop_requested()) { - { - std::lock_guard lk(m_); - if (!q_.empty()) { - out = std::move(q_.front()); - q_.pop(); - return true; - } - } - const auto last = bell_.load(); - // Recheck under lock before waiting to avoid missed wakeup. - { - std::lock_guard lk(m_); - if (!q_.empty()) { - out = std::move(q_.front()); - q_.pop(); - return true; - } - } - if (!st.stop_requested()) - bell_.wait(last); - } - return false; - } - - // Wake all waiters (used on shutdown to unblock wait_pop). - void notify_all() { bell_.ring_all(); } - - private: - std::mutex m_; - std::queue q_; - h5::detail::doorbell_t bell_; - }; - -} // namespace detail - -struct threaded_pipeline_t : public pipeline_t { - threaded_pipeline_t() = default; - - ~threaded_pipeline_t() { - // Drain all in-flight chunks (workers compress, main thread writes). - flush(); - // Notify workers so they observe stop and wake from wait_pop. - compress_queue_.notify_all(); - // h5::detail::stoppable_thread_t dtors call request_stop() then join(). - } - - threaded_pipeline_t(const threaded_pipeline_t&) = delete; - threaded_pipeline_t& operator=(const threaded_pipeline_t&) = delete; - threaded_pipeline_t(threaded_pipeline_t&&) = delete; - threaded_pipeline_t& operator=(threaded_pipeline_t&&) = delete; - - // Runtime override of worker pool size. If non-zero, takes precedence over - // the H5CPP_PIPELINE_WORKERS compile-time default. Must be called before - // the first write_chunk_impl, since workers spawn lazily via std::call_once. - void set_worker_count(unsigned n) noexcept { worker_count_override_ = n; } - - // Drains all in-flight work and calls H5Dwrite_chunk from the calling - // (main) thread. Must be called before reading back written data. - void flush() { - while (in_flight_.load(std::memory_order_acquire) > 0) { - drain_done(); - std::this_thread::yield(); - } - drain_done(); // final sweep after counter hits zero - } - - void write_chunk_impl(const hsize_t* offset, std::size_t nbytes, const void* ptr) { - ensure_workers(); - drain_done(); // opportunistic: write any already-compressed chunks - - detail::raw_work_t work; - work.data = std::make_unique(nbytes); - std::memcpy(work.data.get(), ptr, nbytes); - std::copy(offset, offset + this->rank, work.offset.data()); - work.nbytes = nbytes; - work.ds_id = static_cast<::hid_t>(this->ds); - work.dxpl_id = static_cast<::hid_t>(this->dxpl); - - in_flight_.fetch_add(1, std::memory_order_release); - while (!compress_queue_.push(std::move(work))) { - drain_done(); // relieve back-pressure - std::this_thread::yield(); - } - } - - void read_chunk_impl(const hsize_t* offset, std::size_t nbytes, void* /*ptr*/) { - uint32_t filter_mask; - std::size_t length = nbytes; - if (tail == 0) { -#if H5_VERSION_GE(2,0,0) - std::size_t buf_size = nbytes; - H5Dread_chunk2(ds, dxpl, offset, &filter_mask, chunk0, &buf_size); -#else - H5Dread_chunk(ds, dxpl, offset, &filter_mask, chunk0); -#endif - return; - } - void* read_target = (tail % 2 == 1) ? chunk1 : chunk0; -#if H5_VERSION_GE(2,0,0) - std::size_t buf_size = nbytes; - H5Dread_chunk2(ds, dxpl, offset, &filter_mask, read_target, &buf_size); -#else - H5Dread_chunk(ds, dxpl, offset, &filter_mask, read_target); -#endif - void* src = read_target; - void* dst = (read_target == chunk0) - ? static_cast(chunk1) : static_cast(chunk0); - for (hsize_t j = tail; j > 0; --j) { - const hsize_t fi = j - 1; - length = filter[fi](dst, src, length, - flags[fi] | H5Z_FLAG_REVERSE, cd_size[fi], cd_values[fi]); - void* tmp = src; src = dst; dst = tmp; - } - } - -private: - // Called from main thread only: pop finished compressed chunks and write. - void drain_done() { - detail::done_work_t result; - while (done_queue_.pop(result)) { - H5Dwrite_chunk(result.ds_id, result.dxpl_id, result.mask, - result.offset.data(), result.nbytes, result.data.get()); - in_flight_.fetch_sub(1, std::memory_order_release); - } - } - - void ensure_workers() { - std::call_once(init_flag_, [this] { - constexpr unsigned cfg = static_cast(H5CPP_PIPELINE_WORKERS); - const unsigned n = - (worker_count_override_ > 0) ? worker_count_override_ - : (cfg > 0) ? cfg - : std::max(1u, std::thread::hardware_concurrency()); - workers_.reserve(n); - for (unsigned i = 0; i < n; ++i) - workers_.emplace_back( - [this](h5::detail::stop_token_t st) { worker_loop(st); }); - }); - } - - void worker_loop(h5::detail::stop_token_t st) { - const std::size_t scratch = filter::filter_scratch_bound(block_size); - aligned_ptr wbuf0 = make_aligned(H5CPP_MEM_ALIGNMENT, scratch); - aligned_ptr wbuf1 = make_aligned(H5CPP_MEM_ALIGNMENT, scratch); - - detail::raw_work_t work; - while (compress_queue_.wait_pop(work, st)) { - detail::done_work_t result = compress(work, wbuf0, wbuf1); - while (!done_queue_.push(std::move(result))) - std::this_thread::yield(); - } - } - - detail::done_work_t compress(const detail::raw_work_t& work, - aligned_ptr& wbuf0, aligned_ptr& wbuf1) - { - detail::done_work_t out; - out.offset = work.offset; - out.ds_id = work.ds_id; - out.dxpl_id = work.dxpl_id; - - std::size_t length = work.nbytes; - - if (tail == 0) { - // No filters: copy raw bytes into an aligned buffer. - const std::size_t sz = filter::filter_scratch_bound(length); - out.data = make_aligned(H5CPP_MEM_ALIGNMENT, sz); - std::memcpy(out.data.get(), work.data.get(), length); - out.nbytes = length; - out.mask = 0; - return out; - } - - // First filter: work.data → wbuf0. - length = filter[0](wbuf0.get(), work.data.get(), length, - flags[0], cd_size[0], cd_values[0]); - if (!length) out.mask |= 1u; - - // Subsequent filters: ping-pong between wbuf0 and wbuf1. - void* src = wbuf0.get(); - void* dst = wbuf1.get(); - for (hsize_t j = 1; j < tail; ++j) { - length = filter[j](dst, src, length, flags[j], cd_size[j], cd_values[j]); - if (!length) out.mask |= (1u << j); - std::swap(src, dst); - } - - // Copy filtered result into fresh aligned buffer so scratch is reusable. - const std::size_t sz = filter::filter_scratch_bound(length); - out.data = make_aligned(H5CPP_MEM_ALIGNMENT, sz); - std::memcpy(out.data.get(), src, length); - out.nbytes = length; - return out; - } - - // compress_queue_: main thread pushes raw chunks; workers pop and compress. - detail::c17_queue_t compress_queue_; - // done_queue_: workers push compressed chunks; main thread pops and writes. - detail::c17_queue_t done_queue_; - - std::atomic in_flight_{0}; - std::vector workers_; - std::once_flag init_flag_; - unsigned worker_count_override_{0}; -}; - -#endif // __cplusplus >= 202002L - -} // namespace h5::impl diff --git a/h5cpp/core b/h5cpp/core index 868e043898..40f7014eb8 100644 --- a/h5cpp/core +++ b/h5cpp/core @@ -60,8 +60,9 @@ #include "H5Pall.hpp" #include "H5Zpipeline.hpp" #include "H5Zpipeline_basic.hpp" - #include "H5Zpipeline_threaded.hpp" #include "H5Pdapl.hpp" + #include "H5Pthreads.hpp" + #include "H5Zpipeline_pool.hpp" #include "H5Ialgorithm.hpp" #include "H5capi.hpp" diff --git a/test/H5Aall.cpp b/test/H5Aall.cpp index 8580312ad3..c1d1e8470d 100644 --- a/test/H5Aall.cpp +++ b/test/H5Aall.cpp @@ -141,11 +141,9 @@ TEST_CASE("awrite/aread std::array of arithmetic") { // --------------------------------------------------------------------------- // POD struct (compound type) — KNOWN LIMITATION (not tested here) -// h5::awrite/aread for compound structs requires the h5cpp compiler plugin to -// emit compiler_meta_t and register the HDF5 compound type. Without the -// plugin, H5CPP_REGISTER_STRUCT only returns H5I_UNINIT and H5Acreate2 fails. -// Compound attribute round-trips work correctly when the plugin is used to -// generate the reflection shim — no fix needed in H5Awrite.hpp / H5Aread.hpp. +// h5::awrite/aread for compound structs requires a manual h5::create() +// specialization (dt_t path) or C++26 auto-reflection. Without either, +// H5CPP_REGISTER_STRUCT returns H5I_UNINIT and H5Acreate2 fails. // --------------------------------------------------------------------------- // --------------------------------------------------------------------------- diff --git a/test/H5Dappend.cpp b/test/H5Dappend.cpp index bbdccd92fd..e67932a829 100644 --- a/test/H5Dappend.cpp +++ b/test/H5Dappend.cpp @@ -163,84 +163,11 @@ TEST_CASE("packet table output stream for invalid handle") { } // ===================================================================== -// [#241] Threaded filter pipeline opt-in via h5::filter::threads{N} +// (#241 h5::filter::threads tests removed in #250 — the per-pt_t worker +// pool API is superseded by FAPL-scoped h5::threads{N}. Coverage moved +// to test/H5Pall.cpp ([#250 1.3.3] cases).) // ===================================================================== -TEST_CASE("[#241] h5::filter::threads tag construction") { - // Default-constructed tag means "use hardware_concurrency() workers". - constexpr h5::filter::threads default_t{}; - CHECK(default_t.n == 0); - - // Explicit count. - constexpr h5::filter::threads explicit_t{4}; - CHECK(explicit_t.n == 4); -} - -TEST_CASE("[#241] pt_t with threaded pipeline — basic round-trip (no filter)") { - h5::test::file_fixture_t f("test-pt-threaded-nofilter.h5"); - h5::ds_t ds = h5::create(f.fd, "ds", h5::current_dims_t{0}, - h5::max_dims_t{H5S_UNLIMITED}, h5::chunk{16}); - { - h5::pt_t pt(ds, h5::filter::threads{2}); - for (int i = 0; i < 64; ++i) - h5::append(pt, i); - h5::flush(pt); // ensure all workers drain before close - } - auto readback = h5::read>(f.fd, "ds"); - REQUIRE(readback.size() == 64); - for (int i = 0; i < 64; ++i) CHECK(readback[i] == i); -} - -TEST_CASE("[#241] pt_t with threaded pipeline — gzip-compressed bytewise equivalence") { - // Write the same data with basic and threaded pipelines into two files, - // read back, assert content matches. Different filter chain ordering can - // produce different on-disk bytes; we assert decompressed content equivalence. - constexpr int N = 256; - std::vector expected(N); - for (int i = 0; i < N; ++i) expected[i] = i * 7 + 3; - - auto write_file = [&](const char* path, auto pt_factory) { - h5::test::file_fixture_t f(path); - h5::ds_t ds = h5::create(f.fd, "ds", h5::current_dims_t{0}, - h5::max_dims_t{H5S_UNLIMITED}, h5::chunk{32} | h5::gzip{6}); - { - auto pt = pt_factory(ds); - for (int v : expected) h5::append(pt, v); - h5::flush(pt); - } - }; - - write_file("test-pt-basic-gzip.h5", - [](const h5::ds_t& ds) { return h5::pt_t(ds); }); - write_file("test-pt-threaded-gzip.h5", - [](const h5::ds_t& ds) { return h5::pt_t(ds, h5::filter::threads{4}); }); - - h5::fd_t basic = h5::open("test-pt-basic-gzip.h5", H5F_ACC_RDONLY); - h5::fd_t threaded = h5::open("test-pt-threaded-gzip.h5", H5F_ACC_RDONLY); - auto basic_data = h5::read>(basic, "ds"); - auto threaded_data = h5::read>(threaded, "ds"); - REQUIRE(basic_data.size() == expected.size()); - REQUIRE(threaded_data.size() == expected.size()); - CHECK(basic_data == expected); - CHECK(threaded_data == expected); - CHECK(basic_data == threaded_data); -} - -TEST_CASE("[#241] pt_t with threaded pipeline — default worker count (hw_concurrency)") { - h5::test::file_fixture_t f("test-pt-threaded-default.h5"); - h5::ds_t ds = h5::create(f.fd, "ds", h5::current_dims_t{0}, - h5::max_dims_t{H5S_UNLIMITED}, h5::chunk{16} | h5::gzip{1}); - { - // h5::filter::threads{} with no number → hw_concurrency() workers - h5::pt_t pt(ds, h5::filter::threads{}); - for (int i = 0; i < 96; ++i) h5::append(pt, i); - h5::flush(pt); - } - auto readback = h5::read>(f.fd, "ds"); - REQUIRE(readback.size() == 96); - for (int i = 0; i < 96; ++i) CHECK(readback[i] == i); -} - TEST_CASE("[#232] std::forward_list append streams elements into chunked dataset") { h5::test::file_fixture_t f("test-pt-fwdlist.h5"); // forward_list is append/view only — h5::write/read intentionally unsupported. @@ -276,3 +203,125 @@ TEST_CASE("[#239] h5::reset zeroes packet table dimension tracker") { h5::reset(pt); // must compile and run without throwing CHECK(true); } + +// ===================================================================== +// [#250 1.3.2] pt_t resolves FAPL pool + backpressure at init +// ===================================================================== + +TEST_CASE("[#250 1.3.2] pt_t picks up worker pool + cap from file's FAPL") { + // Construct a file with h5::threads{4} | h5::backpressure{16} on its FAPL. + // The fixture's default file_fixture_t opens without these properties; + // we make a custom one inline here. + const char* path = "test-pt-1.3.2-pool-resolve.h5"; + std::remove(path); + { + h5::fapl_t fapl = h5::threads{4} | h5::backpressure{16}; + h5::fd_t fd = h5::create(path, H5F_ACC_TRUNC, h5::default_fcpl, fapl); + + h5::ds_t ds = h5::create(fd, "ds", h5::current_dims_t{0}, + h5::max_dims_t{H5S_UNLIMITED}, h5::chunk{32}); + + h5::pt_t pt(ds); + // pt_t::pool_ and ::backpressure_cap_ are private; the visible + // contract is that operations on this pt_t SHOULD use the pool + // (Phase 1.3.3). In this commit we just verify the pt_t was + // constructed without error and the file FAPL has the pool. + auto pool_check = h5::impl::resolve_worker_pool(static_cast(fapl)); + REQUIRE(pool_check); + CHECK(pool_check->worker_count() == 4); + CHECK(h5::impl::resolve_backpressure( + static_cast(fapl), pool_check->worker_count()) == 16u); + } + std::remove(path); +} + +TEST_CASE("[#250 1.3.2] pt_t with no FAPL pool falls back cleanly") { + // Default FAPL — no h5::threads applied. + h5::test::file_fixture_t f("test-pt-1.3.2-no-pool.h5"); + h5::ds_t ds = h5::create(f.fd, "ds", h5::current_dims_t{0}, + h5::max_dims_t{H5S_UNLIMITED}, h5::chunk{16}); + + h5::pt_t pt(ds); + // pt_t constructs without throwing; pool_ resolves to nullptr internally. + // Writes go through visit_pipeline (synchronous) — verify by appending + // and reading back. + for (int i = 0; i < 32; ++i) h5::append(pt, i); + h5::flush(pt); + + auto readback = h5::read>(f.fd, "ds"); + REQUIRE(readback.size() == 32); + for (int i = 0; i < 32; ++i) CHECK(readback[i] == i); +} + +// ===================================================================== +// [#250 1.3.2 step 2] pt_t pool path: bytewise equivalence + parallelism +// ===================================================================== + +TEST_CASE("[#250 1.3.2] pt_t with FAPL pool — gzip round-trip equivalence vs synchronous") { + constexpr int N = 256; + std::vector expected(N); + for (int i = 0; i < N; ++i) expected[i] = i * 7 + 3; + + // Helper: write N ints through a pt_t built from a given fapl, + // read back, return the content. + auto write_and_read = [&](const char* path, h5::fapl_t fapl) { + std::remove(path); + { + h5::fd_t fd = h5::create(path, H5F_ACC_TRUNC, h5::default_fcpl, fapl); + h5::ds_t ds = h5::create(fd, "ds", h5::current_dims_t{0}, + h5::max_dims_t{H5S_UNLIMITED}, h5::chunk{32} | h5::gzip{6}); + h5::pt_t pt(ds); + for (int v : expected) h5::append(pt, v); + h5::flush(pt); + } + h5::fd_t fd = h5::open(path, H5F_ACC_RDONLY); + return h5::read>(fd, "ds"); + }; + + // 1) Default FAPL: synchronous path + auto sync_data = write_and_read("test-pt-1.3.2-sync.h5", h5::default_fapl); + REQUIRE(sync_data.size() == expected.size()); + CHECK(sync_data == expected); + + // 2) Pool FAPL with 4 workers, default backpressure + h5::fapl_t pool_fapl = h5::threads{4}; + auto pool_data = write_and_read("test-pt-1.3.2-pool.h5", pool_fapl); + REQUIRE(pool_data.size() == expected.size()); + CHECK(pool_data == expected); + + // 3) Pool with explicit backpressure + h5::fapl_t bp_fapl = h5::threads{4} | h5::backpressure{8}; + auto bp_data = write_and_read("test-pt-1.3.2-bp.h5", bp_fapl); + REQUIRE(bp_data.size() == expected.size()); + CHECK(bp_data == expected); + + // All three produce the same logical content. + CHECK(sync_data == pool_data); + CHECK(pool_data == bp_data); + + std::remove("test-pt-1.3.2-sync.h5"); + std::remove("test-pt-1.3.2-pool.h5"); + std::remove("test-pt-1.3.2-bp.h5"); +} + +TEST_CASE("[#250 1.3.2] pt_t pool path — back-pressure bounds in-flight") { + // Tight back-pressure cap (2) forces frequent drains. The test + // exercises the producer-blocking branch in write_chunk_via_pool. + constexpr int N = 64; + const char* path = "test-pt-1.3.2-tight-bp.h5"; + std::remove(path); + { + h5::fapl_t fapl = h5::threads{2} | h5::backpressure{2}; + h5::fd_t fd = h5::create(path, H5F_ACC_TRUNC, h5::default_fcpl, fapl); + h5::ds_t ds = h5::create(fd, "ds", h5::current_dims_t{0}, + h5::max_dims_t{H5S_UNLIMITED}, h5::chunk{8} | h5::gzip{1}); + h5::pt_t pt(ds); + for (int i = 0; i < N; ++i) h5::append(pt, i); + h5::flush(pt); + } + h5::fd_t fd = h5::open(path, H5F_ACC_RDONLY); + auto data = h5::read>(fd, "ds"); + REQUIRE(data.size() == N); + for (int i = 0; i < N; ++i) CHECK(data[i] == i); + std::remove(path); +} diff --git a/test/H5Pall.cpp b/test/H5Pall.cpp index 0e08c2ecc0..c1475a9694 100644 --- a/test/H5Pall.cpp +++ b/test/H5Pall.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include "support/fixture.hpp" @@ -232,3 +233,524 @@ TEST_CASE("property builder can be chained against an existing dcpl handle") { } CHECK(found_deflate); } + +// ===================================================================== +// [#250] Phase I — FAPL worker-pool slot lifecycle +// ===================================================================== +// +// Same H5Pinsert2 + slot pattern as the DAPL pipeline (#242/#244), but with +// shared (refcounted) ownership instead of fresh-allocation-per-copy. +// Multiple FAPL copies alias one underlying worker_pool_t via shared_ptr; +// the pool is destroyed when the last live FAPL copy releases its slot. +// +// Workplan: tasks/h5cpp-fapl-multithreading-workplan.md §3. + +namespace h5_250_fapl_regression { + using slot_t = h5::impl::worker_pool_slot_t; + using pool_t = h5::impl::worker_pool_t; + + inline std::atomic slot_allocations{0}; + inline std::atomic slot_deletions{0}; + inline std::unordered_set& live_slots() { + static std::unordered_set s; + return s; + } + inline std::atomic double_free_detected{false}; + + inline herr_t tracking_close_cb(const char*, size_t, void* ptr) { + auto* slot = *static_cast(ptr); + auto& live = live_slots(); + auto it = live.find(slot); + if (it == live.end()) { + double_free_detected.store(true); + return -1; + } + live.erase(it); + slot_deletions.fetch_add(1); + delete slot; + return 0; + } + inline herr_t tracking_copy_cb(const char*, size_t, void* value) { + auto** loc = static_cast(value); + auto* fresh = new slot_t{(*loc)->pool}; + live_slots().insert(fresh); + slot_allocations.fetch_add(1); + *loc = fresh; + return 0; + } + + inline hid_t make_tracked_fapl(unsigned workers, bool with_copy_cb) { + hid_t fapl = H5Pcreate(H5P_FILE_ACCESS); + auto* slot = new slot_t{std::make_shared(workers)}; + live_slots().insert(slot); + slot_allocations.fetch_add(1); + H5Pinsert2(fapl, H5CPP_FAPL_WORKER_POOL, sizeof(slot_t*), &slot, + nullptr, nullptr, nullptr, + with_copy_cb ? tracking_copy_cb : nullptr, + nullptr, tracking_close_cb); + return fapl; + } + + inline void reset_counters() { + slot_allocations.store(0); + slot_deletions.store(0); + live_slots().clear(); + double_free_detected.store(false); + } +} + +TEST_CASE("[#250] regression scaffold — slot lifecycle test fails when copy-cb is omitted") { + using namespace h5_250_fapl_regression; + reset_counters(); + { + hid_t fapl_a = make_tracked_fapl(/*workers=*/4, /*copy_cb=*/false); + hid_t fapl_b = H5Pcopy(fapl_a); + H5Pclose(fapl_a); + H5Pclose(fapl_b); + } + CHECK(double_free_detected.load()); + reset_counters(); +} + +TEST_CASE("[#250] single FAPL clean lifecycle") { + using namespace h5_250_fapl_regression; + reset_counters(); + { + hid_t fapl = make_tracked_fapl(/*workers=*/4, /*copy_cb=*/true); + H5Pclose(fapl); + } + CHECK(!double_free_detected.load()); + CHECK(slot_allocations.load() == slot_deletions.load()); + CHECK(live_slots().empty()); +} + +TEST_CASE("[#250] H5Pcopy preserves shared pool ownership across FAPL copies") { + using namespace h5_250_fapl_regression; + reset_counters(); + { + hid_t fapl_a = make_tracked_fapl(/*workers=*/4, /*copy_cb=*/true); + auto pool_a = h5::impl::resolve_worker_pool(fapl_a); + REQUIRE(pool_a); + CHECK(pool_a->worker_count() == 4); + + hid_t fapl_b = H5Pcopy(fapl_a); + auto pool_b = h5::impl::resolve_worker_pool(fapl_b); + REQUIRE(pool_b); + CHECK(pool_a.get() == pool_b.get()); // SAME pool — refcount shared + + H5Pclose(fapl_a); + // pool still alive via fapl_b's slot + auto pool_after_close = h5::impl::resolve_worker_pool(fapl_b); + CHECK(pool_after_close.get() == pool_a.get()); + + H5Pclose(fapl_b); + } + CHECK(!double_free_detected.load()); + CHECK(slot_allocations.load() == slot_deletions.load()); + CHECK(live_slots().empty()); +} + +TEST_CASE("[#250] h5::threads tag installs the FAPL property via property chain") { + using namespace h5_250_fapl_regression; + reset_counters(); + { + // Construct an FAPL with h5::threads{N} applied via the property + // chain — this exercises the real fapl_threads_set callback path, + // not the tracking shim. Uses h5cpp's actual copy/close callbacks. + h5::fapl_t fapl = h5::threads{4}; + auto pool = h5::impl::resolve_worker_pool(static_cast(fapl)); + REQUIRE(pool); + CHECK(pool->worker_count() == 4); + } + // Pool is destroyed when fapl goes out of scope — no leak detection + // possible here without separate scaffolding, but TSAN coverage will + // catch any worker-thread shutdown issues. +} + +TEST_CASE("[#250] h5::threads{} (no count) uses hardware_concurrency") { + h5::fapl_t fapl = h5::threads{}; + auto pool = h5::impl::resolve_worker_pool(static_cast(fapl)); + REQUIRE(pool); + const unsigned expected = std::max(1u, std::thread::hardware_concurrency()); + CHECK(pool->worker_count() == expected); +} + +// ===================================================================== +// [#250] Phase 1.2 — worker_pool_t generic submit() + wait_idle() +// ===================================================================== + +TEST_CASE("[#250 1.2] worker_pool_t — single submit + future return") { + h5::impl::worker_pool_t pool{4}; + REQUIRE(pool.worker_count() == 4); + + auto fut = pool.submit([] { return 42; }); + CHECK(fut.get() == 42); + pool.wait_idle(); // close the small post-future-ready/pre-decrement window + CHECK(pool.in_flight() == 0); +} + +TEST_CASE("[#250 1.2] worker_pool_t — many submits resolve in parallel") { + constexpr unsigned N = 256; + h5::impl::worker_pool_t pool{8}; + std::vector> futures; + futures.reserve(N); + for (unsigned i = 0; i < N; ++i) + futures.emplace_back(pool.submit([i] { return static_cast(i * i); })); + + int sum = 0; + for (auto& f : futures) sum += f.get(); + int expected = 0; + for (unsigned i = 0; i < N; ++i) expected += static_cast(i * i); + CHECK(sum == expected); + pool.wait_idle(); // close the small post-future-ready/pre-decrement window + CHECK(pool.in_flight() == 0); +} + +TEST_CASE("[#250 1.2] worker_pool_t — multi-thread submission is safe (MPMC)") { + constexpr unsigned PRODUCERS = 4; + constexpr unsigned PER_PRODUCER = 100; + h5::impl::worker_pool_t pool{4}; + std::atomic total{0}; + + std::vector producers; + producers.reserve(PRODUCERS); + for (unsigned p = 0; p < PRODUCERS; ++p) { + producers.emplace_back([&pool, &total] { + for (unsigned i = 0; i < PER_PRODUCER; ++i) + pool.submit([&total] { total.fetch_add(1); }).wait(); + }); + } + for (auto& t : producers) t.join(); + CHECK(total.load() == static_cast(PRODUCERS * PER_PRODUCER)); + pool.wait_idle(); // close the small post-future-ready/pre-decrement window + CHECK(pool.in_flight() == 0); +} + +TEST_CASE("[#250 1.2] worker_pool_t — wait_idle blocks until completion") { + h5::impl::worker_pool_t pool{2}; + std::atomic done{0}; + for (int i = 0; i < 50; ++i) + pool.submit([&done] { + std::this_thread::sleep_for(std::chrono::milliseconds(2)); + done.fetch_add(1); + }); + pool.wait_idle(); + CHECK(done.load() == 50); + pool.wait_idle(); // close the small post-future-ready/pre-decrement window + CHECK(pool.in_flight() == 0); +} + +TEST_CASE("[#250 1.2] worker_pool_t — exception in task is captured in future") { + h5::impl::worker_pool_t pool{2}; + auto fut = pool.submit([] () -> int { throw std::runtime_error("boom"); }); + bool caught = false; + try { (void)fut.get(); } + catch (const std::runtime_error& e) { + caught = std::string(e.what()) == "boom"; + } + CHECK(caught); + pool.wait_idle(); // close the small post-future-ready/pre-decrement window + CHECK(pool.in_flight() == 0); +} + +TEST_CASE("[#250 1.2] worker_pool_t — dtor drains pending work cleanly") { + std::atomic done{0}; + { + h5::impl::worker_pool_t pool{4}; + for (int i = 0; i < 100; ++i) + pool.submit([&done] { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + done.fetch_add(1); + }); + // No explicit wait_idle — dtor must drain before joining workers. + } + CHECK(done.load() == 100); +} + +// ===================================================================== +// [#250] Phase 1.3.1 — h5::backpressure FAPL property +// ===================================================================== + +TEST_CASE("[#250 1.3] h5::backpressure tag installs the FAPL property") { + h5::fapl_t fapl = h5::threads{4} | h5::backpressure{16}; + auto pool = h5::impl::resolve_worker_pool(static_cast(fapl)); + REQUIRE(pool); + CHECK(pool->worker_count() == 4); + CHECK(h5::impl::resolve_backpressure(static_cast(fapl), pool->worker_count()) == 16); +} + +TEST_CASE("[#250 1.3] resolve_backpressure default = 8 × worker_count when unset") { + h5::fapl_t fapl = h5::threads{4}; + auto pool = h5::impl::resolve_worker_pool(static_cast(fapl)); + REQUIRE(pool); + CHECK(h5::impl::resolve_backpressure(static_cast(fapl), pool->worker_count()) == 32u); +} + +TEST_CASE("[#250 1.3] backpressure property survives H5Pcopy") { + h5::fapl_t fapl = h5::threads{4} | h5::backpressure{64}; + hid_t copy = H5Pcopy(static_cast(fapl)); + auto pool_copy = h5::impl::resolve_worker_pool(copy); + REQUIRE(pool_copy); + CHECK(h5::impl::resolve_backpressure(copy, pool_copy->worker_count()) == 64u); + H5Pclose(copy); +} + +TEST_CASE("[#250 1.3] backpressure without threads — resolver returns default but no pool") { + h5::fapl_t fapl = h5::backpressure{32}; + CHECK(h5::impl::resolve_worker_pool(static_cast(fapl)) == nullptr); + // Cap is present even without a pool — resolver returns user value. + // pt_t / h5::write are responsible for ignoring it when no pool exists. + CHECK(h5::impl::resolve_backpressure(static_cast(fapl), 0) == 32u); +} + +// ===================================================================== +// [#250 1.3.3] h5::write / h5::read pool integration end-to-end +// ===================================================================== +// +// Per workplan Approach 2: h5::write / h5::read consult the file's FAPL +// for a pool and route through a local pool_pipeline_t when one exists. +// Per-dataset opt-in is still h5::high_throughput on the DAPL; without +// it, even a FAPL-pool-equipped file falls through to standard H5Dwrite. + +TEST_CASE("[#250 1.3.3] h5::write + h5::read round-trip through FAPL pool") { + constexpr int N = 256; + std::vector expected(N); + for (int i = 0; i < N; ++i) expected[i] = i * 11 + 7; + + const char* path = "test-h5write-1.3.3-pool.h5"; + std::remove(path); + { + h5::fapl_t fapl = h5::threads{4} | h5::backpressure{16}; + h5::fd_t fd = h5::create(path, H5F_ACC_TRUNC, h5::default_fcpl, fapl); + // chunked + gzip + DAPL high_throughput to opt the dataset into + // the pipeline path; the FAPL pool then claims the chunks. + h5::dapl_t dapl = h5::high_throughput; + h5::ds_t ds = h5::create(fd, "ds", h5::current_dims_t{N}, + h5::max_dims_t{H5S_UNLIMITED}, + h5::chunk{32} | h5::gzip{6}, dapl); + h5::write(ds, expected.data(), h5::count{N}); + } + { + h5::fapl_t fapl = h5::threads{4}; + h5::fd_t fd = h5::open(path, H5F_ACC_RDONLY, h5::default_fapl); + // Read back without a pool — verify the data is on disk regardless + // of FAPL choice on the reader side. + auto data = h5::read>(fd, "ds"); + REQUIRE(data.size() == expected.size()); + CHECK(data == expected); + } + std::remove(path); +} + +TEST_CASE("[#250 1.3.3] h5::write without high_throughput bypasses pool") { + // Even with FAPL pool installed, h5::write to a dataset whose DAPL + // doesn't have high_throughput should go through standard H5Dwrite. + constexpr int N = 128; + std::vector expected(N); + for (int i = 0; i < N; ++i) expected[i] = i; + + const char* path = "test-h5write-1.3.3-no-ht.h5"; + std::remove(path); + { + h5::fapl_t fapl = h5::threads{4}; + h5::fd_t fd = h5::create(path, H5F_ACC_TRUNC, h5::default_fcpl, fapl); + // No high_throughput on the DAPL; pool is on FAPL but unused. + h5::ds_t ds = h5::create(fd, "ds", h5::current_dims_t{N}, + h5::max_dims_t{H5S_UNLIMITED}, h5::chunk{16}); + h5::write(ds, expected.data(), h5::count{N}); + } + { + h5::fd_t fd = h5::open(path, H5F_ACC_RDONLY); + auto data = h5::read>(fd, "ds"); + REQUIRE(data.size() == expected.size()); + CHECK(data == expected); + } + std::remove(path); +} + +// ===================================================================== +// [#250 1.5] TSAN-targeted coverage — multi-fd isolation, bytewise +// equivalence vs. basic pipeline, fd shutdown drains in-flight chunks +// ===================================================================== +// +// These tests exist to harden the FAPL-scoped pool against data races +// and ownership bugs that show up under `-fsanitize=thread`. They +// run as ordinary ctest cases on the local toolchain (no TSAN gate), +// and as race detectors on the CI `tsan` job introduced in #250 +// Phase 1.5. + +TEST_CASE("[#250 1.5] two FAPLs with independent pools — bytewise equivalence to basic") { + // Same payload, gzip-compressed, written via three configurations: + // basic pipeline, FAPL pool A (4 threads), FAPL pool B (2 threads). + // All three on-disk files must read back to the same logical data. + constexpr int N = 1024; + std::vector expected(N); + for (int i = 0; i < N; ++i) expected[i] = (i * 31337) ^ 0xCAFE; + + auto write_with = [&](const char* path, h5::fapl_t fapl) { + std::remove(path); + h5::fd_t fd = h5::create(path, H5F_ACC_TRUNC, h5::default_fcpl, fapl); + h5::dapl_t dapl = h5::high_throughput; + h5::ds_t ds = h5::create(fd, "ds", h5::current_dims_t{N}, + h5::max_dims_t{H5S_UNLIMITED}, + h5::chunk{64} | h5::gzip{6}, dapl); + h5::write(ds, expected.data(), h5::count{N}); + }; + + const char* p_basic = "test-1.5-basic.h5"; + const char* p_pool4 = "test-1.5-pool4.h5"; + const char* p_pool2 = "test-1.5-pool2.h5"; + + write_with(p_basic, h5::default_fapl); + write_with(p_pool4, h5::fapl_t{h5::threads{4} | h5::backpressure{16}}); + write_with(p_pool2, h5::fapl_t{h5::threads{2} | h5::backpressure{8}}); + + auto load = [](const char* p) { + h5::fd_t fd = h5::open(p, H5F_ACC_RDONLY); + return h5::read>(fd, "ds"); + }; + + auto a = load(p_basic), b = load(p_pool4), c = load(p_pool2); + REQUIRE(a.size() == expected.size()); + CHECK(a == expected); + CHECK(b == expected); + CHECK(c == expected); + CHECK(a == b); + CHECK(b == c); + + std::remove(p_basic); + std::remove(p_pool4); + std::remove(p_pool2); +} + +TEST_CASE("[#250 1.5] interleaved writes through two FAPLs do not cross-contaminate") { + // Two simultaneously-open files, each owning its own h5::fd_t with + // its own FAPL pool. Writes are issued sequentially (HDF5 itself is + // not thread-safe in default builds), but both pools are alive at + // the same time — a shared-state bug would cross chunks between + // them. Compression on each pool's workers runs in parallel. + // + // (Concurrent H5 C-API calls from multiple threads are explicitly + // out of scope: the FAPL pool parallelizes compression, not HDF5 + // itself. See Phase II in the workplan for the async/thread-safe + // story.) + constexpr int N = 512; + std::vector payload_a(N), payload_b(N); + for (int i = 0; i < N; ++i) { + payload_a[i] = i * 3 + 1; + payload_b[i] = i * 5 + 2; + } + + const char* pa = "test-1.5-interleaved-a.h5"; + const char* pb = "test-1.5-interleaved-b.h5"; + std::remove(pa); + std::remove(pb); + + { + h5::fapl_t fapl_a = h5::threads{4}; + h5::fapl_t fapl_b = h5::threads{2}; + h5::fd_t fa = h5::create(pa, H5F_ACC_TRUNC, h5::default_fcpl, fapl_a); + h5::fd_t fb = h5::create(pb, H5F_ACC_TRUNC, h5::default_fcpl, fapl_b); + + h5::dapl_t dapl = h5::high_throughput; + h5::ds_t da = h5::create(fa, "ds", h5::current_dims_t{N}, + h5::max_dims_t{H5S_UNLIMITED}, h5::chunk{32} | h5::gzip{4}, dapl); + h5::ds_t db = h5::create(fb, "ds", h5::current_dims_t{N}, + h5::max_dims_t{H5S_UNLIMITED}, h5::chunk{32} | h5::gzip{4}, dapl); + + // Interleave the writes so both pools have in-flight work + // simultaneously. h5::write on each side drains its own pool + // before returning, so by the time we move on the other pool + // is still alive with its own queued work. + h5::write(da, payload_a.data(), h5::count{N}); + h5::write(db, payload_b.data(), h5::count{N}); + } + + h5::fd_t fa = h5::open(pa, H5F_ACC_RDONLY); + h5::fd_t fb = h5::open(pb, H5F_ACC_RDONLY); + auto ra = h5::read>(fa, "ds"); + auto rb = h5::read>(fb, "ds"); + REQUIRE(ra.size() == payload_a.size()); + REQUIRE(rb.size() == payload_b.size()); + CHECK(ra == payload_a); + CHECK(rb == payload_b); + + std::remove(pa); + std::remove(pb); +} + +TEST_CASE("[#250 1.5] pt_t destructor drains pool before fd close — data on disk") { + // pt_t::flush is called by ~pt_t, which on the pool path calls + // pool_pipeline_t::drain() and blocks until every in-flight chunk + // has completed compression + H5Dwrite_chunk. After the pt_t + // scope exits, the data must be readable from a fresh open even + // though the worker pool is still alive (held by the FAPL slot). + constexpr int N = 384; + std::vector expected(N); + for (int i = 0; i < N; ++i) expected[i] = i * 13; + + const char* path = "test-1.5-pt-drain.h5"; + std::remove(path); + { + h5::fapl_t fapl = h5::threads{4} | h5::backpressure{8}; + h5::fd_t fd = h5::create(path, H5F_ACC_TRUNC, h5::default_fcpl, fapl); + h5::dapl_t dapl = h5::high_throughput; + h5::ds_t ds = h5::create(fd, "ds", h5::current_dims_t{0}, + h5::max_dims_t{H5S_UNLIMITED}, + h5::chunk{24} | h5::gzip{6}, dapl); + { + h5::pt_t pt(ds); + for (int v : expected) h5::append(pt, v); + // No explicit h5::flush — let ~pt_t drain. + } + // ds and fd close at scope exit. + } + { + h5::fd_t fd = h5::open(path, H5F_ACC_RDONLY); + auto data = h5::read>(fd, "ds"); + REQUIRE(data.size() == expected.size()); + CHECK(data == expected); + } + std::remove(path); +} + +TEST_CASE("[#250 1.5] fd close after FAPL pool flush releases workers") { + // After the fd goes out of scope, the FAPL property's close + // callback drops its shared_ptr to the worker_pool_t. When the + // last user releases, the pool destructor stops the workers + // cleanly. This test exercises the lifecycle directly; TSAN + // catches use-after-free if the slot order is wrong. Each + // iteration uses a distinct path because Windows holds file + // handles slightly longer than POSIX after the HDF5 close. + constexpr int N = 256; + std::vector expected(N); + for (int i = 0; i < N; ++i) expected[i] = i + 100; + + auto write_then_close = [&](const char* path) { + std::remove(path); + h5::fapl_t fapl = h5::threads{2}; + h5::fd_t fd = h5::create(path, H5F_ACC_TRUNC, h5::default_fcpl, fapl); + h5::dapl_t dapl = h5::high_throughput; + h5::ds_t ds = h5::create(fd, "ds", h5::current_dims_t{N}, + h5::max_dims_t{H5S_UNLIMITED}, + h5::chunk{32} | h5::gzip{1}, dapl); + h5::write(ds, expected.data(), h5::count{N}); + // ds / fd close at scope exit; FAPL slot drops the pool ref. + }; + + const char* paths[] = { + "test-1.5-fd-close-a.h5", + "test-1.5-fd-close-b.h5", + "test-1.5-fd-close-c.h5", + "test-1.5-fd-close-d.h5", + }; + for (const char* p : paths) write_then_close(p); + + for (const char* p : paths) { + h5::fd_t fd = h5::open(p, H5F_ACC_RDONLY); + auto data = h5::read>(fd, "ds"); + REQUIRE(data.size() == expected.size()); + CHECK(data == expected); + std::remove(p); + } +} diff --git a/test/H5Zpipeline.cpp b/test/H5Zpipeline.cpp index 856fe52caa..2f9850c904 100644 --- a/test/H5Zpipeline.cpp +++ b/test/H5Zpipeline.cpp @@ -4,7 +4,6 @@ #include #include #include -#include #include #include #include "support/fixture.hpp" @@ -184,84 +183,3 @@ TEST_CASE("filter::error throws runtime_error") { char dst[8] = {}; CHECK_THROWS_AS(h5::impl::filter::error(dst, src, 8, 0, 1, nullptr), std::runtime_error); } - -TEST_CASE("threaded_pipeline_t write/read round-trip no filter") { - h5::test::file_fixture_t f("test-threaded-pipeline-nofilter.h5"); - h5::ds_t ds = h5::create(f.fd, "ds", h5::current_dims_t{50}, - h5::max_dims_t{H5S_UNLIMITED}, h5::chunk{10}); - h5::dcpl_t dcpl = h5::get_dcpl(ds); - h5::impl::threaded_pipeline_t pipeline; - pipeline.set_cache(dcpl, sizeof(int)); - - std::vector data(50); - for (size_t i = 0; i < data.size(); ++i) - data[i] = static_cast(i * 3); - - h5::offset_t offset{0}; - h5::stride_t stride{1}; - h5::block_t block{1}; - h5::count_t count{50}; - - pipeline.write(ds, offset, stride, block, count, h5::default_dxpl, data.data()); - pipeline.flush(); - - std::vector readback(50); - pipeline.read(ds, offset, stride, block, count, h5::default_dxpl, readback.data()); - - for (size_t i = 0; i < data.size(); ++i) - CHECK(readback[i] == data[i]); -} - -TEST_CASE("threaded_pipeline_t write/read round-trip gzip") { - h5::test::file_fixture_t f("test-threaded-pipeline-gzip.h5"); - h5::ds_t ds = h5::create(f.fd, "ds", h5::current_dims_t{100}, - h5::max_dims_t{H5S_UNLIMITED}, h5::chunk{10} | h5::gzip{6}); - h5::dcpl_t dcpl = h5::get_dcpl(ds); - h5::impl::threaded_pipeline_t pipeline; - pipeline.set_cache(dcpl, sizeof(double)); - - std::vector data(100); - for (size_t i = 0; i < data.size(); ++i) - data[i] = static_cast(i) * 1.5; - - h5::offset_t offset{0}; - h5::stride_t stride{1}; - h5::block_t block{1}; - h5::count_t count{100}; - - pipeline.write(ds, offset, stride, block, count, h5::default_dxpl, data.data()); - pipeline.flush(); - - std::vector readback(100); - pipeline.read(ds, offset, stride, block, count, h5::default_dxpl, readback.data()); - - for (size_t i = 0; i < data.size(); ++i) - CHECK(readback[i] == data[i]); -} - -TEST_CASE("threaded_pipeline_t write/read round-trip shuffle+gzip") { - h5::test::file_fixture_t f("test-threaded-pipeline-multi.h5"); - h5::ds_t ds = h5::create(f.fd, "ds", h5::current_dims_t{50}, - h5::max_dims_t{H5S_UNLIMITED}, h5::chunk{10} | h5::shuffle | h5::gzip{6}); - h5::dcpl_t dcpl = h5::get_dcpl(ds); - h5::impl::threaded_pipeline_t pipeline; - pipeline.set_cache(dcpl, sizeof(int)); - - std::vector data(50); - for (size_t i = 0; i < data.size(); ++i) - data[i] = static_cast(i + 1); - - h5::offset_t offset{0}; - h5::stride_t stride{1}; - h5::block_t block{1}; - h5::count_t count{50}; - - pipeline.write(ds, offset, stride, block, count, h5::default_dxpl, data.data()); - pipeline.flush(); - - std::vector readback(50); - pipeline.read(ds, offset, stride, block, count, h5::default_dxpl, readback.data()); - - for (size_t i = 0; i < data.size(); ++i) - CHECK(readback[i] == data[i]); -}