diff --git a/pyproject.toml b/pyproject.toml index 37bf5fd..2c657cc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,7 +39,7 @@ warn_redundant_casts = true warn_unused_ignores = true [tool.ruff] -line-length = 120 +line-length = 100 indent-width = 2 [tool.ruff.format] diff --git a/src/mldebug/aie_status.py b/src/mldebug/aie_status.py index 0ba2f95..91daa56 100644 --- a/src/mldebug/aie_status.py +++ b/src/mldebug/aie_status.py @@ -12,6 +12,7 @@ from mldebug.extra.aie_guidance import AIEGuidanceChecker + class AIEStatus: """ Top level class to manage aie status @@ -84,7 +85,9 @@ def _append_dma_status(self, mtype, vaiml=False): if overlay_info is None and vaiml: continue overlay_info = "" if not overlay_info or not vaiml else f" ({overlay_info})" - self.results[mtype][rtype].append((name + overlay_info, c, r, hex(regdata), extra_meta, parsed_reg)) + self.results[mtype][rtype].append( + (name + overlay_info, c, r, hex(regdata), extra_meta, parsed_reg) + ) channel += 1 def _append_bd_status(self, mtype, registers): @@ -184,24 +187,18 @@ def _read_ddr_with_devmem(self, address, width=32): if width == 64: # Read 64-bit by reading two 32-bit values and combining. result_low = subprocess.run( - ["devmem2", hex(address)], - capture_output=True, - text=True, - check=True + ["devmem2", hex(address)], capture_output=True, text=True, check=True ) - match_low = re.search(r':\s*(0x[0-9a-fA-F]+)', result_low.stdout) + match_low = re.search(r":\s*(0x[0-9a-fA-F]+)", result_low.stdout) if not match_low: print(f"[WARNING] Failed to parse devmem2 output for address {hex(address)}") return None low_val = int(match_low.group(1), 16) result_high = subprocess.run( - ["devmem2", hex(address + 4)], - capture_output=True, - text=True, - check=True + ["devmem2", hex(address + 4)], capture_output=True, text=True, check=True ) - match_high = re.search(r':\s*(0x[0-9a-fA-F]+)', result_high.stdout) + match_high = re.search(r":\s*(0x[0-9a-fA-F]+)", result_high.stdout) if not match_high: print(f"[WARNING] Failed to parse devmem2 output for address {hex(address + 4)}") return None @@ -210,12 +207,9 @@ def _read_ddr_with_devmem(self, address, width=32): return (high_val << 32) | low_val else: result = subprocess.run( - ["devmem2", hex(address)], - capture_output=True, - text=True, - check=True + ["devmem2", hex(address)], capture_output=True, text=True, check=True ) - match = re.search(r':\s*(0x[0-9a-fA-F]+)', result.stdout) + match = re.search(r":\s*(0x[0-9a-fA-F]+)", result.stdout) if not match: print(f"[WARNING] Failed to parse devmem2 output for address {hex(address)}") return None @@ -259,11 +253,11 @@ def _append_hsa_queue_status(self): # Validate address - skip DDR reads for invalid addresses # 0x0 indicates no queue, 0xffffffffffffffff indicates uninitialized/invalid - if hsa_queue_addr != 0 and hsa_queue_addr != 0xffffffffffffffff: + if hsa_queue_addr != 0 and hsa_queue_addr != 0xFFFFFFFFFFFFFFFF: # Read queue information from DDR read_index = self._read_ddr_with_devmem(hsa_queue_addr + 0x0, 64) write_index = self._read_ddr_with_devmem(hsa_queue_addr + 0x10, 64) - queue_capacity = self._read_ddr_with_devmem(hsa_queue_addr + 0xc, 32) + queue_capacity = self._read_ddr_with_devmem(hsa_queue_addr + 0xC, 32) if read_index is not None: hsa_info.append(("HSA_READ_INDEX", read_index)) @@ -316,7 +310,9 @@ def _append_core_status(self, debug_mode=False): if debug_mode: dbg_ctrl_1 = hex(self.backend.read_register(c, r, regmap["DEBUG_CONTROL1"])) - self.results[mtype][cs_k].append((f"DBG_CTRL:{dbg_ctrl_1}", c, r, cs_val, f"PC:{cpc_val}", cs_parsed)) + self.results[mtype][cs_k].append( + (f"DBG_CTRL:{dbg_ctrl_1}", c, r, cs_val, f"PC:{cpc_val}", cs_parsed) + ) else: cs_parsed += f",PC:{cpc_val}" self.results[mtype][cs_k].append((cs_k, c, r, cs_val, "", cs_parsed)) @@ -436,7 +432,15 @@ def update(self, tile_type=None, vaiml=False, advanced=False, debug_map_json=Non # DMA in AIE, Shim and MEM Tiles self._append_dma_status(ttype, vaiml) - def get(self, filename=None, tile_type=None, vaiml=False, advanced=False, debug_map_json=None, guidance=False): + def get( + self, + filename=None, + tile_type=None, + vaiml=False, + advanced=False, + debug_map_json=None, + guidance=False, + ): """ Query, store, and print or save status for all requested tiles. @@ -477,7 +481,11 @@ def _get_uc_status(self, debug_map_json=None): key = (entry.get("page_offset"), entry.get("column")) prev_map[key] = None if prev_entry: - prev_map[key] = prev_entry.get("operation"), prev_entry.get("line"), prev_entry.get("file") + prev_map[key] = ( + prev_entry.get("operation"), + prev_entry.get("line"), + prev_entry.get("file"), + ) prev_entry = entry for uc_data in self.results[self.aie_iface.SHIM_TILE_T]["UC_STATUS"]: d = dict(uc_data[3]) @@ -514,8 +522,7 @@ def get_uc_status(self, debug_map_json=None, guidance=False): else: print("UC Module is not present in this device.") - def run_guidance_checks(self, show_passed=False, show_guidance=True, - export_json=None): + def run_guidance_checks(self, show_passed=False, show_guidance=True, export_json=None): """ Run guidance checks on collected status data and display results. diff --git a/src/mldebug/aie_util.py b/src/mldebug/aie_util.py index 4210a7c..6269a3c 100644 --- a/src/mldebug/aie_util.py +++ b/src/mldebug/aie_util.py @@ -190,13 +190,12 @@ def skip_iterations_to_lock_acq(self, lock_acq_pc, count, sid): wait_until(self.impl.poll_core_status) pcs = self.impl.read_core_pc(True) - is_valid = self.pcs_match_target(pcs, lock_acq_pc) + is_valid = self.pcs_match_target(pcs, lock_acq_pc) if not is_valid: LOGGER.log( - f"{sid}: Invalid result in skip_iterations_to_lock_acq. " - f"target_pc={lock_acq_pc} pcs={pcs} " - ) - #else: + f"{sid}: Invalid result in skip_iterations_to_lock_acq. target_pc={lock_acq_pc} pcs={pcs} " + ) + # else: # LOGGER.log( # f"{sid}: Successfully skipped to lock acq pc. " # f"target_pc={lock_acq_pc} pcs={pcs} " @@ -307,7 +306,9 @@ def break_combo(self): true_core_event = self._get_eventid("TRUE_CORE") # eventC==eventD means generate combo3 and reset state machine - combo_event_inputs = rising_edge_event + (true_core_event << 8) + (pc_event << 16) + (pc_event << 24) + combo_event_inputs = ( + rising_edge_event + (true_core_event << 8) + (pc_event << 16) + (pc_event << 24) + ) self.impl.write_aie_regs(reg_map["DEBUG_CONTROL1"], combo_3_event << 16) self.impl.write_aie_regs(reg_map["COMBO_EVENT_INPUTS_A_D"], combo_event_inputs) @@ -359,7 +360,7 @@ def check_errors(self, layer, itr): self._error_found = True # Check secondary error event register if it exists (NPU3 only) - if hasattr(aif, 'ERRORS_EVENT_REG2'): + if hasattr(aif, "ERRORS_EVENT_REG2"): for c, r in self._filter_tiles(aif.AIE_TILE_T): data = self.impl.read_register(c, r, aif.Core_registers[aif.ERRORS_EVENT_REG2]) parsed = aif.parse_register(aif.ERRORS_EVENT_REG2, data) @@ -393,7 +394,6 @@ def check_errors(self, layer, itr): ) print() - def write_aie_regs(self, offset, val): """ Write a value to all AIE tile registers @@ -444,7 +444,7 @@ def single_step_core(self, c, r): Single step an aie core """ offset = self.aie_iface.Core_registers["DEBUG_CONTROL0"] - self.impl.write_register(c, r, offset, (1<<2)) + self.impl.write_register(c, r, offset, (1 << 2)) def disable_ecc_event(self): """ @@ -474,17 +474,17 @@ def pcs_match_target(self, pcs, target_pc, allow_combo_delay=False): for tile, val in pc_dict.items(): if target_pc == val: continue - #print(f"Try to reconcile tile {tile} {val}") + # print(f"Try to reconcile tile {tile} {val}") col, row = tile for _ in range(num_pipeline_stages): self.single_step_core(col, row) newpc = self.read_core_pc_tile(col, row) delta = newpc - target_pc - if target_pc == newpc or max_pc_tolerance > delta > 0 : + if target_pc == newpc or max_pc_tolerance > delta > 0: break # if core pc is slightly ahead, we should be okay # but if not, execution can run into trouble later if target_pc > self.read_core_pc_tile(col, row): return False - #print("Successfully reconciled") + # print("Successfully reconciled") return True diff --git a/src/mldebug/arch/aie2p_defs.py b/src/mldebug/arch/aie2p_defs.py index 07801d9..b5e860a 100644 --- a/src/mldebug/arch/aie2p_defs.py +++ b/src/mldebug/arch/aie2p_defs.py @@ -650,7 +650,7 @@ def parse_overlay(): overlay[t] = tile["dma_connectivity"] except FileNotFoundError: # Return empty overlay if not supported - #print("Overlay info not found for this Device.") + # print("Overlay info not found for this Device.") return {} return overlay diff --git a/src/mldebug/arch/aie2ps_defs.py b/src/mldebug/arch/aie2ps_defs.py index f2171eb..8b2d38e 100644 --- a/src/mldebug/arch/aie2ps_defs.py +++ b/src/mldebug/arch/aie2ps_defs.py @@ -651,7 +651,7 @@ def parse_overlay(): overlay[t] = tile["dma_connectivity"] except FileNotFoundError: # Return empty overlay if not supported - #print("Overlay info not found for this Device.") + # print("Overlay info not found for this Device.") return {} return overlay diff --git a/src/mldebug/arch/loader.py b/src/mldebug/arch/loader.py index d0efec5..be7738b 100644 --- a/src/mldebug/arch/loader.py +++ b/src/mldebug/arch/loader.py @@ -12,6 +12,7 @@ AIE_DEV_TEL = "telluride" AIE_DEV_NPU3 = "npu3" + def load_aie_arch(device): """ return specific aie arch module based on name diff --git a/src/mldebug/backend/backend_interface.py b/src/mldebug/backend/backend_interface.py index 2d3523e..c4ea689 100644 --- a/src/mldebug/backend/backend_interface.py +++ b/src/mldebug/backend/backend_interface.py @@ -186,7 +186,7 @@ def single_step(self, num_instr=1): """ @abstractmethod - def read_aie_regs(self, reg)-> list[int]: + def read_aie_regs(self, reg) -> list[int]: """ Reads a register in all of debug aie cores diff --git a/src/mldebug/backend/core_dump_impl.py b/src/mldebug/backend/core_dump_impl.py index 50e9268..aeebb10 100644 --- a/src/mldebug/backend/core_dump_impl.py +++ b/src/mldebug/backend/core_dump_impl.py @@ -13,6 +13,7 @@ try: from .xrt_backend import MlDebug + HAS_XRT_BACKEND = True except ImportError: HAS_XRT_BACKEND = False @@ -28,7 +29,7 @@ "numrows": 6, "numcols": 4, "shim_tile_block_size": 1024 * 1024, # 1MB - "mem_tile_block_size": 1024 * 1024, # 1MB + "mem_tile_block_size": 1024 * 1024, # 1MB "core_tile_block_size": 1024 * 1024, # 1MB }, AIE_DEV_STX: { @@ -57,11 +58,13 @@ }, } + class CoreDumpFallbackReader: """ Pure Python fallback implementation for reading core dump files. Replicates the C++ CoreDumpDataAccessBackend logic. """ + def __init__(self, core_dump_file, dev_name, no_header=False): """ Initialize the fallback reader @@ -88,8 +91,9 @@ def __init__(self, core_dump_file, dev_name, no_header=False): try: self.file_handle = open(self.filename, "rb") except PermissionError as e: - raise PermissionError(f"Permission denied: Cannot open core dump file '{self.filename}'." - " Check file permissions.") from e + raise PermissionError( + f"Permission denied: Cannot open core dump file '{self.filename}'. Check file permissions." + ) from e except OSError as e: raise OSError(f"Failed to open core dump file '{self.filename}': {e}") from e except Exception as e: @@ -147,8 +151,9 @@ def peek_device(filename): return None version_num, header_size = struct.unpack(" 1024 * 1024: - raise ValueError(f"Invalid header size in core dump: {self.header_size} bytes (expected 18-1048576)") + raise ValueError( + f"Invalid header size in core dump: {self.header_size} bytes (expected 18-1048576)" + ) except (ValueError, RuntimeError) as e: raise ValueError("I/O error while reading core dump header") from e @@ -227,9 +238,11 @@ def _calculate_file_position(self, col, row, offset): core_row_start = self.metadata["core_row_start"] # Calculate tower size (one column's worth of tiles) - tower_size = (shim_tile_block_size + - mem_tile_block_size * memtile_rows + - core_tile_block_size * (self.metadata["numrows"] - core_row_start)) + tower_size = ( + shim_tile_block_size + + mem_tile_block_size * memtile_rows + + core_tile_block_size * (self.metadata["numrows"] - core_row_start) + ) # Calculate tile position index based on row type if row == 0: @@ -240,10 +253,12 @@ def _calculate_file_position(self, col, row, offset): tile_pos_index = col * tower_size + shim_tile_block_size + (row - 1) * mem_tile_block_size else: # Core tile - tile_pos_index = (col * tower_size + - shim_tile_block_size + - mem_tile_block_size * memtile_rows + - (row - 1 - memtile_rows) * core_tile_block_size) + tile_pos_index = ( + col * tower_size + + shim_tile_block_size + + mem_tile_block_size * memtile_rows + + (row - 1 - memtile_rows) * core_tile_block_size + ) file_position = self.header_size + tile_pos_index + offset return file_position @@ -315,13 +330,15 @@ def dump_buffer(self, col, row, offset, size): # Unpack as little-endian uint32 array for i in range(num_words): - word = struct.unpack(" None: + + def __init__( + self, aie_overlay_tiles, ctx_id, pid, dev_name, core_dump_file=None, no_header=False + ) -> None: """ Initialize the Core Dump backend @@ -356,13 +376,15 @@ def __init__(self, aie_overlay_tiles, ctx_id, pid, dev_name, core_dump_file=None if no_header or not HAS_XRT_BACKEND: # Python fallback reader is required to support headerless parsing - #print("[INFO] --no_header specified: using Python fallback reader " + # print("[INFO] --no_header specified: using Python fallback reader " # "(C++ binding does not support headerless mode)") self.use_fallback = True else: # Try to initialize the C++ binding first try: - self.binding = MlDebug(list(self.overlay_aie_core_tiles), ctx_id, pid, dev_name, "debuglibrary", core_dump_file) + self.binding = MlDebug( + list(self.overlay_aie_core_tiles), ctx_id, pid, dev_name, "debuglibrary", core_dump_file + ) print("[INFO] Core Dump backend initialized with C++ DebugLibrary") except (ImportError, TypeError): self.use_fallback = True @@ -402,7 +424,9 @@ def configure_performance_counters(self): """ Configure performance counters - NOT SUPPORTED in core dump mode """ - print("[WARNING] configure_performance_counters() is not supported in core dump mode (read-only)") + print( + "[WARNING] configure_performance_counters() is not supported in core dump mode (read-only)" + ) def set_performance_counter_halt(self): """ diff --git a/src/mldebug/backend/factory.py b/src/mldebug/backend/factory.py index ed639c7..b94f437 100644 --- a/src/mldebug/backend/factory.py +++ b/src/mldebug/backend/factory.py @@ -55,7 +55,9 @@ def create_backend(backend_type, config): try: xrt_mod = importlib.import_module("mldebug.backend.xrt_impl") except ModuleNotFoundError: - print("Unable to import Backend. Python 3.10 is required on Win/Linux and 3.12 on Embedded Linux.") + print( + "Unable to import Backend. Python 3.10 is required on Win/Linux and 3.12 on Embedded Linux." + ) cleanup_and_exit(config.args, 1) except ImportError: print("Unable to import XRT. Please check install.") @@ -69,6 +71,10 @@ def create_backend(backend_type, config): # core_dump (default) core_dump_mod = importlib.import_module("mldebug.backend.core_dump_impl") return core_dump_mod.CoreDumpImpl( - config.tiles, config.ctx_id, config.pid, config.device, - core_dump_file=config.core_dump_file, no_header=config.no_header, + config.tiles, + config.ctx_id, + config.pid, + config.device, + core_dump_file=config.core_dump_file, + no_header=config.no_header, ) diff --git a/src/mldebug/backend/test_impl.py b/src/mldebug/backend/test_impl.py index 6d8e2f0..4949bf1 100644 --- a/src/mldebug/backend/test_impl.py +++ b/src/mldebug/backend/test_impl.py @@ -10,6 +10,7 @@ from mldebug.utils import print_tile_grid from .backend_interface import BackendInterface + class TestImpl(BackendInterface): """ Test Backend Top Class @@ -270,7 +271,11 @@ def read_core_debug_status(self): Returns: None """ - print_tile_grid("Core Debug Status", self.aie_debug_tiles, register_values=[0xDEAD] * len(self.aie_debug_tiles)) + print_tile_grid( + "Core Debug Status", + self.aie_debug_tiles, + register_values=[0xDEAD] * len(self.aie_debug_tiles), + ) def read_core_execution_status(self): """ @@ -282,7 +287,11 @@ def read_core_execution_status(self): Returns: None """ - print_tile_grid("Core Execution Status", self.aie_debug_tiles, register_values=[0xDEAD] * len(self.aie_debug_tiles)) + print_tile_grid( + "Core Execution Status", + self.aie_debug_tiles, + register_values=[0xDEAD] * len(self.aie_debug_tiles), + ) def read_all_core_pc(self): """ @@ -347,4 +356,4 @@ def disable_pc_halt(self): Args: None """ - return \ No newline at end of file + return diff --git a/src/mldebug/backend/xrt_impl.py b/src/mldebug/backend/xrt_impl.py index 27ffe3f..e5643cf 100644 --- a/src/mldebug/backend/xrt_impl.py +++ b/src/mldebug/backend/xrt_impl.py @@ -16,7 +16,9 @@ class XRTImpl(BackendInterface): XRT Backend top """ - def __init__(self, aie_overlay_tiles, ctx_id, pid, dev_name, debug_library=False, core_dump_file=None) -> None: + def __init__( + self, aie_overlay_tiles, ctx_id, pid, dev_name, debug_library=False, core_dump_file=None + ) -> None: """ Initialize the XRTImpl backend @@ -31,7 +33,9 @@ def __init__(self, aie_overlay_tiles, ctx_id, pid, dev_name, debug_library=False self.overlay_aie_core_tiles = aie_overlay_tiles use_debug_library = "debuglibrary" if debug_library else "xrt" if dev_name != AIE_DEV_TEL: - self.binding = MlDebug(list(self.overlay_aie_core_tiles), ctx_id, pid, dev_name, use_debug_library) + self.binding = MlDebug( + list(self.overlay_aie_core_tiles), ctx_id, pid, dev_name, use_debug_library + ) else: self.binding = MlDebug(list(self.overlay_aie_core_tiles), ctx_id, pid, dev_name) self.pc_brkpts = [0, 0] diff --git a/src/mldebug/batch_runner.py b/src/mldebug/batch_runner.py index 2bba4e1..247a2df 100644 --- a/src/mldebug/batch_runner.py +++ b/src/mldebug/batch_runner.py @@ -31,8 +31,7 @@ class BatchRunner: as the execution backend for InteractiveController. """ - def __init__(self, args, state, design_info, impls, aie_utls, - dumper, status_handle): + def __init__(self, args, state, design_info, impls, aie_utls, dumper, status_handle): """ Args: args: Parsed command-line arguments. @@ -129,7 +128,9 @@ def check_pm_reload(self, stamp_id=0): """ layer = self.state.layers[self.state.current_layer] # PM Load is not enabled for this stamp or this is last layer - if not self.design_info.work_dir.stamp(stamp_id).pm_reload_en or self.state.current_layer + 1 >= len(self.state.layers): + if not self.design_info.work_dir.stamp( + stamp_id + ).pm_reload_en or self.state.current_layer + 1 >= len(self.state.layers): return False # Stamp id doesn't run for this layer if not layer.runs_replica(stamp_id): @@ -215,9 +216,7 @@ def schedule_layer_start(self, next_layer): self.state.break_on_stamp_scheduled[sid] = True if pml: if not reaches_now: - LOGGER.log( - f"\nArming PM RELOAD on stamp {sid} for Layer_{target_layer.layer_order} " - ) + LOGGER.log(f"\nArming PM RELOAD on stamp {sid} for Layer_{target_layer.layer_order} ") else: LOGGER.log(f"\nPM RELOAD on stamp: {sid}") skip_end_pc = not (self.args.run_flags.l1_ofm_dump and stamp.end_pc) @@ -274,7 +273,9 @@ def _process_err(self): if not self.args.aie_only: layer = self.state.get_current_layer() if layer: - stamp_names = ", ".join([f"Stamp {i}: {stamp.name}" for i, stamp in enumerate(layer.stamps)]) + stamp_names = ", ".join( + [f"Stamp {i}: {stamp.name}" for i, stamp in enumerate(layer.stamps)] + ) LOGGER.log(f"Stopped at Start of Kernel(s): {stamp_names}") LOGGER.log(f"Current Layer: {layer.layer_order}, Iteration: {self.state.cur_it}") LOGGER.log(str(layer)) @@ -385,7 +386,8 @@ def _run_stamp(self, layer, sid, target_itr, cur_it=1): self.state.error = not utl.skip_iterations(target_itr - cur_it, sid) elif self.args.run_flags.skip_iter2: self.state.error = not utl.skip_iterations_to_lock_acq( - self.design_info.work_dir.stamp(sid).post_layer_lock_acq_pc, target_itr - cur_it, sid) + self.design_info.work_dir.stamp(sid).post_layer_lock_acq_pc, target_itr - cur_it, sid + ) else: while cur_it < target_itr: self.hit_next_breakpoint(sid) @@ -398,7 +400,9 @@ def _run_stamp(self, layer, sid, target_itr, cur_it=1): cur_it += 1 self._process_end_breakpoint(layer, cur_it, sid) else: - print(f"[ERROR] Abort Execution of Stamp {sid}. PC List: {all_pc} doesn't match {stamp.start_pc}") + print( + f"[ERROR] Abort Execution of Stamp {sid}. PC List: {all_pc} doesn't match {stamp.start_pc}" + ) self.state.error = True break @@ -458,8 +462,10 @@ def execute_and_dump(self): overlay = self.design_info.overlay for layer in self.state.update_layer(): - LOGGER.log(f"Stepping to layer {layer.layer_order}: {layer.stamps[0].name}," - f" stamps: {len(layer.stamps)}, iters {layer.lcp.num_iter}") + LOGGER.log( + f"Stepping to layer {layer.layer_order}: {layer.stamps[0].name}," + f" stamps: {len(layer.stamps)}, iters {layer.lcp.num_iter}" + ) self.schedule_layer_start(layer) self.run_layer(layer) diff --git a/src/mldebug/client_debug.py b/src/mldebug/client_debug.py index 5e15f42..85cfbc7 100644 --- a/src/mldebug/client_debug.py +++ b/src/mldebug/client_debug.py @@ -52,7 +52,9 @@ def __init__(self, args, ctx_id, pid, output_dir): # Create this first so that connection will be aborted in case of crash if self.args.automated_debug or self.args.l3: debug_server = DebugServer( - self.output_dir, self.args.backend == "test", subgraph_name=self.args.subgraph_name, + self.output_dir, + self.args.backend == "test", + subgraph_name=self.args.subgraph_name, ) # Track the live server so cleanup_and_exit() at unplanned exit points # can send TERMINATE_CONNECTION to flexmlrt. @@ -69,7 +71,7 @@ def __init__(self, args, ctx_id, pid, output_dir): if debug_server: print("[INFO] closing debug server.") debug_server.close() - raise(err) + raise (err) for i in self.design_info.overlay.get_stampids(): config = BackendConfig( @@ -79,14 +81,17 @@ def __init__(self, args, ctx_id, pid, output_dir): device=args.device, design_info=self.design_info, args=args, - core_dump_file=getattr(args, 'core_dump', None), - no_header=getattr(args, 'no_header', False), + core_dump_file=getattr(args, "core_dump", None), + no_header=getattr(args, "no_header", False), ) impl = create_backend(args.backend, config) self.impls.append(impl) self.aie_utls.append( AIEUtil( - args.aie_iface, impl, self.design_info.overlay.get_tiles(stamp_id=i), self.design_info.work_dir.stamp(i).globals + args.aie_iface, + impl, + self.design_info.overlay.get_tiles(stamp_id=i), + self.design_info.work_dir.stamp(i).globals, ) ) @@ -94,7 +99,10 @@ def __init__(self, args, ctx_id, pid, output_dir): self.impl = self.impls[0] self.status_handle = AIEStatus( - self.impl, self.design_info.overlay.get_tiles, args.aie_iface, self.design_info.overlay.get_repr() + self.impl, + self.design_info.overlay.get_tiles, + args.aie_iface, + self.design_info.overlay.get_repr(), ) # Initialize specialized components (share mutable lists by reference) @@ -103,8 +111,7 @@ def __init__(self, args, ctx_id, pid, output_dir): self.dumper.debug_server = debug_server self.runner = BatchRunner( - args, self.state, self.design_info, self.impls, self.aie_utls, - self.dumper, self.status_handle + args, self.state, self.design_info, self.impls, self.aie_utls, self.dumper, self.status_handle ) self.interactive = InteractiveController( args, self.state, self.design_info, self.impls, self.aie_utls, self.runner diff --git a/src/mldebug/debug_server.py b/src/mldebug/debug_server.py index 856f238..6c69a90 100644 --- a/src/mldebug/debug_server.py +++ b/src/mldebug/debug_server.py @@ -20,8 +20,12 @@ class DebugServer: """ def __init__( - self, output_dir, is_testmode, subgraph_name="subgraph", - bind_addr=("127.0.0.1", 9000), connect_timeout=None, + self, + output_dir, + is_testmode, + subgraph_name="subgraph", + bind_addr=("127.0.0.1", 9000), + connect_timeout=None, ) -> None: """ Initialize the DebugServer instance. @@ -134,7 +138,9 @@ def send_request(self, name, offset, size, current_dir=False): filename = "" if self.subgraph_name: - filename = os.path.normpath(os.path.join(file_prefix, "spillBO_" + self.subgraph_name + "_id_0_" + name + ".bin")) + filename = os.path.normpath( + os.path.join(file_prefix, "spillBO_" + self.subgraph_name + "_id_0_" + name + ".bin") + ) else: filename = os.path.normpath(os.path.join(file_prefix, name + ".bin")) padded_filename_bytes = self.pad_string(filename) diff --git a/src/mldebug/extra/aie_guidance.py b/src/mldebug/extra/aie_guidance.py index 43c318d..8f04923 100644 --- a/src/mldebug/extra/aie_guidance.py +++ b/src/mldebug/extra/aie_guidance.py @@ -14,24 +14,37 @@ # AIE Core Status Register Bit Masks # These correspond to bit positions in the CORE_STATUS register # NOTE: These are the same for all AIE generations -CORE_STATUS_ENABLED_MASK = 0x1 # Bit 0: Core enabled/disabled -CORE_STATUS_RESET_MASK = 0x2 # Bit 1: Core in reset state +CORE_STATUS_ENABLED_MASK = 0x1 # Bit 0: Core enabled/disabled +CORE_STATUS_RESET_MASK = 0x2 # Bit 1: Core in reset state + # Severity levels for categorizing guidance rule failures class Severity(Enum): """Severity levels for guidance messages""" + ERROR = "error" WARNING = "warning" INFO = "info" + # Result of a single guidance rule evaluation with pass/fail status and details class GuidanceResult: """Result of a single guidance rule check""" - def __init__(self, rule_id: str, rule_name: str, category: str, subcategory: str, - passed: bool, severity: Severity, message: str, guidance: str, - tile_location: Optional[Tuple[int, int]] = None, - actual_value: Any = None, expected_value: Any = None): + def __init__( + self, + rule_id: str, + rule_name: str, + category: str, + subcategory: str, + passed: bool, + severity: Severity, + message: str, + guidance: str, + tile_location: Optional[Tuple[int, int]] = None, + actual_value: Any = None, + expected_value: Any = None, + ): self.rule_id = rule_id self.rule_name = rule_name self.category = category @@ -63,7 +76,7 @@ def to_dict(self) -> Dict[str, Any]: "guidance": self.guidance if not self.passed else "", "tile_location": self.tile_location, "actual_value": str(self.actual_value) if self.actual_value is not None else None, - "expected_value": str(self.expected_value) if self.expected_value is not None else None + "expected_value": str(self.expected_value) if self.expected_value is not None else None, } @@ -94,10 +107,10 @@ def __init__(self, rules_file: Optional[str] = None, aie_iface=None): def _load_rules(self, rules_file: str) -> Dict[str, Dict]: """Load guidance rules from JSON file""" try: - with open(rules_file, 'r', encoding="utf-8") as f: + with open(rules_file, "r", encoding="utf-8") as f: data = json.load(f) # Convert list of rules to dict keyed by rule_id - return {rule['id']: rule for rule in data['rules']} + return {rule["id"]: rule for rule in data["rules"]} except FileNotFoundError: print(f"[WARNING] Guidance rules file not found: {rules_file}") return {} @@ -106,8 +119,9 @@ def _load_rules(self, rules_file: str) -> Dict[str, Dict]: return {} # Evaluate a single guidance rule against actual hardware status value - def _evaluate_rule(self, rule: Dict, actual_value: Any, col = None, - row = None, extra_params = None) -> GuidanceResult: + def _evaluate_rule( + self, rule: Dict, actual_value: Any, col=None, row=None, extra_params=None + ) -> GuidanceResult: """ Evaluate a single rule against actual value @@ -121,22 +135,22 @@ def _evaluate_rule(self, rule: Dict, actual_value: Any, col = None, Returns: GuidanceResult object """ - threshold = rule['threshold'] - operator = rule['operator'] - value_type = rule['value_type'] + threshold = rule["threshold"] + operator = rule["operator"] + value_type = rule["value_type"] # Convert actual_value to appropriate type - if value_type == 'int': + if value_type == "int": actual_value = int(actual_value) if actual_value is not None else 0 threshold = int(threshold) - elif value_type == 'float': + elif value_type == "float": actual_value = float(actual_value) if actual_value is not None else 0.0 threshold = float(threshold) - elif value_type == 'bool': + elif value_type == "bool": if isinstance(actual_value, bool): pass # Already boolean elif isinstance(actual_value, str): - actual_value = actual_value.lower() in ('true', '1', 'enabled', 'yes') + actual_value = actual_value.lower() in ("true", "1", "enabled", "yes") else: actual_value = bool(actual_value) threshold = bool(threshold) @@ -144,22 +158,22 @@ def _evaluate_rule(self, rule: Dict, actual_value: Any, col = None, # Evaluate based on operator passed = False - if operator == '==': + if operator == "==": passed = actual_value == threshold - elif operator == '!=': + elif operator == "!=": passed = actual_value != threshold - elif operator == '>': + elif operator == ">": passed = actual_value > threshold - elif operator == '>=': + elif operator == ">=": passed = actual_value >= threshold - elif operator == '<': + elif operator == "<": passed = actual_value < threshold - elif operator == '<=': + elif operator == "<=": passed = actual_value <= threshold # Build message with parameter substitution - message = rule['good_message'] if passed else rule['bad_message'] - params = {'col': col, 'row': row, 'value': actual_value} + message = rule["good_message"] if passed else rule["bad_message"] + params = {"col": col, "row": row, "value": actual_value} if extra_params: params.update(extra_params) @@ -169,21 +183,26 @@ def _evaluate_rule(self, rule: Dict, actual_value: Any, col = None, # If formatting fails, use message as-is pass - severity = Severity.ERROR if rule.get('severity', 'error') == 'error' else \ - Severity.WARNING if rule.get('severity') == 'warning' else Severity.INFO + severity = ( + Severity.ERROR + if rule.get("severity", "error") == "error" + else Severity.WARNING + if rule.get("severity") == "warning" + else Severity.INFO + ) return GuidanceResult( - rule_id=rule['id'], - rule_name=rule['name'], - category=rule['category'], - subcategory=rule['subcategory'], + rule_id=rule["id"], + rule_name=rule["name"], + category=rule["category"], + subcategory=rule["subcategory"], passed=passed, severity=severity, message=message, - guidance=rule['guidance'], + guidance=rule["guidance"], tile_location=(col, row) if col is not None and row is not None else None, actual_value=actual_value, - expected_value=threshold + expected_value=threshold, ) # Check AIE core tile status (enabled, PC, locks, error events) @@ -197,7 +216,7 @@ def check_core_status(self, status_data: Dict) -> None: aie_tile_key = None # Find the AIE tile type key (may vary: 'aie_tile', 'AIE_TILE_T', etc.) for key in status_data.keys(): - if 'aie' in key.lower() and 'tile' in key.lower(): + if "aie" in key.lower() and "tile" in key.lower(): aie_tile_key = key break @@ -207,8 +226,8 @@ def check_core_status(self, status_data: Dict) -> None: tile_data = status_data[aie_tile_key] # Check CORE_STATUS - if 'CORE_STATUS' in tile_data: - for entry in tile_data['CORE_STATUS']: + if "CORE_STATUS" in tile_data: + for entry in tile_data["CORE_STATUS"]: _, col, row, status_val, __, parsed = entry # Parse status value to extract enabled, reset, running flags @@ -216,55 +235,59 @@ def check_core_status(self, status_data: Dict) -> None: status_int = int(status_val, 16) if isinstance(status_val, str) else status_val # Check if core is enabled - if 'CORE_ENABLED' in self.rules: + if "CORE_ENABLED" in self.rules: enabled = bool(status_int & CORE_STATUS_ENABLED_MASK) - result = self._evaluate_rule(self.rules['CORE_ENABLED'], enabled, col, row) + result = self._evaluate_rule(self.rules["CORE_ENABLED"], enabled, col, row) self.results.append(result) # Check if core is in reset - if 'CORE_IN_RESET' in self.rules: + if "CORE_IN_RESET" in self.rules: in_reset = bool(status_int & CORE_STATUS_RESET_MASK) - result = self._evaluate_rule(self.rules['CORE_IN_RESET'], in_reset, col, row) + result = self._evaluate_rule(self.rules["CORE_IN_RESET"], in_reset, col, row) self.results.append(result) # Check if core is in lock stall - look for "Lock_Stall" in parsed status - if 'CORE_LOCK_STALL' in self.rules and parsed: - lock_stall = 'Lock_Stall' in parsed or 'LOCK_STALL' in parsed.upper() - result = self._evaluate_rule(self.rules['CORE_LOCK_STALL'], lock_stall, col, row) + if "CORE_LOCK_STALL" in self.rules and parsed: + lock_stall = "Lock_Stall" in parsed or "LOCK_STALL" in parsed.upper() + result = self._evaluate_rule(self.rules["CORE_LOCK_STALL"], lock_stall, col, row) self.results.append(result) # Check if core is in error halt - look for "Error_Halt" in parsed status - if 'CORE_ERROR_HALT' in self.rules and parsed: - error_halt = 'Error_Halt' in parsed or 'ERROR_HALT' in parsed.upper() - result = self._evaluate_rule(self.rules['CORE_ERROR_HALT'], error_halt, col, row) + if "CORE_ERROR_HALT" in self.rules and parsed: + error_halt = "Error_Halt" in parsed or "ERROR_HALT" in parsed.upper() + result = self._evaluate_rule(self.rules["CORE_ERROR_HALT"], error_halt, col, row) self.results.append(result) # Check lock overflows/underflows - if 'LOCK_OFL' in tile_data and 'LOCK_OVERFLOW' in self.rules: - for entry in tile_data['LOCK_OFL']: + if "LOCK_OFL" in tile_data and "LOCK_OVERFLOW" in self.rules: + for entry in tile_data["LOCK_OFL"]: _, col, row, value, __, parsed = entry overflow_count = int(value, 16) if isinstance(value, str) else value if overflow_count > 0: - result = self._evaluate_rule(self.rules['LOCK_OVERFLOW'], overflow_count, col, row) + result = self._evaluate_rule(self.rules["LOCK_OVERFLOW"], overflow_count, col, row) self.results.append(result) - if 'LOCK_UFL' in tile_data and 'LOCK_UNDERFLOW' in self.rules: - for entry in tile_data['LOCK_UFL']: + if "LOCK_UFL" in tile_data and "LOCK_UNDERFLOW" in self.rules: + for entry in tile_data["LOCK_UFL"]: _, col, row, value, __, parsed = entry underflow_count = int(value, 16) if isinstance(value, str) else value if underflow_count > 0: - result = self._evaluate_rule(self.rules['LOCK_UNDERFLOW'], underflow_count, col, row) + result = self._evaluate_rule(self.rules["LOCK_UNDERFLOW"], underflow_count, col, row) self.results.append(result) # Check event status for errors - only check registers defined in architecture - if 'EVENT_STATUS_ERRORS' in self.rules and self.aie_iface: + if "EVENT_STATUS_ERRORS" in self.rules and self.aie_iface: # Get the error event register names from architecture error_regs = [self.aie_iface.ERRORS_EVENT_REG] - if hasattr(self.aie_iface, 'ERRORS_EVENT_REG2'): + if hasattr(self.aie_iface, "ERRORS_EVENT_REG2"): error_regs.append(self.aie_iface.ERRORS_EVENT_REG2) # Get the specific error event strings to check for - error_strings = self.aie_iface.errors_event_strings if hasattr(self.aie_iface, 'errors_event_strings') else [] + error_strings = ( + self.aie_iface.errors_event_strings + if hasattr(self.aie_iface, "errors_event_strings") + else [] + ) # Check each error register for error_reg_name in error_regs: @@ -284,14 +307,10 @@ def check_core_status(self, status_data: Dict) -> None: # Create custom message with specific errors error_list = ", ".join(errors_found) result = self._evaluate_rule( - self.rules['EVENT_STATUS_ERRORS'], - event_val, - col, - row, - {'errors': error_list} + self.rules["EVENT_STATUS_ERRORS"], event_val, col, row, {"errors": error_list} ) # Override message to include specific errors - result.message = result.message.replace('Status: {value}', f'Errors: {error_list}') + result.message = result.message.replace("Status: {value}", f"Errors: {error_list}") self.results.append(result) # Check DMA channel status for activity and configuration @@ -315,7 +334,7 @@ def check_shim_status(self, status_data: Dict) -> None: """ shim_tile_key = None for key in status_data.keys(): - if 'shim' in key.lower() and 'tile' in key.lower(): + if "shim" in key.lower() and "tile" in key.lower(): shim_tile_key = key break @@ -325,28 +344,36 @@ def check_shim_status(self, status_data: Dict) -> None: tile_data = status_data[shim_tile_key] # Check microcontroller status - if 'UC_STATUS' in tile_data and 'UC_FIRMWARE_RUNNING' in self.rules: - for entry in tile_data['UC_STATUS']: + if "UC_STATUS" in tile_data and "UC_FIRMWARE_RUNNING" in self.rules: + for entry in tile_data["UC_STATUS"]: _, col, row, uc_data = entry # uc_data is list of (name, value) tuples fw_state = None for name, value in uc_data: - if 'FW_STATE' in name: + if "FW_STATE" in name: fw_state = value break if fw_state: - result = self._evaluate_rule(self.rules['UC_FIRMWARE_RUNNING'], fw_state, col, row) + result = self._evaluate_rule(self.rules["UC_FIRMWARE_RUNNING"], fw_state, col, row) self.results.append(result) # Check if shim DMA is configured - if ('dma_mm2s_status' in tile_data or 'dma_s2mm_status' in tile_data) and 'SHIM_DMA_CONFIGURED' in self.rules: - for col_row_pair in set([(e[1], e[2]) for section in ['dma_mm2s_status', 'dma_s2mm_status'] - if section in tile_data for e in tile_data[section]]): + if ( + "dma_mm2s_status" in tile_data or "dma_s2mm_status" in tile_data + ) and "SHIM_DMA_CONFIGURED" in self.rules: + for col_row_pair in set( + [ + (e[1], e[2]) + for section in ["dma_mm2s_status", "dma_s2mm_status"] + if section in tile_data + for e in tile_data[section] + ] + ): col, row = col_row_pair # If we have DMA status entries, assume DMA is configured configured = True # Simplified check - result = self._evaluate_rule(self.rules['SHIM_DMA_CONFIGURED'], configured, col, row) + result = self._evaluate_rule(self.rules["SHIM_DMA_CONFIGURED"], configured, col, row) self.results.append(result) # Run all guidance checks (core, DMA, shim) on collected status data @@ -377,11 +404,11 @@ def get_summary(self) -> Dict[str, int]: Dictionary with counts of passed, errors, and warnings """ summary = { - 'total': len(self.results), - 'passed': sum(1 for r in self.results if r.passed), - 'errors': sum(1 for r in self.results if not r.passed and r.severity == Severity.ERROR), - 'warnings': sum(1 for r in self.results if not r.passed and r.severity == Severity.WARNING), - 'info': sum(1 for r in self.results if not r.passed and r.severity == Severity.INFO) + "total": len(self.results), + "passed": sum(1 for r in self.results if r.passed), + "errors": sum(1 for r in self.results if not r.passed and r.severity == Severity.ERROR), + "warnings": sum(1 for r in self.results if not r.passed and r.severity == Severity.WARNING), + "info": sum(1 for r in self.results if not r.passed and r.severity == Severity.INFO), } return summary @@ -454,7 +481,9 @@ def print_results(self, show_passed: bool = False, show_guidance: bool = True) - if show_guidance and not result.passed: print(f" | {'':10} | {'':35} | → {result.guidance}") if result.actual_value is not None: - print(f" | {'':10} | {'':35} | Actual: {result.actual_value}, Expected: {result.expected_value}") + print( + f" | {'':10} | {'':35} | Actual: {result.actual_value}, Expected: {result.expected_value}" + ) print() @@ -478,12 +507,9 @@ def export_json(self, filename: str) -> None: Args: filename: Output filename """ - output = { - 'summary': self.get_summary(), - 'results': [r.to_dict() for r in self.results] - } + output = {"summary": self.get_summary(), "results": [r.to_dict() for r in self.results]} - with open(filename, 'w', encoding="utf-8") as f: + with open(filename, "w", encoding="utf-8") as f: json.dump(output, f, indent=2) print(f"[INFO] Guidance results exported to {filename}") diff --git a/src/mldebug/extra/calltree.py b/src/mldebug/extra/calltree.py index d7d6ab4..f1d2b02 100644 --- a/src/mldebug/extra/calltree.py +++ b/src/mldebug/extra/calltree.py @@ -34,6 +34,7 @@ @dataclass class AIEFunc: """Represents a function in the AIE assembly.""" + name: str start_pc: int end_pc: int = 0 @@ -45,6 +46,7 @@ class AIEFunc: @dataclass class CallNode: """Node in the call tree.""" + func_name: str pc: int # PC where call was made children: list = field(default_factory=list) @@ -82,7 +84,7 @@ def from_file(cls, filepath): Returns: AIECallTree instance """ - with open(filepath, 'r', encoding='utf-8') as f: + with open(filepath, "r", encoding="utf-8") as f: content = f.read() return cls(content) @@ -101,16 +103,16 @@ def from_string(cls, lst_content): def _parse(self): """Parse the LST content and extract functions and call information.""" - lines = self._raw_content.split('\n') + lines = self._raw_content.split("\n") # Pattern for function/label header: "00000000 :" - func_pattern = re.compile(r'^([0-9a-f]+)\s+<([0-9a-zA-Z_\s]+)(.+)>:$') + func_pattern = re.compile(r"^([0-9a-f]+)\s+<([0-9a-zA-Z_\s]+)(.+)>:$") # Pattern for instruction with PC: " hex: instruction" - instr_pattern = re.compile(r'^\s*([0-9a-f]+):\s+(.+)$') + instr_pattern = re.compile(r"^\s*([0-9a-f]+):\s+(.+)$") # Pattern for jl (jump and link - function call): jl #0xXXXX - call_pattern = re.compile(r'\bjl\s+#(0x[0-9a-f]+)') + call_pattern = re.compile(r"\bjl\s+#(0x[0-9a-f]+)") # Pattern for j (unconditional jump - potential tail call): j #0xXXXX - jump_pattern = re.compile(r'\bj\s+#(0x[0-9a-f]+)') + jump_pattern = re.compile(r"\bj\s+#(0x[0-9a-f]+)") current_func = None @@ -122,7 +124,7 @@ def _parse(self): name = m_func.group(2) # Skip internal labels (start with '.') - if name.startswith('.'): + if name.startswith("."): continue # Save previous function if exists @@ -150,13 +152,13 @@ def _parse(self): current_func.calls.append((pc, target)) # Check for ret (function end) - if '\tret' in instr or instr.strip().startswith('ret'): + if "\tret" in instr or instr.strip().startswith("ret"): if current_func.end_pc == 0: current_func.end_pc = pc # Check for unconditional jump (potential tail call) m_jump = jump_pattern.search(instr) - if m_jump and '\tjl' not in instr: + if m_jump and "\tjl" not in instr: target = int(m_jump.group(1), 16) current_func.tail_jump_target = target @@ -229,7 +231,7 @@ def build_tree(addr, depth=0): if func.tail_jump_target and func.tail_jump_target in self._addr_to_name: target_name = self._addr_to_name[func.tail_jump_target] - if not target_name.startswith('.'): + if not target_name.startswith("."): child = build_tree(func.tail_jump_target, depth + 1) child.is_tail_call = True node.children.append(child) @@ -273,12 +275,12 @@ def _get_root_addresses(self, root_func=None): else: # Default: start with __start or _main_init for addr, name in self._addr_to_name.items(): - if name in ('__start', '_main_init'): + if name in ("__start", "_main_init"): root_addrs.append(addr) # Also find all superkernel functions for addr, name in self._addr_to_name.items(): - if 'superkernel' in name.lower() and addr not in root_addrs: + if "superkernel" in name.lower() and addr not in root_addrs: root_addrs.append(addr) return sorted(root_addrs) @@ -363,7 +365,7 @@ def get_call_relationships(self): lines.append(f" ├─ calls {target_name} at PC 0x{call_pc:x}") if func.tail_jump_target and func.tail_jump_target in self._addr_to_name: target_name = self._addr_to_name[func.tail_jump_target] - if not target_name.startswith('.'): + if not target_name.startswith("."): lines.append(f" └─ tail-calls {target_name}") return "\n".join(lines) diff --git a/src/mldebug/input_parser.py b/src/mldebug/input_parser.py index 7636602..77fe925 100644 --- a/src/mldebug/input_parser.py +++ b/src/mldebug/input_parser.py @@ -23,6 +23,7 @@ # Seconds to wait at interactive prompts before giving up and exiting. HW_CONTEXT_INPUT_TIMEOUT_S = 60 + @dataclass class RunFlags: """ @@ -74,7 +75,9 @@ def create_run_flags(args, subgraph_path: str, fsp: str, fsp_execution_order: li # AIE Work dir, device, buffer info if subgraph_path and os.path.exists(args.vaiml_folder_path): args.aie_dir = subgraph_path + f"/{fsp}/aiecompiler/Work" - args.mladf_report = subgraph_path + f"/{fsp}/aiecompiler/Work/reports/mladf_compiler_report.json" + args.mladf_report = ( + subgraph_path + f"/{fsp}/aiecompiler/Work/reports/mladf_compiler_report.json" + ) args.buffer_info = subgraph_path + f"/{fsp}/buffer_info.json" args.flexmlrt_hsi = subgraph_path + f"/{fsp}/flexmlrt-hsi.json" args.debug_map_json = subgraph_path + f"/{fsp}/debug_map.json" @@ -94,8 +97,8 @@ def create_run_flags(args, subgraph_path: str, fsp: str, fsp_execution_order: li no_metadata = args.buffer_info is None or not os.path.exists(args.buffer_info) if (no_metadata or not os.path.exists(args.aie_dir)) and not args.aie_only: print("[INFO] Using Standalone mode.") - args.aie_only=True - args.interactive=True + args.aie_only = True + args.interactive = True # AIE interface for aie2p and aie2 are shared # We need to differentiate between them for a few items @@ -127,7 +130,7 @@ def get_flag(s, default=False): get_flag("mock_hang"), get_flag("dump_temps"), get_flag("multistamp"), - get_flag("disable_tg") + get_flag("disable_tg"), ) @@ -239,7 +242,7 @@ def set_device(args) -> None: args.device = AIE_DEV_PHX except (FileNotFoundError, KeyError): pass - #LOGGER.log("[INFO] Unable to detect device automatically.") + # LOGGER.log("[INFO] Unable to detect device automatically.") print(f"[INFO] Using AIE Device: {args.device}.", end=endmsg) @@ -260,10 +263,14 @@ def print_hw_context_table(current_contexts: dict[str, dict[str, str]]) -> None: # LOGGER.log table data for context, context_data in current_contexts.items(): columns_str = ", ".join(map(str, context_data["columns"])) - LOGGER.log(f"{context:<12} {columns_str:<30} {context_data['pid']:<12} {context_data['status']:<12}") + LOGGER.log( + f"{context:<12} {columns_str:<30} {context_data['pid']:<12} {context_data['status']:<12}" + ) -def _validate_contexts_with_read(contexts: dict, device: str, aie_iface) -> list[tuple[int, int]] | None: +def _validate_contexts_with_read( + contexts: dict, device: str, aie_iface +) -> list[tuple[int, int]] | None: """ Validate ALL contexts by reading CORE_STATUS register (verifies register access) @@ -285,7 +292,7 @@ def _validate_contexts_with_read(contexts: dict, device: str, aie_iface) -> list # Device-specific addresses: Telluride=0x38004, PHX/STX=0x32004 test_reg = aie_iface.Core_registers["CORE_STATUS"] test_tiles = [(test_col, test_row)] - + valid_contexts = [] for ctx_id, ctx_info in contexts.items(): backend = None @@ -355,9 +362,11 @@ def check_hw_context(args) -> tuple[int, int]: } if not current_contexts: - print("Warning: xrt-smi could find no applications running. Please launch an application to use MLDebugger.") + print( + "Warning: xrt-smi could find no applications running. Please launch an application to use MLDebugger." + ) raise FileNotFoundError - + # Path 1: Single context found -> auto-select it if len(current_contexts) == 1: ctx = int(list(current_contexts.keys())[0]) @@ -365,7 +374,9 @@ def check_hw_context(args) -> tuple[int, int]: return ctx, pid # Path 2: Multiple contexts found -> validate all with register read test - print(f"[INFO] Found {len(current_contexts)} hardware context(s). Validating with register read test...") + print( + f"[INFO] Found {len(current_contexts)} hardware context(s). Validating with register read test..." + ) valid_contexts = _validate_contexts_with_read(current_contexts, device, aie_iface) # Path 2a: No contexts passed validation -> prompt user for input @@ -405,7 +416,9 @@ def check_hw_context(args) -> tuple[int, int]: ctx = int(selected_context_id) pid = int(valid_only[selected_context_id]["pid"]) else: - LOGGER.log(f"Context ID {selected_context_id} not found. Valid options: {', '.join(valid_only.keys())}") + LOGGER.log( + f"Context ID {selected_context_id} not found. Valid options: {', '.join(valid_only.keys())}" + ) cleanup_and_exit(args, 1) return ctx, pid @@ -519,12 +532,15 @@ def get_subgraph(args) -> tuple[str, Subgraph]: "flag in vaiml_config in vitisai_config.json" ) return model_folder_name, Subgraph( - folder_path=f"{vaiml_folder_path}/{model_folder_name}/{vaiml_subgraphs[0]}", name=vaiml_subgraphs[0] + folder_path=f"{vaiml_folder_path}/{model_folder_name}/{vaiml_subgraphs[0]}", + name=vaiml_subgraphs[0], ) break if len(subgraphs) > 1: - raise RuntimeError("Error: Multi-partition design detected. Specify a partition in vitisai_config.json") + raise RuntimeError( + "Error: Multi-partition design detected. Specify a partition in vitisai_config.json" + ) if len(subgraphs) == 0: raise RuntimeError("Error: No partition found in the input model folder") return model_folder_name, subgraphs[0] diff --git a/src/mldebug/interactive_controller.py b/src/mldebug/interactive_controller.py index 3c17318..d863c94 100644 --- a/src/mldebug/interactive_controller.py +++ b/src/mldebug/interactive_controller.py @@ -117,7 +117,9 @@ def step_layer(self): self.design_info.update_work_dir(next_layer.layer_order) self.runner.schedule_layer_start(next_layer) m = f"Stepped from Layer:{layer.layer_order} {layer.stamps[0].name} Itr:{cur_it} -> " - LOGGER.log(m + f"Layer:{next_layer.layer_order} {next_layer.stamps[0].name} Itr:{1}", flush=False) + LOGGER.log( + m + f"Layer:{next_layer.layer_order} {next_layer.stamps[0].name} Itr:{1}", flush=False + ) else: self.state.continue_to_finish = True print("[INFO] Reached the end of the design.") @@ -141,7 +143,9 @@ def add_breakpoint(self, layer_num, iteration=1): current_layer_order = current_layer.layer_order final_layer_order = self.state.get_last_layer().layer_order if layer_num < current_layer_order or layer_num > final_layer_order: - print(f"[ERROR] Layer Out of bounds. Current: {current_layer_order} Final: {final_layer_order}") + print( + f"[ERROR] Layer Out of bounds. Current: {current_layer_order} Final: {final_layer_order}" + ) return self.state.add_breakpoint(layer_num, iteration) diff --git a/src/mldebug/layer_info.py b/src/mldebug/layer_info.py index 1c49a29..28120c7 100644 --- a/src/mldebug/layer_info.py +++ b/src/mldebug/layer_info.py @@ -25,8 +25,8 @@ # Stepping to this causes lock stall "mllib_graphs::resize_adf_wrapper", # This has many sublayers and needs to be better understood - "mllib_graphs::mha_type1::mha_adf_wrapper" - ] + "mllib_graphs::mha_type1::mha_adf_wrapper", +] def _strip_template(name): @@ -38,6 +38,7 @@ def _strip_template(name): idx = name.find("<") return name[:idx] if idx != -1 else name + # For now skip these kernels for end pc skip_end_pc_kernels = [ # kernel with 3 end pc release based on depth, width and height iter @@ -111,7 +112,9 @@ def __init__(self, entry, buf_type, size_shift, aie_iface, ifm=False, ofm=False, if "l1_ping" in entry: ping = entry["l1_ping"] pong = entry.get("l1_pong", ping) - self.l1 = L1Buffer(int(ping[0], 16), ping[1] * size_shift, int(pong[0], 16), pong[1] * size_shift) + self.l1 = L1Buffer( + int(ping[0], 16), ping[1] * size_shift, int(pong[0], 16), pong[1] * size_shift + ) # Handle both "l2" format and "l2_ping/l2_pong" format l2_bufs_list = [] @@ -199,6 +202,7 @@ class Stamp: end_pc (int): End program counter. elf_name (str): Associated ELF object. """ + name: str start_pc: int = 0 end_pc: int = 0 @@ -276,7 +280,9 @@ def __init__(self, info, size_shift, version, aie_iface, num_stamps, mladf_repor # 1. Layers without any kernel should be skipped # 2. Unsupported superkernel should be skipped if info.get("is_concat") or not kname or any(k in kname for k in unsupported_superkernels): - LOGGER.verbose_print(f"[WARNING] unsupported kernel {kname} at Layer {self.layer_order} will be skipped.") + LOGGER.verbose_print( + f"[WARNING] unsupported kernel {kname} at Layer {self.layer_order} will be skipped." + ) self.is_unsupported = True return @@ -285,8 +291,14 @@ def __init__(self, info, size_shift, version, aie_iface, num_stamps, mladf_repor for sid, stamp in enumerate(self.stamps): stamp.name = mladf_report.get_skname_for_bilo(self.layer_order, sid) stamp.elf_name = mladf_report.get_elfid_for_bilo(self.layer_order, sid) - if not stamp.name or stamp.elf_name == -1 or any(k in stamp.name for k in unsupported_superkernels): - LOGGER.verbose_print(f"[WARNING] unsupported kernel {stamp.name} at Layer {self.layer_order} will be skipped.") + if ( + not stamp.name + or stamp.elf_name == -1 + or any(k in stamp.name for k in unsupported_superkernels) + ): + LOGGER.verbose_print( + f"[WARNING] unsupported kernel {stamp.name} at Layer {self.layer_order} will be skipped." + ) self.is_unsupported = True return self.lcp.num_iter = mladf_report._get_iters_for_bilo(self.layer_order) @@ -396,7 +408,10 @@ def _match_l3_buffer(self, fm, l3_buffer_names, l3_buffer_sizes, size_shift): if size.get("type"): size_shift = SIZE_BYTES.get(size["type"], 1) buffer = L3Buffer( - name=full_l3_name, tensor_name=size.get("tensor_name"), size=int(size["size"] * size_shift), offset=None + name=full_l3_name, + tensor_name=size.get("tensor_name"), + size=int(size["size"] * size_shift), + offset=None, ) if "ifm" in fm: self.l3_ifm_buffers.append(buffer) @@ -406,7 +421,9 @@ def _match_l3_buffer(self, fm, l3_buffer_names, l3_buffer_sizes, size_shift): break if not is_substr: - raise RuntimeError(f"The sub-name {sub_name} is not in the list of full L3 names {l3_buffer_names}") + raise RuntimeError( + f"The sub-name {sub_name} is not in the list of full L3 names {l3_buffer_names}" + ) def _initialize_l3_buffers(self, info, version): """ @@ -421,13 +438,18 @@ def _initialize_l3_buffers(self, info, version): if "ifm" in info: for _, entry in enumerate(info["ifm"], start=1): if "l3" in entry: - self._match_l3_buffer("ifm", entry["l3_buffer_names"], entry["l3"], SIZE_BYTES.get(entry.get("dtype"), 1)) + self._match_l3_buffer( + "ifm", entry["l3_buffer_names"], entry["l3"], SIZE_BYTES.get(entry.get("dtype"), 1) + ) else: fms = ["ifm", "ifm2", "ofm"] for name in fms: if name in info and "l3" in info[name]: self._match_l3_buffer( - name, info[name]["l3_buffer_names"], info[name]["l3"], SIZE_BYTES.get(info[name].get("dtype"), 1) + name, + info[name]["l3_buffer_names"], + info[name]["l3"], + SIZE_BYTES.get(info[name].get("dtype"), 1), ) def _initialize_buffers(self, info, aie_iface, size_shift, version): @@ -491,7 +513,9 @@ def __init__(self, args): self.mladf_report = None has_bi = args.buffer_info and Path(args.buffer_info).is_file() - use_mladf = args.mladf_report and Path(args.mladf_report).is_file() and not args.run_flags.disable_tg + use_mladf = ( + args.mladf_report and Path(args.mladf_report).is_file() and not args.run_flags.disable_tg + ) data = None # 1. Parse the buffer info to get Layout if has_bi: @@ -504,7 +528,9 @@ def __init__(self, args): # 3. Parse mladf report. # TBD: memory optimize this as this json can be large if not args.aie_only and has_bi and use_mladf: - self.mladf_report = MladfReport(args.buffer_info, args.mladf_report, self.overlay.get_stampwidth()) + self.mladf_report = MladfReport( + args.buffer_info, args.mladf_report, self.overlay.get_stampwidth() + ) # 4. Initialize Layers if not args.aie_only: self._init_layers(data, args.aie_iface, num_stamps, num_batches) @@ -514,12 +540,15 @@ def __init__(self, args): if layer.pm_work_dir: path = os.path.join(args.aie_dir, layer.pm_work_dir) if layer.pm_work_dir not in self.x2_work_dirs: - self.x2_work_dirs[layer.pm_work_dir] = WorkDir(path, args.peano, self.overlay, self.aie_iface.ARCH_NAME) + self.x2_work_dirs[layer.pm_work_dir] = WorkDir( + path, args.peano, self.overlay, self.aie_iface.ARCH_NAME + ) self.layer_workdir_map[layer.layer_order] = self.x2_work_dirs[layer.pm_work_dir] self.work_dir = next(iter(self.layer_workdir_map.values())) else: - self.work_dir = WorkDir(args.aie_dir, args.peano, self.overlay, - self.aie_iface.ARCH_NAME, args.run_flags.dump_temps) + self.work_dir = WorkDir( + args.aie_dir, args.peano, self.overlay, self.aie_iface.ARCH_NAME, args.run_flags.dump_temps + ) if not args.aie_only: # Set PC Value for layers @@ -694,7 +723,7 @@ def initialize_l3_layer_mapping(self, flexmlrt_hsi, external_buffer_id): name=f"{orig_buffer.name}_stamp_{b}", tensor_name=orig_buffer.tensor_name, size=orig_buffer.size, - offset=None + offset=None, ) layer.l3_buffers.append(stamped_buffer) @@ -837,8 +866,9 @@ def _init_layers(self, raw_info, aie_iface, num_stamps, num_batches=1): raw_layers = sorted(raw_layers.items(), key=lambda item: item[1]["layer_order"]) for entry in raw_layers: info = entry[1] - layer = Layer(info, size_shift, version, aie_iface, num_stamps, self.mladf_report, - num_batches=num_batches) + layer = Layer( + info, size_shift, version, aie_iface, num_stamps, self.mladf_report, num_batches=num_batches + ) self.layers.append(layer) def _initialize_layers_from_workdir_x2(self, args): @@ -863,7 +893,9 @@ def _initialize_layers_from_workdir_x2(self, args): # Resolve PCs once per stamp for sid in range(self.overlay.get_stamps_per_batch()): for layer in self.layers: - flist = list(self.layer_workdir_map[layer.layer_order].stamps[sid].aie_functions.values())[0] + flist = list(self.layer_workdir_map[layer.layer_order].stamps[sid].aie_functions.values())[ + 0 + ] self.layer_workdir_map[layer.layer_order].stamps[sid].pm_reload_en = True for f in flist: if _strip_template(layer.stamps[sid].name.lower()) == _strip_template(f.name.lower()): @@ -907,8 +939,7 @@ def _initialize_layers_from_workdir(self, args): aiec_info = self.work_dir.stamp(sid) # Index functions by elf_id and stripped name for direct lookup. funcs_by_elf = { - elf_name.split("reloadable")[-1]: - {_strip_template(f.name.lower()): f for f in flist} + elf_name.split("reloadable")[-1]: {_strip_template(f.name.lower()): f for f in flist} for elf_name, flist in aiec_info.aie_functions.items() } for layer in self.layers: @@ -923,10 +954,14 @@ def _initialize_layers_from_workdir(self, args): elif aiec_info.pm_reload_en: # In buffer_info the flexml_ids might not be in order of stamps, so # match on flexml-id membership and name within the same ELF. - elf_id = next((e for e, fns in funcs_by_elf.items() - if key in fns - and any(i in aiec_info.elf_flxmlid_maps[e] for i in layer.flexml_ids)), - None) + elf_id = next( + ( + e + for e, fns in funcs_by_elf.items() + if key in fns and any(i in aiec_info.elf_flxmlid_maps[e] for i in layer.flexml_ids) + ), + None, + ) else: elf_id = next((e for e, fns in funcs_by_elf.items() if key in fns), None) @@ -947,13 +982,19 @@ def _initialize_layers_from_workdir(self, args): if idx >= len(self.layers) - 1: layer.lcp.num_iter = 1 break - next_layer_stamps = self.layers[idx+1].stamps + next_layer_stamps = self.layers[idx + 1].stamps if args.run_flags.multistamp: - if (layer.stamps[0].name != next_layer_stamps[0].name + if ( + layer.stamps[0].name != next_layer_stamps[0].name and len(layer.stamps) == len(next_layer_stamps) - and all(layer.stamps[i].elf_name == next_layer_stamps[i].elf_name for i in range(len(layer.stamps))) - ): + and all( + layer.stamps[i].elf_name == next_layer_stamps[i].elf_name + for i in range(len(layer.stamps)) + ) + ): layer.lcp.num_iter = 1 - elif (layer.stamps[0].name != next_layer_stamps[0].name - and layer.stamps[0].elf_name == next_layer_stamps[0].elf_name ): + elif ( + layer.stamps[0].name != next_layer_stamps[0].name + and layer.stamps[0].elf_name == next_layer_stamps[0].elf_name + ): layer.lcp.num_iter = 1 diff --git a/src/mldebug/memory_dumper.py b/src/mldebug/memory_dumper.py index 10ad0c0..325e4cd 100644 --- a/src/mldebug/memory_dumper.py +++ b/src/mldebug/memory_dumper.py @@ -204,7 +204,9 @@ def dump_x2_buffers(self, layer, it): self.dump_l3_buffers(layer, x2=True) previous_layer = self.state.get_previous_layer() if previous_layer: - self.dump_memory_l2(previous_layer.out_buffers, it, previous_layer.layer_order, use_l2_names=True) + self.dump_memory_l2( + previous_layer.out_buffers, it, previous_layer.layer_order, use_l2_names=True + ) def dump_l3_buffers(self, layer, x2=False): """ @@ -268,5 +270,7 @@ def dump_l3_buffers_interactive(self): self.dump_l3_buffers(self.state.get_current_layer(), x2=self.args.x2_folder_path is not None) if self.state.get_current_layer() and self.state.get_current_layer().l3_buffers: for buffer in self.state.get_current_layer().l3_buffers: - LOGGER.log(f"[INFO] L3 buffer '{buffer.name}' dumped successfully (offset={buffer.offset}, size={buffer.size})") + LOGGER.log( + f"[INFO] L3 buffer '{buffer.name}' dumped successfully (offset={buffer.offset}, size={buffer.size})" + ) LOGGER.log(f"[INFO] Memory dump complete at : {self.get_output_path()}") diff --git a/src/mldebug/mladf_report.py b/src/mldebug/mladf_report.py index 9dfeb17..6050d33 100644 --- a/src/mldebug/mladf_report.py +++ b/src/mldebug/mladf_report.py @@ -10,6 +10,7 @@ from pathlib import Path + def load_json(path): """ utility @@ -21,10 +22,12 @@ def load_json(path): print(e) return {} + class MladfReport: """ Encapsulates MLADF Details """ + def __init__(self, bi_file, m2_file, cps=4): """ bi_file: path to buffer_info.json @@ -53,7 +56,7 @@ def get_skname_for_bilo(self, bilo, sid=0): """ aiec_layers = self.get_aiec_layers_by_bilo(bilo) if aiec_layers: - core = f"{sid*self.cps}_0" + core = f"{sid * self.cps}_0" if aiec_layers[0]["core_information"].get(core): try: kname = aiec_layers[0]["core_information"][core]["kernel_name"] @@ -84,7 +87,7 @@ def get_elfid_for_bilo(self, bilo, sid): if not aiec_layers: return -1 - core = f"{sid*self.cps}_0" + core = f"{sid * self.cps}_0" pm_info = {} if aiec_layers[0]["core_information"].get(core): pm_info = aiec_layers[0]["core_information"][core].get("pm_information", {}) @@ -115,12 +118,12 @@ def _extract_m2_parent_graphs(self, kernel_instances_str): if not inst: continue - flexml_match = re.search(r'(flexml_layers\[\d+\])', inst) + flexml_match = re.search(r"(flexml_layers\[\d+\])", inst) if flexml_match: parents.add(flexml_match.group(1)) continue - flexml_flat = re.search(r'flexml_layer_(\d+)', inst) + flexml_flat = re.search(r"flexml_layer_(\d+)", inst) if flexml_flat: parents.add(f"flexml_layers[{flexml_flat.group(1)}]") continue @@ -128,14 +131,14 @@ def _extract_m2_parent_graphs(self, kernel_instances_str): parts = inst.split(".") found = False for part in parts: - if re.search(r'_layer_\d+', part) and "_mk[" not in part: - parent = re.sub(r'_layer_\d+$', '', part) + if re.search(r"_layer_\d+", part) and "_mk[" not in part: + parent = re.sub(r"_layer_\d+$", "", part) parents.add(parent) found = True break if not found and len(parts) >= 2: - candidate = re.sub(r'^compute_graph\.', '', inst).split(".")[0] + candidate = re.sub(r"^compute_graph\.", "", inst).split(".")[0] if candidate: parents.add(candidate) @@ -147,7 +150,7 @@ def _extract_m2_parent_graphs(self, kernel_instances_str): # strip regex below handles inner names like `..._layer_0_0[0]` too. for part in parts: if part.startswith("templated_graph_"): - outer = re.sub(r'_layer_\d+(?:_\d+)*(?:\[\d+\])?$', '', part) + outer = re.sub(r"_layer_\d+(?:_\d+)*(?:\[\d+\])?$", "", part) parents.add(outer) break @@ -160,8 +163,8 @@ def _extract_parent_graph(self, name): "compute_graph.flexml_layers[3]" -> "flexml_layers[3]" """ - stripped = re.sub(r'^compute_graph\.', '', name) - parent = re.sub(r'_layer_\d+$', '', stripped) + stripped = re.sub(r"^compute_graph\.", "", name) + parent = re.sub(r"_layer_\d+$", "", stripped) return parent def _approach1_map(self, bi_layers, m2_layers): @@ -190,4 +193,3 @@ def _approach1_map(self, bi_layers, m2_layers): bi_to_m2[bi_key].append(m2_key) return bi_to_m2 - diff --git a/src/mldebug/mldebug_cli.py b/src/mldebug/mldebug_cli.py index b41ac19..328ca08 100644 --- a/src/mldebug/mldebug_cli.py +++ b/src/mldebug/mldebug_cli.py @@ -37,7 +37,8 @@ def _apply_unsupported_kernels_from_args(args): This must happen before LayerInfo creates Layer objects (ClientDebug -> LayerInfo). """ - from mldebug import layer_info # pylint: disable=import-outside-toplevel + from mldebug import layer_info # pylint: disable=import-outside-toplevel + values = args.unsupported_kernels if not values: return @@ -87,7 +88,9 @@ def check_args(args): args.interactive = True print("[INFO] Using standalone mode for core dumps") if args.backend == "core_dump" and args.core_dump is None: - print("[ERROR] Core dump file is required when backend is 'core_dump'. Please use -h or --help for usage") + print( + "[ERROR] Core dump file is required when backend is 'core_dump'. Please use -h or --help for usage" + ) if args.device == AIE_DEV_NPU3 and not args.overlay: args.overlay = "3x4" return True @@ -152,11 +155,7 @@ def launch_debug(args, output_dir): _apply_unsupported_kernels_from_args(args) handle = ClientDebug(args, context_id, pid, output_dir) if args.dump_aie_status: - handle.status_handle.get( - args.dump_aie_status, - advanced=True, - guidance=False - ) + handle.status_handle.get(args.dump_aie_status, advanced=True, guidance=False) print(f"[INFO] Advanced AIE status written to {args.dump_aie_status}") return if args.exec_cmd is not None: @@ -168,6 +167,7 @@ def launch_debug(args, output_dir): else: handle.execute_and_dump() + def _dev_cli_help(text): """ Show help text only when ENABLE_DEV is set (e.g. via mldebug.py launcher). @@ -177,6 +177,7 @@ def _dev_cli_help(text): return text return argparse.SUPPRESS + def app(): """ Entry Point @@ -201,17 +202,27 @@ def app(): required=False, metavar="", ) - p.add_argument("-a", "--aie_dir", help="Path to AIE Work Directory. Default: Work", default="Work", metavar="") + p.add_argument( + "-a", + "--aie_dir", + help="Path to AIE Work Directory. Default: Work", + default="Work", + metavar="", + ) # Hidden Argument # XRT backend is applicable on the Client host. # Test backend is for internal testing # Core_dump backend is for reading from the core_dump file - p.add_argument("-x", "--backend", help=argparse.SUPPRESS, choices=["xrt", "test", "core_dump"], default="xrt") - p.add_argument("-c", "--core_dump", - help="Run standalone mode for core-dump inspection.\n" - "Use -d flag to specify device.", - type=str, - metavar = "COREDUMP_FILE") + p.add_argument( + "-x", "--backend", help=argparse.SUPPRESS, choices=["xrt", "test", "core_dump"], default="xrt" + ) + p.add_argument( + "-c", + "--core_dump", + help="Run standalone mode for core-dump inspection.\nUse -d flag to specify device.", + type=str, + metavar="COREDUMP_FILE", + ) p.add_argument( "--dump-aie-status", dest="dump_aie_status", @@ -219,8 +230,9 @@ def app(): help="Write AIE status to a file and exit.\n", default=None, ) - p.add_argument("--no_header", action="store_true", - help="Assume raw core dump without header. Use with -c.") + p.add_argument( + "--no_header", action="store_true", help="Assume raw core dump without header. Use with -c." + ) # Hidden Argument # 'AIE Device type' p.add_argument( @@ -242,7 +254,9 @@ def app(): # aie_status to aie_status_.txt p.add_argument("-n", "--name", help=argparse.SUPPRESS, required=False, metavar="") p.add_argument("-o", "--overlay", help="Overlay used by design. Default: 4x4", metavar="") - p.add_argument("-i", "--interactive", action="store_true", help="Launch in Interactive Mode. Default: Batch") + p.add_argument( + "-i", "--interactive", action="store_true", help="Launch in Interactive Mode. Default: Batch" + ) p.add_argument( "-l", "--output_dir", @@ -267,7 +281,10 @@ def app(): metavar="", ) p.add_argument( - "-s", "--aie_only", action="store_true", help="Standalone AIE debug. Work dir can be optionally specified." + "-s", + "--aie_only", + action="store_true", + help="Standalone AIE debug. Work dir can be optionally specified.", ) p.add_argument( "--exec_cmd", @@ -277,10 +294,12 @@ def app(): help="Execute a command in the advanced shell (-s) and exit.", ) p.add_argument( - "-e", "--exit_at_layer", type=int, - #help="Run until this layer and exit in batch mode.", + "-e", + "--exit_at_layer", + type=int, + # help="Run until this layer and exit in batch mode.", help=argparse.SUPPRESS, - metavar="LAYER" + metavar="LAYER", ) p.add_argument( "-l3", @@ -294,7 +313,7 @@ def app(): action="store_true", help=argparse.SUPPRESS, # This was needed for fsp - #help="Coordinate with flexmlrt to automatically run the design. Run with ENABLE_ML_DEBUG=3", + # help="Coordinate with flexmlrt to automatically run the design. Run with ENABLE_ML_DEBUG=3", ) # Hidden Argument # Use this tool with AIESim @@ -302,9 +321,7 @@ def app(): p.add_argument( "--peano", action="store_true", - help=_dev_cli_help( - "Enable support for peano.\nWith -v flag, peano support is autodetected." - ), + help=_dev_cli_help("Enable support for peano.\nWith -v flag, peano support is autodetected."), ) p.add_argument( "--unsupported_kernels", @@ -313,8 +330,8 @@ def app(): default=None, metavar="KERNEL", help=argparse.SUPPRESS, - #help="Additional kernel names to treat as unsupported and skip during execution.\n" - #"Example: --unsupported_kernels conv2d_maxpool superkernel_clip1d\n", + # help="Additional kernel names to treat as unsupported and skip during execution.\n" + # "Example: --unsupported_kernels conv2d_maxpool superkernel_clip1d\n", ) p.add_argument( "-f", @@ -333,20 +350,20 @@ def app(): "dump_temps", "multistamp", "disable_tg", - "skip_iter2" + "skip_iter2", ], help="Specify one or more runtime flags:\n" "skip_dump : Do not dump memory\n" - #"layer_status : Dump AIE status at start of each layer\n" - #"l2_dump_only : Dump only L2 buffers\n" + # "layer_status : Dump AIE status at start of each layer\n" + # "l2_dump_only : Dump only L2 buffers\n" "l2_ifm_dump : Dump only L2 IFM buffers\n" "l1_ofm_dump : Dump L1 ofm buffers in addition to others\n" "text_dump : Dump in text format\n" "skip_iter : Skip iterations in batch mode when possible\n" "skip_iter2 : skip_iter using lcp lock.(Telluride only)\n" - #"dump_temps : Write intermediate (.lst) files to disk\n" + # "dump_temps : Write intermediate (.lst) files to disk\n" "multistamp : Enable N Stamp/Batch mode\n", - #"disable_tg : Disable Step to TG layers\n", + # "disable_tg : Disable Step to TG layers\n", # 'mock_hang' : Simulate hang at one of the layers in test mode metavar=" ", ) diff --git a/src/mldebug/utils.py b/src/mldebug/utils.py index 92db172..0623dd0 100644 --- a/src/mldebug/utils.py +++ b/src/mldebug/utils.py @@ -336,6 +336,7 @@ def print_tile_grid(title, tiles, register_values=None, format_type="hex"): print(f"{'=' * total_width}") + def input_with_timeout(prompt, timeout): """ Read a line from stdin, or return None after ``timeout`` seconds. @@ -409,7 +410,8 @@ def is_aarch64(): """ ARM """ - return platform.machine().lower() in ['aarch64', 'arm64'] + return platform.machine().lower() in ["aarch64", "arm64"] + def is_windows(): """ @@ -417,6 +419,7 @@ def is_windows(): """ return os.name == "nt" + def is_linux(): """ x86 Linux diff --git a/src/mldebug/work_dir.py b/src/mldebug/work_dir.py index f77ed1f..4c33121 100644 --- a/src/mldebug/work_dir.py +++ b/src/mldebug/work_dir.py @@ -15,6 +15,7 @@ from mldebug.extra.calltree import AIECallTree from mldebug.utils import LOGGER, is_aarch64, is_windows + @dataclass class AIEFunction: """ @@ -94,7 +95,7 @@ def _parse_flexml_layer_id(objstr): if m: return int(m[0]) return -1 - #raise RuntimeError(f"Unable to parse flexml layer id from {objstr}") + # raise RuntimeError(f"Unable to parse flexml layer id from {objstr}") class WorkDir: @@ -102,7 +103,7 @@ class WorkDir: Abstraction for AIE Work Directory """ - def __init__(self, aie_dir, peano, overlay, arch_name, dump_lst=False): + def __init__(self, aie_dir, peano, overlay, arch_name, dump_lst=False): """ Initialize the AIE Work Directory abstraction. Sets up internal state and parses functions. Args: @@ -169,7 +170,9 @@ def _parse_aie_runtime_control(self, work_dir, col, row, stampid): """ elf_layer_map = {} # Elfs for different columns can be reloaded in same line so we have to create multiple groups - pattern = re.compile("reloadable elf for .*{?\\[col:" + f"{col}" + " row:" + f"{row}" + "\\]([0-9]+)(.+)") + pattern = re.compile( + "reloadable elf for .*{?\\[col:" + f"{col}" + " row:" + f"{row}" + "\\]([0-9]+)(.+)" + ) with open(work_dir + "/ps/c_rts/aie_runtime_control.cpp", encoding="utf-8") as fd: for line in fd: match = pattern.search(line) @@ -212,7 +215,15 @@ def _get_lst(self, elf_path, elf_name, arch_name, dump_lst): exe = "llvm-objdump.aarch64" with resources.as_file(resources.files("mldebug") / "bin" / exe) as objdump_path: lst = subprocess.check_output( - [str(objdump_path), "-d", "-z", "--no-show-raw-insn", f"--arch-name={arch_name}", "-C", elf_path] + [ + str(objdump_path), + "-d", + "-z", + "--no-show-raw-insn", + f"--arch-name={arch_name}", + "-C", + elf_path, + ] ) lst_data = lst.decode("utf-8") @@ -487,8 +498,12 @@ def _extract_var(lines, var_name): tokens = line.split() if len(tokens) >= 3: try: - self.stamps[sid].globals.append(GlobalVar(var_name, int(tokens[0], base=16), int(tokens[2], base=16))) - LOGGER.verbose_print(f"[INFO] Found global variable: {var_name} at {tokens[0]} size {tokens[2]}") + self.stamps[sid].globals.append( + GlobalVar(var_name, int(tokens[0], base=16), int(tokens[2], base=16)) + ) + LOGGER.verbose_print( + f"[INFO] Found global variable: {var_name} at {tokens[0]} size {tokens[2]}" + ) except ValueError: pass # Ignore lines that cannot be parsed break @@ -534,7 +549,9 @@ def _extract_var(lines, var_name): end_addr = int(tokens[1], base=16) size = end_addr - start_addr + 1 self.stamps[sid].globals.append(GlobalVar(var_name, start_addr, size)) - LOGGER.verbose_print(f"[INFO] Found global variable: {var_name} at {start_addr} size {size}") + LOGGER.verbose_print( + f"[INFO] Found global variable: {var_name} at {start_addr} size {size}" + ) except ValueError: pass # Ignore lines that cannot be parsed break