From 6ecee1c365b3aeb71843c080c4c4cebe7f7bb524 Mon Sep 17 00:00:00 2001 From: jross Date: Thu, 11 Jun 2026 15:34:51 -0600 Subject: [PATCH 1/8] feat: add NGWMN views sourced from new Ocotillo data model Add view_NGWMN_WaterLevels, view_NGWMN_WellConstruction, and view_NGWMN_Lithology as PostgreSQL views over the new Ocotillo tables, replicating the legacy AMPAPI (NM_Aquifer) view definitions so NGWMN exports no longer depend on the static NMA_view_NGWMN_* copy tables. Co-Authored-By: Claude Fable 5 --- ...2z3_add_ngwmn_views_from_ocotillo_model.py | 205 ++++++++++++++++++ 1 file changed, 205 insertions(+) create mode 100644 alembic/versions/u8v9w0x1y2z3_add_ngwmn_views_from_ocotillo_model.py diff --git a/alembic/versions/u8v9w0x1y2z3_add_ngwmn_views_from_ocotillo_model.py b/alembic/versions/u8v9w0x1y2z3_add_ngwmn_views_from_ocotillo_model.py new file mode 100644 index 00000000..d878e36f --- /dev/null +++ b/alembic/versions/u8v9w0x1y2z3_add_ngwmn_views_from_ocotillo_model.py @@ -0,0 +1,205 @@ +"""add NGWMN views sourced from the new Ocotillo data model + +Replaces the legacy NMA_view_NGWMN_* copy tables as the source for NGWMN +exports. These views reproduce the original AMPAPI (SQL Server) view +definitions but read from the new Ocotillo tables: + +- view_NGWMN_WaterLevels: observation/sample/field_activity/field_event/thing +- view_NGWMN_WellConstruction: thing/well_screen/well_casing_material +- view_NGWMN_Lithology: thing_geologic_formation_association/geologic_formation + +Revision ID: u8v9w0x1y2z3 +Revises: t6u7v8w9x0y1 +Create Date: 2026-06-11 00:00:00.000000 +""" + +from typing import Sequence, Union + +from alembic import op +from sqlalchemy import inspect, text + +# revision identifiers, used by Alembic. +revision: str = "u8v9w0x1y2z3" +down_revision: Union[str, Sequence[str], None] = "t6u7v8w9x0y1" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + +REQUIRED_TABLES = { + "thing", + "well_screen", + "well_casing_material", + "observation", + "sample", + "field_activity", + "field_event", + "parameter", + "thing_geologic_formation_association", + "geologic_formation", +} + +DROP_WATERLEVELS_SQL = 'DROP VIEW IF EXISTS "view_NGWMN_WaterLevels"' +DROP_WELLCONSTRUCTION_SQL = 'DROP VIEW IF EXISTS "view_NGWMN_WellConstruction"' +DROP_LITHOLOGY_SQL = 'DROP VIEW IF EXISTS "view_NGWMN_Lithology"' + + +def _create_waterlevels_view() -> str: + # Mirrors dbo.view_NGWMN_WaterLevels: + # SELECT PointID, DateMeasured, DepthToWaterBGS, 'ft bgs', CASE + # MeasurementMethod..., CASE DataQuality..., PublicRelease + # FROM WaterLevels WHERE PublicRelease = 1 + # + # The transfer stored observation.value as depth-to-water below the + # measuring point (DepthToWater) and measuring_point_height separately, + # so DepthToWaterBGS = value - measuring_point_height (0 when no MP + # height was recorded). + # + # observation_datetime was converted to UTC during transfer; rows whose + # legacy timestamp had no time component were stored at 00:00 UTC, so + # only rows with a real time component are shifted back to local time + # before taking the date. + # + # MeasurementMethod/WLAccuracy reproduce the legacy CASE expressions, + # keyed on the lexicon meanings the transfer wrote (LU_MeasurementMethod + # and LU_DataQuality), including the legacy quirk mapping code O + # ("Observed...") to 'Acoustic Sounder'. + return """ + CREATE VIEW "view_NGWMN_WaterLevels" AS + SELECT + t.name AS "PointID", + CASE + WHEN (o.observation_datetime AT TIME ZONE 'UTC')::time = '00:00:00' + THEN (o.observation_datetime AT TIME ZONE 'UTC')::date + ELSE (o.observation_datetime AT TIME ZONE 'America/Denver')::date + END AS "DateMeasured", + o.value - COALESCE(o.measuring_point_height, 0) AS "DepthToWaterBGS", + 'ft bgs' AS "WLUnits", + CASE s.sample_method + WHEN 'Steel-tape measurement' THEN 'Steel tape' + WHEN 'Electric tape measurement (E-probe)' THEN 'Electric tape' + WHEN 'Observed (required for F, N, and W water level status)' THEN 'Acoustic Sounder' + WHEN 'Estimated' THEN 'Estimated' + WHEN 'Reported, method not known' THEN 'Reported' + WHEN 'Pressure-gage measurement' THEN 'Pressure gauge' + WHEN 'Unknown (for legacy data only; not for new data entry)' THEN 'Unknown; from legacy data' + ELSE NULL + END AS "MeasurementMethod", + CASE o.nma_data_quality + WHEN 'Water level accurate to within two hundreths of a foot' THEN '0.02 ft' + WHEN 'Water level accurate to within one foot' THEN '1.0 ft' + WHEN 'Water level accuracy not to nearest foot or water level not repeatable' THEN 'Unknown' + ELSE NULL + END AS "WLAccuracy", + TRUE AS "PublicRelease" + FROM observation AS o + JOIN sample AS s ON s.id = o.sample_id + JOIN field_activity AS fa ON fa.id = s.field_activity_id + JOIN field_event AS fe ON fe.id = fa.field_event_id + JOIN thing AS t ON t.id = fe.thing_id + JOIN parameter AS p ON p.id = o.parameter_id + WHERE p.parameter_name = 'groundwater level' + AND o.release_status = 'public' + """ + + +def _create_wellconstruction_view() -> str: + # Mirrors dbo.view_NGWMN_WellConstruction: + # WellData LEFT JOIN WellScreens ON WellData.WellID = WellScreens.WellID + # CasingTop is 0 whenever a casing depth exists (casing assumed to start + # at ground surface). The legacy free-text CasingDescription was reduced + # to controlled material terms during transfer, so it is rebuilt here as + # a comma-separated list of well_casing_material terms. + return """ + CREATE VIEW "view_NGWMN_WellConstruction" AS + SELECT + t.name AS "PointID", + CASE WHEN t.well_casing_depth IS NOT NULL THEN 0::double precision END AS "CasingTop", + t.well_casing_depth AS "CasingBottom", + CASE WHEN t.well_casing_depth IS NOT NULL THEN 'ft bgs' END AS "CasingDepthUnits", + ws.screen_depth_top AS "ScreenTop", + ws.screen_depth_bottom AS "ScreenBottom", + CASE WHEN ws.screen_depth_bottom IS NOT NULL THEN 'ft bgs' END AS "ScreenBottomUnit", + ws.screen_description AS "ScreenDescription", + cm.materials AS "CasingDescription" + FROM thing AS t + LEFT JOIN well_screen AS ws ON ws.thing_id = t.id + LEFT JOIN LATERAL ( + SELECT string_agg(wcm.material, ', ' ORDER BY wcm.material) AS materials + FROM well_casing_material AS wcm + WHERE wcm.thing_id = t.id + ) AS cm ON TRUE + WHERE t.thing_type = 'water well' + """ + + +def _create_lithology_view() -> str: + # Mirrors dbo.view_NGWMN_Lithology: + # Stratigraphy INNER JOIN LU_Lithology ON Lithology = ABBREVIATION + # The new model keeps the resolved lithology term on geologic_formation + # (the abbreviation code was not migrated), so the term backs both the + # Lithology and TERM columns. The inner join is reproduced by requiring + # a non-null lithology. StratSource was not migrated and is NULL. + return """ + CREATE VIEW "view_NGWMN_Lithology" AS + SELECT + tgfa.id AS "OBJECTID", + t.name AS "PointID", + gf.lithology AS "Lithology", + gf.lithology AS "TERM", + NULL::character varying AS "StratSource", + tgfa.top_depth AS "StratTop", + CASE WHEN tgfa.top_depth IS NOT NULL THEN 'ft bgs' END AS "StratTopUnit", + tgfa.bottom_depth AS "StratBottom", + CASE WHEN tgfa.bottom_depth IS NOT NULL THEN 'ft bgs' END AS "StratBottomUnit" + FROM thing_geologic_formation_association AS tgfa + JOIN thing AS t ON t.id = tgfa.thing_id + JOIN geologic_formation AS gf ON gf.id = tgfa.geologic_formation_id + WHERE gf.lithology IS NOT NULL + """ + + +def upgrade() -> None: + bind = op.get_bind() + inspector = inspect(bind) + existing_tables = set(inspector.get_table_names(schema="public")) + missing = REQUIRED_TABLES - existing_tables + if missing: + raise RuntimeError( + "Cannot create NGWMN views. Missing required tables: " + f"{', '.join(sorted(missing))}" + ) + + op.execute(text(DROP_WATERLEVELS_SQL)) + op.execute(text(_create_waterlevels_view())) + op.execute( + text( + 'COMMENT ON VIEW "view_NGWMN_WaterLevels" IS ' + "'Public manual groundwater level measurements in the NGWMN " + "exchange format, sourced from the Ocotillo observation model.'" + ) + ) + + op.execute(text(DROP_WELLCONSTRUCTION_SQL)) + op.execute(text(_create_wellconstruction_view())) + op.execute( + text( + 'COMMENT ON VIEW "view_NGWMN_WellConstruction" IS ' + "'Well casing and screen intervals in the NGWMN exchange format, " + "sourced from the Ocotillo thing/well_screen model.'" + ) + ) + + op.execute(text(DROP_LITHOLOGY_SQL)) + op.execute(text(_create_lithology_view())) + op.execute( + text( + 'COMMENT ON VIEW "view_NGWMN_Lithology" IS ' + "'Lithology intervals in the NGWMN exchange format, sourced from " + "the Ocotillo geologic formation associations.'" + ) + ) + + +def downgrade() -> None: + op.execute(text(DROP_LITHOLOGY_SQL)) + op.execute(text(DROP_WELLCONSTRUCTION_SQL)) + op.execute(text(DROP_WATERLEVELS_SQL)) From c009966ad89b4f4f6aaa3e42ca0cf997c706d54b Mon Sep 17 00:00:00 2001 From: jross Date: Thu, 11 Jun 2026 15:56:40 -0600 Subject: [PATCH 2/8] feat: serve NGWMN exports from new Ocotillo-model views Point the /ngwmn endpoint queries at view_NGWMN_WaterLevels, view_NGWMN_WellConstruction, and view_NGWMN_Lithology instead of the legacy NMA_view_NGWMN_* copy tables, and add endpoint tests covering the XML output, BGS depth math, timezone date handling, measurement method and accuracy mapping, and private-record exclusion. Co-Authored-By: Claude Fable 5 --- services/ngwmn_helper.py | 6 +- tests/test_ngwmn_endpoints.py | 252 ++++++++++++++++++++++++++++++++++ 2 files changed, 255 insertions(+), 3 deletions(-) create mode 100644 tests/test_ngwmn_endpoints.py diff --git a/services/ngwmn_helper.py b/services/ngwmn_helper.py index 73df1158..94b52c11 100644 --- a/services/ngwmn_helper.py +++ b/services/ngwmn_helper.py @@ -39,7 +39,7 @@ def make_xml_response(db, sql, point_id, func): def make_lithology_response(point_id, db): sql = ( 'select "PointID", "StratTop", "StratBottom", "TERM" ' - 'from "NMA_view_NGWMN_Lithology" where "PointID"=:point_id' + 'from "view_NGWMN_Lithology" where "PointID"=:point_id' ) return make_xml_response(db, sql, point_id, lithology_xml) @@ -48,14 +48,14 @@ def make_well_construction_response(point_id, db): sql = ( 'select "PointID", "CasingTop", "CasingBottom", "CasingDepthUnits", ' '"ScreenTop", "ScreenBottom", "ScreenBottomUnit", "ScreenDescription", "CasingDescription" ' - 'from "NMA_view_NGWMN_WellConstruction" where "PointID"=:point_id' + 'from "view_NGWMN_WellConstruction" where "PointID"=:point_id' ) return make_xml_response(db, sql, point_id, well_construction_xml) def make_waterlevels_response(point_id, db): sql = ( - 'select * from "NMA_view_NGWMN_WaterLevels" where "PointID"=:point_id ' + 'select * from "view_NGWMN_WaterLevels" where "PointID"=:point_id ' 'order by "DateMeasured"' ) sql2 = ( diff --git a/tests/test_ngwmn_endpoints.py b/tests/test_ngwmn_endpoints.py new file mode 100644 index 00000000..32db2736 --- /dev/null +++ b/tests/test_ngwmn_endpoints.py @@ -0,0 +1,252 @@ +# =============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== +""" +Tests for the /ngwmn endpoints backed by the view_NGWMN_* views, which are +sourced from the new Ocotillo data model (thing/well_screen/observation/ +thing_geologic_formation_association) rather than the legacy NMA_view_NGWMN_* +copy tables. +""" + +from xml.etree import ElementTree as etree + +import pytest +from sqlalchemy import delete + +from db import ( + FieldActivity, + FieldEvent, + GeologicFormation, + Observation, + Sample, + Thing, + ThingGeologicFormationAssociation, + WellCasingMaterial, + WellScreen, +) +from db.engine import session_ctx +from tests import client, get_parameter_id + +POINT_ID = "NGWMN-TEST-0001" + + +@pytest.fixture(scope="module") +def ngwmn_well(): + """A public water well with casing, screen, lithology, and water levels.""" + with session_ctx() as session: + thing = Thing( + name=POINT_ID, + thing_type="water well", + release_status="public", + well_depth=150.0, + well_casing_depth=120.5, + ) + session.add(thing) + session.flush() + + session.add( + WellScreen( + thing_id=thing.id, + screen_depth_top=80.0, + screen_depth_bottom=120.0, + screen_description="4in slotted", + release_status="public", + ) + ) + session.add( + WellCasingMaterial( + thing_id=thing.id, material="Steel", release_status="public" + ) + ) + + formation = GeologicFormation( + formation_code=None, + lithology="Sandstone", + release_status="public", + ) + session.add(formation) + session.flush() + session.add( + ThingGeologicFormationAssociation( + thing_id=thing.id, + geologic_formation_id=formation.id, + top_depth=0.0, + bottom_depth=60.0, + release_status="public", + ) + ) + + event = FieldEvent( + thing_id=thing.id, + event_date="2024-03-15T19:00:00Z", + release_status="public", + ) + session.add(event) + session.flush() + activity = FieldActivity( + field_event_id=event.id, + activity_type="groundwater level", + release_status="public", + ) + session.add(activity) + session.flush() + + parameter_id = get_parameter_id("groundwater level", "Field Parameter") + + observations = [ + # Real time component: 19:00 UTC is 13:00 MDT, so the measured + # date is 2024-03-15 local. BGS = 50.0 - 2.5 = 47.50. + { + "sample_name": f"{POINT_ID}-wl-1", + "sample_method": "Steel-tape measurement", + "observation_datetime": "2024-03-15T19:00:00Z", + "value": 50.0, + "measuring_point_height": 2.5, + "nma_data_quality": "Water level accurate to within two hundreths of a foot", + "release_status": "public", + }, + # Midnight UTC means no time was measured during transfer, so the + # UTC date is kept. No MP height: BGS = value. + { + "sample_name": f"{POINT_ID}-wl-2", + "sample_method": "Pressure-gage measurement", + "observation_datetime": "2024-04-01T00:00:00Z", + "value": 33.0, + "measuring_point_height": None, + "nma_data_quality": "Water level accurate to within one foot", + "release_status": "public", + }, + # Private observations must not appear in the NGWMN export. + { + "sample_name": f"{POINT_ID}-wl-3", + "sample_method": "Steel-tape measurement", + "observation_datetime": "2024-05-01T00:00:00Z", + "value": 12.0, + "measuring_point_height": None, + "nma_data_quality": None, + "release_status": "private", + }, + ] + for obs in observations: + sample = Sample( + field_activity_id=activity.id, + sample_date=obs["observation_datetime"], + sample_name=obs["sample_name"], + sample_matrix="water", + sample_method=obs["sample_method"], + qc_type="Normal", + release_status=obs["release_status"], + ) + session.add(sample) + session.flush() + session.add( + Observation( + sample_id=sample.id, + parameter_id=parameter_id, + observation_datetime=obs["observation_datetime"], + value=obs["value"], + unit="ft", + measuring_point_height=obs["measuring_point_height"], + nma_data_quality=obs["nma_data_quality"], + release_status=obs["release_status"], + ) + ) + session.commit() + thing_id = thing.id + formation_id = formation.id + + yield POINT_ID + + with session_ctx() as session: + # Thing delete cascades to screens, casing materials, field events + # (and through to samples/observations), and formation associations. + session.execute(delete(Thing).where(Thing.id == thing_id)) + session.execute( + delete(GeologicFormation).where(GeologicFormation.id == formation_id) + ) + session.commit() + + +def test_ngwmn_waterlevels(ngwmn_well): + response = client.get(f"/ngwmn/waterlevels/{ngwmn_well}") + assert response.status_code == 200 + + root = etree.fromstring(response.content) + assert root.tag == "WaterLevels" + levels = root.findall("WaterLevel") + assert len(levels) == 2 + + first, second = levels + assert first.findtext("PointID") == ngwmn_well + assert first.findtext("DepthFromLandSurfaceData") == "47.50" + assert first.findtext("WaterLevelUnits") == "ft bgs" + assert first.findtext("MeasuringMethod") == "Steel tape" + assert first.findtext("MeasurementYear") == "2024" + assert first.findtext("MeasurementMonth") == "3" + assert first.findtext("MeasurementDay") == "15" + assert first.findtext("WaterLevelAccuracy") == "0.02 ft" + + assert second.findtext("DepthFromLandSurfaceData") == "33.00" + assert second.findtext("MeasuringMethod") == "Pressure gauge" + assert second.findtext("MeasurementMonth") == "4" + assert second.findtext("MeasurementDay") == "1" + assert second.findtext("WaterLevelAccuracy") == "1.0 ft" + + +def test_ngwmn_wellconstruction(ngwmn_well): + response = client.get(f"/ngwmn/wellconstruction/{ngwmn_well}") + assert response.status_code == 200 + + root = etree.fromstring(response.content) + assert root.tag == "Casings" + casings = root.findall("Casing") + assert len(casings) == 1 + + casing = casings[0] + assert casing.findtext("PointID") == ngwmn_well + assert float(casing.findtext("CasingTop")) == 0.0 + assert float(casing.findtext("CasingBottom")) == 120.5 + assert casing.findtext("CasingDepthUnits") == "ft bgs" + assert float(casing.findtext("ScreenTop")) == 80.0 + assert float(casing.findtext("ScreenBottom")) == 120.0 + assert casing.findtext("ScreenDescription") == "4in slotted" + + +def test_ngwmn_lithology(ngwmn_well): + response = client.get(f"/ngwmn/lithology/{ngwmn_well}") + assert response.status_code == 200 + + root = etree.fromstring(response.content) + assert root.tag == "Lithologies" + lithologies = root.findall("Lithology") + assert len(lithologies) == 1 + + lithology = lithologies[0] + assert lithology.findtext("PointID") == ngwmn_well + assert float(lithology.findtext("TopDepth")) == 0.0 + assert float(lithology.findtext("BottomDepth")) == 60.0 + assert lithology.findtext("Units") == "feet" + assert lithology.findtext("Description") == "Sandstone" + + +def test_ngwmn_unknown_pointid_returns_empty(): + response = client.get("/ngwmn/waterlevels/NO-SUCH-POINTID") + assert response.status_code == 200 + root = etree.fromstring(response.content) + assert root.tag == "WaterLevels" + assert len(root.findall("WaterLevel")) == 0 + + +# ============= EOF ============================================= From 7851008011959784a2c35d2bc73c299f528fea5a Mon Sep 17 00:00:00 2001 From: jross Date: Thu, 11 Jun 2026 16:12:09 -0600 Subject: [PATCH 3/8] feat: add transducer_daily_data materialized view Aggregate raw transducer observations into one row per well, parameter, day, and QC status, replacing the legacy NMA_WaterLevelsContinuous_Pressure_Daily table as the daily rollup. Includes a unique index so the view supports REFRESH MATERIALIZED VIEW CONCURRENTLY via the refresh CLI. Co-Authored-By: Claude Fable 5 --- ...transducer_daily_data_materialized_view.py | 116 +++++++++++++ tests/test_transducer_daily_data.py | 162 ++++++++++++++++++ 2 files changed, 278 insertions(+) create mode 100644 alembic/versions/v0w1x2y3z4a5_add_transducer_daily_data_materialized_view.py create mode 100644 tests/test_transducer_daily_data.py diff --git a/alembic/versions/v0w1x2y3z4a5_add_transducer_daily_data_materialized_view.py b/alembic/versions/v0w1x2y3z4a5_add_transducer_daily_data_materialized_view.py new file mode 100644 index 00000000..c8d85984 --- /dev/null +++ b/alembic/versions/v0w1x2y3z4a5_add_transducer_daily_data_materialized_view.py @@ -0,0 +1,116 @@ +"""add transducer daily data materialized view + +Aggregates raw transducer observations into one row per well, parameter, +day, and QC status. This is the new-model replacement for the legacy +NMA_WaterLevelsContinuous_Pressure_Daily table, which AMPAPI rebuilt +nightly from the raw continuous pressure record. + +Revision ID: v0w1x2y3z4a5 +Revises: u8v9w0x1y2z3 +Create Date: 2026-06-11 00:00:00.000000 +""" + +from typing import Sequence, Union + +from alembic import op +from sqlalchemy import inspect, text + +# revision identifiers, used by Alembic. +revision: str = "v0w1x2y3z4a5" +down_revision: Union[str, Sequence[str], None] = "u8v9w0x1y2z3" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + +REQUIRED_TABLES = { + "transducer_observation", + "deployment", + "thing", + "parameter", +} + +DROP_VIEW_SQL = "DROP MATERIALIZED VIEW IF EXISTS transducer_daily_data" + + +def _create_transducer_daily_data_view() -> str: + # transducer_observation.value is depth to water in feet below ground + # surface (the transfer wrote DepthToWaterBGS directly), so the daily + # depth columns need no measuring-point correction. + # + # The transfer wrote legacy timestamps unshifted (naive local clock + # readings stored as UTC), so bucketing on the UTC date preserves the + # original measurement dates. + # + # QC status: the transfer marked QCed rows release_status='public' and + # un-reviewed rows 'private', so qced mirrors the legacy QCed flag. + return """ + CREATE MATERIALIZED VIEW transducer_daily_data AS + SELECT + d.thing_id, + t.name AS point_id, + tob.parameter_id, + p.parameter_name, + (tob.observation_datetime AT TIME ZONE 'UTC')::date AS date_measured, + (tob.release_status = 'public') AS qced, + avg(tob.value) AS depth_to_water_bgs, + min(tob.value) AS depth_to_water_bgs_min, + max(tob.value) AS depth_to_water_bgs_max, + count(*) AS measurement_count, + min(tob.observation_datetime) AS first_measurement_at, + max(tob.observation_datetime) AS last_measurement_at, + avg(tob.nma_waterlevelscontinuous_pressure_temperature_water) AS temperature_water, + avg(tob.nma_waterlevelscontinuous_pressure_water_head) AS water_head, + avg(tob.nma_waterlevelscontinuous_pressure_water_head_adjusted) AS water_head_adjusted, + avg(tob.nma_waterlevelscontinuous_pressure_conddl_ms_cm) AS conddl_ms_cm + FROM transducer_observation AS tob + JOIN deployment AS d ON d.id = tob.deployment_id + JOIN thing AS t ON t.id = d.thing_id + JOIN parameter AS p ON p.id = tob.parameter_id + GROUP BY + d.thing_id, + t.name, + tob.parameter_id, + p.parameter_name, + (tob.observation_datetime AT TIME ZONE 'UTC')::date, + (tob.release_status = 'public') + """ + + +def upgrade() -> None: + bind = op.get_bind() + inspector = inspect(bind) + existing_tables = set(inspector.get_table_names(schema="public")) + missing = REQUIRED_TABLES - existing_tables + if missing: + raise RuntimeError( + "Cannot create transducer_daily_data. Missing required tables: " + f"{', '.join(sorted(missing))}" + ) + + op.execute(text(DROP_VIEW_SQL)) + op.execute(text(_create_transducer_daily_data_view())) + op.execute( + text( + "COMMENT ON MATERIALIZED VIEW transducer_daily_data IS " + "'Daily aggregates of transducer observations per well, parameter, " + "and QC status. Replacement for the legacy " + "NMA_WaterLevelsContinuous_Pressure_Daily table. Refresh with " + "REFRESH MATERIALIZED VIEW CONCURRENTLY transducer_daily_data.'" + ) + ) + # Unique index required for REFRESH MATERIALIZED VIEW CONCURRENTLY. + op.execute( + text( + "CREATE UNIQUE INDEX ux_transducer_daily_data_key " + "ON transducer_daily_data (thing_id, parameter_id, date_measured, qced)" + ) + ) + op.execute( + text( + "CREATE INDEX ix_transducer_daily_data_point_id_date " + "ON transducer_daily_data (point_id, date_measured)" + ) + ) + + +def downgrade() -> None: + op.execute(text(DROP_VIEW_SQL)) diff --git a/tests/test_transducer_daily_data.py b/tests/test_transducer_daily_data.py new file mode 100644 index 00000000..9d5d3235 --- /dev/null +++ b/tests/test_transducer_daily_data.py @@ -0,0 +1,162 @@ +# =============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== +""" +Tests for the transducer_daily_data materialized view, which aggregates raw +transducer observations into one row per well, parameter, day, and QC status. +""" + +from datetime import date + +import pytest +from sqlalchemy import delete, text + +from db import Deployment, Sensor, Thing, TransducerObservation +from db.engine import session_ctx +from tests import get_parameter_id + +POINT_ID = "TDD-TEST-0001" + + +def _refresh_view(session): + session.execute(text("REFRESH MATERIALIZED VIEW transducer_daily_data")) + session.commit() + + +@pytest.fixture(scope="module") +def transducer_well(): + """A well with a transducer deployment and two days of observations.""" + with session_ctx() as session: + thing = Thing(name=POINT_ID, thing_type="water well", release_status="public") + session.add(thing) + session.flush() + + sensor = Sensor( + name=f"{POINT_ID}-transducer", + sensor_type="Pressure Transducer", + release_status="public", + ) + session.add(sensor) + session.flush() + + deployment = Deployment( + thing_id=thing.id, + sensor_id=sensor.id, + installation_date="2024-01-01", + release_status="public", + ) + session.add(deployment) + session.flush() + + parameter_id = get_parameter_id("groundwater level", "Field Parameter") + + observations = [ + # Day 1: three QCed readings -> avg 12.0, min 10.0, max 14.0. + ("2024-03-15T06:00:00Z", 10.0, "public", 8.5), + ("2024-03-15T12:00:00Z", 12.0, "public", 9.0), + ("2024-03-15T18:00:00Z", 14.0, "public", 9.5), + # Day 1: one un-QCed reading -> separate row. + ("2024-03-15T13:00:00Z", 99.0, "private", None), + # Day 2: two QCed readings -> avg 21.0. + ("2024-03-16T06:00:00Z", 20.0, "public", None), + ("2024-03-16T18:00:00Z", 22.0, "public", None), + ] + for dt, value, release_status, temperature in observations: + session.add( + TransducerObservation( + deployment_id=deployment.id, + parameter_id=parameter_id, + observation_datetime=dt, + value=value, + release_status=release_status, + nma_waterlevelscontinuous_pressure_temperature_water=temperature, + ) + ) + session.commit() + thing_id = thing.id + sensor_id = sensor.id + + _refresh_view(session) + + yield thing_id + + with session_ctx() as session: + # Thing delete cascades to the deployment and its observations. + session.execute(delete(Thing).where(Thing.id == thing_id)) + session.execute(delete(Sensor).where(Sensor.id == sensor_id)) + session.commit() + _refresh_view(session) + + +def _fetch_rows(session, thing_id): + return ( + session.execute( + text( + "SELECT * FROM transducer_daily_data " + "WHERE thing_id = :thing_id ORDER BY date_measured, qced" + ), + {"thing_id": thing_id}, + ) + .mappings() + .all() + ) + + +def test_transducer_daily_data_aggregation(transducer_well): + with session_ctx() as session: + rows = _fetch_rows(session, transducer_well) + + assert len(rows) == 3 + + day1_private, day1_public, day2_public = rows + + assert day1_private["point_id"] == POINT_ID + assert day1_private["parameter_name"] == "groundwater level" + assert day1_private["date_measured"] == date(2024, 3, 15) + assert day1_private["qced"] is False + assert day1_private["measurement_count"] == 1 + assert day1_private["depth_to_water_bgs"] == pytest.approx(99.0) + + assert day1_public["date_measured"] == date(2024, 3, 15) + assert day1_public["qced"] is True + assert day1_public["measurement_count"] == 3 + assert day1_public["depth_to_water_bgs"] == pytest.approx(12.0) + assert day1_public["depth_to_water_bgs_min"] == pytest.approx(10.0) + assert day1_public["depth_to_water_bgs_max"] == pytest.approx(14.0) + assert day1_public["temperature_water"] == pytest.approx(9.0) + + assert day2_public["date_measured"] == date(2024, 3, 16) + assert day2_public["qced"] is True + assert day2_public["measurement_count"] == 2 + assert day2_public["depth_to_water_bgs"] == pytest.approx(21.0) + assert day2_public["temperature_water"] is None + + +def test_transducer_daily_data_concurrent_refresh(transducer_well): + # The unique index on (thing_id, parameter_id, date_measured, qced) must + # support CONCURRENTLY, which the refresh CLI uses in production. + from db.engine import engine + + with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn: + conn.execute( + text("REFRESH MATERIALIZED VIEW CONCURRENTLY transducer_daily_data") + ) + + with session_ctx() as session: + rows = _fetch_rows(session, transducer_well) + assert len(rows) == 3 + + +# ============= EOF ============================================= From 7036a629f1818ac12a8e68a74e97b36acfc22c3c Mon Sep 17 00:00:00 2001 From: jross Date: Thu, 11 Jun 2026 16:23:20 -0600 Subject: [PATCH 4/8] feat: source NGWMN continuous water levels from transducer_daily_data Replace the legacy NMA_WaterLevelsContinuous_Pressure_Daily query in make_waterlevels_response with the transducer_daily_data materialized view, and simplify the merge/XML code to use the view's (point_id, date_measured, depth_to_water_bgs) rows instead of the 19-column positional legacy layout. The merge rule is unchanged: on overlapping dates the manual reading wins when shallower, and only one record is emitted per date. Guard the depth comparison against NULLs. Co-Authored-By: Claude Fable 5 --- services/ngwmn_helper.py | 130 +++++++++++------------------ tests/test_ngwmn_endpoints.py | 152 +++++++++++++++++++++++++++++++++- 2 files changed, 198 insertions(+), 84 deletions(-) diff --git a/services/ngwmn_helper.py b/services/ngwmn_helper.py index 94b52c11..c6aa55dd 100644 --- a/services/ngwmn_helper.py +++ b/services/ngwmn_helper.py @@ -59,8 +59,11 @@ def make_waterlevels_response(point_id, db): 'order by "DateMeasured"' ) sql2 = ( - 'select * from "NMA_WaterLevelsContinuous_Pressure_Daily" where "PointID"=:point_id and "QCed" is true ' - 'order by "DateMeasured"' + "select point_id, date_measured, depth_to_water_bgs " + "from transducer_daily_data " + "where point_id=:point_id and qced is true " + "and parameter_name='groundwater level' " + "order by date_measured" ) return make_xml_response(db, (sql, sql2), point_id, water_levels_xml2) @@ -76,59 +79,43 @@ def water_levels_xml(records): def water_levels_xml2(manual, pressure): + """ + Merge manual measurements (view_NGWMN_WaterLevels rows) with daily + transducer aggregates (transducer_daily_data rows). Both row types carry + (PointID, date, depth to water bgs, ...) in their first three columns. + + When both sources have a measurement on the same date, the manual reading + wins if it is shallower; either way only one record is emitted per date. + """ if not pressure: return make_xml("WaterLevels", manual, make_water_level) - else: - root = etree.Element("WaterLevels") - # doc = etree.ElementTree(root) - - columns = [ - "GlobalID", - "OBJECTID", - "WellID", - "PointID", - "DateMeasured", - "TemperatureWater", - "WaterHead", - "WaterHeadAdjusted", - "DepthToWaterBGS", - "MeasurementMethod", - "DataSource", - "MeasuringAgency", - "QCed", - "Notes", - "Created", - "Updated", - "ProcessedBy", - "CheckedBy", - "CONDDL (mS/cm)", - ] - - manual_dates = [r[1] for r in manual] - records = [] - for r in pressure: - dm = r[columns.index("DateMeasured")] - tag = "pressure" - if dm.date() in manual_dates: - ri = next((ri for ri in manual if ri[1] == dm.date())) - if ri[2] < r[columns.index("DepthToWaterBGS")]: - r = ri - tag = "manual" - manual.remove(ri) - - records.append((tag, r)) - - for mi in manual: - records.append(("manual", mi)) - - for k, record in sorted( - records, key=lambda r: r[1][4].date() if r[0] == "pressure" else r[1][1] - ): - if k == "pressure": - make_continuous_water_level(root, record) - else: - make_water_level(root, record) - return etree.tostring(root) + + root = etree.Element("WaterLevels") + + manual = list(manual) + manual_dates = [r[1] for r in manual] + records = [] + for r in pressure: + dm = r[1] + tag = "pressure" + if dm in manual_dates: + ri = next(ri for ri in manual if ri[1] == dm) + if ri[2] is not None and r[2] is not None and ri[2] < r[2]: + r = ri + tag = "manual" + manual.remove(ri) + + records.append((tag, r)) + + for mi in manual: + records.append(("manual", mi)) + + for k, record in sorted(records, key=lambda r: r[1][1]): + if k == "pressure": + make_continuous_water_level(root, record) + else: + make_water_level(root, record) + return etree.tostring(root) def well_construction_xml(records): @@ -153,39 +140,16 @@ def make_xml(name, records, make_record): # ==================== make records ======================= def make_continuous_water_level(root, r): + """ + r is a transducer_daily_data row: (point_id, date_measured, depth_to_water_bgs) + """ elem = etree.SubElement(root, "WaterLevel") - make_point_id(elem, r, idx=3) - - columns = [ - "GlobalID", - "OBJECTID", - "WellID", - "PointID", - "DateMeasured", - "TemperatureWater", - "WaterHead", - "WaterHeadAdjusted", - "DepthToWaterBGS", - "MeasurementMethod", - "DataSource", - "MeasuringAgency", - "QCed", - "Notes", - "Created", - "Updated", - "ProcessedBy", - "CheckedBy", - "CONDDL (mS/cm)", - ] - - m = r[columns.index("DateMeasured")] + make_point_id(elem, r) + + m = r[1] - # m = datetime.strptime(m, '%Y-%m-%d') for attr, val in ( - ( - "DepthFromLandSurfaceData", - "{:0.2f}".format(r[columns.index("DepthToWaterBGS")]), - ), + ("DepthFromLandSurfaceData", "{:0.2f}".format(r[2])), ("WaterLevelUnits", "ft bgs"), ("MeasuringMethod", "Pressure Transducer"), ("MeasurementMonth", m.month), diff --git a/tests/test_ngwmn_endpoints.py b/tests/test_ngwmn_endpoints.py index 32db2736..7998d31e 100644 --- a/tests/test_ngwmn_endpoints.py +++ b/tests/test_ngwmn_endpoints.py @@ -23,16 +23,19 @@ from xml.etree import ElementTree as etree import pytest -from sqlalchemy import delete +from sqlalchemy import delete, text from db import ( + Deployment, FieldActivity, FieldEvent, GeologicFormation, Observation, Sample, + Sensor, Thing, ThingGeologicFormationAssociation, + TransducerObservation, WellCasingMaterial, WellScreen, ) @@ -40,6 +43,7 @@ from tests import client, get_parameter_id POINT_ID = "NGWMN-TEST-0001" +MERGED_POINT_ID = "NGWMN-TEST-0002" @pytest.fixture(scope="module") @@ -179,6 +183,118 @@ def ngwmn_well(): session.commit() +@pytest.fixture(scope="module") +def ngwmn_merged_well(): + """A well with both manual water levels and daily transducer aggregates.""" + with session_ctx() as session: + thing = Thing( + name=MERGED_POINT_ID, thing_type="water well", release_status="public" + ) + session.add(thing) + session.flush() + + parameter_id = get_parameter_id("groundwater level", "Field Parameter") + + event = FieldEvent( + thing_id=thing.id, + event_date="2024-03-15T19:00:00Z", + release_status="public", + ) + session.add(event) + session.flush() + activity = FieldActivity( + field_event_id=event.id, + activity_type="groundwater level", + release_status="public", + ) + session.add(activity) + session.flush() + + manual_levels = [ + # 2024-03-15 local date, BGS = 50.0 - 2.5 = 47.50 + ("2024-03-15T19:00:00Z", 50.0, 2.5), + # 2024-04-01 (midnight UTC kept as-is), BGS = 33.00 + ("2024-04-01T00:00:00Z", 33.0, None), + ] + for i, (dt, value, mph) in enumerate(manual_levels): + sample = Sample( + field_activity_id=activity.id, + sample_date=dt, + sample_name=f"{MERGED_POINT_ID}-wl-{i}", + sample_matrix="water", + sample_method="Steel-tape measurement", + qc_type="Normal", + release_status="public", + ) + session.add(sample) + session.flush() + session.add( + Observation( + sample_id=sample.id, + parameter_id=parameter_id, + observation_datetime=dt, + value=value, + unit="ft", + measuring_point_height=mph, + nma_data_quality=None, + release_status="public", + ) + ) + + sensor = Sensor( + name=f"{MERGED_POINT_ID}-transducer", + sensor_type="Pressure Transducer", + release_status="public", + ) + session.add(sensor) + session.flush() + deployment = Deployment( + thing_id=thing.id, + sensor_id=sensor.id, + installation_date="2024-01-01", + release_status="public", + ) + session.add(deployment) + session.flush() + + transducer_readings = [ + # 2024-03-15 daily avg 50.0: manual 47.50 is shallower and wins. + ("2024-03-15T06:00:00Z", 49.0), + ("2024-03-15T18:00:00Z", 51.0), + # 2024-03-20 daily avg 30.0: transducer-only date. + ("2024-03-20T06:00:00Z", 29.0), + ("2024-03-20T18:00:00Z", 31.0), + # 2024-04-01 daily avg 20.0: manual 33.00 is deeper and loses. + ("2024-04-01T06:00:00Z", 19.0), + ("2024-04-01T18:00:00Z", 21.0), + ] + for dt, value in transducer_readings: + session.add( + TransducerObservation( + deployment_id=deployment.id, + parameter_id=parameter_id, + observation_datetime=dt, + value=value, + release_status="public", + ) + ) + session.commit() + thing_id = thing.id + sensor_id = sensor.id + + session.execute(text("REFRESH MATERIALIZED VIEW transducer_daily_data")) + session.commit() + + yield MERGED_POINT_ID + + with session_ctx() as session: + session.execute(delete(Thing).where(Thing.id == thing_id)) + session.execute(delete(Sensor).where(Sensor.id == sensor_id)) + session.commit() + session.execute(text("REFRESH MATERIALIZED VIEW transducer_daily_data")) + session.commit() + + def test_ngwmn_waterlevels(ngwmn_well): response = client.get(f"/ngwmn/waterlevels/{ngwmn_well}") assert response.status_code == 200 @@ -241,6 +357,40 @@ def test_ngwmn_lithology(ngwmn_well): assert lithology.findtext("Description") == "Sandstone" +def test_ngwmn_waterlevels_merges_manual_and_transducer(ngwmn_merged_well): + response = client.get(f"/ngwmn/waterlevels/{ngwmn_merged_well}") + assert response.status_code == 200 + + root = etree.fromstring(response.content) + assert root.tag == "WaterLevels" + levels = root.findall("WaterLevel") + assert len(levels) == 3 + + first, second, third = levels + + # Same-date overlap where the manual reading is shallower: manual wins. + assert first.findtext("PointID") == ngwmn_merged_well + assert first.findtext("DepthFromLandSurfaceData") == "47.50" + assert first.findtext("MeasuringMethod") == "Steel tape" + assert first.findtext("MeasurementMonth") == "3" + assert first.findtext("MeasurementDay") == "15" + + # Transducer-only date: daily average is emitted. + assert second.findtext("DepthFromLandSurfaceData") == "30.00" + assert second.findtext("MeasuringMethod") == "Pressure Transducer" + assert second.findtext("WaterLevelUnits") == "ft bgs" + assert second.findtext("WaterLevelAccuracy") == "0.02 ft" + assert second.findtext("MeasurementMonth") == "3" + assert second.findtext("MeasurementDay") == "20" + + # Same-date overlap where the manual reading is deeper: transducer wins + # and the manual record is dropped. + assert third.findtext("DepthFromLandSurfaceData") == "20.00" + assert third.findtext("MeasuringMethod") == "Pressure Transducer" + assert third.findtext("MeasurementMonth") == "4" + assert third.findtext("MeasurementDay") == "1" + + def test_ngwmn_unknown_pointid_returns_empty(): response = client.get("/ngwmn/waterlevels/NO-SUCH-POINTID") assert response.status_code == 200 From b00d0a43038e46edff10e1d9aef2a2d330d63653 Mon Sep 17 00:00:00 2001 From: jross Date: Thu, 11 Jun 2026 16:29:52 -0600 Subject: [PATCH 5/8] refactor: rename NGWMN views to drop view_ prefix view_NGWMN_WaterLevels, view_NGWMN_WellConstruction, and view_NGWMN_Lithology become NGWMN_WaterLevels, NGWMN_WellConstruction, and NGWMN_Lithology. The migration has not shipped, so it is edited in place rather than adding a rename migration. References to the legacy AMPAPI view names and CSV source tables keep the view_ prefix. Co-Authored-By: Claude Fable 5 --- ...2z3_add_ngwmn_views_from_ocotillo_model.py | 24 +++++++++---------- services/ngwmn_helper.py | 8 +++---- tests/test_ngwmn_endpoints.py | 2 +- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/alembic/versions/u8v9w0x1y2z3_add_ngwmn_views_from_ocotillo_model.py b/alembic/versions/u8v9w0x1y2z3_add_ngwmn_views_from_ocotillo_model.py index d878e36f..bad96a0e 100644 --- a/alembic/versions/u8v9w0x1y2z3_add_ngwmn_views_from_ocotillo_model.py +++ b/alembic/versions/u8v9w0x1y2z3_add_ngwmn_views_from_ocotillo_model.py @@ -4,9 +4,9 @@ exports. These views reproduce the original AMPAPI (SQL Server) view definitions but read from the new Ocotillo tables: -- view_NGWMN_WaterLevels: observation/sample/field_activity/field_event/thing -- view_NGWMN_WellConstruction: thing/well_screen/well_casing_material -- view_NGWMN_Lithology: thing_geologic_formation_association/geologic_formation +- NGWMN_WaterLevels: observation/sample/field_activity/field_event/thing +- NGWMN_WellConstruction: thing/well_screen/well_casing_material +- NGWMN_Lithology: thing_geologic_formation_association/geologic_formation Revision ID: u8v9w0x1y2z3 Revises: t6u7v8w9x0y1 @@ -37,9 +37,9 @@ "geologic_formation", } -DROP_WATERLEVELS_SQL = 'DROP VIEW IF EXISTS "view_NGWMN_WaterLevels"' -DROP_WELLCONSTRUCTION_SQL = 'DROP VIEW IF EXISTS "view_NGWMN_WellConstruction"' -DROP_LITHOLOGY_SQL = 'DROP VIEW IF EXISTS "view_NGWMN_Lithology"' +DROP_WATERLEVELS_SQL = 'DROP VIEW IF EXISTS "NGWMN_WaterLevels"' +DROP_WELLCONSTRUCTION_SQL = 'DROP VIEW IF EXISTS "NGWMN_WellConstruction"' +DROP_LITHOLOGY_SQL = 'DROP VIEW IF EXISTS "NGWMN_Lithology"' def _create_waterlevels_view() -> str: @@ -63,7 +63,7 @@ def _create_waterlevels_view() -> str: # and LU_DataQuality), including the legacy quirk mapping code O # ("Observed...") to 'Acoustic Sounder'. return """ - CREATE VIEW "view_NGWMN_WaterLevels" AS + CREATE VIEW "NGWMN_WaterLevels" AS SELECT t.name AS "PointID", CASE @@ -109,7 +109,7 @@ def _create_wellconstruction_view() -> str: # to controlled material terms during transfer, so it is rebuilt here as # a comma-separated list of well_casing_material terms. return """ - CREATE VIEW "view_NGWMN_WellConstruction" AS + CREATE VIEW "NGWMN_WellConstruction" AS SELECT t.name AS "PointID", CASE WHEN t.well_casing_depth IS NOT NULL THEN 0::double precision END AS "CasingTop", @@ -139,7 +139,7 @@ def _create_lithology_view() -> str: # Lithology and TERM columns. The inner join is reproduced by requiring # a non-null lithology. StratSource was not migrated and is NULL. return """ - CREATE VIEW "view_NGWMN_Lithology" AS + CREATE VIEW "NGWMN_Lithology" AS SELECT tgfa.id AS "OBJECTID", t.name AS "PointID", @@ -172,7 +172,7 @@ def upgrade() -> None: op.execute(text(_create_waterlevels_view())) op.execute( text( - 'COMMENT ON VIEW "view_NGWMN_WaterLevels" IS ' + 'COMMENT ON VIEW "NGWMN_WaterLevels" IS ' "'Public manual groundwater level measurements in the NGWMN " "exchange format, sourced from the Ocotillo observation model.'" ) @@ -182,7 +182,7 @@ def upgrade() -> None: op.execute(text(_create_wellconstruction_view())) op.execute( text( - 'COMMENT ON VIEW "view_NGWMN_WellConstruction" IS ' + 'COMMENT ON VIEW "NGWMN_WellConstruction" IS ' "'Well casing and screen intervals in the NGWMN exchange format, " "sourced from the Ocotillo thing/well_screen model.'" ) @@ -192,7 +192,7 @@ def upgrade() -> None: op.execute(text(_create_lithology_view())) op.execute( text( - 'COMMENT ON VIEW "view_NGWMN_Lithology" IS ' + 'COMMENT ON VIEW "NGWMN_Lithology" IS ' "'Lithology intervals in the NGWMN exchange format, sourced from " "the Ocotillo geologic formation associations.'" ) diff --git a/services/ngwmn_helper.py b/services/ngwmn_helper.py index c6aa55dd..77431b9c 100644 --- a/services/ngwmn_helper.py +++ b/services/ngwmn_helper.py @@ -39,7 +39,7 @@ def make_xml_response(db, sql, point_id, func): def make_lithology_response(point_id, db): sql = ( 'select "PointID", "StratTop", "StratBottom", "TERM" ' - 'from "view_NGWMN_Lithology" where "PointID"=:point_id' + 'from "NGWMN_Lithology" where "PointID"=:point_id' ) return make_xml_response(db, sql, point_id, lithology_xml) @@ -48,14 +48,14 @@ def make_well_construction_response(point_id, db): sql = ( 'select "PointID", "CasingTop", "CasingBottom", "CasingDepthUnits", ' '"ScreenTop", "ScreenBottom", "ScreenBottomUnit", "ScreenDescription", "CasingDescription" ' - 'from "view_NGWMN_WellConstruction" where "PointID"=:point_id' + 'from "NGWMN_WellConstruction" where "PointID"=:point_id' ) return make_xml_response(db, sql, point_id, well_construction_xml) def make_waterlevels_response(point_id, db): sql = ( - 'select * from "view_NGWMN_WaterLevels" where "PointID"=:point_id ' + 'select * from "NGWMN_WaterLevels" where "PointID"=:point_id ' 'order by "DateMeasured"' ) sql2 = ( @@ -80,7 +80,7 @@ def water_levels_xml(records): def water_levels_xml2(manual, pressure): """ - Merge manual measurements (view_NGWMN_WaterLevels rows) with daily + Merge manual measurements (NGWMN_WaterLevels rows) with daily transducer aggregates (transducer_daily_data rows). Both row types carry (PointID, date, depth to water bgs, ...) in their first three columns. diff --git a/tests/test_ngwmn_endpoints.py b/tests/test_ngwmn_endpoints.py index 7998d31e..631ca7dd 100644 --- a/tests/test_ngwmn_endpoints.py +++ b/tests/test_ngwmn_endpoints.py @@ -14,7 +14,7 @@ # limitations under the License. # =============================================================================== """ -Tests for the /ngwmn endpoints backed by the view_NGWMN_* views, which are +Tests for the /ngwmn endpoints backed by the NGWMN_* views, which are sourced from the new Ocotillo data model (thing/well_screen/observation/ thing_geologic_formation_association) rather than the legacy NMA_view_NGWMN_* copy tables. From ccd1f7bbddbdf0546096a215c87a93d5f7627be9 Mon Sep 17 00:00:00 2001 From: jross Date: Thu, 11 Jun 2026 16:30:39 -0600 Subject: [PATCH 6/8] chore: ignore Claude Code local settings and worktrees Co-Authored-By: Claude Fable 5 --- .gitignore | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 92ab7e91..3c93f283 100644 --- a/.gitignore +++ b/.gitignore @@ -52,4 +52,7 @@ app.yaml docs/ #Codex -.codex \ No newline at end of file +.codex +# Claude Code +.claude/settings.local.json +.claude/worktrees/ From b5a12ea2fb9806bdb2a28b5c86e095feae9462c4 Mon Sep 17 00:00:00 2001 From: jross Date: Fri, 12 Jun 2026 09:24:41 -0600 Subject: [PATCH 7/8] refactor: use ORM view models in ngwmn_helper instead of raw SQL Add read-only ORM mappings for the NGWMN_* views and the transducer_daily_data materialized view on a separate declarative base, kept out of Base.metadata so Alembic autogenerate does not try to create tables for them. The NGWMN response builders now query these models column-wise instead of executing raw SQL strings. Co-Authored-By: Claude Fable 5 --- db/ngwmn_views.py | 120 +++++++++++++++++++++++++++++++++++++++ services/ngwmn_helper.py | 90 ++++++++++++++++++----------- 2 files changed, 178 insertions(+), 32 deletions(-) create mode 100644 db/ngwmn_views.py diff --git a/db/ngwmn_views.py b/db/ngwmn_views.py new file mode 100644 index 00000000..4b843695 --- /dev/null +++ b/db/ngwmn_views.py @@ -0,0 +1,120 @@ +# =============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== +""" +Read-only ORM mappings over database views. + +These models use their own declarative base, deliberately kept out of +``db.Base.metadata``: the views are created by hand-written Alembic +migrations, and registering them with the main metadata would make +autogenerate try to emit CREATE TABLE statements for them. + +The primary keys declared here exist only to satisfy the ORM mapper. +Query individual columns (``session.query(Model.col, ...)``) rather than +full entities, so the identity map cannot silently collapse view rows +that happen to share a key. +""" + +from datetime import date, datetime + +from sqlalchemy import BigInteger, Boolean, Date, DateTime, Float, Integer, String +from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column + + +class ViewBase(DeclarativeBase): + """Declarative base for view mappings, excluded from Alembic metadata.""" + + +class NGWMNWaterLevels(ViewBase): + """The NGWMN_WaterLevels view (manual groundwater level measurements).""" + + __tablename__ = "NGWMN_WaterLevels" + + point_id: Mapped[str] = mapped_column("PointID", String, primary_key=True) + date_measured: Mapped[date] = mapped_column("DateMeasured", Date, primary_key=True) + depth_to_water_bgs: Mapped[float | None] = mapped_column("DepthToWaterBGS", Float) + wl_units: Mapped[str | None] = mapped_column("WLUnits", String) + measurement_method: Mapped[str | None] = mapped_column("MeasurementMethod", String) + wl_accuracy: Mapped[str | None] = mapped_column("WLAccuracy", String) + public_release: Mapped[bool | None] = mapped_column("PublicRelease", Boolean) + + +class NGWMNWellConstruction(ViewBase): + """The NGWMN_WellConstruction view (casing and screen intervals).""" + + __tablename__ = "NGWMN_WellConstruction" + + point_id: Mapped[str] = mapped_column("PointID", String, primary_key=True) + casing_top: Mapped[float | None] = mapped_column( + "CasingTop", Float, primary_key=True + ) + casing_bottom: Mapped[float | None] = mapped_column("CasingBottom", Float) + casing_depth_units: Mapped[str | None] = mapped_column("CasingDepthUnits", String) + screen_top: Mapped[float | None] = mapped_column( + "ScreenTop", Float, primary_key=True + ) + screen_bottom: Mapped[float | None] = mapped_column("ScreenBottom", Float) + screen_bottom_unit: Mapped[str | None] = mapped_column("ScreenBottomUnit", String) + screen_description: Mapped[str | None] = mapped_column("ScreenDescription", String) + casing_description: Mapped[str | None] = mapped_column("CasingDescription", String) + + +class NGWMNLithology(ViewBase): + """The NGWMN_Lithology view (lithology intervals).""" + + __tablename__ = "NGWMN_Lithology" + + object_id: Mapped[int] = mapped_column("OBJECTID", Integer, primary_key=True) + point_id: Mapped[str | None] = mapped_column("PointID", String) + lithology: Mapped[str | None] = mapped_column("Lithology", String) + term: Mapped[str | None] = mapped_column("TERM", String) + strat_source: Mapped[str | None] = mapped_column("StratSource", String) + strat_top: Mapped[float | None] = mapped_column("StratTop", Float) + strat_top_unit: Mapped[str | None] = mapped_column("StratTopUnit", String) + strat_bottom: Mapped[float | None] = mapped_column("StratBottom", Float) + strat_bottom_unit: Mapped[str | None] = mapped_column("StratBottomUnit", String) + + +class TransducerDailyData(ViewBase): + """ + The transducer_daily_data materialized view (daily aggregates of + transducer observations per well, parameter, and QC status). + """ + + __tablename__ = "transducer_daily_data" + + thing_id: Mapped[int] = mapped_column(Integer, primary_key=True) + parameter_id: Mapped[int] = mapped_column(Integer, primary_key=True) + date_measured: Mapped[date] = mapped_column(Date, primary_key=True) + qced: Mapped[bool] = mapped_column(Boolean, primary_key=True) + point_id: Mapped[str | None] = mapped_column(String) + parameter_name: Mapped[str | None] = mapped_column(String) + depth_to_water_bgs: Mapped[float | None] = mapped_column(Float) + depth_to_water_bgs_min: Mapped[float | None] = mapped_column(Float) + depth_to_water_bgs_max: Mapped[float | None] = mapped_column(Float) + measurement_count: Mapped[int | None] = mapped_column(BigInteger) + first_measurement_at: Mapped[datetime | None] = mapped_column( + DateTime(timezone=True) + ) + last_measurement_at: Mapped[datetime | None] = mapped_column( + DateTime(timezone=True) + ) + temperature_water: Mapped[float | None] = mapped_column(Float) + water_head: Mapped[float | None] = mapped_column(Float) + water_head_adjusted: Mapped[float | None] = mapped_column(Float) + conddl_ms_cm: Mapped[float | None] = mapped_column(Float) + + +# ============= EOF ============================================= diff --git a/services/ngwmn_helper.py b/services/ngwmn_helper.py index 77431b9c..c48ea120 100644 --- a/services/ngwmn_helper.py +++ b/services/ngwmn_helper.py @@ -15,7 +15,12 @@ # =============================================================================== from xml.etree import ElementTree as etree -from sqlalchemy import text +from db.ngwmn_views import ( + NGWMNLithology, + NGWMNWaterLevels, + NGWMNWellConstruction, + TransducerDailyData, +) def _as_text(v): @@ -25,48 +30,69 @@ def _as_text(v): # NSMAP = dict(xsi="http://www.w3.org/2001/XMLSchema-instance", xsd="http://www.w3.org/2001/XMLSchema") -def make_xml_response(db, sql, point_id, func): - if not isinstance(sql, (tuple, list)): - sql = (sql,) - - rs = [] - for si in sql: - records = db.execute(text(si), {"point_id": point_id}) - rs.append(records.fetchall()) - return func(*rs) - - def make_lithology_response(point_id, db): - sql = ( - 'select "PointID", "StratTop", "StratBottom", "TERM" ' - 'from "NGWMN_Lithology" where "PointID"=:point_id' + records = ( + db.query( + NGWMNLithology.point_id, + NGWMNLithology.strat_top, + NGWMNLithology.strat_bottom, + NGWMNLithology.term, + ) + .filter(NGWMNLithology.point_id == point_id) + .all() ) - return make_xml_response(db, sql, point_id, lithology_xml) + return lithology_xml(records) def make_well_construction_response(point_id, db): - sql = ( - 'select "PointID", "CasingTop", "CasingBottom", "CasingDepthUnits", ' - '"ScreenTop", "ScreenBottom", "ScreenBottomUnit", "ScreenDescription", "CasingDescription" ' - 'from "NGWMN_WellConstruction" where "PointID"=:point_id' + records = ( + db.query( + NGWMNWellConstruction.point_id, + NGWMNWellConstruction.casing_top, + NGWMNWellConstruction.casing_bottom, + NGWMNWellConstruction.casing_depth_units, + NGWMNWellConstruction.screen_top, + NGWMNWellConstruction.screen_bottom, + NGWMNWellConstruction.screen_bottom_unit, + NGWMNWellConstruction.screen_description, + NGWMNWellConstruction.casing_description, + ) + .filter(NGWMNWellConstruction.point_id == point_id) + .all() ) - return make_xml_response(db, sql, point_id, well_construction_xml) + return well_construction_xml(records) def make_waterlevels_response(point_id, db): - sql = ( - 'select * from "NGWMN_WaterLevels" where "PointID"=:point_id ' - 'order by "DateMeasured"' + manual = ( + db.query( + NGWMNWaterLevels.point_id, + NGWMNWaterLevels.date_measured, + NGWMNWaterLevels.depth_to_water_bgs, + NGWMNWaterLevels.wl_units, + NGWMNWaterLevels.measurement_method, + NGWMNWaterLevels.wl_accuracy, + NGWMNWaterLevels.public_release, + ) + .filter(NGWMNWaterLevels.point_id == point_id) + .order_by(NGWMNWaterLevels.date_measured) + .all() ) - sql2 = ( - "select point_id, date_measured, depth_to_water_bgs " - "from transducer_daily_data " - "where point_id=:point_id and qced is true " - "and parameter_name='groundwater level' " - "order by date_measured" + pressure = ( + db.query( + TransducerDailyData.point_id, + TransducerDailyData.date_measured, + TransducerDailyData.depth_to_water_bgs, + ) + .filter( + TransducerDailyData.point_id == point_id, + TransducerDailyData.qced.is_(True), + TransducerDailyData.parameter_name == "groundwater level", + ) + .order_by(TransducerDailyData.date_measured) + .all() ) - - return make_xml_response(db, (sql, sql2), point_id, water_levels_xml2) + return water_levels_xml2(manual, pressure) # ==================== make xml ======================= From d7e7c5d18a2b42e25c5dbf03b13b4ee79144971b Mon Sep 17 00:00:00 2001 From: jross Date: Fri, 12 Jun 2026 10:36:14 -0600 Subject: [PATCH 8/8] fix: restrict NGWMN exports to public records across joined entities The NGWMN views filtered only on observation.release_status, so a private thing (or private screen/casing/lithology rows) could still be exported. Require release_status='public' on the thing, field event, field activity, sample, and observation in NGWMN_WaterLevels; on the thing, well_screen, and well_casing_material rows in NGWMN_WellConstruction; and on the thing and formation association in NGWMN_Lithology. The transducer daily query in make_waterlevels_response now joins thing and requires it to be public as well. Co-Authored-By: Claude Fable 5 --- ...2z3_add_ngwmn_views_from_ocotillo_model.py | 12 +- services/ngwmn_helper.py | 3 + tests/test_ngwmn_endpoints.py | 130 ++++++++++++++++++ 3 files changed, 143 insertions(+), 2 deletions(-) diff --git a/alembic/versions/u8v9w0x1y2z3_add_ngwmn_views_from_ocotillo_model.py b/alembic/versions/u8v9w0x1y2z3_add_ngwmn_views_from_ocotillo_model.py index bad96a0e..bf76b540 100644 --- a/alembic/versions/u8v9w0x1y2z3_add_ngwmn_views_from_ocotillo_model.py +++ b/alembic/versions/u8v9w0x1y2z3_add_ngwmn_views_from_ocotillo_model.py @@ -98,6 +98,10 @@ def _create_waterlevels_view() -> str: JOIN parameter AS p ON p.id = o.parameter_id WHERE p.parameter_name = 'groundwater level' AND o.release_status = 'public' + AND s.release_status = 'public' + AND fa.release_status = 'public' + AND fe.release_status = 'public' + AND t.release_status = 'public' """ @@ -121,13 +125,15 @@ def _create_wellconstruction_view() -> str: ws.screen_description AS "ScreenDescription", cm.materials AS "CasingDescription" FROM thing AS t - LEFT JOIN well_screen AS ws ON ws.thing_id = t.id + LEFT JOIN well_screen AS ws + ON ws.thing_id = t.id AND ws.release_status = 'public' LEFT JOIN LATERAL ( SELECT string_agg(wcm.material, ', ' ORDER BY wcm.material) AS materials FROM well_casing_material AS wcm - WHERE wcm.thing_id = t.id + WHERE wcm.thing_id = t.id AND wcm.release_status = 'public' ) AS cm ON TRUE WHERE t.thing_type = 'water well' + AND t.release_status = 'public' """ @@ -154,6 +160,8 @@ def _create_lithology_view() -> str: JOIN thing AS t ON t.id = tgfa.thing_id JOIN geologic_formation AS gf ON gf.id = tgfa.geologic_formation_id WHERE gf.lithology IS NOT NULL + AND tgfa.release_status = 'public' + AND t.release_status = 'public' """ diff --git a/services/ngwmn_helper.py b/services/ngwmn_helper.py index c48ea120..d164507f 100644 --- a/services/ngwmn_helper.py +++ b/services/ngwmn_helper.py @@ -15,6 +15,7 @@ # =============================================================================== from xml.etree import ElementTree as etree +from db import Thing from db.ngwmn_views import ( NGWMNLithology, NGWMNWaterLevels, @@ -84,10 +85,12 @@ def make_waterlevels_response(point_id, db): TransducerDailyData.date_measured, TransducerDailyData.depth_to_water_bgs, ) + .join(Thing, Thing.id == TransducerDailyData.thing_id) .filter( TransducerDailyData.point_id == point_id, TransducerDailyData.qced.is_(True), TransducerDailyData.parameter_name == "groundwater level", + Thing.release_status == "public", ) .order_by(TransducerDailyData.date_measured) .all() diff --git a/tests/test_ngwmn_endpoints.py b/tests/test_ngwmn_endpoints.py index 631ca7dd..cf1a6b86 100644 --- a/tests/test_ngwmn_endpoints.py +++ b/tests/test_ngwmn_endpoints.py @@ -44,6 +44,7 @@ POINT_ID = "NGWMN-TEST-0001" MERGED_POINT_ID = "NGWMN-TEST-0002" +PRIVATE_POINT_ID = "NGWMN-TEST-0003" @pytest.fixture(scope="module") @@ -391,6 +392,135 @@ def test_ngwmn_waterlevels_merges_manual_and_transducer(ngwmn_merged_well): assert third.findtext("MeasurementDay") == "1" +@pytest.fixture(scope="module") +def ngwmn_private_well(): + """A private well whose child rows are public; nothing may be exported.""" + with session_ctx() as session: + thing = Thing( + name=PRIVATE_POINT_ID, + thing_type="water well", + release_status="private", + well_casing_depth=100.0, + ) + session.add(thing) + session.flush() + + session.add( + WellScreen( + thing_id=thing.id, + screen_depth_top=10.0, + screen_depth_bottom=50.0, + release_status="public", + ) + ) + + formation = GeologicFormation( + formation_code=None, lithology="Sandstone", release_status="public" + ) + session.add(formation) + session.flush() + session.add( + ThingGeologicFormationAssociation( + thing_id=thing.id, + geologic_formation_id=formation.id, + top_depth=0.0, + bottom_depth=50.0, + release_status="public", + ) + ) + + parameter_id = get_parameter_id("groundwater level", "Field Parameter") + + event = FieldEvent( + thing_id=thing.id, + event_date="2024-03-15T19:00:00Z", + release_status="public", + ) + session.add(event) + session.flush() + activity = FieldActivity( + field_event_id=event.id, + activity_type="groundwater level", + release_status="public", + ) + session.add(activity) + session.flush() + sample = Sample( + field_activity_id=activity.id, + sample_date="2024-03-15T19:00:00Z", + sample_name=f"{PRIVATE_POINT_ID}-wl-0", + sample_matrix="water", + sample_method="Steel-tape measurement", + qc_type="Normal", + release_status="public", + ) + session.add(sample) + session.flush() + session.add( + Observation( + sample_id=sample.id, + parameter_id=parameter_id, + observation_datetime="2024-03-15T19:00:00Z", + value=50.0, + unit="ft", + release_status="public", + ) + ) + + sensor = Sensor( + name=f"{PRIVATE_POINT_ID}-transducer", + sensor_type="Pressure Transducer", + release_status="public", + ) + session.add(sensor) + session.flush() + deployment = Deployment( + thing_id=thing.id, + sensor_id=sensor.id, + installation_date="2024-01-01", + release_status="public", + ) + session.add(deployment) + session.flush() + session.add( + TransducerObservation( + deployment_id=deployment.id, + parameter_id=parameter_id, + observation_datetime="2024-03-20T12:00:00Z", + value=30.0, + release_status="public", + ) + ) + session.commit() + thing_id = thing.id + sensor_id = sensor.id + + session.execute(text("REFRESH MATERIALIZED VIEW transducer_daily_data")) + session.commit() + + yield PRIVATE_POINT_ID + + with session_ctx() as session: + session.execute(delete(Thing).where(Thing.id == thing_id)) + session.execute(delete(Sensor).where(Sensor.id == sensor_id)) + session.commit() + session.execute(text("REFRESH MATERIALIZED VIEW transducer_daily_data")) + session.commit() + + +def test_ngwmn_private_thing_exports_nothing(ngwmn_private_well): + for endpoint, root_tag, record_tag in ( + ("waterlevels", "WaterLevels", "WaterLevel"), + ("wellconstruction", "Casings", "Casing"), + ("lithology", "Lithologies", "Lithology"), + ): + response = client.get(f"/ngwmn/{endpoint}/{ngwmn_private_well}") + assert response.status_code == 200 + root = etree.fromstring(response.content) + assert root.tag == root_tag + assert len(root.findall(record_tag)) == 0, endpoint + + def test_ngwmn_unknown_pointid_returns_empty(): response = client.get("/ngwmn/waterlevels/NO-SUCH-POINTID") assert response.status_code == 200