Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ python_version = "3.12"
mypy_path = "stubs"

# Exclude specific directories from type checking; we will try to add them back gradually
exclude = "(?x)(^temoa/utilities/|^stubs/)"
exclude = "(?x)(^stubs/)"

# Strict typing for our own code
disallow_untyped_defs = true
Expand Down
3 changes: 3 additions & 0 deletions temoa/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def __init__(
output_threshold_activity: float | None = None,
output_threshold_emission: float | None = None,
output_threshold_cost: float | None = None,
sqlite: dict[str, object] | None = None,
):
if '-' in scenario:
raise ValueError(
Expand Down Expand Up @@ -167,6 +168,8 @@ def __init__(
self.cycle_count_limit = cycle_count_limit
self.cycle_length_limit = cycle_length_limit

self.sqlite_settings = sqlite or {}

# warn if output db != input db
if self.input_database.suffix == self.output_database.suffix: # they are both .db/.sqlite
if self.input_database != self.output_database: # they are not the same db
Expand Down
130 changes: 73 additions & 57 deletions temoa/utilities/capacity_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@
is populated will influence the utility of using this method
"""

import argparse
import itertools
import os.path
import sqlite3

from definitions import PROJECT_ROOT
from matplotlib import pyplot as plt

# Written by: J. F. Hyink
Expand All @@ -17,58 +16,75 @@

# Created on: 7/18/23

# filename of db to analyze...
db = 'US_9R_8D_CT500.sqlite'

source_db_file = os.path.join(PROJECT_ROOT, 'data_files', 'untracked_data', db)
print(source_db_file)
res = []
try:
con = sqlite3.connect(source_db_file)
cur = con.cursor()
cur.execute('SELECT max_cap FROM max_capacity')
for row in cur:
res.append(row)

except sqlite3.Error as e:
print(e)

finally:
con.close()

# chain them together into a list
caps = list(itertools.chain(*res))

cutoff = 1 # GW : An arbitrary cutoff between big and small capacity systems.
small_cap_sources = [c for c in caps if c <= cutoff]
large_cap_sources = [c for c in caps if c > cutoff]

aggregate_small_cap = sum(small_cap_sources)
aggregate_large_cap = sum(large_cap_sources)

print(f'{len(small_cap_sources)} small cap sources account for: {aggregate_small_cap: 0.1f} GW')
print(f'{len(large_cap_sources)} large cap sources account for: {aggregate_large_cap: 0.1f} GW')

plt.hist(caps, bins=100)
plt.show()


# make a cumulative contribution plot, and find a 5% cutoff
cutoff_num_sources = 0
caps.sort()
total_cap = sum(caps)
cumulative_caps = [
caps[0] / total_cap,
]
for i, cap in enumerate(caps[1:]):
cumulative_caps.append(cap / total_cap + cumulative_caps[i])
if cumulative_caps[-1] < 0.05:
cutoff_num_sources += 1

plt.plot(range(len(cumulative_caps)), cumulative_caps)
plt.axvline(x=cutoff_num_sources, color='red', ls='--')
plt.xlabel('Aggregated Sources')
plt.ylabel('Proportion of Total Capacity')
plt.title('Aggregate Capacity vs. Number of Sources')

plt.show()

def analyze_capacity(db_path: str, cutoff: float = 1.0) -> None:
    """Analyze the distribution of ``max_cap`` values in a Temoa database.

    Reads every value from the ``max_capacity`` table, prints a small/large
    split at ``cutoff``, shows a histogram, then plots the cumulative capacity
    contribution with a marker at the number of sources contributing the
    bottom 5% of total capacity.

    :param db_path: path to the SQLite database file to analyze
    :param cutoff: boundary (GW) between "small" and "large" capacity sources
    """
    rows: list[tuple[float]] = []
    con = None
    try:
        con = sqlite3.connect(db_path)
        cur = con.cursor()
        cur.execute('SELECT max_cap FROM max_capacity')
        rows = cur.fetchall()

    except sqlite3.Error as e:
        print(f'Error connecting to database: {e}')
        return

    finally:
        if con:
            con.close()

    if not rows:
        print('No data found in max_capacity table.')
        return

    # flatten the 1-tuples produced by the cursor into a flat list of values
    caps = list(itertools.chain(*rows))

    small_cap_sources = [c for c in caps if c <= cutoff]
    large_cap_sources = [c for c in caps if c > cutoff]

    aggregate_small_cap = sum(small_cap_sources)
    aggregate_large_cap = sum(large_cap_sources)

    print(f'{len(small_cap_sources)} small cap sources account for: {aggregate_small_cap: 0.1f} GW')
    print(f'{len(large_cap_sources)} large cap sources account for: {aggregate_large_cap: 0.1f} GW')

    plt.hist(caps, bins=100)
    plt.show()

    # make a cumulative contribution plot, and find a 5% cutoff
    caps.sort()
    total_cap = sum(caps)
    if total_cap == 0:
        # all-zero capacities would cause a ZeroDivisionError below
        print('Total capacity is zero; skipping cumulative contribution plot.')
        return

    cutoff_num_sources = 0
    cumulative_caps = [
        caps[0] / total_cap,
    ]
    for i, cap in enumerate(caps[1:]):
        cumulative_caps.append(cap / total_cap + cumulative_caps[i])
        if cumulative_caps[-1] < 0.05:
            cutoff_num_sources += 1

    plt.plot(range(len(cumulative_caps)), cumulative_caps)
    plt.axvline(x=cutoff_num_sources, color='red', ls='--')
    plt.xlabel('Aggregated Sources')
    plt.ylabel('Proportion of Total Capacity')
    plt.title('Aggregate Capacity vs. Number of Sources')

    plt.show()


def main() -> None:
    """Command-line entry point: parse the database path and run the analysis."""
    arg_parser = argparse.ArgumentParser(
        description='Analyze capacity distribution in a Temoa database.'
    )
    arg_parser.add_argument('db_path', help='Path to the SQLite database file.')
    parsed = arg_parser.parse_args()
    analyze_capacity(parsed.db_path)


if __name__ == '__main__':
    main()
4 changes: 3 additions & 1 deletion temoa/utilities/graph_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@
GraphType = TypeVar('GraphType', nx.Graph, nx.DiGraph, nx.MultiGraph, nx.MultiDiGraph)


def convert_graph_to_json[GraphType: (nx.Graph, nx.DiGraph, nx.MultiGraph, nx.MultiDiGraph)](
def convert_graph_to_json[
GraphType: (nx.Graph[Any], nx.DiGraph[Any], nx.MultiGraph[Any], nx.MultiDiGraph[Any])
](
nx_graph: GraphType,
override_node_properties: dict[str, Any] | None,
override_edge_properties: dict[str, Any] | None,
Expand Down
34 changes: 22 additions & 12 deletions temoa/utilities/unit_cost_explorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@
from temoa.components.costs import total_cost_rule
from temoa.components.storage import storage_energy_upper_bound_constraint
from temoa.core.model import TemoaModel
from temoa.types.core_types import (
Period,
Region,
Season,
Technology,
TimeOfDay,
Vintage,
)

# Written by: J. F. Hyink
# jeff@westernspark.us
Expand All @@ -29,8 +37,8 @@


# indices
rtv = ('A', 'battery', 2020) # rtv
rptv = ('A', 2020, 'battery', 2020) # rptv
rtv = (Region('A'), Technology('battery'), Vintage(2020)) # rtv
rptv = (Region('A'), Period(2020), Technology('battery'), Vintage(2020)) # rptv
model.time_future.construct([2020, 2025, 2030]) # needs to go 1 period beyond optimize horizon
model.time_optimize.construct([2020, 2025])
model.period_length.construct()
Expand Down Expand Up @@ -132,21 +140,23 @@

# More VARS
model.v_storage_level.construct()
model.segment_fraction_per_season.construct(
data=seasonal_fractions
model.segment_fraction_per_season.construct(data=seasonal_fractions)

model.is_seasonal_storage[Technology('battery')] = False
upper_limit = storage_energy_upper_bound_constraint(
model,
Region('A'),
Period(2020),
Season('winter'),
TimeOfDay('1'),
Technology('battery'),
Vintage(2020),
)

model.is_seasonal_storage['battery'] = False
upper_limit = storage_energy_upper_bound_constraint(model, 'A', 2020, 'winter', 1, 'battery', 2020)
print('The storage level constraint for the single period in the "super day":\n', upper_limit)

# cross-check the multiplier...
mulitplier = (
storage_dur
* model.segment_fraction_per_season['winter']
* model.days_per_period
* c2a
* c
storage_dur * model.segment_fraction_per_season['winter'] * model.days_per_period * c2a * c
)
print(f'The multiplier for the storage should be: {mulitplier}')

Expand Down
Loading