diff --git a/.env.example b/.env.example index 3f835882..a3dca336 100644 --- a/.env.example +++ b/.env.example @@ -40,6 +40,16 @@ TRANSFER_NGWMN_VIEWS=True TRANSFER_WATERLEVELS_PRESSURE_DAILY=True TRANSFER_WEATHER_DATA=True TRANSFER_MINOR_TRACE_CHEMISTRY=True +# NM_Wells (geothermal) migration: run `python -m transfers.transfer_geothermal` +# (separate from the deprecated transfers/transfer.py NM_Aquifer driver). +TRANSFER_GEOTHERMAL_REFERENCE=True # load ref_* lookups into the lexicon +TRANSFER_NMW_MIRROR=True # load the NMW_* 1:1 staging mirror +# Optional: path to a NM_Wells SQL Server data-dump .sql file (INSERT statements). +# When set, the mirror parses it to a CSV per table (sqlparse) and bulk-loads via +# Postgres COPY; otherwise it falls back to CSV exports + row inserts. +# NMW_SQL_DUMP=/path/to/NMWells_data.sql +# Optional: dir for the per-table CSVs written from the dump (default: temp dir). +# NMW_CSV_DIR=/path/to/nmw_csv # asset storage GCS_BUCKET_NAME= diff --git a/alembic/versions/u7v8w9x0y1z2_nmw_legacy_staging_mirror_tables.py b/alembic/versions/u7v8w9x0y1z2_nmw_legacy_staging_mirror_tables.py new file mode 100644 index 00000000..b413554d --- /dev/null +++ b/alembic/versions/u7v8w9x0y1z2_nmw_legacy_staging_mirror_tables.py @@ -0,0 +1,207 @@ +"""NM_Wells 1:1 staging mirror tables + +Revision ID: u7v8w9x0y1z2 +Revises: t6u7v8w9x0y1 +Create Date: 2026-06-06 00:00:00.000000 + +1:1 staging mirror of the legacy NM_Wells SQL Server "Migrate First / Main" +tables (see db/nmw_legacy.py and docs/nm_wells-migration.md). Faithful, +column-for-column copies; the transform into the Ocotillo data model is a +later phase. 
+
+    tbl_well_locations -> NMW_WellLocations
+    tbl_well_headers -> NMW_WellHeaders
+    tbl_well_records -> NMW_WellRecords
+    tbl_well_z_datum -> NMW_WellZDatum
+    tbl_well_samples -> NMW_WellSamples
+"""
+
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects import postgresql
+
+# revision identifiers, used by Alembic.
+revision: str = "u7v8w9x0y1z2"
+down_revision: Union[str, Sequence[str], None] = "t6u7v8w9x0y1"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    """Create the five NMW_* well staging tables and their lookup indexes.
+
+    Tables are created in this order: NMW_WellLocations, NMW_WellHeaders,
+    NMW_WellRecords, NMW_WellZDatum, NMW_WellSamples. Every non-key column is
+    nullable and no foreign-key constraints are declared. Four link columns
+    (WellLocations.WellDataID, WellRecords.WellDataID, WellZDatum.RecrdsetID,
+    WellSamples.RecrdsetID) get plain, non-unique indexes.
+    """
+    # Keyed by OBJECTID. WellDataID is nullable and only indexed (not unique),
+    # so this table does not enforce one location row per well.
+    op.create_table(
+        "NMW_WellLocations",
+        sa.Column("OBJECTID", sa.Integer(), nullable=False),
+        sa.Column("WellDataID", postgresql.UUID(as_uuid=True), nullable=True),
+        sa.Column("Well_ID", sa.String(), nullable=True),
+        sa.Column("Import_ID", sa.Integer(), nullable=True),
+        sa.Column("Township", sa.Float(), nullable=True),
+        sa.Column("NorS_TDir", sa.String(), nullable=True),
+        sa.Column("Range", sa.Float(), nullable=True),
+        sa.Column("EorW_RDir", sa.String(), nullable=True),
+        sa.Column("Sectn", sa.SmallInteger(), nullable=True),
+        sa.Column("SectnPart", sa.String(), nullable=True),
+        sa.Column("UnitLetter", sa.String(), nullable=True),
+        sa.Column("UTM_zone", sa.String(), nullable=True),
+        sa.Column("State", sa.String(), nullable=True),
+        sa.Column("County", sa.String(), nullable=True),
+        sa.Column("Basin", sa.String(), nullable=True),
+        sa.Column("Footage_NS", sa.Float(), nullable=True),
+        sa.Column("NorS_FDir", sa.String(), nullable=True),
+        sa.Column("Footage_EW", sa.Float(), nullable=True),
+        sa.Column("EorW_FDir", sa.String(), nullable=True),
+        # NOTE(review): Long_deg / Long_min / Long_sec are all mirrored below,
+        # but there is no "Lat_deg" column before Lat_min. Confirm against the
+        # legacy tbl_well_locations DDL that this is intentional.
+        sa.Column("Lat_min", sa.SmallInteger(), nullable=True),
+        sa.Column("Lat_sec", sa.Float(), nullable=True),
+        sa.Column("Long_deg", sa.SmallInteger(), nullable=True),
+        sa.Column("Long_min", sa.SmallInteger(), nullable=True),
+        sa.Column("Long_sec", sa.Float(), nullable=True),
+        sa.Column("Lat_dd27", sa.Float(), nullable=True),
+        sa.Column("Long_dd27", sa.Float(), nullable=True),
+        sa.Column("Lat_dd83", sa.Float(), nullable=True),
+        sa.Column("Long_dd83", sa.Float(), nullable=True),
+        sa.Column("SourceID", sa.String(), nullable=True),
+        sa.Column("SourceDatum", sa.String(), nullable=True),
+        sa.Column("SourceUnits", sa.String(), nullable=True),
+        sa.Column("LocAccType", sa.String(), nullable=True),
+        sa.Column("LocAccMeas", sa.String(), nullable=True),
+        sa.Column("LocAccVal", sa.Float(), nullable=True),
+        sa.Column("Duplicated", sa.SmallInteger(), nullable=True),
+        sa.Column("Exclude", sa.SmallInteger(), nullable=True),
+        sa.Column("Comments", sa.String(), nullable=True),
+        sa.Column("GlobalID", postgresql.UUID(as_uuid=True), nullable=True),
+        sa.Column("API", sa.String(), nullable=True),
+        sa.PrimaryKeyConstraint("OBJECTID"),
+    )
+    op.create_index(
+        "ix_NMW_WellLocations_WellDataID", "NMW_WellLocations", ["WellDataID"]
+    )
+
+    # Keyed by the WellDataID GUID; OBJECTID is carried as an ordinary
+    # nullable column here.
+    op.create_table(
+        "NMW_WellHeaders",
+        sa.Column("OBJECTID", sa.Integer(), nullable=True),
+        sa.Column("WellDataID", postgresql.UUID(as_uuid=True), nullable=False),
+        sa.Column("WellSpotID", postgresql.UUID(as_uuid=True), nullable=True),
+        sa.Column("API", sa.String(), nullable=True),
+        sa.Column("WellClass", sa.String(), nullable=True),
+        sa.Column("WellType", sa.String(), nullable=True),
+        sa.Column("WellOrient", sa.String(), nullable=True),
+        sa.Column("CurWellNam", sa.String(), nullable=True),
+        sa.Column("CurWellNum", sa.String(), nullable=True),
+        sa.Column("CurStatus", sa.String(), nullable=True),
+        sa.Column("PrdPoolCnt", sa.SmallInteger(), nullable=True),
+        sa.Column("CurOperatr", sa.String(), nullable=True),
+        sa.Column("CurOwner", sa.String(), nullable=True),
+        sa.Column("TotalDepth", sa.Float(), nullable=True),
+        sa.Column("Well_TVD", sa.Float(), nullable=True),
+        sa.Column("Fm_TD", sa.String(), nullable=True),
+        sa.Column("Age_TD", sa.String(), nullable=True),
+        sa.Column("SpudDate", sa.DateTime(), nullable=True),
+        sa.Column("ComplDate", sa.DateTime(), nullable=True),
+        sa.Column("PlugDate", sa.DateTime(), nullable=True),
+        sa.Column("PlugBack", sa.Float(), nullable=True),
+        sa.Column("BridgePlug", sa.String(), nullable=True),
+        sa.Column("ScoutTickt", sa.SmallInteger(), nullable=True),
+        sa.Column("DwnHoleSur", sa.SmallInteger(), nullable=True),
+        sa.Column("GeolLog", sa.SmallInteger(), nullable=True),
+        sa.Column("Geophyslog", sa.SmallInteger(), nullable=True),
+        sa.Column("GthrmExist", sa.SmallInteger(), nullable=True),
+        sa.Column("PetroData", sa.SmallInteger(), nullable=True),
+        sa.Column("CoreExists", sa.SmallInteger(), nullable=True),
+        sa.Column("Cuttings", sa.SmallInteger(), nullable=True),
+        sa.Column("SampleData", sa.SmallInteger(), nullable=True),
+        sa.Column("Comments", sa.String(), nullable=True),
+        sa.Column("Import_ID", sa.String(), nullable=True),
+        sa.Column("Import_DB", sa.String(), nullable=True),
+        sa.PrimaryKeyConstraint("WellDataID"),
+    )
+
+    # Keyed by the RecrdSetID GUID; WellDataID links a record set to a well.
+    op.create_table(
+        "NMW_WellRecords",
+        sa.Column("OBJECTID", sa.Integer(), nullable=True),
+        sa.Column("RecrdSetID", postgresql.UUID(as_uuid=True), nullable=False),
+        sa.Column("WellDataID", postgresql.UUID(as_uuid=True), nullable=True),
+        sa.Column("RecrdClass", sa.String(), nullable=True),
+        sa.Column("SourceID", sa.String(), nullable=True),
+        sa.Column("ActionDate", sa.DateTime(), nullable=True),
+        sa.Column("WellName", sa.String(), nullable=True),
+        sa.Column("WellNumber", sa.String(), nullable=True),
+        sa.Column("API_suffix", sa.String(), nullable=True),
+        sa.Column("EnteredBy", sa.String(), nullable=True),
+        sa.Column("EntryDate", sa.DateTime(), nullable=True),
+        sa.Column("Comments", sa.String(), nullable=True),
+        sa.PrimaryKeyConstraint("RecrdSetID"),
+    )
+    op.create_index("ix_NMW_WellRecords_WellDataID", "NMW_WellRecords", ["WellDataID"])
+
+    # The link column is spelled "RecrdsetID" in this table and in
+    # NMW_WellSamples, but "RecrdSetID" on NMW_WellRecords. Quoted identifiers
+    # are case-sensitive in PostgreSQL, so joins must use each exact spelling.
+    op.create_table(
+        "NMW_WellZDatum",
+        sa.Column("OBJECTID", sa.Integer(), nullable=False),
+        sa.Column("RecrdsetID", postgresql.UUID(as_uuid=True), nullable=True),
+        sa.Column("Elev_GL", sa.Float(), nullable=True),
+        sa.Column("Elev_DF", sa.Float(), nullable=True),
+        sa.Column("Elev_KB", sa.Float(), nullable=True),
+        sa.Column("Elev_unspc", sa.Float(), nullable=True),
+        sa.Column("DatumElev", sa.Float(), nullable=True),
+        sa.Column("DepthDatum", sa.String(), nullable=True),
+        sa.Column("DepthUnits", sa.String(), nullable=True),
+        sa.Column("Z_datum", sa.String(), nullable=True),
+        sa.Column("Z_units", sa.String(), nullable=True),
+        sa.Column("ElevSource", sa.String(), nullable=True),
+        sa.Column("ElvAccType", sa.String(), nullable=True),
+        sa.Column("ElvAccMeas", sa.String(), nullable=True),
+        sa.Column("ElvAccVal", sa.Float(), nullable=True),
+        sa.Column("Comments", sa.String(), nullable=True),
+        sa.Column("GlobalID", postgresql.UUID(as_uuid=True), nullable=True),
+        sa.PrimaryKeyConstraint("OBJECTID"),
+    )
+    op.create_index("ix_NMW_WellZDatum_RecrdsetID", "NMW_WellZDatum", ["RecrdsetID"])
+
+    # Keyed by the SamplSetID GUID; RecrdsetID links a sample set to its
+    # record set.
+    op.create_table(
+        "NMW_WellSamples",
+        sa.Column("OBJECTID", sa.Integer(), nullable=True),
+        sa.Column("SamplSetID", postgresql.UUID(as_uuid=True), nullable=False),
+        sa.Column("RecrdsetID", postgresql.UUID(as_uuid=True), nullable=True),
+        sa.Column("SmpSetName", sa.String(), nullable=True),
+        sa.Column("SamplClass", sa.String(), nullable=True),
+        sa.Column("SampleType", sa.String(), nullable=True),
+        sa.Column("SampleFm", sa.String(), nullable=True),
+        sa.Column("SampleLoc", sa.String(), nullable=True),
+        sa.Column("SampleDate", sa.DateTime(), nullable=True),
+        sa.Column("From_Depth", sa.Float(), nullable=True),
+        sa.Column("To_Depth", sa.Float(), nullable=True),
+        sa.Column("SmpDpUnt", sa.String(), nullable=True),
+        sa.Column("From_TVD", sa.Float(), nullable=True),
+        sa.Column("To_TVD", sa.Float(), nullable=True),
+        sa.Column("From_Elev", sa.Float(), nullable=True),
+        sa.Column("To_Elev", sa.Float(), nullable=True),
+        sa.Column("Porosity", sa.SmallInteger(), nullable=True),
+        sa.Column("Permeablty", sa.SmallInteger(), nullable=True),
+        sa.Column("Density", sa.SmallInteger(), nullable=True),
+        sa.Column("DST_Tests", sa.SmallInteger(), nullable=True),
+        sa.Column("ThinSect", sa.SmallInteger(), nullable=True),
+        sa.Column("Geochron", sa.SmallInteger(), nullable=True),
+        sa.Column("Geochem", sa.SmallInteger(), nullable=True),
+        sa.Column("Geothermal", sa.SmallInteger(), nullable=True),
+        sa.Column("WholeRock", sa.SmallInteger(), nullable=True),
+        sa.Column("Paleontlgy", sa.SmallInteger(), nullable=True),
+        sa.Column("EnteredBy", sa.String(), nullable=True),
+        sa.Column("EntryDate", sa.DateTime(), nullable=True),
+        sa.Column("Notes", sa.String(), nullable=True),
+        sa.PrimaryKeyConstraint("SamplSetID"),
+    )
+    op.create_index("ix_NMW_WellSamples_RecrdsetID", "NMW_WellSamples", ["RecrdsetID"])
+
+
+def downgrade() -> None:
+    """Drop the five staging tables and their indexes in reverse creation order."""
+    op.drop_index("ix_NMW_WellSamples_RecrdsetID", table_name="NMW_WellSamples")
+    op.drop_table("NMW_WellSamples")
+    op.drop_index("ix_NMW_WellZDatum_RecrdsetID", table_name="NMW_WellZDatum")
+    op.drop_table("NMW_WellZDatum")
+    op.drop_index("ix_NMW_WellRecords_WellDataID", table_name="NMW_WellRecords")
+    op.drop_table("NMW_WellRecords")
+    # NMW_WellHeaders has no secondary index, so only the table is dropped.
+    op.drop_table("NMW_WellHeaders")
+    op.drop_index("ix_NMW_WellLocations_WellDataID", table_name="NMW_WellLocations")
+    op.drop_table("NMW_WellLocations")
diff --git a/alembic/versions/v8w9x0y1z2a3_nmw_geothermal_dst_mirror_tables.py b/alembic/versions/v8w9x0y1z2a3_nmw_geothermal_dst_mirror_tables.py
new file mode 100644
index 00000000..2a0bd1a9
--- /dev/null
+++ b/alembic/versions/v8w9x0y1z2a3_nmw_geothermal_dst_mirror_tables.py
@@ -0,0 +1,330 @@
+"""NM_Wells geothermal + drill-stem-test 1:1 staging mirror tables
+
+Revision ID: v8w9x0y1z2a3
+Revises: u7v8w9x0y1z2
+Create Date: 2026-06-06 00:00:01.000000
+
+1:1 staging mirror of the NM_Wells "Migrate First" Geothermal and Drill Stem
+Test tables (see db/nmw_legacy.py and docs/nm_wells-migration.md).
Columns and
+lengths taken directly from the NM_Wells SQL dump DDL.
+
+  Geothermal:
+    tbl_gt_bht_headers -> NMW_GtBhtHeaders
+    tbl_gt_bht_data -> NMW_GtBhtData
+    tbl_ws_intervals -> NMW_WsIntervals
+    tbl_gt_conductivity -> NMW_GtConductivity
+    tbl_gt_heat_flow -> NMW_GtHeatFlow
+    tbl_gt_sum_heat_flow -> NMW_GtSumHeatFlow
+    tbl_gt_temp_depths -> NMW_GtTempDepths
+  Drill Stem Tests:
+    tbl_ws_dst_headers -> NMW_WsDstHeaders
+    tbl_ws_dst_intervals -> NMW_WsDstIntervals
+    tbl_ws_dst_flow_history -> NMW_WsDstFlowHistory
+    tbl_ws_dst_fluid_properties -> NMW_WsDstFluidProperties
+    tbl_ws_dst_pressure -> NMW_WsDstPressure
+"""
+
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects import postgresql
+
+# revision identifiers, used by Alembic.
+revision: str = "v8w9x0y1z2a3"
+down_revision: Union[str, Sequence[str], None] = "u7v8w9x0y1z2"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    """Create the twelve geothermal and drill-stem-test staging tables.
+
+    Seven geothermal tables are created first (NMW_GtBhtHeaders through
+    NMW_GtTempDepths), then five drill-stem-test tables (NMW_WsDstHeaders
+    through NMW_WsDstPressure). Header and interval tables are keyed by their
+    GUID column; data tables are keyed by OBJECTID. Each parent-GUID link
+    column gets a plain, non-unique index. No foreign keys are declared.
+    """
+    # TempUnit is String(1) here, in NMW_GtTempDepths and in NMW_WsDstHeaders,
+    # but String(5) in NMW_GtBhtData and NMW_WsDstPressure. The module
+    # docstring states that lengths are copied from the legacy DDL.
+    op.create_table(
+        "NMW_GtBhtHeaders",
+        sa.Column("OBJECTID", sa.Integer(), nullable=True),
+        sa.Column("BHTGUID", postgresql.UUID(as_uuid=True), nullable=False),
+        sa.Column("SamplSetID", postgresql.UUID(as_uuid=True), nullable=True),
+        sa.Column("BoreDia", sa.Float(), nullable=True),
+        sa.Column("BoreUnits", sa.String(length=16), nullable=True),
+        sa.Column("DrillFluid", sa.String(length=16), nullable=True),
+        sa.Column("TempUnit", sa.String(length=1), nullable=True),
+        sa.Column("FldSalinity", sa.Float(), nullable=True),
+        sa.Column("FldRstvity", sa.Float(), nullable=True),
+        sa.Column("Fluid_pH", sa.Float(), nullable=True),
+        sa.Column("FldDensity", sa.Float(), nullable=True),
+        sa.Column("FldLevel", sa.Float(), nullable=True),
+        sa.Column("FldViscsty", sa.Float(), nullable=True),
+        sa.Column("FluidLoss", sa.String(length=50), nullable=True),
+        sa.Column("Notes", sa.String(length=255), nullable=True),
+        sa.PrimaryKeyConstraint("BHTGUID"),
+    )
+    op.create_index(
+        "ix_NMW_GtBhtHeaders_SamplSetID", "NMW_GtBhtHeaders", ["SamplSetID"]
+    )
+
+    # Bottom-hole-temperature readings; BHTGUID links each to its header row.
+    op.create_table(
+        "NMW_GtBhtData",
+        sa.Column("OBJECTID", sa.Integer(), nullable=False),
+        sa.Column("BHTGUID", postgresql.UUID(as_uuid=True), nullable=True),
+        sa.Column("Depth", sa.Float(), nullable=True),
+        sa.Column("BHT", sa.Float(), nullable=True),
+        sa.Column("TempUnit", sa.String(length=5), nullable=True),
+        sa.Column("HrsSnceCir", sa.Float(), nullable=True),
+        sa.Column("DateMeasrd", sa.DateTime(), nullable=True),
+        sa.Column("Comments", sa.String(length=255), nullable=True),
+        sa.Column("GlobalID", postgresql.UUID(as_uuid=True), nullable=True),
+        sa.PrimaryKeyConstraint("OBJECTID"),
+    )
+    op.create_index("ix_NMW_GtBhtData_BHTGUID", "NMW_GtBhtData", ["BHTGUID"])
+
+    # Sample intervals; IntrvlGUID is the parent key used by
+    # NMW_GtConductivity and NMW_GtHeatFlow below.
+    op.create_table(
+        "NMW_WsIntervals",
+        sa.Column("OBJECTID", sa.Integer(), nullable=True),
+        sa.Column("IntrvlGUID", postgresql.UUID(as_uuid=True), nullable=False),
+        sa.Column("SamplSetID", postgresql.UUID(as_uuid=True), nullable=True),
+        sa.Column("SampleID", sa.String(length=128), nullable=True),
+        sa.Column("From_Depth", sa.Float(), nullable=True),
+        sa.Column("To_Depth", sa.Float(), nullable=True),
+        sa.Column("From_TVD", sa.Float(), nullable=True),
+        sa.Column("To_TVD", sa.Float(), nullable=True),
+        sa.Column("From_Elev", sa.Float(), nullable=True),
+        sa.Column("To_Elev", sa.Float(), nullable=True),
+        sa.Column("Intv_Notes", sa.String(length=255), nullable=True),
+        sa.PrimaryKeyConstraint("IntrvlGUID"),
+    )
+    op.create_index("ix_NMW_WsIntervals_SamplSetID", "NMW_WsIntervals", ["SamplSetID"])
+
+    op.create_table(
+        "NMW_GtConductivity",
+        sa.Column("OBJECTID", sa.Integer(), nullable=False),
+        sa.Column("IntrvlGUID", postgresql.UUID(as_uuid=True), nullable=True),
+        sa.Column("Cnductvity", sa.Float(), nullable=True),
+        sa.Column("CnductUnit", sa.String(length=3), nullable=True),
+        sa.Column("Comments", sa.String(length=255), nullable=True),
+        sa.Column("GlobalID", postgresql.UUID(as_uuid=True), nullable=True),
+        sa.PrimaryKeyConstraint("OBJECTID"),
+    )
+    op.create_index(
+        "ix_NMW_GtConductivity_IntrvlGUID", "NMW_GtConductivity", ["IntrvlGUID"]
+    )
+
+    op.create_table(
+        "NMW_GtHeatFlow",
+        sa.Column("OBJECTID", sa.Integer(), nullable=False),
+        sa.Column("IntrvlGUID", postgresql.UUID(as_uuid=True), nullable=True),
+        sa.Column("Gradient", sa.Float(), nullable=True),
+        sa.Column("Ka", sa.Float(), nullable=True),
+        sa.Column("Ka_unit", sa.String(length=3), nullable=True),
+        sa.Column("Pm", sa.Float(), nullable=True),
+        sa.Column("Kpr", sa.Float(), nullable=True),
+        sa.Column("Kpr_unit", sa.String(length=3), nullable=True),
+        sa.Column("Q", sa.Float(), nullable=True),
+        sa.Column("Q_unit", sa.String(length=3), nullable=True),
+        sa.Column("Comments", sa.String(length=255), nullable=True),
+        sa.Column("GlobalID", postgresql.UUID(as_uuid=True), nullable=True),
+        sa.PrimaryKeyConstraint("OBJECTID"),
+    )
+    op.create_index("ix_NMW_GtHeatFlow_IntrvlGUID", "NMW_GtHeatFlow", ["IntrvlGUID"])
+
+    # The only table here with two link columns (RecrdSetID and SamplSetID);
+    # both are indexed.
+    op.create_table(
+        "NMW_GtSumHeatFlow",
+        sa.Column("OBJECTID", sa.Integer(), nullable=False),
+        sa.Column("RecrdSetID", postgresql.UUID(as_uuid=True), nullable=True),
+        sa.Column("SamplSetID", postgresql.UUID(as_uuid=True), nullable=True),
+        sa.Column("LithClass", sa.String(length=50), nullable=True),
+        sa.Column("UnitBasis", sa.String(length=16), nullable=True),
+        sa.Column("UnitName", sa.String(length=128), nullable=True),
+        sa.Column("GeoID", sa.String(length=16), nullable=True),
+        sa.Column("FromDepth", sa.Float(), nullable=True),
+        sa.Column("ToDepth", sa.Float(), nullable=True),
+        sa.Column("DepthUnit", sa.String(length=8), nullable=True),
+        sa.Column("From_Elev", sa.Float(), nullable=True),
+        sa.Column("To_Elev", sa.Float(), nullable=True),
+        sa.Column("ThermlGrad", sa.Float(), nullable=True),
+        sa.Column("TGError", sa.Float(), nullable=True),
+        sa.Column("GradUnit", sa.String(length=3), nullable=True),
+        sa.Column("TGradRange", sa.String(length=15), nullable=True),
+        sa.Column("SampleType", sa.String(length=50), nullable=True),
+        sa.Column("NumSamples", sa.SmallInteger(), nullable=True),
+        sa.Column("ThermlCond", sa.Float(), nullable=True),
+        sa.Column("TCondError", sa.Float(), nullable=True),
+        sa.Column("TCondUnit", sa.String(length=3), nullable=True),
+        sa.Column("TCondRange", sa.String(length=15), nullable=True),
+        sa.Column("HeatFlow", sa.Float(), nullable=True),
+        sa.Column("HtFlowErr", sa.Float(), nullable=True),
+        sa.Column("HtFlowUnit", sa.String(length=3), nullable=True),
+        sa.Column("HtFlowEst", sa.Float(), nullable=True),
+        sa.Column("Quality", sa.String(length=50), nullable=True),
+        sa.Column("Comments", sa.String(length=255), nullable=True),
+        sa.Column("GlobalID", postgresql.UUID(as_uuid=True), nullable=True),
+        sa.PrimaryKeyConstraint("OBJECTID"),
+    )
+    op.create_index(
+        "ix_NMW_GtSumHeatFlow_RecrdSetID", "NMW_GtSumHeatFlow", ["RecrdSetID"]
+    )
+    op.create_index(
+        "ix_NMW_GtSumHeatFlow_SamplSetID", "NMW_GtSumHeatFlow", ["SamplSetID"]
+    )
+
+    op.create_table(
+        "NMW_GtTempDepths",
+        sa.Column("OBJECTID", sa.Integer(), nullable=False),
+        sa.Column("SamplSetID", postgresql.UUID(as_uuid=True), nullable=True),
+        sa.Column("Depth", sa.Float(), nullable=True),
+        sa.Column("Temp", sa.Float(), nullable=True),
+        sa.Column("TempUnit", sa.String(length=1), nullable=True),
+        sa.Column("IntrvlGrad", sa.Float(), nullable=True),
+        sa.Column("Comments", sa.String(length=255), nullable=True),
+        sa.Column("GlobalID", postgresql.UUID(as_uuid=True), nullable=True),
+        sa.PrimaryKeyConstraint("OBJECTID"),
+    )
+    op.create_index(
+        "ix_NMW_GtTempDepths_SamplSetID", "NMW_GtTempDepths", ["SamplSetID"]
+    )
+
+    # Drill-stem-test tables start here: header -> interval -> per-interval
+    # flow history, fluid properties and pressure rows.
+    op.create_table(
+        "NMW_WsDstHeaders",
+        sa.Column("OBJECTID", sa.Integer(), nullable=True),
+        sa.Column("DSTGUID", postgresql.UUID(as_uuid=True), nullable=False),
+        sa.Column("SamplSetID", postgresql.UUID(as_uuid=True), nullable=True),
+        sa.Column("TestType", sa.String(length=50), nullable=True),
+        sa.Column("DSTOprator", sa.String(length=50), nullable=True),
+        sa.Column("PressUnits", sa.String(length=8), nullable=True),
+        sa.Column("TempUnit", sa.String(length=1), nullable=True),
+        sa.Column("PipeDiaUnt", sa.String(length=8), nullable=True),
+        sa.Column("PipeLenUnt", sa.String(length=8), nullable=True),
+        sa.Column("ChokeSizUn", sa.String(length=8), nullable=True),
+        sa.Column("Notes", sa.String(length=255), nullable=True),
+        sa.PrimaryKeyConstraint("DSTGUID"),
+    )
+    op.create_index(
+        "ix_NMW_WsDstHeaders_SamplSetID", "NMW_WsDstHeaders", ["SamplSetID"]
+    )
+
+    # Here "DSTInterval" is the GUID primary key, not a numeric interval.
+    op.create_table(
+        "NMW_WsDstIntervals",
+        sa.Column("OBJECTID", sa.Integer(), nullable=True),
+        sa.Column("DSTInterval", postgresql.UUID(as_uuid=True), nullable=False),
+        sa.Column("DSTGUID", postgresql.UUID(as_uuid=True), nullable=True),
+        sa.Column("DSTName", sa.String(length=128), nullable=True),
+        sa.Column("TargetFm", sa.String(length=16), nullable=True),
+        sa.Column("DSTDate", sa.DateTime(), nullable=True),
+        sa.Column("DSTNumber", sa.SmallInteger(), nullable=True),
+        sa.Column("Status", sa.String(length=255), nullable=True),
+        sa.Column("StatusDate", sa.DateTime(), nullable=True),
+        sa.Column("PackrFrom", sa.Float(), nullable=True),
+        sa.Column("PackerTo", sa.Float(), nullable=True),
+        sa.Column("SrfChokeSz", sa.Float(), nullable=True),
+        sa.Column("BotChokeSz", sa.Float(), nullable=True),
+        sa.Column("PipeDia", sa.Float(), nullable=True),
+        sa.Column("PipeLength", sa.Float(), nullable=True),
+        sa.Column("Notes", sa.String(length=255), nullable=True),
+        sa.PrimaryKeyConstraint("DSTInterval"),
+    )
+    op.create_index("ix_NMW_WsDstIntervals_DSTGUID", "NMW_WsDstIntervals", ["DSTGUID"])
+
+    op.create_table(
+        "NMW_WsDstFlowHistory",
+        sa.Column("OBJECTID", sa.Integer(), nullable=False),
+        sa.Column("DSTInterval", postgresql.UUID(as_uuid=True), nullable=True),
+        sa.Column("Operation", sa.String(length=255), nullable=True),
+        sa.Column("StartTime", sa.DateTime(), nullable=True),
+        sa.Column("EndTime", sa.DateTime(), nullable=True),
+        sa.Column("Duration", sa.Float(), nullable=True),
+        sa.Column("Pressure", sa.Float(), nullable=True),
+        sa.Column("Temp", sa.Float(), nullable=True),
+        sa.Column("RecovColmn", sa.Float(), nullable=True),
+        sa.Column("RecovType", sa.String(length=255), nullable=True),
+        sa.Column("Notes", sa.String(length=255), nullable=True),
+        sa.Column("GlobalID", postgresql.UUID(as_uuid=True), nullable=True),
+        sa.PrimaryKeyConstraint("OBJECTID"),
+    )
+    op.create_index(
+        "ix_NMW_WsDstFlowHistory_DSTInterval", "NMW_WsDstFlowHistory", ["DSTInterval"]
+    )
+
+    op.create_table(
+        "NMW_WsDstFluidProperties",
+        sa.Column("OBJECTID", sa.Integer(), nullable=False),
+        sa.Column("DSTInterval", postgresql.UUID(as_uuid=True), nullable=True),
+        sa.Column("SourceLoc", sa.String(length=255), nullable=True),
+        sa.Column("Resistivty", sa.Float(), nullable=True),
+        sa.Column("Temp", sa.Float(), nullable=True),
+        sa.Column("Chlorides", sa.Float(), nullable=True),
+        sa.Column("Notes", sa.String(length=255), nullable=True),
+        sa.Column("GlobalID", postgresql.UUID(as_uuid=True), nullable=True),
+        sa.PrimaryKeyConstraint("OBJECTID"),
+    )
+    op.create_index(
+        "ix_NMW_WsDstFluidProperties_DSTInterval",
+        "NMW_WsDstFluidProperties",
+        ["DSTInterval"],
+    )
+
+    op.create_table(
+        "NMW_WsDstPressure",
+        sa.Column("OBJECTID", sa.Integer(), nullable=False),
+        sa.Column("DSTInterval", postgresql.UUID(as_uuid=True), nullable=True),
+        sa.Column("PrsGageDpt", sa.Float(), nullable=True),
+        sa.Column("BlankedOff", sa.SmallInteger(), nullable=True),
+        sa.Column("InShtInMin", sa.Float(), nullable=True),
+        sa.Column("FlwPrsInMin", sa.Float(), nullable=True),
+        sa.Column("PrsInShtIn", sa.Float(), nullable=True),
+        sa.Column("PrsInitClsdIn", sa.Float(), nullable=True),
+        sa.Column("FnShtInMin", sa.Float(), nullable=True),
+        sa.Column("FlwPrsFinMin", sa.Float(), nullable=True),
+        sa.Column("PrsFnShtIn", sa.Float(), nullable=True),
+        sa.Column("ShtInPrMth", sa.String(length=255), nullable=True),
+        sa.Column("HydrostPrsIn", sa.Float(), nullable=True),
+        sa.Column("HydStPrsFl", sa.Float(), nullable=True),
+        sa.Column("HydstPrMth", sa.String(length=255), nullable=True),
+        sa.Column("EquilPress", sa.Float(), nullable=True),
+        sa.Column("EqlPrsMth", sa.String(length=255), nullable=True),
+        sa.Column("FlowPrsMin", sa.Float(), nullable=True),
+        sa.Column("FlowPrsMax", sa.Float(), nullable=True),
+        sa.Column("FlowPrsMth", sa.String(length=255), nullable=True),
+        sa.Column("DSTFluid", sa.String(length=128), nullable=True),
+        sa.Column("FmTemp", sa.Float(), nullable=True),
+        sa.Column("TempCorrtn", sa.Float(), nullable=True),
+        sa.Column("TempFlowng", sa.Float(), nullable=True),
+        sa.Column("TempUnit", sa.String(length=5), nullable=True),
+        sa.Column("Notes", sa.String(length=255), nullable=True),
+        sa.Column("GlobalID", postgresql.UUID(as_uuid=True), nullable=True),
+        sa.PrimaryKeyConstraint("OBJECTID"),
+    )
+    op.create_index(
+        "ix_NMW_WsDstPressure_DSTInterval", "NMW_WsDstPressure", ["DSTInterval"]
+    )
+
+
+def downgrade() -> None:
+    """Drop all twelve tables and their indexes in reverse creation order."""
+    op.drop_index("ix_NMW_WsDstPressure_DSTInterval", table_name="NMW_WsDstPressure")
+    op.drop_table("NMW_WsDstPressure")
+    op.drop_index(
+        "ix_NMW_WsDstFluidProperties_DSTInterval", table_name="NMW_WsDstFluidProperties"
+    )
+    op.drop_table("NMW_WsDstFluidProperties")
+    op.drop_index(
+        "ix_NMW_WsDstFlowHistory_DSTInterval", table_name="NMW_WsDstFlowHistory"
+    )
+    op.drop_table("NMW_WsDstFlowHistory")
+    op.drop_index("ix_NMW_WsDstIntervals_DSTGUID", table_name="NMW_WsDstIntervals")
+    op.drop_table("NMW_WsDstIntervals")
+    op.drop_index("ix_NMW_WsDstHeaders_SamplSetID", table_name="NMW_WsDstHeaders")
+    op.drop_table("NMW_WsDstHeaders")
+    op.drop_index("ix_NMW_GtTempDepths_SamplSetID", table_name="NMW_GtTempDepths")
+    op.drop_table("NMW_GtTempDepths")
+    # Both NMW_GtSumHeatFlow indexes must go before the table itself.
+    op.drop_index("ix_NMW_GtSumHeatFlow_RecrdSetID", table_name="NMW_GtSumHeatFlow")
+    op.drop_index("ix_NMW_GtSumHeatFlow_SamplSetID", table_name="NMW_GtSumHeatFlow")
+    op.drop_table("NMW_GtSumHeatFlow")
+    op.drop_index("ix_NMW_GtHeatFlow_IntrvlGUID", table_name="NMW_GtHeatFlow")
+    op.drop_table("NMW_GtHeatFlow")
+    op.drop_index("ix_NMW_GtConductivity_IntrvlGUID", table_name="NMW_GtConductivity")
+    op.drop_table("NMW_GtConductivity")
+    op.drop_index("ix_NMW_WsIntervals_SamplSetID", table_name="NMW_WsIntervals")
+    op.drop_table("NMW_WsIntervals")
+    op.drop_index("ix_NMW_GtBhtData_BHTGUID", table_name="NMW_GtBhtData")
+    op.drop_table("NMW_GtBhtData")
+    op.drop_index("ix_NMW_GtBhtHeaders_SamplSetID", table_name="NMW_GtBhtHeaders")
+    op.drop_table("NMW_GtBhtHeaders")
diff --git a/alembic/versions/w9x0y1z2a3b4_add_geothermal_ogc_views.py b/alembic/versions/w9x0y1z2a3b4_add_geothermal_ogc_views.py
new file mode 100644
index 00000000..6e89bff9
--- /dev/null
+++ b/alembic/versions/w9x0y1z2a3b4_add_geothermal_ogc_views.py
@@ -0,0 +1,173 @@
+"""add geothermal OGC views (bottom-hole temps + temperature-depth profile)
+
+Revision ID: w9x0y1z2a3b4
+Revises: v8w9x0y1z2a3
+Create Date: 2026-06-07 00:00:00.000000
+
+Two point layers over the NM_Wells staging mirror (db/nmw_legacy.py):
+
+  ogc_geothermal_wells_bht
+      One feature per geothermal well that has bottom-hole-temperature data
+      (NMW_GtBhtData), with aggregate BHT stats.
+
+  ogc_geothermal_wells_temperature_profile (MATERIALIZED)
+      One feature per geothermal well that has a downhole temperature-vs-depth
+      series (NMW_GtTempDepths, ~370k source rows), with the ordered series as
+      a JSON array. Materialized + indexed (unique well_data_id, GiST geom);
+      REFRESH MATERIALIZED VIEW after a data reload.
+
+Well geometry is built from NMW_WellLocations Lat/Long_dd83 (WGS84).
Geothermal
+data links to a well via:
+    NMW_GtBhtData.BHTGUID -> NMW_GtBhtHeaders.BHTGUID (BHT view only)
+    gt_*.SamplSetID -> NMW_WellSamples.SamplSetID
+    NMW_WellSamples.RecrdsetID -> NMW_WellRecords.RecrdSetID
+    NMW_WellRecords.WellDataID -> NMW_WellLocations/Headers.WellDataID
+
+NMW_WellLocations is keyed by OBJECTID, not WellDataID, so a well may have
+several location rows. Both views therefore join a one-row-per-well location
+subquery (lowest OBJECTID that has coordinates). This keeps well_data_id
+unique per feature, which the materialized view's unique index requires.
+"""
+
+from typing import Sequence, Union
+
+from alembic import op
+from sqlalchemy import inspect, text
+
+# revision identifiers, used by Alembic.
+revision: str = "w9x0y1z2a3b4"
+down_revision: Union[str, Sequence[str], None] = "v8w9x0y1z2a3"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+_BHT_VIEW = "ogc_geothermal_wells_bht"
+_PROFILE_VIEW = "ogc_geothermal_wells_temperature_profile"
+
+_REQUIRED_TABLES = (
+    "NMW_WellLocations",
+    "NMW_WellHeaders",
+    "NMW_WellRecords",
+    "NMW_WellSamples",
+    "NMW_GtBhtData",
+    "NMW_GtBhtHeaders",
+    "NMW_GtTempDepths",
+)
+
+# Exactly one located row per well. Without this, a well with several
+# NMW_WellLocations rows multiplies every joined measurement (inflated
+# counts), and differing coordinates yield several features per well_data_id.
+# NOTE(review): the lowest OBJECTID is only a deterministic tie-break; confirm
+# whether the Duplicated / Exclude flags should drive the choice instead.
+_ONE_LOCATION_PER_WELL = """
+        SELECT DISTINCT ON ("WellDataID")
+            "WellDataID",
+            "Lat_dd83",
+            "Long_dd83"
+        FROM "NMW_WellLocations"
+        WHERE "Lat_dd83" IS NOT NULL
+          AND "Long_dd83" IS NOT NULL
+        ORDER BY "WellDataID", "OBJECTID"
+"""
+
+
+def _create_bht_view() -> str:
+    """Return the CREATE VIEW statement for the bottom-hole-temperature layer.
+
+    The view has one row per well that has BHT readings and a located row in
+    NMW_WellLocations, with count/min/max aggregates and a WGS84 point.
+    """
+    return f"""
+    CREATE VIEW ogc_geothermal_wells_bht AS
+    SELECT
+        r."WellDataID" AS well_data_id,
+        hdr."CurWellNam" AS well_name,
+        hdr."API" AS api,
+        hdr."TotalDepth" AS total_depth,
+        count(d.*) AS bht_count,
+        max(d."BHT") AS max_bht,
+        min(d."BHT") AS min_bht,
+        max(d."Depth") AS max_bht_depth,
+        max(d."TempUnit") AS temp_unit,
+        ST_SetSRID(
+            ST_MakePoint(loc."Long_dd83", loc."Lat_dd83"), 4326
+        ) AS geom
+    FROM "NMW_GtBhtData" AS d
+    JOIN "NMW_GtBhtHeaders" AS h ON h."BHTGUID" = d."BHTGUID"
+    JOIN "NMW_WellSamples" AS s ON s."SamplSetID" = h."SamplSetID"
+    JOIN "NMW_WellRecords" AS r ON r."RecrdSetID" = s."RecrdsetID"
+    JOIN ({_ONE_LOCATION_PER_WELL}
+    ) AS loc ON loc."WellDataID" = r."WellDataID"
+    LEFT JOIN "NMW_WellHeaders" AS hdr ON hdr."WellDataID" = r."WellDataID"
+    GROUP BY
+        r."WellDataID",
+        loc."Lat_dd83",
+        loc."Long_dd83",
+        hdr."CurWellNam",
+        hdr."API",
+        hdr."TotalDepth"
+    """
+
+
+def _create_profile_view() -> str:
+    """Return the CREATE MATERIALIZED VIEW statement for the profile layer.
+
+    The view has one row per well with temperature-vs-depth readings, carrying
+    aggregate stats and the depth-ordered readings as a JSON array.
+    """
+    # Materialized: the source NMW_GtTempDepths is large (~370k source rows) and
+    # this groups + builds a JSON series per well, too heavy to recompute per
+    # pygeoapi request. Staging data loads once, so staleness is a non-issue;
+    # REFRESH MATERIALIZED VIEW after a reload.
+    return f"""
+    CREATE MATERIALIZED VIEW ogc_geothermal_wells_temperature_profile AS
+    SELECT
+        r."WellDataID" AS well_data_id,
+        hdr."CurWellNam" AS well_name,
+        hdr."API" AS api,
+        count(td.*) AS reading_count,
+        min(td."Depth") AS min_depth,
+        max(td."Depth") AS max_depth,
+        min(td."Temp") AS min_temp,
+        max(td."Temp") AS max_temp,
+        max(td."TempUnit") AS temp_unit,
+        json_agg(
+            json_build_object('depth', td."Depth", 'temp', td."Temp")
+            ORDER BY td."Depth"
+        ) AS series,
+        ST_SetSRID(
+            ST_MakePoint(loc."Long_dd83", loc."Lat_dd83"), 4326
+        ) AS geom
+    FROM "NMW_GtTempDepths" AS td
+    JOIN "NMW_WellSamples" AS s ON s."SamplSetID" = td."SamplSetID"
+    JOIN "NMW_WellRecords" AS r ON r."RecrdSetID" = s."RecrdsetID"
+    JOIN ({_ONE_LOCATION_PER_WELL}
+    ) AS loc ON loc."WellDataID" = r."WellDataID"
+    LEFT JOIN "NMW_WellHeaders" AS hdr ON hdr."WellDataID" = r."WellDataID"
+    WHERE td."Depth" IS NOT NULL
+      AND td."Temp" IS NOT NULL
+    GROUP BY
+        r."WellDataID",
+        loc."Lat_dd83",
+        loc."Long_dd83",
+        hdr."CurWellNam",
+        hdr."API"
+    """
+
+
+def _drop_view_of_any_kind(name: str) -> None:
+    """Drop *name* whether it exists as a plain or a materialized view.
+
+    PostgreSQL rejects DROP VIEW on a materialized view, and DROP MATERIALIZED
+    VIEW on a plain view, even with IF EXISTS. The relation kind is therefore
+    looked up first and the matching statement issued. Does nothing when no
+    visible view of either kind has that name.
+    """
+    relkind = (
+        op.get_bind()
+        .execute(
+            text(
+                "SELECT CAST(c.relkind AS text) FROM pg_catalog.pg_class AS c "
+                "WHERE c.relname = :name "
+                "AND pg_catalog.pg_table_is_visible(c.oid)"
+            ),
+            {"name": name},
+        )
+        .scalar()
+    )
+    if relkind == "m":
+        op.execute(text(f"DROP MATERIALIZED VIEW IF EXISTS {name}"))
+    elif relkind == "v":
+        op.execute(text(f"DROP VIEW IF EXISTS {name}"))
+
+
+def upgrade() -> None:
+    """Create both geothermal OGC views and the profile view's indexes.
+
+    Raises:
+        RuntimeError: if any table in _REQUIRED_TABLES is missing from the
+            public schema.
+    """
+    bind = op.get_bind()
+    inspector = inspect(bind)
+    existing = set(inspector.get_table_names(schema="public"))
+    missing = [t for t in _REQUIRED_TABLES if t not in existing]
+    if missing:
+        raise RuntimeError(
+            "Cannot create geothermal OGC views. Missing required tables: "
+            + ", ".join(missing)
+        )
+
+    _drop_view_of_any_kind(_BHT_VIEW)
+    op.execute(text(_create_bht_view()))
+    op.execute(
+        text(
+            f"COMMENT ON VIEW {_BHT_VIEW} IS "
+            "'Geothermal wells with bottom-hole-temperature data.'"
+        )
+    )
+
+    # The profile layer may already exist as either kind of view; issuing both
+    # DROP flavours back to back errors on whichever kind does not match.
+    _drop_view_of_any_kind(_PROFILE_VIEW)
+    op.execute(text(_create_profile_view()))
+    op.execute(
+        text(
+            f"COMMENT ON MATERIALIZED VIEW {_PROFILE_VIEW} IS "
+            "'Geothermal wells with downhole temperature-vs-depth series.'"
+        )
+    )
+    # Unique index on the feature id enables REFRESH ... CONCURRENTLY; GiST on
+    # the geometry for fast pygeoapi bbox queries.
+    op.execute(
+        text(
+            f"CREATE UNIQUE INDEX ux_{_PROFILE_VIEW}_well_data_id "
+            f"ON {_PROFILE_VIEW} (well_data_id)"
+        )
+    )
+    op.execute(
+        text(
+            f"CREATE INDEX ix_{_PROFILE_VIEW}_geom ON {_PROFILE_VIEW} USING gist (geom)"
+        )
+    )
+
+
+def downgrade() -> None:
+    """Drop both views; the materialized view's indexes are dropped with it."""
+    op.execute(text(f"DROP MATERIALIZED VIEW IF EXISTS {_PROFILE_VIEW}"))
+    op.execute(text(f"DROP VIEW IF EXISTS {_BHT_VIEW}"))
diff --git a/alembic/versions/x0y1z2a3b4c5_add_geothermal_heat_flow_ogc_view.py b/alembic/versions/x0y1z2a3b4c5_add_geothermal_heat_flow_ogc_view.py
new file mode 100644
index 00000000..5422102a
--- /dev/null
+++ b/alembic/versions/x0y1z2a3b4c5_add_geothermal_heat_flow_ogc_view.py
@@ -0,0 +1,109 @@
+"""add geothermal heat-flow OGC view
+
+Revision ID: x0y1z2a3b4c5
+Revises: w9x0y1z2a3b4
+Create Date: 2026-06-07 00:00:01.000000
+
+pygeoapi point layer ogc_geothermal_wells_summary_heat_flow: geothermal wells
+with summary heat-flow determinations (NMW_GtSumHeatFlow), one feature per well
+with aggregate stats plus a `measurements` JSON series (one element per
+determination, ordered by depth). Geometry from NMW_WellLocations Lat/Long_dd83.
+
+Link: NMW_GtSumHeatFlow.RecrdSetID -> NMW_WellRecords.RecrdSetID ->
+NMW_WellLocations/Headers.WellDataID.
+""" + +from typing import Sequence, Union + +from alembic import op +from sqlalchemy import inspect, text + +# revision identifiers, used by Alembic. +revision: str = "x0y1z2a3b4c5" +down_revision: Union[str, Sequence[str], None] = "w9x0y1z2a3b4" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + +_VIEW = "ogc_geothermal_wells_summary_heat_flow" + +_REQUIRED_TABLES = ( + "NMW_WellLocations", + "NMW_WellHeaders", + "NMW_WellRecords", + "NMW_GtSumHeatFlow", +) + + +def _create_view() -> str: + return """ + CREATE VIEW ogc_geothermal_wells_summary_heat_flow AS + SELECT + r."WellDataID" AS well_data_id, + hdr."CurWellNam" AS well_name, + hdr."API" AS api, + count(shf.*) AS heat_flow_count, + max(shf."HeatFlow") AS max_heat_flow, + avg(shf."HeatFlow") AS avg_heat_flow, + max(shf."HtFlowUnit") AS heat_flow_unit, + max(shf."ThermlGrad") AS max_thermal_gradient, + max(shf."GradUnit") AS gradient_unit, + max(shf."ThermlCond") AS max_thermal_conductivity, + max(shf."TCondUnit") AS conductivity_unit, + max(shf."Quality") AS quality, + json_agg( + json_build_object( + 'from_depth', shf."FromDepth", + 'to_depth', shf."ToDepth", + 'depth_unit', shf."DepthUnit", + 'heat_flow', shf."HeatFlow", + 'heat_flow_error', shf."HtFlowErr", + 'heat_flow_unit', shf."HtFlowUnit", + 'thermal_gradient', shf."ThermlGrad", + 'gradient_unit', shf."GradUnit", + 'thermal_conductivity', shf."ThermlCond", + 'conductivity_unit', shf."TCondUnit", + 'quality', shf."Quality" + ) + ORDER BY shf."FromDepth" + ) AS measurements, + ST_SetSRID( + ST_MakePoint(loc."Long_dd83", loc."Lat_dd83"), 4326 + ) AS geom + FROM "NMW_GtSumHeatFlow" AS shf + JOIN "NMW_WellRecords" AS r ON r."RecrdSetID" = shf."RecrdSetID" + JOIN "NMW_WellLocations" AS loc ON loc."WellDataID" = r."WellDataID" + LEFT JOIN "NMW_WellHeaders" AS hdr ON hdr."WellDataID" = r."WellDataID" + WHERE loc."Lat_dd83" IS NOT NULL + AND loc."Long_dd83" IS NOT NULL + GROUP BY + r."WellDataID", + 
loc."Lat_dd83", + loc."Long_dd83", + hdr."CurWellNam", + hdr."API" + """ + + +def upgrade() -> None: + bind = op.get_bind() + inspector = inspect(bind) + existing = set(inspector.get_table_names(schema="public")) + missing = [t for t in _REQUIRED_TABLES if t not in existing] + if missing: + raise RuntimeError( + "Cannot create geothermal heat-flow OGC view. Missing required " + "tables: " + ", ".join(missing) + ) + + op.execute(text(f"DROP VIEW IF EXISTS {_VIEW}")) + op.execute(text(_create_view())) + op.execute( + text( + f"COMMENT ON VIEW {_VIEW} IS " + "'Geothermal wells with summary heat-flow determinations (pygeoapi).'" + ) + ) + + +def downgrade() -> None: + op.execute(text(f"DROP VIEW IF EXISTS {_VIEW}")) diff --git a/alembic/versions/y1z2a3b4c5d6_add_geothermal_interval_heat_flow_ogc_view.py b/alembic/versions/y1z2a3b4c5d6_add_geothermal_interval_heat_flow_ogc_view.py new file mode 100644 index 00000000..12f57018 --- /dev/null +++ b/alembic/versions/y1z2a3b4c5d6_add_geothermal_interval_heat_flow_ogc_view.py @@ -0,0 +1,112 @@ +"""add geothermal per-interval heat-flow OGC view + +Revision ID: y1z2a3b4c5d6 +Revises: x0y1z2a3b4c5 +Create Date: 2026-06-07 00:00:02.000000 + +pygeoapi point layer of geothermal wells with per-interval heat-flow values +(NMW_GtHeatFlow), one feature per well with aggregate stats plus a +`measurements` JSON series (one element per interval, ordered by depth). +Distinct from ogc_geothermal_wells_summary_heat_flow (NMW_GtSumHeatFlow). + +Link: NMW_GtHeatFlow.IntrvlGUID -> NMW_WsIntervals.IntrvlGUID -> +NMW_WellSamples.SamplSetID -> NMW_WellRecords.RecrdSetID -> +NMW_WellLocations/Headers.WellDataID. +""" + +from typing import Sequence, Union + +from alembic import op +from sqlalchemy import inspect, text + +# revision identifiers, used by Alembic. 
+revision: str = "y1z2a3b4c5d6" +down_revision: Union[str, Sequence[str], None] = "x0y1z2a3b4c5" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + +_VIEW = "ogc_geothermal_wells_interval_heat_flow" + +_REQUIRED_TABLES = ( + "NMW_WellLocations", + "NMW_WellHeaders", + "NMW_WellRecords", + "NMW_WellSamples", + "NMW_WsIntervals", + "NMW_GtHeatFlow", +) + + +def _create_view() -> str: + return """ + CREATE VIEW ogc_geothermal_wells_interval_heat_flow AS + SELECT + r."WellDataID" AS well_data_id, + hdr."CurWellNam" AS well_name, + hdr."API" AS api, + count(hf.*) AS interval_count, + max(hf."Q") AS max_heat_flow, + avg(hf."Q") AS avg_heat_flow, + max(hf."Q_unit") AS heat_flow_unit, + max(hf."Gradient") AS max_gradient, + max(hf."Kpr") AS max_thermal_conductivity, + max(hf."Kpr_unit") AS conductivity_unit, + max(hf."Ka") AS max_diffusivity, + max(hf."Ka_unit") AS diffusivity_unit, + json_agg( + json_build_object( + 'from_depth', i."From_Depth", + 'to_depth', i."To_Depth", + 'heat_flow', hf."Q", + 'heat_flow_unit', hf."Q_unit", + 'gradient', hf."Gradient", + 'thermal_conductivity', hf."Kpr", + 'conductivity_unit', hf."Kpr_unit", + 'diffusivity', hf."Ka", + 'diffusivity_unit', hf."Ka_unit" + ) + ORDER BY i."From_Depth" + ) AS measurements, + ST_SetSRID( + ST_MakePoint(loc."Long_dd83", loc."Lat_dd83"), 4326 + ) AS geom + FROM "NMW_GtHeatFlow" AS hf + JOIN "NMW_WsIntervals" AS i ON i."IntrvlGUID" = hf."IntrvlGUID" + JOIN "NMW_WellSamples" AS s ON s."SamplSetID" = i."SamplSetID" + JOIN "NMW_WellRecords" AS r ON r."RecrdSetID" = s."RecrdsetID" + JOIN "NMW_WellLocations" AS loc ON loc."WellDataID" = r."WellDataID" + LEFT JOIN "NMW_WellHeaders" AS hdr ON hdr."WellDataID" = r."WellDataID" + WHERE loc."Lat_dd83" IS NOT NULL + AND loc."Long_dd83" IS NOT NULL + GROUP BY + r."WellDataID", + loc."Lat_dd83", + loc."Long_dd83", + hdr."CurWellNam", + hdr."API" + """ + + +def upgrade() -> None: + bind = op.get_bind() + inspector = 
inspect(bind) + existing = set(inspector.get_table_names(schema="public")) + missing = [t for t in _REQUIRED_TABLES if t not in existing] + if missing: + raise RuntimeError( + "Cannot create geothermal interval heat-flow OGC view. Missing " + "required tables: " + ", ".join(missing) + ) + + op.execute(text(f"DROP VIEW IF EXISTS {_VIEW}")) + op.execute(text(_create_view())) + op.execute( + text( + f"COMMENT ON VIEW {_VIEW} IS " + "'Geothermal wells with per-interval heat-flow values (pygeoapi).'" + ) + ) + + +def downgrade() -> None: + op.execute(text(f"DROP VIEW IF EXISTS {_VIEW}")) diff --git a/db/__init__.py b/db/__init__.py index a376381b..4e2e7fb3 100644 --- a/db/__init__.py +++ b/db/__init__.py @@ -59,6 +59,7 @@ from db.thing_geologic_formation_association import * from db.aquifer_type import * from db.nma_legacy import * +from db.nmw_legacy import * from db.transducer import * from sqlalchemy import ( diff --git a/db/nmw_legacy.py b/db/nmw_legacy.py new file mode 100644 index 00000000..4c53c32c --- /dev/null +++ b/db/nmw_legacy.py @@ -0,0 +1,735 @@ +# =============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== +"""1:1 staging mirror of the legacy NM_Wells SQL Server database. + +PURPOSE +------- +These models are a FAITHFUL, column-for-column copy of the NM_Wells source +tables. 
They are a *staging layer*: data lands here unchanged from the SQL +dump, then a later transform phase maps it into the Ocotillo data model +(Location / Thing / FieldEvent / FieldActivity / Sample / Observation, plus +status_history, measuring_point_history, contact, publication, etc.). + +This file mirrors the convention of ``db/nma_legacy.py`` (the NM_Aquifer +mirror): ``NMW_`` table prefix, original source column names preserved via the +first positional arg to ``mapped_column``, snake_case Python attributes. + +SOURCE +------ +NM_Wells is delivered as a SQL dump. Physical source table names are +``tbl_well_*`` (snake_case). To feed the existing CSV->Pandas->ORM transfer +pipeline, export each source table to CSV (same flow as ``nma_csv_cache``). + +SCOPE (this commit) +------------------- +Mirrors the five "Migrate First / Main" tables that have an authoritative +field-level mapping in the planning workbook +("NM_Wells + Subsurface library.xlsx", sheet 3): + + tbl_well_locations -> NMW_WellLocations + tbl_well_headers -> NMW_WellHeaders + tbl_well_records -> NMW_WellRecords + tbl_well_z_datum -> NMW_WellZDatum + tbl_well_samples -> NMW_WellSamples + +Also mirrors the Geothermal and Drill Stem Test "Migrate First" tables +(columns + lengths taken directly from the NM_Wells SQL dump DDL, so these are +more precise than the five Main tables above whose lengths the sheet omitted): + + Geothermal: + tbl_gt_bht_headers -> NMW_GtBhtHeaders tbl_gt_bht_data -> NMW_GtBhtData + tbl_gt_conductivity -> NMW_GtConductivity tbl_gt_heat_flow -> NMW_GtHeatFlow + tbl_gt_sum_heat_flow-> NMW_GtSumHeatFlow tbl_gt_temp_depths -> NMW_GtTempDepths + tbl_ws_intervals -> NMW_WsIntervals + Drill Stem Tests: + tbl_ws_dst_headers -> NMW_WsDstHeaders tbl_ws_dst_intervals -> NMW_WsDstIntervals + tbl_ws_dst_flow_history -> NMW_WsDstFlowHistory + tbl_ws_dst_fluid_properties -> NMW_WsDstFluidProperties + tbl_ws_dst_pressure -> NMW_WsDstPressure + +Geothermal/DST relationship chains (kept as 
plain indexed GUID columns, NOT +enforced FKs, since this is staging): + well_samples.SamplSetID <- gt_bht_headers / gt_temp_depths / gt_sum_heat_flow + / ws_intervals / ws_dst_headers (SamplSetID) + gt_bht_headers.BHTGUID <- gt_bht_data.BHTGUID + ws_intervals.IntrvlGUID <- gt_conductivity / gt_heat_flow (IntrvlGUID) + ws_dst_headers.DSTGUID <- ws_dst_intervals.DSTGUID + ws_dst_intervals.DSTInterval <- ws_dst_flow_history / ws_dst_fluid_properties + / ws_dst_pressure (DSTInterval) + well_records.RecrdSetID <- gt_sum_heat_flow.RecrdSetID + +The transform of geothermal/DST into the Ocotillo model is not yet designed +(no field-level mapping in the workbook); see docs/nm_wells-migration.md. + +TRANSFORM NOTES +--------------- +Each column carries an inline note describing its eventual Ocotillo target +(from the mapping sheet). "Drop" = not carried into the Ocotillo model (kept +here only for staging fidelity / audit). See docs/nm_wells-migration.md for +the full plan and the cross-table relationship re-routing +(legacy RecrdSetID -> field_event). + +TYPE MAPPING (SQL Server -> SQLAlchemy) +--------------------------------------- + uniqueidentifier -> postgresql UUID(as_uuid=True) + int -> Integer + smallint -> SmallInteger + real / float -> Float + nvarchar -> String (source lengths not in the sheet; widened) + datetime2 -> DateTime + timestamp -> dropped (SQL Server rowversion; no value as staging data) + +PRIMARY KEYS (verified against the NM_Wells SQL dump DDL) +-------------------------------------------------------- +- NMW_WellHeaders -> WellDataID, NMW_WellRecords -> RecrdSetID, + NMW_WellSamples -> SamplSetID: declared PRIMARY KEY constraints in source. +- NMW_WellLocations, NMW_WellZDatum: source declares no PK, only unique indexes + on OBJECTID and GlobalID; OBJECTID (identity, never NULL) is used. +- Geothermal/DST: declared PKs where present (BHTGUID, IntrvlGUID, DSTGUID, + DSTInterval); the rest are heaps keyed on the OBJECTID identity column. 
+ +LEXICON FLAGGING (Phase 2) +-------------------------- +Every ``ref_*`` table is loaded as a ``LexiconCategory`` whose rows become +``LexiconTerm``s (see transfers/reference_lexicon_transfer.py). The mirror +columns that hold those coded values will, in the Phase-2 Ocotillo model, +become ``lexicon_term`` foreign keys / enums. + +``LEXICON_REF_BY_COLUMN`` below flags every such attribute, mapping +``{tablename: {source_column: ref_source_table}}``. The lexicon *category* for +each ref table is assigned by ``transfers/reference_lexicon_transfer.py`` (one +category per ref table), so this map records the stable ref-table name rather +than the derived category string. ``LEXICON_CANDIDATES_NO_REF`` lists coded +columns that have no ``ref_*`` table and will need a NEW category / enum. +""" + +from sqlalchemy import ( + DateTime, + Float, + Integer, + SmallInteger, + String, +) +from sqlalchemy.dialects.postgresql import UUID +from sqlalchemy.orm import mapped_column + +from db.base import Base + +# Attributes that will become lexicon_term FKs / enums in the Phase-2 transform. +# {tablename: {source_column: ref_source_table}}. The lexicon category per ref +# table is assigned by transfers/reference_lexicon_transfer.py. 
+LEXICON_REF_BY_COLUMN: dict[str, dict[str, str]] = { + "NMW_WellLocations": { + "UnitLetter": "ref_unit_letters", + "State": "ref_states", + "County": "ref_county", + "Basin": "ref_basins", + "SourceDatum": "ref_coordinate_datum", + "SourceUnits": "ref_xy_units", + "LocAccType": "ref_coordinate_accuracy", + "LocAccMeas": "ref_coordinate_method", + }, + "NMW_WellHeaders": { + "WellClass": "ref_well_class", + "WellType": "ref_well_types", + "WellOrient": "ref_well_orientations", + "CurStatus": "ref_well_status", + }, + "NMW_WellRecords": { + "RecrdClass": "ref_well_record_class", + }, + "NMW_WellZDatum": { + "DepthDatum": "ref_ground_levels", + "DepthUnits": "ref_unit_depths", + "Z_datum": "ref_altitude_datums", + "Z_units": "ref_unit_depths", + "ElevSource": "ref_altitude_methods", + }, + "NMW_WellSamples": { + "SamplClass": "ref_sample_class", + "SampleType": "ref_sample_types", + "SmpDpUnt": "ref_unit_depths", + }, + "NMW_GtBhtHeaders": { + "BoreUnits": "ref_length_units", + "TempUnit": "ref_unit_temps", + }, + "NMW_GtBhtData": { + "TempUnit": "ref_unit_temps", + }, + "NMW_GtConductivity": { + "CnductUnit": "ref_unit_conductivity", + }, + "NMW_GtHeatFlow": { + "Kpr_unit": "ref_unit_conductivity", + "Q_unit": "ref_unit_heat_flow", + }, + "NMW_GtSumHeatFlow": { + "LithClass": "ref_lith_class", + "UnitBasis": "ref_unit_basis", + "DepthUnit": "ref_unit_depths", + "GradUnit": "ref_unit_gradients", + "SampleType": "ref_sample_types", + "TCondUnit": "ref_unit_conductivity", + "HtFlowUnit": "ref_unit_heat_flow", + }, + "NMW_GtTempDepths": { + "TempUnit": "ref_unit_temps", + }, + "NMW_WsDstHeaders": { + "PressUnits": "ref_pres_units", + "TempUnit": "ref_unit_temps", + "PipeDiaUnt": "ref_length_units", + "PipeLenUnt": "ref_length_units", + "ChokeSizUn": "ref_length_units", + }, +} + +# Coded/categorical columns with NO existing ref_* table; Phase 2 must create a +# new lexicon category or enum for these. {tablename: [source_column, ...]}. 
+LEXICON_CANDIDATES_NO_REF: dict[str, list[str]] = { + "NMW_GtBhtHeaders": ["DrillFluid"], + "NMW_GtHeatFlow": ["Ka_unit"], + "NMW_GtSumHeatFlow": ["Quality"], + "NMW_WsDstHeaders": ["TestType"], + "NMW_WsDstIntervals": ["Status"], + "NMW_WsDstFlowHistory": ["Operation", "RecovType"], + "NMW_WsDstPressure": ["DSTFluid"], +} + + +class NMW_WellLocations(Base): + """1:1 mirror of NM_Wells ``tbl_well_locations`` (Main / Migrate First). + + Transform target: ``location`` (point from Lat/Long_dd83, state, county) + plus a new ``NMW_Location`` table for the legacy PLSS/UTM attributes. + """ + + __tablename__ = "NMW_WellLocations" + + # No declared PK in source; OBJECTID (identity, unique index, always + # non-null) chosen over the also-unique GlobalID, which permits a NULL. + object_id = mapped_column("OBJECTID", Integer, primary_key=True) # Drop + well_data_id = mapped_column( + "WellDataID", UUID(as_uuid=True), index=True + ) # -> NMW_Location.well_id (relates header/location/records) + well_id_legacy = mapped_column("Well_ID", String) # Drop + import_id = mapped_column("Import_ID", Integer) # Drop + township = mapped_column("Township", Float) # -> NMW_Location.township + nors_tdir = mapped_column("NorS_TDir", String) # -> NMW_Location.township_n_s + range_ = mapped_column("Range", Float) # -> NMW_Location.range + eorw_rdir = mapped_column("EorW_RDir", String) # -> NMW_Location.range_e_w + sectn = mapped_column("Sectn", SmallInteger) # -> NMW_Location.section + sectn_part = mapped_column("SectnPart", String) # -> NMW_Location.section_portion + unit_letter = mapped_column("UnitLetter", String) # -> NMW_Location.unit_letter + utm_zone = mapped_column("UTM_zone", String) # -> NMW_Location.utm_zone + state = mapped_column("State", String) # -> location.state + county = mapped_column("County", String) # -> location.county + basin = mapped_column("Basin", String) # -> NMW_Location.basin + footage_ns = mapped_column("Footage_NS", Float) # -> NMW_Location.footage_n_s + 
nors_fdir = mapped_column("NorS_FDir", String) # -> NMW_Location.direction_n_s + footage_ew = mapped_column("Footage_EW", Float) # -> NMW_Location.footage_e_w + eorw_fdir = mapped_column("EorW_FDir", String) # -> NMW_Location.direction_e_w + lat_min = mapped_column("Lat_min", SmallInteger) # Drop (mostly empty) + lat_sec = mapped_column("Lat_sec", Float) # Drop (mostly empty) + long_deg = mapped_column("Long_deg", SmallInteger) # Drop (mostly empty) + long_min = mapped_column("Long_min", SmallInteger) # Drop (mostly empty) + long_sec = mapped_column("Long_sec", Float) # Drop (mostly empty) + lat_dd27 = mapped_column("Lat_dd27", Float) # -> NMW_Location.latitude_dd27 + long_dd27 = mapped_column("Long_dd27", Float) # -> NMW_Location.longitude_dd27 + lat_dd83 = mapped_column("Lat_dd83", Float) # -> location.point + long_dd83 = mapped_column("Long_dd83", Float) # -> location.point + source_id = mapped_column("SourceID", String) # -> publication.id + source_datum = mapped_column("SourceDatum", String) # -> NMW_Location.source_datum + source_units = mapped_column("SourceUnits", String) # -> NMW_Location.source_units + loc_acc_type = mapped_column("LocAccType", String) # Drop + loc_acc_meas = mapped_column("LocAccMeas", String) # Drop + loc_acc_val = mapped_column("LocAccVal", Float) # Drop + duplicated = mapped_column("Duplicated", SmallInteger) # Drop + exclude = mapped_column("Exclude", SmallInteger) # Drop + comments = mapped_column("Comments", String) # (unmapped) + global_id = mapped_column("GlobalID", UUID(as_uuid=True)) # Drop + api = mapped_column("API", String) # Drop + + +class NMW_WellHeaders(Base): + """1:1 mirror of NM_Wells ``tbl_well_headers`` (Main / Migrate First). + + Transform target: ``thing`` (name/type/well_depth/completion_date), + ``status_history``, ``contact`` (operator + owner), ``publication``, + ``thing_geologic_formation_association``, ``thing_id_link.alternate_id``, + plus new ``well_detail`` and ``well_purpose`` tables. 
+ """ + + __tablename__ = "NMW_WellHeaders" + + object_id = mapped_column("OBJECTID", Integer) # Drop + # WellDataID is the key relating header <-> location <-> records. + well_data_id = mapped_column( + "WellDataID", UUID(as_uuid=True), primary_key=True + ) # Keep + well_spot_id = mapped_column( + "WellSpotID", UUID(as_uuid=True) + ) # Drop (purpose unclear) + api = mapped_column("API", String) # -> thing_id_link.alternate_id + well_class = mapped_column("WellClass", String) # -> thing.type + well_type = mapped_column("WellType", String) # -> well_purpose.purpose + well_orient = mapped_column("WellOrient", String) # -> well_detail.well_orient + cur_well_nam = mapped_column("CurWellNam", String) # -> thing.name + cur_well_num = mapped_column("CurWellNum", String) # -> well_detail.well_number + cur_status = mapped_column("CurStatus", String) # -> status_history.status + prd_pool_cnt = mapped_column("PrdPoolCnt", SmallInteger) # Drop + cur_operatr = mapped_column("CurOperatr", String) # -> contact.name (type=operator) + cur_owner = mapped_column("CurOwner", String) # -> contact.name (type=owner) + total_depth = mapped_column("TotalDepth", Float) # -> thing.well_depth + well_tvd = mapped_column("Well_TVD", Float) # Drop + fm_td = mapped_column("Fm_TD", String) # -> thing_geologic_formation_association.id + age_td = mapped_column("Age_TD", String) # Drop + spud_date = mapped_column("SpudDate", DateTime) # Drop + compl_date = mapped_column("ComplDate", DateTime) # -> thing.well_completion_date + plug_date = mapped_column("PlugDate", DateTime) # Drop + plug_back = mapped_column("PlugBack", Float) # Drop + bridge_plug = mapped_column("BridgePlug", String) # Drop + scout_tickt = mapped_column("ScoutTickt", SmallInteger) # Drop + dwn_hole_sur = mapped_column("DwnHoleSur", SmallInteger) # Drop + geol_log = mapped_column("GeolLog", SmallInteger) # Drop + geophys_log = mapped_column("Geophyslog", SmallInteger) # Drop + gthrm_exist = mapped_column("GthrmExist", SmallInteger) # 
Drop + petro_data = mapped_column("PetroData", SmallInteger) # Drop + core_exists = mapped_column("CoreExists", SmallInteger) # Drop + cuttings = mapped_column("Cuttings", SmallInteger) # Drop + sample_data = mapped_column("SampleData", SmallInteger) # Drop + comments = mapped_column("Comments", String) # -> well_detail.comments + import_id = mapped_column("Import_ID", String) # Drop + import_db = mapped_column("Import_DB", String) # Drop + + +class NMW_WellRecords(Base): + """1:1 mirror of NM_Wells ``tbl_well_records`` (Main / Migrate First). + + Transform target: ``field_event`` (+ ``field_activity``). The legacy + wells -> records relationship (RecrdSetID) is re-routed to + wells -> field_event during transform. RecrdClass tags which records are + geothermal. + """ + + __tablename__ = "NMW_WellRecords" + + object_id = mapped_column("OBJECTID", Integer) # Drop + recrd_set_id = mapped_column( + "RecrdSetID", UUID(as_uuid=True), primary_key=True + ) # -> field_event.id + well_data_id = mapped_column( + "WellDataID", UUID(as_uuid=True), index=True + ) # FK -> header/location WellDataID + recrd_class = mapped_column("RecrdClass", String) # -> field_activity.activity_type + source_id = mapped_column( + "SourceID", String + ) # -> publication.id (text in source, not a real FK) + action_date = mapped_column("ActionDate", DateTime) # -> field_event.event_date + well_name = mapped_column("WellName", String) # Drop + well_number = mapped_column("WellNumber", String) # Drop + api_suffix = mapped_column("API_suffix", String) # Drop + entered_by = mapped_column("EnteredBy", String) # Drop + entry_date = mapped_column("EntryDate", DateTime) # Drop + comments = mapped_column("Comments", String) # -> field_event.notes + + +class NMW_WellZDatum(Base): + """1:1 mirror of NM_Wells ``tbl_well_z_datum`` (Main / Migrate First). + + Transform target: ``measuring_point_history`` (elevation -> height, + datum -> description, units/source -> new fields). 
+ """ + + __tablename__ = "NMW_WellZDatum" + + # No declared PK in source; OBJECTID (identity, unique index, always + # non-null) chosen over the also-unique GlobalID, which permits a NULL. + object_id = mapped_column("OBJECTID", Integer, primary_key=True) # Drop + recrdset_id = mapped_column( + "RecrdsetID", UUID(as_uuid=True), index=True + ) # FK -> records + elev_gl = mapped_column( + "Elev_GL", Float + ) # -> measuring_point_history.measuring_point_height + elev_df = mapped_column( + "Elev_DF", Float + ) # -> measuring_point_history.measuring_point_height + elev_kb = mapped_column( + "Elev_KB", Float + ) # -> measuring_point_history.measuring_point_height + elev_unspc = mapped_column( + "Elev_unspc", Float + ) # -> measuring_point_history.measuring_point_height + datum_elev = mapped_column("DatumElev", Float) # Drop (redundant) + depth_datum = mapped_column( + "DepthDatum", String + ) # -> measuring_point_history.measuring_point_description + depth_units = mapped_column( + "DepthUnits", String + ) # -> measuring_point_history.measuring_point_units [new field] + z_datum = mapped_column("Z_datum", String) # Drop (only 7 records) + z_units = mapped_column("Z_units", String) # Drop + elev_source = mapped_column( + "ElevSource", String + ) # -> measuring_point_history.source [new field] + elv_acc_type = mapped_column("ElvAccType", String) # Drop + elv_acc_meas = mapped_column("ElvAccMeas", String) # Drop + elv_acc_val = mapped_column("ElvAccVal", Float) # Drop + comments = mapped_column("Comments", String) # Drop + global_id = mapped_column("GlobalID", UUID(as_uuid=True)) # Drop + + +class NMW_WellSamples(Base): + """1:1 mirror of NM_Wells ``tbl_well_samples`` (Main / Migrate First). + + Transform target: ``sample`` (date/notes/created_by) + ``observation`` + (depth units). The many boolean attribute flags (Porosity, Geothermal, + etc.) are dropped (mostly empty in source). 
+ """ + + __tablename__ = "NMW_WellSamples" + + object_id = mapped_column("OBJECTID", Integer) # Drop + sampl_set_id = mapped_column( + "SamplSetID", UUID(as_uuid=True), primary_key=True + ) # -> sample.id + recrdset_id = mapped_column( + "RecrdsetID", UUID(as_uuid=True), index=True + ) # -> field_activity.id + smp_set_name = mapped_column("SmpSetName", String) # Drop + sampl_class = mapped_column("SamplClass", String) # Drop (mostly 'data') + sample_type = mapped_column("SampleType", String) # Drop (mostly empty) + sample_fm = mapped_column("SampleFm", String) # Drop (mostly empty) + sample_loc = mapped_column("SampleLoc", String) # Drop (no entries) + sample_date = mapped_column("SampleDate", DateTime) # -> sample.sample_date + from_depth = mapped_column("From_Depth", Float) # -> observation (depth) + to_depth = mapped_column("To_Depth", Float) # -> observation (depth) + smp_dp_unt = mapped_column("SmpDpUnt", String) # -> observation.unit + from_tvd = mapped_column("From_TVD", Float) # Drop + to_tvd = mapped_column("To_TVD", Float) # Drop + from_elev = mapped_column("From_Elev", Float) # Drop (empty) + to_elev = mapped_column("To_Elev", Float) # Drop (empty) + porosity = mapped_column("Porosity", SmallInteger) # Drop + permeablty = mapped_column("Permeablty", SmallInteger) # Drop + density = mapped_column("Density", SmallInteger) # Drop + dst_tests = mapped_column("DST_Tests", SmallInteger) # Drop + thin_sect = mapped_column("ThinSect", SmallInteger) # Drop + geochron = mapped_column("Geochron", SmallInteger) # Drop + geochem = mapped_column("Geochem", SmallInteger) # Drop + geothermal = mapped_column("Geothermal", SmallInteger) # Drop + whole_rock = mapped_column("WholeRock", SmallInteger) # Drop + paleontlgy = mapped_column("Paleontlgy", SmallInteger) # Drop + entered_by = mapped_column("EnteredBy", String) # -> sample.created_by_name + entry_date = mapped_column("EntryDate", DateTime) # -> sample.created_at + notes = mapped_column("Notes", String) # -> 
sample.notes + + +# ============================================================================= +# GEOTHERMAL (Area=Geothermal, "Migrate First") +# ============================================================================= + + +class NMW_GtBhtHeaders(Base): + """1:1 mirror of NM_Wells ``tbl_gt_bht_headers`` (bottom-hole-temp header).""" + + __tablename__ = "NMW_GtBhtHeaders" + + object_id = mapped_column("OBJECTID", Integer) # Drop (identity) + bht_guid = mapped_column("BHTGUID", UUID(as_uuid=True), primary_key=True) + sampl_set_id = mapped_column( + "SamplSetID", UUID(as_uuid=True), index=True + ) # FK -> well_samples.SamplSetID + bore_dia = mapped_column("BoreDia", Float) + bore_units = mapped_column("BoreUnits", String(16)) + drill_fluid = mapped_column("DrillFluid", String(16)) + temp_unit = mapped_column("TempUnit", String(1)) + fld_salinity = mapped_column("FldSalinity", Float) + fld_rstvity = mapped_column("FldRstvity", Float) + fluid_ph = mapped_column("Fluid_pH", Float) + fld_density = mapped_column("FldDensity", Float) + fld_level = mapped_column("FldLevel", Float) + fld_viscsty = mapped_column("FldViscsty", Float) + fluid_loss = mapped_column("FluidLoss", String(50)) + notes = mapped_column("Notes", String(255)) + + +class NMW_GtBhtData(Base): + """1:1 mirror of NM_Wells ``tbl_gt_bht_data`` (BHT readings).""" + + __tablename__ = "NMW_GtBhtData" + + object_id = mapped_column("OBJECTID", Integer, primary_key=True) # identity PK + bht_guid = mapped_column( + "BHTGUID", UUID(as_uuid=True), index=True + ) # FK -> gt_bht_headers.BHTGUID + depth = mapped_column("Depth", Float) + bht = mapped_column("BHT", Float) + temp_unit = mapped_column("TempUnit", String(5)) + hrs_snce_cir = mapped_column("HrsSnceCir", Float) + date_measrd = mapped_column("DateMeasrd", DateTime) + comments = mapped_column("Comments", String(255)) + global_id = mapped_column("GlobalID", UUID(as_uuid=True)) # Drop + + +class NMW_WsIntervals(Base): + """1:1 mirror of NM_Wells 
``tbl_ws_intervals`` (sample depth intervals).""" + + __tablename__ = "NMW_WsIntervals" + + object_id = mapped_column("OBJECTID", Integer) # Drop (identity) + intrvl_guid = mapped_column("IntrvlGUID", UUID(as_uuid=True), primary_key=True) + sampl_set_id = mapped_column( + "SamplSetID", UUID(as_uuid=True), index=True + ) # FK -> well_samples.SamplSetID + sample_id = mapped_column("SampleID", String(128)) + from_depth = mapped_column("From_Depth", Float) + to_depth = mapped_column("To_Depth", Float) + from_tvd = mapped_column("From_TVD", Float) + to_tvd = mapped_column("To_TVD", Float) + from_elev = mapped_column("From_Elev", Float) + to_elev = mapped_column("To_Elev", Float) + intv_notes = mapped_column("Intv_Notes", String(255)) + + +class NMW_GtConductivity(Base): + """1:1 mirror of NM_Wells ``tbl_gt_conductivity`` (thermal conductivity).""" + + __tablename__ = "NMW_GtConductivity" + + object_id = mapped_column("OBJECTID", Integer, primary_key=True) # identity PK + intrvl_guid = mapped_column( + "IntrvlGUID", UUID(as_uuid=True), index=True + ) # FK -> ws_intervals.IntrvlGUID + cnductvity = mapped_column("Cnductvity", Float) + cnduct_unit = mapped_column("CnductUnit", String(3)) + comments = mapped_column("Comments", String(255)) + global_id = mapped_column("GlobalID", UUID(as_uuid=True)) # Drop + + +class NMW_GtHeatFlow(Base): + """1:1 mirror of NM_Wells ``tbl_gt_heat_flow`` (per-interval heat flow).""" + + __tablename__ = "NMW_GtHeatFlow" + + object_id = mapped_column("OBJECTID", Integer, primary_key=True) # identity PK + intrvl_guid = mapped_column( + "IntrvlGUID", UUID(as_uuid=True), index=True + ) # FK -> ws_intervals.IntrvlGUID + gradient = mapped_column("Gradient", Float) + ka = mapped_column("Ka", Float) + ka_unit = mapped_column("Ka_unit", String(3)) + pm = mapped_column("Pm", Float) + kpr = mapped_column("Kpr", Float) + kpr_unit = mapped_column("Kpr_unit", String(3)) + q = mapped_column("Q", Float) + q_unit = mapped_column("Q_unit", String(3)) + comments 
= mapped_column("Comments", String(255)) + global_id = mapped_column("GlobalID", UUID(as_uuid=True)) # Drop + + +class NMW_GtSumHeatFlow(Base): + """1:1 mirror of NM_Wells ``tbl_gt_sum_heat_flow`` (summary heat flow).""" + + __tablename__ = "NMW_GtSumHeatFlow" + + object_id = mapped_column("OBJECTID", Integer, primary_key=True) # identity PK + recrd_set_id = mapped_column( + "RecrdSetID", UUID(as_uuid=True), index=True + ) # FK -> well_records.RecrdSetID + sampl_set_id = mapped_column( + "SamplSetID", UUID(as_uuid=True), index=True + ) # FK -> well_samples.SamplSetID + lith_class = mapped_column("LithClass", String(50)) + unit_basis = mapped_column("UnitBasis", String(16)) + unit_name = mapped_column("UnitName", String(128)) + geo_id = mapped_column("GeoID", String(16)) + from_depth = mapped_column("FromDepth", Float) + to_depth = mapped_column("ToDepth", Float) + depth_unit = mapped_column("DepthUnit", String(8)) + from_elev = mapped_column("From_Elev", Float) + to_elev = mapped_column("To_Elev", Float) + therml_grad = mapped_column("ThermlGrad", Float) + tg_error = mapped_column("TGError", Float) + grad_unit = mapped_column("GradUnit", String(3)) + tgrad_range = mapped_column("TGradRange", String(15)) + sample_type = mapped_column("SampleType", String(50)) + num_samples = mapped_column("NumSamples", SmallInteger) + therml_cond = mapped_column("ThermlCond", Float) + tcond_error = mapped_column("TCondError", Float) + tcond_unit = mapped_column("TCondUnit", String(3)) + tcond_range = mapped_column("TCondRange", String(15)) + heat_flow = mapped_column("HeatFlow", Float) + ht_flow_err = mapped_column("HtFlowErr", Float) + ht_flow_unit = mapped_column("HtFlowUnit", String(3)) + ht_flow_est = mapped_column("HtFlowEst", Float) + quality = mapped_column("Quality", String(50)) + comments = mapped_column("Comments", String(255)) + global_id = mapped_column("GlobalID", UUID(as_uuid=True)) # Drop + + +class NMW_GtTempDepths(Base): + """1:1 mirror of NM_Wells 
class NMW_WsDstHeaders(Base):
    """1:1 mirror of NM_Wells ``tbl_ws_dst_headers`` (DST header).

    Staging copy only. Unlike the identity-keyed mirrors, the primary key here
    is the ``DSTGUID`` column; ``OBJECTID`` is carried as a plain column.
    ``SamplSetID`` is a logical reference (indexed, no ``ForeignKey``).
    """

    __tablename__ = "NMW_WsDstHeaders"

    # NOTE(review): "Drop" presumably marks columns omitted by the later
    # transform phase - confirm against docs/nm_wells-migration.md.
    object_id = mapped_column("OBJECTID", Integer)  # Drop (identity)
    dst_guid = mapped_column("DSTGUID", UUID(as_uuid=True), primary_key=True)
    sampl_set_id = mapped_column(
        "SamplSetID", UUID(as_uuid=True), index=True
    )  # FK -> well_samples.SamplSetID
    test_type = mapped_column("TestType", String(50))
    # DB name keeps the source spelling "DSTOprator".
    dst_operator = mapped_column("DSTOprator", String(50))
    # Unit-of-measure columns for the header's child measurements.
    press_units = mapped_column("PressUnits", String(8))
    temp_unit = mapped_column("TempUnit", String(1))
    pipe_dia_unt = mapped_column("PipeDiaUnt", String(8))
    pipe_len_unt = mapped_column("PipeLenUnt", String(8))
    choke_siz_un = mapped_column("ChokeSizUn", String(8))
    notes = mapped_column("Notes", String(255))
class NMW_WsDstFlowHistory(Base):
    """1:1 mirror of NM_Wells ``tbl_ws_dst_flow_history`` (DST flow events).

    Staging copy only: DB column names are the original SQL Server names.
    ``DSTInterval`` is a logical reference to ``NMW_WsDstIntervals`` (indexed,
    no ``ForeignKey`` constraint is declared).
    """

    __tablename__ = "NMW_WsDstFlowHistory"

    # Source identity column; used as the mirror's primary key.
    object_id = mapped_column("OBJECTID", Integer, primary_key=True)  # identity PK
    dst_interval = mapped_column(
        "DSTInterval", UUID(as_uuid=True), index=True
    )  # FK -> ws_dst_intervals.DSTInterval
    operation = mapped_column("Operation", String(255))
    start_time = mapped_column("StartTime", DateTime)
    end_time = mapped_column("EndTime", DateTime)
    duration = mapped_column("Duration", Float)
    pressure = mapped_column("Pressure", Float)
    temp = mapped_column("Temp", Float)
    # Python attribute is spelled out; DB name keeps the source "RecovColmn".
    recov_column = mapped_column("RecovColmn", Float)
    recov_type = mapped_column("RecovType", String(255))
    notes = mapped_column("Notes", String(255))
    # NOTE(review): "Drop" presumably marks columns omitted by the later
    # transform phase - confirm against docs/nm_wells-migration.md.
    global_id = mapped_column("GlobalID", UUID(as_uuid=True))  # Drop
class NMW_WsDstPressure(Base):
    """1:1 mirror of NM_Wells ``tbl_ws_dst_pressure`` (DST pressure readings).

    Staging copy only: DB column names are the original (abbreviated) SQL
    Server names. ``DSTInterval`` is a logical reference to
    ``NMW_WsDstIntervals`` (indexed, no ``ForeignKey`` constraint is declared).
    """

    __tablename__ = "NMW_WsDstPressure"

    # Source identity column; used as the mirror's primary key.
    object_id = mapped_column("OBJECTID", Integer, primary_key=True)  # identity PK
    dst_interval = mapped_column(
        "DSTInterval", UUID(as_uuid=True), index=True
    )  # FK -> ws_dst_intervals.DSTInterval
    prs_gage_dpt = mapped_column("PrsGageDpt", Float)
    # NOTE(review): stored as SmallInteger; looks like a flag - confirm the
    # value domain against the source table.
    blanked_off = mapped_column("BlankedOff", SmallInteger)
    in_sht_in_min = mapped_column("InShtInMin", Float)
    flw_prs_in_min = mapped_column("FlwPrsInMin", Float)
    prs_in_sht_in = mapped_column("PrsInShtIn", Float)
    prs_init_clsd_in = mapped_column("PrsInitClsdIn", Float)
    fn_sht_in_min = mapped_column("FnShtInMin", Float)
    flw_prs_fin_min = mapped_column("FlwPrsFinMin", Float)
    prs_fn_sht_in = mapped_column("PrsFnShtIn", Float)
    # "*Mth" columns are free-text (String(255)) companions to the values.
    sht_in_pr_mth = mapped_column("ShtInPrMth", String(255))
    hydrost_prs_in = mapped_column("HydrostPrsIn", Float)
    hyd_st_prs_fl = mapped_column("HydStPrsFl", Float)
    hydst_pr_mth = mapped_column("HydstPrMth", String(255))
    equil_press = mapped_column("EquilPress", Float)
    eql_prs_mth = mapped_column("EqlPrsMth", String(255))
    flow_prs_min = mapped_column("FlowPrsMin", Float)
    flow_prs_max = mapped_column("FlowPrsMax", Float)
    flow_prs_mth = mapped_column("FlowPrsMth", String(255))
    dst_fluid = mapped_column("DSTFluid", String(128))
    fm_temp = mapped_column("FmTemp", Float)
    temp_corrtn = mapped_column("TempCorrtn", Float)
    temp_flowng = mapped_column("TempFlowng", Float)
    # Wider than the String(1) TempUnit used on the DST header mirror.
    temp_unit = mapped_column("TempUnit", String(5))
    notes = mapped_column("Notes", String(255))
    # NOTE(review): "Drop" presumably marks columns omitted by the later
    # transform phase - confirm against docs/nm_wells-migration.md.
    global_id = mapped_column("GlobalID", UUID(as_uuid=True))  # Drop
+ + +# ============================================================================= +# TODO(remaining "Migrate First" tables, no DDL/mapping yet) +# ----------------------------------------------------------------------------- +# Publications: tbl_sources +# Subsurface Library: dst_scan, log_scanned, Well_Header, well_operators +# See docs/nm_wells-migration.md for the full inventory + recommendations. +# ============================================================================= + + +# ============= EOF ============================================= diff --git a/pyproject.toml b/pyproject.toml index 6ae7ad46..3feae312 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -90,6 +90,7 @@ dependencies = [ "sqlalchemy-continuum==1.6.0", "sqlalchemy-searchable==2.1.0", "sqlalchemy-utils==0.42.1", + "sqlparse>=0.5.5", "starlette==0.52.1", "starlette-admin[i18n]==0.16.1", "typer==0.26.7", diff --git a/requirements.txt b/requirements.txt index 85c0a1a7..d0c31e4c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -428,9 +428,9 @@ dnspython==2.8.0 \ dotenv==0.9.9 \ --hash=sha256:29cf74a087b31dafdb5a446b6d7e11cbce8ed2741540e2339c69fbef92c94ce9 # via ocotilloapi -ecdsa==0.19.2 \ - --hash=sha256:62635b0ac1ca2e027f82122b5b81cb706edc38cd91c63dda28e4f3455a2bf930 \ - --hash=sha256:840f5dc5e375c68f36c1a7a5b9caad28f95daa65185c9253c0c08dd952bb7399 +ecdsa==0.19.1 \ + --hash=sha256:30638e27cf77b7e15c4c4cc1973720149e1033827cfd00661ca5c8cc0cdb24c3 \ + --hash=sha256:478cba7b62555866fcb3bb3fe985e06decbdb68ef55713c4e5ab98c57d508e61 # via python-jose email-validator==2.3.0 \ --hash=sha256:80f13f623413e6b197ae73bb10bf4eb0908faf509ad8362c5edeb0be7fd450b4 \ @@ -1635,6 +1635,10 @@ sqlalchemy-utils==0.42.1 \ # via # ocotilloapi # sqlalchemy-searchable +sqlparse==0.5.5 \ + --hash=sha256:12a08b3bf3eec877c519589833aed092e2444e68240a3577e8e26148acc7b1ba \ + --hash=sha256:e20d4a9b0b8585fdf63b10d30066c7c94c5d7a7ec47c889a2d83a3caa93ff28e + # via ocotilloapi starlette==0.52.1 \ 
--hash=sha256:0029d43eb3d273bc4f83a08720b4912ea4b071087a3b48db01b7c839f7954d74 \ --hash=sha256:834edd1b0a23167694292e94f597773bc3f89f362be6effee198165a35d62933 diff --git a/transfers/nmw_mirror_transfer.py b/transfers/nmw_mirror_transfer.py new file mode 100644 index 00000000..d541d0f0 --- /dev/null +++ b/transfers/nmw_mirror_transfer.py @@ -0,0 +1,362 @@ +# =============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== +"""Load the NM_Wells SQL dump into the ``NMW_*`` 1:1 staging mirror tables. + +Phase 1 of the NM_Wells migration (see db/nmw_legacy.py and +docs/nm_wells-migration.md). This is a faithful copy: each source table's CSV +export is read and its rows are inserted into the matching ``NMW_*`` mirror +model with NO transformation beyond type coercion. The Phase 2 transform into +the Ocotillo model is separate. + +Generic + data-driven: one ``MirrorSpec`` per (model, source table). Column +handling is derived from each model's ``__table__`` metadata, so adding a new +mirror table requires only a model + a spec entry (no per-table code). + +Two row sources, selected at runtime: + +1. **SQL Server data dump** (preferred): set ``NMW_SQL_DUMP`` to a ``.sql`` file + of ``INSERT [dbo].[tbl_*] (...) VALUES (...)`` statements. 
Each table is + written to a CSV by ``transfers.nmw_sql_dump.write_table_csv`` (sqlparse) and + bulk-loaded with Postgres ``COPY ... FROM STDIN`` (truncate + COPY; Postgres + casts text -> column types). CSV output dir defaults to a temp dir, override + with ``NMW_CSV_DIR``. +2. **CSV exports** (fallback when ``NMW_SQL_DUMP`` is unset): per-table CSVs read + with ``transfers.util.read_csv`` (``transfers/data/nma_csv_cache/.csv`` + then GCS ``nma_csv/
@dataclass
class MirrorSpec:
    """Maps a mirror model to its NM_Wells source CSV/table name.

    One spec is declared per ``NMW_*`` model; the loaders read the model's
    ``__table__`` metadata for column handling and use ``source_table`` to
    locate the rows (SQL dump table name or CSV export name).
    """

    # Declarative mirror model class (its ``__table__`` drives the load).
    model: type
    # Original SQL Server table name, e.g. ``tbl_well_locations``.
    source_table: str
def _coerce(value, col_type):
    """Coerce a single cell to the Python value for ``col_type`` (or None).

    Treats None/NaN/NaT as None. (pandas keeps NaN/NaT in typed columns even
    after a ``.where(notnull, None)``, so the missing-value check must happen
    here.) Values that cannot be converted to the column type become None
    rather than raising, so one bad cell never aborts a batch.

    Args:
        value: Raw cell from the CSV/SQL-dump row.
        col_type: SQLAlchemy type instance of the target mirror column.

    Returns:
        The coerced value, or None when the cell is missing or unparseable.
    """
    if value is None:
        return None
    try:
        if pd.isna(value):
            return None
    except (TypeError, ValueError):
        pass  # non-scalar / unhashable: fall through and coerce normally
    if isinstance(col_type, UUID):
        if isinstance(value, uuid.UUID):
            return value
        try:
            return uuid.UUID(str(value).strip())
        except (ValueError, AttributeError, TypeError):
            return None
    if isinstance(col_type, (Integer, SmallInteger)):
        try:
            return int(value)
        except (TypeError, OverflowError):
            # OverflowError: int(float("inf")) - previously escaped and
            # aborted the whole table load.
            return None
        except ValueError:
            # Integral text such as "12.0" (common when an integer column was
            # exported through a float) is still a valid integer; anything
            # with a fractional part or non-numeric text stays None.
            try:
                as_float = float(value)
            except (TypeError, ValueError):
                return None
            return int(as_float) if as_float.is_integer() else None
    if isinstance(col_type, Float):
        try:
            return float(value)
        except (ValueError, TypeError):
            return None
    if isinstance(col_type, DateTime):
        # read_csv does not parse_dates, so values are typically raw strings.
        # Parse explicitly to avoid driver-dependent insert failures.
        if hasattr(value, "to_pydatetime"):
            return value.to_pydatetime()
        ts = pd.to_datetime(value, errors="coerce")
        return None if pd.isna(ts) else ts.to_pydatetime()
    if isinstance(col_type, String):
        s = str(value)
        # Truncate to the declared length so an over-long cell cannot fail
        # the insert; unbounded String columns are passed through.
        return s[: col_type.length] if col_type.length else s
    # Fallback (should not hit for our mirror types).
    return value
def _copy_csv_into_table(
    session: Session, table_name: str, header: list[str], csv_path: str
) -> None:
    """Bulk-load a CSV into ``table_name`` via Postgres COPY (pg8000 stream).

    The COPY runs on the session's own DBAPI connection, so it takes part in
    the session's current transaction; the caller is responsible for the
    commit/rollback.

    Args:
        session: Session whose underlying connection performs the COPY.
        table_name: Target table name (double-quoted as-is in the statement).
        header: Column names, in CSV order, for the COPY column list.
        csv_path: Path of the CSV file (with a header row) to stream.
    """
    collist = ", ".join(f'"{c}"' for c in header)
    sql = (
        f'COPY "{table_name}" ({collist}) FROM STDIN '
        "WITH (FORMAT CSV, HEADER true, NULL '')"
    )
    raw = session.connection().connection  # underlying pg8000 DBAPI connection
    cursor = raw.cursor()
    try:
        with open(csv_path, "rb") as f:
            cursor.execute(sql, stream=f)
    finally:
        # Release the cursor even when COPY fails; previously one cursor was
        # leaked per loaded table.
        cursor.close()
def _load_table(session: Session, spec: MirrorSpec, limit: int = 0) -> dict:
    """Load one source table (SQL dump or CSV) into its mirror. Stats dict.

    Rows are coerced column-by-column with ``_coerce`` and upserted in chunks
    of ``_CHUNK_SIZE`` via ``_flush``. A source that cannot be read is logged
    and reported as skipped instead of raising.

    Args:
        session: Session used for the batched upserts.
        spec: Mirror model + source table to load.
        limit: Maximum number of source rows to read; 0 means no limit.

    Returns:
        ``{"table", "skipped", ...}``; loaded tables also carry ``rows``,
        ``inserted`` and ``source``, skipped ones carry ``reason``.
    """
    table = spec.model.__table__
    name = spec.source_table
    # Loadable columns from the model (rowversion/LargeBinary excluded defensively).
    cols = {c.name: c for c in table.columns if not isinstance(c.type, LargeBinary)}
    pk_cols = [c.name for c in table.primary_key]

    try:
        rows_iter, src = _row_source(spec)
    except Exception as e:  # noqa: BLE001 - missing source must not abort the run
        logger.warning("Skipping %s (could not read source): %s", name, e)
        return {"table": name, "skipped": True, "reason": str(e)}

    if limit and limit > 0:
        rows_iter = itertools.islice(rows_iter, limit)

    total = 0
    inserted = 0
    dropped_no_pk = 0
    batch: list[dict] = []
    warned_cols = False
    for rec in rows_iter:
        total += 1
        if not warned_cols:
            # Only the first row is inspected: sources are column-consistent.
            missing = [col for col in cols if col not in rec]
            if missing:
                logger.warning(
                    "%s: mirror columns absent from source: %s", name, missing
                )
            warned_cols = True
        # NaN/NaT (CSV) and NULL (SQL) normalize to None inside _coerce.
        row = {col: _coerce(rec.get(col), cols[col].type) for col in cols if col in rec}
        if any(row.get(pk) is None for pk in pk_cols):
            # Cannot upsert without a PK value; count it so the loss is
            # visible in the log instead of silently shrinking the table.
            dropped_no_pk += 1
            continue
        batch.append(row)
        if len(batch) >= _CHUNK_SIZE:
            inserted += _flush(session, spec.model, batch, pk_cols)
            batch = []
    inserted += _flush(session, spec.model, batch, pk_cols)

    if total == 0:
        logger.warning("Skipping %s (no source rows from %s)", name, src)
        return {"table": name, "skipped": True, "reason": "no rows", "source": src}

    if dropped_no_pk:
        logger.warning(
            "%s: dropped %d of %d source rows with a missing/unparseable "
            "primary key %s",
            name,
            dropped_no_pk,
            total,
            pk_cols,
        )

    logger.info(
        "Mirror %s -> %s [%s]: %d source rows, %d inserted",
        name,
        table.name,
        src,
        total,
        inserted,
    )
    return {
        "table": name,
        "skipped": False,
        "rows": total,
        "inserted": inserted,
        "source": src,
    }
def refresh_materialized_views(session: Session) -> list[str]:
    """Refresh every geothermal materialized OGC view that currently exists.

    Intended to run after a mirror (re)load so the views pick up the new rows.
    Each view in ``_MATERIALIZED_VIEWS`` is looked up with ``to_regclass``;
    missing ones are logged and skipped. Uses a plain (non-concurrent)
    REFRESH inside the session transaction, committing after each view.

    Returns:
        Names of the views that were refreshed, in declaration order.
    """
    done = []
    for mv_name in _MATERIALIZED_VIEWS:
        lookup = session.execute(
            text("SELECT to_regclass(:n)"), {"n": f"public.{mv_name}"}
        )
        if lookup.scalar():
            logger.info("REFRESH MATERIALIZED VIEW %s", mv_name)
            session.execute(text(f'REFRESH MATERIALIZED VIEW "{mv_name}"'))
            session.commit()
            done.append(mv_name)
        else:
            logger.warning("Skip refresh; materialized view missing: %s", mv_name)
    return done
def _split_top_level(s: str) -> list[str]:
    """Split ``s`` on commas that sit outside parentheses and quoted strings.

    Single-quoted SQL strings are copied verbatim (a doubled ``''`` is an
    escaped quote and does not end the string). Each piece is whitespace-
    stripped; a trailing empty remainder is not emitted.
    """
    pieces: list[str] = []
    current: list[str] = []
    nesting = 0
    quoted = False
    pos = 0
    end = len(s)
    while pos < end:
        ch = s[pos]
        pos += 1
        if quoted:
            current.append(ch)
            if ch == "'":
                if pos < end and s[pos] == "'":
                    # Escaped quote: keep both characters, stay in the string.
                    current.append("'")
                    pos += 1
                else:
                    quoted = False
            continue
        if ch == "," and nesting == 0:
            pieces.append("".join(current).strip())
            current = []
            continue
        if ch == "'":
            quoted = True
        elif ch == "(":
            nesting += 1
        elif ch == ")":
            nesting -= 1
        current.append(ch)
    if current:
        pieces.append("".join(current).strip())
    return pieces
# CAST(<expr> AS <type>) where <type> may carry a parenthesised precision /
# length, e.g. ``Decimal(18, 2)`` or ``NVarChar(50)``. The greedy ``(.*)``
# binds to the LAST `` AS `` so string literals containing " AS " still work.
_CAST_RE = re.compile(r"(?is)^CAST\s*\((.*)\s+AS\s+[^()]+(?:\([^()]*\))?\s*\)$")
_INT_RE = re.compile(r"[-+]?\d+")


def _parse_value(tok: str):
    """Decode one SQL literal token from a VALUES group to a Python value.

    NULL / empty -> None; ``N'...'`` / ``'...'`` -> str with doubled quotes
    unescaped; ``CAST(expr AS type)`` -> the inner expr, recursively;
    ``0x...`` (binary / rowversion) -> None; integers -> int; other numerics
    -> float; anything else is returned as the stripped token text.
    """
    t = tok.strip()
    if not t or t.upper() == "NULL":
        return None
    m = _CAST_RE.match(t)
    if m:
        return _parse_value(m.group(1).strip())
    # N'...' or '...'
    if t[:1] == "'" or t[:2].upper() == "N'":
        q = t.find("'")
        inner = t[q + 1 :]
        if inner.endswith("'"):
            inner = inner[:-1]
        return inner.replace("''", "'")
    if t[:2].lower() == "0x":  # binary / rowversion
        return None
    if _INT_RE.fullmatch(t):
        return int(t)
    try:
        return float(t)
    except ValueError:
        return t


# ``INSERT [INTO] [dbo].[<table>] (<cols>) VALUES <vals>``; the named groups
# are read by ``iter_table_rows``.
_INSERT_RE = re.compile(
    r"(?is)INSERT\s+(?:INTO\s+)?(?:\[dbo\]\.)?\[?(?P<table>\w+)\]?\s*"
    r"\((?P<cols>.*?)\)\s*VALUES\s*(?P<vals>.*)$"
)


def _iter_insert_statements(path: str, table: str) -> Iterator[str]:
    """Yield raw INSERT statement strings for ``table`` using sqlparse.

    This is only a cheap substring pre-filter ("insert" and the table name
    both appear); ``iter_table_rows`` does the exact table match.
    """
    enc = _detect_encoding(path)
    target = table.lower()
    with open(path, encoding=enc, errors="ignore") as f:
        # parsestream splits the dump into statements lazily.
        for statement in sqlparse.parsestream(f):
            s = str(statement).strip()
            if not s:
                continue
            low = s.lower()
            if "insert" not in low or target not in low:
                continue
            yield s


def iter_table_rows(path: str, table: str) -> Iterator[dict]:
    """Yield ``{column: value}`` dicts for every INSERT into ``table``.

    Statements for other tables are ignored; a VALUES group whose value count
    does not match the column list is treated as malformed and skipped.
    """
    wanted = table.lower()
    for stmt in _iter_insert_statements(path, table):
        m = _INSERT_RE.search(stmt)
        if not m or m.group("table").lower() != wanted:
            continue
        cols = [c.strip().strip("[]") for c in _split_top_level(m.group("cols"))]
        vals_part = m.group("vals").strip().rstrip(";")
        for group in _iter_value_groups(vals_part):
            vals = [_parse_value(v) for v in _split_top_level(group)]
            if len(vals) != len(cols):
                continue  # malformed row; skip
            yield dict(zip(cols, vals))


def _csv_cell(value) -> str:
    """Render a parsed value for a COPY-friendly CSV (None -> empty field)."""
    return "" if value is None else str(value)
+ """ + rows = iter_table_rows(path, table) + if limit and limit > 0: + rows = itertools.islice(rows, limit) + + header: Optional[list[str]] = None + writer = None + n = 0 + with open(out_csv, "w", newline="", encoding="utf-8") as fo: + for rec in rows: + if header is None: + header = list(columns) if columns else list(rec.keys()) + writer = csv.writer(fo) + writer.writerow(header) + writer.writerow([_csv_cell(rec.get(c)) for c in header]) + n += 1 + return n, (header or list(columns or [])) + + +# ============= EOF ============================================= diff --git a/transfers/reference_lexicon_transfer.py b/transfers/reference_lexicon_transfer.py new file mode 100644 index 00000000..bfd391e2 --- /dev/null +++ b/transfers/reference_lexicon_transfer.py @@ -0,0 +1,402 @@ +# =============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== +"""Load the legacy NM_Wells ``ref_*`` reference tables into the lexicon. + +The planning workbook ("NM_Wells + Subsurface library.xlsx", sheet 1) flags the +``ref_*`` tables as "Add to lexicon". Each ref table is a small code/description +lookup; this transfer loads its rows as ``LexiconTerm`` rows and links them to a +``LexiconCategory`` named after the table (``ref_well_class`` -> ``well_class``). 
+ +Idempotent: mirrors ``core.initializers.init_lexicon`` — categories and terms +upsert via ``ON CONFLICT DO NOTHING`` (both ``name``/``term`` are unique); +term<->category associations are inserted only when missing (no unique +constraint exists on that table). + +Row source is the same as the mirror loader: a SQL Server data dump when +``NMW_SQL_DUMP`` is set (parsed by ``transfers.nmw_sql_dump.iter_table_rows``), +otherwise per-table CSV exports via ``transfers.util.read_csv``. + +NOTE(columns): the ref tables' actual column names are not in the workbook, so +term/definition columns are AUTO-DETECTED per table (see ``_pick_columns``). If +auto-detection is wrong for a table, set ``term_col`` / ``definition_col`` +explicitly on its ``RefTableSpec`` below. The chosen columns are logged. + +NOTE(LU_*): the Subsurface Library ``LU_*`` lookups (LU_EnteredBy, LU_LogType, +LU_Status, LU_Type_Wellheader, LU_WorkType) are also "Add to lexicon"; add them +to ``REFERENCE_TABLE_SPECS`` once their CSVs are available. +""" + +import itertools +import os +from dataclasses import dataclass +from typing import Optional + +import pandas as pd +from sqlalchemy import select +from sqlalchemy.dialects.postgresql import insert as pg_insert +from sqlalchemy.orm import Session + +from db import ( + LexiconCategory, + LexiconTerm, + LexiconTermCategoryAssociation, +) +from transfers.logger import logger +from transfers.nmw_sql_dump import iter_table_rows +from transfers.util import read_csv + +# Same source selector as the mirror loader: a SQL Server data dump when +# NMW_SQL_DUMP is set, otherwise per-table CSV exports. +_SQL_DUMP_ENV = "NMW_SQL_DUMP" +# lexicon_term.term (and its FK targets) is String(100). +_TERM_MAX_LEN = 100 + +# Column-name hints for auto-detecting the code/term vs definition columns. 
@dataclass
class RefTableSpec:
    """One legacy ref table -> one lexicon category.

    term_col / definition_col are optional overrides; when None the columns are
    auto-detected from the CSV header.
    """

    # Legacy NM_Wells reference table name, e.g. ``ref_well_class``.
    source_table: str
    # Name of the LexiconCategory the table's terms are linked to.
    category: str
    # Explicit source column holding the term/code; None -> auto-detect.
    term_col: Optional[str] = None
    # Explicit source column holding the definition; None -> auto-detect.
    definition_col: Optional[str] = None
    # Description stored on the category when it is first created.
    description: Optional[str] = None
def _pick_columns(columns: list[str], spec: RefTableSpec) -> tuple[str, str]:
    """Resolve (term_col, definition_col) for a ref table.

    Honors explicit overrides on the spec, else auto-detects from the column
    names using name hints, ignoring meta columns (OBJECTID, GlobalID, ...).

    Detection order:
      * term: first column whose name contains a ``_TERM_HINTS`` fragment,
        else the first candidate column.
      * definition: first column other than the term whose name contains a
        ``_DEF_HINTS`` fragment, else the first column other than the term,
        else (single-column table) the term column itself.
    """
    cols = [c for c in columns if str(c).strip().lower() not in _META_COLS]
    if not cols:
        # Every column looked like metadata; fall back to the raw list.
        cols = list(columns)
    low = {c: str(c).strip().lower() for c in cols}

    term_col = spec.term_col
    if term_col is None:
        term_col = next(
            (c for c in cols if any(h in low[c] for h in _TERM_HINTS)), cols[0]
        )

    def_col = spec.definition_col
    if def_col is None:
        def_col = next(
            (c for c in cols if c != term_col and any(h in low[c] for h in _DEF_HINTS)),
            None,
        )
    if def_col is None:
        # Previously ``cols[1]``, which is the term column itself whenever the
        # term hint matched the second column; pick a genuinely different
        # column when one exists.
        def_col = next((c for c in cols if c != term_col), term_col)

    return term_col, def_col
Returns a stats dict.""" + try: + rows = list(_iter_source_rows(spec.source_table, limit)) + except Exception as e: # noqa: BLE001 - missing source should not abort the run + logger.warning("Skipping %s (could not read source): %s", spec.source_table, e) + return {"table": spec.source_table, "skipped": True, "reason": str(e)} + + if not rows: + logger.warning("Skipping %s (empty)", spec.source_table) + return {"table": spec.source_table, "skipped": True, "reason": "empty"} + + # Column names from the union of row keys (CSV rows and SSMS INSERTs are + # column-consistent, but be defensive). + columns: list[str] = [] + seen = set() + for rec in rows: + for k in rec: + if k not in seen: + seen.add(k) + columns.append(k) + + term_col, def_col = _pick_columns(columns, spec) + logger.info( + "%s -> category=%s term_col=%s definition_col=%s (%d rows)", + spec.source_table, + spec.category, + term_col, + def_col, + len(rows), + ) + + category_id = _get_or_create_category(session, spec) + + # Build unique (term -> definition) map, dropping empties and overlong terms. 
+ term_defs: dict[str, str] = {} + truncated = 0 + for rec in rows: + term = _clean(rec.get(term_col)) + if term is None: + continue + if len(term) > _TERM_MAX_LEN: + term = term[:_TERM_MAX_LEN] + truncated += 1 + definition = _clean(rec.get(def_col)) or term + term_defs.setdefault(term, definition) + + if not term_defs: + logger.warning("Skipping %s (no usable terms)", spec.source_table) + return {"table": spec.source_table, "skipped": True, "reason": "no terms"} + if truncated: + logger.warning( + "%s: truncated %d term(s) to %d chars", + spec.source_table, + truncated, + _TERM_MAX_LEN, + ) + + term_names = list(term_defs) + existing_terms = dict( + session.execute( + select(LexiconTerm.term, LexiconTerm.id).where( + LexiconTerm.term.in_(term_names) + ) + ).all() + ) + new_rows = [ + {"term": t, "definition": d} + for t, d in term_defs.items() + if t not in existing_terms + ] + if new_rows: + session.execute( + pg_insert(LexiconTerm) + .values(new_rows) + .on_conflict_do_nothing(index_elements=["term"]) + ) + session.commit() + existing_terms = dict( + session.execute( + select(LexiconTerm.term, LexiconTerm.id).where( + LexiconTerm.term.in_(term_names) + ) + ).all() + ) + + term_ids = [tid for tid in existing_terms.values() if tid is not None] + existing_links = set() + if term_ids: + existing_links = set( + session.execute( + select(LexiconTermCategoryAssociation.term_id).where( + LexiconTermCategoryAssociation.category_id == category_id, + LexiconTermCategoryAssociation.term_id.in_(term_ids), + ) + ).scalars() + ) + + assoc_rows = [ + {"term_id": tid, "category_id": category_id} + for tid in term_ids + if tid not in existing_links + ] + if assoc_rows: + session.execute(pg_insert(LexiconTermCategoryAssociation).values(assoc_rows)) + session.commit() + + return { + "table": spec.source_table, + "skipped": False, + "rows": len(rows), + "terms": len(term_defs), + "created_terms": len(new_rows), + "linked": len(assoc_rows), + } + + +def 
transfer_reference_tables(session: Session, limit: int = None) -> tuple: + """Foundational transfer: load all ``ref_*`` tables into the lexicon. + + Same ``(session, limit)`` signature as the other foundational transfers + (aquifer systems, geologic formations). Returns + ``(num_tables, total_created_terms, errors)``. + """ + limit = int(limit or 0) + dump = os.getenv(_SQL_DUMP_ENV) + if dump: + if not os.path.exists(dump): + raise FileNotFoundError(f"{_SQL_DUMP_ENV} set but file not found: {dump}") + logger.info("Reference lexicon source: SQL dump %s", dump) + else: + logger.info( + "Reference lexicon source: CSV exports (set %s for a dump)", _SQL_DUMP_ENV + ) + + results = [] + errors = [] + for spec in REFERENCE_TABLE_SPECS: + try: + results.append(_transfer_one(session, spec, limit)) + except Exception as e: # noqa: BLE001 - isolate per-table failures + logger.critical( + "Reference lexicon transfer failed for %s: %s", spec.source_table, e + ) + session.rollback() + errors.append({"table": spec.source_table, "error": str(e)}) + + loaded = [r for r in results if not r.get("skipped")] + skipped = [r for r in results if r.get("skipped")] + created = sum(r.get("created_terms", 0) for r in loaded) + linked = sum(r.get("linked", 0) for r in loaded) + logger.info( + "Reference lexicon transfer complete: %d tables loaded, %d skipped, " + "%d terms created, %d associations, %d errors", + len(loaded), + len(skipped), + created, + linked, + len(errors), + ) + return len(loaded), created, errors + + +# ============= EOF ============================================= diff --git a/transfers/transfer.py b/transfers/transfer.py index 419d4870..b1cae6ba 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -13,8 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# =============================================================================== +"""DEPRECATED: legacy NM_Aquifer -> Ocotillo transfer orchestrator. + +This module (the original AMPAPI / NM_Aquifer migration driver) is deprecated. +Do not add new migrations here. New migrations get their own standalone +orchestrator script; e.g. the NM_Wells geothermal migration lives in +``transfers/transfer_geothermal.py``. +""" + import os import time +import warnings from concurrent.futures import ThreadPoolExecutor, as_completed from contextlib import contextmanager from dataclasses import dataclass @@ -324,6 +333,12 @@ def _drop_and_rebuild_db() -> None: @timeit def transfer_all(metrics: Metrics) -> list[ProfileArtifact]: + warnings.warn( + "transfers.transfer is deprecated; new migrations get their own " + "orchestrator (e.g. transfers/transfer_geothermal.py).", + DeprecationWarning, + stacklevel=2, + ) message("STARTING TRANSFER", new_line_at_top=False) if get_bool_env("DROP_AND_REBUILD_DB", False): logger.info("Dropping schema and rebuilding database from migrations") diff --git a/transfers/transfer_geothermal.py b/transfers/transfer_geothermal.py new file mode 100644 index 00000000..6945ea01 --- /dev/null +++ b/transfers/transfer_geothermal.py @@ -0,0 +1,111 @@ +# =============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ===============================================================================
+"""Standalone orchestrator for the NM_Wells (geothermal) migration.
+
+Separate from the deprecated ``transfers/transfer.py`` (NM_Aquifer driver). This
+script runs the NM_Wells Phase-1 staging migration:
+
+1. Reference -> lexicon load (``ref_*`` lookups), gated by
+   ``TRANSFER_GEOTHERMAL_REFERENCE`` (default True).
+2. NM_Wells 1:1 staging mirror load into the ``NMW_*`` tables, gated by
+   ``TRANSFER_NMW_MIRROR`` (default True). Row source is a SQL Server data dump
+   when ``NMW_SQL_DUMP`` is set, otherwise per-table CSV exports. When the
+   mirror step runs, the materialized OGC views are refreshed afterwards.
+
+Assumes the schema already exists (run ``alembic upgrade head`` first). Does not
+drop/rebuild the database.
+
+Run:
+    python -m transfers.transfer_geothermal
+Env:
+    TRANSFER_LIMIT=1000            # rows per table (0/unset = all)
+    NMW_SQL_DUMP=/path/to/data.sql # optional; else CSV
+    TRANSFER_GEOTHERMAL_REFERENCE=1
+    TRANSFER_NMW_MIRROR=1
+"""
+
+import os
+
+from dotenv import load_dotenv
+
+# Load .env FIRST, before any database imports. Do not override env vars already
+# set by the runtime (e.g. Cloud Run jobs).
+load_dotenv(override=False)
+
+# In managed runtimes DB_DRIVER is sometimes omitted while CLOUD_SQL_* are set.
+if (
+    not (os.getenv("DB_DRIVER") or "").strip()
+    and (os.getenv("CLOUD_SQL_INSTANCE_NAME") or "").strip()
+):
+    os.environ["DB_DRIVER"] = "cloudsql"
+
+from db.engine import session_ctx  # noqa: E402
+from services.env import get_bool_env  # noqa: E402
+from transfers.logger import logger  # noqa: E402
+from transfers.nmw_mirror_transfer import (  # noqa: E402
+    refresh_materialized_views,
+    transfer_nmw_mirror,
+)
+from transfers.reference_lexicon_transfer import transfer_reference_tables  # noqa: E402
+
+
+def run_geothermal_transfer(limit: int = None) -> dict:
+    """Run the NM_Wells geothermal staging migration. Returns a summary dict.
+
+    Args:
+        limit: Max rows per table. When None, the ``TRANSFER_LIMIT`` env var is
+            used; 0 or unset means all rows.
+
+    Returns:
+        A dict with a ``reference`` entry (tables, terms_created, errors) when
+        the reference step ran, and ``mirror`` (tables, rows_inserted, errors)
+        plus ``refreshed_views`` entries when the mirror step ran. Only the
+        error counts are kept; the error details are not returned.
+    """
+    limit = int(limit if limit is not None else os.getenv("TRANSFER_LIMIT", 0) or 0)
+    summary: dict = {}
+
+    logger.info("========== NM_WELLS (GEOTHERMAL) MIGRATION ==========")
+    logger.info("limit=%s", limit or "all")
+
+    if get_bool_env("TRANSFER_GEOTHERMAL_REFERENCE", True):
+        logger.info("---- Reference tables -> lexicon ----")
+        with session_ctx() as session:
+            tables, created, errors = transfer_reference_tables(session, limit=limit)
+        summary["reference"] = {
+            "tables": tables,
+            "terms_created": created,
+            "errors": len(errors),
+        }
+    else:
+        logger.info("Skipping reference->lexicon (TRANSFER_GEOTHERMAL_REFERENCE=0)")
+
+    if get_bool_env("TRANSFER_NMW_MIRROR", True):
+        logger.info("---- NM_Wells 1:1 staging mirror ----")
+        with session_ctx() as session:
+            tables, inserted, errors = transfer_nmw_mirror(session, limit=limit)
+        summary["mirror"] = {
+            "tables": tables,
+            "rows_inserted": inserted,
+            "errors": len(errors),
+        }
+        logger.info("---- Refresh materialized OGC views ----")
+        with session_ctx() as session:
+            summary["refreshed_views"] = refresh_materialized_views(session)
+    else:
+        logger.info("Skipping NM_Wells mirror (TRANSFER_NMW_MIRROR=0)")
+
+    logger.info("NM_Wells migration complete: %s", summary)
+    return summary
+
+
+def main() -> None:
+    """Script entry point: run the migration with settings taken from the env."""
+    run_geothermal_transfer()
+
+
+if __name__ == "__main__":
+    main()
+
+
+# ============= EOF =============================================
diff --git a/uv.lock b/uv.lock
index e1ac33e0..a56136a1 100644
--- a/uv.lock
+++ b/uv.lock
@@ -1,5 +1,5 @@
 version = 1
-revision = 3
+revision = 2
 requires-python = ">=3.13"
 
 [[package]]
@@ -1577,6 +1577,7 @@ dependencies = [
     { name = "sqlalchemy-continuum" },
     { name = "sqlalchemy-searchable" },
     { name = "sqlalchemy-utils" },
+    { name = "sqlparse" },
     { name = "starlette" },
     { name = "starlette-admin", extra = ["i18n"] },
     { name = "typer" },
@@ -1690,6 +1691,7 @@ requires-dist = [
     { name =
"sqlalchemy-searchable", specifier = "==2.1.0" }, { name = "sqlalchemy-utils", specifier = "==0.42.1" }, + { name = "sqlparse", specifier = ">=0.5.5" }, { name = "starlette", specifier = "==0.52.1" }, { name = "starlette-admin", extras = ["i18n"], specifier = "==0.16.1" }, { name = "typer", specifier = "==0.26.7" }, @@ -2903,6 +2905,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/7c/25/7400c18c3ee97914cc99c90007795c00a4ec5b60c853b49db7ba24d11179/sqlalchemy_utils-0.42.1-py3-none-any.whl", hash = "sha256:243cfe1b3a1dae3c74118ae633f1d1e0ed8c787387bc33e556e37c990594ac80", size = 91761, upload-time = "2025-12-13T03:14:15.014Z" }, ] +[[package]] +name = "sqlparse" +version = "0.5.5" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/90/76/437d71068094df0726366574cf3432a4ed754217b436eb7429415cf2d480/sqlparse-0.5.5.tar.gz", hash = "sha256:e20d4a9b0b8585fdf63b10d30066c7c94c5d7a7ec47c889a2d83a3caa93ff28e", size = 120815, upload-time = "2025-12-19T07:17:45.073Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/49/4b/359f28a903c13438ef59ebeee215fb25da53066db67b305c125f1c6d2a25/sqlparse-0.5.5-py3-none-any.whl", hash = "sha256:12a08b3bf3eec877c519589833aed092e2444e68240a3577e8e26148acc7b1ba", size = 46138, upload-time = "2025-12-19T07:17:46.573Z" }, +] + [[package]] name = "starlette" version = "0.52.1"