From 7ad7a85f9513d7f8dd5b4e6f077fa331fe1febf6 Mon Sep 17 00:00:00 2001 From: Stephen Shao Date: Thu, 5 Mar 2026 16:01:33 -0600 Subject: [PATCH 1/3] Implemented Priums launcher support --- docs/launchers.md | 67 ++++++++++++++++--- src/madengine/deployment/kubernetes.py | 47 ++++++++++++- src/madengine/deployment/slurm.py | 25 +++++++ .../deployment/templates/slurm/job.sh.j2 | 2 + src/madengine/execution/container_runner.py | 34 +++++++++- 5 files changed, 162 insertions(+), 13 deletions(-) diff --git a/docs/launchers.md b/docs/launchers.md index 836fc2d0..99dc1ed9 100644 --- a/docs/launchers.md +++ b/docs/launchers.md @@ -16,6 +16,7 @@ madengine provides unified support for multiple distributed frameworks, enabling | **DeepSpeed** | Training | ZeRO optimization training | ✅ | ✅ | ✅ | | **Megatron-LM** | Training | Large-scale transformer training | ✅ | ✅ | ✅ | | **TorchTitan** | Training | LLM pre-training (FSDP2+TP+PP) | ✅ | ✅ | ✅ | +| **Primus** | Training | Megatron/TorchTitan/Jax via Primus config | ✅ | ✅ | ✅ | | **vLLM** | Inference | High-throughput LLM serving | ✅ | ✅ | ✅ | | **SGLang** | Inference | Fast LLM inference | ✅ | ✅ | ✅ | | **SGLang Disaggregated** | Inference | Large-scale disaggregated inference | ✅ | ✅ | ✅ (min 3) | @@ -224,6 +225,43 @@ TORCHTITAN_CONTEXT_PARALLEL_SIZE=1 - K8s: `examples/k8s-configs/minimal/torchtitan-single-node-minimal.json` - SLURM: `examples/slurm-configs/minimal/torchtitan-single-node-minimal.json` +--- + +### 5b. Primus + +**Purpose**: Unified pretrain entry for Megatron-LM, TorchTitan, and Jax/MaxText via Primus experiment YAML. + +**When to Use**: +- Run Primus example configs (e.g. `examples/megatron/configs/MI300X/*.yaml`) via madengine +- Single image plus config path; scheduling and tools/metrics from madengine + +**Configuration**: +```json +{ + "distributed": { + "launcher": "primus", + "nnodes": 2, + "nproc_per_node": 8, + "primus": { + "config_path": "examples/megatron/configs/MI300X/deepseek_v2_lite-BF16-pretrain.yaml", + "cli_extra": "" + } + } +} +``` + +**Features**: +- Launcher only sets `PRIMUS_CONFIG_PATH` and optional `PRIMUS_CLI_EXTRA`; no `MAD_MULTI_NODE_RUNNER` +- Model script (e.g. `run.sh`) sets `EXP` and calls Primus `run_pretrain.sh` +- NNODES, NODE_RANK, MASTER_ADDR, etc. set by madengine job template +- Use with MAD-Internal Primus submodule and `scripts/primus_pretrain/run.sh` + +**Examples**: +- SLURM: `examples/slurm-configs/minimal/primus-minimal.json` +- K8s: `examples/k8s-configs/minimal/primus-minimal.json` + +--- + **Model Configuration** (TOML): ```toml [model] @@ -519,16 +557,16 @@ madengine run --tags model --config custom-split-config.json ### Training Launchers -| Feature | torchrun | DeepSpeed | Megatron-LM | TorchTitan | -|---------|----------|-----------|-------------|------------| -| **Data Parallel** | ✅ DDP | ✅ ZeRO | ✅ | ✅ FSDP2 | -| **Tensor Parallel** | ❌ | ❌ | ✅ | ✅ | -| **Pipeline Parallel** | ❌ | ✅ | ✅ | ✅ | -| **Memory Efficiency** | Medium | High (ZeRO) | High | Very High | -| **Ease of Use** | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐ | -| **Model Size** | Small-Medium | Medium-Large | Very Large | Very Large | -| **K8s Support** | ✅ | ✅ | ✅ | ✅ | -| **SLURM Support** | ✅ | ✅ | ✅ | ✅ | +| Feature | torchrun | DeepSpeed | Megatron-LM | TorchTitan | Primus | +|---------|----------|-----------|-------------|------------|--------| +| **Data Parallel** | ✅ DDP | ✅ ZeRO | ✅ | ✅ FSDP2 | via config | +| **Tensor Parallel** | ❌ | ❌ | ✅ | ✅ | via config | +| **Pipeline Parallel** | ❌ | ✅ | ✅ | ✅ | via config | +| **Memory Efficiency** | Medium | High (ZeRO) | High | Very High | config-driven | +| **Ease of Use** | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ | +| **Model Size** | Small-Medium | Medium-Large | Very Large | Very Large | config-driven | +| **K8s Support** | ✅ | ✅ | ✅ | ✅ | ✅ | +| **SLURM Support** | ✅ | ✅ | ✅ | ✅ | ✅ | ### Inference Launchers @@ -646,6 +684,13 @@ TORCHTITAN_FSDP_ENABLED=1 MAD_MULTI_NODE_RUNNER="torchrun ..." ``` +**Primus**: +```bash +PRIMUS_CONFIG_PATH="examples/megatron/configs/MI300X/..." +PRIMUS_CLI_EXTRA="" # optional +# No MAD_MULTI_NODE_RUNNER (model script calls Primus run_pretrain.sh) +``` + **vLLM**: ```bash VLLM_TENSOR_PARALLEL_SIZE=4 @@ -681,7 +726,7 @@ SGLANG_NODE_RANK=${SLURM_PROCID} ```bash Error: Unknown launcher type 'xyz' ``` -Solution: Use one of: `torchrun`, `deepspeed`, `megatron`, `torchtitan`, `vllm`, `sglang`, `sglang-disagg` +Solution: Use one of: `torchrun`, `deepspeed`, `megatron`, `torchtitan`, `primus`, `vllm`, `sglang`, `sglang-disagg` **2. Multi-Node Communication Fails** ```bash diff --git a/src/madengine/deployment/kubernetes.py b/src/madengine/deployment/kubernetes.py index 29c9874e..1bd94698 100644 --- a/src/madengine/deployment/kubernetes.py +++ b/src/madengine/deployment/kubernetes.py @@ -56,6 +56,7 @@ "torchtitan", "deepspeed", "megatron-lm", + "primus", "vllm", "sglang", "sglang-disagg" @@ -873,6 +874,14 @@ def _prepare_template_context( self.console.print(f"[cyan]Configuring Megatron-LM: {nnodes} nodes × {nproc_per_node} GPUs/node[/cyan]") + elif launcher_type == "primus": + if not isinstance(nnodes, int) or nnodes < 1: + raise ValueError(f"Invalid nnodes: {nnodes}. Must be positive integer >= 1") + if not isinstance(nproc_per_node, int) or nproc_per_node < 1: + raise ValueError(f"Invalid nproc_per_node: {nproc_per_node}. Must be positive integer >= 1") + + self.console.print(f"[cyan]Configuring Primus: {nnodes} nodes × {nproc_per_node} GPUs/node[/cyan]") + # Determine if we need multi-node setup create_headless_service = False launcher_command = None @@ -990,6 +999,19 @@ def _prepare_template_context( model_script=model_info.get("scripts", "run.sh") ) + elif launcher_type == "primus": + if nnodes > 1: + create_headless_service = True + self.console.print(f"[dim]Multi-node Primus: Creating headless service for pod discovery[/dim]") + + # Generate Primus launcher command (env-only: PRIMUS_CONFIG_PATH, PRIMUS_CLI_EXTRA) + launcher_command = self._generate_primus_command( + nnodes=nnodes, + nproc_per_node=nproc_per_node, + master_port=master_port, + model_script=model_info.get("scripts", "run.sh") + ) + # Prepare pre/post scripts (similar to local execution) pre_scripts = [] post_scripts = [] @@ -2104,7 +2126,30 @@ def _generate_megatron_command( --master_addr=${{MASTER_ADDR}} \\ --master_port={master_port} \\ {model_script}""" - + + def _generate_primus_command( + self, nnodes: int, nproc_per_node: int, master_port: int, model_script: str + ) -> str: + """ + Generate Primus launcher command for K8s Indexed Jobs. + + Primus (Megatron-LM, TorchTitan, Jax/MaxText) runs via model script that calls + run_pretrain.sh. We only export PRIMUS_CONFIG_PATH and optional PRIMUS_CLI_EXTRA. + NNODES, NODE_RANK, MASTER_ADDR, etc. are set by the job template. + """ + primus_cfg = self.config.additional_context.get("distributed", {}).get("primus", {}) + config_path = primus_cfg.get("config_path", "exp_pretrain.yaml") + cli_extra = primus_cfg.get("cli_extra", "") + config_path_quoted = config_path.replace('"', '\\"') + lines = [ + "# Primus launcher (model script runs run_pretrain.sh)", + f'export PRIMUS_CONFIG_PATH="{config_path_quoted}"', + ] + if (cli_extra or "").strip(): + cli_extra_quoted = cli_extra.replace('"', '\\"') + lines.append(f'export PRIMUS_CLI_EXTRA="{cli_extra_quoted}"') + return "\n".join(lines) + def _load_k8s_tools(self) -> Dict: """ Load K8s-specific tools configuration. diff --git a/src/madengine/deployment/slurm.py b/src/madengine/deployment/slurm.py index 8a4a1f7b..1766e4b0 100644 --- a/src/madengine/deployment/slurm.py +++ b/src/madengine/deployment/slurm.py @@ -31,6 +31,7 @@ "torchtitan", "deepspeed", "megatron-lm", + "primus", "vllm", "sglang", "sglang-disagg" @@ -531,6 +532,8 @@ def _generate_launcher_command( return self._generate_megatron_command(nnodes, nproc_per_node, master_port) elif launcher_type == "torchtitan": return self._generate_torchtitan_command(nnodes, nproc_per_node, master_port) + elif launcher_type == "primus": + return self._generate_primus_command(nnodes, nproc_per_node, master_port) else: # For unknown launchers, provide basic environment variables # and let the model script handle launcher invocation @@ -819,6 +822,28 @@ def _generate_torchtitan_command( # Use torchrun as launcher (TorchTitan built on top of it) export MAD_MULTI_NODE_RUNNER="torchrun --nnodes={nnodes} --nproc_per_node={nproc_per_node} --node_rank=${{NODE_RANK}} --master_addr=${{MASTER_ADDR}} --master_port={master_port}"''' + def _generate_primus_command( + self, nnodes: int, nproc_per_node: int, master_port: int + ) -> str: + """ + Generate Primus launcher environment for SLURM. + + Primus (Megatron-LM, TorchTitan, Jax/MaxText) runs via model script that calls + run_pretrain.sh; NNODES, NODE_RANK, MASTER_ADDR, etc. are set by the job script. + We only export PRIMUS_CONFIG_PATH and optional PRIMUS_CLI_EXTRA. No MAD_MULTI_NODE_RUNNER. + """ + primus_cfg = self.distributed_config.get("primus", {}) + config_path = primus_cfg.get("config_path", "exp_pretrain.yaml") + cli_extra = primus_cfg.get("cli_extra", "") + # Safe shell quoting for config_path and cli_extra + config_path_quoted = config_path.replace('"', '\\"') + lines = [f'# Primus launcher (model script runs run_pretrain.sh)', + f'export PRIMUS_CONFIG_PATH="{config_path_quoted}"'] + if (cli_extra or "").strip(): + cli_extra_quoted = cli_extra.replace('"', '\\"') + lines.append(f'export PRIMUS_CLI_EXTRA="{cli_extra_quoted}"') + return "\n".join(lines) + def _generate_basic_env_command( self, nnodes: int, nproc_per_node: int, master_port: int ) -> str: diff --git a/src/madengine/deployment/templates/slurm/job.sh.j2 b/src/madengine/deployment/templates/slurm/job.sh.j2 index 06ee4c64..07c92869 100644 --- a/src/madengine/deployment/templates/slurm/job.sh.j2 +++ b/src/madengine/deployment/templates/slurm/job.sh.j2 @@ -594,7 +594,9 @@ echo " RANK (node rank): ${RANK}" echo " NODE_RANK: ${NODE_RANK}" echo " NNODES: ${NNODES}" echo " NPROC_PER_NODE: ${GPUS_PER_NODE}" +{% if launcher_type in ['torchrun', 'deepspeed', 'megatron'] %} echo " MAD_MULTI_NODE_RUNNER: ${MAD_MULTI_NODE_RUNNER}" +{% endif %} if [ "${SLURM_PROCID}" = "0" ]; then echo " MAD_IS_MASTER_NODE: true (will collect performance metrics)" else diff --git a/src/madengine/execution/container_runner.py b/src/madengine/execution/container_runner.py index c3299049..cfd4500b 100644 --- a/src/madengine/execution/container_runner.py +++ b/src/madengine/execution/container_runner.py @@ -208,7 +208,7 @@ def create_run_details_dict( "docker_file": build_info.get("dockerfile", ""), "base_docker": build_info.get("base_docker", ""), "docker_sha": build_info.get("docker_sha", ""), - "docker_image": build_info.get("docker_image", ""), + "docker_image": run_results.get("docker_image", build_info.get("docker_image", "")), "git_commit": run_results.get("git_commit", ""), "machine_name": run_results.get("machine_name", ""), "deployment_type": os.environ.get("MAD_DEPLOYMENT_TYPE", "local"), # local, slurm, etc. @@ -624,6 +624,31 @@ def gather_system_env_details( pre_encapsulate_post_scripts["pre_scripts"].append(pre_env_details) print(f"pre encap post scripts: {pre_encapsulate_post_scripts}") + def _resolve_docker_image(self, docker_image: str, model_name: str) -> str: + """Resolve Docker image: use requested image if present, else primus_pretrain fallback with clear error.""" + try: + self.console.sh(f"docker image inspect {docker_image} >/dev/null 2>&1") + return docker_image + except (subprocess.CalledProcessError, RuntimeError, Exception): + pass + if model_name.startswith("primus_pretrain/"): + fallback = "ci-primus_pretrain_primus.ubuntu.amd" + try: + self.console.sh(f"docker image inspect {fallback} >/dev/null 2>&1") + print( + f"ℹ️ Using shared Primus image (one build for all primus_pretrain configs): {fallback}" + ) + return fallback + except (subprocess.CalledProcessError, RuntimeError, Exception): + raise RuntimeError( + f"Docker image '{docker_image}' not found and fallback '{fallback}' not found. " + "Build the Primus image first: madengine build --tags primus_pretrain --additional-context-file .json" + ) from None + raise RuntimeError( + f"Docker image '{docker_image}' not found. " + "Build it first: madengine build --tags --additional-context-file .json" + ) from None + def run_container( self, model_info: typing.Dict, @@ -654,6 +679,9 @@ def run_container( """ self.rich_console.print(f"[bold green]🏃 Running model:[/bold green] [bold cyan]{model_info['name']}[/bold cyan] [dim]in container[/dim] [yellow]{docker_image}[/yellow]") + # Resolve image: if model-specific image is missing, try shared primus_pretrain image (one build for all configs) + docker_image = self._resolve_docker_image(docker_image, model_info["name"]) + # Apply timeout logic: model timeout can override default timeout # If model has a timeout in models.json and CLI timeout is default (7200), use model's timeout # If CLI timeout is explicitly set (not default), it overrides model timeout @@ -703,6 +731,8 @@ def run_container( # If build info provided, merge it if build_info: run_results.update(build_info) + # Preserve actual image used (resolved, possibly fallback) for perf reporting + run_results["docker_image"] = docker_image # Prepare docker run options gpu_vendor = self.context.ctx["gpu_vendor"] @@ -782,6 +812,8 @@ def run_container( 'NNODES', 'NPROC_PER_NODE', 'MAD_MULTI_NODE_RUNNER', 'MAD_COLLECT_METRICS', 'NCCL_SOCKET_IFNAME', 'GLOO_SOCKET_IFNAME', 'NCCL_DEBUG', 'NCCL_IB_DISABLE', 'NCCL_NET_GDR_LEVEL', + # Primus launcher (config path and optional CLI extra args) + 'PRIMUS_CONFIG_PATH', 'PRIMUS_CLI_EXTRA', # GPU visibility variables for Ray-based launchers (vLLM, SGLang) # CRITICAL: These must be passed to Docker for proper GPU device mapping 'HIP_VISIBLE_DEVICES', 'ROCR_VISIBLE_DEVICES', 'CUDA_VISIBLE_DEVICES' From e0f63af7c891fc629af4c264676f974981e0f97c Mon Sep 17 00:00:00 2001 From: Stephen Shao Date: Thu, 5 Mar 2026 20:52:57 -0600 Subject: [PATCH 2/3] Fixed the reporting module for primus --- src/madengine/execution/container_runner.py | 40 ++++++++++++++----- .../orchestration/run_orchestrator.py | 8 +++- 2 files changed, 37 insertions(+), 11 deletions(-) diff --git a/src/madengine/execution/container_runner.py b/src/madengine/execution/container_runner.py index cfd4500b..624bc41c 100644 --- a/src/madengine/execution/container_runner.py +++ b/src/madengine/execution/container_runner.py @@ -1117,40 +1117,60 @@ def run_container( pre_encapsulate_post_scripts["post_scripts"], ) + # When model writes performance to a file in run_directory, copy to cwd + # so the host can read it (e.g. bind-mounted workspace) before extraction. + multiple_results_file = (model_info.get("multiple_results") or "").strip() + if multiple_results_file: + try: + model_docker.sh( + f"cp {model_dir}/{multiple_results_file} . 2>/dev/null || true" + ) + except Exception: + pass + # Extract performance metrics from logs # Look for performance data in the log output similar to original run_models.py try: # Check if multiple results file is specified in model_info multiple_results = model_info.get("multiple_results", None) + if multiple_results: + multiple_results = multiple_results.strip() if multiple_results: - run_results["performance"] = multiple_results - # Validate multiple results file format using proper CSV parsing + # Validate multiple results file and read performance/metric from CSV try: import csv with open(multiple_results, "r") as f: csv_reader = csv.DictReader(f) # Check if 'performance' column exists - if 'performance' not in csv_reader.fieldnames: + if csv_reader.fieldnames and 'performance' not in csv_reader.fieldnames: print("Error: 'performance' column not found in multiple results file.") run_results["performance"] = None + run_results["metric"] = None else: - # Check if at least one row has a non-empty performance value - has_valid_perf = False + # Use first row with non-empty performance value + run_results["performance"] = None + run_results["metric"] = None for row in csv_reader: - if row.get('performance', '').strip(): - has_valid_perf = True + perf_val = (row.get('performance') or '').strip() + if perf_val: + run_results["performance"] = perf_val + run_results["metric"] = ( + row.get('metric') or '' + ).strip() or "tokens_per_second" + print( + f"✓ Extracted performance (CSV): {run_results['performance']} {run_results['metric']}" + ) break - - if not has_valid_perf: - run_results["performance"] = None + if not run_results.get("performance"): print("Error: Performance metric is empty in all rows of multiple results file.") except Exception as e: self.rich_console.print( f"[yellow]Warning: Could not validate multiple results file: {e}[/yellow]" ) run_results["performance"] = None + run_results["metric"] = None else: # Match the actual output format: "performance: 14164 samples_per_second" # Simple pattern to capture number and metric unit diff --git a/src/madengine/orchestration/run_orchestrator.py b/src/madengine/orchestration/run_orchestrator.py index e1513bb3..243a8dab 100644 --- a/src/madengine/orchestration/run_orchestrator.py +++ b/src/madengine/orchestration/run_orchestrator.py @@ -530,7 +530,13 @@ def _execute_local(self, manifest_file: str, timeout: int) -> Dict: self.context.ctx["post_scripts"] = manifest_context["post_scripts"] if "encapsulate_script" in manifest_context: self.context.ctx["encapsulate_script"] = manifest_context["encapsulate_script"] - + # Restore docker_env_vars from build context (e.g. MAD_SECRET_HFTOKEN for Primus HF-backed configs) + if "docker_env_vars" in manifest_context and manifest_context["docker_env_vars"]: + if "docker_env_vars" not in self.context.ctx: + self.context.ctx["docker_env_vars"] = {} + for k, v in manifest_context["docker_env_vars"].items(): + self.context.ctx["docker_env_vars"][k] = v + # Merge runtime additional_context (takes precedence over manifest) # This allows users to override tools/scripts at runtime if self.additional_context: From adbc493b426d8e34f2cd2e8e80a2f0a3b896fb19 Mon Sep 17 00:00:00 2001 From: Stephen Shao Date: Thu, 5 Mar 2026 22:23:29 -0600 Subject: [PATCH 3/3] Updated parser of multi results --- src/madengine/execution/container_runner.py | 30 ++++++++++++--------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/src/madengine/execution/container_runner.py b/src/madengine/execution/container_runner.py index 624bc41c..f1912dc8 100644 --- a/src/madengine/execution/container_runner.py +++ b/src/madengine/execution/container_runner.py @@ -1137,33 +1137,37 @@ def run_container( multiple_results = multiple_results.strip() if multiple_results: - # Validate multiple results file and read performance/metric from CSV + # Validate multiple results file (format: model, performance, metric) + # and set primary performance from tokens_per_second row, else first valid row try: import csv with open(multiple_results, "r") as f: csv_reader = csv.DictReader(f) - - # Check if 'performance' column exists if csv_reader.fieldnames and 'performance' not in csv_reader.fieldnames: print("Error: 'performance' column not found in multiple results file.") run_results["performance"] = None run_results["metric"] = None else: - # Use first row with non-empty performance value run_results["performance"] = None run_results["metric"] = None + first_valid = None for row in csv_reader: perf_val = (row.get('performance') or '').strip() + metric_val = (row.get('metric') or '').strip() if perf_val: - run_results["performance"] = perf_val - run_results["metric"] = ( - row.get('metric') or '' - ).strip() or "tokens_per_second" - print( - f"✓ Extracted performance (CSV): {run_results['performance']} {run_results['metric']}" - ) - break - if not run_results.get("performance"): + if first_valid is None: + first_valid = (perf_val, metric_val or "tokens_per_second") + if metric_val == "tokens_per_second": + run_results["performance"] = perf_val + run_results["metric"] = "tokens_per_second" + break + if run_results.get("performance") is None and first_valid: + run_results["performance"], run_results["metric"] = first_valid + if run_results.get("performance"): + print( + f"✓ Extracted performance (CSV): {run_results['performance']} {run_results['metric']}" + ) + elif not first_valid: print("Error: Performance metric is empty in all rows of multiple results file.") except Exception as e: self.rich_console.print(