diff --git a/aframe/base.py b/aframe/base.py index 6f45ceb95..f1c4494d3 100644 --- a/aframe/base.py +++ b/aframe/base.py @@ -111,7 +111,7 @@ def sandbox_env(self, _): # map in local tmpdir (should be /local/albert.einstein) # which has is enough memory to write large temp # files with luigi/law - env["TMPDIR"] = f"/local/{os.getenv('USER')}" + env["TMPDIR"] = os.getenv("AFRAME_TMPDIR") # if gpus are specified, expose them inside container # via CUDA_VISIBLE_DEVICES env variable diff --git a/aframe/pipelines/sandbox/sandbox.cfg b/aframe/pipelines/sandbox/sandbox.cfg index e5fe2c086..c8919d7b6 100644 --- a/aframe/pipelines/sandbox/sandbox.cfg +++ b/aframe/pipelines/sandbox/sandbox.cfg @@ -63,10 +63,11 @@ reference_frequency = &::luigi_base::reference_frequency waveform_approximant = &::luigi_base::waveform_approximant prior = &::luigi_base::prior + [luigi_ValidationWaveforms] workflow = htcondor -num_jobs = 20 -num_signals = 1000 +num_jobs = 200 +num_signals = 20000 ifos = &::luigi_base::ifos highpass = &::luigi_base::highpass sample_rate = &::luigi_base::sample_rate diff --git a/aframe/pipelines/seed_variance/__init__.py b/aframe/pipelines/seed_variance/__init__.py new file mode 100644 index 000000000..ee4a77956 --- /dev/null +++ b/aframe/pipelines/seed_variance/__init__.py @@ -0,0 +1 @@ +from .seed import SeedVariability diff --git a/aframe/pipelines/seed_variance/seed.cfg b/aframe/pipelines/seed_variance/seed.cfg new file mode 100644 index 000000000..8137f357c --- /dev/null +++ b/aframe/pipelines/seed_variance/seed.cfg @@ -0,0 +1,167 @@ +# luigi level config +[luigi_core] +local_scheduler = true +module = aframe +log_level = INFO + +# configuration for pipeline parameters + +[luigi_base] +ifos = ["H1", "L1"] + +# data generation parameters +train_start = 1240579783 +train_stop = 1241443783 +test_stop = 1244035783 +max_duration = 20000 +Tb = 3153600 +flag = DATA +channels = ["H1", "L1"] +shifts = [0, 1] +seed = 1122 + +streams_per_gpu = 2 + +# waveform parameters +waveform_approximant = IMRPhenomPv2 +waveform_duration = 8 +minimum_frequency = 20 +reference_frequency = 50 + +# training parameters +kernel_length = 1.5 +batch_size = 512 +prior = priors.priors.end_o3_ratesandpops + +# data conditioning / preprocessing parameters +sample_rate = 2048 +fduration = 1 +highpass = 32 + +# inference / export parameters +inference_psd_length = 64 +inference_sampling_rate = 4 +inference_batch_size = 128 + +[luigi_FetchTrain] +workflow = htcondor +start = &::luigi_base::train_start +end = &::luigi_base::train_stop +sample_rate = &::luigi_base::sample_rate +min_duration = 1024 +max_duration = &::luigi_base::max_duration +flag = &::luigi_base::flag +ifos = &::luigi_base::ifos +channels = &::luigi_base::channels +request_memory = 32678 + +[luigi_TrainingWaveforms] +num_signals = 100000 +sample_rate = &::luigi_base::sample_rate +waveform_duration = &::luigi_base::waveform_duration +minimum_frequency = &::luigi_base::minimum_frequency +reference_frequency = &::luigi_base::reference_frequency +waveform_approximant = &::luigi_base::waveform_approximant +prior = &::luigi_base::prior + +[luigi_FetchTest] +workflow = htcondor +start = &::luigi_base::train_stop +end = &::luigi_base::test_stop +sample_rate = &::luigi_base::sample_rate +min_duration = 128 +max_duration = &::luigi_base::max_duration +flag = &::luigi_base::flag +channels = &::luigi_base::channels +request_memory = 32678 + + +[luigi_TimeslideWaveforms] +workflow = htcondor +seed = 112296 +Tb = &::luigi_base::Tb +shifts = &::luigi_base::shifts +spacing = 16 +buffer = 16 +snr_threshold = 4 +prior = &::luigi_base::prior +start = &::luigi_base::train_stop +end = &::luigi_base::test_stop +ifos = &::luigi_base::ifos +psd_length = &::luigi_base::inference_psd_length +sample_rate = &::luigi_base::sample_rate +minimum_frequency = &::luigi_base::minimum_frequency +reference_frequency = &::luigi_base::reference_frequency +waveform_duration = &::luigi_base::waveform_duration +waveform_approximant = &::luigi_base::waveform_approximant +highpass = &::luigi_base::highpass + +[luigi_ValidationWaveforms] +workflow = htcondor +num_jobs = 200 +num_signals = 20000 +ifos = &::luigi_base::ifos +highpass = &::luigi_base::highpass +sample_rate = &::luigi_base::sample_rate +waveform_duration = &::luigi_base::waveform_duration +minimum_frequency = &::luigi_base::minimum_frequency +reference_frequency = &::luigi_base::reference_frequency +waveform_approximant = &::luigi_base::waveform_approximant +prior = &::luigi_base::prior +snr_threshold = 4 + +[luigi_SeedVarExport] +fduration = &::luigi_base::fduration +kernel_length = &::luigi_base::kernel_length +inference_sampling_rate = &::luigi_base::inference_sampling_rate +sample_rate = &::luigi_base::sample_rate + +# TODO: resolve enum platform parsing error +# platform = luigi.Parameter(default="TENSORRT") +ifos = &::luigi_base::ifos +batch_size = &::luigi_base::inference_batch_size +psd_length = &::luigi_base::inference_psd_length +highpass = &::luigi_base::highpass + +streams_per_gpu = &::luigi_base::streams_per_gpu +aframe_instances = 1 +preproc_instances = 1 +clean = true + +[luigi_Train] +config = /home/ethan.marx/projects/aframev2/projects/train/config.yaml +train_remote = false +ifos = &::luigi_base::ifos +kernel_length = &::luigi_base::kernel_length +highpass = &::luigi_base::highpass +fduration = &::luigi_base::fduration +seed = &::luigi_base::seed +use_wandb = true +request_gpus = 2 + +[luigi_SeedVarInfer] +fduration = &::luigi_base::fduration +batch_size = &::luigi_base::inference_batch_size +psd_length = &::luigi_base::inference_psd_length +ifos = &::luigi_base::ifos +inference_sampling_rate = &::luigi_base::inference_sampling_rate +cluster_window_length = 8 +integration_window_length = 1 +Tb = &::luigi_base::Tb +shifts = &::luigi_base::shifts +streams_per_gpu = &::luigi_base::streams_per_gpu +rate_per_gpu = 70 + +# triton args +model_name = aframe-stream +model_version = -1 +triton_image = hermes/tritonserver:22.12 +sequence_id = 1001 + + +[logging] +law: INFO +law.sandbox.base: DEBUG +law.patches: INFO +luigi-interface: INFO +law.workflow.base: DEBUG diff --git a/aframe/pipelines/seed_variance/seed.py b/aframe/pipelines/seed_variance/seed.py new file mode 100644 index 000000000..af97beb8c --- /dev/null +++ b/aframe/pipelines/seed_variance/seed.py @@ -0,0 +1,85 @@ +""" +Train multiple models with different seeds +for comparing performance variability +""" +import os + +import luigi +import numpy as np + +from aframe.base import AframeWrapperTask +from aframe.pipelines.config import paths +from aframe.tasks import ExportLocal, TimeslideWaveforms, Train +from aframe.tasks.infer import InferLocal +from aframe.tasks.plots.sv import SensitiveVolume + + +class SeedVarExport(ExportLocal): + train_seed = luigi.IntParameter() + + def requires(self): + return Train.req( + self, + data_dir=paths().train_datadir, + run_dir=os.path.join(paths().train_rundir, str(self.train_seed)), + seed=self.train_seed, + train_remote=True, + ) + + +class SeedVarInfer(InferLocal): + train_seed = luigi.IntParameter() + + def requires(self): + reqs = {} + reqs["model_repository"] = SeedVarExport.req( + self, + repository_directory=os.path.join( + paths().results_dir, "model_repo" + ), + train_seed=self.train_seed, + ) + ts_waveforms = TimeslideWaveforms.req( + self, + output_dir=paths().test_datadir, + ) + fetch = ts_waveforms.requires().workflow_requires()["test_segments"] + + reqs["data"] = fetch + reqs["waveforms"] = ts_waveforms + return reqs + + +class SeedVarSV(SensitiveVolume): + train_seed = luigi.IntParameter() + + def requires(self): + reqs = {} + reqs["ts"] = TimeslideWaveforms.req( + self, output_dir=paths().test_datadir + ) + + reqs["infer"] = SeedVarInfer.req( + self, + output_dir=os.path.join(paths().results_dir, "infer"), + train_seed=self.train_seed, + ) + return reqs + + +class SeedVariability(AframeWrapperTask): + num_seeds = luigi.IntParameter( + default=2, + description="Number of training jobs with unique seeds to launch", + ) + + def requires(self): + seeds = np.random.randint(0, 1e5, size=self.num_seeds) + for seed in seeds: + yield SeedVarSV.req( + self, + train_seed=seed, + output_dir=os.path.join( + paths().results_dir, str(seed), "plots" + ), + ) diff --git a/aframe/tasks/data/condor/base.py b/aframe/tasks/data/condor/base.py index 19de5a49a..a35035e5f 100644 --- a/aframe/tasks/data/condor/base.py +++ b/aframe/tasks/data/condor/base.py @@ -15,10 +15,14 @@ class LDGCondorWorkflow(htcondor.HTCondorWorkflow): condor_directory = luigi.Parameter() accounting_group_user = luigi.Parameter(default=os.getenv("LIGO_USERNAME")) accounting_group = luigi.Parameter(default=os.getenv("LIGO_GROUP")) - request_disk = luigi.Parameter(default="1024") - request_memory = luigi.Parameter(default="32678") + request_disk = luigi.Parameter(default="1024K") + request_memory = luigi.Parameter(default="4096M") request_cpus = luigi.IntParameter(default=1) + # don't pass computing requirements between tasks + # since different tasks will often have different requirements + exclude_params_req = {"request_memory", "request_disk", "request_cpus"} + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.htcondor_log_dir.touch() @@ -50,7 +54,7 @@ def htcondor_log_dir(self): @property def job_file_dir(self): - return self.htcondor_output_directory().child("jobs", type="d").path + return os.path.join(self.condor_directory, "jobs") @property def law_config(self): @@ -73,6 +77,7 @@ def build_environment(self): environment += f'PATH={os.getenv("PATH")} ' environment += f"LAW_CONFIG_FILE={self.law_config} " environment += f"USER={os.getenv('USER')} " + environment += f"TMPDIR={os.getenv('TMPDIR')} " # forward any env variables that start with AFRAME_ # that the law config may need to parse diff --git a/aframe/tasks/data/timeslide_waveforms/timeslide_waveforms.py b/aframe/tasks/data/timeslide_waveforms/timeslide_waveforms.py index 1c3985779..2da0f3c34 100644 --- a/aframe/tasks/data/timeslide_waveforms/timeslide_waveforms.py +++ b/aframe/tasks/data/timeslide_waveforms/timeslide_waveforms.py @@ -179,7 +179,9 @@ def output(self): ] def requires(self): - return DeployTimeslideWaveforms.req(self) + return DeployTimeslideWaveforms.req( + self, condor_directory=self.condor_directory + ) @property def targets(self): diff --git a/aframe/tasks/data/waveforms.py b/aframe/tasks/data/waveforms.py index 61b259f20..f337663b4 100644 --- a/aframe/tasks/data/waveforms.py +++ b/aframe/tasks/data/waveforms.py @@ -98,9 +98,12 @@ class DeployValidationWaveforms( Generate waveforms for validation via rejection sampling """ - output_dir = luigi.Parameter( - description="Directory where validation waveforms will be saved" + tmp_dir = luigi.Parameter( + description="Directory where temporary validation " + "waveforms will be saved before being merged", + default=os.getenv("AFRAME_TMPDIR", "/tmp/aframe/"), ) + output_dir = luigi.Parameter() ifos = luigi.ListParameter( description="Interferometers for which waveforms will be generated" ) @@ -126,12 +129,12 @@ def workflow_requires(self): return reqs @property - def tmp_dir(self): - return os.path.join(self.output_dir, f"tmp-{self.branch}") + def branch_tmp_dir(self): + return os.path.join(self.tmp_dir, f"tmp-{self.branch}") def output(self): return law.LocalFileTarget( - os.path.join(self.tmp_dir, "val_waveforms.hdf5") + os.path.join(self.branch_tmp_dir, "val_waveforms.hdf5") ) def create_branch_map(self): @@ -203,7 +206,7 @@ class ValidationWaveforms(AframeDataTask): """ output_dir = luigi.Parameter( - description="Directory where validation waveforms will be saved" + description="Directory where merged validation waveforms will be saved" ) condor_directory = luigi.Parameter( default=os.path.join( @@ -217,10 +220,12 @@ def __init__(self, *args, **kwargs): self.output_file = os.path.join(self.output_dir, "val_waveforms.hdf5") def output(self): - return (s3_or_local(self.output_file),) + return s3_or_local(self.output_file) def requires(self): - return DeployValidationWaveforms.req(self) + return DeployValidationWaveforms.req( + self, + ) @property def targets(self): @@ -233,9 +238,8 @@ def waveform_files(self): def run(self): from ledger.injections import LigoWaveformSet - LigoWaveformSet.aggregate( - self.waveform_files, self.output_file, clean=True - ) + with self.output().open("w") as f: + LigoWaveformSet.aggregate(self.waveform_files, f, clean=True) # clean up temporary directories for dirname in glob.glob(os.path.join(self.output_dir, "tmp-*")):