67 changes: 56 additions & 11 deletions docs/launchers.md
@@ -16,6 +16,7 @@ madengine provides unified support for multiple distributed frameworks, enabling
| **DeepSpeed** | Training | ZeRO optimization training | ✅ | ✅ | ✅ |
| **Megatron-LM** | Training | Large-scale transformer training | ✅ | ✅ | ✅ |
| **TorchTitan** | Training | LLM pre-training (FSDP2+TP+PP) | ✅ | ✅ | ✅ |
| **Primus** | Training | Megatron/TorchTitan/Jax via Primus config | ✅ | ✅ | ✅ |
| **vLLM** | Inference | High-throughput LLM serving | ✅ | ✅ | ✅ |
| **SGLang** | Inference | Fast LLM inference | ✅ | ✅ | ✅ |
| **SGLang Disaggregated** | Inference | Large-scale disaggregated inference | ✅ | ✅ | ✅ (min 3) |
@@ -224,6 +225,43 @@ TORCHTITAN_CONTEXT_PARALLEL_SIZE=1
- K8s: `examples/k8s-configs/minimal/torchtitan-single-node-minimal.json`
- SLURM: `examples/slurm-configs/minimal/torchtitan-single-node-minimal.json`

---

### 5b. Primus

**Purpose**: Unified pre-training entry point for Megatron-LM, TorchTitan, and Jax/MaxText, driven by a Primus experiment YAML.

**When to Use**:
- You want to run Primus example configs (e.g. `examples/megatron/configs/MI300X/*.yaml`) through madengine
- You want a single image plus a config path, with scheduling, tools, and metrics handled by madengine

**Configuration**:
```json
{
"distributed": {
"launcher": "primus",
"nnodes": 2,
"nproc_per_node": 8,
"primus": {
"config_path": "examples/megatron/configs/MI300X/deepseek_v2_lite-BF16-pretrain.yaml",
"cli_extra": ""
}
}
}
```

**Features**:
- The launcher only sets `PRIMUS_CONFIG_PATH` and, optionally, `PRIMUS_CLI_EXTRA`; it does not set `MAD_MULTI_NODE_RUNNER`
- The model script (e.g. `run.sh`) sets `EXP` and calls Primus's `run_pretrain.sh`
- `NNODES`, `NODE_RANK`, `MASTER_ADDR`, etc. are set by the madengine job template
- Use with the MAD-Internal Primus submodule and `scripts/primus_pretrain/run.sh`

**Examples**:
- SLURM: `examples/slurm-configs/minimal/primus-minimal.json`
- K8s: `examples/k8s-configs/minimal/primus-minimal.json`
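For orientation, a model script of the kind described above might look like the following sketch. This is hypothetical — the actual MAD-Internal `scripts/primus_pretrain/run.sh` may differ — and a real script would `exec` the launch command instead of echoing it:

```shell
#!/usr/bin/env bash
# Hypothetical primus_pretrain model script sketch; the real
# scripts/primus_pretrain/run.sh in MAD-Internal may differ.
set -euo pipefail

# The launcher exports PRIMUS_CONFIG_PATH; the default here mirrors
# the one used by madengine's Primus helpers.
export EXP="${PRIMUS_CONFIG_PATH:-exp_pretrain.yaml}"

# NNODES, NODE_RANK, MASTER_ADDR, MASTER_PORT are already set by the
# madengine job template, so no MAD_MULTI_NODE_RUNNER is needed here.
launch_cmd="bash run_pretrain.sh ${PRIMUS_CLI_EXTRA:-}"
echo "EXP=${EXP}"
echo "launch: ${launch_cmd}"
# A real script would hand off to Primus at this point:
# exec ${launch_cmd}
```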

---

**Model Configuration** (TOML):
```toml
[model]
@@ -519,16 +557,16 @@ madengine run --tags model --config custom-split-config.json

### Training Launchers

| Feature | torchrun | DeepSpeed | Megatron-LM | TorchTitan |
|---------|----------|-----------|-------------|------------|
| **Data Parallel** | ✅ DDP | ✅ ZeRO | ✅ | ✅ FSDP2 |
| **Tensor Parallel** | ❌ | ❌ | ✅ | ✅ |
| **Pipeline Parallel** | ❌ | ✅ | ✅ | ✅ |
| **Memory Efficiency** | Medium | High (ZeRO) | High | Very High |
| **Ease of Use** | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐ |
| **Model Size** | Small-Medium | Medium-Large | Very Large | Very Large |
| **K8s Support** | ✅ | ✅ | ✅ | ✅ |
| **SLURM Support** | ✅ | ✅ | ✅ | ✅ |
| Feature | torchrun | DeepSpeed | Megatron-LM | TorchTitan | Primus |
|---------|----------|-----------|-------------|------------|--------|
| **Data Parallel** | ✅ DDP | ✅ ZeRO | ✅ | ✅ FSDP2 | via config |
| **Tensor Parallel** | ❌ | ❌ | ✅ | ✅ | via config |
| **Pipeline Parallel** | ❌ | ✅ | ✅ | ✅ | via config |
| **Memory Efficiency** | Medium | High (ZeRO) | High | Very High | config-driven |
| **Ease of Use** | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ |
| **Model Size** | Small-Medium | Medium-Large | Very Large | Very Large | config-driven |
| **K8s Support** | ✅ | ✅ | ✅ | ✅ | ✅ |
| **SLURM Support** | ✅ | ✅ | ✅ | ✅ | ✅ |

### Inference Launchers

@@ -646,6 +684,13 @@ TORCHTITAN_FSDP_ENABLED=1
MAD_MULTI_NODE_RUNNER="torchrun ..."
```

**Primus**:
```bash
PRIMUS_CONFIG_PATH="examples/megatron/configs/MI300X/..."
PRIMUS_CLI_EXTRA="" # optional
# No MAD_MULTI_NODE_RUNNER (model script calls Primus run_pretrain.sh)
```
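For reference, the Primus env block above can be reproduced with a short sketch (values here are examples; the double-quote escaping mirrors what `_generate_primus_command` does, and `PRIMUS_CLI_EXTRA` is emitted only when non-empty after trimming):

```shell
#!/usr/bin/env bash
# Sketch: reproduce the env-only launcher block emitted for Primus.
# The real helpers read these values from the "distributed.primus" config.
set -euo pipefail

config_path='examples/megatron/configs/MI300X/deepseek_v2_lite-BF16-pretrain.yaml'
cli_extra=''   # exported only when non-empty after trimming

# Mirror the helper's escaping: backslash-escape embedded double quotes.
esc() { printf '%s' "$1" | sed 's/"/\\"/g'; }

env_block='# Primus launcher (model script runs run_pretrain.sh)'
env_block="${env_block}
export PRIMUS_CONFIG_PATH=\"$(esc "${config_path}")\""
if [ -n "$(printf '%s' "${cli_extra}" | tr -d '[:space:]')" ]; then
  env_block="${env_block}
export PRIMUS_CLI_EXTRA=\"$(esc "${cli_extra}")\""
fi
printf '%s\n' "${env_block}"
```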

**vLLM**:
```bash
VLLM_TENSOR_PARALLEL_SIZE=4
@@ -681,7 +726,7 @@ SGLANG_NODE_RANK=${SLURM_PROCID}
```bash
Error: Unknown launcher type 'xyz'
```
Solution: Use one of: `torchrun`, `deepspeed`, `megatron`, `torchtitan`, `vllm`, `sglang`, `sglang-disagg`
Solution: Use one of: `torchrun`, `deepspeed`, `megatron`, `torchtitan`, `primus`, `vllm`, `sglang`, `sglang-disagg`

**2. Multi-Node Communication Fails**
```bash
47 changes: 46 additions & 1 deletion src/madengine/deployment/kubernetes.py
@@ -56,6 +56,7 @@
"torchtitan",
"deepspeed",
"megatron-lm",
"primus",
"vllm",
"sglang",
"sglang-disagg"
@@ -873,6 +874,14 @@ def _prepare_template_context(

self.console.print(f"[cyan]Configuring Megatron-LM: {nnodes} nodes × {nproc_per_node} GPUs/node[/cyan]")

elif launcher_type == "primus":
if not isinstance(nnodes, int) or nnodes < 1:
raise ValueError(f"Invalid nnodes: {nnodes}. Must be positive integer >= 1")
if not isinstance(nproc_per_node, int) or nproc_per_node < 1:
raise ValueError(f"Invalid nproc_per_node: {nproc_per_node}. Must be positive integer >= 1")

self.console.print(f"[cyan]Configuring Primus: {nnodes} nodes × {nproc_per_node} GPUs/node[/cyan]")

# Determine if we need multi-node setup
create_headless_service = False
launcher_command = None
@@ -990,6 +999,19 @@ def _prepare_template_context(
model_script=model_info.get("scripts", "run.sh")
)

elif launcher_type == "primus":
if nnodes > 1:
create_headless_service = True
self.console.print("[dim]Multi-node Primus: Creating headless service for pod discovery[/dim]")

# Generate Primus launcher command (env-only: PRIMUS_CONFIG_PATH, PRIMUS_CLI_EXTRA)
launcher_command = self._generate_primus_command(
nnodes=nnodes,
nproc_per_node=nproc_per_node,
master_port=master_port,
model_script=model_info.get("scripts", "run.sh")
)

# Prepare pre/post scripts (similar to local execution)
pre_scripts = []
post_scripts = []
@@ -2104,7 +2126,30 @@ def _generate_megatron_command(
--master_addr=${{MASTER_ADDR}} \\
--master_port={master_port} \\
{model_script}"""


def _generate_primus_command(
self, nnodes: int, nproc_per_node: int, master_port: int, model_script: str
) -> str:
"""
Generate Primus launcher command for K8s Indexed Jobs.

Primus (Megatron-LM, TorchTitan, Jax/MaxText) runs via model script that calls
run_pretrain.sh. We only export PRIMUS_CONFIG_PATH and optional PRIMUS_CLI_EXTRA.
NNODES, NODE_RANK, MASTER_ADDR, etc. are set by the job template.
"""
primus_cfg = self.config.additional_context.get("distributed", {}).get("primus", {})
config_path = primus_cfg.get("config_path", "exp_pretrain.yaml")
cli_extra = primus_cfg.get("cli_extra", "")
config_path_quoted = config_path.replace('"', '\\"')
lines = [
"# Primus launcher (model script runs run_pretrain.sh)",
f'export PRIMUS_CONFIG_PATH="{config_path_quoted}"',
]
if (cli_extra or "").strip():
cli_extra_quoted = cli_extra.replace('"', '\\"')
lines.append(f'export PRIMUS_CLI_EXTRA="{cli_extra_quoted}"')
return "\n".join(lines)

def _load_k8s_tools(self) -> Dict:
"""
Load K8s-specific tools configuration.
25 changes: 25 additions & 0 deletions src/madengine/deployment/slurm.py
@@ -31,6 +31,7 @@
"torchtitan",
"deepspeed",
"megatron-lm",
"primus",
"vllm",
"sglang",
"sglang-disagg"
@@ -531,6 +532,8 @@ def _generate_launcher_command(
return self._generate_megatron_command(nnodes, nproc_per_node, master_port)
elif launcher_type == "torchtitan":
return self._generate_torchtitan_command(nnodes, nproc_per_node, master_port)
elif launcher_type == "primus":
return self._generate_primus_command(nnodes, nproc_per_node, master_port)
else:
# For unknown launchers, provide basic environment variables
# and let the model script handle launcher invocation
@@ -819,6 +822,28 @@ def _generate_torchtitan_command(
# Use torchrun as launcher (TorchTitan built on top of it)
export MAD_MULTI_NODE_RUNNER="torchrun --nnodes={nnodes} --nproc_per_node={nproc_per_node} --node_rank=${{NODE_RANK}} --master_addr=${{MASTER_ADDR}} --master_port={master_port}"'''

def _generate_primus_command(
self, nnodes: int, nproc_per_node: int, master_port: int
) -> str:
"""
Generate Primus launcher environment for SLURM.

Primus (Megatron-LM, TorchTitan, Jax/MaxText) runs via model script that calls
run_pretrain.sh; NNODES, NODE_RANK, MASTER_ADDR, etc. are set by the job script.
We only export PRIMUS_CONFIG_PATH and optional PRIMUS_CLI_EXTRA. No MAD_MULTI_NODE_RUNNER.
"""
primus_cfg = self.distributed_config.get("primus", {})
config_path = primus_cfg.get("config_path", "exp_pretrain.yaml")
cli_extra = primus_cfg.get("cli_extra", "")
# Escape embedded double quotes; values are otherwise assumed shell-safe
config_path_quoted = config_path.replace('"', '\\"')
lines = ['# Primus launcher (model script runs run_pretrain.sh)',
f'export PRIMUS_CONFIG_PATH="{config_path_quoted}"']
if (cli_extra or "").strip():
cli_extra_quoted = cli_extra.replace('"', '\\"')
lines.append(f'export PRIMUS_CLI_EXTRA="{cli_extra_quoted}"')
return "\n".join(lines)

def _generate_basic_env_command(
self, nnodes: int, nproc_per_node: int, master_port: int
) -> str:
2 changes: 2 additions & 0 deletions src/madengine/deployment/templates/slurm/job.sh.j2
@@ -594,7 +594,9 @@ echo " RANK (node rank): ${RANK}"
echo " NODE_RANK: ${NODE_RANK}"
echo " NNODES: ${NNODES}"
echo " NPROC_PER_NODE: ${GPUS_PER_NODE}"
{% if launcher_type in ['torchrun', 'deepspeed', 'megatron', 'torchtitan'] %}
echo " MAD_MULTI_NODE_RUNNER: ${MAD_MULTI_NODE_RUNNER}"
{% endif %}
if [ "${SLURM_PROCID}" = "0" ]; then
echo " MAD_IS_MASTER_NODE: true (will collect performance metrics)"
else
84 changes: 70 additions & 14 deletions src/madengine/execution/container_runner.py
@@ -208,7 +208,7 @@ def create_run_details_dict(
"docker_file": build_info.get("dockerfile", ""),
"base_docker": build_info.get("base_docker", ""),
"docker_sha": build_info.get("docker_sha", ""),
"docker_image": build_info.get("docker_image", ""),
"docker_image": run_results.get("docker_image", build_info.get("docker_image", "")),
"git_commit": run_results.get("git_commit", ""),
"machine_name": run_results.get("machine_name", ""),
"deployment_type": os.environ.get("MAD_DEPLOYMENT_TYPE", "local"), # local, slurm, etc.
@@ -624,6 +624,31 @@ def gather_system_env_details(
pre_encapsulate_post_scripts["pre_scripts"].append(pre_env_details)
print(f"pre encap post scripts: {pre_encapsulate_post_scripts}")

def _resolve_docker_image(self, docker_image: str, model_name: str) -> str:
"""Resolve the Docker image: use the requested image if present locally; otherwise fall back to the shared primus_pretrain image or raise a clear error."""
try:
self.console.sh(f"docker image inspect {docker_image} >/dev/null 2>&1")
return docker_image
except Exception:
# Image not present locally; try the fallback below.
pass
if model_name.startswith("primus_pretrain/"):
fallback = "ci-primus_pretrain_primus.ubuntu.amd"
try:
self.console.sh(f"docker image inspect {fallback} >/dev/null 2>&1")
print(
f"ℹ️ Using shared Primus image (one build for all primus_pretrain configs): {fallback}"
)
return fallback
except Exception:
raise RuntimeError(
f"Docker image '{docker_image}' not found and fallback '{fallback}' not found. "
"Build the Primus image first: madengine build --tags primus_pretrain --additional-context-file <config>.json"
) from None
raise RuntimeError(
f"Docker image '{docker_image}' not found. "
"Build it first: madengine build --tags <model_tag> --additional-context-file <config>.json"
) from None

def run_container(
self,
model_info: typing.Dict,
@@ -654,6 +679,9 @@ def run_container(
"""
self.rich_console.print(f"[bold green]🏃 Running model:[/bold green] [bold cyan]{model_info['name']}[/bold cyan] [dim]in container[/dim] [yellow]{docker_image}[/yellow]")

# Resolve image: if model-specific image is missing, try shared primus_pretrain image (one build for all configs)
docker_image = self._resolve_docker_image(docker_image, model_info["name"])

# Apply timeout logic: model timeout can override default timeout
# If model has a timeout in models.json and CLI timeout is default (7200), use model's timeout
# If CLI timeout is explicitly set (not default), it overrides model timeout
@@ -703,6 +731,8 @@ def run_container(
# If build info provided, merge it
if build_info:
run_results.update(build_info)
# Preserve actual image used (resolved, possibly fallback) for perf reporting
run_results["docker_image"] = docker_image

# Prepare docker run options
gpu_vendor = self.context.ctx["gpu_vendor"]
@@ -782,6 +812,8 @@ def run_container(
'NNODES', 'NPROC_PER_NODE', 'MAD_MULTI_NODE_RUNNER',
'MAD_COLLECT_METRICS', 'NCCL_SOCKET_IFNAME', 'GLOO_SOCKET_IFNAME',
'NCCL_DEBUG', 'NCCL_IB_DISABLE', 'NCCL_NET_GDR_LEVEL',
# Primus launcher (config path and optional CLI extra args)
'PRIMUS_CONFIG_PATH', 'PRIMUS_CLI_EXTRA',
# GPU visibility variables for Ray-based launchers (vLLM, SGLang)
# CRITICAL: These must be passed to Docker for proper GPU device mapping
'HIP_VISIBLE_DEVICES', 'ROCR_VISIBLE_DEVICES', 'CUDA_VISIBLE_DEVICES'
@@ -1085,40 +1117,64 @@ def run_container(
pre_encapsulate_post_scripts["post_scripts"],
)

# When model writes performance to a file in run_directory, copy to cwd
# so the host can read it (e.g. bind-mounted workspace) before extraction.
multiple_results_file = (model_info.get("multiple_results") or "").strip()
if multiple_results_file:
try:
model_docker.sh(
f"cp {model_dir}/{multiple_results_file} . 2>/dev/null || true"
)
except Exception:
pass

# Extract performance metrics from logs
# Look for performance data in the log output similar to original run_models.py
try:
# Check if multiple results file is specified in model_info
multiple_results = model_info.get("multiple_results", None)
if multiple_results:
multiple_results = multiple_results.strip()

if multiple_results:
run_results["performance"] = multiple_results
# Validate multiple results file format using proper CSV parsing
# Validate multiple results file (format: model, performance, metric)
# and set primary performance from tokens_per_second row, else first valid row
try:
import csv
with open(multiple_results, "r") as f:
csv_reader = csv.DictReader(f)

# Check if 'performance' column exists
if 'performance' not in csv_reader.fieldnames:
if csv_reader.fieldnames and 'performance' not in csv_reader.fieldnames:
print("Error: 'performance' column not found in multiple results file.")
run_results["performance"] = None
run_results["metric"] = None
else:
# Check if at least one row has a non-empty performance value
has_valid_perf = False
run_results["performance"] = None
run_results["metric"] = None
first_valid = None
for row in csv_reader:
if row.get('performance', '').strip():
has_valid_perf = True
break

if not has_valid_perf:
run_results["performance"] = None
perf_val = (row.get('performance') or '').strip()
metric_val = (row.get('metric') or '').strip()
if perf_val:
if first_valid is None:
first_valid = (perf_val, metric_val or "tokens_per_second")
if metric_val == "tokens_per_second":
run_results["performance"] = perf_val
run_results["metric"] = "tokens_per_second"
break
if run_results.get("performance") is None and first_valid:
run_results["performance"], run_results["metric"] = first_valid
if run_results.get("performance"):
print(
f"✓ Extracted performance (CSV): {run_results['performance']} {run_results['metric']}"
)
elif not first_valid:
print("Error: Performance metric is empty in all rows of multiple results file.")
except Exception as e:
self.rich_console.print(
f"[yellow]Warning: Could not validate multiple results file: {e}[/yellow]"
)
run_results["performance"] = None
run_results["metric"] = None
else:
# Match the actual output format: "performance: 14164 samples_per_second"
# Simple pattern to capture number and metric unit