Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion services/thing_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================
from datetime import datetime
import logging
import time
from datetime import datetime
from typing import Sequence
from zoneinfo import ZoneInfo

from fastapi import Request, HTTPException
Expand Down Expand Up @@ -78,6 +79,26 @@ def is_debug_timing_enabled() -> bool:
WATER_WELL_THING_TYPE = "water well"


def find_water_wells_by_name(
session: Session,
name: str,
*,
options: Sequence | None = None,
) -> list[Thing]:
sql = (
select(Thing)
.where(
Thing.name == name,
Thing.thing_type == WATER_WELL_THING_TYPE,
)
.order_by(Thing.id.asc())
)
if options:
sql = sql.options(*options)

return session.scalars(sql).all()


def wkb_to_geojson(wkb_element):
if wkb_element is None:
return None
Expand Down
20 changes: 12 additions & 8 deletions services/water_level_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
)
from sqlalchemy import select
from sqlalchemy.orm import Session, selectinload
from services.thing_helper import find_water_wells_by_name

REQUIRED_FIELDS: List[str] = list(WATER_LEVEL_REQUIRED_FIELDS)
HEADER_ALIASES: dict[str, str] = dict(WATER_LEVEL_HEADER_ALIASES)
Expand Down Expand Up @@ -251,15 +252,18 @@ def _validate_rows(
well_name = model.well_name_point_id
well = wells_by_name.get(well_name)
if well is None:
sql = (
select(Thing)
.options(selectinload(Thing.measuring_points))
.where(
Thing.name == well_name,
Thing.thing_type == "water well",
)
matches = find_water_wells_by_name(
session,
well_name,
options=(selectinload(Thing.measuring_points),),
)
well = session.scalars(sql).one_or_none()
if len(matches) > 1:
errors.append(
f"Row {idx}: Multiple wells found for well_name_point_id "
f"'{well_name}'"
)
continue
well = matches[0] if matches else None
if well is None:
errors.append(f"Row {idx}: Unknown well_name_point_id '{well_name}'")
continue
Expand Down
15 changes: 13 additions & 2 deletions services/well_inventory_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
from schemas.well_inventory import WellInventoryRow
from services.contact_helper import add_contact
from services.exceptions_helper import PydanticStyleException
from services.thing_helper import add_thing
from services.thing_helper import add_thing, find_water_wells_by_name
from services.util import transform_srid, convert_ft_to_m

AUTOGEN_DEFAULT_PREFIX = "NM-"
Expand Down Expand Up @@ -280,7 +280,12 @@ class dialect:
field = "Database error"
else:
error_text = str(e)
field = _extract_field_from_value_error(error_text)
if error_text.startswith(
"Well already exists in database for well_name_point_id "
Comment on lines +283 to +284

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this string is used in multiple places I would recommend creating a global constant at the beginning and then inserting it into f-strings wherever it is used. This way you don't have to worry about having the text correspond between its different invocations and can easily change it in the future. Something like

WELL_EXISTS_SUBSTRING = "Well already in database for well_name_point_id"

...
...
...
if error_text.startswith(WELL_EXISTS_SUBSTRING)
...
...
raise ValueError(f"{WELL_EXISTS_SUBSTRING} `{model.well_name_point_id}`")

):
field = "well_name_point_id"
else:
field = _extract_field_from_value_error(error_text)
Comment on lines +283 to +288

Copilot AI Apr 1, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mapping the validation field by checking error_text.startswith(...) is brittle and couples control flow to an exact error-message string. Consider raising a structured/typed exception (or returning a structured error) that carries the target field explicitly, so future message wording changes don’t silently regress field attribution.

Suggested change
if error_text.startswith(
"Well already exists in database for well_name_point_id "
):
field = "well_name_point_id"
else:
field = _extract_field_from_value_error(error_text)
field = _extract_field_from_value_error(error_text)

Copilot uses AI. Check for mistakes.

logging.error(
f"Error while importing row {row_number} ('{current_row_id}'): {error_text}"
Expand Down Expand Up @@ -666,6 +671,12 @@ def _add_csv_row(session: Session, group: Group, model: WellInventoryRow, user)
if existing_well is not None:
return existing_well.name

existing_named_wells = find_water_wells_by_name(session, model.well_name_point_id)
if existing_named_wells:
raise ValueError(
f"Well already exists in database for well_name_point_id '{model.well_name_point_id}'"
)

# --------------------
# Location and associated tables
# --------------------
Expand Down
62 changes: 62 additions & 0 deletions tests/test_water_level_csv_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,3 +463,65 @@ def test_bulk_upload_water_levels_imports_valid_rows_when_other_rows_fail(
"Unknown well_name_point_id 'Unknown Well'" in message
for message in result.payload["validation_errors"]
)


def test_bulk_upload_water_levels_reports_duplicate_well_name_matches():

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test raises what appears to be an omission in the DB. Shouldn't names be unique? That would prevent this error from even occuring. There's even a TODO above the name column:

# TODO: should name be unique?

with session_ctx() as session:
well_one = Thing(name="Duplicate Well", thing_type="water well")
well_two = Thing(name="Duplicate Well", thing_type="water well")
session.add_all([well_one, well_two])
session.commit()
well_one_id = well_one.id
well_two_id = well_two.id

csv_content = "\n".join(
[
",".join(
[
"field_staff",
"well_name_point_id",
"field_event_date_time",
"measurement_date_time",
"sampler",
"sample_method",
"mp_height",
"level_status",
"depth_to_water_ft",
"data_quality",
"water_level_notes",
]
),
",".join(
[
"A Lopez",
"Duplicate Well",
"2025-02-15T08:00:00-07:00",
"2025-02-15T10:30:00-07:00",
"A Lopez",
"electric tape",
"1.5",
"Water level not affected",
"7.0",
"Water level accurate to within two hundreths of a foot",
"Initial measurement",
]
),
]
)

try:
result = bulk_upload_water_levels(csv_content.encode("utf-8"))

assert result.exit_code == 1
assert result.payload["summary"]["total_rows_processed"] == 1
assert result.payload["summary"]["total_rows_imported"] == 0
assert result.payload["validation_errors"] == [
"Row 1: Multiple wells found for well_name_point_id 'Duplicate Well'"
]
finally:
with session_ctx() as session:
for well_id in (well_one_id, well_two_id):
well = session.get(Thing, well_id)
if well is not None:
session.delete(well)
session.commit()
36 changes: 36 additions & 0 deletions tests/test_well_inventory.py
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,42 @@ def test_upload_duplicate_well_ids(self):
errors = result.payload.get("validation_errors", [])
assert any("Duplicate" in str(e) for e in errors)

def test_upload_fails_when_well_name_already_exists_in_database(self, tmp_path):
"""Upload fails when a water well with the same Thing.name already exists."""
row = _minimal_valid_well_inventory_row()

with session_ctx() as session:
session.add(Thing(name=row["well_name_point_id"], thing_type="water well"))
session.commit()

file_path = tmp_path / "well-inventory-existing-db-well.csv"
with file_path.open("w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(f, fieldnames=list(row.keys()))
writer.writeheader()
writer.writerow(row)

result = well_inventory_csv(file_path)

assert result.exit_code == 1, result.stderr
errors = result.payload.get("validation_errors", [])
assert errors
assert errors[0]["field"] == "well_name_point_id"
assert (
errors[0]["error"]
== "Well already exists in database for well_name_point_id 'TEST-0001'"
)

with session_ctx() as session:
things = (
session.query(Thing)
.filter(
Thing.name == row["well_name_point_id"],
Thing.thing_type == "water well",
)
.all()
)
assert len(things) == 1
Comment on lines +839 to +870

Copilot AI Apr 1, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test inserts a Thing into the shared session-scoped test database but never deletes it. Because the test suite shares DB state across tests (see tests/conftest.py session fixture), leaving this row behind can make other well-inventory tests flaky by causing unexpected name collisions. Clean up the created Thing (e.g., capture its id and delete it in a finally, or use a fixture that yields and deletes).

Suggested change
with session_ctx() as session:
session.add(Thing(name=row["well_name_point_id"], thing_type="water well"))
session.commit()
file_path = tmp_path / "well-inventory-existing-db-well.csv"
with file_path.open("w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(f, fieldnames=list(row.keys()))
writer.writeheader()
writer.writerow(row)
result = well_inventory_csv(file_path)
assert result.exit_code == 1, result.stderr
errors = result.payload.get("validation_errors", [])
assert errors
assert errors[0]["field"] == "well_name_point_id"
assert (
errors[0]["error"]
== "Well already exists in database for well_name_point_id 'TEST-0001'"
)
with session_ctx() as session:
things = (
session.query(Thing)
.filter(
Thing.name == row["well_name_point_id"],
Thing.thing_type == "water well",
)
.all()
)
assert len(things) == 1
thing_name = row["well_name_point_id"]
try:
with session_ctx() as session:
session.add(Thing(name=thing_name, thing_type="water well"))
session.commit()
file_path = tmp_path / "well-inventory-existing-db-well.csv"
with file_path.open("w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(f, fieldnames=list(row.keys()))
writer.writeheader()
writer.writerow(row)
result = well_inventory_csv(file_path)
assert result.exit_code == 1, result.stderr
errors = result.payload.get("validation_errors", [])
assert errors
assert errors[0]["field"] == "well_name_point_id"
assert (
errors[0]["error"]
== "Well already exists in database for well_name_point_id 'TEST-0001'"
)
with session_ctx() as session:
things = (
session.query(Thing)
.filter(
Thing.name == thing_name,
Thing.thing_type == "water well",
)
.all()
)
assert len(things) == 1
finally:
# Clean up the Thing created for this test to avoid leaking state
with session_ctx() as session:
(
session.query(Thing)
.filter(
Thing.name == thing_name,
Thing.thing_type == "water well",
)
.delete(synchronize_session=False)
)
session.commit()

Copilot uses AI. Check for mistakes.

def test_upload_blank_well_name_point_id_autogenerates(self, tmp_path):
"""Upload succeeds when well_name_point_id is blank and auto-generates IDs."""
source_path = Path("tests/features/data/well-inventory-valid.csv")
Expand Down
Loading