From 9ab9846a0e3ab8b24021bd0ec67d19f3d5ea82c2 Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Wed, 15 Apr 2026 16:32:31 -0600 Subject: [PATCH 1/8] feat(water-level-importer): add support for field event participants and sampler validation - create or reuse field event participants for imported staff columns - link imported samples to the measuring participant - fail a row when measuring_person does not resolve to exactly one participant --- services/water_level_csv.py | 124 ++++++++++++- tests/test_water_level_csv_service.py | 253 +++++++++++++++++++++++++- 2 files changed, 375 insertions(+), 2 deletions(-) diff --git a/services/water_level_csv.py b/services/water_level_csv.py index 9faa5af2..e694f919 100644 --- a/services/water_level_csv.py +++ b/services/water_level_csv.py @@ -24,7 +24,16 @@ from pathlib import Path from typing import Any, BinaryIO, Iterable, List -from db import Thing, FieldEvent, FieldActivity, Sample, Observation, Parameter +from db import ( + Thing, + FieldEvent, + FieldActivity, + Sample, + Observation, + Parameter, + Contact, + FieldEventParticipant, +) from db.engine import session_ctx from pydantic import ValidationError from schemas.water_level_csv import ( @@ -35,6 +44,7 @@ ) from sqlalchemy import select from sqlalchemy.orm import Session, selectinload +from services.contact_helper import add_contact from services.thing_helper import find_water_wells_by_name REQUIRED_FIELDS: List[str] = list(WATER_LEVEL_REQUIRED_FIELDS) @@ -56,6 +66,8 @@ class _ValidatedRow: raw: dict[str, str] well: Thing field_staff: str + field_staff_2: str | None + field_staff_3: str | None sampler: str sample_method_term: str field_event_dt: datetime @@ -291,6 +303,8 @@ def _validate_rows( raw={**normalized}, well=well, field_staff=model.field_staff, + field_staff_2=model.field_staff_2, + field_staff_3=model.field_staff_3, sampler=model.measuring_person, sample_method_term=model.sample_method, field_event_dt=model.field_event_date_time, @@ -352,6 +366,7 @@ def _validate_depth_to_water_against_well( def _create_records( session: Session, parameter_id: int, rows: list[_ValidatedRow] ) -> tuple[list[dict[str, Any]], list[str]]: + """Create or update field-event, sample, and observation rows for each CSV row.""" created: list[dict[str, Any]] = [] errors: list[str] = [] @@ -393,6 +408,12 @@ def _create_records( _apply_sample_values(sample, row, sample_name) _apply_observation_values(observation, row, parameter_id) + # Add participants after required sample/observation fields are populated + # so the contact lookup does not trigger an autoflush of incomplete rows. + participants = _ensure_field_event_participants(session, field_event, row) + sample.field_event_participant = _resolve_measuring_participant( + row, participants + ) session.flush() savepoint.commit() @@ -427,12 +448,14 @@ def _create_records( def _build_sample_name(row: _ValidatedRow) -> str: + """Build the deterministic sample identifier used for create/update matching.""" return f"{row.well.name}-WL-{row.measurement_dt.strftime('%Y%m%d%H%M')}" def _find_existing_imported_sample( session: Session, row: _ValidatedRow, sample_name: str ) -> Sample | None: + """Return the previously imported groundwater-level sample for this row, if any.""" sql = ( select(Sample) .join(FieldActivity, Sample.field_activity_id == FieldActivity.id) @@ -454,13 +477,111 @@ def _find_existing_imported_sample( def _find_existing_observation(sample: Sample, parameter_id: int) -> Observation | None: + """Return the groundwater-level observation already linked to the sample, if any.""" for observation in sample.observations: if observation.parameter_id == parameter_id: return observation return None +def _ensure_field_event_participants( + session: Session, field_event: FieldEvent, row: _ValidatedRow +) -> list[FieldEventParticipant]: + """Return event participants for imported staff names, creating any missing ones.""" + participant_specs = ( + (row.field_staff, "Lead"), + (row.field_staff_2, "Participant"), + (row.field_staff_3, "Participant"), + ) + existing_participants = session.scalars( + select(FieldEventParticipant) + .options(selectinload(FieldEventParticipant.participant)) + .where(FieldEventParticipant.field_event_id == field_event.id) + .order_by(FieldEventParticipant.id.asc()) + ).all() + + for staff_name, role in participant_specs: + if not staff_name: + continue + + contact = _get_or_create_field_staff_contact(session, staff_name) + participant = next( + ( + existing + for existing in existing_participants + if existing.contact_id == contact.id + and existing.participant_role == role + ), + None, + ) + if participant is None: + participant = FieldEventParticipant( + field_event=field_event, + contact_id=contact.id, + participant_role=role, + ) + session.add(participant) + # Attach the resolved contact eagerly so downstream matching can use + # participant.participant.name without an extra lookup. + participant.participant = contact + existing_participants.append(participant) + + return existing_participants + + +def _get_or_create_field_staff_contact(session: Session, staff_name: str) -> Contact: + """Resolve or create the contact record used by field event participants.""" + contact_type = "Field Event Participant" + organization = "NMBGMR" + contact = session.scalars( + select(Contact) + .where(Contact.name == staff_name) + .where(Contact.organization == organization) + .where(Contact.contact_type == contact_type) + ).first() + + if contact is None: + payload = { + "name": staff_name, + "role": "Technician", + "organization": organization, + "contact_type": contact_type, + } + contact = add_contact(session, payload, None, commit=False) + + return contact + + +def _resolve_measuring_participant( + row: _ValidatedRow, participants: list[FieldEventParticipant] +) -> FieldEventParticipant: + """Return the unique participant matching measuring_person or raise a row error.""" + matching_participants = [ + participant + for participant in participants + if participant.participant is not None + and participant.participant.name == row.sampler + ] + if len(matching_participants) == 1: + return matching_participants[0] + + if not matching_participants: + raise ValueError( + "measuring_person " + f"'{row.sampler}' could not be matched to a field event participant" + ) + + raise ValueError( + "measuring_person " + f"'{row.sampler}' matched multiple field event participants; " + # Ambiguous staff rows should fail so the importer never guesses which + # participant performed the measurement. + "field_staff values must identify exactly one measuring person" + ) + + def _apply_sample_values(sample: Sample, row: _ValidatedRow, sample_name: str) -> None: + """Apply normalized sample values from the validated CSV row.""" sample.sample_date = row.measurement_dt sample.sample_name = sample_name sample.sample_matrix = "groundwater" @@ -472,6 +593,7 @@ def _apply_sample_values(sample: Sample, row: _ValidatedRow, sample_name: str) - def _apply_observation_values( observation: Observation, row: _ValidatedRow, parameter_id: int ) -> None: + """Apply normalized observation values from the validated CSV row.""" observation.observation_datetime = row.measurement_dt observation.parameter_id = parameter_id observation.value = row.depth_to_water_ft diff --git a/tests/test_water_level_csv_service.py b/tests/test_water_level_csv_service.py index e4b01d9c..03b2d138 100644 --- a/tests/test_water_level_csv_service.py +++ b/tests/test_water_level_csv_service.py @@ -2,7 +2,15 @@ from decimal import Decimal from types import SimpleNamespace -from db import FieldActivity, FieldEvent, Observation, Sample, Thing +from db import ( + Contact, + FieldActivity, + FieldEvent, + FieldEventParticipant, + Observation, + Sample, + Thing, +) from db.measuring_point_history import MeasuringPointHistory from db.engine import session_ctx from tests import get_parameter_id @@ -218,6 +226,8 @@ def test_bulk_upload_water_levels_is_idempotent(water_well_thing): assert len(observations) == 1 assert samples[0].sample_name == "Test Well-WL-202502151730" assert samples[0].sample_matrix == "groundwater" + assert samples[0].field_event_participant is not None + assert samples[0].field_event_participant.participant.name == "A Lopez" assert observations[0].groundwater_level_reason == "Water level not affected" assert ( observations[0].nma_data_quality @@ -226,6 +236,247 @@ def test_bulk_upload_water_levels_is_idempotent(water_well_thing): assert observations[0].measuring_point_height == 1.5 +def test_bulk_upload_water_levels_creates_field_event_participants(water_well_thing): + csv_content = "\n".join( + [ + ",".join( + [ + "field_staff", + "field_staff_2", + "field_staff_3", + "well_name_point_id", + "field_event_date_time", + "measurement_date_time", + "sampler", + "sample_method", + "mp_height", + "level_status", + "depth_to_water_ft", + "data_quality", + "water_level_notes", + ] + ), + ",".join( + [ + "A Lopez", + "B Chen", + "C Diaz", + water_well_thing.name, + "2025-02-15T08:00:00-07:00", + "2025-02-15T10:30:00-07:00", + "A Lopez", + "electric tape", + "1.5", + "Water level not affected", + "7.0", + "Water level accurate to within two hundreths of a foot", + "Initial measurement", + ] + ), + ] + ) + + result = bulk_upload_water_levels(csv_content.encode("utf-8")) + + assert result.exit_code == 0, result.payload + + with session_ctx() as session: + field_event = session.scalars( + select(FieldEvent) + .join(Thing, FieldEvent.thing_id == Thing.id) + .where(Thing.id == water_well_thing.id) + ).one() + participants = session.scalars( + select(FieldEventParticipant) + .where(FieldEventParticipant.field_event_id == field_event.id) + .order_by(FieldEventParticipant.id.asc()) + ).all() + contacts = session.scalars( + select(Contact) + .where( + Contact.name.in_(["A Lopez", "B Chen", "C Diaz"]), + Contact.organization == "NMBGMR", + Contact.contact_type == "Field Event Participant", + ) + .order_by(Contact.name.asc()) + ).all() + + assert len(participants) == 3 + assert [participant.participant_role for participant in participants] == [ + "Lead", + "Participant", + "Participant", + ] + assert {participant.field_event_id for participant in participants} == { + field_event.id + } + sample = session.scalars( + select(Sample) + .join(FieldActivity, Sample.field_activity_id == FieldActivity.id) + .where(FieldActivity.field_event_id == field_event.id) + ).one() + assert sample.field_event_participant_id == participants[0].id + assert sample.field_event_participant.participant.name == "A Lopez" + + +def test_bulk_upload_water_levels_does_not_duplicate_field_event_participants_on_rerun( + water_well_thing, +): + csv_content = "\n".join( + [ + ",".join( + [ + "field_staff", + "field_staff_2", + "well_name_point_id", + "field_event_date_time", + "measurement_date_time", + "sampler", + "sample_method", + "mp_height", + "level_status", + "depth_to_water_ft", + "data_quality", + "water_level_notes", + ] + ), + ",".join( + [ + "A Lopez", + "B Chen", + water_well_thing.name, + "2025-02-15T08:00:00-07:00", + "2025-02-15T10:30:00-07:00", + "A Lopez", + "electric tape", + "1.5", + "Water level not affected", + "7.0", + "Water level accurate to within two hundreths of a foot", + "Initial measurement", + ] + ), + ] + ) + + first = bulk_upload_water_levels(csv_content.encode("utf-8")) + + assert first.exit_code == 0, first.payload + + with session_ctx() as session: + field_event = session.scalars( + select(FieldEvent) + .join(Thing, FieldEvent.thing_id == Thing.id) + .where(Thing.id == water_well_thing.id) + ).one() + participants = session.scalars( + select(FieldEventParticipant) + .where(FieldEventParticipant.field_event_id == field_event.id) + .order_by(FieldEventParticipant.id.asc()) + ).all() + sample = session.scalars( + select(Sample) + .join(FieldActivity, Sample.field_activity_id == FieldActivity.id) + .where(FieldActivity.field_event_id == field_event.id) + ).one() + + # Capture the exact participant/contact linkage from the first import so + # the rerun can prove the importer reused those records rather than + # creating replacements. + first_participant_ids = [participant.id for participant in participants] + first_contact_ids = [participant.contact_id for participant in participants] + first_sample_participant_id = sample.field_event_participant_id + + second = bulk_upload_water_levels(csv_content.encode("utf-8")) + + assert second.exit_code == 0, second.payload + + with session_ctx() as session: + field_events = session.scalars( + select(FieldEvent) + .join(Thing, FieldEvent.thing_id == Thing.id) + .where(Thing.id == water_well_thing.id) + ).all() + participants = session.scalars( + select(FieldEventParticipant) + .where(FieldEventParticipant.field_event_id == field_events[0].id) + .order_by(FieldEventParticipant.id.asc()) + ).all() + sample = session.scalars( + select(Sample) + .join(FieldActivity, Sample.field_activity_id == FieldActivity.id) + .where(FieldActivity.field_event_id == field_events[0].id) + ).one() + + assert len(field_events) == 1 + assert len(participants) == 2 + assert [participant.id for participant in participants] == first_participant_ids + assert [ + participant.contact_id for participant in participants + ] == first_contact_ids + assert sample.field_event_participant_id == first_sample_participant_id + assert sample.field_event_participant is not None + assert sample.field_event_participant.participant.name == "A Lopez" + + +def test_bulk_upload_water_levels_fails_when_measuring_person_is_ambiguous( + water_well_thing, +): + csv_content = "\n".join( + [ + ",".join( + [ + "field_staff", + "field_staff_2", + "well_name_point_id", + "field_event_date_time", + "measurement_date_time", + "sampler", + "sample_method", + "mp_height", + "level_status", + "depth_to_water_ft", + "data_quality", + "water_level_notes", + ] + ), + ",".join( + [ + "A Lopez", + "A Lopez", + water_well_thing.name, + "2025-02-15T08:00:00-07:00", + "2025-02-15T10:30:00-07:00", + "A Lopez", + "electric tape", + "1.5", + "Water level not affected", + "7.0", + "Water level accurate to within two hundreths of a foot", + "Initial measurement", + ] + ), + ] + ) + + result = bulk_upload_water_levels(csv_content.encode("utf-8")) + + assert result.exit_code == 1 + assert result.payload["summary"]["total_rows_imported"] == 0 + assert result.payload["validation_errors"] == [ + "Row 1: measuring_person 'A Lopez' matched multiple field event " + "participants; field_staff values must identify exactly one measuring " + "person" + ] + + with session_ctx() as session: + samples = session.scalars(select(Sample)).all() + participants = session.scalars(select(FieldEventParticipant)).all() + + assert samples == [] + assert participants == [] + + def test_bulk_upload_water_levels_warns_when_mp_height_differs_from_history( water_well_thing, ): From 35f9c9f402504ad7c6d8758434ac6c2c5c37cbf3 Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Wed, 15 Apr 2026 16:36:25 -0600 Subject: [PATCH 2/8] refactor(water-level-importer): stop storing staff names in note fields - Keep `water_level_notes` as freeform notes - Stop duplicating field staff and sampler into field event/activity notes - Treat structured participants as the authoritative staff source --- services/water_level_csv.py | 15 ++++++++------- tests/test_cli_commands.py | 11 ++++++----- tests/test_water_level_csv_service.py | 8 ++++++++ 3 files changed, 22 insertions(+), 12 deletions(-) diff --git a/services/water_level_csv.py b/services/water_level_csv.py index e694f919..eab23a35 100644 --- a/services/water_level_csv.py +++ b/services/water_level_csv.py @@ -386,7 +386,9 @@ def _create_records( field_activity = FieldActivity( field_event=field_event, activity_type="groundwater level", - notes=f"Sampler: {row.sampler}", + # Measuring staff now lives on structured participants and the + # sample participant link, not in field_activity.notes. + notes=None, ) sample = Sample(field_activity=field_activity) observation = Observation(sample=sample) @@ -404,7 +406,9 @@ def _create_records( field_event.event_date = row.field_event_dt field_event.notes = _build_field_event_notes(row) - field_activity.notes = f"Sampler: {row.sampler}" + # Clear any legacy sampler note text so downstream readers use + # structured participant data as the authoritative source. + field_activity.notes = None _apply_sample_values(sample, row, sample_name) _apply_observation_values(observation, row, parameter_id) @@ -605,11 +609,8 @@ def _apply_observation_values( def _build_field_event_notes(row: _ValidatedRow) -> str | None: - parts = [f"Field staff: {row.field_staff}"] - if row.water_level_notes: - parts.append(row.water_level_notes) - notes = " | ".join(part for part in parts if part) - return notes or None + """Return only freeform field-event notes; staff lives in structured participants.""" + return row.water_level_notes or None def _get_groundwater_level_parameter_id(session: Session) -> int: diff --git a/tests/test_cli_commands.py b/tests/test_cli_commands.py index a1d4515f..5059ff8a 100644 --- a/tests/test_cli_commands.py +++ b/tests/test_cli_commands.py @@ -696,10 +696,12 @@ def _write_csv(path: Path, *, well_name: str, notes: str): "Water level accurate to within two hundreths of a foot," f"{notes}" ) - csv_text = textwrap.dedent(f"""\ + csv_text = textwrap.dedent( + f"""\ {header} {row} - """) + """ + ) path.write_text(csv_text) unique_notes = f"pytest-{uuid.uuid4()}" @@ -742,9 +744,8 @@ def _write_csv(path: Path, *, well_name: str, notes: str): observation.nma_data_quality == "Water level accurate to within two hundreths of a foot" ) - assert ( - field_event.notes == f"Field staff: CLI Tester | {unique_notes}" - ), "Field event notes should capture field staff and notes" + assert field_event.notes == unique_notes + assert field_activity.notes is None created_ids = { "observation_id": observation.id, diff --git a/tests/test_water_level_csv_service.py b/tests/test_water_level_csv_service.py index 03b2d138..c0a499b9 100644 --- a/tests/test_water_level_csv_service.py +++ b/tests/test_water_level_csv_service.py @@ -310,6 +310,14 @@ def test_bulk_upload_water_levels_creates_field_event_participants(water_well_th assert {participant.field_event_id for participant in participants} == { field_event.id } + # Notes now carry only freeform text; staff identity should come from the + # structured participant records and the sample participant link. + assert field_event.notes == "Initial measurement" + assert len(contacts) == 3 + field_activity = session.scalars( + select(FieldActivity).where(FieldActivity.field_event_id == field_event.id) + ).one() + assert field_activity.notes is None sample = session.scalars( select(Sample) .join(FieldActivity, Sample.field_activity_id == FieldActivity.id) From 2d61ecf620b488dc989056d14c1eb2e90574f783 Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Wed, 15 Apr 2026 16:43:38 -0600 Subject: [PATCH 3/8] test(api): verify well details exposes imported water-level staff - Verify the well details payload exposes the measuring person on latest_field_event_sample.contact - Verify the latest field event participants list includes imported staff --- tests/test_thing.py | 58 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/tests/test_thing.py b/tests/test_thing.py index 9201dfbe..fd9c86d9 100644 --- a/tests/test_thing.py +++ b/tests/test_thing.py @@ -32,6 +32,7 @@ from schemas import DT_FMT from schemas.location import LocationResponse from schemas.thing import UpdateWell, ValidateWell +from services.water_level_csv import bulk_upload_water_levels from tests import ( client, override_authentication, @@ -721,6 +722,63 @@ def test_get_water_well_details_payload_uses_latest_observation_sample( session.commit() +def test_get_water_well_details_payload_includes_imported_water_level_staff( + water_well_thing, +): + """Imported water-level rows should expose measuring staff via structured detail payload fields.""" + csv_content = "\n".join( + [ + ",".join( + [ + "field_staff", + "field_staff_2", + "well_name_point_id", + "field_event_date_time", + "measurement_date_time", + "sampler", + "sample_method", + "mp_height", + "level_status", + "depth_to_water_ft", + "data_quality", + "water_level_notes", + ] + ), + ",".join( + [ + "A Lopez", + "B Chen", + water_well_thing.name, + "2025-02-15T08:00:00-07:00", + "2025-02-15T10:30:00-07:00", + "A Lopez", + "electric tape", + "1.5", + "Water level not affected", + "7.0", + "Water level accurate to within two hundreths of a foot", + "Imported measurement", + ] + ), + ] + ) + + result = bulk_upload_water_levels(csv_content.encode("utf-8")) + + assert result.exit_code == 0, result.payload + + # `/details` is the primary frontend payload for latest water-level staff. + response = client.get(f"/thing/water-well/{water_well_thing.id}/details") + + assert response.status_code == 200 + data = response.json() + assert data["latest_field_event_sample"]["contact"]["name"] == "A Lopez" + assert { + participant["participant"]["name"] + for participant in data["field_event_participants"] + } == {"A Lopez", "B Chen"} + + def test_get_water_well_details_payload_404_not_found(): response = client.get("/thing/water-well/999999/details") From 3b62751fade707c49e5467edfc0ec6a9b6a41e7c Mon Sep 17 00:00:00 2001 From: ksmuczynski <20096455+ksmuczynski@users.noreply.github.com> Date: Thu, 16 Apr 2026 02:14:38 +0000 Subject: [PATCH 4/8] Formatting changes --- tests/test_cli_commands.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/test_cli_commands.py b/tests/test_cli_commands.py index 5059ff8a..b9b28fa1 100644 --- a/tests/test_cli_commands.py +++ b/tests/test_cli_commands.py @@ -696,12 +696,10 @@ def _write_csv(path: Path, *, well_name: str, notes: str): "Water level accurate to within two hundreths of a foot," f"{notes}" ) - csv_text = textwrap.dedent( - f"""\ + csv_text = textwrap.dedent(f"""\ {header} {row} - """ - ) + """) path.write_text(csv_text) unique_notes = f"pytest-{uuid.uuid4()}" From 56c96a8615ccbd9f7e605624bb778036981386ce Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Wed, 15 Apr 2026 20:36:44 -0600 Subject: [PATCH 5/8] fix(water-level-importer): reuse contacts by unique name and organization `Contact` rows are unique on `name + organization`, so the water-level importer now uses that same key when resolving field staff contacts. This avoids missing an existing contact with a different contact_type and attempting a duplicate insert. Also adds regression coverage for reusing an existing same-name/same-organization contact during import. --- services/water_level_csv.py | 4 +- tests/test_water_level_csv_service.py | 69 +++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 1 deletion(-) diff --git a/services/water_level_csv.py b/services/water_level_csv.py index eab23a35..abe239b0 100644 --- a/services/water_level_csv.py +++ b/services/water_level_csv.py @@ -537,11 +537,13 @@ def _get_or_create_field_staff_contact(session: Session, staff_name: str) -> Con """Resolve or create the contact record used by field event participants.""" contact_type = "Field Event Participant" organization = "NMBGMR" + # Contact uniqueness is enforced on (name, organization), so the lookup + # must use the same key to avoid missing an existing row with a different + # contact_type and attempting a duplicate insert. contact = session.scalars( select(Contact) .where(Contact.name == staff_name) .where(Contact.organization == organization) - .where(Contact.contact_type == contact_type) ).first() if contact is None: diff --git a/tests/test_water_level_csv_service.py b/tests/test_water_level_csv_service.py index c0a499b9..fb6fc233 100644 --- a/tests/test_water_level_csv_service.py +++ b/tests/test_water_level_csv_service.py @@ -485,6 +485,75 @@ def test_bulk_upload_water_levels_fails_when_measuring_person_is_ambiguous( assert participants == [] +def test_bulk_upload_water_levels_reuses_contact_with_same_name_and_organization( + water_well_thing, +): + staff_name = "Z Vega" + + with session_ctx() as session: + existing_contact = Contact( + name=staff_name, + organization="NMBGMR", + role="Technician", + contact_type="Primary", + ) + session.add(existing_contact) + session.commit() + existing_contact_id = existing_contact.id + + csv_content = "\n".join( + [ + ",".join( + [ + "field_staff", + "well_name_point_id", + "field_event_date_time", + "measurement_date_time", + "sampler", + "sample_method", + "mp_height", + "level_status", + "depth_to_water_ft", + "data_quality", + "water_level_notes", + ] + ), + ",".join( + [ + staff_name, + water_well_thing.name, + "2025-02-15T08:00:00-07:00", + "2025-02-15T10:30:00-07:00", + staff_name, + "electric tape", + "1.5", + "Water level not affected", + "7.0", + "Water level accurate to within two hundreths of a foot", + "Initial measurement", + ] + ), + ] + ) + + result = bulk_upload_water_levels(csv_content.encode("utf-8")) + + assert result.exit_code == 0, result.payload + + with session_ctx() as session: + contacts = session.scalars( + select(Contact) + .where(Contact.name == staff_name) + .where(Contact.organization == "NMBGMR") + ).all() + participants = session.scalars(select(FieldEventParticipant)).all() + + assert len(contacts) == 1 + assert contacts[0].id == existing_contact_id + assert len(participants) == 1 + assert participants[0].contact_id == existing_contact_id + + def test_bulk_upload_water_levels_warns_when_mp_height_differs_from_history( water_well_thing, ): From 2eaa6b84a1b72e86e0065abf559bb8b9a7550534 Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Thu, 16 Apr 2026 09:23:38 -0600 Subject: [PATCH 6/8] test(water-level-importer): clean up imported participant contacts in tests Water-level importer tests were leaving participant contacts behind in the shared test database, which caused test_get_contacts to fail during full-suite runs. Update fixture and CLI test cleanup so importer-created staff contacts are removed after tests finish, while keeping the contact API assertions strict. --- tests/conftest.py | 19 ++++++++++++++++++- tests/test_cli_commands.py | 28 +++++++++++++++++++++++++--- 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 5818707b..9eb1afd1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,7 +5,7 @@ from alembic import command from alembic.config import Config from dotenv import load_dotenv -from sqlalchemy import delete +from sqlalchemy import delete, select from sqlalchemy import inspect as sa_inspect from core.initializers import init_lexicon, init_parameter @@ -174,6 +174,10 @@ def second_location(): @pytest.fixture() def water_well_thing(location): with session_ctx() as session: + # Some importer tests create participant contacts as a side effect. Keep + # a baseline so teardown can remove only the contacts introduced while + # this fixture-owned well existed. + existing_contact_ids = set(session.scalars(select(Contact.id)).all()) water_well = Thing( name="Test Well", first_visit_date="2023-03-03", @@ -209,10 +213,23 @@ def water_well_thing(location): session.refresh(water_well) session.refresh(assoc) yield water_well + # Capture participant contacts before deleting the well, because the + # field event rows cascade away with the well and would no longer be + # queryable afterward. + imported_contact_ids = set( + session.scalars( + select(FieldEventParticipant.contact_id) + .join(FieldEvent) + .where(FieldEvent.thing_id == water_well.id) + ).all() + ) session.delete(water_well) session.delete(assoc) session.delete(measuring_point_history) session.commit() + for contact_id in imported_contact_ids - existing_contact_ids: + _delete_if_present(session, session.get(Contact, contact_id)) + session.commit() @pytest.fixture() diff --git a/tests/test_cli_commands.py b/tests/test_cli_commands.py index b9b28fa1..c47345a5 100644 --- a/tests/test_cli_commands.py +++ b/tests/test_cli_commands.py @@ -27,7 +27,14 @@ from cli.cli import cli from cli.service_adapter import WellInventoryResult -from db import FieldActivity, FieldEvent, Observation, Sample +from db import ( + Contact, + FieldActivity, + FieldEvent, + FieldEventParticipant, + Observation, + Sample, +) from db.engine import session_ctx @@ -696,10 +703,12 @@ def _write_csv(path: Path, *, well_name: str, notes: str): "Water level accurate to within two hundreths of a foot," f"{notes}" ) - csv_text = textwrap.dedent(f"""\ + csv_text = textwrap.dedent( + f"""\ {header} {row} - """) + """ + ) path.write_text(csv_text) unique_notes = f"pytest-{uuid.uuid4()}" @@ -755,6 +764,14 @@ def _write_csv(path: Path, *, well_name: str, notes: str): if created_ids: # Clean up committed rows so other tests see a pristine database. with session_ctx() as session: + # Collect participant contacts before deleting the field event so + # importer-created staff contacts do not leak into later tests. + participant_contact_ids = session.scalars( + select(FieldEventParticipant.contact_id).where( + FieldEventParticipant.field_event_id + == created_ids["field_event_id"] + ) + ).all() observation = session.get(Observation, created_ids["observation_id"]) sample = session.get(Sample, created_ids["sample_id"]) field_activity = session.get( @@ -774,6 +791,11 @@ def _write_csv(path: Path, *, well_name: str, notes: str): if field_event: session.delete(field_event) session.flush() + for contact_id in participant_contact_ids: + contact = session.get(Contact, contact_id) + if contact: + session.delete(contact) + session.flush() session.commit() From 8465a535cc329512d58a64c434f1ea11f2a1b937 Mon Sep 17 00:00:00 2001 From: ksmuczynski <20096455+ksmuczynski@users.noreply.github.com> Date: Thu, 16 Apr 2026 15:24:09 +0000 Subject: [PATCH 7/8] Formatting changes --- tests/test_cli_commands.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/test_cli_commands.py b/tests/test_cli_commands.py index c47345a5..bc9ff031 100644 --- a/tests/test_cli_commands.py +++ b/tests/test_cli_commands.py @@ -703,12 +703,10 @@ def _write_csv(path: Path, *, well_name: str, notes: str): "Water level accurate to within two hundreths of a foot," f"{notes}" ) - csv_text = textwrap.dedent( - f"""\ + csv_text = textwrap.dedent(f"""\ {header} {row} - """ - ) + """) path.write_text(csv_text) unique_notes = f"pytest-{uuid.uuid4()}" From 7b3c8ad497fd486934ae604ed41a585d6119dc02 Mon Sep 17 00:00:00 2001 From: Kelsey Smuczynski Date: Thu, 16 Apr 2026 15:07:24 -0600 Subject: [PATCH 8/8] refactor(water-level-importer): normalize staff columns before participant creation Normalize the fixed field_staff CSV columns into a single iterable shape after validation, so participant creation no longer hardcodes each slot in the importer. This keeps the input CSV/schema unchanged while making the participant logic easier to read and easier to extend later if the staff-field shape changes. --- services/water_level_csv.py | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/services/water_level_csv.py b/services/water_level_csv.py index abe239b0..a9f4198d 100644 --- a/services/water_level_csv.py +++ b/services/water_level_csv.py @@ -65,9 +65,7 @@ class _ValidatedRow: row_index: int raw: dict[str, str] well: Thing - field_staff: str - field_staff_2: str | None - field_staff_3: str | None + field_staff_entries: tuple[tuple[str, str], ...] sampler: str sample_method_term: str field_event_dt: datetime @@ -302,9 +300,7 @@ def _validate_rows( row_index=idx, raw={**normalized}, well=well, - field_staff=model.field_staff, - field_staff_2=model.field_staff_2, - field_staff_3=model.field_staff_3, + field_staff_entries=_normalize_field_staff_entries(model), sampler=model.measuring_person, sample_method_term=model.sample_method, field_event_dt=model.field_event_date_time, @@ -323,6 +319,20 @@ def _validate_rows( return valid_rows, errors +def _normalize_field_staff_entries( + model: WaterLevelCsvRow, +) -> tuple[tuple[str, str], ...]: + """Normalize fixed staff columns into an iterable participant list.""" + participant_specs = ( + (model.field_staff, "Lead"), + (model.field_staff_2, "Participant"), + (model.field_staff_3, "Participant"), + ) + return tuple( + (staff_name, role) for staff_name, role in participant_specs if staff_name + ) + + def _resolve_measuring_point_height( well: Thing, csv_mp_height: float | None ) -> tuple[float | int | None, float | int | None, bool]: @@ -492,11 +502,6 @@ def _ensure_field_event_participants( session: Session, field_event: FieldEvent, row: _ValidatedRow ) -> list[FieldEventParticipant]: """Return event participants for imported staff names, creating any missing ones.""" - participant_specs = ( - (row.field_staff, "Lead"), - (row.field_staff_2, "Participant"), - (row.field_staff_3, "Participant"), - ) existing_participants = session.scalars( select(FieldEventParticipant) .options(selectinload(FieldEventParticipant.participant)) @@ -504,10 +509,7 @@ def _ensure_field_event_participants( .order_by(FieldEventParticipant.id.asc()) ).all() - for staff_name, role in participant_specs: - if not staff_name: - continue - + for staff_name, role in row.field_staff_entries: contact = _get_or_create_field_staff_contact(session, staff_name) participant = next( (