From e8e556e676ddaed70e08b4cc7b038bce7fed9d4b Mon Sep 17 00:00:00 2001 From: Adam Plowman Date: Sun, 5 Apr 2026 19:01:12 +0100 Subject: [PATCH 1/4] fix: tweaks to subset simulation to make reproducible via random seed --- matflow/data/scripts/uq/collate_results.py | 16 +- .../data/scripts/uq/generate_next_state.py | 37 +- matflow/data/scripts/uq/sample_direct_MC.py | 13 +- .../template_components/task_schemas.yaml | 4 + .../demo_workflows/test_subset_simulation.py | 33 ++ matflow/tests/subset_simulation.py | 344 ++++++++++++++++++ 6 files changed, 434 insertions(+), 13 deletions(-) create mode 100644 matflow/tests/subset_simulation.py diff --git a/matflow/data/scripts/uq/collate_results.py b/matflow/data/scripts/uq/collate_results.py index 3b98868e..11d76b60 100644 --- a/matflow/data/scripts/uq/collate_results.py +++ b/matflow/data/scripts/uq/collate_results.py @@ -47,8 +47,7 @@ def collate_results(g, x, p_0, all_g, all_x, all_accept, level_cov): if all_g: # from multiple Markov chains: - g_2D = np.array([i[:] for i in all_g]) # (num_chains, num_states) - g_unsrt = np.concatenate(g_2D) + g_unsrt = np.concatenate([i[:] for i in all_g]) accept = np.vstack([i[:] for i in all_accept]) x = np.vstack([i[:] for i in all_x]) accept_rate = np.mean(accept) @@ -87,14 +86,18 @@ def collate_results(g, x, p_0, all_g, all_x, all_accept, level_cov): threshold = (g[num_chains - 1] + g[num_chains]) / 2 + # indicator function: + g_2D = np.reshape(g_unsrt, (num_chains, num_states)) + indicator = (g_2D > np.minimum(threshold, 0)).astype(int) + # failure probability at this level: - fail_bool = g > 0 - level_pf = np.mean(fail_bool) if threshold > 0 else p_0 + level_pf = np.mean(indicator) chain_seeds = x[:num_chains] chain_g = g[:num_chains] + # print(f"{chain_g=!r}") - is_finished = (num_failed / num_samples) >= p_0 + is_finished = (threshold > 0).item() pf = p_0**level_idx * num_failed / num_samples @@ -106,9 +109,6 @@ def collate_results(g, x, p_0, all_g, all_x, all_accept, 
level_cov): if all_g: # from multiple Markov chains: - indicator = np.reshape( - g_2D > np.minimum(threshold, 0), (num_chains, num_states) - ).astype(int) level_cov = estimate_cov(indicator, level_pf) else: # from initial direct Monte Carlo samples: diff --git a/matflow/data/scripts/uq/generate_next_state.py b/matflow/data/scripts/uq/generate_next_state.py index b3e16edb..4cd76e36 100644 --- a/matflow/data/scripts/uq/generate_next_state.py +++ b/matflow/data/scripts/uq/generate_next_state.py @@ -1,4 +1,5 @@ import logging +import os import numpy as np from numpy.typing import NDArray @@ -17,7 +18,24 @@ def set_up_logger(): return logger -def generate_next_state(x, prop_std): +def _init_rng(chain_index, loop_idx): + """Initialise the random number generator, which must happen once for each Markov + chain.""" + random_seed = int(os.environ["MATFLOW_RUN_RANDOM_SEED"]) + spawn_key_str = os.environ["MATFLOW_RUN_RNG_SPAWN_KEY"] + task_iID = int(os.environ["MATFLOW_TASK_INSERT_ID"]) + spawn_key = tuple(int(i) for i in spawn_key_str.split(",")) if spawn_key_str else () + spawn_key = tuple([*spawn_key, task_iID, loop_idx["levels"], chain_index]) + rng = np.random.default_rng( + np.random.SeedSequence( + random_seed, + spawn_key=spawn_key, + ) + ) + return rng + + +def generate_next_state(x, prop_std, rng, chain_index): """Generate the next candidate state in a modified Metropolis algorithm. Parameters @@ -32,18 +50,29 @@ def generate_next_state(x, prop_std): Generated candidate state. 
""" + loop_idx = { + loop_name: int(loop_idx) + for loop_name, loop_idx in ( + item.split("=") + for item in os.environ["MATFLOW_ELEMENT_ITER_LOOP_IDX"].split(";") + ) + } + + if loop_idx["markov_chain_state"] == 0: + # new chain, so want a new RNG: + rng = _init_rng(chain_index, loop_idx) + x = x[:] # convert to numpy array dim = len(x) current_state = x - rng = np.random.default_rng() xi = np.empty(dim) proposal = norm(loc=current_state, scale=prop_std) - xi_hat = np.atleast_1d(proposal.rvs()) + xi_hat = np.atleast_1d(proposal.rvs(random_state=rng)) accept_ratios = np.divide(*norm.pdf([xi_hat, current_state])) accept_idx = rng.random(len(accept_ratios)) < np.minimum(1, accept_ratios) xi[accept_idx] = xi_hat[accept_idx] xi[~accept_idx] = current_state[~accept_idx] - return {"x": xi} + return {"x": xi, "rng": rng} diff --git a/matflow/data/scripts/uq/sample_direct_MC.py b/matflow/data/scripts/uq/sample_direct_MC.py index 57ce3746..43ab7222 100644 --- a/matflow/data/scripts/uq/sample_direct_MC.py +++ b/matflow/data/scripts/uq/sample_direct_MC.py @@ -1,10 +1,21 @@ +import os + import numpy as np from scipy.stats import multivariate_normal def sample_direct_MC(dimension): + # RNG: + random_seed = int(os.environ["MATFLOW_RUN_RANDOM_SEED"]) + sample_idx = int(os.environ["MATFLOW_ELEMENT_IDX"]) # TODO: use repeats idx? 
+ task_iID = int(os.environ["MATFLOW_TASK_INSERT_ID"]) + spawn_key_str = os.environ["MATFLOW_RUN_RNG_SPAWN_KEY"] + spawn_key = tuple(int(i) for i in spawn_key_str.split(",")) if spawn_key_str else () + spawn_key = tuple([*spawn_key, task_iID, sample_idx]) + rng = np.random.default_rng(np.random.SeedSequence(random_seed, spawn_key=spawn_key)) + # sample in standard normal space, variates are transformed before they are passed to # the performance function: - pi = multivariate_normal(mean=np.zeros(dimension), cov=None) + pi = multivariate_normal(mean=np.zeros(dimension), cov=None, seed=rng) x = np.atleast_1d(pi.rvs()) return {"x": x} diff --git a/matflow/data/template_components/task_schemas.yaml b/matflow/data/template_components/task_schemas.yaml index 583bed30..e6bb316a 100644 --- a/matflow/data/template_components/task_schemas.yaml +++ b/matflow/data/template_components/task_schemas.yaml @@ -1867,8 +1867,12 @@ inputs: - parameter: x - parameter: prop_std + - parameter: chain_index + - parameter: rng + default_value: null outputs: - parameter: x + - parameter: rng actions: - script: <> script_data_in: direct diff --git a/matflow/tests/demo_workflows/test_subset_simulation.py b/matflow/tests/demo_workflows/test_subset_simulation.py index 50966777..8d617b15 100644 --- a/matflow/tests/demo_workflows/test_subset_simulation.py +++ b/matflow/tests/demo_workflows/test_subset_simulation.py @@ -1,5 +1,6 @@ import pytest import matflow as mf +from matflow.tests.subset_simulation import subset_simulation @pytest.mark.integration @@ -394,3 +395,35 @@ def test_subset_sim_DAMASK_Mg_two_level_parameter_flow( inc_chain_iters[l_idx].get_data_idx()["inputs.g"] == iter_i.get_data_idx()["outputs.g"] ) + + +@pytest.mark.integration +@pytest.mark.skip(reason="Needs changes in PR #526") +def test_subset_simulation_toy_model_prediction(tmp_path): + """Validate the MatFlow subset simulation implementation for a toy""" + + # run via a MatFlow workflow: + wk = 
mf.make_and_submit_demo_workflow( + "subset_simulation_toy_model", + path=tmp_path, + status=False, + add_to_known=False, + wait=True, + ) + final_iter = wk.tasks.collate_results.elements[0].latest_iteration_non_skipped + pf = final_iter.get("outputs.pf") + cov = final_iter.get("outputs.cov") + + # run via single function implementation: + pf_sf, cov_sf = subset_simulation( + dimension=200, + target_pf=1e-4, + p_0=0.1, + num_samples=100, + num_levels=7, + prop_std=1.0, + master_seed=1234, # match the master seed in the demo workflow + ) + + assert pf == pf_sf + assert cov == cov_sf diff --git a/matflow/tests/subset_simulation.py b/matflow/tests/subset_simulation.py new file mode 100644 index 00000000..59f8b356 --- /dev/null +++ b/matflow/tests/subset_simulation.py @@ -0,0 +1,344 @@ +"""Module containing functions to run a subset simulation on a simple toy model, used as a +validation of the MatFlow implementation.""" + +from matplotlib import pyplot as plt +import numpy as np +from scipy.stats import norm, multivariate_normal +from numpy.exceptions import AxisError + + +def sample_direct_MC( + dimension, num_samples, seed: int = None, spawn_key: tuple[int] | None = None +): + """This is convoluted to mimic the MatFlow implementation where individual samples + are separate elements, with distinct RNG spawn keys.""" + samples = [] + for sample_idx in range(num_samples): + seed_seq = np.random.SeedSequence(seed, spawn_key=tuple([*spawn_key, sample_idx])) + rng = np.random.default_rng(seed_seq) + pi = multivariate_normal(mean=np.zeros(dimension), cov=None, seed=rng) + samples.append(np.atleast_1d(pi.rvs())) + return np.array(samples) + + +def model(x): + try: + return np.sum(x, axis=1) + except AxisError: + return np.sum(x) + + +def get_y_star(p_f, dimension): + return np.sqrt(dimension) * norm.ppf(1 - p_f) + + +def system_analysis_toy_model(x, dimension: int, target_pf: float): + """`x` is within the failure domain if the return is greater than zero.""" + y_star = 
get_y_star(target_pf, dimension) + g_i = model(x) - y_star + return g_i + + +def estimate_cov(indicator, p_i: float) -> float: + """Estimate the coefficient of variation at a given conditional level of the subset + simulation.""" + + num_chains, num_states = indicator.shape + N = num_chains * num_states # samples per level + + # covariance sequence (estimated), Eq. 29 + r = np.zeros(num_states - 1) + for k in range(num_states - 1): + r_k = 0 + for l in range(num_states - (k + 1)): + i_1 = indicator[:, l] + i_2 = indicator[:, l + k + 1] + r_k_i = np.dot(i_1, i_2) + r_k += r_k_i + r[k] = (r_k / (N - (k + 1) * num_chains)) - p_i**2 + + r_0 = np.sum(indicator**2) / N - p_i**2 # autocovariance at lag zero (exact) + r_0 = p_i * (1 - p_i) + + rho = r / r_0 + + gamma = 2 * sum( + (1 - (k + 1) * num_chains / N) * rho[k] for k in range(num_states - 1) + ) + delta = np.sqrt((1 - p_i) / (p_i * N) * (1 + gamma)) + + return delta + + +def subset_simulation( + dimension=200, + target_pf=1e-4, + p_0=0.1, + num_samples=100, + num_levels=10, + prop_std=0.1, + master_seed=None, +): + + x = sample_direct_MC(dimension, num_samples, seed=master_seed, spawn_key=(0,)) + g = system_analysis_toy_model(x, dimension, target_pf=target_pf) + + level_covs = [] + for level_idx in range(num_levels): + num_failed = int(np.sum(g > 0)) + num_chains = int(len(g) * p_0) + num_states = int(num_samples / num_chains) + g_unsrt = g.copy() + + # sort responses + srt_idx = np.argsort(g)[::-1] # sort by closest-to-failure first + g = g[srt_idx] + x = x[srt_idx, :] + + threshold = (g[num_chains - 1] + g[num_chains]) / 2 + + # failure probability at this level: + indicator = np.reshape( + g_unsrt > np.minimum(threshold, 0), (num_chains, num_states) + ).astype(int) + level_pf = np.mean(indicator) + + chain_seeds = x[:num_chains] + chain_g = g[:num_chains] + + pf = p_0**level_idx * num_failed / num_samples + if level_idx == 0: + level_cov = np.sqrt((1 - level_pf) / (num_samples * level_pf)) + else: + level_cov = 
estimate_cov(indicator, level_pf) + level_covs.append(level_cov) + + if is_finished := threshold > 0: + cov = np.sqrt(sum(np.pow(level_covs, 2))).item() + return pf, cov + + all_x = np.ones((num_chains, num_states, dimension)) * np.nan + all_g = np.ones((num_chains, num_states)) * np.nan + + for chain_index in range(num_chains): + + # proceed this Markov chain until all states have been generated + all_x[chain_index, 0] = chain_seeds[chain_index] + all_g[chain_index, 0] = chain_g[chain_index] + + rng_state = None + for state_idx in range(1, num_states): + + # RNG seed sequence for Markov chains: + # note: this could be moved out of the state loop, but in this position + # it matches the MatFlow implementation + spawn_key = (4, level_idx, chain_index) + chain_rng = np.random.default_rng( + np.random.SeedSequence(master_seed, spawn_key=spawn_key) + ) + + if rng_state is not None: + chain_rng.bit_generator.state = rng_state + + x = all_x[chain_index, state_idx - 1] + g = all_g[chain_index, state_idx - 1] + + dim = len(x) + current_state = x + xi = np.empty(dim) + + proposal = norm(loc=current_state, scale=prop_std) + xi_hat = np.atleast_1d(proposal.rvs(random_state=chain_rng)) + + # accept_ratios = np.divide(*norm.pdf([xi_hat, current_state])) + accept_ratios = np.exp(-0.5 * (xi_hat**2 - current_state**2)) + + accept_idx = chain_rng.random(len(accept_ratios)) < np.minimum( + 1, accept_ratios + ) + + xi[accept_idx] = xi_hat[accept_idx] + xi[~accept_idx] = current_state[~accept_idx] + + trial_x = xi + + trial_g = system_analysis_toy_model( + trial_x, dimension, target_pf=target_pf + ) + + current_x = x + current_g = g + + is_accept = trial_g > threshold + new_x = trial_x if is_accept else current_x + new_g = trial_g if is_accept else current_g + + all_x[chain_index, state_idx] = new_x + all_g[chain_index, state_idx] = new_g + + rng_state = chain_rng.bit_generator.state + + g = all_g.reshape((num_samples)) + x = all_x.reshape((num_samples, dimension)) + + +def 
get_stats(all_pf, all_cov): + + pf_mean = np.mean(all_pf).item() + cov_empirical = np.std(all_pf) / pf_mean + cov_estimate = np.mean(all_cov) + cov_estimate_std = np.std(all_cov) + + return { + "pf": all_pf, + "cov": all_cov, + "pf_mean": pf_mean, + "cov_empirical": cov_empirical, + "cov_estimate": cov_estimate, + "cov_estimate_std": cov_estimate_std, + } + + +def run_repeats(num_samples, prop_std=1.0, num_repeats=100): + all_pf = [] + all_cov = [] + for repeat_idx in range(num_repeats): + pc = (100 * (repeat_idx + 1)) // num_repeats + if pc % 1 == 0: + print( + f"\rrunning {num_repeats} repeats with N={num_samples}...{pc:3d}%", end="" + ) + pf, cov = subset_simulation( + dimension=200, + target_pf=1e-4, + p_0=0.1, + num_samples=num_samples, + num_levels=10, + prop_std=prop_std, + ) + all_pf.append(pf) + all_cov.append(cov) + print() + return get_stats(all_pf, all_cov) + + +def get_toy_model_results_from_matflow_workflows(root_path): + """ + Parameters + ---------- + root_path + A directory containing only (zipped) workflows that are toy-model subset + simulation run repeats. + """ + all_pf = [] + all_cov = [] + for wk_path in root_path.glob("subset_simulation_toy_model_*"): + wk = mf.Workflow(wk_path) + iter_final = wk.tasks.collate_results.elements[0].latest_iteration_non_skipped + all_pf.append(iter_final.get("outputs.pf")) + all_cov.append(iter_final.get("outputs.cov")) + + return get_stats(all_pf, all_cov) + + +def plot_pf_num_samples(all_results, quantile_range, num_samples=None, num_repeats=None): + """ + Specify exactly one of `num_samples` and `num_repeats`. The non-specified is taken to + be the varying key in `all_results`. 
+ """ + + if sum(i is not None for i in (num_samples, num_repeats)) != 1: + raise ValueError("Specify num_samples or num_repeats") + + all_x = [] + all_y = [] + all_lower = [] + all_upper = [] + + for N, data in all_results.items(): + + pf_srt = sorted(data["pf"]) + _x0 = (1 - quantile_range) / 2 + _x1 = _x0 + quantile_range + lower = np.quantile(pf_srt, _x0) + upper = np.quantile(pf_srt, _x1) + + all_x.append(N) + all_y.append(data["pf_mean"]) + all_lower.append(lower) + all_upper.append(upper) + + plt.figure(figsize=(5, 4)) + plt.fill_between( + x=all_x, y1=all_lower, y2=all_upper, alpha=0.5, label=f"CI: {quantile_range}" + ) + plt.scatter(x=all_x, y=all_y, marker="o", label="Mean") + plt.hlines( + y=1e-4, xmin=min(all_results), xmax=max(all_results), color="gray", label="Target" + ) + # plt.xscale("log") + plt.yscale("log") + plt.xlabel(f"{'Num samples, N' if num_repeats else 'Num repeats, R'}") + plt.ylabel("Prob. of failure, pf") + plt.title(f"{num_repeats=!r}" if num_repeats else f"{num_samples=!r}") + plt.legend() + + +def plot_cov_estimates(results, ylim=None, num_repeats=None, num_samples=None): + """ + Specify exactly one of `num_samples` and `num_repeats`. The non-specified is taken to + be the varying key in `all_results`. 
+ """ + if sum(i is not None for i in (num_samples, num_repeats)) != 1: + raise ValueError("Specify num_samples or num_repeats") + + all_x = [] + cov_empirical = [] + cov_estimated = [] + cov_estimated_std = [] + + for N, data in results.items(): + + all_x.append(N) + cov_empirical.append(data["cov_empirical"]) + cov_estimated.append(data["cov_estimate"]) + cov_estimated_std.append(data["cov_estimate_std"]) + + plt.figure(figsize=(5, 4)) + plt.plot(all_x, cov_empirical, label="empirical CoV") + plt.errorbar(x=all_x, y=cov_estimated, yerr=cov_estimated_std, label="estimated CoV") + plt.xlabel(f"{'Num samples, N' if num_repeats else 'Num repeats, R'}") + plt.ylabel("CoV") + plt.title(f"{num_repeats=!r}" if num_repeats else f"{num_samples=!r}") + plt.legend() + if ylim: + plt.ylim(ylim) + + +def plot_pf_dist(results, target_pf): + fig, axs = plt.subplots( + 1, len(results), figsize=(len(results) * 2.5, 3), sharex=True, sharey=True + ) + for idx, (N, data) in enumerate(results.items()): + axs[idx].hist(np.log10(data["pf"])) + axs[idx].set_title(f"Num. samples, N={N}") + axs[idx].axvline(np.log10(target_pf), color="gray") + + # One label for the whole figure + fig.supxlabel("log10(Prob. of failure)") + fig.supylabel("Frequency") + plt.tight_layout() + + +def plot_cov_dist(results): + fig, axs = plt.subplots( + 1, len(results), figsize=(len(results) * 2.5, 3), sharex=True, sharey=True + ) + for idx, (N, data) in enumerate(results.items()): + axs[idx].hist(data["cov"]) + axs[idx].set_title(f"Num. 
samples, N={N}") + + # One label for the whole figure + fig.supxlabel("CoV") + fig.supylabel("Frequency") + plt.tight_layout() From b4c2872b045b37fb7c3bc2b6384468d506d991f5 Mon Sep 17 00:00:00 2001 From: Adam Plowman Date: Sun, 5 Apr 2026 19:04:12 +0100 Subject: [PATCH 2/4] fix: add random seed to toy model subset simulation demo --- matflow/data/workflows/subset_simulation_toy_model.yaml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/matflow/data/workflows/subset_simulation_toy_model.yaml b/matflow/data/workflows/subset_simulation_toy_model.yaml index 6f9d0111..d5859f68 100644 --- a/matflow/data/workflows/subset_simulation_toy_model.yaml +++ b/matflow/data/workflows/subset_simulation_toy_model.yaml @@ -13,12 +13,15 @@ loops: resources: any: combine_scripts: true + random_seed: 1234 tasks: - schema: sample_direct_MC inputs: dimension: 200 - repeats: 100 # num_samples + repeats: + number: 100 # num_samples + generate_seed_sequence: false groups: - name: all - schema: system_analysis_toy_model From 55e44c2f441702a594b416291b67024907d60cb0 Mon Sep 17 00:00:00 2001 From: Adam Plowman Date: Thu, 9 Apr 2026 14:18:25 +0100 Subject: [PATCH 3/4] test: update direct implementation of subset simulation for toy models --- matflow/data/scripts/uq/collate_results.py | 6 +- .../data/scripts/uq/generate_next_state.py | 9 + .../scripts/uq/generate_next_state_ACS.py | 47 ++- .../data/scripts/uq/generate_next_state_CS.py | 45 ++- .../template_components/task_schemas.yaml | 8 + .../subset_simulation_toy_model_ACS.yaml | 5 +- .../subset_simulation_toy_model_CS.yaml | 5 +- matflow/tests/subset_simulation.py | 284 +++++++++++++----- 8 files changed, 319 insertions(+), 90 deletions(-) diff --git a/matflow/data/scripts/uq/collate_results.py b/matflow/data/scripts/uq/collate_results.py index 11d76b60..e72fbf9b 100644 --- a/matflow/data/scripts/uq/collate_results.py +++ b/matflow/data/scripts/uq/collate_results.py @@ -19,9 +19,12 @@ def estimate_cov(indicator, p_i: 
float) -> float: r_k += r_k_i r[k] = (r_k / (N - (k + 1) * num_chains)) - p_i**2 - r_0 = np.sum(indicator**2) / N - p_i**2 # autocovariance at lag zero (exact) r_0 = p_i * (1 - p_i) + if np.isclose(r_0, 0.0): + # i.e. p_i is 1.0 + return 0.0 + rho = r / r_0 gamma = 2 * sum( @@ -95,7 +98,6 @@ def collate_results(g, x, p_0, all_g, all_x, all_accept, level_cov): chain_seeds = x[:num_chains] chain_g = g[:num_chains] - # print(f"{chain_g=!r}") is_finished = (threshold > 0).item() diff --git a/matflow/data/scripts/uq/generate_next_state.py b/matflow/data/scripts/uq/generate_next_state.py index 4cd76e36..2825ad1d 100644 --- a/matflow/data/scripts/uq/generate_next_state.py +++ b/matflow/data/scripts/uq/generate_next_state.py @@ -42,12 +42,21 @@ def generate_next_state(x, prop_std, rng, chain_index): ---------- x Current state on which the candidate state will depend. + prop_std + Proposal distribution standard deviation. + rng + Random number generator to be used in this function. + chain_index + Index of the Markov chain within the subset simulation level loop. Returns ------- dict: x: Generated candidate state. + rng: + Random number generator to be used in the next invocation of this function, + for this chain. 
""" loop_idx = { diff --git a/matflow/data/scripts/uq/generate_next_state_ACS.py b/matflow/data/scripts/uq/generate_next_state_ACS.py index bdab0bfc..cc2d01e2 100644 --- a/matflow/data/scripts/uq/generate_next_state_ACS.py +++ b/matflow/data/scripts/uq/generate_next_state_ACS.py @@ -1,9 +1,28 @@ +import os + import numpy as np from numpy.typing import NDArray from scipy.stats import norm -def generate_next_state_ACS(x, prop_std, lambda_): +def _init_rng(chain_index, loop_idx): + """Initialise the random number generator, which must happen once for each Markov + chain.""" + random_seed = int(os.environ["MATFLOW_RUN_RANDOM_SEED"]) + spawn_key_str = os.environ["MATFLOW_RUN_RNG_SPAWN_KEY"] + task_iID = int(os.environ["MATFLOW_TASK_INSERT_ID"]) + spawn_key = tuple(int(i) for i in spawn_key_str.split(",")) if spawn_key_str else () + spawn_key = tuple([*spawn_key, task_iID, loop_idx["levels"], chain_index]) + rng = np.random.default_rng( + np.random.SeedSequence( + random_seed, + spawn_key=spawn_key, + ) + ) + return rng + + +def generate_next_state_ACS(x, prop_std, lambda_, rng, chain_index): """Generate the next candidate state using adaptive conditional sampling (a.k.a adaptive subset-infinity) @@ -11,16 +30,38 @@ def generate_next_state_ACS(x, prop_std, lambda_): ---------- x Current state on which the candidate state will depend. + prop_std + Proposal distribution standard deviation. + lambda_ + Scaling parameter. + rng + Random number generator to be used in this function. + chain_index + Index of the Markov chain within the subset simulation level loop. Returns ------- dict: x: Generated candidate state. + rng: + Random number generator to be used in the next invocation of this function, + for this chain. 
""" + loop_idx = { + loop_name: int(loop_idx) + for loop_name, loop_idx in ( + item.split("=") + for item in os.environ["MATFLOW_ELEMENT_ITER_LOOP_IDX"].split(";") + ) + } + + if loop_idx["markov_chain_state"] == 0: + # new chain, so want a new RNG: + rng = _init_rng(chain_index, loop_idx) + x = x[:] # convert to numpy array sigma = np.minimum(1, lambda_ * prop_std) rho = np.sqrt(1 - sigma**2) - - return {"x": norm.rvs(loc=x * rho, scale=sigma)} + return {"x": norm.rvs(loc=x * rho, scale=sigma, random_state=rng), "rng": rng} diff --git a/matflow/data/scripts/uq/generate_next_state_CS.py b/matflow/data/scripts/uq/generate_next_state_CS.py index 51bc12d8..cea84ae2 100644 --- a/matflow/data/scripts/uq/generate_next_state_CS.py +++ b/matflow/data/scripts/uq/generate_next_state_CS.py @@ -1,9 +1,28 @@ +import os + import numpy as np from numpy.typing import NDArray from scipy.stats import norm -def generate_next_state_CS(x, prop_std): +def _init_rng(chain_index, loop_idx): + """Initialise the random number generator, which must happen once for each Markov + chain.""" + random_seed = int(os.environ["MATFLOW_RUN_RANDOM_SEED"]) + spawn_key_str = os.environ["MATFLOW_RUN_RNG_SPAWN_KEY"] + task_iID = int(os.environ["MATFLOW_TASK_INSERT_ID"]) + spawn_key = tuple(int(i) for i in spawn_key_str.split(",")) if spawn_key_str else () + spawn_key = tuple([*spawn_key, task_iID, loop_idx["levels"], chain_index]) + rng = np.random.default_rng( + np.random.SeedSequence( + random_seed, + spawn_key=spawn_key, + ) + ) + return rng + + +def generate_next_state_CS(x, prop_std, rng, chain_index): """Generate the next candidate state using conditional sampling (a.k.a subset-infinity) @@ -11,15 +30,35 @@ def generate_next_state_CS(x, prop_std): ---------- x Current state on which the candidate state will depend. + prop_std + Proposal distribution standard deviation. + rng + Random number generator to be used in this function. 
+ chain_index + Index of the Markov chain within the subset simulation level loop. Returns ------- dict: x: Generated candidate state. + rng: + Random number generator to be used in the next invocation of this function, + for this chain. """ + loop_idx = { + loop_name: int(loop_idx) + for loop_name, loop_idx in ( + item.split("=") + for item in os.environ["MATFLOW_ELEMENT_ITER_LOOP_IDX"].split(";") + ) + } + + if loop_idx["markov_chain_state"] == 0: + # new chain, so want a new RNG: + rng = _init_rng(chain_index, loop_idx) + x = x[:] # convert to numpy array rho = np.sqrt(1 - prop_std**2) - - return {"x": norm.rvs(loc=x * rho, scale=prop_std)} + return {"x": norm.rvs(loc=x * rho, scale=prop_std, random_state=rng), "rng": rng} diff --git a/matflow/data/template_components/task_schemas.yaml b/matflow/data/template_components/task_schemas.yaml index e6bb316a..92ef78a4 100644 --- a/matflow/data/template_components/task_schemas.yaml +++ b/matflow/data/template_components/task_schemas.yaml @@ -1888,8 +1888,12 @@ inputs: - parameter: x - parameter: prop_std + - parameter: chain_index + - parameter: rng + default_value: null outputs: - parameter: x + - parameter: rng actions: - script: <> script_data_in: direct @@ -1906,8 +1910,12 @@ - parameter: x - parameter: prop_std - parameter: lambda_ + - parameter: chain_index + - parameter: rng + default_value: null outputs: - parameter: x + - parameter: rng actions: - script: <> script_data_in: direct diff --git a/matflow/data/workflows/subset_simulation_toy_model_ACS.yaml b/matflow/data/workflows/subset_simulation_toy_model_ACS.yaml index 93e86344..1b507a7e 100644 --- a/matflow/data/workflows/subset_simulation_toy_model_ACS.yaml +++ b/matflow/data/workflows/subset_simulation_toy_model_ACS.yaml @@ -18,12 +18,15 @@ loops: resources: any: combine_scripts: true + random_seed: 1234 tasks: - schema: sample_direct_MC inputs: dimension: 200 - repeats: 1000 # num_samples + repeats: + number: 100 # num_samples + generate_seed_sequence: 
false groups: - name: all - schema: system_analysis_toy_model diff --git a/matflow/data/workflows/subset_simulation_toy_model_CS.yaml b/matflow/data/workflows/subset_simulation_toy_model_CS.yaml index 4bd80abb..5d7bec46 100644 --- a/matflow/data/workflows/subset_simulation_toy_model_CS.yaml +++ b/matflow/data/workflows/subset_simulation_toy_model_CS.yaml @@ -13,12 +13,15 @@ loops: resources: any: combine_scripts: true + random_seed: 1234 tasks: - schema: sample_direct_MC inputs: dimension: 200 - repeats: 100 # num_samples + repeats: + number: 100 # num_samples + generate_seed_sequence: false groups: - name: all - schema: system_analysis_toy_model diff --git a/matflow/tests/subset_simulation.py b/matflow/tests/subset_simulation.py index 59f8b356..ebb8622d 100644 --- a/matflow/tests/subset_simulation.py +++ b/matflow/tests/subset_simulation.py @@ -1,24 +1,42 @@ """Module containing functions to run a subset simulation on a simple toy model, used as a validation of the MatFlow implementation.""" +from datetime import datetime +import pickle +from pathlib import Path + from matplotlib import pyplot as plt import numpy as np from scipy.stats import norm, multivariate_normal from numpy.exceptions import AxisError +from hpcflow.sdk.log import TimeIt +@TimeIt.decorator def sample_direct_MC( - dimension, num_samples, seed: int = None, spawn_key: tuple[int] | None = None + dimension, + num_samples, + seed: int = None, + spawn_key: tuple[int] | None = None, + mimic_matflow: bool = False, ): - """This is convoluted to mimic the MatFlow implementation where individual samples - are separate elements, with distinct RNG spawn keys.""" - samples = [] - for sample_idx in range(num_samples): - seed_seq = np.random.SeedSequence(seed, spawn_key=tuple([*spawn_key, sample_idx])) - rng = np.random.default_rng(seed_seq) + if mimic_matflow: + # convoluted to mimic the MatFlow implementation where individual samples + # are separate elements, with distinct RNG spawn keys. 
+ samples = [] + for sample_idx in range(num_samples): + seed_seq = np.random.SeedSequence( + seed, spawn_key=tuple([*spawn_key, sample_idx]) + ) + rng = np.random.default_rng(seed_seq) + pi = multivariate_normal(mean=np.zeros(dimension), cov=None, seed=rng) + samples.append(np.atleast_1d(pi.rvs())) + return np.array(samples) + else: + # simpler and considerably faster! + rng = np.random.default_rng(seed) pi = multivariate_normal(mean=np.zeros(dimension), cov=None, seed=rng) - samples.append(np.atleast_1d(pi.rvs())) - return np.array(samples) + return pi.rvs(num_samples) def model(x): @@ -57,9 +75,12 @@ def estimate_cov(indicator, p_i: float) -> float: r_k += r_k_i r[k] = (r_k / (N - (k + 1) * num_chains)) - p_i**2 - r_0 = np.sum(indicator**2) / N - p_i**2 # autocovariance at lag zero (exact) r_0 = p_i * (1 - p_i) + if np.isclose(r_0, 0.0): + # i.e. p_i is 1.0 + return 0.0 + rho = r / r_0 gamma = 2 * sum( @@ -70,17 +91,56 @@ def estimate_cov(indicator, p_i: float) -> float: return delta +def generate_next_state(x, prop_std, rng): + + dim = len(x) + current_state = x + xi = np.empty(dim) + + proposal = norm(loc=current_state, scale=prop_std) + xi_hat = np.atleast_1d(proposal.rvs(random_state=rng)) + + accept_ratios = np.divide(*norm.pdf([xi_hat, current_state])) + # accept_ratios = np.exp(-0.5 * (xi_hat**2 - current_state**2)) + + accept_idx = rng.random(len(accept_ratios)) < np.minimum(1, accept_ratios) + + xi[accept_idx] = xi_hat[accept_idx] + xi[~accept_idx] = current_state[~accept_idx] + + return xi + + +def generate_next_state_CS(x, prop_std, rng): + rho = np.sqrt(1 - prop_std**2) + return norm.rvs(loc=x * rho, scale=prop_std, random_state=rng) + + +def generate_next_state_ACS(x, prop_std, lambda_, rng): + sigma = np.minimum(1, lambda_ * prop_std) + rho = np.sqrt(1 - sigma**2) + return norm.rvs(loc=x * rho, scale=sigma, random_state=rng) + + def subset_simulation( dimension=200, target_pf=1e-4, p_0=0.1, num_samples=100, num_levels=10, - prop_std=0.1, 
master_seed=None, + next_state=generate_next_state, + next_state_kwargs=None, + mimic_matflow: bool = False, ): - x = sample_direct_MC(dimension, num_samples, seed=master_seed, spawn_key=(0,)) + x = sample_direct_MC( + dimension, + num_samples, + seed=master_seed, + spawn_key=(0,), # spawn key to match the task ID in the matflow workflow + mimic_matflow=mimic_matflow, + ) g = system_analysis_toy_model(x, dimension, target_pf=target_pf) level_covs = [] @@ -126,42 +186,26 @@ def subset_simulation( all_x[chain_index, 0] = chain_seeds[chain_index] all_g[chain_index, 0] = chain_g[chain_index] - rng_state = None + chain_rng = None for state_idx in range(1, num_states): # RNG seed sequence for Markov chains: - # note: this could be moved out of the state loop, but in this position - # it matches the MatFlow implementation - spawn_key = (4, level_idx, chain_index) - chain_rng = np.random.default_rng( - np.random.SeedSequence(master_seed, spawn_key=spawn_key) - ) - - if rng_state is not None: - chain_rng.bit_generator.state = rng_state + if state_idx == 1: + # spawn key to match the task ID in the matflow workflow + spawn_key = (4, level_idx, chain_index) + chain_rng = np.random.default_rng( + np.random.SeedSequence(master_seed, spawn_key=spawn_key) + ) x = all_x[chain_index, state_idx - 1] g = all_g[chain_index, state_idx - 1] - dim = len(x) - current_state = x - xi = np.empty(dim) - - proposal = norm(loc=current_state, scale=prop_std) - xi_hat = np.atleast_1d(proposal.rvs(random_state=chain_rng)) - - # accept_ratios = np.divide(*norm.pdf([xi_hat, current_state])) - accept_ratios = np.exp(-0.5 * (xi_hat**2 - current_state**2)) - - accept_idx = chain_rng.random(len(accept_ratios)) < np.minimum( - 1, accept_ratios - ) - - xi[accept_idx] = xi_hat[accept_idx] - xi[~accept_idx] = current_state[~accept_idx] - - trial_x = xi - + next_state_kwargs_i = { + "x": x, + "rng": chain_rng, + **(next_state_kwargs or {}), + } + trial_x = next_state(**next_state_kwargs_i) trial_g = 
def run_convergence(
    converge_label: str,
    fixed_num: int,
    series: list[int],
    next_state,
    next_state_kwargs: dict,
    mimic_matflow: bool,
):
    """Run a convergence test on the toy model subset simulation, varying either
    the number of samples per level, N, or the number of repeats, R.

    Results are pickled to a timestamped file in the working directory, and also
    returned.

    Parameters
    ----------
    converge_label
        Either "N" (vary num samples) or "R" (vary num repeats).
    fixed_num
        The size of the non-varying parameter (i.e. num_repeats if
        converge_label is "N").
    series
        List of integers corresponding to the `converge_label` quantity.
    next_state
        The callable to use to generate the next state.
    next_state_kwargs
        Keyword arguments to pass to the next state callable. Must include
        "prop_std", which is embedded in the output file name.
    mimic_matflow
        If True, use the per-sample RNG spawning that reproduces the MatFlow
        workflow implementation (slower than a single shared generator).

    Returns
    -------
    dict
        Convergence results; "data" maps each series value to the stats
        returned by `run_repeats`, alongside the run configuration.

    Raises
    ------
    ValueError
        If `converge_label` is not "N" or "R".
    """
    if converge_label not in ("N", "R"):
        # fail fast: an unrecognised label would otherwise fall through both
        # branches below and surface as a confusing `run_repeats` TypeError
        raise ValueError(
            f"`converge_label` must be 'N' or 'R'; got {converge_label!r}."
        )

    fixed_str = (
        f"{'R' if converge_label == 'N' else 'N'}{fixed_num}"  # e.g. N200 or R100 etc
    )
    direct_results = {
        "data": {},
        "next_state": next_state.__name__,
        "next_state_kwargs": next_state_kwargs,
        "converge_label": converge_label,
        "fixed_num": fixed_num,
        "series": series,
    }
    for num in series:
        run_kwargs_i = {
            "next_state": next_state,
            "next_state_kwargs": next_state_kwargs,
            "mimic_matflow": mimic_matflow,
        }
        if converge_label == "N":
            run_kwargs_i["num_samples"] = num
            run_kwargs_i["num_repeats"] = fixed_num
        else:  # "R"
            run_kwargs_i["num_repeats"] = num
            run_kwargs_i["num_samples"] = fixed_num
        direct_results["data"][num] = run_repeats(**run_kwargs_i)

    # persist results so long-running convergence studies are not lost:
    timestamp = datetime.now().strftime("%Y-%m-%d_%H%M%S")
    file_name = f"toy_model_runs_{converge_label}_converge_{fixed_str}_{next_state.__name__}_std{next_state_kwargs['prop_std']:.2f}_mimic{str(int(mimic_matflow))}_{timestamp}.pkl"
    with Path(file_name).open("wb") as fh:
        pickle.dump(direct_results, fh)

    return direct_results
- """ +def plot_pf_num_samples(all_results, quantile_range): - if sum(i is not None for i in (num_samples, num_repeats)) != 1: - raise ValueError("Specify num_samples or num_repeats") + converge_label = all_results["converge_label"] + fixed_num = all_results["fixed_num"] + title = f"{'num_repeats' if converge_label == 'N' else 'num_samples'}={fixed_num!r}" + xlabel = "Num samples, N" if converge_label == "N" else "Num repeats, R" all_x = [] all_y = [] all_lower = [] all_upper = [] - for N, data in all_results.items(): + all_data = all_results["data"] + for N, data in all_data.items(): pf_srt = sorted(data["pf"]) _x0 = (1 - quantile_range) / 2 @@ -274,31 +386,30 @@ def plot_pf_num_samples(all_results, quantile_range, num_samples=None, num_repea ) plt.scatter(x=all_x, y=all_y, marker="o", label="Mean") plt.hlines( - y=1e-4, xmin=min(all_results), xmax=max(all_results), color="gray", label="Target" + y=1e-4, xmin=min(all_data), xmax=max(all_data), color="gray", label="Target" ) # plt.xscale("log") - plt.yscale("log") - plt.xlabel(f"{'Num samples, N' if num_repeats else 'Num repeats, R'}") + # plt.yscale("log") + plt.xlabel(xlabel) plt.ylabel("Prob. of failure, pf") - plt.title(f"{num_repeats=!r}" if num_repeats else f"{num_samples=!r}") + plt.title(title) plt.legend() -def plot_cov_estimates(results, ylim=None, num_repeats=None, num_samples=None): - """ - Specify exactly one of `num_samples` and `num_repeats`. The non-specified is taken to - be the varying key in `all_results`. 
- """ - if sum(i is not None for i in (num_samples, num_repeats)) != 1: - raise ValueError("Specify num_samples or num_repeats") +def plot_cov_estimates(all_results, ylim=None): + + converge_label = all_results["converge_label"] + fixed_num = all_results["fixed_num"] + title = f"{'num_repeats' if converge_label == 'N' else 'num_samples'}={fixed_num!r}" + xlabel = "Num samples, N" if converge_label == "N" else "Num repeats, R" all_x = [] cov_empirical = [] cov_estimated = [] cov_estimated_std = [] - for N, data in results.items(): - + all_data = all_results["data"] + for N, data in all_data.items(): all_x.append(N) cov_empirical.append(data["cov_empirical"]) cov_estimated.append(data["cov_estimate"]) @@ -307,38 +418,51 @@ def plot_cov_estimates(results, ylim=None, num_repeats=None, num_samples=None): plt.figure(figsize=(5, 4)) plt.plot(all_x, cov_empirical, label="empirical CoV") plt.errorbar(x=all_x, y=cov_estimated, yerr=cov_estimated_std, label="estimated CoV") - plt.xlabel(f"{'Num samples, N' if num_repeats else 'Num repeats, R'}") + plt.xlabel(xlabel) plt.ylabel("CoV") - plt.title(f"{num_repeats=!r}" if num_repeats else f"{num_samples=!r}") + plt.title(title) plt.legend() if ylim: plt.ylim(ylim) -def plot_pf_dist(results, target_pf): +def plot_pf_dist(all_results, target_pf): + all_data = all_results["data"] + converge_label = all_results["converge_label"] + fixed_num = all_results["fixed_num"] + title = f"{'num_repeats' if converge_label == 'N' else 'num_samples'}={fixed_num!r}" + fig, axs = plt.subplots( - 1, len(results), figsize=(len(results) * 2.5, 3), sharex=True, sharey=True + 1, len(all_data), figsize=(len(all_data) * 2.5, 3), sharex=True, sharey=True ) - for idx, (N, data) in enumerate(results.items()): + for idx, (N, data) in enumerate(all_data.items()): + ax_t = f"Num samples, N={N}" if converge_label == "N" else f"Num repeats, R={N}" axs[idx].hist(np.log10(data["pf"])) - axs[idx].set_title(f"Num. 
samples, N={N}") + axs[idx].set_title(ax_t) axs[idx].axvline(np.log10(target_pf), color="gray") # One label for the whole figure fig.supxlabel("log10(Prob. of failure)") fig.supylabel("Frequency") + plt.title(title) plt.tight_layout() -def plot_cov_dist(results): +def plot_cov_dist(all_results): + all_data = all_results["data"] + converge_label = all_results["converge_label"] + fixed_num = all_results["fixed_num"] + title = f"{'num_repeats' if converge_label == 'N' else 'num_samples'}={fixed_num!r}" fig, axs = plt.subplots( - 1, len(results), figsize=(len(results) * 2.5, 3), sharex=True, sharey=True + 1, len(all_data), figsize=(len(all_data) * 2.5, 3), sharex=True, sharey=True ) - for idx, (N, data) in enumerate(results.items()): + for idx, (N, data) in enumerate(all_data.items()): + ax_t = f"Num samples, N={N}" if converge_label == "N" else f"Num repeats, R={N}" axs[idx].hist(data["cov"]) - axs[idx].set_title(f"Num. samples, N={N}") + axs[idx].set_title(ax_t) # One label for the whole figure fig.supxlabel("CoV") fig.supylabel("Frequency") + plt.title(title) plt.tight_layout() From 6f423239717c9212dbaab5ed5689b0fd2144900e Mon Sep 17 00:00:00 2001 From: Adam Plowman Date: Fri, 10 Apr 2026 13:56:04 +0100 Subject: [PATCH 4/4] test: enable subset simulation toy model test --- .../subset_simulation_toy_model.yaml | 1 - .../demo_workflows/test_demo_workflows.py | 45 +++++++++++++++++++ .../demo_workflows/test_subset_simulation.py | 33 -------------- 3 files changed, 45 insertions(+), 34 deletions(-) diff --git a/matflow/data/workflows/subset_simulation_toy_model.yaml b/matflow/data/workflows/subset_simulation_toy_model.yaml index d5859f68..cf541b9b 100644 --- a/matflow/data/workflows/subset_simulation_toy_model.yaml +++ b/matflow/data/workflows/subset_simulation_toy_model.yaml @@ -13,7 +13,6 @@ loops: resources: any: combine_scripts: true - random_seed: 1234 tasks: - schema: sample_direct_MC diff --git a/matflow/tests/demo_workflows/test_demo_workflows.py 
@pytest.mark.demo_workflows
def test_subset_simulation_toy_model_prediction(tmp_path):
    """Validate the MatFlow subset simulation implementation for a toy model.

    Note this test must be run with a `--with-env-source /path/to/envs.yaml` option that
    points to an environment file with definitions for:
    - `damask_parse_env`

    """

    seed = 1234

    # run via a MatFlow workflow:
    workflow = mf.make_and_submit_demo_workflow(
        "subset_simulation_toy_model",
        path=tmp_path,
        status=False,
        add_to_known=False,
        wait=True,
        resources={"random_seed": seed},
    )
    final_iter = workflow.tasks.collate_results.elements[
        0
    ].latest_iteration_non_skipped
    pf_wk = final_iter.get("outputs.pf")
    cov_wk = final_iter.get("outputs.cov")

    # run via the single-function reference implementation:
    pf_ref, cov_ref = subset_simulation(
        dimension=200,
        target_pf=1e-4,
        p_0=0.1,
        num_samples=100,
        num_levels=7,
        next_state=generate_next_state,
        next_state_kwargs={"prop_std": 1.0},
        master_seed=seed,
        mimic_matflow=True,
    )

    # both implementations consume identical random streams, so results must
    # match exactly:
    assert pf_wk == pf_ref
    assert cov_wk == cov_ref
@pytest.mark.integration @@ -395,35 +394,3 @@ def test_subset_sim_DAMASK_Mg_two_level_parameter_flow( inc_chain_iters[l_idx].get_data_idx()["inputs.g"] == iter_i.get_data_idx()["outputs.g"] ) - - -@pytest.mark.integration -@pytest.mark.skip(reason="Needs changes in PR #526") -def test_subset_simulation_toy_model_prediction(tmp_path): - """Validate the MatFlow subset simulation implementation for a toy""" - - # run via a MatFlow workflow: - wk = mf.make_and_submit_demo_workflow( - "subset_simulation_toy_model", - path=tmp_path, - status=False, - add_to_known=False, - wait=True, - ) - final_iter = wk.tasks.collate_results.elements[0].latest_iteration_non_skipped - pf = final_iter.get("outputs.pf") - cov = final_iter.get("outputs.cov") - - # run via single function implementation: - pf_sf, cov_sf = subset_simulation( - dimension=200, - target_pf=1e-4, - p_0=0.1, - num_samples=100, - num_levels=7, - prop_std=1.0, - master_seed=1234, # match the master seed in the demo workflow - ) - - assert pf == pf_sf - assert cov == cov_sf