Skip to content

NO TICKET fix(transfers): add scoped PointID transfer mode and rerun guards#642

Closed
ksmuczynski wants to merge 1 commit into
stagingfrom
kas-transfer-select-pointids-and-data
Closed

NO TICKET fix(transfers): add scoped PointID transfer mode and rerun guards#642
ksmuczynski wants to merge 1 commit into
stagingfrom
kas-transfer-select-pointids-and-data

Conversation

@ksmuczynski

Copy link
Copy Markdown
Contributor

Summary

This PR adds an opt-in scoped transfer mode for the legacy ETL pipeline using TRANSFER_TEST_POINTIDS.

When TRANSFER_TEST_POINTIDS is unset, the existing bulk transfer workflow remains unchanged.

When TRANSFER_TEST_POINTIDS is set, the transfer pipeline now scopes work to the requested legacy PointID values across applicable transferers instead of pulling unrelated records. This PR also hardens targeted reruns so scoped remediation runs are safer and more predictable.

Why

This PR addresses the following problem / context:
TRANSFER_TEST_POINTIDS did not previously behave as a universal scoped transfer mode.

A targeted run for a single PointID could still:

  • transfer unrelated non-well Thing records
  • transfer unrelated assets and other downstream records
  • perform global location cleanup
  • create duplicate records on rerun for some create-only transfer paths

That made targeted remediation runs unsafe in staging/production and broke the expectation that scoped runs should only touch the requested legacy records.

This PR keeps the default bulk transfer path intact and only changes behavior when TRANSFER_TEST_POINTIDS is explicitly set.

That gives us:

  • backwards-compatible bulk transfers
  • targeted transfer runs for one or more requested legacy PointID values
  • clearer operator feedback when requested PointID values are missing
  • safer reruns through transfer-specific duplicate guards
  • faster scoped runs by avoiding global cleanup work

How

Implementation summary - the following was changed / added / removed:

Scoped transfer orchestration

  • Normalized and threaded TRANSFER_TEST_POINTIDS through the transfer pipeline
  • Added scoped preflight validation so missing requested PointID values fail fast before transfer work begins
  • Preserved existing bulk transfer behavior when TRANSFER_TEST_POINTIDS is not set
  • Added scoped geologic formation seeding instead of broad foundational lookup loading during scoped runs

Scoped transferer behavior

Updated transferers to honor scoped PointID runs where applicable, including:

  • well and non-well Thing transfers
  • assets
  • associated data
  • chemistry sample info and chemistry child transfers
  • hydraulics
  • sensors
  • NGWMN views
  • surface water / weather data and photos
  • soil / rock results
  • stratigraphy
  • water levels and continuous water-level variants
  • link IDs
  • group associations
  • permissions

Scoped cleanup behavior

  • Scoped location cleanup to only Location rows associated with the requested PointID values instead of scanning the full table during targeted runs

Idempotency hardening for targeted reruns

Added duplicate guards for transfer paths that were still create-only during scoped reruns, including:

  • water level field-event lineage
  • asset associations
  • permission history
  • group-to-thing associations
  • thing ID links

Also hardened water-level contact reuse so existing contacts are reused instead of causing a transaction-aborting uniqueness failure.

Validation

Validated locally and through targeted scoped transfer runs:

  • pytest tests/test_transfer_scoping.py tests/test_thing_transfer.py tests/test_well_transfer.py tests/transfers/test_contact_with_multiple_wells.py -q
  • additional targeted pytest coverage for transfer error handling and scoped helpers
  • successful scoped transfer rerun for SM-0001
  • verified scoped location cleanup against only affected Location rows
  • verified rerun behavior after adding duplicate guards

Notes

Any special considerations, workarounds, or follow-up work to note?

  • Bulk transfer behavior is intended to remain unchanged unless TRANSFER_TEST_POINTIDS is set.
  • The scoped mode was validated against a real targeted transfer scenario (SM-0001)

Preserve the default bulk legacy transfer behavior while making `TRANSFER_TEST_POINTIDS` an opt-in scoped mode that limits transfers to the requested legacy PointIDs across well and non-well site types.

Also harden scoped reruns by adding duplicate guards to create-only transfer paths such as water levels, assets, permissions, group associations, and thing-id links, and scope location cleanup to affected locations only.

This makes targeted remediation runs safer, faster, and predictable without changing the existing full-transfer workflow.

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds an opt-in “scoped transfer mode” for the legacy ETL pipeline driven by TRANSFER_TEST_POINTIDS, so targeted remediation runs only touch requested legacy PointIDs and are safer to rerun.

Changes:

  • Normalize and thread scoped PointIDs through orchestration and many transferers; add scoped preflight validation.
  • Scope formerly-global cleanup work (notably Location cleanup) to requested PointIDs when in scoped mode.
  • Add/strengthen rerun idempotency guards to reduce duplicates/uniqueness failures during targeted reruns.

Reviewed changes

Copilot reviewed 27 out of 27 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
transfers/transfer.py Adds scoped PointID normalization, preflight validation, scoped foundational seeding, and threads pointids through parallel/session transfers and cleanup.
transfers/transferer.py Normalizes incoming pointids; adds scoped helpers; hardens _capture_database_error; scopes chemistry sample info cache query.
transfers/well_transfer.py Filters WellData rows via filter_to_valid_point_ids(..., self.pointids) for scoped runs.
transfers/well_transfer_util.py Scopes cleanup_locations() to Locations associated to requested PointIDs (via joins) when scoped.
transfers/thing_transfer.py Adds pointids parameter and filters Location-derived Thing creation to requested PointIDs.
transfers/permissions_transfer.py Adds pointids parameter; scopes WellData/well query; adds duplicate guards for PermissionHistory creation.
transfers/waterlevels_transfer.py Scopes input DF filtering; adds duplicate guards by WaterLevels GlobalID; adds contact reuse fallback on uniqueness failures; expands caches.
transfers/waterlevelscontinuous_pressure_daily.py Applies scoped PointID filtering before thing validation.
transfers/waterlevels_transducer_transfer.py Applies scoped PointID filtering.
transfers/weather_data.py Applies scoped PointID filtering.
transfers/weather_photos.py Applies scoped PointID filtering.
transfers/surface_water_data.py Applies scoped PointID filtering.
transfers/surface_water_photos.py Applies scoped PointID filtering.
transfers/ngwmn_views.py Applies scoped PointID filtering.
transfers/sensor_transfer.py Applies scoped PointID filtering.
transfers/hydraulicsdata.py Applies scoped PointID filtering before thing validation.
transfers/associated_data.py Applies scoped PointID filtering.
transfers/asset_transfer.py Applies scoped PointID filtering; adds missing-well guard and duplicate asset guards.
transfers/link_ids_transfer.py Applies scoped PointID filtering; adds duplicate link guards.
transfers/group_transfer.py Adds scoped filtering for prefix expansion; avoids duplicate group-thing associations.
transfers/stratigraphy_transfer.py Adds pointids parameter and applies scoped PointID filtering.
transfers/soil_rock_results.py Scopes Soil/Rock results to requested Point_IDs during scoped runs.
transfers/chemistry_sampleinfo.py Scopes location->thing cache building by requested PointIDs in scoped runs.
transfers/minor_trace_chemistry_transfer.py Scopes sample info cache by requested PointIDs in scoped runs.
tests/test_transfer_scoping.py New tests for pointid normalization, preflight validation, pointids threading, scoped thing transfer, scoped location cleanup.
tests/test_transfer_legacy_dates.py Adds tests for hardened DB error capture and waterlevels contact reuse behavior.
tests/test_thing_transfer.py Updates test helper signature to include pointids.
Comments suppressed due to low confidence (1)

transfers/permissions_transfer.py:95

  • visited is a list but is used for membership checks (well.id in visited) inside the main loop. For many wells this becomes O(n^2). Use a set for visited (or dedupe in the SQL query) to keep membership checks O(1).
    created_count = 0
    visited = []
    for chunk in chunk_by_size(transferred_wells, 100):
        objs = []
        for row in chunk.itertuples():
            well = row.Thing
            contact = row.Contact
            if well.id in visited:
                continue

            visited.append(well.id)

Comment on lines +186 to +206
group_globalids = [
str(global_id)
for global_id in group["GlobalID"].tolist()
if pd.notna(global_id)
]
existing_globalids: set[str] = set()
if group_globalids:
existing_globalids.update(
global_id
for (global_id,) in session.query(Sample.nma_pk_waterlevels)
.filter(Sample.nma_pk_waterlevels.in_(group_globalids))
.all()
if global_id
)
existing_globalids.update(
global_id
for (global_id,) in session.query(Observation.nma_pk_waterlevels)
.filter(Observation.nma_pk_waterlevels.in_(group_globalids))
.all()
if global_id
)

Copilot AI Apr 8, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WaterLevelsTransferer now issues up to 2 DB queries per PointID group to check existing GlobalIDs (Sample + Observation). In bulk runs with many PointIDs this becomes an N+1 pattern and can substantially slow transfers. Consider preloading all existing nma_pk_waterlevels for the entire cleaned_df GlobalID set in one (or two) queries before the group loop (or using a UNION) and then doing in-memory membership checks per row/group.

Copilot uses AI. Check for mistakes.
Comment on lines 105 to +114
self._thing_id_by_pointid = {
name: thing_id
for name, thing_id in session.query(Thing.name, Thing.id).all()
}
self._created_contact_id_by_key = {
(name, organization): contact_id
for name, organization, contact_id in session.query(
Contact.name, Contact.organization, Contact.id
).all()
}

Copilot AI Apr 8, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_build_caches() now loads all Contact rows into memory (name, organization -> id). This can be expensive for bulk transfers and may significantly increase startup time/memory usage. If the cache is only needed for scoped reruns/duplicate handling, consider limiting this query to the scoped PointIDs (or lazily resolving only the missing (name, organization) keys encountered during processing).

Copilot uses AI. Check for mistakes.
Comment on lines +74 to +83
existing_permissions = {
(target_id, contact_id, permission_type)
for target_id, contact_id, permission_type in session.query(
PermissionHistory.target_id,
PermissionHistory.contact_id,
PermissionHistory.permission_type,
)
.filter(PermissionHistory.target_table == "thing")
.all()
}

Copilot AI Apr 8, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

transfer_permissions builds existing_permissions by loading all PermissionHistory rows for target_table='thing' into a Python set. On large datasets this can be a major memory/time hit. Since this is only used to prevent duplicates for the wells being processed, consider filtering the query to the relevant target_ids (and permission_types) for this run (e.g., the well ids from transferred_wells_query).

Suggested change
existing_permissions = {
(target_id, contact_id, permission_type)
for target_id, contact_id, permission_type in session.query(
PermissionHistory.target_id,
PermissionHistory.contact_id,
PermissionHistory.permission_type,
)
.filter(PermissionHistory.target_table == "thing")
.all()
}
permission_types = {"Water Chemistry Sample", "Water Level Sample"}
relevant_well_ids = {well.id for well, _ in transferred_wells}
if relevant_well_ids:
existing_permissions = {
(target_id, contact_id, permission_type)
for target_id, contact_id, permission_type in session.query(
PermissionHistory.target_id,
PermissionHistory.contact_id,
PermissionHistory.permission_type,
)
.filter(PermissionHistory.target_table == "thing")
.filter(PermissionHistory.target_id.in_(relevant_well_ids))
.filter(PermissionHistory.permission_type.in_(permission_types))
.all()
}
else:
existing_permissions = set()

Copilot uses AI. Check for mistakes.
Comment on lines +57 to +60
normalized_pointids = ldf["PointID"].map(
lambda value: str(value).strip().upper()
)
ldf = ldf[normalized_pointids.isin(set(pointids))]

Copilot AI Apr 8, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scoped filtering normalizes the Location df PointID values to uppercase, but compares against the raw pointids list. If callers pass lowercase/mixed-case PointIDs, the filter will unexpectedly drop matches. Normalize the incoming pointids (e.g., via Transferer._normalize_pointids / _normalize_pointid) before building the comparison set so matching is case-insensitive and consistent with the rest of the scoped transfer pipeline.

Suggested change
normalized_pointids = ldf["PointID"].map(
lambda value: str(value).strip().upper()
)
ldf = ldf[normalized_pointids.isin(set(pointids))]
normalized_requested_pointids = {
str(value).strip().upper() for value in pointids
}
normalized_pointids = ldf["PointID"].map(
lambda value: str(value).strip().upper()
)
ldf = ldf[normalized_pointids.isin(normalized_requested_pointids)]

Copilot uses AI. Check for mistakes.

@jirhiker jirhiker left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets change this into a cli function and not touch any of the existing transfer logic.
This should alway you to greatly simplify the logic as no parallel processing really necessary.

At this point we should never have to run a bulk wholesale transfer from NM_Aquifer again

@ksmuczynski

ksmuczynski commented Apr 8, 2026

Copy link
Copy Markdown
Contributor Author

I like the idea of a separate CLI path for scoped transfers. It would definitely simplify the operator path for targeted runs.

That said, I don’t think a separate CLI alone would have been sufficient here. The main issues I found were not just entrypoint/orchestration issues, but transferer-level correctness issues. For example:

  • some transferers ignored pointids
  • some child transfers needed parent-based scoping, not just direct PointID filtering
  • some create-only paths were not idempotent on rerun

So even with a separate CLI, we still would have needed the lower-level transfer logic changes to make scoped runs behave correctly.

What do you think about keeping this PR focused on the underlying transferer correctness fixes, and then opening a follow-up PR to add a dedicated scoped-transfer CLI/entrypoint on top of that? @jirhiker

@jacob-a-brown jacob-a-brown left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall it looks quite good! I mostly made style comments that I think would make the code easier to read and maintain.

.join(AssetThingAssociation, AssetThingAssociation.asset_id == Asset.id)
.filter(AssetThingAssociation.thing_id == db_item.id)
.all()
if name

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be updated to if name is not None (if that's the intended behavior (would it be if len(name) > 1?). There may be some edge cases where name is "" or 0, which may be valid, but with falsiness/truthiness if name will evaluate to False

.join(AssetThingAssociation, AssetThingAssociation.asset_id == Asset.id)
.filter(AssetThingAssociation.thing_id == db_item.id)
.all()
if storage_path

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above comment: use if storage_path is not None (if that's the intended behavior). I think it's better to be explicit, particularly when checking different conditions

Comment on lines +79 to +82
if self.is_scoped_run():
results = results.join(
Thing, Thing.id == LocationThingAssociation.thing_id
).filter(Thing.name.in_(self.pointids))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd recommend using an if-else block since results may be overwritten if self.is_scoped_run() is True. It's a bit harder to follow otherwise

if self.is_scored_run():
    results_query = query
else:
    results_query = different query
results = results_query.all()

Comment on lines +50 to +51
if self.is_scoped_run():
sql = sql.where(Thing.name.in_(self.pointids))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Id' recommend using an if-else block to make it easier to follow since sql may be overwritten if self.is_scoped_run() is True

if self.is_scoped_run():
    sql = query...
else:
    sql = a different query...
records = session.scalars(sql).unique().all()

Comment on lines +86 to +90
if record.id not in existing_thing_ids:
gta = GroupThingAssociation(group=group, thing=record)
session.add(gta)
group.thing_associations.append(gta)
existing_thing_ids.add(record.id)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we log that another GroupThingAssociation is being created?

Comment on lines +68 to +75
query = session.query(
NMA_Chemistry_SampleInfo.nma_sample_pt_id,
NMA_Chemistry_SampleInfo.id,
).filter(NMA_Chemistry_SampleInfo.nma_sample_pt_id.isnot(None))
if self.is_scoped_run():
query = query.join(
Thing, Thing.id == NMA_Chemistry_SampleInfo.thing_id
).filter(Thing.name.in_(self.pointids))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as above: consider using an if-else block here for clarity

Comment on lines 96 to +130
permission = _make_permission(
wdf, well, contact.id, "SampleOK", "Water Chemistry Sample"
)
if permission:
if (
permission
and (
well.id,
contact.id,
permission.permission_type,
)
not in existing_permissions
):
objs.append(permission)
created_count += 1
existing_permissions.add(
(well.id, contact.id, permission.permission_type)
)

permission = _make_permission(
wdf, well, contact.id, "MonitorOK", "Water Level Sample"
)
if permission:
if (
permission
and (
well.id,
contact.id,
permission.permission_type,
)
not in existing_permissions
):
objs.append(permission)
created_count += 1
existing_permissions.add(
(well.id, contact.id, permission.permission_type)
)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is repeated code with different variables when calling _make_permission. Consider putting it in a for loop for clarity and ease of maintenance:

for permission_tuple in (("MonitorOK", "Water Level Sample"), ("SampleOK", "Water Chemistry Sample")):
    nma_field = permission_tuple[0]
    permission_type = permission_tuple[1]

    permission = _make_permission(wdf, well, contact.id, nma_field, permission_type)
    if (
        permission
        and (
            well.id,
            contact.id,
            permission.permission_type,
         )
        not in existing_permissions
    ):
        objs.append(permission)
        created_count += 1
        existing_permissions.add((well.id, contact.id, permission.permission_type))

Comment thread transfers/transfer.py

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a bunch of new functions defined in this file. I'd recommend adding a docstring to each of them to make them easier to understand and use (for now and the future).

Comment thread transfers/transferer.py
Comment on lines +68 to +69
normalized: list[str] = []
seen: set[str] = set()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An aside, I really like that you are typing your variables 🔥 . I want to do that whenever possible moving forward, I think it's good practice

Comment on lines +176 to +186
location_query = session.query(Location)
if normalized_pointids:
location_query = (
location_query.join(
LocationThingAssociation,
LocationThingAssociation.location_id == Location.id,
)
.join(Thing, Thing.id == LocationThingAssociation.thing_id)
.filter(Thing.name.in_(normalized_pointids))
.distinct()
)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

like above consider using an if-else block here for clarity

@jirhiker

jirhiker commented Apr 9, 2026

Copy link
Copy Markdown
Member

@ksmuczynski i don't want any changes to the transferer logic. Lets consider it deprecated.

The CLI will need to implement the correct transfer logic for scoped point id transfer

@ksmuczynski

Copy link
Copy Markdown
Contributor Author

Closed. Do not build on kas-transfer-select-pointids-and-data and do not extend the deprecated transferer execution path in transfers/transfer.py.

A new PR will be opened to implement a new CLI command for targeted legacy PointID transfer.

Use kas-transfer-select-pointids-and-data only as a behavioral reference for scoping rules, rerun/idempotency rules, and test scenarios.

@ksmuczynski ksmuczynski closed this Apr 9, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants