diff --git a/examples/a5/tensormap_and_ringbuffer/async_notify_demo/kernels/aiv/kernel_consumer.cpp b/examples/a5/tensormap_and_ringbuffer/async_notify_demo/kernels/aiv/kernel_consumer.cpp index 55d69ce34..6e7f7a662 100644 --- a/examples/a5/tensormap_and_ringbuffer/async_notify_demo/kernels/aiv/kernel_consumer.cpp +++ b/examples/a5/tensormap_and_ringbuffer/async_notify_demo/kernels/aiv/kernel_consumer.cpp @@ -36,7 +36,7 @@ extern "C" __aicore__ __attribute__((always_inline)) void kernel_entry(__gm__ in constexpr int kRows = 128; constexpr int kCols = 128; using DynShapeDim5 = Shape<1, 1, 1, kRows, kCols>; - using DynStridDim5 = Stride<1, 1, 1, kCols, 1>; + using DynStridDim5 = pto::Stride<1, 1, 1, kCols, 1>; using GlobalData = GlobalTensor; using TileData = Tile; diff --git a/examples/a5/tensormap_and_ringbuffer/async_notify_demo/kernels/aiv/kernel_producer_notify.cpp b/examples/a5/tensormap_and_ringbuffer/async_notify_demo/kernels/aiv/kernel_producer_notify.cpp index 1cd3fb7ec..828129f14 100644 --- a/examples/a5/tensormap_and_ringbuffer/async_notify_demo/kernels/aiv/kernel_producer_notify.cpp +++ b/examples/a5/tensormap_and_ringbuffer/async_notify_demo/kernels/aiv/kernel_producer_notify.cpp @@ -48,7 +48,7 @@ extern "C" __aicore__ __attribute__((always_inline)) void kernel_entry(__gm__ in constexpr int kRows = 128; constexpr int kCols = 128; using DynShapeDim5 = Shape<1, 1, 1, kRows, kCols>; - using DynStridDim5 = Stride<1, 1, 1, kCols, 1>; + using DynStridDim5 = pto::Stride<1, 1, 1, kCols, 1>; using GlobalData = GlobalTensor; using TileData = Tile; diff --git a/examples/workers/l3/allreduce_distributed/test_allreduce.py b/examples/workers/l3/allreduce_distributed/test_allreduce.py index fc7d4f639..29d4fc1eb 100644 --- a/examples/workers/l3/allreduce_distributed/test_allreduce.py +++ b/examples/workers/l3/allreduce_distributed/test_allreduce.py @@ -13,16 +13,29 @@ from .main import run +@pytest.mark.platforms(["a2a3sim", "a2a3", "a5sim", "a5"]) +@pytest.mark.runtime("tensormap_and_ringbuffer") +@pytest.mark.device_count(2) +def test_allreduce_distributed(st_platform, st_device_ids): + assert len(st_device_ids) == 2 + rc = run([int(d) for d in st_device_ids], platform=st_platform) + assert rc == 0 + + +# >2-rank cases live in a separate function so a5 can be dropped via the +# function-level platforms mark (the harness deselects by that mark, not by +# per-param marks). a5 onboard CI exposes only 2 NPUs, and a device_count(N>2) +# job aborts the whole resource phase — which would also take down the 2-rank +# case above. Still covered on a2a3 hardware and both sims. @pytest.mark.platforms(["a2a3sim", "a2a3", "a5sim"]) @pytest.mark.runtime("tensormap_and_ringbuffer") @pytest.mark.parametrize( "n_devices", [ - pytest.param(2, marks=pytest.mark.device_count(2)), pytest.param(4, marks=pytest.mark.device_count(4)), ], ) -def test_allreduce_distributed(st_platform, st_device_ids, n_devices): +def test_allreduce_distributed_multi_rank(st_platform, st_device_ids, n_devices): assert len(st_device_ids) == n_devices rc = run([int(d) for d in st_device_ids], platform=st_platform) assert rc == 0 diff --git a/src/a2a3/platform/onboard/host/comm_hccl.cpp b/src/a2a3/platform/onboard/host/comm_hccl.cpp index d718cf9cd..01bb318e2 100644 --- a/src/a2a3/platform/onboard/host/comm_hccl.cpp +++ b/src/a2a3/platform/onboard/host/comm_hccl.cpp @@ -25,6 +25,8 @@ #include "platform_comm/comm.h" #include "platform_comm/comm_context.h" +#include "common/unified_log.h" + #include #include #include @@ -162,6 +164,7 @@ static void cleanup_handshake_files(const std::string &rootinfo_path) { static bool wait_for_rootinfo(const std::string &path, HcclRootInfo *root_info, uint64_t *run_token, int timeout_sec = 120) { + constexpr int kLogEverySec = 5; for (int i = 0; i < timeout_sec * 10; ++i) { std::ifstream f(path, std::ios::binary); if (f.good()) { @@ -183,6 +186,9 @@ wait_for_rootinfo(const std::string &path, HcclRootInfo *root_info, uint64_t *ru *run_token = header.run_token; return true; } + if (i > 0 && i % (kLogEverySec * 10) == 0) { + LOG_INFO_V0("[comm] wait_for_rootinfo: still waiting (%ds elapsed) path=%s", i / 10, path.c_str()); + } std::this_thread::sleep_for(std::chrono::milliseconds(100)); } return false; @@ -202,9 +208,9 @@ static bool file_barrier( std::ifstream f(marker); if (f.good()) break; if (std::chrono::steady_clock::now() >= deadline) { - fprintf( - stderr, "[comm rank %d] file_barrier('%s') timed out after %ds waiting for rank %d\n", rank, - tag.c_str(), timeout_sec, r + LOG_ERROR( + "[comm rank %d] file_barrier('%s') timed out after %ds waiting for rank %d", rank, tag.c_str(), + timeout_sec, r ); return false; } @@ -222,22 +228,19 @@ static bool file_barrier( extern "C" CommHandle comm_init(int rank, int nranks, void *stream, const char *rootinfo_path) try { if (stream == nullptr) { - fprintf(stderr, "[comm rank %d] comm_init: caller-supplied stream is null\n", rank); + LOG_ERROR("[comm rank %d] comm_init: caller-supplied stream is null", rank); return nullptr; } if (rootinfo_path == nullptr || *rootinfo_path == '\0') { - fprintf(stderr, "[comm rank %d] comm_init: rootinfo_path is null or empty\n", rank); + LOG_ERROR("[comm rank %d] comm_init: rootinfo_path is null or empty", rank); return nullptr; } if (nranks <= 0 || rank < 0 || rank >= nranks) { - fprintf(stderr, "[comm rank %d] comm_init: invalid rank/nranks (rank=%d, nranks=%d)\n", rank, rank, nranks); + LOG_ERROR("[comm rank %d] comm_init: invalid rank/nranks (rank=%d, nranks=%d)", rank, rank, nranks); return nullptr; } if (static_cast(nranks) > COMM_MAX_RANK_NUM) { - fprintf( - stderr, "[comm rank %d] comm_init: nranks=%d exceeds COMM_MAX_RANK_NUM=%u\n", rank, nranks, - COMM_MAX_RANK_NUM - ); + LOG_ERROR("[comm rank %d] comm_init: nranks=%d exceeds COMM_MAX_RANK_NUM=%u", rank, nranks, COMM_MAX_RANK_NUM); return nullptr; } @@ -262,7 +265,7 @@ extern "C" CommHandle comm_init(int rank, int nranks, void *stream, const char * h->run_token = make_run_token(rank); HcclResult hret = hccl_get_root_info(&rootInfo); if (hret != HCCL_SUCCESS) { - fprintf(stderr, "[comm rank 0] HcclGetRootInfo failed: %d\n", (int)hret); + LOG_ERROR("[comm rank 0] HcclGetRootInfo failed: %d", (int)hret); delete h; return nullptr; } @@ -280,7 +283,7 @@ extern "C" CommHandle comm_init(int rank, int nranks, void *stream, const char * } } else { if (!wait_for_rootinfo(h->rootinfo_path, &rootInfo, &h->run_token)) { - fprintf(stderr, "[comm rank %d] Timeout waiting for rootinfo\n", rank); + LOG_ERROR("[comm rank %d] Timeout waiting for rootinfo", rank); delete h; return nullptr; } @@ -295,17 +298,17 @@ extern "C" CommHandle comm_init(int rank, int nranks, void *stream, const char * HcclResult hret = hccl_comm_init_root_info(static_cast(nranks), &rootInfo, static_cast(rank), &h->hccl_comm); if (hret != HCCL_SUCCESS) { - fprintf(stderr, "[comm rank %d] HcclCommInitRootInfo failed: %d\n", rank, (int)hret); + LOG_ERROR("[comm rank %d] HcclCommInitRootInfo failed: %d", rank, (int)hret); delete h; return nullptr; } return h; } catch (const std::exception &e) { - fprintf(stderr, "[comm rank %d] comm_init: exception: %s\n", rank, e.what()); + LOG_ERROR("[comm rank %d] comm_init: exception: %s", rank, e.what()); return nullptr; } catch (...) { - fprintf(stderr, "[comm rank %d] comm_init: unknown exception\n", rank); + LOG_ERROR("[comm rank %d] comm_init: unknown exception", rank); return nullptr; } @@ -411,7 +414,7 @@ static int alloc_windows_via_ipc(CommHandle h, uint64_t win_size) { // EnablePeerAccess takes a peer DEVICE id, not a peer rank. int32_t myDevice = -1; if (aclrtGetDevice(&myDevice) != ACL_SUCCESS) { - fprintf(stderr, "[comm rank %d] ipc: aclrtGetDevice failed\n", rank); + LOG_ERROR("[comm rank %d] ipc: aclrtGetDevice failed", rank); return -1; } @@ -419,13 +422,13 @@ static int alloc_windows_via_ipc(CommHandle h, uint64_t win_size) { void *localBuf = nullptr; aclError aret = aclrtMalloc(&localBuf, win_size, ACL_MEM_MALLOC_HUGE_FIRST); if (aret != ACL_SUCCESS) { - fprintf(stderr, "[comm rank %d] ipc: aclrtMalloc -> %d\n", rank, static_cast(aret)); + LOG_ERROR("[comm rank %d] ipc: aclrtMalloc -> %d", rank, static_cast(aret)); return -1; } char myName[kIpcNameLen]{}; aret = aclrtIpcMemGetExportKey(localBuf, win_size, myName, kIpcNameLen, 0); if (aret != ACL_SUCCESS) { - fprintf(stderr, "[comm rank %d] ipc: GetExportKey -> %d\n", rank, static_cast(aret)); + LOG_ERROR("[comm rank %d] ipc: GetExportKey -> %d", rank, static_cast(aret)); aclrtFree(localBuf); return -1; } @@ -433,7 +436,7 @@ static int alloc_windows_via_ipc(CommHandle h, uint64_t win_size) { // Announce (pid, device, name) and read every peer's announcement. const int32_t myPid = static_cast(getpid()); if (!ipc_write_announce(rootinfo, rank, run_token, myPid, myDevice, myName)) { - fprintf(stderr, "[comm rank %d] ipc: write_announce failed\n", rank); + LOG_ERROR("[comm rank %d] ipc: write_announce failed", rank); aclrtFree(localBuf); return -1; } @@ -448,7 +451,7 @@ static int alloc_windows_via_ipc(CommHandle h, uint64_t win_size) { continue; } if (!ipc_read_announce(rootinfo, p, run_token, &peers[p])) { - fprintf(stderr, "[comm rank %d] ipc: read_announce(peer=%d) timed out\n", rank, p); + LOG_ERROR("[comm rank %d] ipc: read_announce(peer=%d) timed out", rank, p); aclrtFree(localBuf); return -1; } @@ -460,9 +463,15 @@ static int alloc_windows_via_ipc(CommHandle h, uint64_t win_size) { if (p == rank) continue; aclError r = aclrtDeviceEnablePeerAccess(peers[p].device_id, 0); if (r != ACL_SUCCESS) { - // Non-fatal: may already be enabled from a prior init in this process. - fprintf( - stderr, "[comm rank %d] ipc: EnablePeerAccess(peer_dev=%d) -> %d (continuing)\n", rank, + // Non-fatal but lossy: CANN 9.x does not expose a dedicated + // "already enabled" error code, so we cannot tell a benign + // re-enable from a real P2P failure (e.g. missing P2P link) + // here. The subsequent aclrtDevicePeerAccessStatus poll is + // the real source of truth — it returns status=1 if access + // is genuinely up, and times out (30s) otherwise. Failures + // that matter therefore still surface, just one stage later. + LOG_WARN( + "[comm rank %d] ipc: EnablePeerAccess(peer_dev=%d) -> %d (deferring to status poll)", rank, peers[p].device_id, static_cast(r) ); } @@ -474,8 +483,8 @@ static int alloc_windows_via_ipc(CommHandle h, uint64_t win_size) { int32_t status = 0; aclError r = aclrtDevicePeerAccessStatus(myDevice, peers[p].device_id, &status); if (r != ACL_SUCCESS) { - fprintf( - stderr, "[comm rank %d] ipc: PeerAccessStatus(local_dev=%d peer_dev=%d) -> %d\n", rank, myDevice, + LOG_ERROR( + "[comm rank %d] ipc: PeerAccessStatus(local_dev=%d peer_dev=%d) -> %d", rank, myDevice, peers[p].device_id, static_cast(r) ); aclrtFree(localBuf); @@ -483,9 +492,9 @@ static int alloc_windows_via_ipc(CommHandle h, uint64_t win_size) { } if (status == 1) break; if (std::chrono::steady_clock::now() >= deadline) { - fprintf( - stderr, "[comm rank %d] ipc: P2P enable timeout peer=%d peer_dev=%d status=%d\n", rank, p, - peers[p].device_id, status + LOG_ERROR( + "[comm rank %d] ipc: P2P enable timeout peer=%d peer_dev=%d status=%d", rank, p, peers[p].device_id, + status ); aclrtFree(localBuf); return -1; @@ -509,7 +518,7 @@ static int alloc_windows_via_ipc(CommHandle h, uint64_t win_size) { } aret = aclrtIpcMemSetImportPid(myName, peerPids.data(), peerPids.size()); if (aret != ACL_SUCCESS) { - fprintf(stderr, "[comm rank %d] ipc: SetImportPid -> %d\n", rank, static_cast(aret)); + LOG_ERROR("[comm rank %d] ipc: SetImportPid -> %d", rank, static_cast(aret)); aclrtFree(localBuf); return -1; } @@ -524,7 +533,9 @@ static int alloc_windows_via_ipc(CommHandle h, uint64_t win_size) { // is kept in CommContext only to preserve byte-equivalence with pto-isa's // parallel HcclDeviceContext declaration; removing it is gated on the // F4 private-ization decision (see .docs/28.l3-comm/ext.01.pr-774-review.md). - memset(&h->host_ctx, 0, sizeof(h->host_ctx)); + // host_ctx was value-initialized at handle construction (CommContext{}), + // and the idempotency guard in comm_alloc_windows prevents a second + // entry; no re-zero needed before populating it here. h->host_ctx.rankId = static_cast(rank); h->host_ctx.rankNum = static_cast(nranks); h->host_ctx.winSize = win_size; @@ -535,9 +546,8 @@ static int alloc_windows_via_ipc(CommHandle h, uint64_t win_size) { void *peerVa = nullptr; aret = aclrtIpcMemImportByKey(&peerVa, peers[p].name, 0); if (aret != ACL_SUCCESS) { - fprintf( - stderr, "[comm rank %d] ipc: ImportByKey(peer=%d pid=%d) -> %d\n", rank, p, peers[p].pid, - static_cast(aret) + LOG_ERROR( + "[comm rank %d] ipc: ImportByKey(peer=%d pid=%d) -> %d", rank, p, peers[p].pid, static_cast(aret) ); aclrtFree(localBuf); return -1; @@ -663,27 +673,27 @@ static int domain_alloc_via_ipc( int32_t myDevice = -1; if (aclrtGetDevice(&myDevice) != ACL_SUCCESS) { - fprintf(stderr, "[comm rank %d] alloc_domain: aclrtGetDevice failed\n", h->rank); + LOG_ERROR("[comm rank %d] alloc_domain: aclrtGetDevice failed", h->rank); return -1; } void *localBuf = nullptr; aclError aret = aclrtMalloc(&localBuf, win_size, ACL_MEM_MALLOC_HUGE_FIRST); if (aret != ACL_SUCCESS) { - fprintf(stderr, "[comm rank %d] alloc_domain: aclrtMalloc -> %d\n", h->rank, static_cast(aret)); + LOG_ERROR("[comm rank %d] alloc_domain: aclrtMalloc -> %d", h->rank, static_cast(aret)); return -1; } char myName[kIpcNameLen]{}; aret = aclrtIpcMemGetExportKey(localBuf, win_size, myName, kIpcNameLen, 0); if (aret != ACL_SUCCESS) { - fprintf(stderr, "[comm rank %d] alloc_domain: GetExportKey -> %d\n", h->rank, static_cast(aret)); + LOG_ERROR("[comm rank %d] alloc_domain: GetExportKey -> %d", h->rank, static_cast(aret)); aclrtFree(localBuf); return -1; } const int32_t myPid = static_cast(getpid()); if (!domain_write_announce(rootinfo, allocation_id, domain_rank, run_token, myPid, myDevice, myName)) { - fprintf(stderr, "[comm rank %d] alloc_domain: write_announce failed\n", h->rank); + LOG_ERROR("[comm rank %d] alloc_domain: write_announce failed", h->rank); aclrtFree(localBuf); return -1; } @@ -698,16 +708,59 @@ static int domain_alloc_via_ipc( continue; } if (!domain_read_announce(rootinfo, allocation_id, static_cast(p), run_token, &peers[p])) { - fprintf(stderr, "[comm rank %d] alloc_domain: read_announce(peer_dr=%d) timed out\n", h->rank, p); + LOG_ERROR("[comm rank %d] alloc_domain: read_announce(peer_dr=%d) timed out", h->rank, p); aclrtFree(localBuf); return -1; } } - // EnablePeerAccess is process-global and idempotent — already done by - // the base alloc for every base-comm pair, so domain allocations should - // never hit a new pair. Skip the EnablePeerAccess + wait loop here. - // (The base allocation owns the P2P route lifecycle.) + // Enable cross-card P2P for every domain peer and poll until ENABLED. + // The orch-only allocate_domain model has no base comm_alloc_windows to + // own the P2P route, so each allocation must (idempotently) ensure it. + // aclrtDeviceEnablePeerAccess is process-global and per device-pair, so + // once any allocation has opened a given pair, later ones simply observe + // it already enabled — the call + status poll is cheap in that case. + // Without this, the IPC VA import below still succeeds, but device-side + // cross-chip access from kernels silently fails, so peer TWAIT / + // notification writes never land and the scheduler times out. The + // aclrtDevicePeerAccessStatus poll is the source of truth (status==1) and + // surfaces a genuinely missing P2P link as a 30s timeout. + for (int p = 0; p < subset_n; ++p) { + if (p == my_dr) continue; + aclError r = aclrtDeviceEnablePeerAccess(peers[p].device_id, 0); + if (r != ACL_SUCCESS) { + LOG_WARN( + "[comm rank %d] alloc_domain: EnablePeerAccess(peer_dev=%d) -> %d (deferring to status poll)", h->rank, + peers[p].device_id, static_cast(r) + ); + } + } + for (int p = 0; p < subset_n; ++p) { + if (p == my_dr) continue; + const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(30); + while (true) { + int32_t status = 0; + aclError r = aclrtDevicePeerAccessStatus(myDevice, peers[p].device_id, &status); + if (r != ACL_SUCCESS) { + LOG_ERROR( + "[comm rank %d] alloc_domain: PeerAccessStatus(local_dev=%d peer_dev=%d) -> %d", h->rank, myDevice, + peers[p].device_id, static_cast(r) + ); + aclrtFree(localBuf); + return -1; + } + if (status == 1) break; + if (std::chrono::steady_clock::now() >= deadline) { + LOG_ERROR( + "[comm rank %d] alloc_domain: P2P enable timeout peer_dr=%d peer_dev=%d status=%d", h->rank, p, + peers[p].device_id, status + ); + aclrtFree(localBuf); + return -1; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + } if (!file_barrier(rootinfo, my_dr, subset_n, domain_barrier_tag(allocation_id, "p2p_ready"), run_token)) { aclrtFree(localBuf); @@ -722,7 +775,7 @@ static int domain_alloc_via_ipc( } aret = aclrtIpcMemSetImportPid(myName, peerPids.data(), peerPids.size()); if (aret != ACL_SUCCESS) { - fprintf(stderr, "[comm rank %d] alloc_domain: SetImportPid -> %d\n", h->rank, static_cast(aret)); + LOG_ERROR("[comm rank %d] alloc_domain: SetImportPid -> %d", h->rank, static_cast(aret)); aclrtFree(localBuf); return -1; } @@ -754,8 +807,8 @@ static int domain_alloc_via_ipc( void *peerVa = nullptr; aret = aclrtIpcMemImportByKey(&peerVa, peers[p].name, 0); if (aret != ACL_SUCCESS) { - fprintf( - stderr, "[comm rank %d] alloc_domain: ImportByKey(peer_dr=%d pid=%d) -> %d\n", h->rank, p, peers[p].pid, + LOG_ERROR( + "[comm rank %d] alloc_domain: ImportByKey(peer_dr=%d pid=%d) -> %d", h->rank, p, peers[p].pid, static_cast(aret) ); aclrtFree(localBuf); @@ -767,13 +820,13 @@ static int domain_alloc_via_ipc( void *newDevMem = nullptr; aret = aclrtMalloc(&newDevMem, sizeof(CommContext), ACL_MEM_MALLOC_HUGE_FIRST); if (aret != ACL_SUCCESS) { - fprintf(stderr, "[comm rank %d] alloc_domain: ctx aclrtMalloc -> %d\n", h->rank, static_cast(aret)); + LOG_ERROR("[comm rank %d] alloc_domain: ctx aclrtMalloc -> %d", h->rank, static_cast(aret)); aclrtFree(localBuf); return -1; } aret = aclrtMemcpy(newDevMem, sizeof(CommContext), &ctx, sizeof(CommContext), ACL_MEMCPY_HOST_TO_DEVICE); if (aret != ACL_SUCCESS) { - fprintf(stderr, "[comm rank %d] alloc_domain: ctx Memcpy H2D -> %d\n", h->rank, static_cast(aret)); + LOG_ERROR("[comm rank %d] alloc_domain: ctx Memcpy H2D -> %d", h->rank, static_cast(aret)); aclrtFree(newDevMem); aclrtFree(localBuf); return -1; @@ -787,6 +840,15 @@ static int domain_alloc_via_ipc( extern "C" int comm_alloc_windows(CommHandle h, size_t win_size, uint64_t *device_ctx_out) try { if (!h || !device_ctx_out) return -1; + // Idempotency guard: comm_alloc_windows is not re-entrant. The localBuf + // allocated by alloc_windows_via_ipc is owned by the handle's windowsIn[] + // entries and is only reclaimed at aclrtResetDevice; calling this twice + // would leak a full per-rank pool. device_ctx is set on first success. + if (h->device_ctx != nullptr) { + LOG_ERROR("[comm rank %d] comm_alloc_windows: already allocated on this handle", h->rank); + return -1; + } + // Path D: DIY symmetric pool on stable ACL IPC + EnablePeerAccess. // Replaced the prior HcclAllocComResourceByTiling reverse-parse path // (broken on CANN 9.0 due to HcclOpResParam ABI drift; see project @@ -811,10 +873,10 @@ extern "C" int comm_alloc_windows(CommHandle h, size_t win_size, uint64_t *devic *device_ctx_out = reinterpret_cast(h->device_ctx); return 0; } catch (const std::exception &e) { - fprintf(stderr, "[comm] comm_alloc_windows: exception: %s\n", e.what()); + LOG_ERROR("[comm] comm_alloc_windows: exception: %s", e.what()); return -1; } catch (...) { - fprintf(stderr, "[comm] comm_alloc_windows: unknown exception\n"); + LOG_ERROR("[comm] comm_alloc_windows: unknown exception"); return -1; } @@ -836,20 +898,20 @@ extern "C" int comm_derive_context( ) try { if (!h || !rank_ids || !device_ctx_out) return -1; if (h->host_ctx.rankNum == 0) { - fprintf(stderr, "[comm rank %d] comm_derive_context: base windows are not allocated\n", h->rank); + LOG_ERROR("[comm rank %d] comm_derive_context: base windows are not allocated", h->rank); return -1; } if (rank_count == 0 || rank_count > COMM_MAX_RANK_NUM || domain_rank >= rank_count) { - fprintf( - stderr, "[comm rank %d] comm_derive_context: invalid rank_count=%zu domain_rank=%u\n", h->rank, rank_count, + LOG_ERROR( + "[comm rank %d] comm_derive_context: invalid rank_count=%zu domain_rank=%u", h->rank, rank_count, domain_rank ); return -1; } if (window_offset + window_size > static_cast(h->host_ctx.winSize)) { - fprintf( - stderr, "[comm rank %d] comm_derive_context: window range [%zu, %zu) exceeds base window size %llu\n", - h->rank, window_offset, window_offset + window_size, static_cast(h->host_ctx.winSize) + LOG_ERROR( + "[comm rank %d] comm_derive_context: window range [%zu, %zu) exceeds base window size %llu", h->rank, + window_offset, window_offset + window_size, static_cast(h->host_ctx.winSize) ); return -1; } @@ -863,9 +925,9 @@ extern "C" int comm_derive_context( for (size_t i = 0; i < rank_count; ++i) { uint32_t base_rank = rank_ids[i]; if (base_rank >= static_cast(h->nranks)) { - fprintf( - stderr, "[comm rank %d] comm_derive_context: rank_ids[%zu]=%u out of range [0, %d)\n", h->rank, i, - base_rank, h->nranks + LOG_ERROR( + "[comm rank %d] comm_derive_context: rank_ids[%zu]=%u out of range [0, %d)", h->rank, i, base_rank, + h->nranks ); return -1; } @@ -876,16 +938,12 @@ extern "C" int comm_derive_context( void *newDevMem = nullptr; aclError aRet = aclrtMalloc(&newDevMem, sizeof(CommContext), ACL_MEM_MALLOC_HUGE_FIRST); if (aRet != ACL_SUCCESS) { - fprintf( - stderr, "[comm rank %d] comm_derive_context: aclrtMalloc failed: %d\n", h->rank, static_cast(aRet) - ); + LOG_ERROR("[comm rank %d] comm_derive_context: aclrtMalloc failed: %d", h->rank, static_cast(aRet)); return -1; } aRet = aclrtMemcpy(newDevMem, sizeof(CommContext), &ctx, sizeof(CommContext), ACL_MEMCPY_HOST_TO_DEVICE); if (aRet != ACL_SUCCESS) { - fprintf( - stderr, "[comm rank %d] comm_derive_context: aclrtMemcpy H2D failed: %d\n", h->rank, static_cast(aRet) - ); + LOG_ERROR("[comm rank %d] comm_derive_context: aclrtMemcpy H2D failed: %d", h->rank, static_cast(aRet)); aclrtFree(newDevMem); return -1; } @@ -895,10 +953,10 @@ extern "C" int comm_derive_context( *device_ctx_out = reinterpret_cast(derived); return 0; } catch (const std::exception &e) { - fprintf(stderr, "[comm] comm_derive_context: exception: %s\n", e.what()); + LOG_ERROR("[comm] comm_derive_context: exception: %s", e.what()); return -1; } catch (...) { - fprintf(stderr, "[comm] comm_derive_context: unknown exception\n"); + LOG_ERROR("[comm] comm_derive_context: unknown exception"); return -1; } @@ -910,7 +968,7 @@ extern "C" int comm_barrier(CommHandle h) { // stream for context-checked ACL calls (error 507018). HcclResult hret = hccl_barrier(h->hccl_comm, h->stream); if (hret != HCCL_SUCCESS) { - fprintf(stderr, "[comm rank %d] HcclBarrier failed: %d\n", h->rank, static_cast(hret)); + LOG_ERROR("[comm rank %d] HcclBarrier failed: %d", h->rank, static_cast(hret)); return static_cast(hret); } return 0; @@ -922,22 +980,22 @@ extern "C" int comm_alloc_domain_windows( ) try { if (!h || !rank_ids || !device_ctx_out || !local_window_base_out) return -1; if (rank_count == 0 || rank_count > COMM_MAX_RANK_NUM || domain_rank >= rank_count || window_size == 0) { - fprintf( - stderr, "[comm rank %d] alloc_domain: bad args (rank_count=%zu domain_rank=%u window_size=%zu)\n", h->rank, + LOG_ERROR( + "[comm rank %d] alloc_domain: bad args (rank_count=%zu domain_rank=%u window_size=%zu)", h->rank, rank_count, domain_rank, window_size ); return -1; } if (h->domain_allocations.count(allocation_id) > 0) { - fprintf( - stderr, "[comm rank %d] alloc_domain: allocation_id=%llu already live\n", h->rank, + LOG_ERROR( + "[comm rank %d] alloc_domain: allocation_id=%llu already live", h->rank, static_cast(allocation_id) ); return -1; } if (rank_ids[domain_rank] != static_cast(h->rank)) { - fprintf( - stderr, "[comm rank %d] alloc_domain: rank_ids[%u]=%u does not match base rank\n", h->rank, domain_rank, + LOG_ERROR( + "[comm rank %d] alloc_domain: rank_ids[%u]=%u does not match base rank", h->rank, domain_rank, rank_ids[domain_rank] ); return -1; @@ -947,7 +1005,7 @@ extern "C" int comm_alloc_domain_windows( // require comm_alloc_windows on the base in the orch-only model — the // dynamic alloc path does its own per-allocation aclrtMalloc + IPC dance. if (h->rootinfo_path.empty() || h->hccl_comm == nullptr) { - fprintf(stderr, "[comm rank %d] alloc_domain: base communicator not initialised\n", h->rank); + LOG_ERROR("[comm rank %d] alloc_domain: base communicator not initialised", h->rank); return -1; } @@ -959,7 +1017,7 @@ extern "C" int comm_alloc_domain_windows( // aclrtMalloc bytes (parity with the sim backend's memset). aclError aret = aclrtMemset(alloc->local_buf, window_size, 0, window_size); if (aret != ACL_SUCCESS) { - fprintf(stderr, "[comm rank %d] alloc_domain: aclrtMemset -> %d\n", h->rank, static_cast(aret)); + LOG_ERROR("[comm rank %d] alloc_domain: aclrtMemset -> %d", h->rank, static_cast(aret)); aclrtFree(alloc->device_ctx); aclrtFree(alloc->local_buf); return -1; @@ -970,10 +1028,10 @@ extern "C" int comm_alloc_domain_windows( h->domain_allocations.emplace(allocation_id, std::move(alloc)); return 0; } catch (const std::exception &e) { - fprintf(stderr, "[comm] alloc_domain: exception: %s\n", e.what()); + LOG_ERROR("[comm] alloc_domain: exception: %s", e.what()); return -1; } catch (...) { - fprintf(stderr, "[comm] alloc_domain: unknown exception\n"); + LOG_ERROR("[comm] alloc_domain: unknown exception"); return -1; } @@ -982,18 +1040,17 @@ comm_release_domain_windows(CommHandle h, uint64_t allocation_id, size_t rank_co if (!h) return -1; auto it = h->domain_allocations.find(allocation_id); if (it == h->domain_allocations.end()) { - fprintf( - stderr, "[comm rank %d] release_domain: allocation_id=%llu not found\n", h->rank, + LOG_ERROR( + "[comm rank %d] release_domain: allocation_id=%llu not found", h->rank, static_cast(allocation_id) ); return -1; } auto &alloc = it->second; if (static_cast(alloc->nranks) != rank_count || static_cast(alloc->rank) != domain_rank) { - fprintf( - stderr, + LOG_ERROR( "[comm rank %d] release_domain: caller (rank_count=%zu, domain_rank=%u) " - "disagrees with alloc-time (nranks=%d, rank=%d)\n", + "disagrees with alloc-time (nranks=%d, rank=%d)", h->rank, rank_count, domain_rank, alloc->nranks, alloc->rank ); return -1; @@ -1006,7 +1063,7 @@ comm_release_domain_windows(CommHandle h, uint64_t allocation_id, size_t rank_co h->rootinfo_path, static_cast(domain_rank), static_cast(rank_count), domain_barrier_tag(allocation_id, "release"), h->run_token )) { - fprintf(stderr, "[comm rank %d] release_domain: barrier timed out; releasing local state anyway\n", h->rank); + LOG_WARN("[comm rank %d] release_domain: barrier timed out; releasing local state anyway", h->rank); rc = -1; } @@ -1021,10 +1078,10 @@ comm_release_domain_windows(CommHandle h, uint64_t allocation_id, size_t rank_co h->domain_allocations.erase(it); return rc; } catch (const std::exception &e) { - fprintf(stderr, "[comm] release_domain: exception: %s\n", e.what()); + LOG_ERROR("[comm] release_domain: exception: %s", e.what()); return -1; } catch (...) { - fprintf(stderr, "[comm] release_domain: unknown exception\n"); + LOG_ERROR("[comm] release_domain: unknown exception"); return -1; } @@ -1035,9 +1092,7 @@ extern "C" int comm_destroy(CommHandle h) try { // release the local resources we own, so timeout just logs and proceeds. int rc = 0; if (!file_barrier(h->rootinfo_path, h->rank, h->nranks, "destroy", h->run_token)) { - fprintf( - stderr, "[comm rank %d] comm_destroy: final barrier timed out; releasing local state anyway\n", h->rank - ); + LOG_WARN("[comm rank %d] comm_destroy: final barrier timed out; releasing local state anyway", h->rank); rc = -1; } @@ -1062,7 +1117,7 @@ extern "C" int comm_destroy(CommHandle h) try { if (h->hccl_comm) { HcclResult hret = hccl_comm_destroy(h->hccl_comm); if (hret != HCCL_SUCCESS) { - fprintf(stderr, "[comm rank %d] HcclCommDestroy failed: %d\n", h->rank, static_cast(hret)); + LOG_ERROR("[comm rank %d] HcclCommDestroy failed: %d", h->rank, static_cast(hret)); if (rc == 0) rc = -1; } } @@ -1072,18 +1127,23 @@ extern "C" int comm_destroy(CommHandle h) try { // lifecycle belongs to DeviceRunner, whose finalize() releases all // device memory before resetting the device and running aclFinalize. - if (h->rank == 0) { + // Only rank 0 sweeps the on-disk handshake markers, and only if the + // final barrier succeeded. Deleting them after a timeout would strand + // any peer that hasn't observed our marker yet, and leak that peer + // into the next run with no rootinfo to discover. Letting cleanup + // ride on the next rank-0 init is the safer recovery path. + if (h->rank == 0 && rc == 0) { cleanup_handshake_files(h->rootinfo_path); } delete h; return rc; } catch (const std::exception &e) { - fprintf(stderr, "[comm] comm_destroy: exception: %s\n", e.what()); + LOG_ERROR("[comm] comm_destroy: exception: %s", e.what()); if (h) delete h; return -1; } catch (...) { - fprintf(stderr, "[comm] comm_destroy: unknown exception\n"); + LOG_ERROR("[comm] comm_destroy: unknown exception"); if (h) delete h; return -1; } diff --git a/src/a2a3/platform/onboard/host/device_runner.cpp b/src/a2a3/platform/onboard/host/device_runner.cpp index 4f04a94ce..59c6e48a4 100644 --- a/src/a2a3/platform/onboard/host/device_runner.cpp +++ b/src/a2a3/platform/onboard/host/device_runner.cpp @@ -355,11 +355,11 @@ int DeviceRunner::ensure_acl_ready(int device_id) { return -1; } - // aclInit is process-wide; CANN returns 100002 if it has already been - // initialized (possibly by another owner), which we treat as success. - constexpr int kAclRepeatInit = 100002; + // aclInit is process-wide; CANN returns ACL_ERROR_REPEAT_INITIALIZE if it + // has already been initialized (possibly by another owner), which we + // treat as success. aclError aRet = aclInit(nullptr); - if (aRet != ACL_SUCCESS && static_cast(aRet) != kAclRepeatInit) { + if (aRet != ACL_SUCCESS && static_cast(aRet) != ACL_ERROR_REPEAT_INITIALIZE) { LOG_ERROR("aclInit failed: %d", static_cast(aRet)); return static_cast(aRet); } diff --git a/src/a2a3/platform/onboard/host/device_runner.h b/src/a2a3/platform/onboard/host/device_runner.h index bd9d088b0..646e589e2 100644 --- a/src/a2a3/platform/onboard/host/device_runner.h +++ b/src/a2a3/platform/onboard/host/device_runner.h @@ -429,9 +429,14 @@ class DeviceRunner { /** * Destroy a stream previously returned by create_comm_stream(). - * Tolerates a nullptr stream (returns 0). + * Tolerates a nullptr stream. * - * @return 0 on success, error code on failure. + * Best-effort: any failure from aclrtSynchronizeStream / + * aclrtDestroyStream is logged but not propagated, since leaking a + * stream at teardown is strictly better than blocking device + * finalization. + * + * @return Always 0. */ int destroy_comm_stream(void *stream); diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/aicore_completion_mailbox.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/aicore_completion_mailbox.h index e859bb6d5..788652516 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/aicore_completion_mailbox.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/aicore_completion_mailbox.h @@ -9,8 +9,7 @@ * ----------------------------------------------------------------------------------------------------------- */ -#ifndef SRC_A2A3_RUNTIME_TENSORMAP_AND_RINGBUFFER_RUNTIME_AICORE_COMPLETION_MAILBOX_H_ -#define SRC_A2A3_RUNTIME_TENSORMAP_AND_RINGBUFFER_RUNTIME_AICORE_COMPLETION_MAILBOX_H_ +#pragma once #include @@ -79,5 +78,3 @@ struct AICoreCompletionMailbox { static_assert( sizeof(AICoreCompletionMailbox) % PTO2_ALIGN_SIZE == 0, "AICoreCompletionMailbox size must be cache-line aligned" ); - -#endif // SRC_A2A3_RUNTIME_TENSORMAP_AND_RINGBUFFER_RUNTIME_AICORE_COMPLETION_MAILBOX_H_ diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/backend/sdma/sdma_completion_kernel.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/backend/sdma/sdma_completion_kernel.h index b646c8ad7..a242a0726 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/backend/sdma/sdma_completion_kernel.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/backend/sdma/sdma_completion_kernel.h @@ -9,8 +9,7 @@ * ----------------------------------------------------------------------------------------------------------- */ -#ifndef SRC_A2A3_RUNTIME_TENSORMAP_AND_RINGBUFFER_RUNTIME_BACKEND_SDMA_SDMA_COMPLETION_KERNEL_H_ -#define SRC_A2A3_RUNTIME_TENSORMAP_AND_RINGBUFFER_RUNTIME_BACKEND_SDMA_SDMA_COMPLETION_KERNEL_H_ +#pragma once #include @@ -142,5 +141,3 @@ send_request_entry(AsyncCtx &ctx, SdmaRequestDescriptor #include @@ -65,5 +64,3 @@ inline void retire_sdma_event_record(uint64_t record_addr) { __atomic_store_n(channel_info, packed, __ATOMIC_RELEASE); cache_flush_range(const_cast(reinterpret_cast(channel_info)), sizeof(uint64_t)); } - -#endif // SRC_A2A3_RUNTIME_TENSORMAP_AND_RINGBUFFER_RUNTIME_BACKEND_SDMA_SDMA_COMPLETION_SCHEDULER_H_ diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_async_kernel_api.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_async_kernel_api.h index 87bcbfab6..9d1bd7700 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_async_kernel_api.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_async_kernel_api.h @@ -9,8 +9,7 @@ * ----------------------------------------------------------------------------------------------------------- */ -#ifndef PTO_ASYNC_KERNEL_API_H -#define PTO_ASYNC_KERNEL_API_H +#pragma once #include @@ -98,6 +97,10 @@ inline __aicore__ void defer_flush(AsyncCtx &ctx) { inline __aicore__ AsyncCtx get_async_ctx(__gm__ int64_t *args) { __gm__ LocalContext *lc = reinterpret_cast<__gm__ LocalContext *>(static_cast(args[PAYLOAD_LOCAL_CONTEXT_INDEX])); + // Field-by-field copy is mandatory: CCE rejects `AsyncCtx ctx = lc->async_ctx;` + // because there is no implicit constructor that crosses the __gm__ address + // space into Local Memory. When a new field is added to AsyncCtx, mirror it + // below or this kernel path will silently see zero for that field. AsyncCtx ctx{}; ctx.completion_count = lc->async_ctx.completion_count; ctx.completion_error_code = lc->async_ctx.completion_error_code; @@ -153,5 +156,3 @@ save_expected_notification_counter(AsyncCtx &ctx, volatile __gm__ void *counter_ (void)register_completion_condition(ctx, token); pto2::detail::defer_flush(ctx); } - -#endif // PTO_ASYNC_KERNEL_API_H diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_async_wait.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_async_wait.h index c8781f9cf..8bd8ef76b 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_async_wait.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_async_wait.h @@ -9,8 +9,7 @@ * ----------------------------------------------------------------------------------------------------------- */ -#ifndef PTO_ASYNC_WAIT_H -#define PTO_ASYNC_WAIT_H +#pragma once #include #include @@ -367,5 +366,3 @@ struct AsyncWaitList { #endif ); }; - -#endif // PTO_ASYNC_WAIT_H diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_completion_token.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_completion_token.h index 1ff9ab637..95ee0d5ff 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_completion_token.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_completion_token.h @@ -9,8 +9,7 @@ * ----------------------------------------------------------------------------------------------------------- */ -#ifndef SRC_A2A3_RUNTIME_TENSORMAP_AND_RINGBUFFER_RUNTIME_PTO_COMPLETION_TOKEN_H_ -#define SRC_A2A3_RUNTIME_TENSORMAP_AND_RINGBUFFER_RUNTIME_PTO_COMPLETION_TOKEN_H_ +#pragma once #include @@ -41,5 +40,3 @@ struct CompletionPollResult { CompletionPollState state{CompletionPollState::PENDING}; int32_t error_code{PTO2_ERROR_NONE}; }; - -#endif // SRC_A2A3_RUNTIME_TENSORMAP_AND_RINGBUFFER_RUNTIME_PTO_COMPLETION_TOKEN_H_ diff --git a/src/a5/platform/onboard/host/CMakeLists.txt b/src/a5/platform/onboard/host/CMakeLists.txt index e5b57bf7a..c3c8ccded 100644 --- a/src/a5/platform/onboard/host/CMakeLists.txt +++ b/src/a5/platform/onboard/host/CMakeLists.txt @@ -43,6 +43,7 @@ list(APPEND HOST_RUNTIME_SOURCES "${CMAKE_CURRENT_SOURCE_DIR}/../../src/host/l2_perf_collector.cpp" "${CMAKE_CURRENT_SOURCE_DIR}/../../src/host/pmu_collector.cpp" "${CMAKE_CURRENT_SOURCE_DIR}/../../src/host/tensor_dump_collector.cpp" + "${CMAKE_CURRENT_SOURCE_DIR}/comm_hccl.cpp" ) if(DEFINED CUSTOM_SOURCE_DIRS) foreach(SRC_DIR ${CUSTOM_SOURCE_DIRS}) @@ -80,6 +81,10 @@ target_include_directories(host_runtime PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/../../include + # Shared platform_comm headers (comm.h / comm_context.h) live in + # src/common so a5 (HCCL) and a5/a2a3 sim (POSIX-shm) can use the + # same contract. + ${CMAKE_CURRENT_SOURCE_DIR}/../../../../common ${CMAKE_CUSTOM_INCLUDE_DIRS} PRIVATE ${ASCEND_HOME_PATH}/include @@ -95,6 +100,17 @@ target_link_directories(host_runtime ${ASCEND_HOME_PATH}/runtime/lib64 ) +# CANN 9.x exposes the working non-V2 HCCL entry points through libhcomm. +# Link it explicitly so comm_hccl.cpp can follow the same initialization path +# as the pto-isa communication tests. +find_library(HCOMM_LIB NAMES hcomm PATHS ${ASCEND_HOME_PATH}/lib64 NO_DEFAULT_PATH) +if(HCOMM_LIB) + set(HCCL_LINK_TARGETS ${HCOMM_LIB}) + message(STATUS "Using HCCL library: ${HCOMM_LIB}") +else() + message(FATAL_ERROR "libhcomm not found under ${ASCEND_HOME_PATH}/lib64") +endif() + # Link against CANN runtime libraries # ascend_hal is dynamically loaded at runtime via dlopen in device_runner # when performance profiling is enabled @@ -102,6 +118,7 @@ target_link_libraries(host_runtime PRIVATE runtime ascendcl + ${HCCL_LINK_TARGETS} dl ) diff --git a/src/a5/platform/onboard/host/comm_hccl.cpp b/src/a5/platform/onboard/host/comm_hccl.cpp new file mode 100644 index 000000000..b20f32f12 --- /dev/null +++ b/src/a5/platform/onboard/host/comm_hccl.cpp @@ -0,0 +1,1145 @@ +/* + * Copyright (c) PyPTO Contributors. + * This program is free software, you can redistribute it and/or modify it under the terms and conditions of + * CANN Open Software License Agreement Version 2.0 (the "License"). + * Please refer to the License for details. You may not use this file except in compliance with the License. + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, + * INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. + * See LICENSE in the root of the software repository for the full text of the License. + * ----------------------------------------------------------------------------------------------------------- + */ +/** + * HCCL backend for the comm_* distributed communication API. + * + * Implements the five functions declared in host/comm.h using Ascend + * HCCL (bundled with CANN) for the bootstrap / barrier / teardown plane + * and the public ACL IPC primitives (aclrtIpcMem* + EnablePeerAccess) + * for the per-rank symmetric window pool (Path D). + * + * Scope: L3 single-host multi-card only. aclrtIpcMem* is host-local, so + * cross-host (L4) deployments need a different windows backend -- see + * .docs/28.l3-comm/ext.01.pr-774-review.md F2 / 05.plan.zh.md for the + * channel-API direction. + */ + +#include "platform_comm/comm.h" +#include "platform_comm/comm_context.h" + +#include "common/unified_log.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "acl/acl.h" +#include "hccl/hccl_comm.h" +#include "hccl/hccl_types.h" + +// Thin wrappers around the HCCL public APIs we use. Kept as a translation +// layer in case we need to swap (e.g., InitConfig variant) later. +static inline HcclResult hccl_get_root_info(HcclRootInfo *ri) { return HcclGetRootInfo(ri); } +static inline HcclResult hccl_comm_init_root_info(uint32_t n, const HcclRootInfo *ri, uint32_t r, HcclComm *c) { + return HcclCommInitRootInfo(n, ri, r, c); +} +static inline HcclResult hccl_barrier(HcclComm c, aclrtStream s) { return HcclBarrier(c, s); } +static inline HcclResult hccl_comm_destroy(HcclComm c) { return HcclCommDestroy(c); } + +// ============================================================================ +// Internal state +// ============================================================================ + +// Per-domain dynamic allocation. One of these per orch.allocate_domain call. +// Tracks the local IPC buffer (aclrtMalloc'd here, freed in +// comm_release_domain_windows) and the device CommContext we materialise for +// the subset. IPC import refs and EnablePeerAccess routes for this +// allocation are NOT explicitly released — same contract as +// alloc_windows_via_ipc (aclrtResetDevice at finalize reclaims them). +struct DomainAllocation { + int rank = 0; // this rank's index within the subset (domain_rank) + int nranks = 0; // subset size + void *local_buf = nullptr; + CommContext *device_ctx = nullptr; // aclrtMalloc'd CommContext mirror +}; + +struct CommHandle_ { + int rank; + int nranks; + std::string rootinfo_path; + uint64_t run_token = 0; + + // Caller-owned: supplied to comm_init, never created or destroyed here. + aclrtStream stream = nullptr; + HcclComm hccl_comm = nullptr; + + CommContext host_ctx{}; + CommContext *device_ctx = nullptr; + bool owns_device_ctx = false; + std::vector derived_contexts; + std::unordered_map> domain_allocations; +}; + +// ============================================================================ +// Helpers +// ============================================================================ + +namespace { + +static constexpr uint64_t ROOTINFO_MAGIC = 0x50544f5f4843434cULL; // "PTO_HCCL" + +struct RootInfoFileHeader { + uint64_t magic = ROOTINFO_MAGIC; + uint64_t run_token = 0; + uint32_t payload_size = HCCL_ROOT_INFO_BYTES; + uint32_t reserved = 0; +}; + +static std::string handshake_dir(const std::string &rootinfo_path) { + auto last_slash = rootinfo_path.rfind('/'); + if (last_slash == std::string::npos) return "."; + return rootinfo_path.substr(0, last_slash); +} + +static std::string handshake_prefix(const std::string &rootinfo_path) { + auto last_slash = rootinfo_path.rfind('/'); + return last_slash == std::string::npos ? rootinfo_path : rootinfo_path.substr(last_slash + 1); +} + +static std::string run_token_hex(uint64_t run_token) { + std::ostringstream oss; + oss << std::hex << run_token; + return oss.str(); +} + +static uint64_t make_run_token(int rank) { + // steady_clock is monotonic and unaffected by NTP or wall-clock jumps; + // we only need within-host uniqueness for the handshake file naming. + auto now = std::chrono::time_point_cast(std::chrono::steady_clock::now()) + .time_since_epoch() + .count(); + uint64_t token = static_cast(now); + token ^= static_cast(getpid()) << 16; + token ^= static_cast(rank & 0xFFFF); + return token; +} + +static std::string +barrier_marker_path(const std::string &rootinfo_path, uint64_t run_token, const std::string &tag, int rank) { + return handshake_dir(rootinfo_path) + "/barrier_" + handshake_prefix(rootinfo_path) + "_" + tag + "_" + + run_token_hex(run_token) + "_" + std::to_string(rank) + ".ready"; +} + +static void cleanup_handshake_files(const std::string &rootinfo_path) { + std::error_code ec; + std::filesystem::remove(rootinfo_path, ec); + + const std::string prefix = "barrier_" + handshake_prefix(rootinfo_path) + "_"; + const std::string dir = handshake_dir(rootinfo_path); + for (const auto &entry : std::filesystem::directory_iterator(dir, ec)) { + if (ec) break; + if (!entry.is_regular_file(ec)) continue; + const std::string name = entry.path().filename().string(); + if (name.rfind(prefix, 0) != 0) continue; + if (name.size() < 6 || name.substr(name.size() - 6) != ".ready") continue; + std::filesystem::remove(entry.path(), ec); + ec.clear(); + } +} + +static bool +wait_for_rootinfo(const std::string &path, HcclRootInfo *root_info, uint64_t *run_token, int timeout_sec = 120) { + constexpr int kLogEverySec = 5; + for (int i = 0; i < timeout_sec * 10; ++i) { + std::ifstream f(path, std::ios::binary); + if (f.good()) { + RootInfoFileHeader header{}; + f.read(reinterpret_cast(&header), sizeof(header)); + if (!f.good()) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + continue; + } + if (header.magic != ROOTINFO_MAGIC || header.payload_size != HCCL_ROOT_INFO_BYTES) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + continue; + } + f.read(root_info->internal, HCCL_ROOT_INFO_BYTES); + if (!f.good()) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + continue; + } + *run_token = header.run_token; + return true; + } + if (i > 0 && i % (kLogEverySec * 10) == 0) { + LOG_INFO_V0("[comm] wait_for_rootinfo: still waiting (%ds elapsed) path=%s", i / 10, path.c_str()); + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + return false; +} + +static bool file_barrier( + const std::string &rootinfo_path, int rank, int nranks, const std::string &tag, uint64_t run_token, + int timeout_sec = 120 +) { + std::string my_marker = barrier_marker_path(rootinfo_path, run_token, tag, rank); + { std::ofstream(my_marker) << "1"; } + + const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(timeout_sec); + for (int r = 0; r < nranks; ++r) { + std::string marker = barrier_marker_path(rootinfo_path, run_token, tag, r); + while (true) { + std::ifstream f(marker); + if (f.good()) break; + if (std::chrono::steady_clock::now() >= deadline) { + LOG_ERROR( + "[comm rank %d] file_barrier('%s') timed out after %ds waiting for rank %d", rank, tag.c_str(), + timeout_sec, r + ); + return false; + } + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + } + return true; +} + +} // namespace + +// ============================================================================ +// API implementation +// ============================================================================ + +extern "C" CommHandle comm_init(int rank, int nranks, void *stream, const char *rootinfo_path) try { + if (stream == nullptr) { + LOG_ERROR("[comm rank %d] comm_init: caller-supplied stream is null", rank); + return nullptr; + } + if (rootinfo_path == nullptr || *rootinfo_path == '\0') { + LOG_ERROR("[comm rank %d] comm_init: rootinfo_path is null or empty", rank); + return nullptr; + } + if (nranks <= 0 || rank < 0 || rank >= nranks) { + LOG_ERROR("[comm rank %d] comm_init: invalid rank/nranks (rank=%d, nranks=%d)", rank, rank, nranks); + return nullptr; + } + if (static_cast(nranks) > COMM_MAX_RANK_NUM) { + LOG_ERROR("[comm rank %d] comm_init: nranks=%d exceeds COMM_MAX_RANK_NUM=%u", rank, nranks, COMM_MAX_RANK_NUM); + return nullptr; + } + + auto *h = new (std::nothrow) CommHandle_{}; + if (!h) return nullptr; + + h->rank = rank; + h->nranks = nranks; + h->rootinfo_path = rootinfo_path; + h->stream = static_cast(stream); + + // NOTE: aclInit / aclrtSetDevice / stream creation are intentionally NOT + // performed here — the caller (DeviceRunner::ensure_acl_ready + a stream + // it owns) is responsible for them. This keeps ACL lifecycle ownership + // in one place (DeviceRunner) and matches HCCL's API shape, which already + // takes a caller-supplied stream. + + // RootInfo exchange + HcclRootInfo rootInfo{}; + if (rank == 0) { + cleanup_handshake_files(h->rootinfo_path); + h->run_token = make_run_token(rank); + HcclResult hret = hccl_get_root_info(&rootInfo); + if (hret != HCCL_SUCCESS) { + LOG_ERROR("[comm rank 0] HcclGetRootInfo failed: %d", (int)hret); + delete h; + return nullptr; + } + RootInfoFileHeader header{}; + header.run_token = h->run_token; + std::string tmp_path = h->rootinfo_path + ".tmp." + std::to_string(getpid()); + std::ofstream fout(tmp_path, std::ios::binary | std::ios::trunc); + fout.write(reinterpret_cast(&header), sizeof(header)); + fout.write(rootInfo.internal, HCCL_ROOT_INFO_BYTES); + fout.close(); + if (!fout.good() || std::rename(tmp_path.c_str(), h->rootinfo_path.c_str()) != 0) { + std::remove(tmp_path.c_str()); + delete h; + return nullptr; + } + } else { + if (!wait_for_rootinfo(h->rootinfo_path, &rootInfo, &h->run_token)) { + LOG_ERROR("[comm rank %d] Timeout waiting for rootinfo", rank); + delete h; + return nullptr; + } + } + + if (!file_barrier(h->rootinfo_path, h->rank, h->nranks, "rootinfo_ready", h->run_token)) { + delete h; + return nullptr; + } + + // Init communicator + HcclResult hret = + hccl_comm_init_root_info(static_cast(nranks), &rootInfo, static_cast(rank), &h->hccl_comm); + if (hret != HCCL_SUCCESS) { + LOG_ERROR("[comm rank %d] HcclCommInitRootInfo failed: %d", rank, (int)hret); + delete h; + return nullptr; + } + + return h; +} catch (const std::exception &e) { + LOG_ERROR("[comm rank %d] comm_init: exception: %s", rank, e.what()); + return nullptr; +} catch (...) { + LOG_ERROR("[comm rank %d] comm_init: unknown exception", rank); + return nullptr; +} + +namespace { + +// Path D: build the per-rank symmetric pool ourselves via the public ACL +// IPC primitives (aclrtMalloc + aclrtIpcMemGetExportKey + SetImportPid + +// ImportByKey). This mirrors HCCL's own internal cross-rank IPC pattern +// (refs/hcomm adapter_rts.cc::hrtIpc* + p2p_mgmt.cc::EnableP2P) so we +// depend only on stable ACL surface, no HCCL-private struct ABI. +// Spike-validated in hw-native-sys/comm-spike; see project memory. + +// Default per-rank symmetric pool size when comm_alloc_windows is called +// with win_size == 0. Picked to match the HCCL_BUFFSIZE default of the +// pre-Path-D backend so existing callers see no behavioural change. +constexpr uint64_t kDefaultIpcWinSize = 200ULL * 1024 * 1024; +constexpr size_t kIpcNameLen = 65; +constexpr uint64_t kIpcAnnounceMagic = 0x49504344334d4549ULL; // "IPCD3MEI" + +struct IpcAnnounceFile { + uint64_t magic; + int32_t pid; + uint32_t rank; + int32_t device_id; // ACL logic device id this rank is bound to. + char name[kIpcNameLen]; + char pad[3]; // keep struct size a multiple of 8 +}; + +// Announce file path shares the `barrier__..._.ready` shape so +// cleanup_handshake_files picks it up alongside the file_barrier markers. +// Without this convention these files would accumulate across re-runs. +static std::string ipc_announce_path(const std::string &rootinfo, int rank, uint64_t run_token) { + return handshake_dir(rootinfo) + "/barrier_" + handshake_prefix(rootinfo) + "_ipc_announce_" + + run_token_hex(run_token) + "_" + std::to_string(rank) + ".ready"; +} + +static bool ipc_write_announce( + const std::string &rootinfo, int rank, uint64_t run_token, int32_t pid, int32_t device_id, const char *name +) { + IpcAnnounceFile a{}; + a.magic = kIpcAnnounceMagic; + a.pid = pid; + a.rank = static_cast(rank); + a.device_id = device_id; + memcpy(a.name, name, kIpcNameLen); + std::string p = ipc_announce_path(rootinfo, rank, run_token); + std::string tmp = p + ".tmp." + std::to_string(getpid()); + { + std::ofstream f(tmp, std::ios::binary | std::ios::trunc); + f.write(reinterpret_cast(&a), sizeof(a)); + if (!f.good()) { + std::remove(tmp.c_str()); + return false; + } + } + if (std::rename(tmp.c_str(), p.c_str()) != 0) { + std::remove(tmp.c_str()); + return false; + } + return true; +} + +static bool ipc_read_announce( + const std::string &rootinfo, int peer, uint64_t run_token, IpcAnnounceFile *out, int timeout_sec = 60 +) { + std::string p = ipc_announce_path(rootinfo, peer, run_token); + for (int i = 0; i < timeout_sec * 10; ++i) { + std::ifstream f(p, std::ios::binary); + if (f.good()) { + IpcAnnounceFile a{}; + f.read(reinterpret_cast(&a), sizeof(a)); + if (f.good() && a.magic == kIpcAnnounceMagic && a.rank == static_cast(peer)) { + *out = a; + return true; + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + return false; +} + +// Fills h->host_ctx with rankId/rankNum/winSize/windowsIn[] via DIY IPC. +// `win_size` is the per-rank pool byte size requested by the caller +// (kDefaultIpcWinSize when 0). +// +// On failure or normal exit, the device-side resources allocated here +// (localBuf via aclrtMalloc, the IPC export key, peer imports, and any +// P2P routes enabled) are NOT explicitly released. DeviceRunner::finalize +// calls aclrtResetDevice at Worker teardown, which reclaims all of the +// above. simpler's current usage is one comm_init/destroy per Worker +// lifetime, so the absence of explicit cleanup does not accumulate +// across runs. If a future caller starts cycling comm contexts within a +// single Worker, explicit teardown will need to land here. +static int alloc_windows_via_ipc(CommHandle h, uint64_t win_size) { + const int rank = h->rank; + const int nranks = h->nranks; + const std::string &rootinfo = h->rootinfo_path; + const uint64_t run_token = h->run_token; + + // Discover our own device id. Rank != device in general (e.g. simpler's + // chip_process spawns rank N on whatever device the resource pool gives + // it). We need real device ids before any cross-rank ACL setup -- + // EnablePeerAccess takes a peer DEVICE id, not a peer rank. + int32_t myDevice = -1; + if (aclrtGetDevice(&myDevice) != ACL_SUCCESS) { + LOG_ERROR("[comm rank %d] ipc: aclrtGetDevice failed", rank); + return -1; + } + + // Allocate local buffer + export its IPC name. + void *localBuf = nullptr; + aclError aret = aclrtMalloc(&localBuf, win_size, ACL_MEM_MALLOC_HUGE_FIRST); + if (aret != ACL_SUCCESS) { + LOG_ERROR("[comm rank %d] ipc: aclrtMalloc -> %d", rank, static_cast(aret)); + return -1; + } + char myName[kIpcNameLen]{}; + aret = aclrtIpcMemGetExportKey(localBuf, win_size, myName, kIpcNameLen, 0); + if (aret != ACL_SUCCESS) { + LOG_ERROR("[comm rank %d] ipc: GetExportKey -> %d", rank, static_cast(aret)); + aclrtFree(localBuf); + return -1; + } + + // Announce (pid, device, name) and read every peer's announcement. + const int32_t myPid = static_cast(getpid()); + if (!ipc_write_announce(rootinfo, rank, run_token, myPid, myDevice, myName)) { + LOG_ERROR("[comm rank %d] ipc: write_announce failed", rank); + aclrtFree(localBuf); + return -1; + } + std::vector peers(nranks); + for (int p = 0; p < nranks; ++p) { + if (p == rank) { + peers[p].magic = kIpcAnnounceMagic; + peers[p].pid = myPid; + peers[p].rank = static_cast(rank); + peers[p].device_id = myDevice; + memcpy(peers[p].name, myName, kIpcNameLen); + continue; + } + if (!ipc_read_announce(rootinfo, p, run_token, &peers[p])) { + LOG_ERROR("[comm rank %d] ipc: read_announce(peer=%d) timed out", rank, p); + aclrtFree(localBuf); + return -1; + } + } + + // Now we know every peer's device id. Enable cross-card P2P and poll + // until ENABLED. Mirrors hcomm/.../p2p_mgmt.cc::EnableP2P + WaitP2PEnabled. + for (int p = 0; p < nranks; ++p) { + if (p == rank) continue; + aclError r = aclrtDeviceEnablePeerAccess(peers[p].device_id, 0); + if (r != ACL_SUCCESS) { + // Non-fatal but lossy: CANN 9.x does not expose a dedicated + // "already enabled" error code, so we cannot tell a benign + // re-enable from a real P2P failure (e.g. missing P2P link) + // here. The subsequent aclrtDevicePeerAccessStatus poll is + // the real source of truth — it returns status=1 if access + // is genuinely up, and times out (30s) otherwise. Failures + // that matter therefore still surface, just one stage later. + LOG_WARN( + "[comm rank %d] ipc: EnablePeerAccess(peer_dev=%d) -> %d (deferring to status poll)", rank, + peers[p].device_id, static_cast(r) + ); + } + } + for (int p = 0; p < nranks; ++p) { + if (p == rank) continue; + const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(30); + while (true) { + int32_t status = 0; + aclError r = aclrtDevicePeerAccessStatus(myDevice, peers[p].device_id, &status); + if (r != ACL_SUCCESS) { + LOG_ERROR( + "[comm rank %d] ipc: PeerAccessStatus(local_dev=%d peer_dev=%d) -> %d", rank, myDevice, + peers[p].device_id, static_cast(r) + ); + aclrtFree(localBuf); + return -1; + } + if (status == 1) break; + if (std::chrono::steady_clock::now() >= deadline) { + LOG_ERROR( + "[comm rank %d] ipc: P2P enable timeout peer=%d peer_dev=%d status=%d", rank, p, peers[p].device_id, + status + ); + aclrtFree(localBuf); + return -1; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + } + + // Barrier so every rank has finished its outbound P2P enable+wait. + if (!file_barrier(rootinfo, rank, nranks, "ipc_p2p_ready", run_token)) { + aclrtFree(localBuf); + return -1; + } + + // Authorize every peer's pid against MY name (batched). + std::vector peerPids; + peerPids.reserve(nranks - 1); + for (int p = 0; p < nranks; ++p) { + if (p == rank) continue; + peerPids.push_back(peers[p].pid); + } + aret = aclrtIpcMemSetImportPid(myName, peerPids.data(), peerPids.size()); + if (aret != ACL_SUCCESS) { + LOG_ERROR("[comm rank %d] ipc: SetImportPid -> %d", rank, static_cast(aret)); + aclrtFree(localBuf); + return -1; + } + if (!file_barrier(rootinfo, rank, nranks, "ipc_auth_done", run_token)) { + aclrtFree(localBuf); + return -1; + } + + // Import every peer's buffer into our local VA space. + // windowsOut[] is intentionally left zero by the memset below: no kernel + // path reads it (verified by grep across simpler + pto-isa). The field + // is kept in CommContext only to preserve byte-equivalence with pto-isa's + // parallel HcclDeviceContext declaration; removing it is gated on the + // F4 private-ization decision (see .docs/28.l3-comm/ext.01.pr-774-review.md). + // host_ctx was value-initialized at handle construction (CommContext{}), + // and the idempotency guard in comm_alloc_windows prevents a second + // entry; no re-zero needed before populating it here. + h->host_ctx.rankId = static_cast(rank); + h->host_ctx.rankNum = static_cast(nranks); + h->host_ctx.winSize = win_size; + h->host_ctx.windowsIn[rank] = reinterpret_cast(localBuf); + + for (int p = 0; p < nranks; ++p) { + if (p == rank) continue; + void *peerVa = nullptr; + aret = aclrtIpcMemImportByKey(&peerVa, peers[p].name, 0); + if (aret != ACL_SUCCESS) { + LOG_ERROR( + "[comm rank %d] ipc: ImportByKey(peer=%d pid=%d) -> %d", rank, p, peers[p].pid, static_cast(aret) + ); + aclrtFree(localBuf); + return -1; + } + h->host_ctx.windowsIn[p] = reinterpret_cast(peerVa); + } + + return 0; +} + +// ============================================================================ +// Per-domain dynamic allocation (for orch.allocate_domain). +// +// Same Path-D IPC dance as alloc_windows_via_ipc, but on a fresh per-allocation +// local buffer. Every barrier filename and announce filename is scoped by +// allocation_id so concurrent allocations from different orch.allocate_domain +// calls do not collide. Participation is by subset (domain_rank within +// rank_count), so non-members of the subset are not involved. +// ============================================================================ + +// Announce file path scoped by allocation_id so two concurrent allocations +// from different orch calls do not collide. Same dir + cleanup-friendly +// prefix as the base-comm IPC announce. +static std::string +domain_announce_path(const std::string &rootinfo, uint64_t allocation_id, uint32_t domain_rank, uint64_t run_token) { + return handshake_dir(rootinfo) + "/barrier_" + handshake_prefix(rootinfo) + "_alloc_" + + std::to_string(allocation_id) + "_ipc_announce_" + run_token_hex(run_token) + "_" + + std::to_string(domain_rank) + ".ready"; +} + +static bool domain_write_announce( + const std::string &rootinfo, uint64_t allocation_id, uint32_t domain_rank, uint64_t run_token, int32_t pid, + int32_t device_id, const char *name +) { + IpcAnnounceFile a{}; + a.magic = kIpcAnnounceMagic; + a.pid = pid; + a.rank = domain_rank; + a.device_id = device_id; + memcpy(a.name, name, kIpcNameLen); + std::string p = domain_announce_path(rootinfo, allocation_id, domain_rank, run_token); + std::string tmp = p + ".tmp." + std::to_string(getpid()); + { + std::ofstream f(tmp, std::ios::binary | std::ios::trunc); + f.write(reinterpret_cast(&a), sizeof(a)); + if (!f.good()) { + std::remove(tmp.c_str()); + return false; + } + } + if (std::rename(tmp.c_str(), p.c_str()) != 0) { + std::remove(tmp.c_str()); + return false; + } + return true; +} + +static bool domain_read_announce( + const std::string &rootinfo, uint64_t allocation_id, uint32_t peer_domain_rank, uint64_t run_token, + IpcAnnounceFile *out, int timeout_sec = 60 +) { + std::string p = domain_announce_path(rootinfo, allocation_id, peer_domain_rank, run_token); + for (int i = 0; i < timeout_sec * 10; ++i) { + std::ifstream f(p, std::ios::binary); + if (f.good()) { + IpcAnnounceFile a{}; + f.read(reinterpret_cast(&a), sizeof(a)); + if (f.good() && a.magic == kIpcAnnounceMagic && a.rank == peer_domain_rank) { + *out = a; + return true; + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + return false; +} + +// Tag helper for allocation-scoped file barriers. Tag is fed straight into +// `file_barrier`, which already namespaces the marker filename by +// rootinfo prefix + run_token + rank, so adding allocation_id to `tag` is +// enough to keep concurrent allocations from sharing a marker file. +static std::string domain_barrier_tag(uint64_t allocation_id, const char *phase) { + return std::string("alloc_") + std::to_string(allocation_id) + "_" + phase; +} + +// Idempotently provision the process-global PTO-ISA async-SDMA scratch +// workspace on the comm handle and mirror its address into host_ctx. Both +// the base-window path and the dynamic per-domain path call this; only the +// first call allocates. CANN 9.0+ feature: on 8.5 the aclnn dlsym fails by +// design, so we leave workSpace == 0 and SDMA demos self-skip. No-op when +// the build-time PTO-ISA dependency is absent. +static void ensure_sdma_workspace(CommHandle h) { +#ifdef SIMPLER_ENABLE_PTO_SDMA_WORKSPACE + if (h->sdma_workspace) return; + h->sdma_workspace = std::make_unique(); + if (h->sdma_workspace->Init()) { + h->host_ctx.workSpace = reinterpret_cast(h->sdma_workspace->GetWorkspaceAddr()); + h->host_ctx.workSpaceSize = 16 * 1024; + } else { + h->sdma_workspace.reset(); + } +#else + (void)h; +#endif +} + +// Performs the per-allocation Path-D dance for one subset rank. rank_ids +// must list participating BASE-COMM rank ids in domain rank order; this +// rank's domain_rank must match its base rank for the same invariant +// alloc_windows_via_ipc relies on (rank_ids[domain_rank] == h->rank). +// +// Failure paths free the local buffer if it was allocated. IPC imports are +// NOT explicitly torn down on failure — mirrors alloc_windows_via_ipc; ACL +// reset at finalize cleans them up. +static int domain_alloc_via_ipc( + CommHandle h, uint64_t allocation_id, const uint32_t *rank_ids, size_t rank_count, uint32_t domain_rank, + uint64_t win_size, DomainAllocation *out +) { + const std::string &rootinfo = h->rootinfo_path; + const uint64_t run_token = h->run_token; + const int subset_n = static_cast(rank_count); + const int my_dr = static_cast(domain_rank); + + int32_t myDevice = -1; + if (aclrtGetDevice(&myDevice) != ACL_SUCCESS) { + LOG_ERROR("[comm rank %d] alloc_domain: aclrtGetDevice failed", h->rank); + return -1; + } + + void *localBuf = nullptr; + aclError aret = aclrtMalloc(&localBuf, win_size, ACL_MEM_MALLOC_HUGE_FIRST); + if (aret != ACL_SUCCESS) { + LOG_ERROR("[comm rank %d] alloc_domain: aclrtMalloc -> %d", h->rank, static_cast(aret)); + return -1; + } + char myName[kIpcNameLen]{}; + aret = aclrtIpcMemGetExportKey(localBuf, win_size, myName, kIpcNameLen, 0); + if (aret != ACL_SUCCESS) { + LOG_ERROR("[comm rank %d] alloc_domain: GetExportKey -> %d", h->rank, static_cast(aret)); + aclrtFree(localBuf); + return -1; + } + + const int32_t myPid = static_cast(getpid()); + if (!domain_write_announce(rootinfo, allocation_id, domain_rank, run_token, myPid, myDevice, myName)) { + LOG_ERROR("[comm rank %d] alloc_domain: write_announce failed", h->rank); + aclrtFree(localBuf); + return -1; + } + std::vector peers(subset_n); + for (int p = 0; p < subset_n; ++p) { + if (p == my_dr) { + peers[p].magic = kIpcAnnounceMagic; + peers[p].pid = myPid; + peers[p].rank = domain_rank; + peers[p].device_id = myDevice; + memcpy(peers[p].name, myName, kIpcNameLen); + continue; + } + if (!domain_read_announce(rootinfo, allocation_id, static_cast(p), run_token, &peers[p])) { + LOG_ERROR("[comm rank %d] alloc_domain: read_announce(peer_dr=%d) timed out", h->rank, p); + aclrtFree(localBuf); + return -1; + } + } + + // Enable cross-card P2P for every domain peer and poll until ENABLED. + // The orch-only allocate_domain model has no base comm_alloc_windows to + // own the P2P route, so each allocation must (idempotently) ensure it. + // aclrtDeviceEnablePeerAccess is process-global and per device-pair, so + // once any allocation has opened a given pair, later ones simply observe + // it already enabled — the call + status poll is cheap in that case. + // Without this, the IPC VA import below still succeeds, but device-side + // cross-chip access from kernels silently fails, so peer TWAIT / + // notification writes never land and the scheduler times out. The + // aclrtDevicePeerAccessStatus poll is the source of truth (status==1) and + // surfaces a genuinely missing P2P link as a 30s timeout. + for (int p = 0; p < subset_n; ++p) { + if (p == my_dr) continue; + aclError r = aclrtDeviceEnablePeerAccess(peers[p].device_id, 0); + if (r != ACL_SUCCESS) { + LOG_WARN( + "[comm rank %d] alloc_domain: EnablePeerAccess(peer_dev=%d) -> %d (deferring to status poll)", h->rank, + peers[p].device_id, static_cast(r) + ); + } + } + for (int p = 0; p < subset_n; ++p) { + if (p == my_dr) continue; + const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(30); + while (true) { + int32_t status = 0; + aclError r = aclrtDevicePeerAccessStatus(myDevice, peers[p].device_id, &status); + if (r != ACL_SUCCESS) { + LOG_ERROR( + "[comm rank %d] alloc_domain: PeerAccessStatus(local_dev=%d peer_dev=%d) -> %d", h->rank, myDevice, + peers[p].device_id, static_cast(r) + ); + aclrtFree(localBuf); + return -1; + } + if (status == 1) break; + if (std::chrono::steady_clock::now() >= deadline) { + LOG_ERROR( + "[comm rank %d] alloc_domain: P2P enable timeout peer_dr=%d peer_dev=%d status=%d", h->rank, p, + peers[p].device_id, status + ); + aclrtFree(localBuf); + return -1; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + } + + if (!file_barrier(rootinfo, my_dr, subset_n, domain_barrier_tag(allocation_id, "p2p_ready"), run_token)) { + aclrtFree(localBuf); + return -1; + } + + std::vector peerPids; + peerPids.reserve(subset_n - 1); + for (int p = 0; p < subset_n; ++p) { + if (p == my_dr) continue; + peerPids.push_back(peers[p].pid); + } + aret = aclrtIpcMemSetImportPid(myName, peerPids.data(), peerPids.size()); + if (aret != ACL_SUCCESS) { + LOG_ERROR("[comm rank %d] alloc_domain: SetImportPid -> %d", h->rank, static_cast(aret)); + aclrtFree(localBuf); + return -1; + } + if (!file_barrier(rootinfo, my_dr, subset_n, domain_barrier_tag(allocation_id, "auth_done"), run_token)) { + aclrtFree(localBuf); + return -1; + } + + out->rank = my_dr; + out->nranks = subset_n; + out->local_buf = localBuf; + // Build a host-side CommContext for the subset and upload it as device_ctx. + // PTO-ISA async SDMA ops (SdmaTget) read the scratch workspace off + // CommContext::workSpace. The dynamic-domain path does not go through + // comm_alloc_windows, so provision the workspace here; without it a + // freshly zero-initialized per-domain ctx would leave workSpace == 0 and + // those kernels early-return on the workSpace guard. + ensure_sdma_workspace(h); + + CommContext ctx{}; + ctx.rankId = domain_rank; + ctx.rankNum = static_cast(subset_n); + ctx.winSize = win_size; + ctx.workSpace = h->host_ctx.workSpace; + ctx.workSpaceSize = h->host_ctx.workSpaceSize; + ctx.windowsIn[my_dr] = reinterpret_cast(localBuf); + for (int p = 0; p < subset_n; ++p) { + if (p == my_dr) continue; + void *peerVa = nullptr; + aret = aclrtIpcMemImportByKey(&peerVa, peers[p].name, 0); + if (aret != ACL_SUCCESS) { + LOG_ERROR( + "[comm rank %d] alloc_domain: ImportByKey(peer_dr=%d pid=%d) -> %d", h->rank, p, peers[p].pid, + static_cast(aret) + ); + aclrtFree(localBuf); + return -1; + } + ctx.windowsIn[p] = reinterpret_cast(peerVa); + } + + void *newDevMem = nullptr; + aret = aclrtMalloc(&newDevMem, sizeof(CommContext), ACL_MEM_MALLOC_HUGE_FIRST); + if (aret != ACL_SUCCESS) { + LOG_ERROR("[comm rank %d] alloc_domain: ctx aclrtMalloc -> %d", h->rank, static_cast(aret)); + aclrtFree(localBuf); + return -1; + } + aret = aclrtMemcpy(newDevMem, sizeof(CommContext), &ctx, sizeof(CommContext), ACL_MEMCPY_HOST_TO_DEVICE); + if (aret != ACL_SUCCESS) { + LOG_ERROR("[comm rank %d] alloc_domain: ctx Memcpy H2D -> %d", h->rank, static_cast(aret)); + aclrtFree(newDevMem); + aclrtFree(localBuf); + return -1; + } + out->device_ctx = reinterpret_cast(newDevMem); + return 0; +} + +} // namespace + +extern "C" int comm_alloc_windows(CommHandle h, size_t win_size, uint64_t *device_ctx_out) try { + if (!h || !device_ctx_out) return -1; + + // Idempotency guard: comm_alloc_windows is not re-entrant. The localBuf + // allocated by alloc_windows_via_ipc is owned by the handle's windowsIn[] + // entries and is only reclaimed at aclrtResetDevice; calling this twice + // would leak a full per-rank pool. device_ctx is set on first success. + if (h->device_ctx != nullptr) { + LOG_ERROR("[comm rank %d] comm_alloc_windows: already allocated on this handle", h->rank); + return -1; + } + + // Path D: DIY symmetric pool on stable ACL IPC + EnablePeerAccess. + // Replaced the prior HcclAllocComResourceByTiling reverse-parse path + // (broken on CANN 9.0 due to HcclOpResParam ABI drift; see project + // history). One backend, works on 8.5 and 9.0 unchanged. + const uint64_t effective_win_size = win_size != 0 ? static_cast(win_size) : kDefaultIpcWinSize; + if (alloc_windows_via_ipc(h, effective_win_size) != 0) return -1; + + // Optional PTO-ISA async SDMA workspace pre-allocation (overlays the comm + // backend's output; comm-side flow does not care about workSpace). No-op + // when SIMPLER_ENABLE_PTO_SDMA_WORKSPACE is undefined (this PR ships A + // without the macro; the overlay PR turns it on). + ensure_sdma_workspace(h); + + void *newDevMem = nullptr; + aclError aRet = aclrtMalloc(&newDevMem, sizeof(CommContext), ACL_MEM_MALLOC_HUGE_FIRST); + if (aRet != ACL_SUCCESS) return -1; + aRet = aclrtMemcpy(newDevMem, sizeof(CommContext), &h->host_ctx, sizeof(CommContext), ACL_MEMCPY_HOST_TO_DEVICE); + if (aRet != ACL_SUCCESS) { + aclrtFree(newDevMem); + return -1; + } + h->device_ctx = reinterpret_cast(newDevMem); + h->owns_device_ctx = true; + *device_ctx_out = reinterpret_cast(h->device_ctx); + return 0; +} catch (const std::exception &e) { + LOG_ERROR("[comm] comm_alloc_windows: exception: %s", e.what()); + return -1; +} catch (...) { + LOG_ERROR("[comm] comm_alloc_windows: unknown exception"); + return -1; +} + +extern "C" int comm_get_local_window_base(CommHandle h, uint64_t *base_out) { + if (!h || !base_out) return -1; + *base_out = h->host_ctx.windowsIn[h->rank]; + return 0; +} + +extern "C" int comm_get_window_size(CommHandle h, size_t *size_out) { + if (!h || !size_out) return -1; + *size_out = static_cast(h->host_ctx.winSize); + return 0; +} + +extern "C" int comm_derive_context( + CommHandle h, const uint32_t *rank_ids, size_t rank_count, uint32_t domain_rank, size_t window_offset, + size_t window_size, uint64_t *device_ctx_out +) try { + if (!h || !rank_ids || !device_ctx_out) return -1; + if (h->host_ctx.rankNum == 0) { + LOG_ERROR("[comm rank %d] comm_derive_context: base windows are not allocated", h->rank); + return -1; + } + if (rank_count == 0 || rank_count > COMM_MAX_RANK_NUM || domain_rank >= rank_count) { + LOG_ERROR( + "[comm rank %d] comm_derive_context: invalid rank_count=%zu domain_rank=%u", h->rank, rank_count, + domain_rank + ); + return -1; + } + if (window_offset + window_size > static_cast(h->host_ctx.winSize)) { + LOG_ERROR( + "[comm rank %d] comm_derive_context: window range [%zu, %zu) exceeds base window size %llu", h->rank, + window_offset, window_offset + window_size, static_cast(h->host_ctx.winSize) + ); + return -1; + } + + CommContext ctx{}; + ctx.workSpace = h->host_ctx.workSpace; + ctx.workSpaceSize = h->host_ctx.workSpaceSize; + ctx.rankId = domain_rank; + ctx.rankNum = static_cast(rank_count); + ctx.winSize = window_size; + for (size_t i = 0; i < rank_count; ++i) { + uint32_t base_rank = rank_ids[i]; + if (base_rank >= static_cast(h->nranks)) { + LOG_ERROR( + "[comm rank %d] comm_derive_context: rank_ids[%zu]=%u out of range [0, %d)", h->rank, i, base_rank, + h->nranks + ); + return -1; + } + ctx.windowsIn[i] = h->host_ctx.windowsIn[base_rank] + window_offset; + ctx.windowsOut[i] = h->host_ctx.windowsOut[base_rank] + window_offset; + } + + void *newDevMem = nullptr; + aclError aRet = aclrtMalloc(&newDevMem, sizeof(CommContext), ACL_MEM_MALLOC_HUGE_FIRST); + if (aRet != ACL_SUCCESS) { + LOG_ERROR("[comm rank %d] comm_derive_context: aclrtMalloc failed: %d", h->rank, static_cast(aRet)); + return -1; + } + aRet = aclrtMemcpy(newDevMem, sizeof(CommContext), &ctx, sizeof(CommContext), ACL_MEMCPY_HOST_TO_DEVICE); + if (aRet != ACL_SUCCESS) { + LOG_ERROR("[comm rank %d] comm_derive_context: aclrtMemcpy H2D failed: %d", h->rank, static_cast(aRet)); + aclrtFree(newDevMem); + return -1; + } + + auto *derived = reinterpret_cast(newDevMem); + h->derived_contexts.push_back(derived); + *device_ctx_out = reinterpret_cast(derived); + return 0; +} catch (const std::exception &e) { + LOG_ERROR("[comm] comm_derive_context: exception: %s", e.what()); + return -1; +} catch (...) { + LOG_ERROR("[comm] comm_derive_context: unknown exception"); + return -1; +} + +extern "C" int comm_barrier(CommHandle h) { + if (!h) return -1; + // HcclBarrier is synchronous — it blocks until all ranks arrive. + // Do NOT call aclrtSynchronizeStream after it: HcclBarrier internally + // switches the thread's ACL context, which invalidates the caller-owned + // stream for context-checked ACL calls (error 507018). + HcclResult hret = hccl_barrier(h->hccl_comm, h->stream); + if (hret != HCCL_SUCCESS) { + LOG_ERROR("[comm rank %d] HcclBarrier failed: %d", h->rank, static_cast(hret)); + return static_cast(hret); + } + return 0; +} + +extern "C" int comm_alloc_domain_windows( + CommHandle h, uint64_t allocation_id, const uint32_t *rank_ids, size_t rank_count, uint32_t domain_rank, + size_t window_size, uint64_t *device_ctx_out, uint64_t *local_window_base_out +) try { + if (!h || !rank_ids || !device_ctx_out || !local_window_base_out) return -1; + if (rank_count == 0 || rank_count > COMM_MAX_RANK_NUM || domain_rank >= rank_count || window_size == 0) { + LOG_ERROR( + "[comm rank %d] alloc_domain: bad args (rank_count=%zu domain_rank=%u window_size=%zu)", h->rank, + rank_count, domain_rank, window_size + ); + return -1; + } + if (h->domain_allocations.count(allocation_id) > 0) { + LOG_ERROR( + "[comm rank %d] alloc_domain: allocation_id=%llu already live", h->rank, + static_cast(allocation_id) + ); + return -1; + } + if (rank_ids[domain_rank] != static_cast(h->rank)) { + LOG_ERROR( + "[comm rank %d] alloc_domain: rank_ids[%u]=%u does not match base rank", h->rank, domain_rank, + rank_ids[domain_rank] + ); + return -1; + } + // The base communicator only needs comm_init to have run (rootinfo_path + // + run_token are set, used to scope barrier filenames). We do NOT + // require comm_alloc_windows on the base in the orch-only model — the + // dynamic alloc path does its own per-allocation aclrtMalloc + IPC dance. + if (h->rootinfo_path.empty() || h->hccl_comm == nullptr) { + LOG_ERROR("[comm rank %d] alloc_domain: base communicator not initialised", h->rank); + return -1; + } + + auto alloc = std::make_unique(); + int rc = domain_alloc_via_ipc(h, allocation_id, rank_ids, rank_count, domain_rank, window_size, alloc.get()); + if (rc != 0) return rc; + + // Zero the freshly-allocated local pool so kernels do not observe stale + // aclrtMalloc bytes (parity with the sim backend's memset). + aclError aret = aclrtMemset(alloc->local_buf, window_size, 0, window_size); + if (aret != ACL_SUCCESS) { + LOG_ERROR("[comm rank %d] alloc_domain: aclrtMemset -> %d", h->rank, static_cast(aret)); + aclrtFree(alloc->device_ctx); + aclrtFree(alloc->local_buf); + return -1; + } + + *device_ctx_out = reinterpret_cast(alloc->device_ctx); + *local_window_base_out = reinterpret_cast(alloc->local_buf); + h->domain_allocations.emplace(allocation_id, std::move(alloc)); + return 0; +} catch (const std::exception &e) { + LOG_ERROR("[comm] alloc_domain: exception: %s", e.what()); + return -1; +} catch (...) { + LOG_ERROR("[comm] alloc_domain: unknown exception"); + return -1; +} + +extern "C" int +comm_release_domain_windows(CommHandle h, uint64_t allocation_id, size_t rank_count, uint32_t domain_rank) try { + if (!h) return -1; + auto it = h->domain_allocations.find(allocation_id); + if (it == h->domain_allocations.end()) { + LOG_ERROR( + "[comm rank %d] release_domain: allocation_id=%llu not found", h->rank, + static_cast(allocation_id) + ); + return -1; + } + auto &alloc = it->second; + if (static_cast(alloc->nranks) != rank_count || static_cast(alloc->rank) != domain_rank) { + LOG_ERROR( + "[comm rank %d] release_domain: caller (rank_count=%zu, domain_rank=%u) " + "disagrees with alloc-time (nranks=%d, rank=%d)", + h->rank, rank_count, domain_rank, alloc->nranks, alloc->rank + ); + return -1; + } + int rc = 0; + // Best-effort subset barrier so peers don't free local memory under each + // other. If a peer crashed mid-allocation, the timeout returns false and + // we proceed with local teardown anyway — same shape as comm_destroy. + if (!file_barrier( + h->rootinfo_path, static_cast(domain_rank), static_cast(rank_count), + domain_barrier_tag(allocation_id, "release"), h->run_token + )) { + LOG_WARN("[comm rank %d] release_domain: barrier timed out; releasing local state anyway", h->rank); + rc = -1; + } + + if (alloc->device_ctx) { + aclError aret = aclrtFree(alloc->device_ctx); + if (aret != ACL_SUCCESS && rc == 0) rc = -1; + } + if (alloc->local_buf) { + aclError aret = aclrtFree(alloc->local_buf); + if (aret != ACL_SUCCESS && rc == 0) rc = -1; + } + h->domain_allocations.erase(it); + return rc; +} catch (const std::exception &e) { + LOG_ERROR("[comm] release_domain: exception: %s", e.what()); + return -1; +} catch (...) { + LOG_ERROR("[comm] release_domain: unknown exception"); + return -1; +} + +extern "C" int comm_destroy(CommHandle h) try { + if (!h) return -1; + + // Final barrier is best-effort: if a peer already crashed we still need to + // release the local resources we own, so timeout just logs and proceeds. + int rc = 0; + if (!file_barrier(h->rootinfo_path, h->rank, h->nranks, "destroy", h->run_token)) { + LOG_WARN("[comm rank %d] comm_destroy: final barrier timed out; releasing local state anyway", h->rank); + rc = -1; + } + + if (h->owns_device_ctx && h->device_ctx) { + aclrtFree(h->device_ctx); + } + for (CommContext *ctx : h->derived_contexts) { + if (ctx != nullptr) { + aclrtFree(ctx); + } + } + h->derived_contexts.clear(); + // Reclaim any still-live domain allocations as a safety net. Caller + // should release them explicitly via comm_release_domain_windows; this + // path runs only when an exception or shutdown bypassed that. + for (auto &kv : h->domain_allocations) { + auto &alloc = kv.second; + if (alloc->device_ctx) aclrtFree(alloc->device_ctx); + if (alloc->local_buf) aclrtFree(alloc->local_buf); + } + h->domain_allocations.clear(); + if (h->hccl_comm) { + HcclResult hret = hccl_comm_destroy(h->hccl_comm); + if (hret != HCCL_SUCCESS) { + LOG_ERROR("[comm rank %d] HcclCommDestroy failed: %d", h->rank, static_cast(hret)); + if (rc == 0) rc = -1; + } + } + + // NOTE: we do NOT destroy h->stream — it is caller-owned. + // We also do NOT call aclrtResetDevice / aclFinalize here. Device/ACL + // lifecycle belongs to DeviceRunner, whose finalize() releases all + // device memory before resetting the device and running aclFinalize. + + // Only rank 0 sweeps the on-disk handshake markers, and only if the + // final barrier succeeded. Deleting them after a timeout would strand + // any peer that hasn't observed our marker yet, and leak that peer + // into the next run with no rootinfo to discover. Letting cleanup + // ride on the next rank-0 init is the safer recovery path. + if (h->rank == 0 && rc == 0) { + cleanup_handshake_files(h->rootinfo_path); + } + + delete h; + return rc; +} catch (const std::exception &e) { + LOG_ERROR("[comm] comm_destroy: exception: %s", e.what()); + if (h) delete h; + return -1; +} catch (...) { + LOG_ERROR("[comm] comm_destroy: unknown exception"); + if (h) delete h; + return -1; +} diff --git a/src/a5/platform/onboard/host/device_runner.cpp b/src/a5/platform/onboard/host/device_runner.cpp index 8017f9b09..fdafb0348 100644 --- a/src/a5/platform/onboard/host/device_runner.cpp +++ b/src/a5/platform/onboard/host/device_runner.cpp @@ -292,6 +292,66 @@ void DeviceRunner::configure_aicore_op_timeout() { } } +int DeviceRunner::ensure_acl_ready(int device_id) { + if (device_id < 0) { + LOG_ERROR("ensure_acl_ready: invalid device_id %d", device_id); + return -1; + } + + // aclInit is process-wide; CANN returns ACL_ERROR_REPEAT_INITIALIZE if it + // has already been initialized (possibly by another owner), which we + // treat as success. + aclError aRet = aclInit(nullptr); + if (aRet != ACL_SUCCESS && static_cast(aRet) != ACL_ERROR_REPEAT_INITIALIZE) { + LOG_ERROR("aclInit failed: %d", static_cast(aRet)); + return static_cast(aRet); + } + + // ACL device binding is per-thread; every caller must still hit it. + aRet = aclrtSetDevice(device_id); + if (aRet != ACL_SUCCESS) { + LOG_ERROR("aclrtSetDevice(%d) failed: %d", device_id, static_cast(aRet)); + return static_cast(aRet); + } + + // Record that we are responsible for aclFinalize at teardown. + acl_ready_ = true; + if (device_id_ < 0) device_id_ = device_id; + return 0; +} + +void *DeviceRunner::create_comm_stream() { + aclrtStream stream = nullptr; + aclError aRet = aclrtCreateStream(&stream); + if (aRet != ACL_SUCCESS) { + LOG_ERROR("aclrtCreateStream failed: %d", static_cast(aRet)); + return nullptr; + } + return stream; +} + +int DeviceRunner::destroy_comm_stream(void *stream) { + if (stream == nullptr) return 0; + + // Best-effort teardown. HcclBarrier submits async work on the stream; + // if the caller never blocked for completion (or hit the HCCL 507018 + // barrier regression), aclrtDestroyStream will refuse with 507901 + // ("stream still has pending tasks"). We try to drain first, then + // destroy anyway, and log failures without propagating them — leaking + // a stream at teardown is strictly better than failing the teardown + // itself, which would block device finalization. This matches the + // cleanup behavior of the HCCL C++ hardware UT. + aclError sync_rc = aclrtSynchronizeStream(static_cast(stream)); + if (sync_rc != ACL_SUCCESS) { + LOG_ERROR("aclrtSynchronizeStream during stream teardown failed: %d", static_cast(sync_rc)); + } + aclError destroy_rc = aclrtDestroyStream(static_cast(stream)); + if (destroy_rc != ACL_SUCCESS) { + LOG_ERROR("aclrtDestroyStream failed (leaking stream): %d", static_cast(destroy_rc)); + } + return 0; +} + int DeviceRunner::prepare_run_context(int device_id) { int rc = attach_current_thread(device_id); if (rc != 0) { @@ -1001,10 +1061,30 @@ int DeviceRunner::finalize() { // Free all remaining allocations (including handshake buffer and binGmAddr) mem_alloc_.finalize(); - rc = rtDeviceReset(device_id_); - if (rc != 0) { - LOG_ERROR("rtDeviceReset(%d) failed during finalize: %d", device_id_, rc); - return rc; + // Reset device and finalize ACL AFTER all device memory is freed. When the + // ACL layer was brought up (comm path), aclrtResetDevice supersedes + // rtDeviceReset and additionally releases ACL's per-thread ref-count; + // calling raw rtDeviceReset in that state would leave ACL with stale + // bookkeeping. Pure rt-layer runtimes that never asked for ACL still get + // the bare rtDeviceReset. + if (acl_ready_ && device_id_ >= 0) { + int reset_rc = aclrtResetDevice(device_id_); + if (reset_rc != 0) { + LOG_ERROR("aclrtResetDevice(%d) failed during finalize: %d", device_id_, reset_rc); + rc = reset_rc; + } + int finalize_rc = aclFinalize(); + if (finalize_rc != 0) { + LOG_ERROR("aclFinalize failed during finalize: %d", finalize_rc); + if (rc == 0) rc = finalize_rc; + } + acl_ready_ = false; + } else { + rc = rtDeviceReset(device_id_); + if (rc != 0) { + LOG_ERROR("rtDeviceReset(%d) failed during finalize: %d", device_id_, rc); + return rc; + } } // Free the 8-byte device_wall buffer (allocated lazily in run()). @@ -1018,7 +1098,7 @@ int DeviceRunner::finalize() { aicore_kernel_binary_.clear(); LOG_INFO_V0("DeviceRunner finalized"); - return 0; + return rc; } int DeviceRunner::launch_aicpu_kernel(rtStream_t stream, KernelArgs *k_args, const char *kernel_name, int aicpu_num) { diff --git a/src/a5/platform/onboard/host/device_runner.h b/src/a5/platform/onboard/host/device_runner.h index cab1c2d8c..236b9f7bb 100644 --- a/src/a5/platform/onboard/host/device_runner.h +++ b/src/a5/platform/onboard/host/device_runner.h @@ -383,6 +383,47 @@ class DeviceRunner { */ int attach_current_thread(int device_id); + /** + * Make the ACL context ready on the current thread. + * + * Calls aclInit() once per process (subsequent calls are idempotent and + * tolerate the ACL_ERROR_REPEAT_INITIALIZE sentinel) and aclrtSetDevice() + * on the current thread. This is the entry point for consumers that need + * to call acl* / Hccl* APIs (for example the comm_hccl backend) but + * intentionally do not want those modules to own ACL lifecycle themselves. + * + * Symmetric with finalize(): aclrtResetDevice + aclFinalize run there. + * + * @param device_id Device ID to bind on the current thread. + * @return 0 on success, error code on failure. + */ + int ensure_acl_ready(int device_id); + + /** + * Create a caller-owned aclrtStream for comm_* usage. + * + * Intended to back the ChipWorker Python wrapper's internal stream + * ownership for distributed comm — callers pair it with + * destroy_comm_stream() at teardown. The ACL context must already be + * ready on the calling thread (ensure_acl_ready()). + * + * @return aclrtStream pointer on success, NULL on failure. + */ + void *create_comm_stream(); + + /** + * Destroy a stream previously returned by create_comm_stream(). + * Tolerates a nullptr stream. + * + * Best-effort: any failure from aclrtSynchronizeStream / + * aclrtDestroyStream is logged but not propagated, since leaking a + * stream at teardown is strictly better than blocking device + * finalization. + * + * @return Always 0. + */ + int destroy_comm_stream(void *stream); + /** * Ensure the current thread has fresh run-scoped streams. * @@ -517,6 +558,11 @@ class DeviceRunner { // Kernel binary management bool binaries_loaded_{false}; // true after AICPU SO loaded + // ACL lifecycle (process-wide). aclInit must run exactly once; ensure_acl_ready + // gates it behind this flag. finalize() drives aclFinalize only if we observed + // acl_ready_, so runtimes that never ask for ACL (e.g. pure rt-layer) stay unaffected. + bool acl_ready_{false}; + // Chip-callable buffer pool. Keyed by FNV-1a 64-bit content hash of the // ChipCallable bytes. Each entry owns one device GM allocation holding // the entire ChipCallable buffer (header + storage_, with each child's diff --git a/src/a5/platform/onboard/host/pto_runtime_c_api.cpp b/src/a5/platform/onboard/host/pto_runtime_c_api.cpp index 21f919fd0..ee435d730 100644 --- a/src/a5/platform/onboard/host/pto_runtime_c_api.cpp +++ b/src/a5/platform/onboard/host/pto_runtime_c_api.cpp @@ -191,106 +191,37 @@ int finalize_device(DeviceContextHandle ctx) { } } -/* =========================================================================== - * ACL + comm_* placeholders (distributed runtime not yet implemented on a5) - * - * These exist only to satisfy ChipWorker's unconditional dlsym of the extension - * surface — the contract is "every host_runtime.so exports the full set; a - * runtime without a real implementation returns a not-supported result at - * call time" rather than having ChipWorker probe each symbol individually. - * When a5 grows real HCCL / sim distributed support these stubs get replaced - * wholesale; no ChipWorker changes are needed. - * =========================================================================== */ - int ensure_acl_ready_ctx(DeviceContextHandle ctx, int device_id) { - (void)ctx; - (void)device_id; - return 0; + if (ctx == NULL) return -1; + try { + return static_cast(ctx)->ensure_acl_ready(device_id); + } catch (...) { + return -1; + } } +/* + * Stream creation/destruction exposed so the ChipWorker Python wrapper can + * drive comm_init end-to-end without leaking aclrtStream lifetime (or ACL + * libs) into Python. Both entries go through the DeviceRunner so the ACL + * ready-flag and device bookkeeping stay consistent with the normal run path. + */ void *create_comm_stream_ctx(DeviceContextHandle ctx) { - (void)ctx; - return NULL; + if (ctx == NULL) return NULL; + try { + return static_cast(ctx)->create_comm_stream(); + } catch (...) { + return NULL; + } } int destroy_comm_stream_ctx(DeviceContextHandle ctx, void *stream) { - (void)ctx; - (void)stream; - return 0; -} - -void *comm_init(int rank, int nranks, void *stream, const char *rootinfo_path) { - (void)rank; - (void)nranks; - (void)stream; - (void)rootinfo_path; - return NULL; // distributed runtime not yet supported on a5 -} - -int comm_alloc_windows(void *handle, size_t win_size, uint64_t *device_ctx_out) { - (void)handle; - (void)win_size; - (void)device_ctx_out; - return -1; -} - -int comm_get_local_window_base(void *handle, uint64_t *base_out) { - (void)handle; - (void)base_out; - return -1; -} - -int comm_get_window_size(void *handle, size_t *size_out) { - (void)handle; - (void)size_out; - return -1; -} - -int comm_derive_context( - void *handle, const uint32_t *rank_ids, size_t rank_count, uint32_t domain_rank, size_t window_offset, - size_t window_size, uint64_t *device_ctx_out -) { - (void)handle; - (void)rank_ids; - (void)rank_count; - (void)domain_rank; - (void)window_offset; - (void)window_size; - (void)device_ctx_out; - return -1; -} - -int comm_alloc_domain_windows( - void *handle, uint64_t allocation_id, const uint32_t *rank_ids, size_t rank_count, uint32_t domain_rank, - size_t window_size, uint64_t *device_ctx_out, uint64_t *local_window_base_out -) { - (void)handle; - (void)allocation_id; - (void)rank_ids; - (void)rank_count; - (void)domain_rank; - (void)window_size; - (void)device_ctx_out; - (void)local_window_base_out; - return -1; -} - -int comm_release_domain_windows(void *handle, uint64_t allocation_id, size_t rank_count, uint32_t domain_rank) { - (void)handle; - (void)allocation_id; - (void)rank_count; - (void)domain_rank; - return -1; -} - -int comm_barrier(void *handle) { - (void)handle; - return -1; -} - -int comm_destroy(void *handle) { - (void)handle; - return -1; + if (ctx == NULL) return -1; + try { + return static_cast(ctx)->destroy_comm_stream(stream); + } catch (...) { + return -1; + } } int simpler_init( diff --git a/src/a5/runtime/tensormap_and_ringbuffer/runtime/aicore_completion_mailbox.h b/src/a5/runtime/tensormap_and_ringbuffer/runtime/aicore_completion_mailbox.h index f1e476e8f..788652516 100644 --- a/src/a5/runtime/tensormap_and_ringbuffer/runtime/aicore_completion_mailbox.h +++ b/src/a5/runtime/tensormap_and_ringbuffer/runtime/aicore_completion_mailbox.h @@ -9,8 +9,7 @@ * ----------------------------------------------------------------------------------------------------------- */ -#ifndef SRC_A5_RUNTIME_TENSORMAP_AND_RINGBUFFER_RUNTIME_AICORE_COMPLETION_MAILBOX_H_ -#define SRC_A5_RUNTIME_TENSORMAP_AND_RINGBUFFER_RUNTIME_AICORE_COMPLETION_MAILBOX_H_ +#pragma once #include @@ -33,6 +32,7 @@ inline constexpr int32_t MAX_COMPLETIONS_PER_TASK = 64; #define COMPLETION_ENGINE_CCU 3u #define COMPLETION_TYPE_COUNTER 0 +#define COMPLETION_TYPE_SDMA_EVENT_RECORD 1 struct AICoreCompletionMailboxMessage { volatile uint64_t seq; @@ -78,5 +78,3 @@ struct AICoreCompletionMailbox { static_assert( sizeof(AICoreCompletionMailbox) % PTO2_ALIGN_SIZE == 0, "AICoreCompletionMailbox size must be cache-line aligned" ); - -#endif // SRC_A5_RUNTIME_TENSORMAP_AND_RINGBUFFER_RUNTIME_AICORE_COMPLETION_MAILBOX_H_ diff --git a/src/a5/runtime/tensormap_and_ringbuffer/runtime/backend/sdma/sdma_completion_kernel.h b/src/a5/runtime/tensormap_and_ringbuffer/runtime/backend/sdma/sdma_completion_kernel.h new file mode 100644 index 000000000..a242a0726 --- /dev/null +++ b/src/a5/runtime/tensormap_and_ringbuffer/runtime/backend/sdma/sdma_completion_kernel.h @@ -0,0 +1,143 @@ +/* + * Copyright (c) PyPTO Contributors. + * This program is free software, you can redistribute it and/or modify it under the terms and conditions of + * CANN Open Software License Agreement Version 2.0 (the "License"). + * Please refer to the License for details. You may not use this file except in compliance with the License. + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, + * INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. + * See LICENSE in the root of the software repository for the full text of the License. + * ----------------------------------------------------------------------------------------------------------- + */ + +#pragma once + +#include + +#include +#include + +#include "pto_async_kernel_api.h" +#include "aicore_completion_mailbox.h" +#include "pto_runtime_status.h" + +#ifndef __aicore__ +#define __aicore__ +#endif +#ifndef __gm__ +#define __gm__ +#endif + +// Re-exposed PTO-ISA constant so examples / callers don't need to include +// just to spell their scratch tile. +inline constexpr uint32_t SDMA_SCRATCH_ALIGNMENT = pto::comm::sdma::UB_ALIGN_SIZE; + +enum class SdmaOp : uint8_t { + TGET = 0, + TPUT = 1, +}; + +// SdmaRequestDescriptor bundles everything send_request_entry needs to drive +// one SDMA transfer + completion registration. It is a template because the +// destination / source / scratch types carry tensor shape & stride at compile +// time; the SdmaTget() / SdmaTput() helpers below let callers skip the +// template arguments. +// +// sync_id selects which event-record slot inside the workspace the engine +// writes into. Concurrent dispatches must use distinct sync_ids; today every +// caller submits one request per kernel invocation so passing 0 is safe. +// Future work (see .docs/25.comm-api-refactor/03.implementation-plan.md §5.2) +// will fold sync_id allocation into the adapter. +template +struct SdmaRequestDescriptor { + SdmaOp op; + DstTensor dst; + SrcTensor src; + ScratchTileT scratch; + __gm__ uint8_t *workspace; + uint32_t sync_id; +}; + +template +inline __aicore__ SdmaRequestDescriptor SdmaTget( + const DstTensor &dst, const SrcTensor &src, const ScratchTileT &scratch, __gm__ uint8_t *workspace, + uint32_t sync_id = 0 +) { + return SdmaRequestDescriptor{SdmaOp::TGET, dst, src, + scratch, workspace, sync_id}; +} + +template +inline __aicore__ SdmaRequestDescriptor SdmaTput( + const DstTensor &dst, const SrcTensor &src, const ScratchTileT &scratch, __gm__ uint8_t *workspace, + uint32_t sync_id = 0 +) { + return SdmaRequestDescriptor{SdmaOp::TPUT, dst, src, + scratch, workspace, sync_id}; +} + +namespace pto2::detail { + +inline __aicore__ void register_sdma_event_record(AsyncCtx &ctx, volatile __gm__ void *record_addr) { + CompletionToken token{ + reinterpret_cast(record_addr), 0, COMPLETION_ENGINE_SDMA, COMPLETION_TYPE_SDMA_EVENT_RECORD, 0 + }; + (void)register_completion_condition(ctx, token); +} + +template +inline __aicore__ void +register_pto_async_event(AsyncCtx &ctx, const PtoAsyncEvent &event, const PtoAsyncSession &session) { + if (ctx.task_token.is_invalid() || ctx.completion_count == nullptr || ctx.completion_entries == nullptr) { + (void)event.Wait(session); + return; + } + if (event.handle == 0) { + return; + } + + const uint32_t engine = static_cast(event.engine); + if (engine != static_cast(::pto::comm::DmaEngine::SDMA)) { + defer_error(ctx, PTO2_ERROR_ASYNC_COMPLETION_INVALID); + return; + } + + ::pto::comm::sdma::detail::UbTmpBuf tmp_buf; + uint32_t sync_id = 0; + __gm__ uint8_t *recv_workspace = nullptr; + uint32_t queue_num = 0; + if (!::pto::comm::sdma::detail::PrepareEventCheck( + session.sdmaSession, tmp_buf, sync_id, recv_workspace, queue_num + )) { + defer_error(ctx, PTO2_ERROR_ASYNC_COMPLETION_INVALID); + return; + } + for (uint32_t queue_id = 0; queue_id < queue_num; ++queue_id) { + register_sdma_event_record(ctx, ::pto::comm::sdma::detail::GetEventRecord(recv_workspace, queue_id)); + } +} + +} // namespace pto2::detail + +// SDMA overload of the runtime's send_request_entry. Submits the descriptor +// to PTO-ISA, then registers the resulting AsyncEvent's GM flag(s) into the +// AsyncCtx deferred-wait slab and flushes. Returns false on submit/session +// failure (also records the error in ctx.completion_error_code). +template +inline __aicore__ bool +send_request_entry(AsyncCtx &ctx, SdmaRequestDescriptor desc) { + pto::comm::AsyncSession session; + if (!pto::comm::BuildAsyncSession(desc.scratch, desc.workspace, session, desc.sync_id)) { + pto2::detail::defer_error(ctx, PTO2_ERROR_ASYNC_COMPLETION_INVALID); + return false; + } + + pto::comm::AsyncEvent event; + if (desc.op == SdmaOp::TGET) { + event = pto::comm::TGET_ASYNC(desc.dst, desc.src, session); + } else { + event = pto::comm::TPUT_ASYNC(desc.dst, desc.src, session); + } + pto2::detail::register_pto_async_event(ctx, event, session); + pto2::detail::defer_flush(ctx); + return true; +} diff --git a/src/a5/runtime/tensormap_and_ringbuffer/runtime/backend/sdma/sdma_completion_scheduler.h b/src/a5/runtime/tensormap_and_ringbuffer/runtime/backend/sdma/sdma_completion_scheduler.h new file mode 100644 index 000000000..7ebb4d2d7 --- /dev/null +++ b/src/a5/runtime/tensormap_and_ringbuffer/runtime/backend/sdma/sdma_completion_scheduler.h @@ -0,0 +1,68 @@ +/* + * Copyright (c) PyPTO Contributors. + * This program is free software, you can redistribute it and/or modify it under the terms and conditions of + * CANN Open Software License Agreement Version 2.0 (the "License"). + * Please refer to the License for details. You may not use this file except in compliance with the License. + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, + * INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. + * See LICENSE in the root of the software repository for the full text of the License. + * ----------------------------------------------------------------------------------------------------------- + */ + +#pragma once + +#include +#include + +#include "aicpu/platform_regs.h" +#include "aicore_completion_mailbox.h" +#include "pto_completion_token.h" +#include "pto_runtime_status.h" + +// runtime-side mirror of the PTO-ISA SdmaEventRecord. SDMA backend is the only +// allowed holder of this ABI knowledge; the generic scheduler dispatches into +// the helpers below through the completion ops table. +struct SdmaEventRecord { + uint32_t flag; + uint32_t sq_tail; + uint64_t channel_info; +}; + +static_assert(sizeof(SdmaEventRecord) == 16, "SDMA event record ABI drift"); +static_assert(offsetof(SdmaEventRecord, sq_tail) == 4, "SDMA event record ABI drift"); + +inline uintptr_t sdma_completion_cache_line(const volatile void *addr) { + return reinterpret_cast(addr) & ~(uintptr_t(PTO2_ALIGN_SIZE) - 1u); +} + +inline CompletionPollResult poll_sdma_event_record(uint64_t record_addr) { + if (record_addr == 0) { + return {CompletionPollState::FAILED, PTO2_ERROR_ASYNC_COMPLETION_INVALID}; + } + volatile SdmaEventRecord *record = + reinterpret_cast(static_cast(record_addr)); + cache_invalidate_range(reinterpret_cast(sdma_completion_cache_line(record)), PTO2_ALIGN_SIZE); + uint32_t flag = __atomic_load_n(&record->flag, __ATOMIC_ACQUIRE); + return {flag != 0 ? CompletionPollState::READY : CompletionPollState::PENDING, PTO2_ERROR_NONE}; +} + +inline void retire_sdma_event_record(uint64_t record_addr) { + if (record_addr == 0) return; + volatile SdmaEventRecord *record = + reinterpret_cast(static_cast(record_addr)); + cache_invalidate_range(reinterpret_cast(sdma_completion_cache_line(record)), PTO2_ALIGN_SIZE); + uint32_t completed_tail = __atomic_load_n(&record->sq_tail, __ATOMIC_ACQUIRE); + uint64_t channel_info_addr = __atomic_load_n(&record->channel_info, __ATOMIC_ACQUIRE); + + volatile uint64_t *record_head = reinterpret_cast(record); + __atomic_store_n(record_head, 0ULL, __ATOMIC_RELEASE); + cache_flush_range(const_cast(reinterpret_cast(record_head)), sizeof(uint64_t)); + + if (channel_info_addr == 0) return; + // channel_info packs (head, tail) into one 64-bit word; head==tail at + // retire signals "queue fully drained" to the SDMA driver. + uint64_t packed = (static_cast(completed_tail) << 32) | static_cast(completed_tail); + volatile uint64_t *channel_info = reinterpret_cast(static_cast(channel_info_addr)); + __atomic_store_n(channel_info, packed, __ATOMIC_RELEASE); + cache_flush_range(const_cast(reinterpret_cast(channel_info)), sizeof(uint64_t)); +} diff --git a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_async_kernel_api.h b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_async_kernel_api.h index 0efa07d0c..9d1bd7700 100644 --- a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_async_kernel_api.h +++ b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_async_kernel_api.h @@ -9,8 +9,7 @@ * ----------------------------------------------------------------------------------------------------------- */ -#ifndef PTO_ASYNC_KERNEL_API_H -#define PTO_ASYNC_KERNEL_API_H +#pragma once #include @@ -29,9 +28,10 @@ #define __gm__ #endif -// Public surface: get_async_ctx, register_completion_condition, -// send_notification, save_expected_notification_counter. Everything else -// lives in pto2::detail and is reserved for backend adapters / internal use. +// Public surface: get_async_ctx, async_ctx_is_deferred, +// register_completion_condition, send_notification, +// save_expected_notification_counter. Everything else lives in +// pto2::detail and is reserved for backend adapters / internal use. namespace pto2::detail { inline __aicore__ void defer_load_slab(AsyncCtx &ctx) { @@ -44,6 +44,12 @@ inline __aicore__ void defer_load_slab(AsyncCtx &ctx) { #endif } +inline __aicore__ void defer_error(AsyncCtx &ctx, int32_t error_code) { + if (ctx.task_token.is_valid() && ctx.completion_error_code != nullptr) { + *ctx.completion_error_code = error_code; + } +} + inline __aicore__ void defer_flush_range(volatile __gm__ void *addr, uint32_t size_bytes) { if (addr == nullptr || size_bytes == 0) return; #if defined(__CCE_KT_TEST__) || defined(__CCE_AICORE__) || defined(__DAV_C220__) @@ -91,11 +97,27 @@ inline __aicore__ void defer_flush(AsyncCtx &ctx) { inline __aicore__ AsyncCtx get_async_ctx(__gm__ int64_t *args) { __gm__ LocalContext *lc = reinterpret_cast<__gm__ LocalContext *>(static_cast(args[PAYLOAD_LOCAL_CONTEXT_INDEX])); - AsyncCtx ctx = lc->async_ctx; + // Field-by-field copy is mandatory: CCE rejects `AsyncCtx ctx = lc->async_ctx;` + // because there is no implicit constructor that crosses the __gm__ address + // space into Local Memory. When a new field is added to AsyncCtx, mirror it + // below or this kernel path will silently see zero for that field. + AsyncCtx ctx{}; + ctx.completion_count = lc->async_ctx.completion_count; + ctx.completion_error_code = lc->async_ctx.completion_error_code; + ctx.completion_entries = lc->async_ctx.completion_entries; + ctx.completion_capacity = lc->async_ctx.completion_capacity; + ctx.task_token.raw = lc->async_ctx.task_token.raw; pto2::detail::defer_load_slab(ctx); return ctx; } +inline __aicore__ bool async_ctx_is_deferred(const AsyncCtx &ctx) { return ctx.task_token.is_valid(); } + +// Canonical writer: backend submit handlers build a CompletionToken and pass +// it here. Writes one DeferredCompletionEntry to the AsyncCtx slab and +// bumps completion_count. Returns false on overflow (also stores +// PTO2_ERROR_ASYNC_WAIT_OVERFLOW in ctx.completion_error_code) or when ctx is +// not currently a deferred context. inline __aicore__ bool register_completion_condition(AsyncCtx &ctx, const CompletionToken &token) { if (ctx.task_token.is_invalid() || ctx.completion_count == nullptr || ctx.completion_entries == nullptr) { return false; @@ -134,5 +156,3 @@ save_expected_notification_counter(AsyncCtx &ctx, volatile __gm__ void *counter_ (void)register_completion_condition(ctx, token); pto2::detail::defer_flush(ctx); } - -#endif // PTO_ASYNC_KERNEL_API_H diff --git a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_async_wait.h b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_async_wait.h index 23249c4f0..8bd8ef76b 100644 --- a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_async_wait.h +++ b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_async_wait.h @@ -9,15 +9,17 @@ * ----------------------------------------------------------------------------------------------------------- */ -#ifndef PTO_ASYNC_WAIT_H -#define PTO_ASYNC_WAIT_H +#pragma once #include #include #include +#include "aicpu/platform_regs.h" +#include "backend/sdma/sdma_completion_scheduler.h" #include "intrinsic.h" #include "aicore_completion_mailbox.h" +#include "pto_completion_token.h" #include "pto_runtime2_types.h" struct PTO2SchedulerState; @@ -34,36 +36,84 @@ inline bool aicore_completion_mailbox_has_pending(volatile AICoreCompletionMailb return tail < head; } -enum class CompletionPollState : uint8_t { - PENDING = 0, - READY = 1, - FAILED = 2, -}; +inline uintptr_t mailbox_cache_line(const volatile void *addr) { + return reinterpret_cast(addr) & ~(uintptr_t(PTO2_ALIGN_SIZE) - 1u); +} -struct CompletionPollResult { - CompletionPollState state{CompletionPollState::PENDING}; - int32_t error_code{PTO2_ERROR_NONE}; +struct CompletionCondition; + +using CompletionPollFn = CompletionPollResult (*)(const CompletionCondition &); +using CompletionRetireFn = void (*)(CompletionCondition &); + +struct CompletionBackendOps { + CompletionPollFn poll; + CompletionRetireFn retire; }; struct CompletionCondition { AsyncEngine engine{ASYNC_ENGINE_SDMA}; + int32_t completion_type{COMPLETION_TYPE_COUNTER}; bool satisfied{false}; + bool retired{false}; volatile uint32_t *counter_addr{nullptr}; + uint64_t addr{0}; uint32_t expected_value{0}; - CompletionPollResult test() const { - if (satisfied) { - return {CompletionPollState::READY, PTO2_ERROR_NONE}; - } - if (counter_addr == nullptr) { - return {CompletionPollState::FAILED, PTO2_ERROR_ASYNC_COMPLETION_INVALID}; - } - return { - *counter_addr >= expected_value ? CompletionPollState::READY : CompletionPollState::PENDING, PTO2_ERROR_NONE - }; - } + CompletionPollResult test() const; + void retire(); }; +// Per-completion-type ops. SDMA_EVENT_RECORD detail lives in +// backend/sdma/sdma_completion_scheduler.h; the op wrappers below are thin +// glue mapping CompletionCondition.addr into the backend's raw-addr helpers. +inline CompletionPollResult counter_poll_op(const CompletionCondition &cond) { + if (cond.counter_addr == nullptr) { + return {CompletionPollState::FAILED, PTO2_ERROR_ASYNC_COMPLETION_INVALID}; + } + return { + *cond.counter_addr >= cond.expected_value ? CompletionPollState::READY : CompletionPollState::PENDING, + PTO2_ERROR_NONE + }; +} + +inline void counter_retire_op(CompletionCondition & /*cond*/) {} + +inline CompletionPollResult sdma_event_record_poll_op(const CompletionCondition &cond) { + return poll_sdma_event_record(cond.addr); +} + +inline void sdma_event_record_retire_op(CompletionCondition &cond) { retire_sdma_event_record(cond.addr); } + +inline const CompletionBackendOps *completion_backend_ops_for(int completion_type) { + static const CompletionBackendOps kOps[] = { + {counter_poll_op, counter_retire_op}, // COMPLETION_TYPE_COUNTER = 0 + {sdma_event_record_poll_op, sdma_event_record_retire_op}, // COMPLETION_TYPE_SDMA_EVENT_RECORD = 1 + }; + constexpr int kOpsCount = static_cast(sizeof(kOps) / sizeof(kOps[0])); + if (completion_type < 0 || completion_type >= kOpsCount) return nullptr; + return &kOps[completion_type]; +} + +inline CompletionPollResult CompletionCondition::test() const { + if (satisfied) { + return {CompletionPollState::READY, PTO2_ERROR_NONE}; + } + const CompletionBackendOps *ops = completion_backend_ops_for(completion_type); + if (ops == nullptr || ops->poll == nullptr) { + return {CompletionPollState::FAILED, PTO2_ERROR_ASYNC_COMPLETION_INVALID}; + } + return ops->poll(*this); +} + +inline void CompletionCondition::retire() { + if (retired) return; + const CompletionBackendOps *ops = completion_backend_ops_for(completion_type); + if (ops != nullptr && ops->retire != nullptr) { + ops->retire(*this); + } + retired = true; +} + struct AsyncWaitEntry { PTO2TaskSlotState *slot_state{nullptr}; PTO2TaskId task_token{PTO2TaskId::invalid()}; @@ -158,7 +208,10 @@ struct AsyncWaitList { } CompletionCondition &cond = entry->conditions[entry->condition_count++]; cond.engine = engine; + cond.completion_type = COMPLETION_TYPE_COUNTER; cond.satisfied = false; + cond.retired = false; + cond.addr = addr; cond.counter_addr = reinterpret_cast(static_cast(addr)); cond.expected_value = expected_value; entry->waiting_completion_count++; @@ -186,7 +239,10 @@ struct AsyncWaitList { if (entry.condition_count < MAX_COMPLETIONS_PER_TASK) { CompletionCondition &cond = entry.conditions[entry.condition_count++]; cond.engine = pending_completions[i].engine; + cond.completion_type = COMPLETION_TYPE_COUNTER; cond.satisfied = false; + cond.retired = false; + cond.addr = pending_completions[i].addr; cond.counter_addr = reinterpret_cast(static_cast(pending_completions[i].addr)); cond.expected_value = pending_completions[i].expected_value; @@ -203,7 +259,8 @@ struct AsyncWaitList { enum class RegisterResult { Registered, NotDeferred, Skipped, Error }; bool append_condition_locked( - AsyncWaitEntry &entry, uint64_t addr, uint32_t expected_value, AsyncEngine engine, int32_t &error_code + AsyncWaitEntry &entry, uint64_t addr, uint32_t expected_value, AsyncEngine engine, int32_t completion_type, + int32_t &error_code ) { if (entry.condition_count >= MAX_COMPLETIONS_PER_TASK) { error_code = PTO2_ERROR_ASYNC_REGISTRATION_FAILED; @@ -211,8 +268,13 @@ struct AsyncWaitList { } CompletionCondition &cond = entry.conditions[entry.condition_count++]; cond.engine = engine; + cond.completion_type = completion_type; cond.satisfied = false; - cond.counter_addr = reinterpret_cast(static_cast(addr)); + cond.retired = false; + cond.addr = addr; + cond.counter_addr = completion_type == COMPLETION_TYPE_COUNTER ? + reinterpret_cast(static_cast(addr)) : + nullptr; cond.expected_value = expected_value; entry.waiting_completion_count++; return true; @@ -274,9 +336,14 @@ struct AsyncWaitList { for (uint32_t i = 0; i < deferred_count; ++i) { volatile DeferredCompletionEntry *deferred = &async_ctx.completion_entries[i]; + if (deferred->completion_type == COMPLETION_TYPE_COUNTER) { + volatile uint32_t *counter = + reinterpret_cast(static_cast(deferred->addr)); + cache_invalidate_range(reinterpret_cast(mailbox_cache_line(counter)), sizeof(uint32_t)); + } if (!append_condition_locked( *entry, deferred->addr, deferred->expected_value, static_cast(deferred->engine), - error_code + deferred->completion_type, error_code )) { unlock(); return RegisterResult::Error; @@ -299,5 +366,3 @@ struct AsyncWaitList { #endif ); }; - -#endif // PTO_ASYNC_WAIT_H diff --git a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_completion_token.h b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_completion_token.h index 982d71a67..95ee0d5ff 100644 --- a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_completion_token.h +++ b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_completion_token.h @@ -9,12 +9,12 @@ * ----------------------------------------------------------------------------------------------------------- */ -#ifndef SRC_A5_RUNTIME_TENSORMAP_AND_RINGBUFFER_RUNTIME_PTO_COMPLETION_TOKEN_H_ -#define SRC_A5_RUNTIME_TENSORMAP_AND_RINGBUFFER_RUNTIME_PTO_COMPLETION_TOKEN_H_ +#pragma once #include #include "aicore_completion_mailbox.h" +#include "pto_runtime_status.h" // CompletionToken is the runtime-internal POD that backend submit handlers // produce and the generic register_completion_condition() consumes. It is the @@ -30,4 +30,13 @@ struct CompletionToken { uint64_t backend_cookie; }; -#endif // SRC_A5_RUNTIME_TENSORMAP_AND_RINGBUFFER_RUNTIME_PTO_COMPLETION_TOKEN_H_ +enum class CompletionPollState : uint8_t { + PENDING = 0, + READY = 1, + FAILED = 2, +}; + +struct CompletionPollResult { + CompletionPollState state{CompletionPollState::PENDING}; + int32_t error_code{PTO2_ERROR_NONE}; +}; diff --git a/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h b/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h index 32887d0be..8b51e83c2 100644 --- a/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h +++ b/src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h @@ -1076,9 +1076,17 @@ inline AsyncPollResult AsyncWaitList::poll_and_complete( for (int32_t i = count - 1; i >= 0; --i) { AsyncWaitEntry &entry = entries[i]; + uintptr_t last_invalidated_counter_line = static_cast(-1); for (int32_t c = 0; c < entry.condition_count; c++) { CompletionCondition &cond = entry.conditions[c]; if (cond.satisfied) continue; + if (cond.completion_type == COMPLETION_TYPE_COUNTER && cond.counter_addr != nullptr) { + uintptr_t counter_line = mailbox_cache_line(cond.counter_addr); + if (counter_line != last_invalidated_counter_line) { + cache_invalidate_range(reinterpret_cast(counter_line), sizeof(uint32_t)); + last_invalidated_counter_line = counter_line; + } + } CompletionPollResult poll = cond.test(); if (poll.state == CompletionPollState::FAILED) { result.error_code = poll.error_code; @@ -1088,6 +1096,7 @@ inline AsyncPollResult AsyncWaitList::poll_and_complete( } if (poll.state == CompletionPollState::READY) { cond.satisfied = true; + cond.retire(); entry.waiting_completion_count--; } } diff --git a/tests/ut/py/test_worker/test_dynamic_alloc_hw.py b/tests/ut/py/test_worker/test_dynamic_alloc_hw.py index 681182dad..5030098d8 100644 --- a/tests/ut/py/test_worker/test_dynamic_alloc_hw.py +++ b/tests/ut/py/test_worker/test_dynamic_alloc_hw.py @@ -37,9 +37,9 @@ @pytest.mark.requires_hardware -@pytest.mark.platforms(["a2a3"]) +@pytest.mark.platforms(["a2a3", "a5"]) @pytest.mark.device_count(2) -def test_two_rank_allocate_release_round_trip(st_device_ids): +def test_two_rank_allocate_release_round_trip(st_platform, st_device_ids): """End-to-end 2-rank hardware alloc + release round trip. Single allocation, one CommBufferSpec, both chips participate. Locks @@ -51,7 +51,7 @@ def test_two_rank_allocate_release_round_trip(st_device_ids): from simpler_setup.runtime_builder import RuntimeBuilder build = bool(os.environ.get("PTO_UT_BUILD")) - _ = RuntimeBuilder(platform="a2a3").get_binaries("tensormap_and_ringbuffer", build=build) + _ = RuntimeBuilder(platform=st_platform).get_binaries("tensormap_and_ringbuffer", build=build) assert len(st_device_ids) >= 2, "device_count(2) fixture must yield >= 2 ids" device_ids = [int(d) for d in st_device_ids[:2]] nranks = len(device_ids) @@ -84,7 +84,7 @@ def orch_fn(orch, _args, _cfg): worker = Worker( level=3, - platform="a2a3", + platform=st_platform, runtime="tensormap_and_ringbuffer", device_ids=device_ids, num_sub_workers=0, diff --git a/tests/ut/py/test_worker/test_platform_comm.py b/tests/ut/py/test_worker/test_platform_comm.py index 0144025a7..d7631a700 100644 --- a/tests/ut/py/test_worker/test_platform_comm.py +++ b/tests/ut/py/test_worker/test_platform_comm.py @@ -47,7 +47,7 @@ # --------------------------------------------------------------------------- # CommContext layout — must stay byte-compatible with -# src/a2a3/platform/include/common/comm_context.h (static_asserts there). +# src/common/platform_comm/comm_context.h (static_asserts there). # If CANN / HCCL ever shifts these offsets, comm_hccl.cpp's build-time asserts # will fail first; this struct mirrors them so the Python side can read back a # CommContext without rebuilding nanobind just to expose the layout. @@ -166,14 +166,14 @@ def _rank_entry( @pytest.mark.requires_hardware -@pytest.mark.platforms(["a2a3"]) +@pytest.mark.platforms(["a2a3", "a5"]) @pytest.mark.device_count(2) -def test_two_rank_comm_lifecycle(st_device_ids): +def test_two_rank_comm_lifecycle(st_platform, st_device_ids): """End-to-end 2-rank hardware smoke test for ChipWorker.comm_* wrappers.""" from simpler_setup.runtime_builder import RuntimeBuilder build = bool(os.environ.get("PTO_UT_BUILD")) - bins = RuntimeBuilder(platform="a2a3").get_binaries("tensormap_and_ringbuffer", build=build) + bins = RuntimeBuilder(platform=st_platform).get_binaries("tensormap_and_ringbuffer", build=build) assert len(st_device_ids) >= 2, "device_count(2) fixture must yield >= 2 ids" nranks = 2 rootinfo_path = f"/tmp/pto_comm_py_ut_rootinfo_{os.getpid()}.bin"