Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
628 changes: 628 additions & 0 deletions docs/L3-L2-host-device-communication.md

Large diffs are not rendered by default.

10 changes: 6 additions & 4 deletions docs/dynamic-linking.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ Python process (ChipWorker)
| +-- rtRegisterAllKernel(aicore_binary) ← CANN kernel registration
| +-- rtAicpuKernelLaunchExWithArgs(...) ← device-side execution
|
+-- dlopen("libascend_hal.so", RTLD_NOW | RTLD_LOCAL) ← CANN HAL (profiling only)
+-- dlopen("libascend_hal.so", RTLD_NOW | RTLD_LOCAL) ← CANN HAL
```

Key difference: onboard does **not** dlopen AICPU/AICore as host-side SOs.
Expand Down Expand Up @@ -93,9 +93,11 @@ execution.

### CANN HAL: `RTLD_NOW | RTLD_LOCAL`

`libascend_hal.so` is loaded only for performance profiling (SVM memory
mapping via `halHostRegister`/`halHostUnregister`). The handle is cached
in a file-scope `g_hal_handle` and never explicitly dlclosed.
`libascend_hal.so` is loaded for onboard HAL services that need SVM memory
mapping, including performance profiling buffers and a2a3
`HostDeviceMappedRegion` host mappings via
`halHostRegister`/`halHostUnregister`. The handle is cached in a file-scope
`g_hal_handle` and never explicitly dlclosed.

## All dlsym(RTLD_DEFAULT) Calls

Expand Down
51 changes: 50 additions & 1 deletion docs/worker-manager.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,56 @@ mailbox_size_ = HEADER_SIZE // 8 B (state + error)

Per-worker total: ~2 KB. Typical pool: 4-8 workers → ~8-16 KB shm total.

### 3.4 Shutdown
### 3.4 Control-plane commands

The mailbox is also the per-child control channel. When the parent writes
`CONTROL_REQUEST`, offset 8 carries a `CTRL_*` sub-command instead of a task
callable id. The child loop handles the command in the same polling state
machine as `TASK_READY`, writes `MAILBOX_OFF_ERROR` / `MAILBOX_OFF_ERROR_MSG`
and any scalar result, then publishes `CONTROL_DONE`.

Task dispatch and control commands share one mailbox. Parent-side
`dispatch_process()` and every `control_*()` method serialize on the same
`mailbox_mu_`, so a control request issued while a task is running waits for
that task's mailbox round trip to finish before it claims the state field.
This is a WorkerManager-level RPC contract; individual features only define
their own `CTRL_*` sub-command and payload schema.

The fixed control slot layout is:

```text
offset 8: uint64 control sub-command
offset 16: uint64 arg0
offset 24: uint64 arg1
offset 32: uint64 arg2
offset 40: uint64 arg3
offset 48: uint64 result
```

The meaning of `arg0..arg3` is sub-command-specific. Commands that return one
scalar or pointer write it at `CTRL_OFF_RESULT`; commands with larger request
or reply payloads pass fixed-width POSIX shared-memory names through
`MAILBOX_OFF_ARGS`.

Current control-plane users include:

- Device memory control from the orchestrator:
`CTRL_MALLOC`, `CTRL_FREE`, `CTRL_COPY_TO`, and `CTRL_COPY_FROM`.
- Callable lifecycle control:
`CTRL_PREPARE`, `CTRL_REGISTER`, `CTRL_UNREGISTER`,
`CTRL_PY_REGISTER`, and `CTRL_PY_UNREGISTER`.
- Communication-domain setup:
`CTRL_COMM_INIT`, `CTRL_ALLOC_DOMAIN`, and `CTRL_RELEASE_DOMAIN`.
- Host/device mapped-region operations:
`CTRL_OPEN_MAPPED_REGION`, `CTRL_CLOSE_MAPPED_REGION`,
`CTRL_MAPPED_REGION_INFO`, datacopy, notify, and wait commands.

When adding a new control command, keep the mailbox fields limited to small
fixed arguments and move variable-sized payloads into side-band shared memory.
The child must always publish `CONTROL_DONE` with a clear error code/message
before the parent releases the mailbox back to `IDLE`.

### 3.5 Shutdown

`WorkerManager::shutdown_children()` writes `SHUTDOWN` to every registered
mailbox; each child loop sees it on its next poll and exits. The Python
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright (c) PyPTO Contributors.
* This program is free software, you can redistribute it and/or modify it under the terms and conditions of
* CANN Open Software License Agreement Version 2.0 (the "License").
* Please refer to the License for details. You may not use this file except in compliance with the License.
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
* INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
* See LICENSE in the root of the software repository for the full text of the License.
* -----------------------------------------------------------------------------------------------------------
*/

#include <cstdint>

#include <pto/pto-inst.hpp>

#ifndef __gm__
#define __gm__
#endif
#ifndef __aicore__
#define __aicore__ [aicore]
#endif

#include "pipe_sync.h"

// Cache-line granularity in bytes: the alignment and stride used by flush_range/invalidate_range, and the
// spacing between consecutive signal slots computed by signal_slot.
static constexpr uint64_t kCacheLineBytes = 64;
// Upper bound on how many times kernel_entry polls signal 0 before it stops waiting.
static constexpr uint32_t kMaxPollIters = 1024U;

/**
 * Flush every cache line that overlaps [addr, addr + size_bytes).
 *
 * On device builds (any of __CCE_KT_TEST__, __CCE_AICORE__, __DAV_C220__ defined) the range is widened to
 * kCacheLineBytes boundaries, one dcci(..., SINGLE_CACHE_LINE, CACHELINE_OUT) is issued per line, and the
 * sequence is closed with a dsb followed by pipe_barrier(PIPE_ALL). On every other build the arguments are
 * ignored and the function is only a compiler-level memory barrier.
 */
static inline __aicore__ void flush_range(volatile __gm__ void *addr, uint64_t size_bytes) {
#if defined(__CCE_KT_TEST__) || defined(__CCE_AICORE__) || defined(__DAV_C220__)
    const uintptr_t line_mask = ~(uintptr_t(kCacheLineBytes) - 1u);
    const uintptr_t base = reinterpret_cast<uintptr_t>(addr);
    // Round the start down and the end up so partially covered lines are included.
    const uintptr_t first_line = base & line_mask;
    const uintptr_t limit = (base + size_bytes + kCacheLineBytes - 1u) & line_mask;
    uintptr_t line = first_line;
    while (line < limit) {
        dcci((__gm__ int32_t *)line, SINGLE_CACHE_LINE, CACHELINE_OUT);
        line += kCacheLineBytes;
    }
#if defined(__CPU_SIM)
    dsb(0);
#else
    dsb(DSB_DDR);
#endif
    pipe_barrier(PIPE_ALL);
#else
    static_cast<void>(addr);
    static_cast<void>(size_bytes);
    __asm__ __volatile__("" ::: "memory");
#endif
}

/**
 * Invalidate every cache line that overlaps [addr, addr + size_bytes).
 *
 * On device builds (any of __CCE_KT_TEST__, __CCE_AICORE__, __DAV_C220__ defined) the range is widened to
 * kCacheLineBytes boundaries and one dcci(..., SINGLE_CACHE_LINE) is issued per line, followed by a dsb.
 * Unlike flush_range, no pipe_barrier is issued afterwards. On every other build the arguments are ignored
 * and the function is only a compiler-level memory barrier.
 */
static inline __aicore__ void invalidate_range(volatile __gm__ void *addr, uint64_t size_bytes) {
#if defined(__CCE_KT_TEST__) || defined(__CCE_AICORE__) || defined(__DAV_C220__)
    const uintptr_t line_mask = ~(uintptr_t(kCacheLineBytes) - 1u);
    const uintptr_t base = reinterpret_cast<uintptr_t>(addr);
    // Round the start down and the end up so partially covered lines are included.
    const uintptr_t first_line = base & line_mask;
    const uintptr_t limit = (base + size_bytes + kCacheLineBytes - 1u) & line_mask;
    uintptr_t line = first_line;
    while (line < limit) {
        dcci((__gm__ int32_t *)line, SINGLE_CACHE_LINE);
        line += kCacheLineBytes;
    }
#if defined(__CPU_SIM)
    dsb(0);
#else
    dsb(DSB_DDR);
#endif
#else
    static_cast<void>(addr);
    static_cast<void>(size_bytes);
    __asm__ __volatile__("" ::: "memory");
#endif
}

/**
 * Return a volatile 32-bit pointer to signal number signal_id.
 * Slots start at signal_base and are spaced kCacheLineBytes apart.
 */
static inline __aicore__ volatile __gm__ uint32_t *signal_slot(__gm__ uint8_t *signal_base, uint32_t signal_id) {
    const uint64_t byte_offset = signal_id * kCacheLineBytes;
    __gm__ uint8_t *slot_bytes = signal_base + byte_offset;
    return reinterpret_cast<volatile __gm__ uint32_t *>(slot_bytes);
}

/**
 * AIV kernel entry for the host/device mapped-region round trip.
 *
 * Arguments (all passed as 64-bit scalars in args):
 *   args[0]  address of the data buffer; input is read from [0, nbytes) and output is written to
 *            [nbytes, 2 * nbytes)
 *   args[1]  address of the signal area (slots spaced kCacheLineBytes apart)
 *   args[2]  sequence number for this round (truncated to 32 bits)
 *   args[3]  payload size in bytes (truncated to 32 bits)
 *
 * The kernel polls signal 0 up to kMaxPollIters times until it reads a value >= seq. Once observed, it
 * writes data[i] ^ uint8(seq + 3 * i) for each input byte into the output half, flushes it, and then
 * publishes seq on signal 1.
 *
 * If the poll times out, the kernel returns without writing output and without advancing signal 1.
 * Publishing seq in that case would report a failed notify as a completed round trip.
 */
extern "C" __aicore__ __attribute__((always_inline)) void kernel_entry(__gm__ int64_t *args) {
    auto *data = reinterpret_cast<__gm__ uint8_t *>(static_cast<uint64_t>(args[0]));
    auto *signal_base = reinterpret_cast<__gm__ uint8_t *>(static_cast<uint64_t>(args[1]));
    auto *signal0 = signal_slot(signal_base, 0);
    auto *signal1 = signal_slot(signal_base, 1);
    const uint32_t seq = static_cast<uint32_t>(args[2]);
    const uint32_t nbytes = static_cast<uint32_t>(args[3]);

    // Wait (bounded) for the host's notify on signal 0; invalidate before each read so the load is not
    // served from a stale cached line.
    bool observed = false;
    for (uint32_t i = 0; i < kMaxPollIters; ++i) {
        invalidate_range(signal0, kCacheLineBytes);
        if (*signal0 >= seq) {
            observed = true;
            break;
        }
    }

    if (!observed) {
        // Timed out: do not touch the output half and do not set signal 1 to seq, so this round is never
        // reported to the host as completed.
        return;
    }

    // Drop any cached copy of the input before reading what the host wrote.
    invalidate_range(data, nbytes);
    for (uint32_t i = 0; i < nbytes; ++i) {
        const uint8_t mask = static_cast<uint8_t>(seq + i * 3U);
        data[nbytes + i] = static_cast<uint8_t>(data[i] ^ mask);
    }
    // Output must be flushed before the completion signal is published.
    flush_range(data + nbytes, nbytes);

    *signal1 = seq;
    flush_range(signal1, kCacheLineBytes);
}
Comment on lines +81 to +99
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Don't signal success after the poll loop times out.

If signal0 never reaches seq, this still writes transformed output and then publishes signal1 = seq. That hides real notify/cache failures and can make the host accept corrupted data as a successful round trip. Bail out, or publish a distinct error sentinel, instead of reusing the success signal.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@examples/a2a3/tensormap_and_ringbuffer/host_device_mapped_region_round_trip/kernels/aiv/host_device_mapped_region_round_trip.cpp`
around lines 81 - 99, The poll loop using signal0 (kMaxPollIters) may time out
but the code still writes transformed output and sets signal1 to seq, hiding
failures; modify the tail of the function to check the observed flag and if
observed is false then do not publish the success seq: either return immediately
(bail out) before transforming/writing output, or write a distinct error
sentinel to *signal1 (e.g., a reserved value different from seq) and call
flush_range(signal1, kCacheLineBytes) so the host can detect the failure; ensure
any writes to data/nbytes and flush_range(data + nbytes, nbytes) only occur when
observed is true and keep references to signal0, signal1, seq, data, nbytes,
kMaxPollIters and kCacheLineBytes to locate the changes.

Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (c) PyPTO Contributors.
* This program is free software, you can redistribute it and/or modify it under the terms and conditions of
* CANN Open Software License Agreement Version 2.0 (the "License").
* Please refer to the License for details. You may not use this file except in compliance with the License.
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
* INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
* See LICENSE in the root of the software repository for the full text of the License.
* -----------------------------------------------------------------------------------------------------------
*/

#include "pto_orchestration_api.h" // NOLINT(build/include_subdir)

extern "C" {

/**
 * Report the orchestration configuration for this example: exactly four arguments are expected.
 * orch_args is accepted for signature compatibility and is not inspected.
 */
__attribute__((visibility("default"))) PTO2OrchestrationConfig
host_device_mapped_region_round_trip_config(const ChipStorageTaskArgs &orch_args) {
    static_cast<void>(orch_args);
    const PTO2OrchestrationConfig config{.expected_arg_count = 4};
    return config;
}

// Exported under the fixed name aicpu_orchestration_config; forwards to
// host_device_mapped_region_round_trip_config so both symbols report the same configuration.
// NOTE(review): presumably this is the generic symbol name the runtime resolves - confirm against the loader.
__attribute__((visibility("default"))) PTO2OrchestrationConfig
aicpu_orchestration_config(const ChipStorageTaskArgs &orch_args) {
return host_device_mapped_region_round_trip_config(orch_args);
}

/**
 * Orchestration entry point: forwards the four incoming scalars, unchanged and in order, to a single AIV
 * task submitted with id 0. The AIV kernel in this example reads them as data address, signal address,
 * sequence number and payload size.
 */
__attribute__((visibility("default"))) void
host_device_mapped_region_round_trip_orch(const ChipStorageTaskArgs &orch_args) {
    const auto data_addr = orch_args.scalar(0);
    const auto signal_addr = orch_args.scalar(1);
    const auto seq = orch_args.scalar(2);
    const auto nbytes = orch_args.scalar(3);

    Arg args;
    args.add_scalar(data_addr);
    args.add_scalar(signal_addr);
    args.add_scalar(seq);
    args.add_scalar(nbytes);
    rt_submit_aiv_task(0, args);
}

} // extern "C"
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
#!/usr/bin/env python3
# Copyright (c) PyPTO Contributors.
# This program is free software, you can redistribute it and/or modify it under the terms and conditions of
# CANN Open Software License Agreement Version 2.0 (the "License").
# Please refer to the License for details. You may not use this file except in compliance with the License.
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
# See LICENSE in the root of the software repository for the full text of the License.
# -----------------------------------------------------------------------------------------------------------
"""Host CPU to device NPU round-trip through HostDeviceMappedRegion."""

from __future__ import annotations

import argparse
import os
import sys
from pathlib import Path

from simpler.task_interface import ArgDirection, CallConfig, ChipCallable, CoreCallable, TaskArgs
from simpler.worker import Worker
from simpler_setup.elf_parser import extract_text_section
from simpler_setup.kernel_compiler import KernelCompiler
from simpler_setup.pto_isa import ensure_pto_isa_root
from simpler_setup.runtime_builder import RuntimeBuilder


# Directory containing this script; kernel sources are resolved relative to it.
HERE = Path(__file__).resolve().parent
# Root of the kernel sources ("aiv" and "orchestration" subdirectories are used below).
KERNEL_DIR = HERE / "kernels"
# Runtime name passed to the kernel compiler, the runtime builder and the Worker.
RUNTIME = "tensormap_and_ringbuffer"
# Default payload size in bytes (--data-bytes).
DEFAULT_DATA_BYTES = 256
# Default number of round trips (--iters).
DEFAULT_ITERS = 10


def _build_callable(platform: str) -> ChipCallable:
    """Compile the AIV kernel and its orchestration and bundle them into one ChipCallable.

    The kernel is compiled as an "aiv" in-core binary. For platforms whose name does not
    end in "sim", only the text section of that binary is kept. The orchestration is
    compiled for RUNTIME and becomes the top-level binary, with the kernel attached as
    child 0.
    """
    compiler = KernelCompiler(platform=platform)
    isa_root = ensure_pto_isa_root(clone_protocol="https")
    orch_include_dirs = compiler.get_orchestration_include_dirs(RUNTIME)

    kernel_source = KERNEL_DIR / "aiv" / "host_device_mapped_region_round_trip.cpp"
    kernel_binary = compiler.compile_incore(
        source_path=str(kernel_source),
        core_type="aiv",
        pto_isa_root=isa_root,
        extra_include_dirs=orch_include_dirs,
    )
    running_on_sim = platform.endswith("sim")
    if not running_on_sim:
        kernel_binary = extract_text_section(kernel_binary)

    orch_source = KERNEL_DIR / "orchestration" / "host_device_mapped_region_round_trip_orch.cpp"
    orch_binary = compiler.compile_orchestration(
        runtime_name=RUNTIME,
        source_path=str(orch_source),
    )

    aiv_child = CoreCallable.build(signature=[], binary=kernel_binary)
    return ChipCallable.build(
        signature=[ArgDirection.IN] * 4,
        func_name="host_device_mapped_region_round_trip_orch",
        binary=orch_binary,
        children=[(0, aiv_child)],
    )


def _pattern(seq: int, data_bytes: int) -> bytes:
return bytes(((seq * 17 + i * 5) & 0xFF) for i in range(data_bytes))


def _expected(seq: int, payload: bytes) -> bytes:
return bytes((b ^ ((seq + i * 3) & 0xFF)) for i, b in enumerate(payload))


def run(
    platform: str,
    device_id: int,
    *,
    build: bool = False,
    iters: int = DEFAULT_ITERS,
    data_bytes: int = DEFAULT_DATA_BYTES,
) -> None:
    """Run ``iters`` host -> device -> host round trips through a mapped region.

    Each round copies a fresh pattern into the first half of the region, notifies
    signal 0, runs the kernel, waits on signal 1, reads back the second half and
    compares it with the expected transform.

    Args:
        platform: "a2a3sim" or "a2a3".
        device_id: Device index passed to the Worker.
        build: Rebuild the runtime from source when true.
        iters: Number of round trips; must be positive.
        data_bytes: Payload size in bytes; the region is opened with twice this size.

    Raises:
        ValueError: Unsupported platform, or non-positive ``iters`` / ``data_bytes``.
        AssertionError: The region info is not what this example expects, or a round
            trip returned the wrong data. Raised explicitly so the checks still run
            under ``python -O``.
    """
    if platform not in {"a2a3sim", "a2a3"}:
        raise ValueError(f"unsupported platform: {platform}")
    if iters <= 0:
        raise ValueError("iters must be positive")
    if data_bytes <= 0:
        raise ValueError("data_bytes must be positive")

    os.environ["PTO_ISA_ROOT"] = ensure_pto_isa_root(clone_protocol="https")
    RuntimeBuilder(platform=platform).get_binaries(RUNTIME, build=build)
    chip_callable = _build_callable(platform)

    worker = Worker(level=2, platform=platform, runtime=RUNTIME, device_id=device_id, build=build)
    worker.init()
    region = None
    try:
        chip_cid = worker.register(chip_callable)
        # First half of the region holds the input, second half the kernel's output.
        region = worker.open_mapped_region(data_bytes * 2, signal_count=2)
        info = worker.mapped_region_info(region)
        # This example expects no host-side pointers and valid device-side ones.
        if info.host_data_ptr != 0 or info.host_signal_ptr != 0:
            raise AssertionError(
                "expected host_data_ptr and host_signal_ptr to be 0, got "
                f"{info.host_data_ptr} and {info.host_signal_ptr}"
            )
        if info.device_data_ptr == 0 or info.device_signal_ptr == 0:
            raise AssertionError(
                "expected non-zero device_data_ptr and device_signal_ptr, got "
                f"{info.device_data_ptr} and {info.device_signal_ptr}"
            )

        cfg = CallConfig()
        cfg.block_dim = 1
        cfg.aicpu_thread_num = 2

        for seq in range(1, iters + 1):
            payload = _pattern(seq, data_bytes)
            worker.mapped_region_datacopy_h2region(region, 0, payload)
            worker.mapped_region_notify(region, 0, seq)

            args = TaskArgs()
            args.add_scalar(info.device_data_ptr)
            args.add_scalar(info.device_signal_ptr)
            args.add_scalar(seq)
            args.add_scalar(data_bytes)
            worker.run(chip_cid, args, cfg)

            # NOTE(review): the last argument is presumably a timeout - confirm units against the Worker API.
            worker.mapped_region_wait(region, 1, seq, 1_000_000)
            got = worker.mapped_region_datacopy_region2h(region, data_bytes, data_bytes)
            if got != _expected(seq, payload):
                raise AssertionError(f"round-trip output mismatch at seq={seq} (data_bytes={data_bytes})")
    finally:
        # Nested so the worker is closed even when closing the region fails.
        try:
            if region is not None:
                worker.close_mapped_region(region)
        finally:
            worker.close()


def parse_args() -> argparse.Namespace:
    """Parse the command line: platform (required), device, --build, --iters, --data-bytes."""
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument("-p", "--platform", required=True, choices=["a2a3sim", "a2a3"])
    cli.add_argument("-d", "--device", type=int, default=0)
    cli.add_argument("--build", action="store_true", help="Rebuild runtime from source.")
    # The two sizing knobs share the same shape: integer option with a module-level default.
    for flag, default in (("--iters", DEFAULT_ITERS), ("--data-bytes", DEFAULT_DATA_BYTES)):
        cli.add_argument(flag, type=int, default=default)
    namespace = cli.parse_args()
    return namespace


def main() -> int:
    """Parse the command line, run the round-trip check and print a PASSED line.

    Returns 0 on success; any failure propagates as an exception from ``run``.
    """
    args = parse_args()
    run(
        args.platform,
        args.device,
        build=args.build,
        iters=args.iters,
        data_bytes=args.data_bytes,
    )
    summary = (
        "[host_device_mapped_region_round_trip] "
        f"platform={args.platform} device={args.device} iters={args.iters} data_bytes={args.data_bytes} PASSED"
    )
    print(summary)
    return 0


# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)
Loading
Loading