diff --git a/pyproject.toml b/pyproject.toml index 55ac79c8..d12fb368 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -144,7 +144,7 @@ python_version = "3.12" mypy_path = "stubs" # Exclude specific directories from type checking will try to add them back gradually -exclude = "(?x)(^temoa/utilities/|^stubs/)" +exclude = "(?x)(^stubs/)" # Strict typing for our own code disallow_untyped_defs = true diff --git a/temoa/core/config.py b/temoa/core/config.py index 834dfc14..3c7858d7 100644 --- a/temoa/core/config.py +++ b/temoa/core/config.py @@ -70,6 +70,7 @@ def __init__( output_threshold_activity: float | None = None, output_threshold_emission: float | None = None, output_threshold_cost: float | None = None, + sqlite: dict[str, object] | None = None, ): if '-' in scenario: raise ValueError( @@ -167,6 +168,8 @@ def __init__( self.cycle_count_limit = cycle_count_limit self.cycle_length_limit = cycle_length_limit + self.sqlite_settings = sqlite or {} + # warn if output db != input db if self.input_database.suffix == self.output_database.suffix: # they are both .db/.sqlite if self.input_database != self.output_database: # they are not the same db diff --git a/temoa/utilities/capacity_analyzer.py b/temoa/utilities/capacity_analyzer.py index 81e77c01..fb271df3 100644 --- a/temoa/utilities/capacity_analyzer.py +++ b/temoa/utilities/capacity_analyzer.py @@ -4,11 +4,10 @@ is populated will influence the utility of using this method """ +import argparse import itertools -import os.path import sqlite3 -from definitions import PROJECT_ROOT from matplotlib import pyplot as plt # Written by: J. F. Hyink @@ -17,58 +16,75 @@ # Created on: 7/18/23 -# filename of db to analyze... -db = 'US_9R_8D_CT500.sqlite' - -source_db_file = os.path.join(PROJECT_ROOT, 'data_files', 'untracked_data', db) -print(source_db_file) -res = [] -try: - con = sqlite3.connect(source_db_file) - cur = con.cursor() - cur.execute('SELECT max_cap FROM max_capacity') - for row in cur: - res.append(row) - -except sqlite3.Error as e: - print(e) - -finally: - con.close() - -# chain them together into a list -caps = list(itertools.chain(*res)) - -cutoff = 1 # GW : An arbitrary cutoff between big and small capacity systems. -small_cap_sources = [c for c in caps if c <= cutoff] -large_cap_sources = [c for c in caps if c > cutoff] - -aggregate_small_cap = sum(small_cap_sources) -aggregate_large_cap = sum(large_cap_sources) - -print(f'{len(small_cap_sources)} small cap sources account for: {aggregate_small_cap: 0.1f} GW') -print(f'{len(large_cap_sources)} large cap sources account for: {aggregate_large_cap: 0.1f} GW') - -plt.hist(caps, bins=100) -plt.show() - - -# make a cumulative contribution plot, and find a 5% cutoff -cutoff_num_sources = 0 -caps.sort() -total_cap = sum(caps) -cumulative_caps = [ - caps[0] / total_cap, -] -for i, cap in enumerate(caps[1:]): - cumulative_caps.append(cap / total_cap + cumulative_caps[i]) - if cumulative_caps[-1] < 0.05: - cutoff_num_sources += 1 - -plt.plot(range(len(cumulative_caps)), cumulative_caps) -plt.axvline(x=cutoff_num_sources, color='red', ls='--') -plt.xlabel('Aggregated Sources') -plt.ylabel('Proportion of Total Capacity') -plt.title('Aggregate Capacity vs. Number of Sources') - -plt.show() + +def analyze_capacity(db_path: str) -> None: + res = [] + con = None + try: + con = sqlite3.connect(db_path) + cur = con.cursor() + cur.execute('SELECT max_cap FROM max_capacity') + for row in cur: + res.append(row) + + except sqlite3.Error as e: + print(f'Error connecting to database: {e}') + return + + finally: + if con: + con.close() + + if not res: + print('No data found in max_capacity table.') + return + + # chain them together into a list + caps = list(itertools.chain(*res)) + + cutoff = 1 # GW : An arbitrary cutoff between big and small capacity systems. + small_cap_sources = [c for c in caps if c <= cutoff] + large_cap_sources = [c for c in caps if c > cutoff] + + aggregate_small_cap = sum(small_cap_sources) + aggregate_large_cap = sum(large_cap_sources) + + print(f'{len(small_cap_sources)} small cap sources account for: {aggregate_small_cap: 0.1f} GW') + print(f'{len(large_cap_sources)} large cap sources account for: {aggregate_large_cap: 0.1f} GW') + + plt.hist(caps, bins=100) + plt.show() + + # make a cumulative contribution plot, and find a 5% cutoff + cutoff_num_sources = 0 + caps.sort() + total_cap = sum(caps) + cumulative_caps = [ + caps[0] / total_cap, + ] + for i, cap in enumerate(caps[1:]): + cumulative_caps.append(cap / total_cap + cumulative_caps[i]) + if cumulative_caps[-1] < 0.05: + cutoff_num_sources += 1 + + plt.plot(range(len(cumulative_caps)), cumulative_caps) + plt.axvline(x=cutoff_num_sources, color='red', ls='--') + plt.xlabel('Aggregated Sources') + plt.ylabel('Proportion of Total Capacity') + plt.title('Aggregate Capacity vs. Number of Sources') + + plt.show() + + +def main() -> None: + parser = argparse.ArgumentParser( + description='Analyze capacity distribution in a Temoa database.' + ) + parser.add_argument('db_path', help='Path to the SQLite database file.') + args = parser.parse_args() + + analyze_capacity(args.db_path) + + +if __name__ == '__main__': + main() diff --git a/temoa/utilities/graph_utils.py b/temoa/utilities/graph_utils.py index 4ee5ba5f..7a94dba5 100644 --- a/temoa/utilities/graph_utils.py +++ b/temoa/utilities/graph_utils.py @@ -40,7 +40,9 @@ GraphType = TypeVar('GraphType', nx.Graph, nx.DiGraph, nx.MultiGraph, nx.MultiDiGraph) -def convert_graph_to_json[GraphType: (nx.Graph, nx.DiGraph, nx.MultiGraph, nx.MultiDiGraph)]( +def convert_graph_to_json[ + GraphType: (nx.Graph[Any], nx.DiGraph[Any], nx.MultiGraph[Any], nx.MultiDiGraph[Any]) +]( nx_graph: GraphType, override_node_properties: dict[str, Any] | None, override_edge_properties: dict[str, Any] | None, diff --git a/temoa/utilities/unit_cost_explorer.py b/temoa/utilities/unit_cost_explorer.py index 6cafe9ac..b17c01fe 100644 --- a/temoa/utilities/unit_cost_explorer.py +++ b/temoa/utilities/unit_cost_explorer.py @@ -8,6 +8,14 @@ from temoa.components.costs import total_cost_rule from temoa.components.storage import storage_energy_upper_bound_constraint from temoa.core.model import TemoaModel +from temoa.types.core_types import ( + Period, + Region, + Season, + Technology, + TimeOfDay, + Vintage, +) # Written by: J. F. Hyink # jeff@westernspark.us @@ -29,8 +37,8 @@ # indices -rtv = ('A', 'battery', 2020) # rtv -rptv = ('A', 2020, 'battery', 2020) # rptv +rtv = (Region('A'), Technology('battery'), Vintage(2020)) # rtv +rptv = (Region('A'), Period(2020), Technology('battery'), Vintage(2020)) # rptv model.time_future.construct([2020, 2025, 2030]) # needs to go 1 period beyond optimize horizon model.time_optimize.construct([2020, 2025]) model.period_length.construct() @@ -132,21 +140,23 @@ # More VARS model.v_storage_level.construct() -model.segment_fraction_per_season.construct( - data=seasonal_fractions +model.segment_fraction_per_season.construct(data=seasonal_fractions) + +model.is_seasonal_storage[Technology('battery')] = False +upper_limit = storage_energy_upper_bound_constraint( + model, + Region('A'), + Period(2020), + Season('winter'), + TimeOfDay('1'), + Technology('battery'), + Vintage(2020), ) - -model.is_seasonal_storage['battery'] = False -upper_limit = storage_energy_upper_bound_constraint(model, 'A', 2020, 'winter', 1, 'battery', 2020) print('The storage level constraint for the single period in the "super day":\n', upper_limit) # cross-check the multiplier... mulitplier = ( - storage_dur - * model.segment_fraction_per_season['winter'] - * model.days_per_period - * c2a - * c + storage_dur * model.segment_fraction_per_season['winter'] * model.days_per_period * c2a * c ) print(f'The multiplier for the storage should be: {mulitplier}')