NO TICKET fix(importers): prevent duplicate well-name collisions in well inventory and water-level imports#633
Conversation
…orts Preserve well inventory re-import idempotency while blocking creation of a new water well when the same `Thing.name` already exists in the database. Also keep the water-level importer defensive against ambiguous well lookups so duplicate well names produce row-level validation errors instead of crashing the CLI with `MultipleResultsFound`. This reduces the risk of creating duplicate `Thing` records and makes import failures clearer for operators.
There was a problem hiding this comment.
Pull request overview
Hardens the well inventory and water-level CSV import flows against duplicate Thing.name collisions for water well records, preventing silent duplicate creation and ensuring ambiguous well lookups fail as row-level validation errors instead of crashing.
Changes:
- Well inventory import: after existing “re-import/idempotency” detection, block creation when a
water wellwith the sameThing.namealready exists elsewhere in the DB. - Water-level import: catch
MultipleResultsFoundduring well resolution and surface a row-level validation error instead of aborting. - Adds regression tests covering both the “existing DB well name” inventory case and the “duplicate name matches” water-level case.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
services/well_inventory_csv.py |
Adds DB-wide duplicate-name guard and maps the resulting row error to well_name_point_id. |
services/water_level_csv.py |
Catches MultipleResultsFound from ambiguous well-name lookups and records a row-level validation error. |
tests/test_well_inventory.py |
Adds coverage asserting inventory import fails when the well name already exists in the DB. |
tests/test_water_level_csv_service.py |
Adds coverage ensuring water-level import reports duplicate well-name matches instead of crashing. |
|
|
||
| with session_ctx() as session: | ||
| session.add(Thing(name=row["well_name_point_id"], thing_type="water well")) | ||
| session.commit() | ||
|
|
||
| file_path = tmp_path / "well-inventory-existing-db-well.csv" | ||
| with file_path.open("w", encoding="utf-8", newline="") as f: | ||
| writer = csv.DictWriter(f, fieldnames=list(row.keys())) | ||
| writer.writeheader() | ||
| writer.writerow(row) | ||
|
|
||
| result = well_inventory_csv(file_path) | ||
|
|
||
| assert result.exit_code == 1, result.stderr | ||
| errors = result.payload.get("validation_errors", []) | ||
| assert errors | ||
| assert errors[0]["field"] == "well_name_point_id" | ||
| assert ( | ||
| errors[0]["error"] | ||
| == "Well already exists in database for well_name_point_id 'TEST-0001'" | ||
| ) | ||
|
|
||
| with session_ctx() as session: | ||
| things = ( | ||
| session.query(Thing) | ||
| .filter( | ||
| Thing.name == row["well_name_point_id"], | ||
| Thing.thing_type == "water well", | ||
| ) | ||
| .all() | ||
| ) | ||
| assert len(things) == 1 |
There was a problem hiding this comment.
This test inserts a Thing into the shared session-scoped test database but never deletes it. Because the test suite shares DB state across tests (see tests/conftest.py session fixture), leaving this row behind can make other well-inventory tests flaky by causing unexpected name collisions. Clean up the created Thing (e.g., capture its id and delete it in a finally, or use a fixture that yields and deletes).
| with session_ctx() as session: | |
| session.add(Thing(name=row["well_name_point_id"], thing_type="water well")) | |
| session.commit() | |
| file_path = tmp_path / "well-inventory-existing-db-well.csv" | |
| with file_path.open("w", encoding="utf-8", newline="") as f: | |
| writer = csv.DictWriter(f, fieldnames=list(row.keys())) | |
| writer.writeheader() | |
| writer.writerow(row) | |
| result = well_inventory_csv(file_path) | |
| assert result.exit_code == 1, result.stderr | |
| errors = result.payload.get("validation_errors", []) | |
| assert errors | |
| assert errors[0]["field"] == "well_name_point_id" | |
| assert ( | |
| errors[0]["error"] | |
| == "Well already exists in database for well_name_point_id 'TEST-0001'" | |
| ) | |
| with session_ctx() as session: | |
| things = ( | |
| session.query(Thing) | |
| .filter( | |
| Thing.name == row["well_name_point_id"], | |
| Thing.thing_type == "water well", | |
| ) | |
| .all() | |
| ) | |
| assert len(things) == 1 | |
| thing_name = row["well_name_point_id"] | |
| try: | |
| with session_ctx() as session: | |
| session.add(Thing(name=thing_name, thing_type="water well")) | |
| session.commit() | |
| file_path = tmp_path / "well-inventory-existing-db-well.csv" | |
| with file_path.open("w", encoding="utf-8", newline="") as f: | |
| writer = csv.DictWriter(f, fieldnames=list(row.keys())) | |
| writer.writeheader() | |
| writer.writerow(row) | |
| result = well_inventory_csv(file_path) | |
| assert result.exit_code == 1, result.stderr | |
| errors = result.payload.get("validation_errors", []) | |
| assert errors | |
| assert errors[0]["field"] == "well_name_point_id" | |
| assert ( | |
| errors[0]["error"] | |
| == "Well already exists in database for well_name_point_id 'TEST-0001'" | |
| ) | |
| with session_ctx() as session: | |
| things = ( | |
| session.query(Thing) | |
| .filter( | |
| Thing.name == thing_name, | |
| Thing.thing_type == "water well", | |
| ) | |
| .all() | |
| ) | |
| assert len(things) == 1 | |
| finally: | |
| # Clean up the Thing created for this test to avoid leaking state | |
| with session_ctx() as session: | |
| ( | |
| session.query(Thing) | |
| .filter( | |
| Thing.name == thing_name, | |
| Thing.thing_type == "water well", | |
| ) | |
| .delete(synchronize_session=False) | |
| ) | |
| session.commit() |
| if error_text.startswith( | ||
| "Well already exists in database for well_name_point_id " | ||
| ): | ||
| field = "well_name_point_id" | ||
| else: | ||
| field = _extract_field_from_value_error(error_text) |
There was a problem hiding this comment.
Mapping the validation field by checking error_text.startswith(...) is brittle and couples control flow to an exact error-message string. Consider raising a structured/typed exception (or returning a structured error) that carries the target field explicitly, so future message wording changes don’t silently regress field attribution.
| if error_text.startswith( | |
| "Well already exists in database for well_name_point_id " | |
| ): | |
| field = "well_name_point_id" | |
| else: | |
| field = _extract_field_from_value_error(error_text) | |
| field = _extract_field_from_value_error(error_text) |
jirhiker
left a comment
There was a problem hiding this comment.
both of changes seem related to querying the db to check for an existing well. this should be a shared function
|
There are two related checks here:
I think only the second one should be shared.
Example for
That is different from this case:
The overlap between these changes is the generic Example for the generic lookup in water-level import:
Example for the generic lookup in well-inventory import:
So I agree the generic water-well lookup should be shared, but I would keep I can make that refactor in a follow-up commit. |
|
@jirhiker Okay, I added a helper
So the shared query is centralized, but the two importer-specific behaviors remain separate. |
Extract the generic water-well lookup by `Thing.name` into a shared helper and reuse it in the well inventory and water-level CSV importers. Keep importer-specific behavior separate: - water-level import still distinguishes unknown vs ambiguous matches - well-inventory import still applies its own re-import detection before enforcing the duplicate-name check
| if error_text.startswith( | ||
| "Well already exists in database for well_name_point_id " |
There was a problem hiding this comment.
Since this string is used in multiple places I would recommend creating a global constant at the beginning and then inserting it into f-strings wherever it is used. This way you don't have to worry about having the text correspond between its different invocations and can easily change it in the future. Something like
WELL_EXISTS_SUBSTRING = "Well already in database for well_name_point_id"
...
...
...
if error_text.startswith(WELL_EXISTS_SUBSTRING)
...
...
raise ValueError(f"{WELL_EXISTS_SUBSTRING} `{model.well_name_point_id}`")| ) | ||
|
|
||
|
|
||
| def test_bulk_upload_water_levels_reports_duplicate_well_name_matches(): |
There was a problem hiding this comment.
This test raises what appears to be an omission in the DB. Shouldn't names be unique? That would prevent this error from even occuring. There's even a TODO above the name column:
# TODO: should name be unique?
Summary
This PR hardens the CSV importers around duplicate water well names.
It updates the well inventory importer so it:
water wellwith the sameThing.namealready exists elsewhere in the databaseThingrecordsIt also keeps the water-level importer defensive so ambiguous well lookups no longer crash the CLI and instead return a row-level validation error.
Why
This PR addresses the following problem / context:
I found that duplicate
Thing.namerecords forwater wellcould exist in the database, and at least one duplicate appears to have been introduced by the well inventory import path.That caused two issues:
Thingwith the same well nameMultipleResultsFoundwhen trying to resolve a well by nameHow
Implementation summary - the following was changed / added / removed:
well_name_point_idvalidation error when the well already existsWhy This Approach
The well inventory importer still needs to support safe reruns of the same import, so the existing
_find_existing_imported_well()logic is evaluated first.Only if the row does not match a prior import do we check whether a well with the same name already exists in the database and block creation.
That gives us:
ThingcreationNotes
Any special considerations, workarounds, or follow-up work to note?
Thing.namerows already in the database. Any pre-existing duplicates should be identified and resolved separately.