NO TICKET fix(transfers): add scoped PointID transfer mode and rerun guards#642
NO TICKET fix(transfers): add scoped PointID transfer mode and rerun guards#642ksmuczynski wants to merge 1 commit into
Conversation
Preserve the default bulk legacy transfer behavior while making `TRANSFER_TEST_POINTIDS` an opt-in scoped mode that limits transfers to the requested legacy PointIDs across well and non-well site types. Also harden scoped reruns by adding duplicate guards to create-only transfer paths such as water levels, assets, permissions, group associations, and thing-id links, and scope location cleanup to affected locations only. This makes targeted remediation runs safer, faster, and predictable without changing the existing full-transfer workflow.
There was a problem hiding this comment.
Pull request overview
Adds an opt-in “scoped transfer mode” for the legacy ETL pipeline driven by TRANSFER_TEST_POINTIDS, so targeted remediation runs only touch requested legacy PointIDs and are safer to rerun.
Changes:
- Normalize and thread scoped PointIDs through orchestration and many transferers; add scoped preflight validation.
- Scope formerly-global cleanup work (notably
Locationcleanup) to requested PointIDs when in scoped mode. - Add/strengthen rerun idempotency guards to reduce duplicates/uniqueness failures during targeted reruns.
Reviewed changes
Copilot reviewed 27 out of 27 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| transfers/transfer.py | Adds scoped PointID normalization, preflight validation, scoped foundational seeding, and threads pointids through parallel/session transfers and cleanup. |
| transfers/transferer.py | Normalizes incoming pointids; adds scoped helpers; hardens _capture_database_error; scopes chemistry sample info cache query. |
| transfers/well_transfer.py | Filters WellData rows via filter_to_valid_point_ids(..., self.pointids) for scoped runs. |
| transfers/well_transfer_util.py | Scopes cleanup_locations() to Locations associated to requested PointIDs (via joins) when scoped. |
| transfers/thing_transfer.py | Adds pointids parameter and filters Location-derived Thing creation to requested PointIDs. |
| transfers/permissions_transfer.py | Adds pointids parameter; scopes WellData/well query; adds duplicate guards for PermissionHistory creation. |
| transfers/waterlevels_transfer.py | Scopes input DF filtering; adds duplicate guards by WaterLevels GlobalID; adds contact reuse fallback on uniqueness failures; expands caches. |
| transfers/waterlevelscontinuous_pressure_daily.py | Applies scoped PointID filtering before thing validation. |
| transfers/waterlevels_transducer_transfer.py | Applies scoped PointID filtering. |
| transfers/weather_data.py | Applies scoped PointID filtering. |
| transfers/weather_photos.py | Applies scoped PointID filtering. |
| transfers/surface_water_data.py | Applies scoped PointID filtering. |
| transfers/surface_water_photos.py | Applies scoped PointID filtering. |
| transfers/ngwmn_views.py | Applies scoped PointID filtering. |
| transfers/sensor_transfer.py | Applies scoped PointID filtering. |
| transfers/hydraulicsdata.py | Applies scoped PointID filtering before thing validation. |
| transfers/associated_data.py | Applies scoped PointID filtering. |
| transfers/asset_transfer.py | Applies scoped PointID filtering; adds missing-well guard and duplicate asset guards. |
| transfers/link_ids_transfer.py | Applies scoped PointID filtering; adds duplicate link guards. |
| transfers/group_transfer.py | Adds scoped filtering for prefix expansion; avoids duplicate group-thing associations. |
| transfers/stratigraphy_transfer.py | Adds pointids parameter and applies scoped PointID filtering. |
| transfers/soil_rock_results.py | Scopes Soil/Rock results to requested Point_IDs during scoped runs. |
| transfers/chemistry_sampleinfo.py | Scopes location->thing cache building by requested PointIDs in scoped runs. |
| transfers/minor_trace_chemistry_transfer.py | Scopes sample info cache by requested PointIDs in scoped runs. |
| tests/test_transfer_scoping.py | New tests for pointid normalization, preflight validation, pointids threading, scoped thing transfer, scoped location cleanup. |
| tests/test_transfer_legacy_dates.py | Adds tests for hardened DB error capture and waterlevels contact reuse behavior. |
| tests/test_thing_transfer.py | Updates test helper signature to include pointids. |
Comments suppressed due to low confidence (1)
transfers/permissions_transfer.py:95
- visited is a list but is used for membership checks (well.id in visited) inside the main loop. For many wells this becomes O(n^2). Use a set for visited (or dedupe in the SQL query) to keep membership checks O(1).
created_count = 0
visited = []
for chunk in chunk_by_size(transferred_wells, 100):
objs = []
for row in chunk.itertuples():
well = row.Thing
contact = row.Contact
if well.id in visited:
continue
visited.append(well.id)
| group_globalids = [ | ||
| str(global_id) | ||
| for global_id in group["GlobalID"].tolist() | ||
| if pd.notna(global_id) | ||
| ] | ||
| existing_globalids: set[str] = set() | ||
| if group_globalids: | ||
| existing_globalids.update( | ||
| global_id | ||
| for (global_id,) in session.query(Sample.nma_pk_waterlevels) | ||
| .filter(Sample.nma_pk_waterlevels.in_(group_globalids)) | ||
| .all() | ||
| if global_id | ||
| ) | ||
| existing_globalids.update( | ||
| global_id | ||
| for (global_id,) in session.query(Observation.nma_pk_waterlevels) | ||
| .filter(Observation.nma_pk_waterlevels.in_(group_globalids)) | ||
| .all() | ||
| if global_id | ||
| ) |
There was a problem hiding this comment.
WaterLevelsTransferer now issues up to 2 DB queries per PointID group to check existing GlobalIDs (Sample + Observation). In bulk runs with many PointIDs this becomes an N+1 pattern and can substantially slow transfers. Consider preloading all existing nma_pk_waterlevels for the entire cleaned_df GlobalID set in one (or two) queries before the group loop (or using a UNION) and then doing in-memory membership checks per row/group.
| self._thing_id_by_pointid = { | ||
| name: thing_id | ||
| for name, thing_id in session.query(Thing.name, Thing.id).all() | ||
| } | ||
| self._created_contact_id_by_key = { | ||
| (name, organization): contact_id | ||
| for name, organization, contact_id in session.query( | ||
| Contact.name, Contact.organization, Contact.id | ||
| ).all() | ||
| } |
There was a problem hiding this comment.
_build_caches() now loads all Contact rows into memory (name, organization -> id). This can be expensive for bulk transfers and may significantly increase startup time/memory usage. If the cache is only needed for scoped reruns/duplicate handling, consider limiting this query to the scoped PointIDs (or lazily resolving only the missing (name, organization) keys encountered during processing).
| existing_permissions = { | ||
| (target_id, contact_id, permission_type) | ||
| for target_id, contact_id, permission_type in session.query( | ||
| PermissionHistory.target_id, | ||
| PermissionHistory.contact_id, | ||
| PermissionHistory.permission_type, | ||
| ) | ||
| .filter(PermissionHistory.target_table == "thing") | ||
| .all() | ||
| } |
There was a problem hiding this comment.
transfer_permissions builds existing_permissions by loading all PermissionHistory rows for target_table='thing' into a Python set. On large datasets this can be a major memory/time hit. Since this is only used to prevent duplicates for the wells being processed, consider filtering the query to the relevant target_ids (and permission_types) for this run (e.g., the well ids from transferred_wells_query).
| existing_permissions = { | |
| (target_id, contact_id, permission_type) | |
| for target_id, contact_id, permission_type in session.query( | |
| PermissionHistory.target_id, | |
| PermissionHistory.contact_id, | |
| PermissionHistory.permission_type, | |
| ) | |
| .filter(PermissionHistory.target_table == "thing") | |
| .all() | |
| } | |
| permission_types = {"Water Chemistry Sample", "Water Level Sample"} | |
| relevant_well_ids = {well.id for well, _ in transferred_wells} | |
| if relevant_well_ids: | |
| existing_permissions = { | |
| (target_id, contact_id, permission_type) | |
| for target_id, contact_id, permission_type in session.query( | |
| PermissionHistory.target_id, | |
| PermissionHistory.contact_id, | |
| PermissionHistory.permission_type, | |
| ) | |
| .filter(PermissionHistory.target_table == "thing") | |
| .filter(PermissionHistory.target_id.in_(relevant_well_ids)) | |
| .filter(PermissionHistory.permission_type.in_(permission_types)) | |
| .all() | |
| } | |
| else: | |
| existing_permissions = set() |
| normalized_pointids = ldf["PointID"].map( | ||
| lambda value: str(value).strip().upper() | ||
| ) | ||
| ldf = ldf[normalized_pointids.isin(set(pointids))] |
There was a problem hiding this comment.
Scoped filtering normalizes the Location df PointID values to uppercase, but compares against the raw pointids list. If callers pass lowercase/mixed-case PointIDs, the filter will unexpectedly drop matches. Normalize the incoming pointids (e.g., via Transferer._normalize_pointids / _normalize_pointid) before building the comparison set so matching is case-insensitive and consistent with the rest of the scoped transfer pipeline.
| normalized_pointids = ldf["PointID"].map( | |
| lambda value: str(value).strip().upper() | |
| ) | |
| ldf = ldf[normalized_pointids.isin(set(pointids))] | |
| normalized_requested_pointids = { | |
| str(value).strip().upper() for value in pointids | |
| } | |
| normalized_pointids = ldf["PointID"].map( | |
| lambda value: str(value).strip().upper() | |
| ) | |
| ldf = ldf[normalized_pointids.isin(normalized_requested_pointids)] |
jirhiker
left a comment
There was a problem hiding this comment.
Lets change this into a cli function and not touch any of the existing transfer logic.
This should alway you to greatly simplify the logic as no parallel processing really necessary.
At this point we should never have to run a bulk wholesale transfer from NM_Aquifer again
|
I like the idea of a separate CLI path for scoped transfers. It would definitely simplify the operator path for targeted runs. That said, I don’t think a separate CLI alone would have been sufficient here. The main issues I found were not just entrypoint/orchestration issues, but transferer-level correctness issues. For example:
So even with a separate CLI, we still would have needed the lower-level transfer logic changes to make scoped runs behave correctly. What do you think about keeping this PR focused on the underlying transferer correctness fixes, and then opening a follow-up PR to add a dedicated scoped-transfer CLI/entrypoint on top of that? @jirhiker |
jacob-a-brown
left a comment
There was a problem hiding this comment.
Overall it looks quite good! I mostly made style comments that I think would make the code easier to read and maintain.
| .join(AssetThingAssociation, AssetThingAssociation.asset_id == Asset.id) | ||
| .filter(AssetThingAssociation.thing_id == db_item.id) | ||
| .all() | ||
| if name |
There was a problem hiding this comment.
This can be updated to if name is not None (if that's the intended behavior (would it be if len(name) > 1?). There may be some edge cases where name is "" or 0, which may be valid, but with falsiness/truthiness if name will evaluate to False
| .join(AssetThingAssociation, AssetThingAssociation.asset_id == Asset.id) | ||
| .filter(AssetThingAssociation.thing_id == db_item.id) | ||
| .all() | ||
| if storage_path |
There was a problem hiding this comment.
Same as above comment: use if storage_path is not None (if that's the intended behavior). I think it's better to be explicit, particularly when checking different conditions
| if self.is_scoped_run(): | ||
| results = results.join( | ||
| Thing, Thing.id == LocationThingAssociation.thing_id | ||
| ).filter(Thing.name.in_(self.pointids)) |
There was a problem hiding this comment.
I'd recommend using an if-else block since results may be overwritten if self.is_scoped_run() is True. It's a bit harder to follow otherwise
if self.is_scored_run():
results_query = query
else:
results_query = different query
results = results_query.all()| if self.is_scoped_run(): | ||
| sql = sql.where(Thing.name.in_(self.pointids)) |
There was a problem hiding this comment.
Id' recommend using an if-else block to make it easier to follow since sql may be overwritten if self.is_scoped_run() is True
if self.is_scoped_run():
sql = query...
else:
sql = a different query...
records = session.scalars(sql).unique().all()| if record.id not in existing_thing_ids: | ||
| gta = GroupThingAssociation(group=group, thing=record) | ||
| session.add(gta) | ||
| group.thing_associations.append(gta) | ||
| existing_thing_ids.add(record.id) |
There was a problem hiding this comment.
Should we log that another GroupThingAssociation is being created?
| query = session.query( | ||
| NMA_Chemistry_SampleInfo.nma_sample_pt_id, | ||
| NMA_Chemistry_SampleInfo.id, | ||
| ).filter(NMA_Chemistry_SampleInfo.nma_sample_pt_id.isnot(None)) | ||
| if self.is_scoped_run(): | ||
| query = query.join( | ||
| Thing, Thing.id == NMA_Chemistry_SampleInfo.thing_id | ||
| ).filter(Thing.name.in_(self.pointids)) |
There was a problem hiding this comment.
same comment as above: consider using an if-else block here for clarity
| permission = _make_permission( | ||
| wdf, well, contact.id, "SampleOK", "Water Chemistry Sample" | ||
| ) | ||
| if permission: | ||
| if ( | ||
| permission | ||
| and ( | ||
| well.id, | ||
| contact.id, | ||
| permission.permission_type, | ||
| ) | ||
| not in existing_permissions | ||
| ): | ||
| objs.append(permission) | ||
| created_count += 1 | ||
| existing_permissions.add( | ||
| (well.id, contact.id, permission.permission_type) | ||
| ) | ||
|
|
||
| permission = _make_permission( | ||
| wdf, well, contact.id, "MonitorOK", "Water Level Sample" | ||
| ) | ||
| if permission: | ||
| if ( | ||
| permission | ||
| and ( | ||
| well.id, | ||
| contact.id, | ||
| permission.permission_type, | ||
| ) | ||
| not in existing_permissions | ||
| ): | ||
| objs.append(permission) | ||
| created_count += 1 | ||
| existing_permissions.add( | ||
| (well.id, contact.id, permission.permission_type) | ||
| ) |
There was a problem hiding this comment.
This is repeated code with different variables when calling _make_permission. Consider putting it in a for loop for clarity and ease of maintenance:
for permission_tuple in (("MonitorOK", "Water Level Sample"), ("SampleOK", "Water Chemistry Sample")):
nma_field = permission_tuple[0]
permission_type = permission_tuple[1]
permission = _make_permission(wdf, well, contact.id, nma_field, permission_type)
if (
permission
and (
well.id,
contact.id,
permission.permission_type,
)
not in existing_permissions
):
objs.append(permission)
created_count += 1
existing_permissions.add((well.id, contact.id, permission.permission_type))There was a problem hiding this comment.
There are a bunch of new functions defined in this file. I'd recommend adding a docstring to each of them to make them easier to understand and use (for now and the future).
| normalized: list[str] = [] | ||
| seen: set[str] = set() |
There was a problem hiding this comment.
An aside, I really like that you are typing your variables 🔥 . I want to do that whenever possible moving forward, I think it's good practice
| location_query = session.query(Location) | ||
| if normalized_pointids: | ||
| location_query = ( | ||
| location_query.join( | ||
| LocationThingAssociation, | ||
| LocationThingAssociation.location_id == Location.id, | ||
| ) | ||
| .join(Thing, Thing.id == LocationThingAssociation.thing_id) | ||
| .filter(Thing.name.in_(normalized_pointids)) | ||
| .distinct() | ||
| ) |
There was a problem hiding this comment.
like above consider using an if-else block here for clarity
|
@ksmuczynski i don't want any changes to the transferer logic. Lets consider it deprecated. The CLI will need to implement the correct transfer logic for scoped point id transfer |
|
Closed. Do not build on A new PR will be opened to implement a new CLI command for targeted legacy PointID transfer. Use |
Summary
This PR adds an opt-in scoped transfer mode for the legacy ETL pipeline using
TRANSFER_TEST_POINTIDS.When
TRANSFER_TEST_POINTIDSis unset, the existing bulk transfer workflow remains unchanged.When
TRANSFER_TEST_POINTIDSis set, the transfer pipeline now scopes work to the requested legacyPointIDvalues across applicable transferers instead of pulling unrelated records. This PR also hardens targeted reruns so scoped remediation runs are safer and more predictable.Why
This PR addresses the following problem / context:
TRANSFER_TEST_POINTIDSdid not previously behave as a universal scoped transfer mode.A targeted run for a single
PointIDcould still:ThingrecordsThat made targeted remediation runs unsafe in staging/production and broke the expectation that scoped runs should only touch the requested legacy records.
This PR keeps the default bulk transfer path intact and only changes behavior when
TRANSFER_TEST_POINTIDSis explicitly set.That gives us:
PointIDvaluesPointIDvalues are missingHow
Implementation summary - the following was changed / added / removed:
Scoped transfer orchestration
TRANSFER_TEST_POINTIDSthrough the transfer pipelinePointIDvalues fail fast before transfer work beginsTRANSFER_TEST_POINTIDSis not setScoped transferer behavior
Updated transferers to honor scoped
PointIDruns where applicable, including:ThingtransfersScoped cleanup behavior
Locationrows associated with the requestedPointIDvalues instead of scanning the full table during targeted runsIdempotency hardening for targeted reruns
Added duplicate guards for transfer paths that were still create-only during scoped reruns, including:
Also hardened water-level contact reuse so existing contacts are reused instead of causing a transaction-aborting uniqueness failure.
Validation
Validated locally and through targeted scoped transfer runs:
pytest tests/test_transfer_scoping.py tests/test_thing_transfer.py tests/test_well_transfer.py tests/transfers/test_contact_with_multiple_wells.py -qSM-0001LocationrowsNotes
Any special considerations, workarounds, or follow-up work to note?
TRANSFER_TEST_POINTIDSis set.SM-0001)