Skip to content
147 changes: 137 additions & 10 deletions services/water_level_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,16 @@
from pathlib import Path
from typing import Any, BinaryIO, Iterable, List

from db import Thing, FieldEvent, FieldActivity, Sample, Observation, Parameter
from db import (
Thing,
FieldEvent,
FieldActivity,
Sample,
Observation,
Parameter,
Contact,
FieldEventParticipant,
)
from db.engine import session_ctx
from pydantic import ValidationError
from schemas.water_level_csv import (
Expand All @@ -35,6 +44,7 @@
)
from sqlalchemy import select
from sqlalchemy.orm import Session, selectinload
from services.contact_helper import add_contact
from services.thing_helper import find_water_wells_by_name

REQUIRED_FIELDS: List[str] = list(WATER_LEVEL_REQUIRED_FIELDS)
Expand All @@ -55,7 +65,7 @@ class _ValidatedRow:
row_index: int
raw: dict[str, str]
well: Thing
field_staff: str
field_staff_entries: tuple[tuple[str, str], ...]
sampler: str
sample_method_term: str
field_event_dt: datetime
Expand Down Expand Up @@ -290,7 +300,7 @@ def _validate_rows(
row_index=idx,
raw={**normalized},
well=well,
field_staff=model.field_staff,
field_staff_entries=_normalize_field_staff_entries(model),
sampler=model.measuring_person,
sample_method_term=model.sample_method,
field_event_dt=model.field_event_date_time,
Expand All @@ -309,6 +319,20 @@ def _validate_rows(
return valid_rows, errors


def _normalize_field_staff_entries(
    model: WaterLevelCsvRow,
) -> tuple[tuple[str, str], ...]:
    """Collect the fixed staff columns into an ordered participant tuple.

    Reads ``field_staff``, ``field_staff_2`` and ``field_staff_3`` from
    *model* and pairs each populated value with its participant role:
    ``"Lead"`` for the first column, ``"Participant"`` for the other two.

    Returns:
        ``(staff_name, role)`` pairs in column order. Columns that are
        ``None``, empty, or whitespace-only are omitted. Names that are kept
        are passed through verbatim (no trimming).
    """
    participant_specs = (
        (model.field_staff, "Lead"),
        (model.field_staff_2, "Participant"),
        (model.field_staff_3, "Participant"),
    )
    return tuple(
        (staff_name, role)
        for staff_name, role in participant_specs
        # A cell holding only whitespace is truthy but names nobody; skip it
        # so it never turns into a blank-named participant downstream.
        if staff_name and staff_name.strip()
    )


def _resolve_measuring_point_height(
well: Thing, csv_mp_height: float | None
) -> tuple[float | int | None, float | int | None, bool]:
Expand Down Expand Up @@ -352,6 +376,7 @@ def _validate_depth_to_water_against_well(
def _create_records(
session: Session, parameter_id: int, rows: list[_ValidatedRow]
) -> tuple[list[dict[str, Any]], list[str]]:
"""Create or update field-event, sample, and observation rows for each CSV row."""
created: list[dict[str, Any]] = []
errors: list[str] = []

Expand All @@ -371,7 +396,9 @@ def _create_records(
field_activity = FieldActivity(
field_event=field_event,
activity_type="groundwater level",
notes=f"Sampler: {row.sampler}",
# Measuring staff now lives on structured participants and the
# sample participant link, not in field_activity.notes.
notes=None,
)
sample = Sample(field_activity=field_activity)
observation = Observation(sample=sample)
Expand All @@ -389,10 +416,18 @@ def _create_records(

field_event.event_date = row.field_event_dt
field_event.notes = _build_field_event_notes(row)
field_activity.notes = f"Sampler: {row.sampler}"
# Clear any legacy sampler note text so downstream readers use
# structured participant data as the authoritative source.
field_activity.notes = None

_apply_sample_values(sample, row, sample_name)
_apply_observation_values(observation, row, parameter_id)
# Add participants after required sample/observation fields are populated
# so the contact lookup does not trigger an autoflush of incomplete rows.
participants = _ensure_field_event_participants(session, field_event, row)
sample.field_event_participant = _resolve_measuring_participant(
row, participants
)
session.flush()
savepoint.commit()

Expand Down Expand Up @@ -427,12 +462,14 @@ def _create_records(


def _build_sample_name(row: _ValidatedRow) -> str:
    """Compose the sample identifier used for create/update matching.

    The name has the form ``<well name>-WL-<YYYYMMDDHHMM>``, where the
    timestamp is ``row.measurement_dt`` formatted to minute resolution.
    """
    timestamp = row.measurement_dt.strftime("%Y%m%d%H%M")
    return f"{row.well.name}-WL-{timestamp}"


def _find_existing_imported_sample(
session: Session, row: _ValidatedRow, sample_name: str
) -> Sample | None:
"""Return the previously imported groundwater-level sample for this row, if any."""
sql = (
select(Sample)
.join(FieldActivity, Sample.field_activity_id == FieldActivity.id)
Expand All @@ -454,13 +491,105 @@ def _find_existing_imported_sample(


def _find_existing_observation(sample: Sample, parameter_id: int) -> Observation | None:
    """Look up the observation on *sample* recorded for *parameter_id*.

    Scans ``sample.observations`` in order and returns the first entry whose
    ``parameter_id`` equals the requested one, or ``None`` when no entry
    matches.
    """
    matches = (
        candidate
        for candidate in sample.observations
        if candidate.parameter_id == parameter_id
    )
    return next(matches, None)


def _ensure_field_event_participants(
    session: Session, field_event: FieldEvent, row: _ValidatedRow
) -> list[FieldEventParticipant]:
    """Make sure every staff entry on *row* has a participant on *field_event*.

    Loads the participants already stored for the event (ordered by id, with
    their linked contact eagerly loaded), then walks
    ``row.field_staff_entries``. For each ``(staff_name, role)`` pair the
    contact is resolved or created, and a new ``FieldEventParticipant`` is
    added to the session unless one with the same contact id and role is
    already present.

    Returns:
        The loaded participants followed by any created during this call.
    """
    participants_stmt = (
        select(FieldEventParticipant)
        .options(selectinload(FieldEventParticipant.participant))
        .where(FieldEventParticipant.field_event_id == field_event.id)
        .order_by(FieldEventParticipant.id.asc())
    )
    event_participants = session.scalars(participants_stmt).all()

    for staff_name, role in row.field_staff_entries:
        contact = _get_or_create_field_staff_contact(session, staff_name)

        # NOTE(review): matching relies on contact.id being populated for
        # freshly created contacts -- confirm add_contact flushes.
        already_linked = any(
            known.contact_id == contact.id and known.participant_role == role
            for known in event_participants
        )
        if already_linked:
            continue

        new_participant = FieldEventParticipant(
            field_event=field_event,
            contact_id=contact.id,
            participant_role=role,
        )
        session.add(new_participant)
        # Set the relationship directly so callers can read
        # participant.participant.name without another lookup.
        new_participant.participant = contact
        event_participants.append(new_participant)

    return event_participants


def _get_or_create_field_staff_contact(session: Session, staff_name: str) -> Contact:
    """Fetch the NMBGMR contact named *staff_name*, creating it when absent.

    The lookup filters on name and organization only. Contact uniqueness is
    enforced on that pair, so also filtering on contact_type could miss an
    existing row and lead to a duplicate insert.

    A missing contact is created through ``add_contact`` with role
    ``"Technician"`` and contact type ``"Field Event Participant"``, passing
    ``commit=False``.
    """
    organization = "NMBGMR"

    lookup = select(Contact).where(
        Contact.name == staff_name,
        Contact.organization == organization,
    )
    existing = session.scalars(lookup).first()
    if existing is not None:
        return existing

    new_contact_payload = {
        "name": staff_name,
        "role": "Technician",
        "organization": organization,
        "contact_type": "Field Event Participant",
    }
    # NOTE(review): the third positional argument is passed as None, as in
    # the original call; its meaning is defined by add_contact -- confirm.
    return add_contact(session, new_contact_payload, None, commit=False)


def _resolve_measuring_participant(
    row: _ValidatedRow, participants: list[FieldEventParticipant]
) -> FieldEventParticipant:
    """Pick the single participant whose contact name equals ``row.sampler``.

    Participants without a linked contact are ignored. The name comparison is
    an exact string match.

    Raises:
        ValueError: If no participant matches, or if more than one does.
            Ambiguous rows fail on purpose so the importer never guesses who
            performed the measurement.
    """
    sampler_name = row.sampler

    candidates = []
    for candidate in participants:
        linked_contact = candidate.participant
        if linked_contact is not None and linked_contact.name == sampler_name:
            candidates.append(candidate)

    if not candidates:
        raise ValueError(
            f"measuring_person '{sampler_name}' "
            "could not be matched to a field event participant"
        )

    if len(candidates) > 1:
        raise ValueError(
            f"measuring_person '{sampler_name}' "
            "matched multiple field event participants; "
            "field_staff values must identify exactly one measuring person"
        )

    return candidates[0]


def _apply_sample_values(sample: Sample, row: _ValidatedRow, sample_name: str) -> None:
"""Apply normalized sample values from the validated CSV row."""
sample.sample_date = row.measurement_dt
sample.sample_name = sample_name
sample.sample_matrix = "groundwater"
Expand All @@ -472,6 +601,7 @@ def _apply_sample_values(sample: Sample, row: _ValidatedRow, sample_name: str) -
def _apply_observation_values(
observation: Observation, row: _ValidatedRow, parameter_id: int
) -> None:
"""Apply normalized observation values from the validated CSV row."""
observation.observation_datetime = row.measurement_dt
observation.parameter_id = parameter_id
observation.value = row.depth_to_water_ft
Expand All @@ -483,11 +613,8 @@ def _apply_observation_values(


def _build_field_event_notes(row: _ValidatedRow) -> str | None:
    """Return only the freeform field-event notes for *row*.

    Staff names are recorded as structured field-event participants, so they
    are no longer prefixed onto the notes text.

    Returns:
        ``row.water_level_notes`` when it is non-empty, otherwise ``None``.
    """
    return row.water_level_notes or None


def _get_groundwater_level_parameter_id(session: Session) -> int:
Expand Down
19 changes: 18 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from alembic import command
from alembic.config import Config
from dotenv import load_dotenv
from sqlalchemy import delete
from sqlalchemy import delete, select
from sqlalchemy import inspect as sa_inspect

from core.initializers import init_lexicon, init_parameter
Expand Down Expand Up @@ -174,6 +174,10 @@ def second_location():
@pytest.fixture()
def water_well_thing(location):
with session_ctx() as session:
# Some importer tests create participant contacts as a side effect. Keep
# a baseline so teardown can remove only the contacts introduced while
# this fixture-owned well existed.
existing_contact_ids = set(session.scalars(select(Contact.id)).all())
water_well = Thing(
name="Test Well",
first_visit_date="2023-03-03",
Expand Down Expand Up @@ -209,10 +213,23 @@ def water_well_thing(location):
session.refresh(water_well)
session.refresh(assoc)
yield water_well
# Capture participant contacts before deleting the well, because the
# field event rows cascade away with the well and would no longer be
# queryable afterward.
imported_contact_ids = set(
session.scalars(
select(FieldEventParticipant.contact_id)
.join(FieldEvent)
.where(FieldEvent.thing_id == water_well.id)
).all()
)
session.delete(water_well)
session.delete(assoc)
session.delete(measuring_point_history)
session.commit()
for contact_id in imported_contact_ids - existing_contact_ids:
_delete_if_present(session, session.get(Contact, contact_id))
session.commit()


@pytest.fixture()
Expand Down
27 changes: 23 additions & 4 deletions tests/test_cli_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,14 @@

from cli.cli import cli
from cli.service_adapter import WellInventoryResult
from db import FieldActivity, FieldEvent, Observation, Sample
from db import (
Contact,
FieldActivity,
FieldEvent,
FieldEventParticipant,
Observation,
Sample,
)
from db.engine import session_ctx


Expand Down Expand Up @@ -742,9 +749,8 @@ def _write_csv(path: Path, *, well_name: str, notes: str):
observation.nma_data_quality
== "Water level accurate to within two hundreths of a foot"
)
assert (
field_event.notes == f"Field staff: CLI Tester | {unique_notes}"
), "Field event notes should capture field staff and notes"
assert field_event.notes == unique_notes
assert field_activity.notes is None

created_ids = {
"observation_id": observation.id,
Expand All @@ -756,6 +762,14 @@ def _write_csv(path: Path, *, well_name: str, notes: str):
if created_ids:
# Clean up committed rows so other tests see a pristine database.
with session_ctx() as session:
# Collect participant contacts before deleting the field event so
# importer-created staff contacts do not leak into later tests.
participant_contact_ids = session.scalars(
select(FieldEventParticipant.contact_id).where(
FieldEventParticipant.field_event_id
== created_ids["field_event_id"]
)
).all()
observation = session.get(Observation, created_ids["observation_id"])
sample = session.get(Sample, created_ids["sample_id"])
field_activity = session.get(
Expand All @@ -775,6 +789,11 @@ def _write_csv(path: Path, *, well_name: str, notes: str):
if field_event:
session.delete(field_event)
session.flush()
for contact_id in participant_contact_ids:
contact = session.get(Contact, contact_id)
if contact:
session.delete(contact)
session.flush()

session.commit()

Expand Down
Loading
Loading