Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,053 changes: 606 additions & 447 deletions src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp

Large diffs are not rendered by default.

56 changes: 56 additions & 0 deletions src/a2a3/runtime/tensormap_and_ringbuffer/host/runtime_maker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,62 @@ extern "C" int bind_prepared_to_runtime_impl(
}
int64_t t_args_end = _now_ms();


// (Timing) Specify whether to perform timing analysis inside the aicpu executor
{
// Parse PTO2_KERNEL_TIMING_ENABLED as a boolean switch.
// Accepted spellings (ASCII case-insensitive): "true" / "1" / "on" enable timing,
// "false" / "0" / "off" disable it. Any other value is reported with a warning and
// timing is forced off. When the variable is unset, is_timing_enabled is left untouched.
const char *env_timing_enabled = std::getenv("PTO2_KERNEL_TIMING_ENABLED");
if (env_timing_enabled) {
    // Lower-case by hand (ASCII only) so that no additional headers are required.
    std::string normalized(env_timing_enabled);
    for (char &ch : normalized) {
        if (ch >= 'A' && ch <= 'Z') {
            ch = static_cast<char>(ch - 'A' + 'a');
        }
    }

    if (normalized == "true" || normalized == "1" || normalized == "on") {
        runtime->is_timing_enabled = true;
    } else if (normalized == "false" || normalized == "0" || normalized == "off") {
        runtime->is_timing_enabled = false;
    } else {
        // Log the raw (un-normalized) value so the user sees exactly what was set.
        LOG_WARN("PTO2_KERNEL_TIMING_ENABLED=%s is invalid, using default: \"False\"", env_timing_enabled);
        runtime->is_timing_enabled = false;
    }
}
Comment on lines +232 to +241
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The environment variable parsing for PTO2_KERNEL_TIMING_ENABLED is brittle as it only accepts exact case-sensitive matches for 'True' or 'False'. It would be more robust to support a wider range of boolean representations (e.g., '1', '0', 'true', 'false', 'on', 'off') and perform case-insensitive comparisons.

LOG_INFO_V0("Is kernel timing enabled? %s", runtime->is_timing_enabled ? "True" : "False");
}

// (Timing) Number of warmup runs inside the aicpu executor; only read when timing is enabled.
// PTO2_KERNEL_TIMING_WARMUP_COUNT must be a non-negative decimal integer that fits in an int;
// anything else is reported and replaced by RUNTIME_DEFAULT_WARMUP_ITERATION_COUNT.
if (runtime->is_timing_enabled) {
    const char *env_warmup_iterations = std::getenv("PTO2_KERNEL_TIMING_WARMUP_COUNT");
    if (env_warmup_iterations) {
        char *endptr = nullptr;
        // strtoll (not strtol): long long is at least 64 bits on every platform, so the
        // round-trip comparison below reliably detects values that do not fit in an int.
        const long long parsed = std::strtoll(env_warmup_iterations, &endptr, 10);
        const int narrowed = static_cast<int>(parsed);

        // The whole string must have been consumed, and at least one digit must be present.
        const bool is_whole_number = (endptr != env_warmup_iterations) && (*endptr == '\0');
        // Reject negative counts and values that were altered by the narrowing conversion.
        const bool is_valid_count = (narrowed >= 0) && (static_cast<long long>(narrowed) == parsed);

        if (is_whole_number && is_valid_count) {
            runtime->warmup_iteration_count = narrowed;
        } else {
            LOG_WARN(
                "PTO2_KERNEL_TIMING_WARMUP_COUNT=%s is invalid, using default %d", env_warmup_iterations, RUNTIME_DEFAULT_WARMUP_ITERATION_COUNT
            );
            runtime->warmup_iteration_count = RUNTIME_DEFAULT_WARMUP_ITERATION_COUNT;
        }
    }
    LOG_INFO_V0("Warmup iteration count: %d", runtime->warmup_iteration_count);
}

// (Timing) Number of timing runs inside the aicpu executor; only read when timing is enabled.
// PTO2_KERNEL_TIMING_ITERATION_COUNT must be a non-negative decimal integer that fits in an int;
// anything else is reported and replaced by RUNTIME_DEFAULT_TIMING_ITERATION_COUNT.
if (runtime->is_timing_enabled) {
    const char *env_timing_iterations = std::getenv("PTO2_KERNEL_TIMING_ITERATION_COUNT");
    if (env_timing_iterations) {
        char *endptr = nullptr;
        // strtoll (not strtol): long long is at least 64 bits on every platform, so the
        // round-trip comparison below reliably detects values that do not fit in an int.
        const long long parsed = std::strtoll(env_timing_iterations, &endptr, 10);
        const int narrowed = static_cast<int>(parsed);

        // The whole string must have been consumed, and at least one digit must be present.
        const bool is_whole_number = (endptr != env_timing_iterations) && (*endptr == '\0');
        // Reject negative counts and values that were altered by the narrowing conversion.
        const bool is_valid_count = (narrowed >= 0) && (static_cast<long long>(narrowed) == parsed);

        if (is_whole_number && is_valid_count) {
            runtime->timing_iteration_count = narrowed;
        } else {
            LOG_WARN(
                "PTO2_KERNEL_TIMING_ITERATION_COUNT=%s is invalid, using default %d", env_timing_iterations, RUNTIME_DEFAULT_TIMING_ITERATION_COUNT
            );
            runtime->timing_iteration_count = RUNTIME_DEFAULT_TIMING_ITERATION_COUNT;
        }
    }
    LOG_INFO_V0("Timing iteration count: %d", runtime->timing_iteration_count);
}

// Read ready queue shard count from environment for AICPU scheduler
{
const char *env_shards = std::getenv("PTO2_READY_QUEUE_SHARDS");
Expand Down
13 changes: 12 additions & 1 deletion src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@

// Default ready queue shards: one shard per worker thread (total minus orchestrator)
constexpr int RUNTIME_DEFAULT_READY_QUEUE_SHARDS = PLATFORM_MAX_AICPU_THREADS - 1;
// Default warmup iteration count: initial value of Runtime::warmup_iteration_count and the
// fallback used when PTO2_KERNEL_TIMING_WARMUP_COUNT cannot be parsed.
constexpr int RUNTIME_DEFAULT_WARMUP_ITERATION_COUNT = 0;
// Default timing iteration count: initial value of Runtime::timing_iteration_count and the
// fallback used when PTO2_KERNEL_TIMING_ITERATION_COUNT cannot be parsed.
constexpr int RUNTIME_DEFAULT_TIMING_ITERATION_COUNT = 0;

// =============================================================================
// Data Structures
Expand Down Expand Up @@ -200,6 +202,11 @@ class Runtime {
// When false (default), orchestrator threads exit after orchestration without dispatching tasks.
// Controlled via PTO2_ORCH_TO_SCHED environment variable.
bool orch_to_sched;

// Timing parameters (for precise performance estimation)
// Whether timing is enabled. False by default.
// Controlled via PTO2_KERNEL_TIMING_ENABLED environment variable.
bool is_timing_enabled;
// Number of warmup iterations. Defaults to RUNTIME_DEFAULT_WARMUP_ITERATION_COUNT.
// Controlled via PTO2_KERNEL_TIMING_WARMUP_COUNT environment variable (read only when timing is enabled).
int warmup_iteration_count;
// Number of timing iterations. Defaults to RUNTIME_DEFAULT_TIMING_ITERATION_COUNT.
// Controlled via PTO2_KERNEL_TIMING_ITERATION_COUNT environment variable (read only when timing is enabled).
int timing_iteration_count;

private:
// Kernel binary tracking for cleanup
Expand All @@ -224,7 +231,6 @@ class Runtime {
bool register_new_callable_id_;
char device_orch_func_name_[RUNTIME_MAX_ORCH_SYMBOL_NAME];
char device_orch_config_name_[RUNTIME_MAX_ORCH_SYMBOL_NAME];

public:
/**
* Constructor - zero-initialize all arrays
Expand All @@ -235,6 +241,10 @@ class Runtime {
// Performance Profiling
// =========================================================================

// Returns the current value of is_timing_enabled.
bool get_timing_enabled() const {
    return is_timing_enabled;
}
// Returns the current value of warmup_iteration_count.
int32_t get_warmup_iteration_count() const {
    return warmup_iteration_count;
}
// Returns the current value of timing_iteration_count.
int32_t get_timing_iteration_count() const {
    return timing_iteration_count;
}

// =========================================================================
// Device orchestration (for AICPU thread 3)
// =========================================================================
Expand All @@ -258,6 +268,7 @@ class Runtime {
void set_active_callable_id(int32_t callable_id, bool is_new);
int32_t get_active_callable_id() const;
bool register_new_callable_id() const;
void notify_callable_id_registered();
void set_device_orch_func_name(const char *name);
const char *get_device_orch_func_name() const;
void set_device_orch_config_name(const char *name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,14 +213,14 @@ void PTO2SchedulerState::destroy() {

void PTO2SchedulerState::print_stats() {
PTO2SchedulerState *sched = this;
LOG_INFO_V0("=== Scheduler Statistics ===");
LOG_INFO_V9("=== Scheduler Statistics ===");
for (int r = 0; r < PTO2_MAX_RING_DEPTH; r++) {
if (sched->ring_sched_states[r].last_task_alive > 0) {
LOG_INFO_V0("Ring %d:", r);
LOG_INFO_V0(" last_task_alive: %d", sched->ring_sched_states[r].last_task_alive);
LOG_INFO_V9("Ring %d:", r);
LOG_INFO_V9(" last_task_alive: %d", sched->ring_sched_states[r].last_task_alive);
auto &dp = sched->ring_sched_states[r].dep_pool;
if (dp.top > 0) {
LOG_INFO_V0(
LOG_INFO_V9(
" dep_pool: top=%d tail=%d used=%d high_water=%d capacity=%d", dp.top, dp.tail, dp.top - dp.tail,
dp.high_water, dp.capacity
);
Expand All @@ -231,19 +231,19 @@ void PTO2SchedulerState::print_stats() {
LOG_INFO_V0("tasks_completed: %lld", (long long)sched->tasks_completed.load(std::memory_order_relaxed));
LOG_INFO_V0("tasks_consumed: %lld", (long long)sched->tasks_consumed.load(std::memory_order_relaxed));
#endif
LOG_INFO_V0("============================");
LOG_INFO_V9("============================");
}

void PTO2SchedulerState::print_queues() {
PTO2SchedulerState *sched = this;
LOG_INFO_V0("=== Ready Queues ===");
LOG_INFO_V9("=== Ready Queues ===");

const char *shape_names[] = {"AIC", "AIV", "MIX"};

for (int i = 0; i < PTO2_NUM_RESOURCE_SHAPES; i++) {
LOG_INFO_V0(" %s: count=%" PRIu64, shape_names[i], sched->ready_queues[i].size());
LOG_INFO_V9(" %s: count=%" PRIu64, shape_names[i], sched->ready_queues[i].size());
}
LOG_INFO_V0(" DUMMY: count=%" PRIu64, sched->dummy_ready_queue.size());
LOG_INFO_V9(" DUMMY: count=%" PRIu64, sched->dummy_ready_queue.size());

LOG_INFO_V0("====================");
LOG_INFO_V9("====================");
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ class SchedulerContext {
// Called by AicpuExecutor::deinit() during per-run teardown.
void deinit();

// One-time setup of the scheduler perf counters (tensor dump and PMU, when enabled and
// built with PTO2_PROFILING). The first caller performs the setup; any other caller
// spin-waits until that setup has been marked complete.
void initializePerfCounters();

// =========================================================================
// Per-thread execution entry points (called by AicpuExecutor::run)
// =========================================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ void SchedulerContext::dispatch_shape(
int32_t thread_idx, PTO2ResourceShape shape, CoreTracker::DispatchPhase phase, PTO2LocalReadyBuffer &local_buf,
CoreTracker &tracker, bool &entered_drain, bool &made_progress, bool &try_pushed
) {

#if PTO2_SCHED_PROFILING
auto &l2_perf = sched_l2_perf_[thread_idx];
#endif
Expand All @@ -270,15 +271,18 @@ void SchedulerContext::dispatch_shape(
if (slot_state->active_mask.requires_sync_start()) {
if (is_pending) {
sched_->ready_queues[static_cast<int32_t>(shape)].push(slot_state);
LOG_INFO_V9("Thread %d - Pushed task", thread_idx);
continue;
}
int32_t available = cores.count();
if (available < slot_state->logical_block_num) {
if (!enter_drain_mode(slot_state, slot_state->logical_block_num)) {
sched_->ready_queues[static_cast<int32_t>(shape)].push(slot_state);
LOG_INFO_V9("Thread %d - Pushed task", thread_idx);
}
for (int rem = bi + 1; rem < got; rem++) {
sched_->ready_queues[static_cast<int32_t>(shape)].push(batch[rem]);
LOG_INFO_V9("Thread %d - Pushed task", thread_idx);
}
entered_drain = true;
break;
Expand All @@ -287,6 +291,7 @@ void SchedulerContext::dispatch_shape(

if (!cores.has_value()) {
sched_->ready_queues[static_cast<int32_t>(shape)].push_batch(&batch[bi], got - bi);
LOG_INFO_V9("Thread %d - Pushed task", thread_idx);
break;
}

Expand All @@ -309,6 +314,7 @@ void SchedulerContext::dispatch_shape(

if (slot_state->next_block_idx < slot_state->logical_block_num) {
sched_->ready_queues[static_cast<int32_t>(shape)].push(slot_state);
LOG_INFO_V9("Thread %d - Pushed task", thread_idx);
}

for (int32_t b = 0; b < claim; b++) {
Expand Down Expand Up @@ -448,6 +454,37 @@ void SchedulerContext::dispatch_ready_tasks(
}
}


// One-time initialization of the scheduler perf counters.
// The first caller to flip pto2_init_done_ performs the setup and then publishes
// pto2_init_complete_; every other caller spins until that flag becomes visible.
void SchedulerContext::initializePerfCounters() {
    const bool claimed_by_other = pto2_init_done_.exchange(true, std::memory_order_acq_rel);
    if (claimed_by_other) {
        // Another caller owns the setup: wait here until it signals completion.
        while (!pto2_init_complete_.load(std::memory_order_acquire)) {
            SPIN_WAIT_HINT();
        }
        return;
    }

    LOG_INFO_V0("Initializing scheduler perf counters");

#if PTO2_PROFILING
    if (is_dump_tensor_enabled()) {
        const auto dump_thread_num = orch_to_sched_ ? aicpu_thread_num_ : sched_thread_num_;
        dump_tensor_init(dump_thread_num);
    }

    // Initialize PMU: program events, start counters, and pop initial buffers
    if (is_pmu_enabled()) {
        pmu_aicpu_init(physical_core_ids_, cores_total_num_);
        LOG_INFO_V0("PMU profiling started on %d cores", cores_total_num_);
    }
#endif

    LOG_INFO_V0("Initialized scheduler perf counters");
    // Release store pairs with the acquire load in the wait loop above.
    pto2_init_complete_.store(true, std::memory_order_release);
}

// =============================================================================
// Main scheduler dispatch loop
// =============================================================================
Expand All @@ -474,32 +511,6 @@ int32_t SchedulerContext::resolve_and_dispatch(Runtime *runtime, int32_t thread_
static_cast<uint64_t>(header->rings[0].task_window_size)
);

// One-time init: assign perf buffers (one thread does it; others wait)
if (!pto2_init_done_.exchange(true, std::memory_order_acq_rel)) {
LOG_INFO_V0("Thread %d: doing one-time init", thread_idx);

#if PTO2_PROFILING
if (is_dump_tensor_enabled()) {
dump_tensor_init(orch_to_sched_ ? aicpu_thread_num_ : sched_thread_num_);
}
#endif

#if PTO2_PROFILING
// Initialize PMU: program events, start counters, and pop initial buffers
if (is_pmu_enabled()) {
pmu_aicpu_init(physical_core_ids_, cores_total_num_);
LOG_INFO_V0("PMU profiling started on %d cores", cores_total_num_);
}
#endif

LOG_INFO_V0("Thread %d: one-time init done", thread_idx);
pto2_init_complete_.store(true, std::memory_order_release);
} else {
while (!pto2_init_complete_.load(std::memory_order_acquire)) {
SPIN_WAIT_HINT();
}
}

LOG_INFO_V0("Thread %d: PTO2 dispatch starting with %d cores", thread_idx, core_trackers_[thread_idx].core_num());
int32_t cur_thread_completed = 0;
int32_t idle_iterations = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ Runtime::Runtime() {
worker_count = 0;
aicpu_thread_num = 1;
ready_queue_shards = RUNTIME_DEFAULT_READY_QUEUE_SHARDS;
is_timing_enabled = false;
warmup_iteration_count = RUNTIME_DEFAULT_WARMUP_ITERATION_COUNT;
timing_iteration_count = RUNTIME_DEFAULT_TIMING_ITERATION_COUNT;
task_window_size = 0;
heap_size = 0;
dep_pool_size = 0;
Expand Down