diff --git a/services/water_level_csv.py b/services/water_level_csv.py index 9faa5af2..a9f4198d 100644 --- a/services/water_level_csv.py +++ b/services/water_level_csv.py @@ -24,7 +24,16 @@ from pathlib import Path from typing import Any, BinaryIO, Iterable, List -from db import Thing, FieldEvent, FieldActivity, Sample, Observation, Parameter +from db import ( + Thing, + FieldEvent, + FieldActivity, + Sample, + Observation, + Parameter, + Contact, + FieldEventParticipant, +) from db.engine import session_ctx from pydantic import ValidationError from schemas.water_level_csv import ( @@ -35,6 +44,7 @@ ) from sqlalchemy import select from sqlalchemy.orm import Session, selectinload +from services.contact_helper import add_contact from services.thing_helper import find_water_wells_by_name REQUIRED_FIELDS: List[str] = list(WATER_LEVEL_REQUIRED_FIELDS) @@ -55,7 +65,7 @@ class _ValidatedRow: row_index: int raw: dict[str, str] well: Thing - field_staff: str + field_staff_entries: tuple[tuple[str, str], ...] sampler: str sample_method_term: str field_event_dt: datetime @@ -290,7 +300,7 @@ def _validate_rows( row_index=idx, raw={**normalized}, well=well, - field_staff=model.field_staff, + field_staff_entries=_normalize_field_staff_entries(model), sampler=model.measuring_person, sample_method_term=model.sample_method, field_event_dt=model.field_event_date_time, @@ -309,6 +319,20 @@ def _validate_rows( return valid_rows, errors +def _normalize_field_staff_entries( + model: WaterLevelCsvRow, +) -> tuple[tuple[str, str], ...]: + """Normalize fixed staff columns into an iterable participant list.""" + participant_specs = ( + (model.field_staff, "Lead"), + (model.field_staff_2, "Participant"), + (model.field_staff_3, "Participant"), + ) + return tuple( + (staff_name, role) for staff_name, role in participant_specs if staff_name + ) + + def _resolve_measuring_point_height( well: Thing, csv_mp_height: float | None ) -> tuple[float | int | None, float | int | None, bool]: @@ -352,6 +376,7 @@ def _validate_depth_to_water_against_well( def _create_records( session: Session, parameter_id: int, rows: list[_ValidatedRow] ) -> tuple[list[dict[str, Any]], list[str]]: + """Create or update field-event, sample, and observation rows for each CSV row.""" created: list[dict[str, Any]] = [] errors: list[str] = [] @@ -371,7 +396,9 @@ def _create_records( field_activity = FieldActivity( field_event=field_event, activity_type="groundwater level", - notes=f"Sampler: {row.sampler}", + # Measuring staff now lives on structured participants and the + # sample participant link, not in field_activity.notes. + notes=None, ) sample = Sample(field_activity=field_activity) observation = Observation(sample=sample) @@ -389,10 +416,18 @@ def _create_records( field_event.event_date = row.field_event_dt field_event.notes = _build_field_event_notes(row) - field_activity.notes = f"Sampler: {row.sampler}" + # Clear any legacy sampler note text so downstream readers use + # structured participant data as the authoritative source. + field_activity.notes = None _apply_sample_values(sample, row, sample_name) _apply_observation_values(observation, row, parameter_id) + # Add participants after required sample/observation fields are populated + # so the contact lookup does not trigger an autoflush of incomplete rows. + participants = _ensure_field_event_participants(session, field_event, row) + sample.field_event_participant = _resolve_measuring_participant( + row, participants + ) session.flush() savepoint.commit() @@ -427,12 +462,14 @@ def _create_records( def _build_sample_name(row: _ValidatedRow) -> str: + """Build the deterministic sample identifier used for create/update matching.""" return f"{row.well.name}-WL-{row.measurement_dt.strftime('%Y%m%d%H%M')}" def _find_existing_imported_sample( session: Session, row: _ValidatedRow, sample_name: str ) -> Sample | None: + """Return the previously imported groundwater-level sample for this row, if any.""" sql = ( select(Sample) .join(FieldActivity, Sample.field_activity_id == FieldActivity.id) @@ -454,13 +491,105 @@ def _find_existing_imported_sample( def _find_existing_observation(sample: Sample, parameter_id: int) -> Observation | None: + """Return the groundwater-level observation already linked to the sample, if any.""" for observation in sample.observations: if observation.parameter_id == parameter_id: return observation return None +def _ensure_field_event_participants( + session: Session, field_event: FieldEvent, row: _ValidatedRow +) -> list[FieldEventParticipant]: + """Return event participants for imported staff names, creating any missing ones.""" + existing_participants = session.scalars( + select(FieldEventParticipant) + .options(selectinload(FieldEventParticipant.participant)) + .where(FieldEventParticipant.field_event_id == field_event.id) + .order_by(FieldEventParticipant.id.asc()) + ).all() + + for staff_name, role in row.field_staff_entries: + contact = _get_or_create_field_staff_contact(session, staff_name) + participant = next( + ( + existing + for existing in existing_participants + if existing.contact_id == contact.id + and existing.participant_role == role + ), + None, + ) + if participant is None: + participant = FieldEventParticipant( + field_event=field_event, + contact_id=contact.id, + participant_role=role, + ) + session.add(participant) + # Attach the resolved contact eagerly so downstream matching can use + # participant.participant.name without an extra lookup. + participant.participant = contact + existing_participants.append(participant) + + return existing_participants + + +def _get_or_create_field_staff_contact(session: Session, staff_name: str) -> Contact: + """Resolve or create the contact record used by field event participants.""" + contact_type = "Field Event Participant" + organization = "NMBGMR" + # Contact uniqueness is enforced on (name, organization), so the lookup + # must use the same key to avoid missing an existing row with a different + # contact_type and attempting a duplicate insert. + contact = session.scalars( + select(Contact) + .where(Contact.name == staff_name) + .where(Contact.organization == organization) + ).first() + + if contact is None: + payload = { + "name": staff_name, + "role": "Technician", + "organization": organization, + "contact_type": contact_type, + } + contact = add_contact(session, payload, None, commit=False) + + return contact + + +def _resolve_measuring_participant( + row: _ValidatedRow, participants: list[FieldEventParticipant] +) -> FieldEventParticipant: + """Return the unique participant matching measuring_person or raise a row error.""" + matching_participants = [ + participant + for participant in participants + if participant.participant is not None + and participant.participant.name == row.sampler + ] + if len(matching_participants) == 1: + return matching_participants[0] + + if not matching_participants: + raise ValueError( + "measuring_person " + f"'{row.sampler}' could not be matched to a field event participant" + ) + + raise ValueError( + "measuring_person " + f"'{row.sampler}' matched multiple field event participants; " + # Ambiguous staff rows should fail so the importer never guesses which + # participant performed the measurement. + "field_staff values must identify exactly one measuring person" + ) + + def _apply_sample_values(sample: Sample, row: _ValidatedRow, sample_name: str) -> None: + """Apply normalized sample values from the validated CSV row.""" sample.sample_date = row.measurement_dt sample.sample_name = sample_name sample.sample_matrix = "groundwater" @@ -472,6 +601,7 @@ def _apply_sample_values(sample: Sample, row: _ValidatedRow, sample_name: str) - def _apply_observation_values( observation: Observation, row: _ValidatedRow, parameter_id: int ) -> None: + """Apply normalized observation values from the validated CSV row.""" observation.observation_datetime = row.measurement_dt observation.parameter_id = parameter_id observation.value = row.depth_to_water_ft @@ -483,11 +613,8 @@ def _apply_observation_values( def _build_field_event_notes(row: _ValidatedRow) -> str | None: - parts = [f"Field staff: {row.field_staff}"] - if row.water_level_notes: - parts.append(row.water_level_notes) - notes = " | ".join(part for part in parts if part) - return notes or None + """Return only freeform field-event notes; staff lives in structured participants.""" + return row.water_level_notes or None def _get_groundwater_level_parameter_id(session: Session) -> int: diff --git a/tests/conftest.py b/tests/conftest.py index 5818707b..9eb1afd1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,7 +5,7 @@ from alembic import command from alembic.config import Config from dotenv import load_dotenv -from sqlalchemy import delete +from sqlalchemy import delete, select from sqlalchemy import inspect as sa_inspect from core.initializers import init_lexicon, init_parameter @@ -174,6 +174,10 @@ def second_location(): @pytest.fixture() def water_well_thing(location): with session_ctx() as session: + # Some importer tests create participant contacts as a side effect. Keep + # a baseline so teardown can remove only the contacts introduced while + # this fixture-owned well existed. + existing_contact_ids = set(session.scalars(select(Contact.id)).all()) water_well = Thing( name="Test Well", first_visit_date="2023-03-03", @@ -209,10 +213,23 @@ def water_well_thing(location): session.refresh(water_well) session.refresh(assoc) yield water_well + # Capture participant contacts before deleting the well, because the + # field event rows cascade away with the well and would no longer be + # queryable afterward. + imported_contact_ids = set( + session.scalars( + select(FieldEventParticipant.contact_id) + .join(FieldEvent) + .where(FieldEvent.thing_id == water_well.id) + ).all() + ) session.delete(water_well) session.delete(assoc) session.delete(measuring_point_history) session.commit() + for contact_id in imported_contact_ids - existing_contact_ids: + _delete_if_present(session, session.get(Contact, contact_id)) + session.commit() @pytest.fixture() diff --git a/tests/test_cli_commands.py b/tests/test_cli_commands.py index a1d4515f..bc9ff031 100644 --- a/tests/test_cli_commands.py +++ b/tests/test_cli_commands.py @@ -27,7 +27,14 @@ from cli.cli import cli from cli.service_adapter import WellInventoryResult -from db import FieldActivity, FieldEvent, Observation, Sample +from db import ( + Contact, + FieldActivity, + FieldEvent, + FieldEventParticipant, + Observation, + Sample, +) from db.engine import session_ctx @@ -742,9 +749,8 @@ def _write_csv(path: Path, *, well_name: str, notes: str): observation.nma_data_quality == "Water level accurate to within two hundreths of a foot" ) - assert ( - field_event.notes == f"Field staff: CLI Tester | {unique_notes}" - ), "Field event notes should capture field staff and notes" + assert field_event.notes == unique_notes + assert field_activity.notes is None created_ids = { "observation_id": observation.id, @@ -756,6 +762,14 @@ def _write_csv(path: Path, *, well_name: str, notes: str): if created_ids: # Clean up committed rows so other tests see a pristine database. with session_ctx() as session: + # Collect participant contacts before deleting the field event so + # importer-created staff contacts do not leak into later tests. + participant_contact_ids = session.scalars( + select(FieldEventParticipant.contact_id).where( + FieldEventParticipant.field_event_id + == created_ids["field_event_id"] + ) + ).all() observation = session.get(Observation, created_ids["observation_id"]) sample = session.get(Sample, created_ids["sample_id"]) field_activity = session.get( @@ -775,6 +789,11 @@ def _write_csv(path: Path, *, well_name: str, notes: str): if field_event: session.delete(field_event) session.flush() + for contact_id in participant_contact_ids: + contact = session.get(Contact, contact_id) + if contact: + session.delete(contact) + session.flush() session.commit() diff --git a/tests/test_thing.py b/tests/test_thing.py index 9201dfbe..fd9c86d9 100644 --- a/tests/test_thing.py +++ b/tests/test_thing.py @@ -32,6 +32,7 @@ from schemas import DT_FMT from schemas.location import LocationResponse from schemas.thing import UpdateWell, ValidateWell +from services.water_level_csv import bulk_upload_water_levels from tests import ( client, override_authentication, @@ -721,6 +722,63 @@ def test_get_water_well_details_payload_uses_latest_observation_sample( session.commit() +def test_get_water_well_details_payload_includes_imported_water_level_staff( + water_well_thing, +): + """Imported water-level rows should expose measuring staff via structured detail payload fields.""" + csv_content = "\n".join( + [ + ",".join( + [ + "field_staff", + "field_staff_2", + "well_name_point_id", + "field_event_date_time", + "measurement_date_time", + "sampler", + "sample_method", + "mp_height", + "level_status", + "depth_to_water_ft", + "data_quality", + "water_level_notes", + ] + ), + ",".join( + [ + "A Lopez", + "B Chen", + water_well_thing.name, + "2025-02-15T08:00:00-07:00", + "2025-02-15T10:30:00-07:00", + "A Lopez", + "electric tape", + "1.5", + "Water level not affected", + "7.0", + "Water level accurate to within two hundreths of a foot", + "Imported measurement", + ] + ), + ] + ) + + result = bulk_upload_water_levels(csv_content.encode("utf-8")) + + assert result.exit_code == 0, result.payload + + # `/details` is the primary frontend payload for latest water-level staff. + response = client.get(f"/thing/water-well/{water_well_thing.id}/details") + + assert response.status_code == 200 + data = response.json() + assert data["latest_field_event_sample"]["contact"]["name"] == "A Lopez" + assert { + participant["participant"]["name"] + for participant in data["field_event_participants"] + } == {"A Lopez", "B Chen"} + + def test_get_water_well_details_payload_404_not_found(): response = client.get("/thing/water-well/999999/details") diff --git a/tests/test_water_level_csv_service.py b/tests/test_water_level_csv_service.py index e4b01d9c..fb6fc233 100644 --- a/tests/test_water_level_csv_service.py +++ b/tests/test_water_level_csv_service.py @@ -2,7 +2,15 @@ from decimal import Decimal from types import SimpleNamespace -from db import FieldActivity, FieldEvent, Observation, Sample, Thing +from db import ( + Contact, + FieldActivity, + FieldEvent, + FieldEventParticipant, + Observation, + Sample, + Thing, +) from db.measuring_point_history import MeasuringPointHistory from db.engine import session_ctx from tests import get_parameter_id @@ -218,6 +226,8 @@ def test_bulk_upload_water_levels_is_idempotent(water_well_thing): assert len(observations) == 1 assert samples[0].sample_name == "Test Well-WL-202502151730" assert samples[0].sample_matrix == "groundwater" + assert samples[0].field_event_participant is not None + assert samples[0].field_event_participant.participant.name == "A Lopez" assert observations[0].groundwater_level_reason == "Water level not affected" assert ( observations[0].nma_data_quality @@ -226,6 +236,324 @@ def test_bulk_upload_water_levels_is_idempotent(water_well_thing): assert observations[0].measuring_point_height == 1.5 +def test_bulk_upload_water_levels_creates_field_event_participants(water_well_thing): + csv_content = "\n".join( + [ + ",".join( + [ + "field_staff", + "field_staff_2", + "field_staff_3", + "well_name_point_id", + "field_event_date_time", + "measurement_date_time", + "sampler", + "sample_method", + "mp_height", + "level_status", + "depth_to_water_ft", + "data_quality", + "water_level_notes", + ] + ), + ",".join( + [ + "A Lopez", + "B Chen", + "C Diaz", + water_well_thing.name, + "2025-02-15T08:00:00-07:00", + "2025-02-15T10:30:00-07:00", + "A Lopez", + "electric tape", + "1.5", + "Water level not affected", + "7.0", + "Water level accurate to within two hundreths of a foot", + "Initial measurement", + ] + ), + ] + ) + + result = bulk_upload_water_levels(csv_content.encode("utf-8")) + + assert result.exit_code == 0, result.payload + + with session_ctx() as session: + field_event = session.scalars( + select(FieldEvent) + .join(Thing, FieldEvent.thing_id == Thing.id) + .where(Thing.id == water_well_thing.id) + ).one() + participants = session.scalars( + select(FieldEventParticipant) + .where(FieldEventParticipant.field_event_id == field_event.id) + .order_by(FieldEventParticipant.id.asc()) + ).all() + contacts = session.scalars( + select(Contact) + .where( + Contact.name.in_(["A Lopez", "B Chen", "C Diaz"]), + Contact.organization == "NMBGMR", + Contact.contact_type == "Field Event Participant", + ) + .order_by(Contact.name.asc()) + ).all() + + assert len(participants) == 3 + assert [participant.participant_role for participant in participants] == [ + "Lead", + "Participant", + "Participant", + ] + assert {participant.field_event_id for participant in participants} == { + field_event.id + } + # Notes now carry only freeform text; staff identity should come from the + # structured participant records and the sample participant link. + assert field_event.notes == "Initial measurement" + assert len(contacts) == 3 + field_activity = session.scalars( + select(FieldActivity).where(FieldActivity.field_event_id == field_event.id) + ).one() + assert field_activity.notes is None + sample = session.scalars( + select(Sample) + .join(FieldActivity, Sample.field_activity_id == FieldActivity.id) + .where(FieldActivity.field_event_id == field_event.id) + ).one() + assert sample.field_event_participant_id == participants[0].id + assert sample.field_event_participant.participant.name == "A Lopez" + + +def test_bulk_upload_water_levels_does_not_duplicate_field_event_participants_on_rerun( + water_well_thing, +): + csv_content = "\n".join( + [ + ",".join( + [ + "field_staff", + "field_staff_2", + "well_name_point_id", + "field_event_date_time", + "measurement_date_time", + "sampler", + "sample_method", + "mp_height", + "level_status", + "depth_to_water_ft", + "data_quality", + "water_level_notes", + ] + ), + ",".join( + [ + "A Lopez", + "B Chen", + water_well_thing.name, + "2025-02-15T08:00:00-07:00", + "2025-02-15T10:30:00-07:00", + "A Lopez", + "electric tape", + "1.5", + "Water level not affected", + "7.0", + "Water level accurate to within two hundreths of a foot", + "Initial measurement", + ] + ), + ] + ) + + first = bulk_upload_water_levels(csv_content.encode("utf-8")) + + assert first.exit_code == 0, first.payload + + with session_ctx() as session: + field_event = session.scalars( + select(FieldEvent) + .join(Thing, FieldEvent.thing_id == Thing.id) + .where(Thing.id == water_well_thing.id) + ).one() + participants = session.scalars( + select(FieldEventParticipant) + .where(FieldEventParticipant.field_event_id == field_event.id) + .order_by(FieldEventParticipant.id.asc()) + ).all() + sample = session.scalars( + select(Sample) + .join(FieldActivity, Sample.field_activity_id == FieldActivity.id) + .where(FieldActivity.field_event_id == field_event.id) + ).one() + + # Capture the exact participant/contact linkage from the first import so + # the rerun can prove the importer reused those records rather than + # creating replacements. + first_participant_ids = [participant.id for participant in participants] + first_contact_ids = [participant.contact_id for participant in participants] + first_sample_participant_id = sample.field_event_participant_id + + second = bulk_upload_water_levels(csv_content.encode("utf-8")) + + assert second.exit_code == 0, second.payload + + with session_ctx() as session: + field_events = session.scalars( + select(FieldEvent) + .join(Thing, FieldEvent.thing_id == Thing.id) + .where(Thing.id == water_well_thing.id) + ).all() + participants = session.scalars( + select(FieldEventParticipant) + .where(FieldEventParticipant.field_event_id == field_events[0].id) + .order_by(FieldEventParticipant.id.asc()) + ).all() + sample = session.scalars( + select(Sample) + .join(FieldActivity, Sample.field_activity_id == FieldActivity.id) + .where(FieldActivity.field_event_id == field_events[0].id) + ).one() + + assert len(field_events) == 1 + assert len(participants) == 2 + assert [participant.id for participant in participants] == first_participant_ids + assert [ + participant.contact_id for participant in participants + ] == first_contact_ids + assert sample.field_event_participant_id == first_sample_participant_id + assert sample.field_event_participant is not None + assert sample.field_event_participant.participant.name == "A Lopez" + + +def test_bulk_upload_water_levels_fails_when_measuring_person_is_ambiguous( + water_well_thing, +): + csv_content = "\n".join( + [ + ",".join( + [ + "field_staff", + "field_staff_2", + "well_name_point_id", + "field_event_date_time", + "measurement_date_time", + "sampler", + "sample_method", + "mp_height", + "level_status", + "depth_to_water_ft", + "data_quality", + "water_level_notes", + ] + ), + ",".join( + [ + "A Lopez", + "A Lopez", + water_well_thing.name, + "2025-02-15T08:00:00-07:00", + "2025-02-15T10:30:00-07:00", + "A Lopez", + "electric tape", + "1.5", + "Water level not affected", + "7.0", + "Water level accurate to within two hundreths of a foot", + "Initial measurement", + ] + ), + ] + ) + + result = bulk_upload_water_levels(csv_content.encode("utf-8")) + + assert result.exit_code == 1 + assert result.payload["summary"]["total_rows_imported"] == 0 + assert result.payload["validation_errors"] == [ + "Row 1: measuring_person 'A Lopez' matched multiple field event " + "participants; field_staff values must identify exactly one measuring " + "person" + ] + + with session_ctx() as session: + samples = session.scalars(select(Sample)).all() + participants = session.scalars(select(FieldEventParticipant)).all() + + assert samples == [] + assert participants == [] + + +def test_bulk_upload_water_levels_reuses_contact_with_same_name_and_organization( + water_well_thing, +): + staff_name = "Z Vega" + + with session_ctx() as session: + existing_contact = Contact( + name=staff_name, + organization="NMBGMR", + role="Technician", + contact_type="Primary", + ) + session.add(existing_contact) + session.commit() + existing_contact_id = existing_contact.id + + csv_content = "\n".join( + [ + ",".join( + [ + "field_staff", + "well_name_point_id", + "field_event_date_time", + "measurement_date_time", + "sampler", + "sample_method", + "mp_height", + "level_status", + "depth_to_water_ft", + "data_quality", + "water_level_notes", + ] + ), + ",".join( + [ + staff_name, + water_well_thing.name, + "2025-02-15T08:00:00-07:00", + "2025-02-15T10:30:00-07:00", + staff_name, + "electric tape", + "1.5", + "Water level not affected", + "7.0", + "Water level accurate to within two hundreths of a foot", + "Initial measurement", + ] + ), + ] + ) + + result = bulk_upload_water_levels(csv_content.encode("utf-8")) + + assert result.exit_code == 0, result.payload + + with session_ctx() as session: + contacts = session.scalars( + select(Contact) + .where(Contact.name == staff_name) + .where(Contact.organization == "NMBGMR") + ).all() + participants = session.scalars(select(FieldEventParticipant)).all() + + assert len(contacts) == 1 + assert contacts[0].id == existing_contact_id + assert len(participants) == 1 + assert participants[0].contact_id == existing_contact_id + + def test_bulk_upload_water_levels_warns_when_mp_height_differs_from_history( water_well_thing, ):