Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added gridfm_graphkit PR-VLDloss.zip
Binary file not shown.
7 changes: 7 additions & 0 deletions gridfm_graphkit/datasets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,18 @@
PowerFlowTransforms,
OptimalPowerFlowTransforms,
StateEstimationTransforms,
##############
VoltageLossDetectionTransforms
#############

)

# Names exported by the datasets package (this hunk belongs to datasets/__init__.py).
__all__ = [
"HeteroDataMVANormalizer",
"PowerFlowTransforms",
"OptimalPowerFlowTransforms",
"StateEstimationTransforms",
# Transform pipeline added for the VLD (VoltageLossDetection) task
"VoltageLossDetectionTransforms"
]
13 changes: 13 additions & 0 deletions gridfm_graphkit/datasets/globals.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
BS = 13 # Shunt susceptance (p.u.)
VN_KV = 14 # Nominal voltage

# Additional bus input features: bus status (columns following VN_KV)
BUS_BASE_STATUS_H = 15 # Bus ON/OFF status in the pre-contingency (base) topology
BUS_CONT_H = 16 # Bus contingency to be applied

# =========================
# === OUTPUT FEATURE INDICES ==
# =========================
Expand All @@ -26,6 +30,10 @@
QG_OUT = 3
PG_OUT_GEN = 0

# Additional output features for bus status (VLD tasks)
# NOTE(review): BUS_STATUS_LOGIT_OUT has the same value as PHYSICAL_BUS_DIM, so the
# status logit presumably sits in the first column after the physical bus outputs;
# which tensor BUS_STATUS_TARGET indexes is not visible here — confirm both.
PHYSICAL_BUS_DIM = 4 # Physical bus outputs predicted by the model for VLD tasks
BUS_STATUS_TARGET = 5 # Post-contingency energized/de-energized target
BUS_STATUS_LOGIT_OUT = 4 # Extra model output column(s) for VLD tasks

# ================================
# === GENERATOR FEATURE INDICES ==
Expand All @@ -52,3 +60,8 @@
ANG_MAX = 8 # Angle max (deg)
RATE_A = 9 # Thermal limit
B_ON = 10 # Branch on/off

# Additional branch input features: branch status (columns following B_ON)
BRANCH_BASE_STATUS_E = 11 # Branch ON/OFF status in the pre-contingency (base) topology
BRANCH_CONT_E = 12 # Branch contingency to be applied

142 changes: 42 additions & 100 deletions gridfm_graphkit/datasets/hetero_powergrid_datamodule.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,12 @@
from gridfm_graphkit.datasets.utils import (
split_dataset,
split_dataset_by_load_scenario_idx,
split_from_existing_files,
)
from gridfm_graphkit.datasets.powergrid_hetero_dataset import HeteroGridDatasetDisk
import numpy as np
import random
import warnings
import lightning as L
from pathlib import Path
from typing import List
from lightning.pytorch.loggers import MLFlowLogger

Expand Down Expand Up @@ -103,11 +101,6 @@ def __init__(
"split_by_load_scenario_idx",
False,
)
self.split_from_existing_files = getattr(
args.data,
"split_from_existing_files",
None,
)
self.args = args
self.normalizer_stats_path = normalizer_stats_path
self.data_normalizers = []
Expand All @@ -120,15 +113,6 @@ def __init__(
self.test_scenario_ids: List[List[int]] = []
self._is_setup_done = False

if self.split_by_load_scenario_idx:
assert self.split_from_existing_files is None, " either `split_by_load_scenario_idx` or `split_from_existing_files` may be used, not both"

if self.split_from_existing_files is not None:
assert isinstance(self.split_from_existing_files, str), "`split_from_existing_files` must be an existing folder in string format"
self.split_from_existing_files = Path(self.split_from_existing_files)
assert self.split_from_existing_files.is_dir(), "`split_from_existing_files` must be an existing folder in string format"


def setup(self, stage: str):
if self._is_setup_done:
print(f"Setup already done for stage={stage}, skipping...")
Expand Down Expand Up @@ -183,94 +167,54 @@ def setup(self, stage: str):

# Create a subset
all_indices = list(range(len(dataset)))
# Random seed set before every shuffle for reproducibility in case the power grid datasets are analyzed in a different order
random.seed(self.args.seed)
random.shuffle(all_indices)
subset_indices = all_indices[:num_scenarios]

# load_scenario for each scenario in the subset
load_scenarios = dataset.load_scenarios[subset_indices]

if self.split_from_existing_files is not None:
warnings.warn(
"`data.scenarios` is ignored when `split_from_existing_files` is set; "
"train/val/test scenario ids are loaded from the provided split files.",
)
dataset = Subset(dataset, subset_indices)

if self.dataset_wrapper is not None:
wrapper_cls = DATASET_WRAPPER_REGISTRY.get(self.dataset_wrapper)
dataset = wrapper_cls(
dataset,
cache_dir=self.dataset_wrapper_cache_dir,
)
if self.dataset_wrapper is not None:
wrapper_cls = DATASET_WRAPPER_REGISTRY.get(self.dataset_wrapper)
dataset = wrapper_cls(dataset, cache_dir=self.dataset_wrapper_cache_dir)

(train_dataset, val_dataset, test_dataset), subset_indices = (
split_from_existing_files(
dataset,
self.split_from_existing_files,
)
)
train_scenario_ids = subset_indices["train"]
val_scenario_ids = subset_indices["val"]
test_scenario_ids = subset_indices["test"]
num_scenarios = int(
np.unique(
train_scenario_ids + val_scenario_ids + test_scenario_ids,
).shape[0],
)
else:
# Random seed set before every shuffle for reproducibility in case the power grid datasets are analyzed in a different order
random.seed(self.args.seed)
random.shuffle(all_indices)
subset_indices = all_indices[:num_scenarios]

load_scenarios = None
if self.split_by_load_scenario_idx:
if not hasattr(dataset, "load_scenarios"):
raise ValueError(
"`data.split_by_load_scenario_idx=true` requires "
"`load_scenario_idx` in raw bus data so "
"`processed/load_scenarios.pt` can be created.",
)
# load_scenario for each scenario in the subset
load_scenarios = dataset.load_scenarios[subset_indices]


dataset = Subset(dataset, subset_indices)

if self.dataset_wrapper is not None:
wrapper_cls = DATASET_WRAPPER_REGISTRY.get(self.dataset_wrapper)
dataset = wrapper_cls(dataset, cache_dir=self.dataset_wrapper_cache_dir)


# Random seed set before every split, same as above
np.random.seed(self.args.seed)
if self.split_by_load_scenario_idx:
train_dataset, val_dataset, test_dataset = (
split_dataset_by_load_scenario_idx(
dataset,
self.data_dir,
load_scenarios,
self.args.data.val_ratio,
self.args.data.test_ratio,
)
)
else:
train_dataset, val_dataset, test_dataset = split_dataset(
# Random seed set before every split, same as above
np.random.seed(self.args.seed)
if self.split_by_load_scenario_idx:
train_dataset, val_dataset, test_dataset = (
split_dataset_by_load_scenario_idx(
dataset,
self.data_dir,
load_scenarios,
self.args.data.val_ratio,
self.args.data.test_ratio,
)

# Extract scenario IDs for each split
train_scenario_ids = self._extract_scenario_ids(
train_dataset,
subset_indices,
)
val_scenario_ids = self._extract_scenario_ids(
val_dataset,
subset_indices,
)
test_scenario_ids = self._extract_scenario_ids(
test_dataset,
subset_indices,
else:
train_dataset, val_dataset, test_dataset = split_dataset(
dataset,
self.data_dir,
self.args.data.val_ratio,
self.args.data.test_ratio,
)

# Extract scenario IDs for each split
train_scenario_ids = self._extract_scenario_ids(
train_dataset,
subset_indices,
)
val_scenario_ids = self._extract_scenario_ids(
val_dataset,
subset_indices,
)
test_scenario_ids = self._extract_scenario_ids(
test_dataset,
subset_indices,
)

# Fit normalizer: restore from saved stats only for fit_on_train
# normalizers (global baseMVA must match the model's training run).
# fit_on_dataset normalizers compute per-scenario stats and must
Expand Down Expand Up @@ -425,24 +369,22 @@ def _dataloader_kwargs(self):
pin_memory=torch.cuda.is_available(),
persistent_workers=num_workers > 0,
)
# Use 'fork' on Linux. It avoids the forkserver intermediary pipe which
# is fragile when the process has many threads (e.g. OpenBLAS). In
# container environments (Kubernetes) fork works correctly. On
# traditional HPC systems with strict fd-passing restrictions the
# original 'forkserver' may be needed, but the pipe truncation it
# produces under thread pressure is worse than the ancdata warning.
# On Linux some HPC environments restrict passing open file descriptors
# via Unix socket ancillary data (SCM_RIGHTS), which causes
# "received 0 items of ancdata" with the default 'fork' start method.
# 'forkserver' avoids fd-passing by having a dedicated server process
# that re-opens shared memory objects by name instead.
if (
num_workers > 0
and torch.multiprocessing.get_start_method(allow_none=True) != "spawn"
):
import platform

if platform.system() == "Linux":
kwargs["multiprocessing_context"] = "fork"
kwargs["multiprocessing_context"] = "forkserver"
return kwargs

def train_dataloader(self):
print("creating train dataloader for rank ", dist.get_rank() if dist.is_available() and dist.is_initialized() else "not distributed")
return DataLoader(
self.train_dataset_multi,
batch_size=self.batch_size,
Expand Down
73 changes: 71 additions & 2 deletions gridfm_graphkit/datasets/masking.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ def forward(self, data):


class BusToGenBroadcaster(MessagePassing):
"""Broadcast per-bus values to connected generators via graph propagation."""
def __init__(self, aggr="add"):
super().__init__(aggr=aggr)

Expand All @@ -175,7 +174,6 @@ def message(self, x_j):


class SimulateMeasurements(BaseTransform):
"""Add configurable noise/outliers and masks to simulate measured quantities."""
def __init__(self, args):
super().__init__()
self.measurements = args.task.measurements
Expand Down Expand Up @@ -323,3 +321,74 @@ def forward(self, data):
}

return data

#######################
class AddVLDHeteroMask(BaseTransform):
    """
    Attach PF-style boolean masks to a heterogeneous VLD sample.

    The transform writes ``data.mask_dict`` with:
    - ``"PQ"`` / ``"PV"`` / ``"REF"``: per-bus type indicators read from the
      bus feature columns ``PQ_H`` / ``PV_H`` / ``REF_H``
    - ``"bus"`` / ``"gen"`` / ``"branch"``: element-wise masks with the same
      shape as the corresponding feature tensors

    Only the columns set explicitly below are flagged ``True``; every other
    column (including any appended topology/status metadata) stays ``False``.
    """

    def __init__(self):
        super().__init__()

    def forward(self, data):
        bus_feats = data.x_dict["bus"]
        gen_feats = data.x_dict["gen"]

        # Bus-type indicators taken from the bus feature columns.
        is_pq = bus_feats[:, PQ_H] == 1
        is_pv = bus_feats[:, PV_H] == 1
        is_ref = bus_feats[:, REF_H] == 1

        bus_mask = torch.zeros_like(bus_feats, dtype=torch.bool)
        gen_mask = torch.zeros_like(gen_feats, dtype=torch.bool)

        # Columns masked for every bus / every generator (same pattern as PF).
        for col in (MIN_VM_H, MAX_VM_H, MIN_QG_H, MAX_QG_H, VN_KV):
            bus_mask[:, col] = True
        for col in (MIN_PG, MAX_PG, C0_H, C1_H, C2_H):
            gen_mask[:, col] = True

        # Type-dependent columns:
        #   PQ  -> VM, VA     PV -> VA, QG     REF -> VM, QG
        bus_mask[is_pq | is_ref, VM_H] = True
        bus_mask[is_pq | is_pv, VA_H] = True
        bus_mask[is_pv | is_ref, QG_H] = True

        # Generators attached to a REF bus additionally get PG masked.
        gen_idx, bus_idx = data.edge_index_dict[("gen", "connected_to", "bus")]
        gens_on_ref = gen_idx[is_ref[bus_idx]]
        gen_mask[gens_on_ref, PG_H] = True

        branch_mask = torch.zeros_like(
            data.edge_attr_dict[("bus", "connects", "bus")],
            dtype=torch.bool,
        )
        for col in (P_E, Q_E, ANG_MIN, ANG_MAX, RATE_A):
            branch_mask[:, col] = True

        data.mask_dict = {
            "bus": bus_mask,
            "gen": gen_mask,
            "branch": branch_mask,
            "PQ": is_pq,
            "PV": is_pv,
            "REF": is_ref,
        }
        return data
#######################
12 changes: 6 additions & 6 deletions gridfm_graphkit/datasets/normalizers.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,8 @@ def transform(self, data: HeteroData):
data.edge_attr_dict[("bus", "connects", "bus")][:, ANG_MIN] *= torch.pi / 180.0
data.edge_attr_dict[("bus", "connects", "bus")][:, ANG_MAX] *= torch.pi / 180.0
data.edge_attr_dict[("bus", "connects", "bus")][:, RATE_A] /= self.baseMVA
data.baseMVA = torch.tensor(self.baseMVA, dtype=data.x_dict["bus"].dtype) # # needs to be float32 for MPS
data.is_normalized = torch.tensor(True, dtype=torch.bool) # needs to be bool for MPS
data.baseMVA = self.baseMVA
data.is_normalized = True

def inverse_transform(self, data: HeteroData):
if self.baseMVA is None or self.baseMVA == 0:
Expand Down Expand Up @@ -299,7 +299,7 @@ def inverse_transform(self, data: HeteroData):
data.edge_attr_dict[("bus", "connects", "bus")][:, ANG_MAX] *= 180.0 / torch.pi

data.edge_attr_dict[("bus", "connects", "bus")][:, RATE_A] *= self.baseMVA
data.is_normalized = torch.tensor(False, dtype=torch.bool) # needs to be bool for MPS
data.is_normalized = False

def inverse_output(self, output, batch):
bus_output = output["bus"]
Expand Down Expand Up @@ -510,10 +510,10 @@ def transform(self, data: HeteroData):
data.edge_attr_dict[("bus", "connects", "bus")][:, ANG_MIN] *= torch.pi / 180.0
data.edge_attr_dict[("bus", "connects", "bus")][:, ANG_MAX] *= torch.pi / 180.0
data.edge_attr_dict[("bus", "connects", "bus")][:, RATE_A] /= e_b
data.is_normalized = torch.tensor(True, dtype=torch.bool) # needs to be bool for MPS
data.is_normalized = True

def inverse_transform(self, data: HeteroData):
"""Undo per-unit normalization (multiply by baseMVA, inverse log1p for cost coeffs)."""
"""Undo per-unit normalization (multiply by baseMVA, rad->deg, inverse log1p for cost coeffs)."""
if self._baseMVA_lookup is None:
raise ValueError("Normalizer not fitted or lookups not loaded")
if not data.is_normalized.all():
Expand Down Expand Up @@ -573,7 +573,7 @@ def inverse_transform(self, data: HeteroData):
data.edge_attr_dict[("bus", "connects", "bus")][:, ANG_MAX] *= 180.0 / torch.pi

data.edge_attr_dict[("bus", "connects", "bus")][:, RATE_A] *= e_b
data.is_normalized = torch.tensor(False, dtype=torch.bool) # needs to be bool for MPS
data.is_normalized = False

def inverse_output(self, output, batch):
"""
Expand Down
Loading