Skip to content

Commit 653c9bb

Browse files
committed
feat: add copy_temporal_columns function to preserve acquisition timestamps and update relevant parsers
1 parent 737681f commit 653c9bb

5 files changed

Lines changed: 226 additions & 24 deletions

File tree

services/aem_parsers/agf.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
import pandas as pd
2525

26-
from services.aem_parsers.common import finalize_parsed_dataframe
26+
from services.aem_parsers.common import copy_temporal_columns, finalize_parsed_dataframe
2727

2828
logger = logging.getLogger(__name__)
2929

@@ -78,6 +78,7 @@ def parse_agf_lci(filepath: str, system: str) -> pd.DataFrame:
7878
raise ValueError(f"Expected column '{col}' not found in AGF file.")
7979

8080
layer_df = df[["record_id", "Line", "E_UTM13Nm", "N_UTM13Nm", "DEM_m"]].copy()
81+
layer_df = copy_temporal_columns(df, layer_df)
8182

8283
# Sounding-level columns (constant across layers)
8384
if "ALTITUDE__m_" in df.columns:

services/aem_parsers/common.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,21 @@
2020
# to 26913 in-memory before deriving the stored WGS84 geometry.
2121
TARGET_EPSG = 26913
2222

23+
TEMPORAL_DATETIME_COLUMNS = [
24+
"acquisition_datetime",
25+
"datetime_acquired",
26+
"acquired_at",
27+
"timestamp",
28+
"datetime_utc",
29+
"utc_datetime",
30+
]
31+
TEMPORAL_TIME_COLUMNS = [
32+
"acquisition_time",
33+
"time_acquired",
34+
"time_utc",
35+
"utc_time",
36+
]
37+
2338
# Canonical column names for the parser/Parquet contract.
2439
# The database only persists the non-coordinate fields plus a WGS84 geometry.
2540
CANONICAL_COLUMNS = [
@@ -66,6 +81,17 @@ def ensure_canonical_columns(df: pd.DataFrame) -> pd.DataFrame:
6681
return df
6782

6883

84+
def copy_temporal_columns(
85+
source_df: pd.DataFrame,
86+
target_df: pd.DataFrame,
87+
) -> pd.DataFrame:
88+
"""Preserve likely acquisition timestamp columns through parser reshaping."""
89+
for col in TEMPORAL_DATETIME_COLUMNS + TEMPORAL_TIME_COLUMNS:
90+
if col in source_df.columns and col not in target_df.columns:
91+
target_df[col] = source_df[col]
92+
return target_df
93+
94+
6995
def finalize_parsed_dataframe(
7096
df: pd.DataFrame,
7197
source_label: str,

services/aem_parsers/seogi.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
import pandas as pd
2727

28-
from services.aem_parsers.common import finalize_parsed_dataframe
28+
from services.aem_parsers.common import copy_temporal_columns, finalize_parsed_dataframe
2929
from services.aem_parsers.detect import extract_flight_id
3030

3131
logger = logging.getLogger(__name__)
@@ -89,6 +89,7 @@ def parse_seogi_rho(filepath: str, flight_id: Optional[str] = None) -> pd.DataFr
8989
)
9090

9191
layer_df = df[["record_id", "line_no", "utmx", "utmy", "elevation"]].copy()
92+
layer_df = copy_temporal_columns(df, layer_df)
9293

9394
# Include plm if present (some Seogi outputs include it)
9495
if "plm" in df.columns:

services/aem_stac.py

Lines changed: 96 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,16 @@
99
import pandas as pd
1010
import re
1111
from collections.abc import Callable, Iterable
12-
from datetime import date, datetime, timezone
12+
from datetime import date, datetime, time, timezone
1313
from google.cloud import storage
1414
from schemas.aem import SURVEY_METADATA, IngestConfig
15+
from services.aem_parsers.common import (
16+
TEMPORAL_DATETIME_COLUMNS,
17+
TEMPORAL_TIME_COLUMNS,
18+
)
1519
from services.util import transform_srid
1620
from shapely.geometry import box, mapping
17-
from urllib.parse import quote_plus, urlencode
21+
from urllib.parse import quote, quote_plus, urlencode
1822

1923
logger = logging.getLogger(__name__)
2024

@@ -42,8 +46,75 @@ def _stac_datetime_or_none(value) -> str | None:
4246
return f"{text_value}Z"
4347

4448

49+
def _combine_date_and_time(date_value, time_value):
50+
if date_value is None or pd.isna(date_value):
51+
return None
52+
if time_value is None or pd.isna(time_value):
53+
return date_value
54+
55+
if isinstance(date_value, pd.Timestamp):
56+
date_value = date_value.to_pydatetime()
57+
if isinstance(time_value, pd.Timestamp):
58+
time_value = time_value.to_pydatetime()
59+
60+
if isinstance(date_value, datetime):
61+
if date_value.time() != datetime.min.time():
62+
return date_value
63+
date_part = date_value.date()
64+
elif isinstance(date_value, date):
65+
date_part = date_value
66+
else:
67+
parsed_date = pd.to_datetime(date_value, errors="coerce")
68+
if pd.isna(parsed_date):
69+
return date_value
70+
date_part = parsed_date.date()
71+
72+
if isinstance(time_value, datetime):
73+
time_part = time_value.timetz()
74+
elif isinstance(time_value, time):
75+
time_part = time_value
76+
else:
77+
parsed_time = pd.to_datetime(time_value, errors="coerce")
78+
if pd.isna(parsed_time):
79+
return date_value
80+
time_part = parsed_time.timetz()
81+
82+
return datetime.combine(date_part, time_part)
83+
84+
85+
def _stac_datetimes_from_frame(df: pd.DataFrame) -> list[str]:
86+
for col in TEMPORAL_DATETIME_COLUMNS:
87+
if col not in df.columns:
88+
continue
89+
values = [_stac_datetime_or_none(value) for value in df[col]]
90+
cleaned = sorted({value for value in values if value is not None})
91+
if cleaned:
92+
return cleaned
93+
94+
if "date_acquired" not in df.columns:
95+
return []
96+
97+
for time_col in TEMPORAL_TIME_COLUMNS:
98+
if time_col not in df.columns:
99+
continue
100+
values = [
101+
_stac_datetime_or_none(_combine_date_and_time(date_value, time_value))
102+
for date_value, time_value in zip(
103+
df["date_acquired"], df[time_col], strict=False
104+
)
105+
]
106+
cleaned = sorted({value for value in values if value is not None})
107+
if cleaned:
108+
return cleaned
109+
110+
values = [_stac_datetime_or_none(value) for value in df["date_acquired"]]
111+
return sorted({value for value in values if value is not None})
112+
113+
45114
def _gcs_href(bucket: str, path: str) -> str:
46-
return f"gs://{bucket}/{path}"
115+
return (
116+
f"https://storage.googleapis.com/{bucket}/{quote(path.lstrip('/'), safe='/')}"
117+
)
47118

48119

49120
def _get_env_or_none(name: str) -> str | None:
@@ -56,12 +127,30 @@ def _build_geoserver_endpoint(
56127
default_path: str,
57128
override_env_name: str,
58129
) -> str:
130+
def _join_url_path(base_url: str, path: str) -> str:
131+
normalized_base = base_url.rstrip("/")
132+
normalized_path = f"/{path.lstrip('/')}"
133+
if normalized_base.endswith(normalized_path):
134+
return normalized_base
135+
base_parts = normalized_base.split("/")
136+
path_parts = normalized_path.lstrip("/").split("/")
137+
overlap = 0
138+
max_overlap = min(len(base_parts), len(path_parts))
139+
for size in range(max_overlap, 0, -1):
140+
if base_parts[-size:] == path_parts[:size]:
141+
overlap = size
142+
break
143+
if overlap:
144+
suffix = "/".join(path_parts[overlap:])
145+
return normalized_base if not suffix else f"{normalized_base}/{suffix}"
146+
return f"{normalized_base}{normalized_path}"
147+
59148
override = _get_env_or_none(override_env_name)
60149
if override is None:
61-
return f"{public_url.rstrip('/')}{default_path}"
150+
return _join_url_path(public_url, default_path)
62151
if override.startswith("http://") or override.startswith("https://"):
63152
return override
64-
return f"{public_url.rstrip('/')}/{override.lstrip('/')}"
153+
return _join_url_path(public_url, override)
65154

66155

67156
def _geoserver_layer_name(collection_id: str, workspace: str) -> str:
@@ -158,17 +247,7 @@ def _stac_temporal_extent(
158247
df: pd.DataFrame,
159248
config: IngestConfig,
160249
) -> tuple[str, str]:
161-
temporal_values = (
162-
sorted(
163-
{
164-
_stac_datetime_or_none(value)
165-
for value in df.get("date_acquired", pd.Series())
166-
}
167-
)
168-
if "date_acquired" in df.columns
169-
else []
170-
)
171-
temporal_values = [value for value in temporal_values if value is not None]
250+
temporal_values = _stac_datetimes_from_frame(df)
172251
if not temporal_values:
173252
fallback = _fallback_stac_datetime(config)
174253
return fallback, fallback
@@ -316,11 +395,7 @@ def build_stac_items(
316395
)
317396
geometry = mapping(geometry_point)
318397
bbox = [round(float(value), 6) for value in geometry_point.bounds]
319-
datetimes = [
320-
_stac_datetime_or_none(value)
321-
for value in group.get("date_acquired", pd.Series())
322-
]
323-
datetimes = sorted({value for value in datetimes if value is not None})
398+
datetimes = _stac_datetimes_from_frame(group)
324399
if datetimes:
325400
datetime_value = datetimes[0] if len(datetimes) == 1 else None
326401
start_datetime = datetimes[0]

tests/test_aem.py

Lines changed: 100 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1017,7 +1017,8 @@ def test_build_stac_payloads_are_deterministic():
10171017
]
10181018
assert (
10191019
collection["assets"]["parquet"]["href"]
1020-
== "gs://example-bucket/surveys/gila_animas_2025/out.parquet"
1020+
== "https://storage.googleapis.com/example-bucket/"
1021+
"surveys/gila_animas_2025/out.parquet"
10211022
)
10221023
assert [item["id"] for item in items] == [
10231024
"aem-gila_animas_2025-preliminary_inversion-L1-R1",
@@ -1030,6 +1031,65 @@ def test_build_stac_payloads_are_deterministic():
10301031
assert "wcs" not in collection["assets"]
10311032

10321033

1034+
def test_build_stac_payloads_preserve_acquisition_time():
1035+
df = pd.DataFrame(
1036+
[
1037+
{
1038+
"line_id": "L1",
1039+
"record_id": "R1",
1040+
"easting": 500000,
1041+
"northing": 3800000,
1042+
"source_epsg": 32613,
1043+
"acquisition_datetime": datetime.datetime(
1044+
2025, 3, 1, 14, 15, tzinfo=datetime.timezone.utc
1045+
),
1046+
},
1047+
{
1048+
"line_id": "L2",
1049+
"record_id": "R2",
1050+
"easting": 500100,
1051+
"northing": 3800100,
1052+
"source_epsg": 32613,
1053+
"acquisition_datetime": datetime.datetime(
1054+
2025, 3, 2, 9, 45, tzinfo=datetime.timezone.utc
1055+
),
1056+
},
1057+
]
1058+
)
1059+
config = IngestConfig(
1060+
filepath="/tmp/input.csv",
1061+
survey_id="gila_animas_2025",
1062+
processing_stage=ProcessingStage.PRELIMINARY,
1063+
inversion_code=InversionCode.SEOGI_PYTHON,
1064+
contractor="GeoTech/Seogi",
1065+
gcs_bucket="example-bucket",
1066+
source_gcs_path="surveys/gila_animas_2025/source.csv",
1067+
)
1068+
1069+
collection = aem_stac_service.build_stac_collection(
1070+
df=df,
1071+
config=config,
1072+
parquet_gcs_path="surveys/gila_animas_2025/out.parquet",
1073+
raw_manifest_gcs_path="surveys/gila_animas_2025/raw_files.json",
1074+
)
1075+
items = aem_stac_service.build_stac_items(
1076+
df=df,
1077+
config=config,
1078+
parquet_gcs_path="surveys/gila_animas_2025/out.parquet",
1079+
raw_manifest_gcs_path="surveys/gila_animas_2025/raw_files.json",
1080+
)
1081+
1082+
assert collection["extent"]["temporal"]["interval"] == [
1083+
[
1084+
"2025-03-01T14:15:00Z",
1085+
"2025-03-02T09:45:00Z",
1086+
]
1087+
]
1088+
assert items[0]["properties"]["datetime"] == "2025-03-01T14:15:00Z"
1089+
assert items[0]["properties"]["start_datetime"] == "2025-03-01T14:15:00Z"
1090+
assert items[0]["properties"]["end_datetime"] == "2025-03-01T14:15:00Z"
1091+
1092+
10331093
def test_build_stac_collection_includes_survey_level_geoserver_assets(monkeypatch):
10341094
monkeypatch.setenv("GEOSERVER_PUBLIC_URL", "https://maps.example.com")
10351095
monkeypatch.setenv("GEOSERVER_WORKSPACE", "aem")
@@ -1080,6 +1140,45 @@ def test_build_stac_collection_includes_survey_level_geoserver_assets(monkeypatc
10801140
assert collection["assets"]["wms"]["geoserver:layer"] == "aem:aem-gila_animas_2025"
10811141

10821142

1143+
def test_build_stac_collection_deduplicates_geoserver_base_path(monkeypatch):
1144+
monkeypatch.setenv("GEOSERVER_PUBLIC_URL", "https://maps.example.com/geoserver")
1145+
monkeypatch.setenv("GEOSERVER_WORKSPACE", "aem")
1146+
1147+
df = pd.DataFrame(
1148+
[
1149+
{
1150+
"line_id": "L1",
1151+
"record_id": "R1",
1152+
"easting": 500000,
1153+
"northing": 3800000,
1154+
"source_epsg": 32613,
1155+
}
1156+
]
1157+
)
1158+
config = IngestConfig(
1159+
filepath="/tmp/input.csv",
1160+
survey_id="gila_animas_2025",
1161+
processing_stage=ProcessingStage.PRELIMINARY,
1162+
inversion_code=InversionCode.SEOGI_PYTHON,
1163+
contractor="GeoTech/Seogi",
1164+
gcs_bucket="example-bucket",
1165+
source_gcs_path="surveys/gila_animas_2025/source.csv",
1166+
)
1167+
1168+
collection = aem_stac_service.build_stac_collection(
1169+
df=df,
1170+
config=config,
1171+
parquet_gcs_path="surveys/gila_animas_2025/out.parquet",
1172+
raw_manifest_gcs_path="surveys/gila_animas_2025/raw_files.json",
1173+
)
1174+
1175+
assert collection["assets"]["wcs"]["href"] == (
1176+
"https://maps.example.com/geoserver/ows"
1177+
"?service=WCS&version=2.0.1&request=DescribeCoverage"
1178+
"&coverageId=aem%3Aaem-gila_animas_2025"
1179+
)
1180+
1181+
10831182
def test_load_stac_to_pgstac_uses_upsert(monkeypatch):
10841183
calls = []
10851184

0 commit comments

Comments
 (0)