Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ The model is trained from a random initialization until convergence, which is de
1. Once fractal generation completes, run the benchmark:
`torchrun-hpc -N 1 -n 4 --gpus-per-proc 1 $(which scaffold) benchmark -c ScaFFold/configs/benchmark_default.yml`

ScaFFold benchmark training always uses PyTorch distributed execution with DistConv spatial parallelism. For a singleton run, launch one distributed rank rather than disabling distributed execution.

`benchmark` creates a folder for the benchmark run(s) at `base_run_dir` set in the config file. For reproducibility, we store a copy of the benchmark run config yml. Within each run subfolder, `benchmark` creates a yml config for that specific run.

After each run completes, statistics from the run are stored in `train_stats.csv`. Additionally, users can inspect plots of the training and validation losses over time in `<base_run_dir/figures`.
Expand All @@ -69,14 +71,19 @@ Parameters are set in a `.yml` config file and can be modified by the user:
```yml
# External/user-facing
base_run_dir: "benchmark_runs" # Subfolder of $(pwd) in which to run jobs.
dataset_dir: "datasets" # Directory in which to store and query generated datasets.
fract_base_dir: "fractals" # Base directory for fractal IFS and instances.
n_categories: 5 # Number of fractal categories present in the dataset.
n_categories: 5 # Positive number of fractal categories present in the dataset.
n_instances_used_per_fractal: 145 # Number of unique instances to pull from each fractal class. There are 145 unique; exceeding this number will reuse some instances.
problem_scale: 6 # Determines dataset resolution and number of unet layers. Default is 6.
problem_scale: 7 # Determines dataset resolution and number of unet layers.
unet_bottleneck_dim: 3 # Power of 2 of the unet bottleneck layer dimension. Default of 3 -> bottleneck layer of size 8.
seed: 42 # Random seed.
batch_size: 1 # Batch sizes for each vol size.
optimizer: "ADAM" # "ADAM" is preferred option, otherwise training defautls to RMSProp.
batch_size: 1 # Batch size per data-parallel rank.
dataloader_num_workers: 1 # Number of DataLoader worker processes per rank.
optimizer: "ADAM" # "ADAM" is preferred option, otherwise training defaults to RMSProp.
dc_num_shards: [1, 1, 1] # DistConv spatial shard counts.
dc_shard_dims: [2, 3, 4] # Tensor dimensions sharded by DistConv.
checkpoint_interval: -1 # Checkpoint every C epochs; set to -1 to disable checkpointing entirely.

# Internal/dev use only
variance_threshold: 0.15 # Variance threshold for valid fractals. Default is 0.15.
Expand All @@ -91,12 +98,12 @@ disable_scheduler: 1 # If 1, disable scheduler during training to
more_determinism: 0 # If 1, improve model training determinism.
datagen_from_scratch: 0 # If 1, delete existing fractals and instances, then regenerate from scratch.
train_from_scratch: 1 # If 1, delete existing train stats and checkpoint files. Keep 0 if want to restart runs where we left off.
dist: 1 # If 1, use torch DDP.
torch_amp: 1 # If 1, use mixed precision in training.
framework: "torch" # The DL framework to train with. Only valid option for now is "torch".
checkpoint_dir: "checkpoints" # Subfolder in which to save training checkpoints.
checkpoint_interval: 1 # Number of epochs between saving training checkpoints.
loss_freq: 1 # Number of epochs between logging the overall loss.
warmup_batches: 64 # How many warmup batches per rank to run before training.
target_dice: 0.95 # Validation Dice score threshold for convergence when epochs is -1.
```

## How the benchmark works
Expand Down
41 changes: 5 additions & 36 deletions ScaFFold/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,57 +12,26 @@
#
# SPDX-License-Identifier: (Apache-2.0)

import logging
import shutil
from argparse import Namespace
from pathlib import Path, PosixPath

import yaml
from mpi4py import MPI

from ScaFFold import worker
from ScaFFold.utils.distributed import get_world_rank
from ScaFFold.utils.perf_measure import adiak_init, adiak_value


def create_run_directory(base_dir, combination_index, num_runs):
"""
Create new directory for current run, named using unique combination_index
"""
run_dir = base_dir / f"param_set_{combination_index}"
for i in range(num_runs):
run_dir_with_iter = Path(f"{run_dir}/run{i}")
run_dir_with_iter.mkdir(parents=True, exist_ok=True)
return run_dir


def write_run_config(run_dir, iter, keys, combination):
"""
Write run config to a yaml file, and create optional override yaml
"""
run_config = {key: value for key, value in zip(keys, combination)}
run_config["run_dir"] = str(
run_dir.resolve()
) # Add abs path to run dir as entry in dict
run_config["run_iter"] = iter # Add run_iter identifier as entry in dict
run_config_path = run_dir / "run_config.yaml"
with open(run_config_path, "w") as file:
yaml.dump(run_config, file)
return run_config_path
from ScaFFold.utils.utils import setup_mpi_logger


def main(kwargs_dict: dict = {}):
args = Namespace(**kwargs_dict)

logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
log = setup_mpi_logger(__file__, args.verbose)

# Get MPI information
comm = MPI.COMM_WORLD
rank = get_world_rank(required=args.dist)
if rank == 0:
print(f"args found: {args}")
rank = get_world_rank(required=True)
log.debug("args found: %s", args)

kdict = None
# Now set up and start benchmark run(s)
Expand All @@ -88,7 +57,7 @@ def main(kwargs_dict: dict = {}):
adiak_init(comm)
for key, value in kdict.items():
if isinstance(value, dict):
print(f"Adiak: skipping key with dict value '{key}'")
log.debug("Adiak: skipping key with dict value '%s'", key)
continue
if isinstance(value, PosixPath):
value = str(value)
Expand Down
48 changes: 34 additions & 14 deletions ScaFFold/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@
import yaml
from mpi4py import MPI

from ScaFFold import benchmark, generate_fractals
from ScaFFold.utils import config_utils
from ScaFFold.utils.collect_scheduler_info import collect_scheduler_metadata
from ScaFFold.utils.create_restart_script import create_restart_script
from ScaFFold.utils.utils import customlog
from ScaFFold.utils.utils import setup_mpi_logger


def main():
Expand Down Expand Up @@ -55,7 +54,7 @@ def main():
generate_fractals_parser = subparsers.add_parser(
"generate_fractals",
help="Generate fractal classes and instances.",
description="Must be ran before 'benchmark'",
description="Must be run before 'benchmark'",
)
generate_fractals_parser.add_argument(
"-c",
Expand Down Expand Up @@ -96,9 +95,9 @@ def main():
"benchmark",
help="Run the benchmark.",
description=(
"The default run method for ScaFFold."
"Users may specify lists of run parameters in the config file."
"This subcommand runs one instance of the benchmark for each parameter combination."
"The default run method for ScaFFold. "
"Users may specify lists of run parameters in the config file. "
"This subcommand runs one instance of the benchmark for each parameter combination. "
"Requires path to config file."
),
)
Expand Down Expand Up @@ -143,7 +142,7 @@ def main():
)
benchmark_parser.add_argument("--seed", type=int, help="Random seed.")
benchmark_parser.add_argument(
"--batch-size", type=int, help="Batch sizes for each volume size."
"--batch-size", type=int, help="Batch size per data-parallel rank."
)
benchmark_parser.add_argument(
"--warmup-batches",
Expand Down Expand Up @@ -177,6 +176,12 @@ def main():
nargs=3,
help="DistConv param: number of shards to divide the tensor into. It's best to choose the fewest ranks needed to fit one sample in GPU memory, since that keeps communication at a minimum",
)
benchmark_parser.add_argument(
"--dc-shard-dims",
type=int,
nargs=3,
help="DistConv param: tensor dimensions to shard.",
)
benchmark_parser.add_argument(
"--epochs",
type=int,
Expand Down Expand Up @@ -209,10 +214,11 @@ def main():
rank = comm.Get_rank()
# Parse the command-line arguments.
args = parser.parse_args()
log = setup_mpi_logger(__file__, args.verbose)
combined_config = None

if rank == 0:
print(f"args = {args}")
log.debug("args = %s", args)

bench_config = config_utils.load_config(Path(args.config), "sweep")
bench_config_dict = (
Expand All @@ -226,13 +232,22 @@ def main():
if key not in combined_config:
combined_config[key] = value
elif value is not None and key != "command":
print(f"Overriding '{key}={combined_config[key]}' with '{key}={value}'")
log.info(
"Overriding '%s=%s' with '%s=%s'",
key,
combined_config[key],
key,
value,
)
combined_config[key] = value

# Recalculate unet_layers to capture any CLI overrides
combined_config["unet_layers"] = (
combined_config["problem_scale"] - combined_config["unet_bottleneck_dim"]
)
config_utils.require_positive_int(
"n_categories", combined_config["n_categories"]
)

# Resolve paths to absolute, matching Config() behavior
if "base_run_dir" in combined_config and combined_config["base_run_dir"]:
Expand All @@ -256,13 +271,13 @@ def main():

# Handle Restart / Resume logic
if hasattr(args, "restart") and args.restart:
print("Restart flag detected: Forcing train_from_scratch = False")
log.info("Restart flag detected: forcing train_from_scratch = False")
combined_config["train_from_scratch"] = False
combined_config["restart"] = True

# If user manually supplied --run-dir (via restart script), use it.
if hasattr(args, "run_dir") and args.run_dir is not None:
print(f"Resuming in existing directory: {args.run_dir}")
log.info("Resuming in existing directory: %s", args.run_dir)
benchmark_run_dir = Path(args.run_dir)
# Ensure we don't accidentally wipe checkpoints even if --restart wasn't explicitly passed
combined_config["train_from_scratch"] = False
Expand All @@ -272,8 +287,9 @@ def main():
f"{combined_config.get('job_name')}_%Y%m%d-%H%M%S"
)
benchmark_run_dir = base_run_dir / timestamp
customlog(
f"benchmark_run_dir created at path {Path.resolve(benchmark_run_dir)}"
log.info(
"benchmark_run_dir created at path %s",
Path.resolve(benchmark_run_dir),
)

combined_config["benchmark_run_dir"] = str(benchmark_run_dir)
Expand All @@ -298,11 +314,15 @@ def main():
comm.Barrier()
combined_config = comm.bcast(combined_config, root=0)
if rank == 0:
print(f"combined_config = {combined_config}")
log.debug("combined_config = %s", combined_config)

if args.command == "benchmark":
from ScaFFold import benchmark

benchmark.main(kwargs_dict=combined_config)
elif args.command == "generate_fractals":
from ScaFFold import generate_fractals

generate_fractals.main(kwargs_dict=combined_config)
else:
raise ValueError(
Expand Down
5 changes: 2 additions & 3 deletions ScaFFold/configs/benchmark_default.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
base_run_dir: "benchmark_runs" # Subfolder of $(pwd) in which to run jobs.
dataset_dir: "datasets" # Directory in which to store and query for datasets.
fract_base_dir: "fractals" # Base directory for fractal IFS and instances.
n_categories: 5 # Number of fractal categories present in the dataset.
n_categories: 5 # Positive number of fractal categories present in the dataset.
n_instances_used_per_fractal: 145 # Number of unique instances to pull from each fractal class. There are 145 unique; exceeding this number will reuse some instances.
problem_scale: 7 # Determines dataset resolution and number of unet layers. Default is 6.
unet_bottleneck_dim: 3 # Power of 2 of the unet bottleneck layer dimension. Default of 3 -> bottleneck layer of size 8.
seed: 42 # Random seed.
batch_size: 1 # Batch sizes for each vol size per rank.
dataloader_num_workers: 1 # Number of DataLoader worker processes per rank. More workers will use more memory
optimizer: "ADAM" # "ADAM" is preferred option, otherwise training defautls to RMSProp.
optimizer: "ADAM" # "ADAM" is preferred option, otherwise training defaults to RMSProp.
dc_num_shards: [1, 1, 1] # DistConv param: number of shards to divide the tensor into. It's best to choose the fewest ranks needed to fit one sample in GPU memory, since that keeps communication at a minimum
dc_shard_dims: [2, 3, 4] # DistConv param: dimension on which to shard
checkpoint_interval: -1 # Checkpoint every C epochs; set to -1 to disable checkpointing entirely.
Expand All @@ -27,7 +27,6 @@ disable_scheduler: 0 # If 1, disable scheduler during training to
more_determinism: 0 # If 1, improve model training determinism.
datagen_from_scratch: 0 # If 1, delete existing fractals and instances, then regenerate from scratch.
train_from_scratch: 1 # If 1, delete existing train stats and checkpoint files. Keep 0 if want to restart runs where we left off.
dist: 1 # If 1, use torch DDP.
torch_amp: 1 # If 1, use mixed precision in training.
framework: "torch" # The DL framework to train with. Only valid option for now is "torch".
checkpoint_dir: "checkpoints" # Subfolder in which to save training checkpoints.
Expand Down
39 changes: 0 additions & 39 deletions ScaFFold/configs/benchmark_testing.yml

This file was deleted.

Loading