diff --git a/documentation/features/scheduling.rst b/documentation/features/scheduling.rst
index 4cc319c7fd..aacfa2db11 100644
--- a/documentation/features/scheduling.rst
+++ b/documentation/features/scheduling.rst
@@ -5,8 +5,8 @@ Scheduling
Scheduling is the main value-drive of FlexMeasures. We have two major types of schedulers built-in, for storage devices (usually batteries or hot water storage) and processes (usually in industry).
-FlexMeasures computes schedules for energy systems that consist of multiple devices that consume and/or produce electricity.
-We model a device as an asset with a power sensor, and compute schedules only for flexible devices, while taking into account inflexible devices.
+FlexMeasures computes schedules for energy systems that consist of multiple devices that consume and/or produce a commodity (e.g. electricity or gas).
+We model a device as an asset with a consumption/production sensor recording power values, and compute schedules only for flexible devices, while taking into account inflexible devices.
.. contents::
:local:
@@ -58,6 +58,9 @@ And if the asset belongs to a larger system (a hierarchy of assets), the schedul
* - Field
- Example value
- Description
+ * - ``commodity``
+ - |COMMODITY_FLEX_CONTEXT.example|
+ - .. include:: ../_autodoc/COMMODITY_FLEX_CONTEXT.rst
* - ``inflexible-device-sensors``
- |INFLEXIBLE_DEVICE_SENSORS.example|
- .. include:: ../_autodoc/INFLEXIBLE_DEVICE_SENSORS.rst
@@ -70,9 +73,6 @@ And if the asset belongs to a larger system (a hierarchy of assets), the schedul
* - ``production-price``
- |PRODUCTION_PRICE.example|
- .. include:: ../_autodoc/PRODUCTION_PRICE.rst
- * - ``gas-price``
- - |GAS_PRICE.example|
- - .. include:: ../_autodoc/GAS_PRICE.rst
* - ``site-power-capacity``
- |SITE_POWER_CAPACITY.example|
- .. include:: ../_autodoc/SITE_POWER_CAPACITY.rst
@@ -187,8 +187,8 @@ For more details on the possible formats for field values, see :ref:`variable_qu
- Example value
- Description
* - ``commodity``
- - |COMMODITY.example|
- - .. include:: ../_autodoc/COMMODITY.rst
+ - |COMMODITY_FLEX_MODEL.example|
+ - .. include:: ../_autodoc/COMMODITY_FLEX_MODEL.rst
* - ``consumption``
- |CONSUMPTION.example|
- .. include:: ../_autodoc/CONSUMPTION.rst
diff --git a/flexmeasures/data/models/planning/__init__.py b/flexmeasures/data/models/planning/__init__.py
index d7fbbb43b9..954f08960b 100644
--- a/flexmeasures/data/models/planning/__init__.py
+++ b/flexmeasures/data/models/planning/__init__.py
@@ -100,6 +100,70 @@ def _build_stock_groups(flex_model: list[dict]) -> dict:
return dict(groups)
+ @staticmethod
+ def _build_coupling_groups(
+ flex_model: list[dict],
+ ) -> dict[str, list[tuple[int, float]]]:
+ """Build coupling groups from the 'coupling' and 'coupling_coefficient' fields
+ of each device model.
+
+ Devices sharing the same coupling name form a coupling group.
+ The optimization model introduces a decision variable ``alpha`` per group per time
+ step, and constrains every device by ``P[d] == coeff_d * alpha``.
+
+ Coupling coefficients in flex-models are user-facing positive magnitudes.
+ The internal sign is inferred from directional capacities:
+
+ - ``consumption_capacity == 0`` -> output device -> internally negative coefficient
+ - ``production_capacity == 0`` -> input device -> internally positive coefficient
+
+ If neither direction is explicitly blocked, the coefficient stays positive.
+
+ Example — a CHP with 50% heat efficiency and 30% power efficiency:
+
+ [
+ {"coupling": "chp", "coupling_coefficient": 1.0}, # gas input (alpha = P_gas)
+ {"coupling": "chp", "coupling_coefficient": 0.5}, # heat output (50% of gas)
+ {"coupling": "chp", "coupling_coefficient": 0.3}, # power output (30% of gas)
+ ]
+
+ :param flex_model: List of deserialized device flex-model dicts.
+ :returns: Mapping from coupling-group name to a list of
+ ``(device_index, internal_signed_coefficient)`` tuples suitable for
+ passing to ``device_scheduler(coupling_groups=...)``. Returns an empty dict
+ when no device defines a ``coupling`` field.
+ """
+
+ def _is_zero_capacity(value: Any) -> bool:
+ """Return True if the capacity value is numerically zero."""
+
+ if value is None:
+ return False
+
+ # Pint quantities expose ``magnitude``.
+ magnitude = getattr(value, "magnitude", value)
+ try:
+ return bool(np.isclose(float(magnitude), 0.0))
+ except (TypeError, ValueError):
+ return False
+
+ groups: dict[str, list[tuple[int, float]]] = defaultdict(list)
+ for d, fm in enumerate(flex_model):
+ coupling_name = fm.get("coupling")
+ if coupling_name is None:
+ continue
+ coefficient = abs(float(fm.get("coupling_coefficient", 1.0)))
+
+ is_output = _is_zero_capacity(fm.get("consumption_capacity"))
+ is_input = _is_zero_capacity(fm.get("production_capacity"))
+
+ if is_output and not is_input:
+ coefficient = -coefficient
+
+ groups[coupling_name].append((d, coefficient))
+
+ return dict(groups)
+
def __init__(
self,
sensor: Sensor | None = None, # deprecated
diff --git a/flexmeasures/data/models/planning/linear_optimization.py b/flexmeasures/data/models/planning/linear_optimization.py
index 7a4e97405c..111cbb1576 100644
--- a/flexmeasures/data/models/planning/linear_optimization.py
+++ b/flexmeasures/data/models/planning/linear_optimization.py
@@ -42,6 +42,7 @@ def device_scheduler( # noqa C901
commitments: list[pd.DataFrame] | list[Commitment] | None = None,
initial_stock: float | list[float] = 0,
stock_groups: dict[int, list[int]] | None = None,
+ coupling_groups: dict[str, list[tuple[int, float]]] | None = None,
ems_constraint_groups: list[list[int]] | None = None,
) -> tuple[list[pd.Series], float, SolverResults, ConcreteModel]:
"""This generic device scheduler is able to handle an EMS with multiple devices,
@@ -80,6 +81,15 @@ def device_scheduler( # noqa C901
device: 0 (corresponds to device d; if not set, commitment is on an EMS level)
:param initial_stock: initial stock for each device. Use a list with the same number of devices as device_constraints,
or use a single value to set the initial stock to be the same for all devices.
+ :param coupling_groups: Hard flow-coupling constraints between devices. Each entry maps a group name to a list of
+ ``(device_index, coefficient)`` tuples. A decision variable ``alpha`` is introduced per group
+ per time step and every device ``d`` in the group is constrained by ``P[d, j] == coeff_d * alpha[group, j]``.
+ Sign convention: positive coefficient for input devices (consuming, positive ``ems_power``),
+ negative coefficient for output devices (producing, negative ``ems_power``).
+ Example — a CHP with gas input (d=0, coeff 1.0), heat output (d=1, coeff −0.5) and
+ power output (d=2, coeff −0.3)::
+
+ coupling_groups={"chp": [(0, 1.0), (1, -0.5), (2, -0.3)]}
Potentially deprecated arguments:
commitment_quantities: amounts of flow specified in commitments (both previously ordered and newly requested)
@@ -131,19 +141,36 @@ def device_scheduler( # noqa C901
# map device -> primary stock group (used for per-device stock bounds)
# and map stock group -> all member devices (used for stock accumulation).
device_to_group = {}
+ group_to_devices = {}
if stock_groups:
for g, devices in stock_groups.items():
+ group_to_devices[g] = list(devices)
for d in devices:
- device_to_group[d] = g
- # For devices not in any stock group (e.g., inflexible devices),
- # map them to themselves so they're treated as individual groups
+ # Keep first assignment as the primary group. A device can still
+ # participate in multiple groups via ``group_to_devices``.
+ if d not in device_to_group:
+ device_to_group[d] = g
+ # Devices not in any stock group are treated as single-device groups.
for d in range(len(device_constraints)):
if d not in device_to_group:
- device_to_group[d] = d
+ g = f"_device_{d}"
+ device_to_group[d] = g
+ group_to_devices[g] = [d]
else:
for d in range(len(device_constraints)):
- device_to_group[d] = d
+ g = f"_device_{d}"
+ device_to_group[d] = g
+ group_to_devices[g] = [d]
+
+ # Collect (group_index, device_index, coefficient) triples for coupling constraints.
+ # Each device in each group will be constrained: P[d, j] == coeff * alpha[group, j]
+ # where alpha is a free variable representing the common normalised flow.
+ coupling_device_specs: list[tuple[int, int, float]] = []
+ if coupling_groups:
+ for g_idx, (_group_name, members) in enumerate(coupling_groups.items()):
+ for d_idx, coeff in members:
+ coupling_device_specs.append((g_idx, d_idx, coeff))
# Move commitments from old structure to new
if commitments is None:
@@ -585,7 +612,7 @@ def _get_stock_change(m, d, j):
group = device_to_group[d]
# all devices belonging to this stock
- devices = [dev for dev, g in device_to_group.items() if g == group]
+ devices = group_to_devices[group]
# initial stock
if isinstance(initial_stock, list):
@@ -780,6 +807,29 @@ def device_derivative_equalities(m, d, j):
model.d, model.j, rule=device_derivative_equalities
)
+ if coupling_device_specs:
+ n_coupling_groups = len(coupling_groups)
+
+ # One free variable per group per time step: the common normalised flow.
+ model.coupling_group_range = RangeSet(0, n_coupling_groups - 1)
+ model.coupling_alpha = Var(model.coupling_group_range, model.j, domain=Reals)
+
+ model.coupling_device_range = RangeSet(0, len(coupling_device_specs) - 1)
+
+ def flow_coupling_rule(m, c, j):
+ """Enforce P[d, j] == coeff * alpha[group, j] for each coupled device.
+
+ This pins every device's flow to the same normalised level ``alpha``,
+ scaled by its coupling coefficient. The coefficient sign indicates direction:
+ positive for inputs (consuming), negative for outputs (producing).
+ """
+ g, d, coeff = coupling_device_specs[c]
+ return m.ems_power[d, j] == coeff * m.coupling_alpha[g, j]
+
+ model.flow_coupling_constraints = Constraint(
+ model.coupling_device_range, model.j, rule=flow_coupling_rule
+ )
+
# Add objective
def cost_function(m):
costs = 0
diff --git a/flexmeasures/data/models/planning/storage.py b/flexmeasures/data/models/planning/storage.py
index 6904562a8c..98a3e56f62 100644
--- a/flexmeasures/data/models/planning/storage.py
+++ b/flexmeasures/data/models/planning/storage.py
@@ -3,7 +3,8 @@
import re
import copy
from datetime import datetime, timedelta
-from typing import Type
+from itertools import chain
+from typing import Any, Type
import pandas as pd
import numpy as np
@@ -210,6 +211,10 @@ def _prepare(self, skip_validation: bool = False) -> tuple: # noqa: C901
# This ensures the mapping aligns with the device indices
self.stock_groups = self._build_stock_groups(device_models)
+ # Build coupling_groups from the 'coupling' and 'coupling_coefficient' fields
+ # of each device model. Devices sharing the same coupling name form a group.
+ self.coupling_groups = self._build_coupling_groups(device_models)
+
# List the asset(s) and sensor(s) being scheduled
if self.asset is not None:
if not isinstance(self.flex_model, list):
@@ -286,7 +291,7 @@ def _prepare(self, skip_validation: bool = False) -> tuple: # noqa: C901
]
# Get info from flex-context
- inflexible_device_sensors = self.flex_context.get(
+ inflexible_device_sensors: list[Sensor] = self.flex_context.get(
"inflexible_device_sensors", []
)
@@ -327,22 +332,54 @@ def device_list_series(
) -> pd.Series:
return pd.Series([tuple(devices)] * len(index), index=index, name="device")
+ # Set up a mapping between commodities and a list of enumerated flexible and inflexible devices
+ # 1. The enumeration starts with all flexible devices in the order that they are given in the flex-model
+ # 2. The enumeration continues with the inflexible devices referenced in the top-level flex-context, in the order given
+ # 3. Then the enumeration goes through the commodities under the commodity_contexts field, in the order given,
+ # extending the enumeration with the inflexible devices referenced in these commodity contexts.
commodity_to_devices = {}
+ # Step 1: enumerate the flexible devices
for d, flex_model_d in enumerate(flex_model):
commodity = flex_model_d.get("commodity", "electricity")
commodity_to_devices.setdefault(commodity, []).append(d)
- # inflexible devices are electricity by default
- number_flexible_devices = len(flex_model)
- number_inflexible_devices = len(
- self.flex_context.get("inflexible_device_sensors", [])
- )
+ # Step 2: enumerate the top-level inflexible devices (electric for backwards compatibility)
num_flexible_devices = len(flex_model)
commodity_to_devices["electricity"] += list(
range(
- number_flexible_devices,
- number_flexible_devices + number_inflexible_devices,
+ num_flexible_devices,
+ num_flexible_devices + len(inflexible_device_sensors),
+ )
+ )
+
+ # Step 3: enumerate the inflexible devices per commodity
+ def list_to_map(listing: list, key: Any) -> dict:
+ """Note: the key is retained in the map values."""
+ return {l[key]: l for l in listing}
+
+ # Move inflexible-device-sensors per commodity into the device listing for the commodity
+ commodity_mapping: dict[str, dict] = list_to_map(
+ self.flex_context.get("commodity_contexts", []), key="commodity"
+ )
+ inflexible_devices_per_commodity = {
+ com: con.get("inflexible_device_sensors", [])
+ for com, con in commodity_mapping.items()
+ }
+ num_devices = num_flexible_devices + len(inflexible_device_sensors)
+ for (
+ commodity,
+ commodity_inflexible_device_sensors,
+ ) in inflexible_devices_per_commodity.items():
+ commodity_to_devices[commodity] += list(
+ range(
+ num_devices,
+ num_devices + len(commodity_inflexible_device_sensors),
+ )
)
+ num_devices = num_devices + len(commodity_inflexible_device_sensors)
+
+ inflexible_device_sensors = inflexible_device_sensors + list(
+ chain.from_iterable(inflexible_devices_per_commodity.values())
)
commodity_contexts = self._get_commodity_contexts()
@@ -365,10 +402,12 @@ def device_list_series(
if production_price is None:
production_price = consumption_price
- if consumption_price is None:
- raise ValueError(
- f"Missing consumption price for commodity '{commodity}'."
- )
+ # todo: log info statement if commodity has no associated prices
+ # todo: raise if none of the commodities (or maybe electricity specifically) has prices
+ # if consumption_price is None:
+ # raise ValueError(
+ # f"Missing consumption price for commodity '{commodity}'."
+ # )
# Energy prices for this commodity.
up_deviation_prices = get_continuous_series_sensor_or_quantity(
@@ -379,7 +418,8 @@ def device_list_series(
beliefs_before=belief_time,
fill_sides=True,
).to_frame(name="event_value")
- ensure_prices_are_not_empty(up_deviation_prices, consumption_price)
+ # todo: see above todo
+ # ensure_prices_are_not_empty(up_deviation_prices, consumption_price)
down_deviation_prices = get_continuous_series_sensor_or_quantity(
variable_quantity=production_price,
@@ -389,7 +429,8 @@ def device_list_series(
beliefs_before=belief_time,
fill_sides=True,
).to_frame(name="event_value")
- ensure_prices_are_not_empty(down_deviation_prices, production_price)
+ # todo: see above todo
+ # ensure_prices_are_not_empty(down_deviation_prices, production_price)
price_frames_by_commodity[commodity] = up_deviation_prices
@@ -2118,10 +2159,11 @@ def compute(self, skip_validation: bool = False) -> SchedulerOutputType:
ems_schedule, expected_costs, scheduler_results, model = device_scheduler(
device_constraints=device_constraints,
ems_constraints=ems_constraints,
- ems_constraint_groups=self.ems_constraint_groups,
commitments=commitments,
initial_stock=initial_stock,
stock_groups=self.stock_groups,
+ coupling_groups=self.coupling_groups if self.coupling_groups else None,
+ ems_constraint_groups=self.ems_constraint_groups,
)
if "infeasible" in (tc := scheduler_results.solver.termination_condition):
raise InfeasibleProblemException(tc)
diff --git a/flexmeasures/data/models/planning/tests/test_commitments.py b/flexmeasures/data/models/planning/tests/test_commitments.py
index 49528382d4..72c307b669 100644
--- a/flexmeasures/data/models/planning/tests/test_commitments.py
+++ b/flexmeasures/data/models/planning/tests/test_commitments.py
@@ -1552,3 +1552,664 @@ def test_simulation_with_dynamic_consumption_capacity(app, db):
assert heater_schedule.loc["2026-04-07T08:00:00+00:00"] == pytest.approx(
80.0
), "Electric heater should have one expected partial 80 kW dispatch step before the first cheap-electricity window."
+
+
+def test_chp_coupling():
+ """Test that coupling_groups enforces fixed flow ratios between CHP devices.
+
+ Models a Combined Heat and Power unit with three pure flow devices:
+
+ - d=0 gas input: can only consume gas (derivative_min=0)
+ - d=1 heat output: can only produce heat (derivative_max=0)
+ - d=2 power output: can only produce electricity (derivative_max=0)
+
+ The coupling group ``"chp"`` is specified with coefficients
+ ``[(0, 1.0), (1, -0.5), (2, -0.3)]``, introducing a decision variable ``alpha``
+ and enforcing ``P[d] == coeff * alpha`` for each device:
+
+ P_gas = 1.0 * alpha (input, coeff = 1.0)
+ P_heat = -0.5 * alpha (output, coeff = -0.5, heat efficiency 50%)
+ P_power = -0.3 * alpha (output, coeff = -0.3, power efficiency 30%)
+
+ Heat production is forced to exactly 10 kW via ``derivative equals = -10``
+ on device 1. Substituting ``P_heat = -10`` gives ``alpha = 20``, so:
+
+ P_gas = 20 kW (gas consumed)
+ P_heat = -10 kW (heat produced, forced)
+ P_power = 20 kW * -0.3
+ ≈ -6 kW (electricity produced)
+
+ """
+ start = pd.Timestamp("2026-01-01T00:00+01:00")
+ end = pd.Timestamp("2026-01-01T04:00+01:00")
+ resolution = pd.Timedelta("1h")
+ index = initialize_index(start=start, end=end, resolution=resolution)
+
+ # d=0: gas input — can only consume (derivative_min=0), capacity 100 kW.
+ # NaN stock bounds mean no cumulative-stock constraint (pure flow device).
+ gas_constraints = pd.DataFrame(
+ {
+ "min": np.nan,
+ "max": np.nan,
+ "equals": np.nan,
+ "derivative min": 0.0,
+ "derivative max": 100.0,
+ "derivative equals": np.nan,
+ "derivative down efficiency": 1.0,
+ "derivative up efficiency": 1.0,
+ },
+ index=index,
+ )
+
+ # d=1: heat output — can only produce (derivative_max=0).
+ # Forced to exactly -10 kW via derivative equals.
+ heat_constraints = pd.DataFrame(
+ {
+ "min": np.nan,
+ "max": np.nan,
+ "equals": np.nan,
+ "derivative min": -100.0,
+ "derivative max": 0.0,
+ "derivative equals": -10.0,
+ "derivative down efficiency": 1.0,
+ "derivative up efficiency": 1.0,
+ },
+ index=index,
+ )
+
+ # d=2: power output — can only produce (derivative_max=0), capacity 100 kW.
+ # Flow is free; the coupling constraint will determine its value.
+ power_constraints = pd.DataFrame(
+ {
+ "min": np.nan,
+ "max": np.nan,
+ "equals": np.nan,
+ "derivative min": -100.0,
+ "derivative max": 0.0,
+ "derivative equals": np.nan,
+ "derivative down efficiency": 1.0,
+ "derivative up efficiency": 1.0,
+ },
+ index=index,
+ )
+
+ ems_constraints = pd.DataFrame(
+ {"derivative min": -200.0, "derivative max": 200.0},
+ index=index,
+ )
+
+ # Coupling group: one reference device (gas, coeff 1.0) and two coupled
+ # devices (heat with coeff -0.5, power with coeff -0.3).
+ coupling_groups = {"chp": [(0, 1.0), (1, -0.5), (2, -0.3)]}
+
+ # Gas-price commitment gives the objective a finite value and models the
+ # cost of consuming gas. With quantity=0 and both prices set the
+ # commitment acts as a two-sided soft equality: any upward deviation
+ # (gas consumption) incurs a cost of 1 EUR/kW.
+ gas_price_commitment = FlowCommitment(
+ name="gas cost",
+ index=index,
+ quantity=pd.Series(0.0, index=index),
+ upwards_deviation_price=pd.Series(1.0, index=index),
+ downwards_deviation_price=pd.Series(0.0, index=index),
+ device=pd.Series(0, index=index),
+ )
+
+ schedules, planned_costs, results, model = device_scheduler(
+ device_constraints=[gas_constraints, heat_constraints, power_constraints],
+ ems_constraints=ems_constraints,
+ commitments=[gas_price_commitment],
+ coupling_groups=coupling_groups,
+ )
+
+ assert (
+ results.solver.termination_condition == "optimal"
+ ), "Solver did not find an optimal solution."
+
+ # Heat is fixed to -10 kW by derivative_equals.
+ pd.testing.assert_series_equal(
+ schedules[1],
+ pd.Series(-10.0, index=index),
+ check_names=False,
+ rtol=1e-4,
+ obj="heat output forced to -10 kW by derivative_equals",
+ )
+
+ # Coupling: P_gas / 1.0 == P_heat / -0.5 → P_gas = -10 / -0.5 = 20 kW
+ pd.testing.assert_series_equal(
+ schedules[0],
+ pd.Series(20.0, index=index),
+ check_names=False,
+ rtol=1e-4,
+ obj="gas consumption determined by coupling (20 kW from 10 kW heat at coeff -0.5)",
+ )
+
+ # Coupling: P_gas / 1.0 == P_power / -0.3 → P_power = 20 / -0.3 = -6 kW
+ pd.testing.assert_series_equal(
+ schedules[2],
+ pd.Series(-6.0, index=index),
+ check_names=False,
+ rtol=1e-4,
+ obj="power output determined by coupling (-0.3 * alpha = -0.3 * 20 = -6 kW)",
+ )
+
+
+def test_dual_fuel_chp_coupling():
+ """Test coupling_groups with two input devices (dual-fuel CHP).
+
+ Models a CHP unit that consumes equal parts natural gas and hydrogen,
+ producing heat and electricity:
+
+ - d=0 gas input: can only consume gas (derivative_min=0)
+ - d=1 hydrogen input: can only consume hydrogen (derivative_min=0)
+ - d=2 heat output: can only produce heat (derivative_max=0)
+ - d=3 power output: can only produce electricity (derivative_max=0)
+
+ Coupling group ``"chp"`` with coefficients
+ ``[(0, 0.5), (1, 0.5), (2, -0.5), (3, -0.3)]`` introduces a free variable
+ ``alpha`` and enforces ``P[d] == coeff * alpha``:
+
+ P_gas = 0.5 * alpha (50% of total fuel from gas)
+ P_hydrogen = 0.5 * alpha (50% of total fuel from hydrogen)
+ P_heat = -0.5 * alpha (heat efficiency 50% of total fuel)
+ P_power = -0.3 * alpha (power efficiency 30% of total fuel)
+
+ Because gas and hydrogen share the same coefficient the two fuel flows are
+ always equal, confirming that device order does not affect the result.
+
+ Heat production is forced to exactly 10 kW via ``derivative equals = -10`` on device 2.
+ Substituting ``P_heat = -10`` gives ``alpha = 20``, so:
+
+ P_gas = 10 kW (equal gas input)
+ P_hydrogen = 10 kW (equal hydrogen input)
+ P_heat = -10 kW (heat produced, forced)
+ P_power = -6 kW (electricity produced)
+ """
+ start = pd.Timestamp("2026-01-01T00:00+01:00")
+ end = pd.Timestamp("2026-01-01T04:00+01:00")
+ resolution = pd.Timedelta("1h")
+ index = initialize_index(start=start, end=end, resolution=resolution)
+
+ def _flow_df(**kwargs) -> pd.DataFrame:
+ defaults = {
+ "min": np.nan,
+ "max": np.nan,
+ "equals": np.nan,
+ "derivative min": 0.0,
+ "derivative max": 0.0,
+ "derivative equals": np.nan,
+ "derivative down efficiency": 1.0,
+ "derivative up efficiency": 1.0,
+ }
+ defaults.update(kwargs)
+ return pd.DataFrame(defaults, index=index)
+
+ # d=0: gas input — can only consume, capacity 100 kW
+ gas_constraints = _flow_df(**{"derivative max": 100.0})
+ # d=1: hydrogen input — can only consume, capacity 100 kW
+ hydrogen_constraints = _flow_df(**{"derivative max": 100.0})
+ # d=2: heat output — can only produce, forced to -10 kW
+ heat_constraints = _flow_df(
+ **{"derivative min": -100.0, "derivative equals": -10.0}
+ )
+ # d=3: power output — can only produce, free (coupling determines value)
+ power_constraints = _flow_df(**{"derivative min": -100.0})
+
+ ems_constraints = pd.DataFrame(
+ {"derivative min": -200.0, "derivative max": 200.0},
+ index=index,
+ )
+
+ # Both fuel inputs share coefficient 0.5, so they receive identical flows.
+ # Outputs have negative coefficients equal to their efficiency fractions.
+ coupling_groups = {"chp": [(0, 0.5), (1, 0.5), (2, -0.5), (3, -0.3)]}
+
+ # Gas-price commitment for device 0 just to give the objective a finite value
+ # Even though hydrogen is free, it will still be used because its consumption is coupled to gas.
+ fuel_cost_commitment = FlowCommitment(
+ name="fuel cost",
+ index=index,
+ quantity=pd.Series(0.0, index=index),
+ upwards_deviation_price=pd.Series(1.0, index=index),
+ downwards_deviation_price=pd.Series(0.0, index=index),
+ device=pd.Series(0, index=index),
+ )
+
+ schedules, _costs, results, _model = device_scheduler(
+ device_constraints=[
+ gas_constraints,
+ hydrogen_constraints,
+ heat_constraints,
+ power_constraints,
+ ],
+ ems_constraints=ems_constraints,
+ commitments=[fuel_cost_commitment],
+ coupling_groups=coupling_groups,
+ )
+
+ assert (
+ results.solver.termination_condition == "optimal"
+ ), "Solver did not find an optimal solution."
+
+ # Heat is fixed to -10 kW; alpha = -10 / -0.5 = 20.
+ pd.testing.assert_series_equal(
+ schedules[2],
+ pd.Series(-10.0, index=index),
+ check_names=False,
+ rtol=1e-4,
+ obj="heat output forced to -10 kW by derivative_equals",
+ )
+
+ # Coupling: P_gas = 0.5 * alpha = 0.5 * 20 = 10 kW
+ pd.testing.assert_series_equal(
+ schedules[0],
+ pd.Series(10.0, index=index),
+ check_names=False,
+ rtol=1e-4,
+ obj="gas input = 0.5 * alpha = 10 kW",
+ )
+
+ # Coupling: P_hydrogen = 0.5 * alpha = 10 kW (equal to gas)
+ pd.testing.assert_series_equal(
+ schedules[1],
+ pd.Series(10.0, index=index),
+ check_names=False,
+ rtol=1e-4,
+ obj="hydrogen input = 0.5 * alpha = 10 kW (equal to gas input)",
+ )
+
+ # Coupling: P_power = -0.3 * alpha = -0.3 * 20 = -6 kW
+ pd.testing.assert_series_equal(
+ schedules[3],
+ pd.Series(-6.0, index=index),
+ check_names=False,
+ rtol=1e-4,
+ obj="power output = -0.3 * alpha = -6 kW",
+ )
+
+
+def _run_factory_scenario(
+ gas_price: float,
+ elec_price: float,
+) -> tuple:
+ """Run the simplified factory scenario and return the 7 device schedules.
+
+ Devices
+ ~~~~~~~
+ d=0 e-heater electricity → heat coupling (ems_power ≥ 0, i.e. consumes electricity)
+ d=1 gas boiler gas → heat coupling (ems_power ≥ 0, i.e. consumes gas)
+ d=2 steamer heat coupling → steam (ems_power ≤ 0, i.e. produces steam)
+ d=3 CHP gas input gas → chp coupling (ems_power ≥ 0, i.e. consumes gas, coupling member = alpha)
+ d=4 CHP heat out chp coupling → steam (ems_power ≤ 0, i.e. produces steam, coupling member = -0.5 alpha)
+ d=5 CHP power out chp coupling → electricity (ems_power ≤ 0, i.e. produces electricity, coupling member = -0.3 alpha)
+ d=6 steam demand steam → fixed flow (ems_power = 15, i.e. consumes steam)
+
+ CHP coupling coefficients
+ ~~~~~~~~~~~~~~~~~~~~~~~~~
+ The coupling constraint introduces a free variable ``alpha`` (the normalised gas flow)
+ and enforces ``P[d_i] == coeff_i * alpha`` for every device in the group.
+ Choosing thermal efficiency η_heat = 0.5 and power efficiency η_power = 0.3,
+ the coefficients simply become the signed efficiency fractions::
+
+ P_gas = 1.0 * alpha (input, coeff = 1.0)
+ P_heat = -0.5 * alpha (output, coeff = η_heat = -0.5)
+ P_power = -0.3 * alpha (output, coeff = η_power = −0.3)
+
+ """
+ ETA_HEAT = 0.5 # fraction of CHP gas input that becomes heat
+ ETA_POWER = 0.3 # fraction of CHP gas input that becomes power
+ STEAM_DEMAND = 15.0 # kW, constant heat drain representing steam production
+ CHP_GAS_MAX = 20.0 # kW, maximum gas input to CHP
+ BOILER_GAS_MAX = 10.0 # kW, maximum gas input to gas boiler
+ HEATER_POWER_MAX = 100.0 # kW, maximum electricity input to e-heater
+
+ start = pd.Timestamp("2026-01-01T00:00+01:00")
+ end = pd.Timestamp("2026-01-01T04:00+01:00")
+ resolution = pd.Timedelta("1h")
+ index = initialize_index(start=start, end=end, resolution=resolution)
+
+ def _df(**kwargs) -> pd.DataFrame:
+ """Build a device-constraints DataFrame with defaults for unused columns."""
+ defaults = {
+ "min": np.nan,
+ "max": np.nan,
+ "equals": np.nan,
+ "derivative min": 0.0,
+ "derivative max": 0.0,
+ "derivative equals": np.nan,
+ "derivative down efficiency": 1.0,
+ "derivative up efficiency": 1.0,
+ "stock delta": 0.0,
+ }
+ defaults.update(kwargs)
+ return pd.DataFrame(defaults, index=index)
+
+ device_constraints = [
+ # d=0 e-heater: heat-node reference device. The min=max=0 forces the heat
+ # node to balance at every step (zero-capacity flow node), making
+ # the per-step dispatch deterministic despite flat prices.
+ _df(min=0.0, max=0.0, **{"derivative max": HEATER_POWER_MAX}),
+ # d=1 gas boiler: up to 100 kW gas → 100 kW heat (efficiency 1 for clean maths in test)
+ _df(**{"derivative max": BOILER_GAS_MAX, "commodity": "gas"}),
+ # d=2 steamer: can only produce steam (negative ems_power).
+ # The lower bound is finite to avoid unbounded model messages while still
+ # being looser than the upstream heat-supply limits.
+ _df(
+ **{
+ "derivative min": -(HEATER_POWER_MAX + BOILER_GAS_MAX),
+ "derivative max": 0.0,
+ "commodity": "steam",
+ }
+ ),
+ # d=3 CHP gas input: up to CHP_GAS_MAX kW gas
+ _df(**{"derivative max": CHP_GAS_MAX, "commodity": "gas"}),
+ # d=4 CHP heat output: positive ems_power adds heat to the steam node.
+ # The min=max=0 forces the steam node to balance at every step.
+ _df(
+ min=0.0,
+ max=0.0,
+ **{
+ "derivative min": -CHP_GAS_MAX * ETA_HEAT,
+ "derivative max": 0.0,
+ "commodity": "steam",
+ },
+ ),
+ # d=5 CHP power output: negative ems_power only (production)
+ _df(**{"derivative min": -CHP_GAS_MAX * ETA_POWER, "derivative max": 0.0}),
+ # d=6 steam demand: fixed steam consumption at STEAM_DEMAND kW.
+ _df(
+ **{
+ "derivative min": STEAM_DEMAND,
+ "derivative max": STEAM_DEMAND,
+ "commodity": "steam",
+ }
+ ),
+ ]
+
+ ems_constraints = pd.DataFrame(
+ {"derivative min": -300.0, "derivative max": 300.0},
+ index=index,
+ )
+
+ # stock group: all heat-buffer devices share the same stock
+ # (key 0 is an arbitrary group id, not a device index)
+ heat_group_id = 0
+ steam_group_id = 1
+ stock_groups = {heat_group_id: [0, 1, 2], steam_group_id: [2, 4, 6]}
+
+ # CHP coupling: coefficients are signed efficiency fractions.
+ # coeff_heat = -η_heat = -0.5 → P_heat = -0.5 * alpha = -0.5 * P_gas
+ # coeff_power = -η_power = -0.3 → P_power = -0.3 * alpha = -0.3 * P_gas
+ coupling_groups = {
+ "chp": [
+ (3, 1.0),
+ (4, -ETA_HEAT), # = -0.5
+ (5, -ETA_POWER), # = -0.3
+ ]
+ }
+
+ # --- energy-price commitments -------------------------------------------
+ # Gas price applies to gas boiler (d=1) and CHP gas input (d=3).
+ # Electricity price applies to e-heater (d=0) and CHP power output (d=5).
+ # Using both upwards and downwards prices makes each commitment a two-sided
+ # soft equality (quantity = 0):
+ # • upward deviation = consuming more than 0 → positive cost
+ # • downward deviation = producing (negative flow) → negative cost (revenue)
+ gas_p = pd.Series(gas_price, index=index)
+ elec_p = pd.Series(elec_price, index=index)
+
+ commitments = []
+ for d, price in [(1, gas_p), (3, gas_p), (0, elec_p), (5, elec_p)]:
+ commitments.append(
+ FlowCommitment(
+ name="gas cost" if d in (1, 3) else "electricity cost",
+ index=index,
+ quantity=pd.Series(0.0, index=index),
+ upwards_deviation_price=price,
+ downwards_deviation_price=price,
+ device=pd.Series(d, index=index),
+ )
+ )
+
+ schedules, _costs, results, _model = device_scheduler(
+ device_constraints=device_constraints,
+ ems_constraints=ems_constraints,
+ commitments=commitments,
+ stock_groups=stock_groups,
+ coupling_groups=coupling_groups,
+ )
+
+ assert results.solver.termination_condition == "optimal", (
+ f"Solver did not find an optimal solution "
+ f"(gas_price={gas_price}, elec_price={elec_price})"
+ )
+ return tuple(schedules)
+
+
+def test_factory_chp_dispatch():
+ """Factory: CHP + gas boiler + e-heater competing to meet a fixed steam demand.
+
+ The shared heat buffer (modelled via ``stock_groups``) is drained at a
+ constant rate of 15 kW by the steam demand device. Two price scenarios
+ verify that the optimizer correctly chooses the cheapest heat source.
+
+ Scenario A — gas cheaper than electricity
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ Prices: gas = 20 EUR/kW, electricity = 50 EUR/kW.
+
+ Effective cost per kW of heat delivered:
+ - CHP: gas_cost − power_revenue = (20·20 − 50·6) / 10 = 10 EUR/kW
+ - gas boiler: 20 EUR/kW (efficiency = 1)
+ - e-heater: 50 EUR/kW (efficiency = 1)
+
+ Merit order: CHP ≪ gas boiler ≪ e-heater.
+
+ With CHP at maximum (20 kW gas → 10 kW heat + 6 kW power):
+ - remaining heat demand = 15 − 10 = 5 kW → gas boiler
+ - e-heater not needed
+
+ Scenario B — electricity cheaper than gas
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ Prices: gas = 100 EUR/kW, electricity = 10 EUR/kW.
+
+ Effective cost per kW of heat:
+ - CHP: (100·20 − 10·6) / 10 = 194 EUR/kW
+ - gas boiler: 100 EUR/kW
+ - e-heater: 10 EUR/kW
+
+ Merit order: e-heater ≪ gas boiler ≪ CHP.
+
+ All 15 kW steam demand is met by the e-heater; CHP and gas boiler are off.
+
+ Scenario C — gas slightly cheaper
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ Prices: gas = 50 EUR/kW, electricity = 55 EUR/kW.
+
+ Effective cost per kW of heat delivered:
+ - CHP: gas_cost − power_revenue = (50·20 − 55·6) / 10 = 67 EUR/kW
+ - gas boiler: 50 EUR/kW
+ - e-heater: 55 EUR/kW
+
+ Merit order: gas boiler ≪ e-heater ≪ CHP.
+
+ With gas boiler at maximum (10 kW gas → 10 kW heat):
+ - remaining heat demand = 15 − 10 = 5 kW → e-heater
+ - CHP not needed
+ """
+ # ------------------------------------------------------------------ #
+ # Scenario A: gas cheaper — CHP at max, gas boiler fills the rest #
+ # ------------------------------------------------------------------ #
+ (e_heater, gas_boiler, steamer, chp_gas, chp_heat, chp_power, demand) = (
+ _run_factory_scenario(gas_price=20.0, elec_price=50.0)
+ )
+
+ expected_chp_gas = pd.Series(20.0, index=e_heater.index)
+ expected_chp_heat = pd.Series(-10.0, index=e_heater.index) # -0.5 * 20
+ expected_chp_power = pd.Series(-6.0, index=e_heater.index) # -0.3 * 20
+ expected_boiler = pd.Series(5.0, index=e_heater.index) # fills 15-10 kW gap
+ expected_steamer = pd.Series(-5.0, index=e_heater.index)
+ expected_demand = pd.Series(15.0, index=e_heater.index)
+ expected_eheater = pd.Series(0.0, index=e_heater.index)
+
+ pd.testing.assert_series_equal(
+ chp_gas,
+ expected_chp_gas,
+ check_names=False,
+ rtol=1e-4,
+ obj="Scenario A: CHP gas input at maximum (20 kW)",
+ )
+ pd.testing.assert_series_equal(
+ chp_heat,
+ expected_chp_heat,
+ check_names=False,
+ rtol=1e-4,
+ obj="Scenario A: CHP heat output = 0.5 × gas input (10 kW)",
+ )
+ pd.testing.assert_series_equal(
+ chp_power,
+ expected_chp_power,
+ check_names=False,
+ rtol=1e-4,
+ obj="Scenario A: CHP power output = −0.3 × gas input (−6 kW)",
+ )
+ pd.testing.assert_series_equal(
+ gas_boiler,
+ expected_boiler,
+ check_names=False,
+ rtol=1e-4,
+ obj="Scenario A: gas boiler fills remaining 5 kW heat demand",
+ )
+ pd.testing.assert_series_equal(
+ steamer,
+ expected_steamer,
+ check_names=False,
+ rtol=1e-4,
+ obj="Scenario A: steamer supplies remaining 5 kW steam",
+ )
+ pd.testing.assert_series_equal(
+ demand,
+ expected_demand,
+ check_names=False,
+ rtol=1e-4,
+ obj="Scenario A: steam demand fixed at 15 kW",
+ )
+ pd.testing.assert_series_equal(
+ e_heater,
+ expected_eheater,
+ check_names=False,
+ atol=1e-4,
+ obj="Scenario A: e-heater not used (gas is cheapest)",
+ )
+
+ # ------------------------------------------------------------------ #
+ # Scenario B: electricity cheaper — e-heater meets all demand #
+ # ------------------------------------------------------------------ #
+ (e_heater, gas_boiler, steamer, chp_gas, chp_heat, chp_power, demand) = (
+ _run_factory_scenario(gas_price=100.0, elec_price=10.0)
+ )
+
+ expected_eheater_b = pd.Series(15.0, index=e_heater.index)
+ expected_zero = pd.Series(0.0, index=e_heater.index)
+ expected_steamer_b = pd.Series(-15.0, index=e_heater.index)
+ expected_demand_b = pd.Series(15.0, index=e_heater.index)
+
+ pd.testing.assert_series_equal(
+ e_heater,
+ expected_eheater_b,
+ check_names=False,
+ rtol=1e-4,
+ obj="Scenario B: e-heater meets all 15 kW steam demand",
+ )
+ pd.testing.assert_series_equal(
+ chp_gas,
+ expected_zero,
+ check_names=False,
+ atol=1e-4,
+ obj="Scenario B: CHP not used (electricity is cheapest)",
+ )
+ pd.testing.assert_series_equal(
+ gas_boiler,
+ expected_zero,
+ check_names=False,
+ atol=1e-4,
+ obj="Scenario B: gas boiler not used (electricity is cheapest)",
+ )
+ pd.testing.assert_series_equal(
+ steamer,
+ expected_steamer_b,
+ check_names=False,
+ rtol=1e-4,
+ obj="Scenario B: steamer supplies all 15 kW steam",
+ )
+ pd.testing.assert_series_equal(
+ demand,
+ expected_demand_b,
+ check_names=False,
+ rtol=1e-4,
+ obj="Scenario B: steam demand fixed at 15 kW",
+ )
+
+ # --------------------------------------------------------------------------------- #
+ # Scenario C: gas slightly cheaper — gas boiler at max, e-heater fills the rest #
+ # --------------------------------------------------------------------------------- #
+ (e_heater, gas_boiler, steamer, chp_gas, chp_heat, chp_power, demand) = (
+ _run_factory_scenario(gas_price=50.0, elec_price=55.0)
+ )
+
+ expected_chp_gas = pd.Series(0.0, index=e_heater.index)
+ expected_chp_heat = pd.Series(0.0, index=e_heater.index)
+ expected_chp_power = pd.Series(0.0, index=e_heater.index)
+ expected_boiler = pd.Series(10.0, index=e_heater.index)
+ expected_steamer = pd.Series(-15.0, index=e_heater.index)
+ expected_demand = pd.Series(15.0, index=e_heater.index)
+ expected_eheater = pd.Series(5.0, index=e_heater.index) # fills 15-10 kW gap
+
+ pd.testing.assert_series_equal(
+ chp_gas,
+ expected_chp_gas,
+ check_names=False,
+ rtol=1e-4,
+ obj="Scenario C: CHP not used",
+ )
+ pd.testing.assert_series_equal(
+ chp_heat,
+ expected_chp_heat,
+ check_names=False,
+ rtol=1e-4,
+ obj="Scenario C: CHP not used",
+ )
+ pd.testing.assert_series_equal(
+ chp_power,
+ expected_chp_power,
+ check_names=False,
+ rtol=1e-4,
+ obj="Scenario C: CHP not used",
+ )
+ pd.testing.assert_series_equal(
+ gas_boiler,
+ expected_boiler,
+ check_names=False,
+ rtol=1e-4,
+ obj="Scenario C: gas boiler at maximum (10 kW)",
+ )
+ pd.testing.assert_series_equal(
+ steamer,
+ expected_steamer,
+ check_names=False,
+ rtol=1e-4,
+ obj="Scenario C: steamer supplies all 15 kW steam",
+ )
+ pd.testing.assert_series_equal(
+ demand,
+ expected_demand,
+ check_names=False,
+ rtol=1e-4,
+ obj="Scenario C: steam demand fixed at 15 kW",
+ )
+ pd.testing.assert_series_equal(
+ e_heater,
+ expected_eheater,
+ check_names=False,
+ atol=1e-4,
+ obj="Scenario C: e-heater fills remaining 5 kW heat demand",
+ )
diff --git a/flexmeasures/data/models/planning/tests/test_storage.py b/flexmeasures/data/models/planning/tests/test_storage.py
index d04cea1a3a..55154dd1ff 100644
--- a/flexmeasures/data/models/planning/tests/test_storage.py
+++ b/flexmeasures/data/models/planning/tests/test_storage.py
@@ -6,6 +6,7 @@
import numpy as np
import pandas as pd
+from flexmeasures.data.models.generic_assets import GenericAsset, GenericAssetType
from flexmeasures.data.models.planning import Scheduler
from flexmeasures.data.models.planning.storage import StorageScheduler
from flexmeasures.data.models.planning.utils import initialize_index
@@ -15,6 +16,7 @@
get_sensors_from_db,
series_to_ts_specs,
)
+from flexmeasures.data.services.utils import get_or_create_model
def test_battery_solver_multi_commitment(add_battery_assets, db):
@@ -583,3 +585,187 @@ def test_resolve_soc_at_start_from_percent_sensor_uses_device_sensor_fallback(
)
== 2.5
)
+
+
+def test_storage_scheduler_chp_coupling(app, db):
+ """Test that the StorageScheduler enforces CHP coupling constraints between devices.
+
+ Models a Combined Heat and Power unit with three sensors.
+
+ In the flex-model, the coupling coefficients are entered as positive magnitudes::
+
+ gas input -> 1.0
+ heat output -> 0.5
+ power output -> 0.3
+
+ Internally, the CHP is interpreted with the signed commodity-flow coefficients::
+
+ P_gas -> 1.0
+ P_heat -> -0.5
+ P_power -> -0.3
+
+ The returned storage schedule for the heat buffer is still positive, because this
+ test uses the storage sign convention for buffer charging.
+
+ - d=0 gas input: CHP gas consumption
+ - d=1 heat output: CHP heat -> heat buffer
+ - d=2 power output: CHP electricity production
+
+ The heat output is forced to exactly 5 kW per step by combining:
+ - ``production-capacity: "0 kW"`` (hard lower bound: derivative_min = 0)
+ - ``consumption-capacity: "5 kW"`` (hard upper bound: derivative_max = 0.005 MW)
+ - ``soc-targets`` requiring 20 kWh at the end of the 4-hour window
+
+ With soc_at_start = 0 and max 5 kW over 4 × 1-hour steps the only feasible
+ solution is P_heat = 5 kW every step. Substituting P_heat = 5 kW gives
+ alpha = 5 / 0.5 = 10 kW, so:
+
+ P_gas = 1.0 × 10 kW = 10 kW
+ P_power = −0.3 × 10 kW = −3 kW
+ """
+ # ---- asset type + asset
+ chp_type = get_or_create_model(GenericAssetType, name="chp-plant")
+ chp = GenericAsset(name="CHP plant (coupling test)", generic_asset_type=chp_type)
+ db.session.add(chp)
+ db.session.flush()
+
+ # ---- schedule window
+ start = pd.Timestamp("2026-01-01T00:00:00+01:00")
+ end = pd.Timestamp("2026-01-01T04:00:00+01:00")
+ resolution = timedelta(hours=1)
+
+ # CHP efficiencies (same values as the factory scenario in test_commitments.py)
+ ETA_HEAT = 0.5 # fraction of gas input that becomes heat
+ ETA_POWER = 0.3 # fraction of gas input that becomes electricity
+
+ # ---- sensors
+ gas_input_sensor = Sensor(
+ name="CHP gas input (coupling test)",
+ generic_asset=chp,
+ unit="MW",
+ event_resolution=resolution,
+ )
+ heat_output_sensor = Sensor(
+ name="CHP heat output (coupling test)",
+ generic_asset=chp,
+ unit="MW",
+ event_resolution=resolution,
+ )
+ power_output_sensor = Sensor(
+ name="CHP power output (coupling test)",
+ generic_asset=chp,
+ unit="MW",
+ event_resolution=resolution,
+ )
+ db.session.add_all([gas_input_sensor, heat_output_sensor, power_output_sensor])
+ db.session.flush()
+
+ # ---- flex model
+ # Flex-model coupling-coefficients are user-facing positive magnitudes.
+ # The intended internal CHP coefficients are +1.0 for gas, -0.5 for heat,
+ # and -0.3 for power.
+ flex_model = [
+ {
+ # d=0: gas input — pure flow device (no SoC), can only consume gas.
+ "sensor": gas_input_sensor.id,
+ "power-capacity": "20 kW",
+ "production-capacity": "0 kW", # derivative_min = 0
+ "coupling": "chp",
+ "coupling-coefficient": 1.0,
+ },
+ {
+ # d=1: heat output — tracks heat-buffer SoC, positive ems_power = heat
+ # added to buffer. The SoC target forces P_heat = 5 kW per step.
+ "sensor": heat_output_sensor.id,
+ "soc-at-start": "0 MWh",
+ "soc-min": "0 MWh",
+ "soc-max": "0.02 MWh", # 20 kWh — matches the SoC target
+ "soc-targets": [
+ {
+ # Single target at the schedule end: cumulative heat = 20 kWh.
+ # With max 5 kW and 4 × 1 h steps the only feasible solution
+ # is 5 kW every step.
+ "start": "2026-01-01T04:00:00+01:00",
+ "duration": "PT1H",
+ "value": "0.02 MWh",
+ }
+ ],
+ "power-capacity": "5 kW",
+ "consumption-capacity": "5 kW",
+ "production-capacity": "0 kW", # can only add heat, not extract
+ "prefer-charging-sooner": True,
+ "coupling": "chp",
+ "coupling-coefficient": ETA_HEAT, # = 0.5
+ },
+ {
+ # d=2: power output — pure flow device (no SoC), can only produce
+ # electricity (negative ems_power).
+ "sensor": power_output_sensor.id,
+ "power-capacity": "6 kW",
+ "consumption-capacity": "0 kW", # derivative_max = 0
+ "coupling": "chp",
+ "coupling-coefficient": ETA_POWER, # = 0.3 (sign inferred from capacities)
+ },
+ ]
+
+ flex_context = {
+ "consumption-price": "50 EUR/MWh",
+ "production-price": "50 EUR/MWh",
+ "site-power-capacity": "1 MW", # large enough to avoid EMS constraints
+ }
+
+ scheduler = StorageScheduler(
+ asset_or_sensor=chp,
+ start=start,
+ end=end,
+ resolution=resolution,
+ flex_model=flex_model,
+ flex_context=flex_context,
+ return_multiple=True,
+ )
+
+ results = scheduler.compute(skip_validation=True)
+
+ # ---- extract storage schedules per sensor
+ storage_schedules = {
+ r["sensor"]: r["data"] for r in results if r.get("name") == "storage_schedule"
+ }
+
+ assert gas_input_sensor in storage_schedules, "Gas input schedule missing"
+ assert heat_output_sensor in storage_schedules, "Heat output schedule missing"
+ assert power_output_sensor in storage_schedules, "Power output schedule missing"
+
+ gas_schedule = storage_schedules[gas_input_sensor]
+ heat_schedule = storage_schedules[heat_output_sensor]
+ power_schedule = storage_schedules[power_output_sensor]
+
+ # The SoC target of 20 kWh is met after 4 × 1-hour steps at 5 kW.
+ # The schedule index runs from ``start`` to ``end`` inclusive (5 time slots),
+ # so the last slot has no binding SoC constraint and the CHP is idle there.
+ # All assertions therefore apply to the first four active slots only.
+ active_steps = slice(None, -1) # exclude the final trailing idle slot
+
+ # Heat output is forced to exactly 5 kW per step by the SoC target.
+ # alpha = P_heat / ETA_HEAT = 0.005 / 0.5 = 0.010 MW
+ np.testing.assert_allclose(
+ heat_schedule.iloc[active_steps],
+ 0.005, # 5 kW expressed in MW
+ rtol=1e-4,
+ err_msg="Heat output should be exactly 5 kW per step (forced by SoC target)",
+ )
+
+ # Coupling: P_gas = 1.0 * alpha = 0.010 MW = 10 kW
+ np.testing.assert_allclose(
+ gas_schedule.iloc[active_steps],
+ 0.010, # 10 kW expressed in MW
+ rtol=1e-4,
+ err_msg="Gas input must be 10 kW — determined by coupling (1.0 * alpha)",
+ )
+
+ # Coupling: P_power = -ETA_POWER * alpha = -0.3 * 0.010 MW = -0.003 MW = -3 kW
+ np.testing.assert_allclose(
+ power_schedule.iloc[active_steps],
+ -0.003, # -3 kW expressed in MW
+ rtol=1e-4,
+ err_msg="Power output must be -3 kW — determined by coupling (-0.3 * alpha)",
+ )
diff --git a/flexmeasures/data/schemas/scheduling/__init__.py b/flexmeasures/data/schemas/scheduling/__init__.py
index 9b39a4ceab..b5bb9d8540 100644
--- a/flexmeasures/data/schemas/scheduling/__init__.py
+++ b/flexmeasures/data/schemas/scheduling/__init__.py
@@ -1,4 +1,6 @@
from __future__ import annotations
+
+from collections import OrderedDict
from datetime import timedelta
from typing import Any, Callable, Dict
@@ -250,9 +252,17 @@ class CommodityFlexContextSchema(SharedSchema):
commodity = fields.Str(
required=True,
data_key="commodity",
- metadata=metadata.COMMODITY.to_dict(),
+ metadata=metadata.COMMODITY_FLEX_CONTEXT.to_dict(),
)
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+
+ commodity_field = self.fields.pop("commodity")
+ self.fields = OrderedDict(
+ [("commodity", commodity_field), *self.fields.items()]
+ )
+
class FlexContextSchema(SharedSchema):
"""This schema defines fields that provide context to the portfolio to be optimized."""
@@ -785,7 +795,7 @@ def _to_currency_per_mwh(price_unit: str) -> str:
},
"commodity": {
"default": "electricity",
- "description": rst_to_openapi(metadata.COMMODITY.description),
+ "description": rst_to_openapi(metadata.COMMODITY_FLEX_MODEL.description),
"types": {
"backend": "typeOne",
"ui": "One fixed value only.",
diff --git a/flexmeasures/data/schemas/scheduling/metadata.py b/flexmeasures/data/schemas/scheduling/metadata.py
index 70645d98d0..7463579864 100644
--- a/flexmeasures/data/schemas/scheduling/metadata.py
+++ b/flexmeasures/data/schemas/scheduling/metadata.py
@@ -27,6 +27,12 @@ def to_dict(self):
# FLEX-CONTEXT
+COMMODITY_FLEX_CONTEXT = MetaData(
+ description="""Commodity to which this part of the flex-context applies.
+Defaults to ``"electricity"``.
+""",
+ examples=["electricity", "gas"],
+)
INFLEXIBLE_DEVICE_SENSORS = MetaData(
description="""Power sensors representing devices that are relevant, but not flexible in the timing of their demand/supply.
For example, a sensor recording rooftop solar power that is connected behind the main meter, and whose production falls under the same contract as the flexible device(s) being scheduled.
@@ -44,11 +50,11 @@ def to_dict(self):
example=[],
)
CONSUMPTION_PRICE = MetaData(
- description="The electricity price applied to the site's aggregate consumption. Can be (a sensor recording) market prices, but also CO₂ intensity—whatever fits your optimization problem. [#old_consumption_price_field]_",
+ description="The commodity price (e.g. electricity price) applied to the site's aggregate consumption. Can be (a sensor recording) market prices, but also CO₂ intensity—whatever fits your optimization problem. [#old_consumption_price_field]_",
examples=[{"sensor": 5}, "0.29 EUR/kWh"],
)
PRODUCTION_PRICE = MetaData(
- description="The electricity price applied to the site's aggregate production. Can be (a sensor recording) market prices, but also CO₂ intensity—whatever fits your optimization problem, as long as the unit matches the ``consumption-price`` unit. [#old_production_price_field]_",
+ description="The commodity price (e.g. electricity price) applied to the site's aggregate production. Can be (a sensor recording) market prices, but also CO₂ intensity—whatever fits your optimization problem, as long as the unit matches the ``consumption-price`` unit. [#old_production_price_field]_",
example="0.12 EUR/kWh",
)
SITE_POWER_CAPACITY = MetaData(
@@ -183,9 +189,9 @@ def to_dict(self):
# FLEX-MODEL
-COMMODITY = MetaData(
- description="""Commodity type for this storage flex-model.
-Defaults to ``electricity``.
+COMMODITY_FLEX_MODEL = MetaData(
+ description="""Commodity on which this device acts.
+Defaults to ``"electricity"``.
""",
examples=["electricity", "gas"],
)
@@ -303,14 +309,14 @@ def to_dict(self):
example="90%",
)
CHARGING_EFFICIENCY = MetaData(
- description="""One-way conversion efficiency from electricity to the storage's state of charge.
+ description="""One-way conversion efficiency from the commodity (e.g. electricity) to the storage's state of charge.
Can be a percentage, a ratio in the range [0,1], or a coefficient of performance (>1).
Defaults to 100% (no conversion loss).
""",
example=".9",
)
DISCHARGING_EFFICIENCY = MetaData(
- description="""One-way conversion efficiency from the storage's state of charge to electricity.
+ description="""One-way conversion efficiency from the storage's state of charge to the commodity (e.g. electricity).
Defaults to 100% (no conversion loss).""",
example="90%",
)
diff --git a/flexmeasures/data/schemas/scheduling/storage.py b/flexmeasures/data/schemas/scheduling/storage.py
index a7189f2c18..7c29e607b7 100644
--- a/flexmeasures/data/schemas/scheduling/storage.py
+++ b/flexmeasures/data/schemas/scheduling/storage.py
@@ -251,8 +251,35 @@ class StorageFlexModelSchema(Schema):
commodity = fields.Str(
data_key="commodity",
load_default="electricity",
- validate=OneOf(["electricity", "gas"]),
- metadata=dict(description="Commodity label for this device/asset."),
+ metadata=metadata.COMMODITY_FLEX_MODEL.to_dict(),
+ )
+
+ coupling = fields.Str(
+ data_key="coupling",
+ required=False,
+ load_default=None,
+ metadata=dict(
+ description="Name of the coupling group this device belongs to. "
+ "Devices sharing the same coupling name are constrained to have "
+ "proportionally related flows via a hard equality constraint. "
+ "Use together with 'coupling-coefficient' to set the ratio.",
+ example="chp",
+ ),
+ )
+ coupling_coefficient = fields.Float(
+ data_key="coupling-coefficient",
+ required=False,
+ load_default=1.0,
+ metadata=dict(
+ description="Positive coupling magnitude for this device within its coupling group. "
+ "The optimizer introduces a decision variable 'alpha' per group per time step "
+ "and constrains every device by P[d] == coeff * alpha. "
+ "The sign of coeff is inferred internally from directional capacities: "
+ "consumption-capacity = 0 implies output (negative), production-capacity = 0 implies input (positive). "
+ "Example: a CHP with gas input (1.0), heat output (0.5) and power output (0.3). "
+ "Defaults to 1.0.",
+ example=0.5,
+ ),
)
def __init__(
diff --git a/flexmeasures/ui/static/openapi-specs.json b/flexmeasures/ui/static/openapi-specs.json
index 1edbb7a4e9..e222eb3127 100644
--- a/flexmeasures/ui/static/openapi-specs.json
+++ b/flexmeasures/ui/static/openapi-specs.json
@@ -4559,8 +4559,16 @@
"CommodityFlexContext": {
"type": "object",
"properties": {
+ "commodity": {
+ "type": "string",
+ "description": "Commodity to which this part of the flex-context applies.\nDefaults to ``\"electricity\"``.\n",
+ "examples": [
+ "electricity",
+ "gas"
+ ]
+ },
"consumption-price": {
- "description": "The electricity price applied to the site's aggregate consumption. Can be (a sensor recording) market prices, but also CO\u2082 intensity\u2014whatever fits your optimization problem. [#old_consumption_price_field]_",
+ "description": "The commodity price (e.g. electricity price) applied to the site's aggregate consumption. Can be (a sensor recording) market prices, but also CO\u2082 intensity\u2014whatever fits your optimization problem. [#old_consumption_price_field]_",
"examples": [
{
"sensor": 5
@@ -4569,7 +4577,7 @@
]
},
"production-price": {
- "description": "The electricity price applied to the site's aggregate production. Can be (a sensor recording) market prices, but also CO\u2082 intensity\u2014whatever fits your optimization problem, as long as the unit matches the ``consumption-price`` unit. [#old_production_price_field]_",
+ "description": "The commodity price (e.g. electricity price) applied to the site's aggregate production. Can be (a sensor recording) market prices, but also CO\u2082 intensity\u2014whatever fits your optimization problem, as long as the unit matches the ``consumption-price`` unit. [#old_production_price_field]_",
"example": "0.12 EUR/kWh"
},
"site-power-capacity": {
@@ -4632,14 +4640,6 @@
"items": {
"type": "integer"
}
- },
- "commodity": {
- "type": "string",
- "description": "Commodity type for this storage flex-model.\nDefaults to ``electricity``.\n",
- "examples": [
- "electricity",
- "gas"
- ]
}
},
"required": [
@@ -4651,7 +4651,7 @@
"type": "object",
"properties": {
"consumption-price": {
- "description": "The electricity price applied to the site's aggregate consumption. Can be (a sensor recording) market prices, but also CO\u2082 intensity\u2014whatever fits your optimization problem.",
+ "description": "The commodity price (e.g. electricity price) applied to the site's aggregate consumption. Can be (a sensor recording) market prices, but also CO\u2082 intensity\u2014whatever fits your optimization problem.",
"examples": [
{
"sensor": 5
@@ -4661,7 +4661,7 @@
"$ref": "#/components/schemas/VariableQuantityOpenAPI"
},
"production-price": {
- "description": "The electricity price applied to the site's aggregate production. Can be (a sensor recording) market prices, but also CO\u2082 intensity\u2014whatever fits your optimization problem, as long as the unit matches the consumption-price unit.",
+ "description": "The commodity price (e.g. electricity price) applied to the site's aggregate production. Can be (a sensor recording) market prices, but also CO\u2082 intensity\u2014whatever fits your optimization problem, as long as the unit matches the consumption-price unit.",
"example": "0.12 EUR/kWh",
"$ref": "#/components/schemas/VariableQuantityOpenAPI"
},
@@ -6275,12 +6275,12 @@
"$ref": "#/components/schemas/VariableQuantityOpenAPI"
},
"charging-efficiency": {
- "description": "One-way conversion efficiency from electricity to the storage's state of charge.\nCan be a percentage, a ratio in the range [0,1], or a coefficient of performance (>1).\nDefaults to 100% (no conversion loss).\n",
+ "description": "One-way conversion efficiency from the commodity (e.g. electricity) to the storage's state of charge.\nCan be a percentage, a ratio in the range [0,1], or a coefficient of performance (>1).\nDefaults to 100% (no conversion loss).\n",
"example": ".9",
"$ref": "#/components/schemas/VariableQuantityOpenAPI"
},
"discharging-efficiency": {
- "description": "One-way conversion efficiency from the storage's state of charge to electricity.\nDefaults to 100% (no conversion loss).",
+ "description": "One-way conversion efficiency from the storage's state of charge to the commodity (e.g. electricity).\nDefaults to 100% (no conversion loss).",
"example": "90%",
"$ref": "#/components/schemas/VariableQuantityOpenAPI"
},
@@ -6323,11 +6323,26 @@
"commodity": {
"type": "string",
"default": "electricity",
- "enum": [
+ "description": "Commodity on which this device acts.\nDefaults to \"electricity\".\n",
+ "examples": [
"electricity",
"gas"
+ ]
+ },
+ "coupling": {
+ "type": [
+ "string",
+ "null"
],
- "description": "Commodity label for this device/asset."
+ "default": null,
+ "description": "Name of the coupling group this device belongs to. Devices sharing the same coupling name are constrained to have proportionally related flows via a hard equality constraint. Use together with 'coupling-coefficient' to set the ratio.",
+ "example": "chp"
+ },
+ "coupling-coefficient": {
+ "type": "number",
+ "default": 1.0,
+ "description": "Positive coupling magnitude for this device within its coupling group. The optimizer introduces a decision variable 'alpha' per group per time step and constrains every device by P[d] == coeff * alpha. The sign of coeff is inferred internally from directional capacities: consumption-capacity = 0 implies output (negative), production-capacity = 0 implies input (positive). Example: a CHP with gas input (1.0), heat output (0.5) and power output (0.3). Defaults to 1.0.",
+ "example": 0.5
},
"sensor": {
"type": "integer",
diff --git a/tests/documentation/test_schemas.py b/tests/documentation/test_schemas.py
index 0a85803103..ead6fb0a9e 100644
--- a/tests/documentation/test_schemas.py
+++ b/tests/documentation/test_schemas.py
@@ -10,6 +10,8 @@
# Metadata constants that intentionally do not appear in the documentation
EXCLUDED_METADATA = {
+ "COMMODITY_FLEX_CONTEXT", # appears as `commodity` in the flex-context listing in scheduling.rst
+ "COMMODITY_FLEX_MODEL", # appears as `commodity` in the flex-model listing in scheduling.rst
"RELAX_CAPACITY_CONSTRAINTS",
"RELAX_SITE_CAPACITY_CONSTRAINTS",
"RELAX_SOC_CONSTRAINTS",