From 5333caa57b5ed20d2827599d97b09de75fdf62d0 Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Tue, 7 Apr 2026 18:31:45 -0600 Subject: [PATCH] fix(transfers): add scoped PointID transfer mode and rerun guards Preserve the default bulk legacy transfer behavior while making `TRANSFER_TEST_POINTIDS` an opt-in scoped mode that limits transfers to the requested legacy PointIDs across well and non-well site types. Also harden scoped reruns by adding duplicate guards to create-only transfer paths such as water levels, assets, permissions, group associations, and thing-id links, and scope location cleanup to affected locations only. This makes targeted remediation runs safer, faster, and more predictable without changing the existing full-transfer workflow. --- tests/test_thing_transfer.py | 9 +- tests/test_transfer_legacy_dates.py | 59 +++ tests/test_transfer_scoping.py | 234 ++++++++++++ transfers/asset_transfer.py | 47 ++- transfers/associated_data.py | 3 +- transfers/chemistry_sampleinfo.py | 8 +- transfers/group_transfer.py | 13 +- transfers/hydraulicsdata.py | 5 +- transfers/link_ids_transfer.py | 59 ++- transfers/minor_trace_chemistry_transfer.py | 19 +- transfers/ngwmn_views.py | 5 +- transfers/permissions_transfer.py | 50 ++- transfers/sensor_transfer.py | 2 +- transfers/soil_rock_results.py | 3 + transfers/stratigraphy_transfer.py | 6 +- transfers/surface_water_data.py | 5 +- transfers/surface_water_photos.py | 3 +- transfers/thing_transfer.py | 69 +++- transfers/transfer.py | 359 +++++++++++++++--- transfers/transferer.py | 82 +++- transfers/waterlevels_transducer_transfer.py | 2 +- transfers/waterlevels_transfer.py | 93 ++++- .../waterlevelscontinuous_pressure_daily.py | 5 +- transfers/weather_data.py | 5 +- transfers/weather_photos.py | 3 +- transfers/well_transfer.py | 2 +- transfers/well_transfer_util.py | 28 +- 27 files changed, 1035 insertions(+), 143 deletions(-) create mode 100644 tests/test_transfer_scoping.py diff --git a/tests/test_thing_transfer.py 
b/tests/test_thing_transfer.py index ea33baf7c..df1a0c367 100644 --- a/tests/test_thing_transfer.py +++ b/tests/test_thing_transfer.py @@ -27,17 +27,19 @@ def test_transfer_new_site_types_calls_transfer_thing( ): calls = [] - def fake_transfer_thing(session, site_type, make_payload, limit=None): + def fake_transfer_thing( + session, site_type, make_payload, limit=None, pointids=None + ): class Row: PointID = "PT-1" PublicRelease = False payload = make_payload(Row) - calls.append((site_type, payload, limit)) + calls.append((site_type, payload, limit, pointids)) monkeypatch.setattr(tt, "transfer_thing", fake_transfer_thing) - getattr(tt, func_name)(session=None, limit=7) + getattr(tt, func_name)(session=None, limit=7, pointids=["PT-1"]) assert calls == [ ( @@ -48,5 +50,6 @@ class Row: "release_status": "private", }, 7, + ["PT-1"], ) ] diff --git a/tests/test_transfer_legacy_dates.py b/tests/test_transfer_legacy_dates.py index 32732b971..59336d3fd 100644 --- a/tests/test_transfer_legacy_dates.py +++ b/tests/test_transfer_legacy_dates.py @@ -22,6 +22,7 @@ """ import datetime +from types import SimpleNamespace from unittest.mock import patch import numpy as np @@ -29,6 +30,8 @@ import pytest from db import Sample +from db.engine import session_ctx +from transfers.transferer import Transferer from transfers.well_transfer import _normalize_completion_date from transfers.util import make_location from transfers.waterlevels_transfer import WaterLevelTransferer @@ -473,6 +476,62 @@ def create_test_row(i, has_site_date): assert locations_with_site_date == 9 # 9% should have nma_site_date +def test_capture_database_error_uses_message_when_detail_missing(): + transfer = Transferer() + transfer.source_table = "TestTable" + + class FakeOrig: + args = ({"M": "current transaction is aborted", "t": "contact"},) + + class FakeDatabaseError(Exception): + def __init__(self): + self.orig = FakeOrig() + + transfer._capture_database_error("PT-1", FakeDatabaseError()) + + assert 
transfer.errors == [ + { + "pointid": "PT-1", + "error": "current transaction is aborted", + "table": "TestTable", + "field": "contact", + } + ] + + +def test_get_field_event_participant_ids_reuses_existing_contact(contact): + transfer = WaterLevelTransferer.__new__(WaterLevelTransferer) + transfer._measured_by_mapper = { + "Tester": [contact.name, contact.organization, "Owner"] + } + transfer._created_contact_id_by_key = {} + transfer._owner_contact_id_by_pointid = {} + transfer._last_contacts_created_count = 0 + transfer._last_contacts_reused_count = 0 + + row = SimpleNamespace( + PointID="TEST-POINTID", + GlobalID="TEST-GLOBALID", + MeasuredBy="Tester", + ) + + with session_ctx() as session: + participant_ids = transfer._get_field_event_participant_ids(session, row) + matching_contacts = ( + session.query(contact.__class__) + .filter( + contact.__class__.name == contact.name, + contact.__class__.organization == contact.organization, + ) + .all() + ) + + assert participant_ids == [contact.id] + assert transfer._last_contacts_created_count == 0 + assert transfer._last_contacts_reused_count == 1 + assert len(matching_contacts) == 1 + + # ============================================================================ # EOF # ============================================================================ diff --git a/tests/test_transfer_scoping.py b/tests/test_transfer_scoping.py new file mode 100644 index 000000000..a62ff9efc --- /dev/null +++ b/tests/test_transfer_scoping.py @@ -0,0 +1,234 @@ +from types import SimpleNamespace +from contextlib import contextmanager + +import pandas as pd +import pytest + +from transfers import transfer as transfer_module +from transfers import thing_transfer as thing_transfer_module +from transfers import well_transfer_util as well_transfer_util_module + + +class _FakeExecuteResult: + def __init__(self, rows=None): + self._rows = rows or [] + + def all(self): + return list(self._rows) + + +class _FakeSession: + def __init__(self): + 
self.inserted_location_rows = [] + self.inserted_thing_rows = [] + self.commits = 0 + + def execute(self, statement, rows): + table_name = statement.table.name + if table_name == "location": + self.inserted_location_rows.extend(rows) + returned = [ + (index + 1, row["nma_pk_location"]) for index, row in enumerate(rows) + ] + return _FakeExecuteResult(returned) + if table_name == "thing": + self.inserted_thing_rows.extend(rows) + returned = [ + (index + 10, row["nma_pk_location"]) for index, row in enumerate(rows) + ] + return _FakeExecuteResult(returned) + return _FakeExecuteResult() + + def commit(self): + self.commits += 1 + + +def test_normalize_test_pointids_dedupes_and_upcases(): + pointids = transfer_module._normalize_test_pointids(" sm-0001,SM-0001, sp-1 ") + + assert pointids == ["SM-0001", "SP-1"] + + +def test_validate_scoped_pointids_or_raise_raises_for_missing(monkeypatch): + monkeypatch.setattr( + transfer_module, + "_collect_available_scoped_pointids", + lambda _opts: {"SM-0001"}, + ) + opts = transfer_module.load_transfer_options() + + with pytest.raises(RuntimeError, match="MISSING-1"): + transfer_module._validate_scoped_pointids_or_raise( + ["SM-0001", "MISSING-1"], opts + ) + + +def test_execute_session_transfer_with_timing_passes_pointids(monkeypatch): + seen = {} + + @contextmanager + def fake_session_ctx(): + yield object() + + monkeypatch.setattr(transfer_module, "session_ctx", fake_session_ctx) + + def fake_transfer(session, limit=None, pointids=None): + seen["limit"] = limit + seen["pointids"] = pointids + return "ok" + + name, result, _elapsed = transfer_module._execute_session_transfer_with_timing( + "Fake", + fake_transfer, + 30, + ["SM-0001"], + ) + + assert name == "Fake" + assert result == "ok" + assert seen["limit"] == 3 + assert seen["pointids"] == ["SM-0001"] + + +def test_transfer_thing_filters_to_requested_pointids(monkeypatch): + location_df = pd.DataFrame( + [ + { + "SiteType": "SP", + "PointID": "PT-1", + "Easting": 1, + 
"Northing": 1, + "LocationId": "loc-1", + "PublicRelease": True, + }, + { + "SiteType": "SP", + "PointID": "PT-2", + "Easting": 2, + "Northing": 2, + "LocationId": "loc-2", + "PublicRelease": False, + }, + ] + ) + + fake_location = SimpleNamespace( + nma_pk_location="loc-1", + description=None, + point="POINT", + elevation=1.0, + release_status="public", + nma_date_created=None, + nma_site_date=None, + nma_location_notes=None, + nma_coordinate_notes=None, + nma_data_reliability=None, + ) + + monkeypatch.setattr(thing_transfer_module, "_get_location_df", lambda: location_df) + monkeypatch.setattr( + thing_transfer_module, + "make_location", + lambda row, _cache: (fake_location, "manual", {}), + ) + monkeypatch.setattr( + thing_transfer_module, + "make_location_data_provenance", + lambda row, location_stub, elevation_method: [], + ) + + session = _FakeSession() + + thing_transfer_module.transfer_thing( + session, + "SP", + lambda row: { + "name": row.PointID, + "thing_type": "spring", + "release_status": "public", + }, + pointids=["PT-1"], + ) + + assert [row["name"] for row in session.inserted_thing_rows] == ["PT-1"] + + +def test_cleanup_locations_scopes_to_requested_pointids(monkeypatch): + class FakeBlob: + def exists(self): + return False + + class FakeBucket: + def blob(self, _name): + return FakeBlob() + + class FakeQuery: + def __init__(self, locations): + self.locations = locations + self.join_calls = 0 + self.filter_calls = 0 + self.distinct_calls = 0 + + def join(self, *_args, **_kwargs): + self.join_calls += 1 + return self + + def filter(self, *_args, **_kwargs): + self.filter_calls += 1 + return self + + def distinct(self): + self.distinct_calls += 1 + return self + + def all(self): + return self.locations + + class FakeSession: + def __init__(self, query): + self._query = query + self.updated = [] + self.commits = 0 + + def query(self, _model): + return self._query + + def bulk_update_mappings(self, _model, updates): + self.updated.extend(updates) + + 
def commit(self): + self.commits += 1 + + location = SimpleNamespace( + id=1, + latlon=(35.0, -106.0), + state="New Mexico", + county="Bernalillo", + quad_name="Albuquerque West", + ) + query = FakeQuery([location]) + session = FakeSession(query) + + monkeypatch.setattr( + well_transfer_util_module, "get_storage_bucket", lambda: FakeBucket() + ) + monkeypatch.setattr( + well_transfer_util_module, "upload_blob_json", lambda *_args, **_kwargs: None + ) + monkeypatch.setattr( + well_transfer_util_module, "download_blob_json", lambda *_args, **_kwargs: {} + ) + + well_transfer_util_module.cleanup_locations(session, pointids=["sm-0001"]) + + assert query.join_calls == 2 + assert query.filter_calls == 1 + assert query.distinct_calls == 1 + assert session.updated == [ + { + "id": 1, + "state": "New Mexico", + "county": "Bernalillo", + "quad_name": "Albuquerque West", + } + ] diff --git a/transfers/asset_transfer.py b/transfers/asset_transfer.py index d8ec6525b..4c0fd6cb8 100644 --- a/transfers/asset_transfer.py +++ b/transfers/asset_transfer.py @@ -18,7 +18,7 @@ from sqlalchemy.orm import Session from starlette.datastructures import UploadFile -from db import Thing +from db import Thing, Asset, AssetThingAssociation from services.asset_helper import upload_and_associate from services.gcs_helper import ( get_storage_bucket, @@ -40,7 +40,7 @@ def __init__(self, *args, **kw): def _get_dfs(self): input_df = read_csv(self.source_table) - cleaned_df = filter_to_valid_point_ids(input_df) + cleaned_df = filter_to_valid_point_ids(input_df, self.pointids) return input_df, cleaned_df def _transfer_hook(self, session: Session): @@ -55,6 +55,13 @@ def _transfer_hook(self, session: Session): .filter(Thing.name == row.PointID, Thing.thing_type == "water well") .one_or_none() ) + if well is None: + self._capture_error( + row.PointID, + "Thing not found", + "PointID", + ) + continue self._asset_step(session, i, well) session.commit() @@ -68,6 +75,22 @@ def _asset_step(self, session, i, 
db_item): return n = len(photos) + existing_asset_names = { + name + for (name,) in session.query(Asset.name) + .join(AssetThingAssociation, AssetThingAssociation.asset_id == Asset.id) + .filter(AssetThingAssociation.thing_id == db_item.id) + .all() + if name + } + existing_asset_paths = { + storage_path + for (storage_path,) in session.query(Asset.storage_path) + .join(AssetThingAssociation, AssetThingAssociation.asset_id == Asset.id) + .filter(AssetThingAssociation.thing_id == db_item.id) + .all() + if storage_path + } for j, row in enumerate(photos.itertuples()): photo_path = row.OLEPath srcblob = self._bucket.get_blob(f"nma-photos/{photo_path}") @@ -78,6 +101,16 @@ def _asset_step(self, session, i, db_item): continue head, filename = srcblob.name.split("/") + if filename in existing_asset_names or any( + storage_path.endswith(filename) for storage_path in existing_asset_paths + ): + logger.info( + "Skipping existing asset %s for thing.id=%s thing=%s", + filename, + db_item.id, + db_item.name, + ) + continue f = srcblob.download_as_bytes() ff = UploadFile(file=io.BytesIO(f), filename=filename, size=len(f)) @@ -91,8 +124,16 @@ def _asset_step(self, session, i, db_item): ) logger.info( - f"Added asset {i}-{j}/{n} thing.id={db_item.id} thing={db_item.name} uri: {uri}" + "Added asset %s-%s/%s thing.id=%s thing=%s uri: %s", + i, + j, + n, + db_item.id, + db_item.name, + uri, ) + existing_asset_names.add(filename) + existing_asset_paths.add(uri[1]) # ============= EOF ============================================= diff --git a/transfers/associated_data.py b/transfers/associated_data.py index ebe1cebe5..388a43720 100644 --- a/transfers/associated_data.py +++ b/transfers/associated_data.py @@ -37,7 +37,7 @@ from db.engine import session_ctx from transfers.logger import logger from transfers.transferer import Transferer -from transfers.util import replace_nans +from transfers.util import replace_nans, filter_to_valid_point_ids class AssociatedDataTransferer(Transferer): 
@@ -73,6 +73,7 @@ def _build_thing_id_cache(self) -> None: def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: df = self._read_csv(self.source_table) cleaned_df = replace_nans(df) + cleaned_df = filter_to_valid_point_ids(cleaned_df, self.pointids) return df, cleaned_df def _transfer_hook(self, session: Session) -> None: diff --git a/transfers/chemistry_sampleinfo.py b/transfers/chemistry_sampleinfo.py index ce8674368..1519a8d22 100644 --- a/transfers/chemistry_sampleinfo.py +++ b/transfers/chemistry_sampleinfo.py @@ -23,7 +23,7 @@ from sqlalchemy.dialects.postgresql import insert from sqlalchemy.orm import Session -from db import NMA_Chemistry_SampleInfo, Location, LocationThingAssociation +from db import NMA_Chemistry_SampleInfo, Location, LocationThingAssociation, Thing from db.engine import session_ctx from transfers.logger import logger from transfers.transferer import Transferer @@ -75,8 +75,12 @@ def _build_thing_id_cache(self): Location.id == LocationThingAssociation.location_id, ) .filter(Location.nma_pk_location.isnot(None)) - .all() ) + if self.is_scoped_run(): + results = results.join( + Thing, Thing.id == LocationThingAssociation.thing_id + ).filter(Thing.name.in_(self.pointids)) + results = results.all() location_to_thing = {} for nma_pk_location, thing_id in results: if nma_pk_location is None: diff --git a/transfers/group_transfer.py b/transfers/group_transfer.py index 5549a81d1..6aa1cfbb4 100644 --- a/transfers/group_transfer.py +++ b/transfers/group_transfer.py @@ -47,11 +47,16 @@ def _step(self, session: Session, df: pd.DataFrame, i: int, row: pd.Series): if prefix: # get all PointIDs that start with prefix sql = select(Thing).where(Thing.name.like(f"{prefix}%")) + if self.is_scoped_run(): + sql = sql.where(Thing.name.in_(self.pointids)) records = session.scalars(sql).unique().all() if records: logger.info( f"Adding {len(records)} things to group {group.name}, prefix {prefix}" ) + existing_thing_ids = { + assoc.thing_id for assoc in 
group.thing_associations + } group_is_monitoring_plan = False for record in records: # set the group_type to Monitoring Plan if at least one well is currently monitored @@ -78,9 +83,11 @@ def _step(self, session: Session, df: pd.DataFrame, i: int, row: pd.Series): f" Setting group {group.name} type to Monitoring Plan based on thing {record.name}" ) - gta = GroupThingAssociation(group=group, thing=record) - session.add(gta) - group.thing_associations.append(gta) + if record.id not in existing_thing_ids: + gta = GroupThingAssociation(group=group, thing=record) + session.add(gta) + group.thing_associations.append(gta) + existing_thing_ids.add(record.id) session.add(group) session.commit() diff --git a/transfers/hydraulicsdata.py b/transfers/hydraulicsdata.py index d5a2b1800..f7e4e8a6a 100644 --- a/transfers/hydraulicsdata.py +++ b/transfers/hydraulicsdata.py @@ -37,7 +37,7 @@ from db.engine import session_ctx from transfers.logger import logger from transfers.transferer import Transferer -from transfers.util import read_csv +from transfers.util import read_csv, filter_to_valid_point_ids class HydraulicsDataTransferer(Transferer): @@ -63,7 +63,8 @@ def _build_thing_id_cache(self) -> None: def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: df = read_csv(self.source_table) - cleaned_df = self._filter_to_valid_things(df) + cleaned_df = filter_to_valid_point_ids(df, self.pointids) + cleaned_df = self._filter_to_valid_things(cleaned_df) return df, cleaned_df def _filter_to_valid_things(self, df: pd.DataFrame) -> pd.DataFrame: diff --git a/transfers/link_ids_transfer.py b/transfers/link_ids_transfer.py index 462f6de73..632018005 100644 --- a/transfers/link_ids_transfer.py +++ b/transfers/link_ids_transfer.py @@ -50,6 +50,9 @@ def _transfer_hook(self, session): len(chunk), len(thing_id_by_pointid), ) + existing_link_keys = _fetch_existing_link_keys( + session, thing_id_by_pointid.values() + ) rows_to_insert: list[dict] = [] for row in chunk.itertuples(index=False): @@ 
-92,14 +95,18 @@ def _transfer_hook(self, session): ) continue - rows_to_insert.append( - { - "thing_id": thing_id, - "relation": relation, - "alternate_id": aid_text, - "alternate_organization": "NMOSE", - } - ) + link_row = { + "thing_id": thing_id, + "relation": relation, + "alternate_id": aid_text, + "alternate_organization": "NMOSE", + } + link_key = _link_row_key(link_row) + if link_key in existing_link_keys: + continue + + rows_to_insert.append(link_row) + existing_link_keys.add(link_key) if rows_to_insert: session.execute(insert(ThingIdLink), rows_to_insert) @@ -135,7 +142,7 @@ def _get_dfs(self): ldf = input_df[input_df["SiteType"] == self.site_type] ldf = ldf[ldf["Easting"].notna() & ldf["Northing"].notna()] ldf = replace_nans(ldf) - cleaned_df = filter_to_valid_point_ids(ldf) + cleaned_df = filter_to_valid_point_ids(ldf, self.pointids) return input_df, cleaned_df def _transfer_hook(self, session): @@ -153,6 +160,9 @@ def _transfer_hook(self, session): len(chunk), len(thing_id_by_pointid), ) + existing_link_keys = _fetch_existing_link_keys( + session, thing_id_by_pointid.values() + ) rows_to_insert: list[dict] = [] for row in chunk.itertuples(index=False): @@ -168,7 +178,11 @@ def _transfer_hook(self, session): ): link_row = func(row, thing_id) if link_row: + link_key = _link_row_key(link_row) + if link_key in existing_link_keys: + continue rows_to_insert.append(link_row) + existing_link_keys.add(link_key) if rows_to_insert: session.execute(insert(ThingIdLink), rows_to_insert) @@ -248,4 +262,31 @@ def _make_thing_id_link( } +def _link_row_key(row: dict) -> tuple[int, str, str, str]: + return ( + row["thing_id"], + row["relation"], + row["alternate_id"], + row["alternate_organization"], + ) + + +def _fetch_existing_link_keys(session, thing_ids) -> set[tuple[int, str, str, str]]: + thing_ids = list(set(thing_ids)) + if not thing_ids: + return set() + + return { + (thing_id, relation, alternate_id, alternate_organization) + for thing_id, relation, 
alternate_id, alternate_organization in session.query( + ThingIdLink.thing_id, + ThingIdLink.relation, + ThingIdLink.alternate_id, + ThingIdLink.alternate_organization, + ) + .filter(ThingIdLink.thing_id.in_(thing_ids)) + .all() + } + + # ============= EOF ============================================= diff --git a/transfers/minor_trace_chemistry_transfer.py b/transfers/minor_trace_chemistry_transfer.py index 92fdb8b13..4bb747845 100644 --- a/transfers/minor_trace_chemistry_transfer.py +++ b/transfers/minor_trace_chemistry_transfer.py @@ -38,7 +38,7 @@ from sqlalchemy.dialects.postgresql import insert from sqlalchemy.orm import Session -from db import NMA_Chemistry_SampleInfo, NMA_MinorTraceChemistry +from db import NMA_Chemistry_SampleInfo, NMA_MinorTraceChemistry, Thing from db.engine import session_ctx from transfers.logger import logger from transfers.transferer import Transferer @@ -65,14 +65,15 @@ def __init__(self, *args, batch_size: int = 1000, **kwargs): def _build_sample_info_cache(self): """Build cache of ChemistrySampleInfo.nma_sample_pt_id -> ChemistrySampleInfo.id.""" with session_ctx() as session: - sample_infos = ( - session.query( - NMA_Chemistry_SampleInfo.nma_sample_pt_id, - NMA_Chemistry_SampleInfo.id, - ) - .filter(NMA_Chemistry_SampleInfo.nma_sample_pt_id.isnot(None)) - .all() - ) + query = session.query( + NMA_Chemistry_SampleInfo.nma_sample_pt_id, + NMA_Chemistry_SampleInfo.id, + ).filter(NMA_Chemistry_SampleInfo.nma_sample_pt_id.isnot(None)) + if self.is_scoped_run(): + query = query.join( + Thing, Thing.id == NMA_Chemistry_SampleInfo.thing_id + ).filter(Thing.name.in_(self.pointids)) + sample_infos = query.all() self._sample_info_cache = { nma_sample_pt_id: csi_id for nma_sample_pt_id, csi_id in sample_infos } diff --git a/transfers/ngwmn_views.py b/transfers/ngwmn_views.py index ffad11397..657fc9c49 100644 --- a/transfers/ngwmn_views.py +++ b/transfers/ngwmn_views.py @@ -29,7 +29,7 @@ ) from transfers.logger import logger from 
transfers.transferer import Transferer -from transfers.util import read_csv +from transfers.util import read_csv, filter_to_valid_point_ids class _BaseNGWMNTransferer(Transferer): @@ -46,7 +46,8 @@ def __init__(self, *args, batch_size: int = 1000, **kwargs): def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: df = read_csv(self.source_table, parse_dates=self.parse_dates) - return df, df + cleaned_df = filter_to_valid_point_ids(df, self.pointids) + return df, cleaned_df def _transfer_hook(self, session: Session) -> None: rows = self._dedupe_rows( diff --git a/transfers/permissions_transfer.py b/transfers/permissions_transfer.py index 346e9f147..b55796bdb 100644 --- a/transfers/permissions_transfer.py +++ b/transfers/permissions_transfer.py @@ -6,6 +6,7 @@ from db import Thing, PermissionHistory, Contact, ThingContactAssociation from transfers.util import read_csv, logger, replace_nans, chunk_by_size +from transfers.transferer import Transferer """ Developer's notes @@ -43,7 +44,7 @@ def _make_permission( return permission -def transfer_permissions(session: Session) -> None: +def transfer_permissions(session: Session, pointids: list[str] | None = None) -> None: """ The transferred wells and contacts need to be transferred first - to access the auto-generated well IDs @@ -52,17 +53,34 @@ def transfer_permissions(session: Session) -> None: """ wdf = read_csv("WellData", dtype={"OSEWelltagID": str}) wdf = replace_nans(wdf) + if pointids: + normalized_pointids = wdf["PointID"].map(Transferer._normalize_pointid) + wdf = wdf[normalized_pointids.isin(set(pointids))] logger.info("Starting transfer: Permissions") - transferred_wells = ( + transferred_wells_query = ( session.query(Thing, Contact) .select_from(Thing) .join(ThingContactAssociation, ThingContactAssociation.thing_id == Thing.id) .join(Contact, Contact.id == ThingContactAssociation.contact_id) .filter(Thing.thing_type == "water well") .order_by(Thing.name) - .all() ) + if pointids: + transferred_wells_query = 
transferred_wells_query.filter( + Thing.name.in_(pointids) + ) + transferred_wells = transferred_wells_query.all() + existing_permissions = { + (target_id, contact_id, permission_type) + for target_id, contact_id, permission_type in session.query( + PermissionHistory.target_id, + PermissionHistory.contact_id, + PermissionHistory.permission_type, + ) + .filter(PermissionHistory.target_table == "thing") + .all() + } created_count = 0 visited = [] for chunk in chunk_by_size(transferred_wells, 100): @@ -78,16 +96,38 @@ def transfer_permissions(session: Session) -> None: permission = _make_permission( wdf, well, contact.id, "SampleOK", "Water Chemistry Sample" ) - if permission: + if ( + permission + and ( + well.id, + contact.id, + permission.permission_type, + ) + not in existing_permissions + ): objs.append(permission) created_count += 1 + existing_permissions.add( + (well.id, contact.id, permission.permission_type) + ) permission = _make_permission( wdf, well, contact.id, "MonitorOK", "Water Level Sample" ) - if permission: + if ( + permission + and ( + well.id, + contact.id, + permission.permission_type, + ) + not in existing_permissions + ): objs.append(permission) created_count += 1 + existing_permissions.add( + (well.id, contact.id, permission.permission_type) + ) session.bulk_save_objects(objs) session.commit() diff --git a/transfers/sensor_transfer.py b/transfers/sensor_transfer.py index a1c65b275..4b87ea83f 100644 --- a/transfers/sensor_transfer.py +++ b/transfers/sensor_transfer.py @@ -92,7 +92,7 @@ def _get_dfs(self): input_df = read_csv(self.source_table) input_df.columns = input_df.columns.str.replace(" ", "_") input_df = input_df[input_df.SerialNo.notna()] - cleaned_df = filter_to_valid_point_ids(input_df) + cleaned_df = filter_to_valid_point_ids(input_df, self.pointids) cleaned_df = replace_nans(cleaned_df) return input_df, cleaned_df diff --git a/transfers/soil_rock_results.py b/transfers/soil_rock_results.py index fd3894e52..b89f8263c 100644 --- 
a/transfers/soil_rock_results.py +++ b/transfers/soil_rock_results.py @@ -67,6 +67,9 @@ def _build_thing_id_cache(self) -> None: def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: df = self._read_csv(self.source_table) cleaned_df = replace_nans(df) + if self.is_scoped_run(): + normalized_pointids = cleaned_df["Point_ID"].map(self._normalize_point_id) + cleaned_df = cleaned_df[normalized_pointids.isin(self.scoped_pointid_set())] return df, cleaned_df def _transfer_hook(self, session: Session) -> None: diff --git a/transfers/stratigraphy_transfer.py b/transfers/stratigraphy_transfer.py index 09ce86904..b0e192465 100644 --- a/transfers/stratigraphy_transfer.py +++ b/transfers/stratigraphy_transfer.py @@ -20,7 +20,9 @@ ) -def transfer_stratigraphy(session: Session, limit: int = None) -> tuple: +def transfer_stratigraphy( + session: Session, limit: int = None, pointids: list[str] | None = None +) -> tuple: """ Transfer detailed stratigraphy (lithology log) data from Stratigraphy CSV. @@ -50,7 +52,7 @@ def transfer_stratigraphy(session: Session, limit: int = None) -> tuple: cleaned_df = replace_nans(input_df) # Step 2: Filter to only wells that exist in database - cleaned_df = filter_to_valid_point_ids(cleaned_df) + cleaned_df = filter_to_valid_point_ids(cleaned_df, pointids) n_records = len(cleaned_df) n_wells = len(cleaned_df["PointID"].unique()) diff --git a/transfers/surface_water_data.py b/transfers/surface_water_data.py index 519d9a627..ac6b320e1 100644 --- a/transfers/surface_water_data.py +++ b/transfers/surface_water_data.py @@ -27,7 +27,7 @@ from db.engine import session_ctx from transfers.logger import logger from transfers.transferer import Transferer -from transfers.util import read_csv +from transfers.util import read_csv, filter_to_valid_point_ids class SurfaceWaterDataTransferer(Transferer): @@ -58,7 +58,8 @@ def _build_thing_id_cache(self) -> None: def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: df = read_csv(self.source_table, 
parse_dates=["DateMeasured"]) - return df, df + cleaned_df = filter_to_valid_point_ids(df, self.pointids) + return df, cleaned_df def _transfer_hook(self, session: Session) -> None: rows: list[dict[str, Any]] = [] diff --git a/transfers/surface_water_photos.py b/transfers/surface_water_photos.py index 12d9c5897..084032d05 100644 --- a/transfers/surface_water_photos.py +++ b/transfers/surface_water_photos.py @@ -26,7 +26,7 @@ from db import NMA_SurfaceWaterPhotos from transfers.logger import logger from transfers.transferer import Transferer -from transfers.util import replace_nans +from transfers.util import replace_nans, filter_to_valid_point_ids class SurfaceWaterPhotosTransferer(Transferer): @@ -41,6 +41,7 @@ def __init__(self, *args, batch_size: int = 1000, **kwargs): def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: df = self._read_csv(self.source_table) cleaned_df = replace_nans(df) + cleaned_df = filter_to_valid_point_ids(cleaned_df, self.pointids) return df, cleaned_df def _transfer_hook(self, session: Session) -> None: diff --git a/transfers/thing_transfer.py b/transfers/thing_transfer.py index a7442bb3f..e28e1a790 100644 --- a/transfers/thing_transfer.py +++ b/transfers/thing_transfer.py @@ -43,10 +43,27 @@ def _get_location_df(): return _LOCATION_DF_CACHE -def transfer_thing(session: Session, site_type: str, make_payload, limit=None) -> None: +def transfer_thing( + session: Session, + site_type: str, + make_payload, + limit=None, + pointids: list[str] | None = None, +) -> None: ldf = _get_location_df() ldf = ldf[ldf["SiteType"] == site_type] ldf = ldf[ldf["Easting"].notna() & ldf["Northing"].notna()] + if pointids: + normalized_pointids = ldf["PointID"].map( + lambda value: str(value).strip().upper() + ) + ldf = ldf[normalized_pointids.isin(set(pointids))] + if ldf.empty: + logger.info( + "No matching PointIDs for site type %s in scoped run; skipping", + site_type, + ) + return # Pre-compute duplicate PointIDs once to avoid O(n^2) filtering in the 
loop. duplicate_mask = ldf["PointID"].duplicated(keep=False) @@ -219,7 +236,7 @@ def _release_status(row) -> str: return "public" if row.PublicRelease else "private" -def transfer_springs(session, limit=None): +def transfer_springs(session, limit=None, pointids: list[str] | None = None): def make_payload(row): return { "name": row.PointID, @@ -227,10 +244,10 @@ def make_payload(row): "release_status": _release_status(row), } - transfer_thing(session, "SP", make_payload, limit) + transfer_thing(session, "SP", make_payload, limit, pointids) -def transfer_perennial_streams(session, limit=None): +def transfer_perennial_streams(session, limit=None, pointids: list[str] | None = None): def make_payload(row): return { "name": row.PointID, @@ -238,10 +255,10 @@ def make_payload(row): "release_status": _release_status(row), } - transfer_thing(session, "PS", make_payload, limit) + transfer_thing(session, "PS", make_payload, limit, pointids) -def transfer_ephemeral_streams(session, limit=None): +def transfer_ephemeral_streams(session, limit=None, pointids: list[str] | None = None): def make_payload(row): return { "name": row.PointID, @@ -249,10 +266,10 @@ def make_payload(row): "release_status": _release_status(row), } - transfer_thing(session, "ES", make_payload, limit) + transfer_thing(session, "ES", make_payload, limit, pointids) -def transfer_met_stations(session, limit=None): +def transfer_met_stations(session, limit=None, pointids: list[str] | None = None): def make_payload(row): return { "name": row.PointID, @@ -260,10 +277,12 @@ def make_payload(row): "release_status": _release_status(row), } - transfer_thing(session, "M", make_payload, limit) + transfer_thing(session, "M", make_payload, limit, pointids) -def transfer_rock_sample_locations(session, limit=None): +def transfer_rock_sample_locations( + session, limit=None, pointids: list[str] | None = None +): def make_payload(row): return { "name": row.PointID, @@ -271,10 +290,12 @@ def make_payload(row): 
"release_status": _release_status(row), } - transfer_thing(session, "R", make_payload, limit) + transfer_thing(session, "R", make_payload, limit, pointids) -def transfer_diversion_of_surface_water(session, limit=None): +def transfer_diversion_of_surface_water( + session, limit=None, pointids: list[str] | None = None +): def make_payload(row): return { "name": row.PointID, @@ -282,10 +303,12 @@ def make_payload(row): "release_status": _release_status(row), } - transfer_thing(session, "D", make_payload, limit) + transfer_thing(session, "D", make_payload, limit, pointids) -def transfer_lake_pond_reservoir(session, limit=None): +def transfer_lake_pond_reservoir( + session, limit=None, pointids: list[str] | None = None +): def make_payload(row): return { "name": row.PointID, @@ -293,10 +316,12 @@ def make_payload(row): "release_status": _release_status(row), } - transfer_thing(session, "L", make_payload, limit) + transfer_thing(session, "L", make_payload, limit, pointids) -def transfer_soil_gas_sample_locations(session, limit=None): +def transfer_soil_gas_sample_locations( + session, limit=None, pointids: list[str] | None = None +): def make_payload(row): return { "name": row.PointID, @@ -304,10 +329,10 @@ def make_payload(row): "release_status": _release_status(row), } - transfer_thing(session, "S", make_payload, limit) + transfer_thing(session, "S", make_payload, limit, pointids) -def transfer_other_site_types(session, limit=None): +def transfer_other_site_types(session, limit=None, pointids: list[str] | None = None): def make_payload(row): return { "name": row.PointID, @@ -315,10 +340,12 @@ def make_payload(row): "release_status": _release_status(row), } - transfer_thing(session, "OT", make_payload, limit) + transfer_thing(session, "OT", make_payload, limit, pointids) -def transfer_outfall_wastewater_return_flow(session, limit=None): +def transfer_outfall_wastewater_return_flow( + session, limit=None, pointids: list[str] | None = None +): def make_payload(row): 
return { "name": row.PointID, @@ -326,7 +353,7 @@ def make_payload(row): "release_status": _release_status(row), } - transfer_thing(session, "O", make_payload, limit) + transfer_thing(session, "O", make_payload, limit, pointids) # ============= EOF ============================================= diff --git a/transfers/transfer.py b/transfers/transfer.py index 419d4870a..a6bdabe65 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -18,8 +18,10 @@ from concurrent.futures import ThreadPoolExecutor, as_completed from contextlib import contextmanager from dataclasses import dataclass +from typing import Iterable from dotenv import load_dotenv +from sqlalchemy.dialects.postgresql import insert as pg_insert from transfers.thing_transfer import ( transfer_rock_sample_locations, @@ -109,6 +111,9 @@ from transfers.weather_data import WeatherDataTransferer from transfers.weather_photos import WeatherPhotosTransferer from transfers.logger import logger, save_log_to_bucket +from transfers.transferer import Transferer +from transfers.util import read_csv +from db import GeologicFormation @dataclass @@ -229,20 +234,233 @@ def transfer_context(name: str, *, pad: int = 10): def _get_test_pointids(): - pointids = None - if os.getenv("TRANSFER_TEST_POINTIDS"): - pointids = os.getenv("TRANSFER_TEST_POINTIDS").split(",") + return _normalize_test_pointids(os.getenv("TRANSFER_TEST_POINTIDS")) + + +def _normalize_test_pointids(raw: str | None) -> list[str] | None: + if not raw: + return None + return Transferer._normalize_pointids(raw.split(",")) + + +def _normalize_pointid_series(values: Iterable) -> set[str]: + pointids: set[str] = set() + for value in values: + normalized = Transferer._normalize_pointid(value) + if normalized is not None: + pointids.add(normalized) return pointids -def _execute_transfer(klass, flags: dict = None): +def _source_pointids(table_name: str, column: str = "PointID", **read_kw) -> set[str]: + df = read_csv(table_name, **read_kw) + if column not 
in df.columns: + return set() + return _normalize_pointid_series(df[column].tolist()) + + +def _location_pointids_for_site_types(site_types: set[str]) -> set[str]: + if not site_types: + return set() + df = read_csv("Location") + if "SiteType" not in df.columns or "PointID" not in df.columns: + return set() + filtered = df[df["SiteType"].isin(site_types)] + return _normalize_pointid_series(filtered["PointID"].tolist()) + + +def _collect_available_scoped_pointids(transfer_options: TransferOptions) -> set[str]: + available: set[str] = set() + direct_sources: list[tuple[str, str, dict]] = [] + + direct_sources.append(("WellData", "PointID", {"dtype": {"OSEWelltagID": str}})) + + if transfer_options.transfer_waterlevels: + direct_sources.append(("WaterLevels", "PointID", {})) + if transfer_options.transfer_pressure: + direct_sources.append( + ( + "WaterLevelsContinuous_Pressure", + "PointID", + {"parse_dates": ["DateMeasured"]}, + ) + ) + if transfer_options.transfer_acoustic: + direct_sources.append( + ( + "WaterLevelsContinuous_Acoustic", + "PointID", + {"parse_dates": ["DateMeasured"]}, + ) + ) + if transfer_options.transfer_pressure_daily: + direct_sources.append( + ( + "WaterLevelsContinuous_Pressure_Daily", + "PointID", + {"parse_dates": ["DateMeasured", "Created", "Updated"]}, + ) + ) + if transfer_options.transfer_assets: + direct_sources.append(("WellPhotos", "PointID", {})) + if transfer_options.transfer_sensors: + direct_sources.append(("Equipment", "PointID", {})) + if transfer_options.transfer_associated_data: + direct_sources.append(("AssociatedData", "PointID", {})) + if transfer_options.transfer_hydraulics_data: + direct_sources.append(("HydraulicsData", "PointID", {})) + if transfer_options.transfer_surface_water_data: + direct_sources.append( + ("SurfaceWaterData", "PointID", {"parse_dates": ["DateMeasured"]}) + ) + if transfer_options.transfer_surface_water_photos: + direct_sources.append(("SurfaceWaterPhotos", "PointID", {})) + if 
transfer_options.transfer_weather_data: + direct_sources.append(("WeatherData", "PointID", {})) + if transfer_options.transfer_weather_photos: + direct_sources.append(("WeatherPhotos", "PointID", {})) + if transfer_options.transfer_ngwmn_views: + direct_sources.extend( + [ + ("view_NGWMN_WellConstruction", "PointID", {}), + ( + "view_NGWMN_WaterLevels", + "PointID", + {"parse_dates": ["DateMeasured"]}, + ), + ("view_NGWMN_Lithology", "PointID", {}), + ] + ) + if transfer_options.transfer_nma_stratigraphy: + direct_sources.append(("Stratigraphy", "PointID", {})) + if transfer_options.transfer_soil_rock_results: + direct_sources.append(("Soil_Rock_Results", "Point_ID", {})) + + for table_name, column, read_kw in direct_sources: + available.update(_source_pointids(table_name, column=column, **read_kw)) + + location_site_types: set[str] = set() + if any( + ( + transfer_options.transfer_springs, + transfer_options.transfer_perennial_streams, + transfer_options.transfer_ephemeral_streams, + transfer_options.transfer_met_stations, + transfer_options.transfer_rock_sample_locations, + transfer_options.transfer_diversion_of_surface_water, + transfer_options.transfer_lake_pond_reservoir, + transfer_options.transfer_soil_gas_sample_locations, + transfer_options.transfer_other_site_types, + transfer_options.transfer_outfall_wastewater_return_flow, + transfer_options.transfer_weather_data, + transfer_options.transfer_weather_photos, + transfer_options.transfer_surface_water_data, + transfer_options.transfer_surface_water_photos, + transfer_options.transfer_soil_rock_results, + ) + ): + site_types_by_option = { + "transfer_springs": "SP", + "transfer_perennial_streams": "PS", + "transfer_ephemeral_streams": "ES", + "transfer_met_stations": "M", + "transfer_rock_sample_locations": "R", + "transfer_diversion_of_surface_water": "D", + "transfer_lake_pond_reservoir": "L", + "transfer_soil_gas_sample_locations": "S", + "transfer_other_site_types": "OT", + 
"transfer_outfall_wastewater_return_flow": "O", + } + for option_name, site_type in site_types_by_option.items(): + if getattr(transfer_options, option_name): + location_site_types.add(site_type) + if any( + ( + transfer_options.transfer_weather_data, + transfer_options.transfer_weather_photos, + ) + ): + location_site_types.add("M") + if any( + ( + transfer_options.transfer_surface_water_data, + transfer_options.transfer_surface_water_photos, + ) + ): + location_site_types.update({"SP", "PS", "ES", "D", "L", "O"}) + if transfer_options.transfer_soil_rock_results: + location_site_types.add("R") + + available.update(_location_pointids_for_site_types(location_site_types)) + return available + + +def _validate_scoped_pointids_or_raise( + pointids: list[str], transfer_options: TransferOptions +) -> None: + available_pointids = _collect_available_scoped_pointids(transfer_options) + missing = sorted(set(pointids) - available_pointids) + if missing: + raise RuntimeError( + "Scoped transfer preflight failed: requested PointIDs not found in " + f"applicable source data: {missing}" + ) + + +def _seed_scoped_geologic_formations( + pointids: list[str], transfer_options: TransferOptions +) -> None: + required_codes: set[str] = set() + pointid_set = set(pointids) + + well_df = read_csv("WellData", dtype={"OSEWelltagID": str}) + if "PointID" in well_df.columns and "FormationZone" in well_df.columns: + filtered = well_df[ + well_df["PointID"].map(Transferer._normalize_pointid).isin(pointid_set) + ] + required_codes.update( + _normalize_pointid_series(filtered["FormationZone"].tolist()) + ) + + if transfer_options.transfer_nma_stratigraphy: + strat_df = read_csv("Stratigraphy") + if "PointID" in strat_df.columns and "UnitIdentifier" in strat_df.columns: + filtered = strat_df[ + strat_df["PointID"].map(Transferer._normalize_pointid).isin(pointid_set) + ] + required_codes.update( + _normalize_pointid_series(filtered["UnitIdentifier"].tolist()) + ) + + if not required_codes: + 
logger.info("Scoped run has no geologic formations to seed") + return + + rows = [ + {"formation_code": code, "description": None, "lithology": None} + for code in sorted(required_codes) + ] + with session_ctx() as session: + stmt = ( + pg_insert(GeologicFormation) + .values(rows) + .on_conflict_do_nothing(index_elements=["formation_code"]) + ) + session.execute(stmt) + session.commit() + logger.info("Seeded scoped geologic formations: %s", sorted(required_codes)) + + +def _execute_transfer(klass, flags: dict = None, pointids: list[str] | None = None): """Execute a single transfer class. Thread-safe since each creates its own session.""" - transferer = klass(flags=flags, pointids=_get_test_pointids()) + transferer = klass(flags=flags, pointids=pointids) transferer.transfer() return transferer.input_df, transferer.cleaned_df, transferer.errors -def _execute_transfer_with_timing(name: str, klass, flags: dict = None): +def _execute_transfer_with_timing( + name: str, klass, flags: dict = None, pointids: list[str] | None = None +): """Execute transfer and return timing info.""" start = time.time() logger.info(f"Starting parallel transfer: {name}") @@ -250,30 +468,35 @@ def _execute_transfer_with_timing(name: str, klass, flags: dict = None): yield_transfer_limit = effective_flags.get("LIMIT", 0) if yield_transfer_limit: effective_flags["LIMIT"] = max(1, yield_transfer_limit // 10) - result = _execute_transfer(klass, effective_flags) + result = _execute_transfer(klass, effective_flags, pointids) elapsed = time.time() - start logger.info(f"Completed parallel transfer: {name} in {elapsed:.2f}s") return name, result, elapsed -def _execute_session_transfer_with_timing(name: str, transfer_func, limit: int): +def _execute_session_transfer_with_timing( + name: str, + transfer_func, + limit: int, + pointids: list[str] | None = None, +): """Execute a session-based transfer function and return timing info.""" start = time.time() logger.info(f"Starting parallel transfer: {name}") 
with session_ctx() as session: effective_limit = max(1, limit // 10) if limit else 0 - result = transfer_func(session, limit=effective_limit) + result = transfer_func(session, limit=effective_limit, pointids=pointids) elapsed = time.time() - start logger.info(f"Completed parallel transfer: {name} in {elapsed:.2f}s") return name, result, elapsed -def _execute_permissions_with_timing(name: str): +def _execute_permissions_with_timing(name: str, pointids: list[str] | None = None): """Execute permissions transfer and return timing info.""" start = time.time() logger.info(f"Starting parallel transfer: {name}") with session_ctx() as session: - transfer_permissions(session) + transfer_permissions(session, pointids=pointids) elapsed = time.time() - start logger.info(f"Completed parallel transfer: {name} in {elapsed:.2f}s") return name, None, elapsed @@ -346,6 +569,12 @@ def transfer_all(metrics: Metrics) -> list[ProfileArtifact]: flags = {"TRANSFER_ALL_WELLS": True, "LIMIT": limit} message("TRANSFER_FLAGS") logger.info(flags) + scoped_pointids = _get_test_pointids() + if scoped_pointids: + message("SCOPED TRANSFER MODE") + logger.info("Scoped transfer mode active for PointIDs: %s", scoped_pointids) + _validate_scoped_pointids_or_raise(scoped_pointids, transfer_options) + logger.info("Preflight validation passed for requested PointIDs") profile_artifacts: list[ProfileArtifact] = [] continuous_water_levels_only = get_bool_env("CONTINUOUS_WATER_LEVELS", False) @@ -359,39 +588,48 @@ def transfer_all(metrics: Metrics) -> list[ProfileArtifact]: return profile_artifacts else: message("PHASE 1: FOUNDATIONAL TRANSFERS (PARALLEL)") - foundational_tasks = [ - ("AquiferSystems", transfer_aquifer_systems), - ("GeologicFormations", transfer_geologic_formations), - ] + foundational_tasks = [] + if scoped_pointids: + _seed_scoped_geologic_formations(scoped_pointids, transfer_options) + else: + foundational_tasks = [ + ("AquiferSystems", transfer_aquifer_systems), + ("GeologicFormations", 
transfer_geologic_formations), + ] + + if foundational_tasks: + with ThreadPoolExecutor(max_workers=2) as executor: + futures = { + executor.submit( + _execute_foundational_transfer_with_timing, name, func, limit + ): name + for name, func in foundational_tasks + } - with ThreadPoolExecutor(max_workers=2) as executor: - futures = { - executor.submit( - _execute_foundational_transfer_with_timing, name, func, limit - ): name - for name, func in foundational_tasks - } - - for future in as_completed(futures): - name = futures[future] - try: - result_name, result, elapsed = future.result() - logger.info( - f"Foundational transfer {result_name} completed in {elapsed:.2f}s" - ) - except Exception as e: - logger.critical(f"Foundational transfer {name} failed: {e}") - raise # Fail fast - foundational transfers must succeed + for future in as_completed(futures): + name = futures[future] + try: + result_name, result, elapsed = future.result() + logger.info( + f"Foundational transfer {result_name} completed in {elapsed:.2f}s" + ) + except Exception as e: + logger.critical(f"Foundational transfer {name} failed: {e}") + raise # Fail fast - foundational transfers must succeed + elif scoped_pointids: + logger.info("Skipping broad foundational lookup transfers in scoped mode") message("TRANSFERRING WELLS") use_parallel_wells = get_bool_env("TRANSFER_PARALLEL_WELLS", True) if use_parallel_wells: logger.info("Using PARALLEL wells transfer") - transferer = WellTransferer(flags=flags, pointids=_get_test_pointids()) + transferer = WellTransferer(flags=flags, pointids=scoped_pointids) transferer.transfer_parallel() results = (transferer.input_df, transferer.cleaned_df, transferer.errors) else: - results = _execute_transfer(WellTransferer, flags=flags) + results = _execute_transfer( + WellTransferer, flags=flags, pointids=scoped_pointids + ) metrics.well_metrics(*results) # Get transfer flags @@ -439,7 +677,11 @@ def transfer_all(metrics: Metrics) -> list[ProfileArtifact]: with 
ThreadPoolExecutor(max_workers=len(non_well_tasks)) as executor: futures = { executor.submit( - _execute_session_transfer_with_timing, name, func, limit + _execute_session_transfer_with_timing, + name, + func, + limit, + scoped_pointids, ): name for name, func in non_well_tasks } @@ -459,6 +701,7 @@ def transfer_all(metrics: Metrics) -> list[ProfileArtifact]: flags, limit, transfer_options, + scoped_pointids, ) return profile_artifacts @@ -466,6 +709,7 @@ def transfer_all(metrics: Metrics) -> list[ProfileArtifact]: def _run_continuous_water_level_transfers(metrics, flags): message("CONTINUOUS WATER LEVEL TRANSFERS") + pointids = _get_test_pointids() # ========================================================================= # PHASE 4: Parallel Group 2 (Continuous water levels - after sensors) @@ -480,7 +724,13 @@ def _run_continuous_water_level_transfers(metrics, flags): with ThreadPoolExecutor(max_workers=2) as executor: futures = {} for name, klass in parallel_tasks: - future = executor.submit(_execute_transfer_with_timing, name, klass, flags) + future = executor.submit( + _execute_transfer_with_timing, + name, + klass, + flags, + pointids, + ) futures[future] = name for future in as_completed(futures): @@ -489,7 +739,7 @@ def _run_continuous_water_level_transfers(metrics, flags): result_name, result, elapsed = future.result() results_map[result_name] = result logger.info(f"Parallel task {result_name} completed in {elapsed:.2f}s") - except Exception as e: + except Exception: import traceback logger.critical( @@ -507,6 +757,7 @@ def _transfer_parallel( flags, limit, transfer_options: TransferOptions, + pointids: list[str] | None = None, ): """Execute transfers in parallel where possible.""" message("PARALLEL TRANSFER GROUP 1") @@ -571,7 +822,13 @@ def _transfer_parallel( # Submit class-based transfers for name, klass in parallel_tasks_1: - future = executor.submit(_execute_transfer_with_timing, name, klass, flags) + future = executor.submit( + 
_execute_transfer_with_timing, + name, + klass, + flags, + pointids, + ) futures[future] = name future = executor.submit( @@ -579,6 +836,7 @@ def _transfer_parallel( "StratigraphyNew", transfer_stratigraphy, limit, + pointids, ) futures[future] = "StratigraphyNew" @@ -645,7 +903,8 @@ def _transfer_parallel( # Permissions require contact associations; run after group 1 completes. try: result_name, result, elapsed = _execute_permissions_with_timing( - "Permissions" + "Permissions", + pointids, ) results_map[result_name] = result logger.info(f"Task {result_name} completed in {elapsed:.2f}s") @@ -654,22 +913,30 @@ def _transfer_parallel( if opts.transfer_major_chemistry: message("TRANSFERRING MAJOR CHEMISTRY") - results = _execute_transfer(MajorChemistryTransferer, flags=flags) + results = _execute_transfer( + MajorChemistryTransferer, flags=flags, pointids=pointids + ) metrics.major_chemistry_metrics(*results) if opts.transfer_radionuclides: message("TRANSFERRING RADIONUCLIDES") - results = _execute_transfer(RadionuclidesTransferer, flags=flags) + results = _execute_transfer( + RadionuclidesTransferer, flags=flags, pointids=pointids + ) metrics.radionuclides_metrics(*results) if opts.transfer_minor_trace_chemistry: message("TRANSFERRING MINOR TRACE CHEMISTRY") - results = _execute_transfer(MinorTraceChemistryTransferer, flags=flags) + results = _execute_transfer( + MinorTraceChemistryTransferer, flags=flags, pointids=pointids + ) metrics.minor_trace_chemistry_metrics(*results) if opts.transfer_field_parameters: message("TRANSFERRING FIELD PARAMETERS") - results = _execute_transfer(FieldParametersTransferer, flags=flags) + results = _execute_transfer( + FieldParametersTransferer, flags=flags, pointids=pointids + ) metrics.field_parameters_metrics(*results) # ========================================================================= @@ -677,7 +944,7 @@ def _transfer_parallel( # ========================================================================= if 
opts.transfer_sensors: message("TRANSFERRING SENSORS") - results = _execute_transfer(SensorTransferer, flags=flags) + results = _execute_transfer(SensorTransferer, flags=flags, pointids=pointids) metrics.sensor_metrics(*results) # # ========================================================================= @@ -693,7 +960,7 @@ def _transfer_parallel( if get_bool_env("CLEANUP_LOCATIONS", True): message("CLEANING UP LOCATIONS") with session_ctx() as session: - cleanup_locations(session) + cleanup_locations(session, pointids=pointids) def main(): diff --git a/transfers/transferer.py b/transfers/transferer.py index e05fd90d3..2d8c527f0 100644 --- a/transfers/transferer.py +++ b/transfers/transferer.py @@ -45,7 +45,59 @@ def __init__(self, flags: dict = None, pointids: list = None): self.errors = [] self.flags = flags if flags else {} self.manual_fixer = ManualFixer() - self.pointids = pointids + self.pointids = self._normalize_pointids(pointids) + + @staticmethod + def _normalize_pointid(value: Any) -> str | None: + if value is None: + return None + try: + if pd.isna(value): + return None + except (TypeError, ValueError): + pass + text = str(value).strip() + if not text: + return None + return text.upper() + + @classmethod + def _normalize_pointids(cls, pointids: list | None) -> list[str] | None: + if not pointids: + return None + normalized: list[str] = [] + seen: set[str] = set() + for pointid in pointids: + normalized_pointid = cls._normalize_pointid(pointid) + if normalized_pointid is None or normalized_pointid in seen: + continue + seen.add(normalized_pointid) + normalized.append(normalized_pointid) + return normalized or None + + def scoped_pointids(self) -> list[str] | None: + return self.pointids + + def scoped_pointid_set(self) -> set[str] | None: + if not self.pointids: + return None + return set(self.pointids) + + def is_scoped_run(self) -> bool: + return bool(self.pointids) + + def filter_df_to_requested_pointids( + self, df: pd.DataFrame, column: str = 
"PointID" + ) -> pd.DataFrame: + if not self.is_scoped_run() or column not in df.columns: + return df + + pointid_set = self.scoped_pointid_set() + if not pointid_set: + return df + + normalized_series = df[column].map(self._normalize_pointid) + return df[normalized_series.isin(pointid_set)].copy() def _df_len(self, df: pd.DataFrame | None) -> int: return int(len(df)) if df is not None else 0 @@ -71,8 +123,15 @@ def _capture_validation_error(self, pointid: str, err: ValidationError) -> None: ) def _capture_database_error(self, pointid: str, err: DatabaseError) -> None: - error_dict = err.orig.args[0] - self._capture_error(pointid, error_dict["D"], error_dict["t"]) + error_dict = {} + if getattr(err, "orig", None) is not None and getattr(err.orig, "args", None): + first_arg = err.orig.args[0] + if isinstance(first_arg, dict): + error_dict = first_arg + + error_message = error_dict.get("D") or error_dict.get("M") or str(err) + error_field = error_dict.get("t") or "DatabaseError" + self._capture_error(pointid, error_message, error_field) def _capture_error(self, pointid: str, error: str, field: str, table=None) -> None: if table is None: @@ -298,14 +357,15 @@ def __init__(self, *args, batch_size: int = 1000, **kwargs): def _build_sample_info_cache(self) -> None: """Build cache of nma_sample_pt_id -> id for FK lookups.""" with session_ctx() as session: - sample_infos = ( - session.query( - NMA_Chemistry_SampleInfo.nma_sample_pt_id, - NMA_Chemistry_SampleInfo.id, - ) - .filter(NMA_Chemistry_SampleInfo.nma_sample_pt_id.isnot(None)) - .all() - ) + query = session.query( + NMA_Chemistry_SampleInfo.nma_sample_pt_id, + NMA_Chemistry_SampleInfo.id, + ).filter(NMA_Chemistry_SampleInfo.nma_sample_pt_id.isnot(None)) + if self.is_scoped_run(): + query = query.join( + Thing, Thing.id == NMA_Chemistry_SampleInfo.thing_id + ).filter(Thing.name.in_(self.pointids)) + sample_infos = query.all() self._sample_info_cache = { nma_sample_pt_id: csi_id for nma_sample_pt_id, csi_id in 
sample_infos } diff --git a/transfers/waterlevels_transducer_transfer.py b/transfers/waterlevels_transducer_transfer.py index 27c5255e3..ec217ef9d 100644 --- a/transfers/waterlevels_transducer_transfer.py +++ b/transfers/waterlevels_transducer_transfer.py @@ -58,7 +58,7 @@ def __init__(self, *args, **kw): def _get_dfs(self): input_df = read_csv(self.source_table, parse_dates=["DateMeasured"]) - cleaned_df = filter_to_valid_point_ids(input_df) + cleaned_df = filter_to_valid_point_ids(input_df, self.pointids) cleaned_df = cleaned_df.sort_values(by=["PointID"]) # remove rows with no date measured diff --git a/transfers/waterlevels_transfer.py b/transfers/waterlevels_transfer.py index 5ab4819af..6f51286ca 100644 --- a/transfers/waterlevels_transfer.py +++ b/transfers/waterlevels_transfer.py @@ -106,6 +106,12 @@ def _build_caches(self) -> None: name: thing_id for name, thing_id in session.query(Thing.name, Thing.id).all() } + self._created_contact_id_by_key = { + (name, organization): contact_id + for name, organization, contact_id in session.query( + Contact.name, Contact.organization, Contact.id + ).all() + } owner_rows = ( session.query(Thing.name, ThingContactAssociation.contact_id) @@ -122,15 +128,16 @@ def _build_caches(self) -> None: self._owner_contact_id_by_pointid = owner_contact_cache logger.info( - "Built WaterLevels caches: %s Things, %s owner contacts", + "Built WaterLevels caches: %s Things, %s contacts, %s owner contacts", len(self._thing_id_by_pointid), + len(self._created_contact_id_by_key), len(self._owner_contact_id_by_pointid), ) def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: input_df = read_csv(self.source_table, dtype={"MeasuredBy": str}) input_df = replace_nans(input_df) - cleaned_df = filter_to_valid_point_ids(input_df) + cleaned_df = filter_to_valid_point_ids(input_df, self.pointids) cleaned_df = filter_by_valid_measuring_agency(cleaned_df) logger.info( "Prepared %s rows for %s after filtering (%s -> %s)", @@ -160,6 +167,7 @@ def 
_transfer_hook(self, session: Session) -> None: "contacts_created": 0, "contacts_reused": 0, "notes_created": 0, + "rows_skipped_existing": 0, } gwd = self.cleaned_df.groupby(["PointID"]) @@ -175,6 +183,28 @@ def _transfer_hook(self, session: Session) -> None: len(group), ) + group_globalids = [ + str(global_id) + for global_id in group["GlobalID"].tolist() + if pd.notna(global_id) + ] + existing_globalids: set[str] = set() + if group_globalids: + existing_globalids.update( + global_id + for (global_id,) in session.query(Sample.nma_pk_waterlevels) + .filter(Sample.nma_pk_waterlevels.in_(group_globalids)) + .all() + if global_id + ) + existing_globalids.update( + global_id + for (global_id,) in session.query(Observation.nma_pk_waterlevels) + .filter(Observation.nma_pk_waterlevels.in_(group_globalids)) + .all() + if global_id + ) + thing_id = self._thing_id_by_pointid.get(pointid) if thing_id is None: stats["groups_skipped_missing_thing"] += 1 @@ -184,6 +214,10 @@ def _transfer_hook(self, session: Session) -> None: prepared_rows: list[dict[str, Any]] = [] for i, row in enumerate(group.itertuples()): stats["rows_total"] += 1 + row_globalid = str(row.GlobalID) if pd.notna(row.GlobalID) else None + if row_globalid and row_globalid in existing_globalids: + stats["rows_skipped_existing"] += 1 + continue dt_utc = self._get_dt_utc(row) if dt_utc is None: stats["rows_skipped_dt"] += 1 @@ -648,14 +682,15 @@ def _get_field_event_participant_ids(self, session, row) -> list[int]: if contacts_to_create: try: - created_contact_ids = ( - session.execute( - insert(Contact).returning(Contact.id), - contacts_to_create, + with session.begin_nested(): + created_contact_ids = ( + session.execute( + insert(Contact).returning(Contact.id), + contacts_to_create, + ) + .scalars() + .all() ) - .scalars() - .all() - ) except Exception as e: logger.critical( "Contact insert failed for PointID=%s, GlobalID=%s: %s", @@ -663,6 +698,26 @@ def _get_field_event_participant_ids(self, session, row) -> 
list[int]: row.GlobalID, str(e), ) + existing_contact_ids = self._lookup_existing_contact_ids( + session, missing_keys + ) + unresolved_keys: list[tuple[str, str]] = [] + for key in missing_keys: + existing_contact_id = existing_contact_ids.get(key) + if existing_contact_id is None: + unresolved_keys.append(key) + continue + self._created_contact_id_by_key[key] = existing_contact_id + field_event_participant_ids.append(existing_contact_id) + self._last_contacts_reused_count += 1 + + if unresolved_keys: + logger.critical( + "Unable to resolve existing contact ids for PointID=%s, GlobalID=%s, keys=%s", + row.PointID, + row.GlobalID, + unresolved_keys, + ) else: for key, created_contact_id, payload in zip( missing_keys, created_contact_ids, contacts_to_create @@ -703,6 +758,23 @@ def _get_field_event_participant_ids(self, session, row) -> list[int]: return field_event_participant_ids + def _lookup_existing_contact_ids( + self, session: Session, keys: list[tuple[str, str]] + ) -> dict[tuple[str, str], int]: + existing_contact_ids: dict[tuple[str, str], int] = {} + for name, organization in keys: + contact_id = ( + session.query(Contact.id) + .filter( + Contact.name == name, + Contact.organization == organization, + ) + .scalar() + ) + if contact_id is not None: + existing_contact_ids[(name, organization)] = contact_id + return existing_contact_ids + def _row_context(self, row: Any) -> str: return ( f"PointID={getattr(row, 'PointID', None)}, " @@ -713,7 +785,7 @@ def _row_context(self, row: Any) -> str: def _log_transfer_summary(self, stats: dict[str, int]) -> None: logger.info( "WaterLevels summary: groups total=%s processed=%s skipped_missing_thing=%s failed_commit=%s " - "rows total=%s created=%s skipped_dt=%s skipped_reason=%s missing_participants=%s well_destroyed=%s " + "rows total=%s created=%s skipped_existing=%s skipped_dt=%s skipped_reason=%s missing_participants=%s well_destroyed=%s " "field_events=%s activities=%s samples=%s observations=%s 
contacts_created=%s contacts_reused=%s", stats["groups_total"], stats["groups_processed"], @@ -721,6 +793,7 @@ def _log_transfer_summary(self, stats: dict[str, int]) -> None: stats["groups_failed_commit"], stats["rows_total"], stats["rows_created"], + stats["rows_skipped_existing"], stats["rows_skipped_dt"], stats["rows_skipped_reason"], stats["rows_missing_participants"], diff --git a/transfers/waterlevelscontinuous_pressure_daily.py b/transfers/waterlevelscontinuous_pressure_daily.py index 0c364697f..06135f2e1 100644 --- a/transfers/waterlevelscontinuous_pressure_daily.py +++ b/transfers/waterlevelscontinuous_pressure_daily.py @@ -26,7 +26,7 @@ from db.engine import session_ctx from transfers.logger import logger from transfers.transferer import Transferer -from transfers.util import read_csv +from transfers.util import read_csv, filter_to_valid_point_ids class NMA_WaterLevelsContinuous_Pressure_DailyTransferer(Transferer): @@ -73,7 +73,8 @@ def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: self.source_table, parse_dates=["DateMeasured", "Created", "Updated"], ) - cleaned_df = self._filter_to_valid_things(input_df) + cleaned_df = filter_to_valid_point_ids(input_df, self.pointids) + cleaned_df = self._filter_to_valid_things(cleaned_df) return input_df, cleaned_df def _transfer_hook(self, session: Session) -> None: diff --git a/transfers/weather_data.py b/transfers/weather_data.py index 9be3f1574..b48bf0ece 100644 --- a/transfers/weather_data.py +++ b/transfers/weather_data.py @@ -26,7 +26,7 @@ from db import NMA_WeatherData from transfers.logger import logger from transfers.transferer import Transferer -from transfers.util import read_csv +from transfers.util import read_csv, filter_to_valid_point_ids class WeatherDataTransferer(Transferer): @@ -42,7 +42,8 @@ def __init__(self, *args, batch_size: int = 1000, **kwargs): def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: df = read_csv(self.source_table) - return df, df + cleaned_df = 
filter_to_valid_point_ids(df, self.pointids) + return df, cleaned_df def _transfer_hook(self, session: Session) -> None: rows = self._dedupe_rows( diff --git a/transfers/weather_photos.py b/transfers/weather_photos.py index 1a204f8af..368701f2c 100644 --- a/transfers/weather_photos.py +++ b/transfers/weather_photos.py @@ -26,7 +26,7 @@ from db import NMA_WeatherPhotos from transfers.logger import logger from transfers.transferer import Transferer -from transfers.util import replace_nans +from transfers.util import replace_nans, filter_to_valid_point_ids class WeatherPhotosTransferer(Transferer): @@ -41,6 +41,7 @@ def __init__(self, *args, batch_size: int = 1000, **kwargs): def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: df = self._read_csv(self.source_table) cleaned_df = replace_nans(df) + cleaned_df = filter_to_valid_point_ids(cleaned_df, self.pointids) return df, cleaned_df def _transfer_hook(self, session: Session) -> None: diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index e477eb3b5..50e8247d1 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -1077,7 +1077,7 @@ def _get_dfs(self): input_df = read_csv(self.source_table, self.source_dtypes) wdf = replace_nans(input_df) - cleaned_df = filter_to_valid_point_ids(wdf) + cleaned_df = filter_to_valid_point_ids(wdf, self.pointids) return input_df, cleaned_df def _get_df_chunk(self, session, chunk): diff --git a/transfers/well_transfer_util.py b/transfers/well_transfer_util.py index 40660349f..9a1ed22ea 100644 --- a/transfers/well_transfer_util.py +++ b/transfers/well_transfer_util.py @@ -19,7 +19,8 @@ from pandas import isna from sqlalchemy.orm import Session -from db import GeologicFormation, Location +from db import GeologicFormation, Location, LocationThingAssociation, Thing +from transfers.transferer import Transferer from services.gcs_helper import get_storage_bucket from services.util import ( get_state_from_point, @@ -169,11 +170,32 @@ def 
dump_cached_elevations(lut: dict): upload_blob_json(blob, lut) -def cleanup_locations(session): - locations = session.query(Location).all() +def cleanup_locations(session, pointids: list[str] | None = None): + normalized_pointids = Transferer._normalize_pointids(pointids) + + location_query = session.query(Location) + if normalized_pointids: + location_query = ( + location_query.join( + LocationThingAssociation, + LocationThingAssociation.location_id == Location.id, + ) + .join(Thing, Thing.id == LocationThingAssociation.thing_id) + .filter(Thing.name.in_(normalized_pointids)) + .distinct() + ) + + locations = location_query.all() n = len(locations) lut = {} + if normalized_pointids: + logger.info( + "Scoped location cleanup active for PointIDs %s (%s Location records)", + normalized_pointids, + n, + ) + bucket = get_storage_bucket() log_filename = "transfer_data/location_cleanup.json" blob = bucket.blob(log_filename)