2 changes: 2 additions & 0 deletions dataretrieval/waterdata/__init__.py
@@ -20,6 +20,7 @@
get_latest_daily,
get_monitoring_locations,
get_samples,
get_statistics,
get_time_series_metadata,
)
from .types import (
@@ -38,6 +39,7 @@
"get_latest_daily",
"get_monitoring_locations",
"get_samples",
"get_statistics",
"get_time_series_metadata",
"_check_profiles",
"CODE_SERVICES",
110 changes: 109 additions & 1 deletion dataretrieval/waterdata/api.py
@@ -19,8 +19,13 @@
PROFILE_LOOKUP,
PROFILES,
SERVICES,
STATISTICS_SERVICES,
)
from dataretrieval.waterdata.utils import (
    SAMPLES_URL,
    get_ogc_data,
    get_stats_data,
)

# Set up logger for this module
logger = logging.getLogger(__name__)
@@ -1641,6 +1646,109 @@ def get_samples(

return df, BaseMetadata(response)

def get_statistics(
service: STATISTICS_SERVICES = "observationNormals",
approval_status: Optional[str] = None,
computation_type: Optional[Union[str, list[str]]] = None,
country_code: Optional[Union[str, list[str]]] = None,
state_code: Optional[Union[str, list[str]]] = None,
county_code: Optional[Union[str, list[str]]] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
monitoring_location_id: Optional[Union[str, list[str]]] = None,
page_size: int = 1000,
parent_timeseries_id: Optional[Union[str, list[str]]] = None,
site_type_code: Optional[Union[str, list[str]]] = None,
site_type_name: Optional[Union[str, list[str]]] = None,
parameter_code: Optional[Union[str, list[str]]] = None,
) -> Tuple[pd.DataFrame, BaseMetadata]:
"""Get water data statistics from the USGS Water Data API.

    This service provides access to computations on the historical record
    of water conditions, including minimum, maximum, mean, median, and
    percentiles for day of year, month, month-year, and water/calendar
    years. For more information on how statistics are calculated, see the
    Statistics documentation page:
    https://waterdata.usgs.gov/statistics-documentation/.

Note: This API is under active beta development and subject to
change. Improved handling of significant figures will be
addressed in a future release.

Parameters
----------
    service: string, optional
        One of "observationNormals" or "observationIntervals".
        "observationNormals" returns day-of-year and month-of-year
        statistics matching your query, while "observationIntervals"
        returns monthly and annual statistics matching your query.
approval_status: string, optional
Whether to include approved and/or provisional observations.
At this time, only approved observations are returned.
    computation_type: string or list of strings, optional
        Desired statistical computation method. Available values are:
        arithmetic_mean, maximum, median, minimum, percentile.
    country_code: string or list of strings, optional
        Country query parameter. API defaults to "US".
    state_code: string or list of strings, optional
        State query parameter. Takes the format "US:XX", where XX is
        the two-digit state code. API defaults to "US:42" (Pennsylvania).
    county_code: string or list of strings, optional
        County query parameter. Takes the format "US:XX:YYY", where XX is
        the two-digit state code and YYY is the three-digit county code.
        API defaults to "US:42:103" (Pennsylvania, Pike County).
    start_date: string or datetime, optional
        Start date for the query. Its format depends upon the service:
        for "observationNormals" it is month-day (MM-DD); for
        "observationIntervals" it is year-month-day (YYYY-MM-DD).
    end_date: string or datetime, optional
        End date for the query. Its format depends upon the service:
        for "observationNormals" it is month-day (MM-DD); for
        "observationIntervals" it is year-month-day (YYYY-MM-DD).
monitoring_location_id : string or list of strings, optional
A unique identifier representing a single monitoring location. This
corresponds to the id field in the monitoring-locations endpoint.
Monitoring location IDs are created by combining the agency code of the
agency responsible for the monitoring location (e.g. USGS) with the ID
number of the monitoring location (e.g. 02238500), separated by a hyphen
(e.g. USGS-02238500).
page_size : int, optional
The number of results to return per page, where one result represents a
monitoring location. The default is 1000.
    parent_timeseries_id: string or list of strings, optional
        Returns statistics tied to a particular database entry (a parent
        time series).
    site_type_code: string or list of strings, optional
        Site type code query parameter. You can see a list of valid site
        type codes here:
        https://api.waterdata.usgs.gov/ogcapi/v0/collections/site-types/items.
        Example: "GW" (Groundwater site)
    site_type_name: string or list of strings, optional
        Site type name query parameter. You can see a list of valid site
        type names here:
        https://api.waterdata.usgs.gov/ogcapi/v0/collections/site-types/items.
        Example: "Well"
    parameter_code: string or list of strings, optional
        Parameter codes are 5-digit codes used to identify the constituent
        measured and the units of measure. A complete list of parameter
        codes and associated groupings can be found at
        https://help.waterdata.usgs.gov/codes-and-parameters/parameters.

    Returns
    -------
    df: pandas.DataFrame
        A DataFrame containing the statistics matching the query.
    md: BaseMetadata
        A metadata object with information about the request.
    """
valid_services = get_args(STATISTICS_SERVICES)
if service not in valid_services:
raise ValueError(
f"Invalid service: '{service}'. Valid options are: {valid_services}."
)

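    # Gather every non-None keyword argument as an API query parameter;
    # "service" and "valid_services" are local names, not query parameters.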
params = {
k: v
for k, v in locals().items()
if k not in ["service", "valid_services"] and v is not None
}

return get_stats_data(
args=params,
service=service
)

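# Usage sketch (illustrative; not part of this diff). The monitoring
# location and parameter code below are placeholders:
#
#   >>> from dataretrieval.waterdata import get_statistics
#   >>> df, md = get_statistics(
#   ...     service="observationNormals",
#   ...     monitoring_location_id="USGS-02238500",
#   ...     parameter_code="00060",  # discharge, cubic feet per second
#   ... )
#   >>> df.head()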

def _check_profiles(
service: SERVICES,
5 changes: 5 additions & 0 deletions dataretrieval/waterdata/types.py
@@ -11,6 +11,11 @@
"states",
]

STATISTICS_SERVICES = Literal[
"observationNormals",
"observationIntervals"
]

SERVICES = Literal[
"activities",
"locations",
150 changes: 145 additions & 5 deletions dataretrieval/waterdata/utils.py
@@ -27,6 +27,8 @@
OGC_API_VERSION = "v0"
OGC_API_URL = f"{BASE_URL}/ogcapi/{OGC_API_VERSION}"
SAMPLES_URL = f"{BASE_URL}/samples-data"
STATISTICS_API_VERSION = "v0"
STATISTICS_API_URL = f"{BASE_URL}/statistics/{STATISTICS_API_VERSION}"


def _switch_arg_id(ls: Dict[str, Any], id_name: str, service: str):
@@ -542,7 +544,7 @@ def _walk_pages(
Raises
------
Exception
        If a request fails or returns a non-200 status code.
"""
logger.info("Requesting: %s", req.url)

@@ -579,14 +581,11 @@
headers=headers,
data=content if method == "POST" else None,
)
df1 = _get_resp_data(resp, geopd=geopd)
dfs = pd.concat([dfs, df1], ignore_index=True)
curr_url = _next_req_url(resp)
except Exception:
error_text = _error_body(resp)
logger.error("Request incomplete. %s", error_text)
logger.warning("Request failed for URL: %s. Data download interrupted.", curr_url)
curr_url = None
@@ -820,4 +819,145 @@ def get_ogc_data(
metadata = BaseMetadata(response)
return return_list, metadata

def _handle_stats_nesting(
body: Dict[str, Any],
geopd: bool = False,
) -> pd.DataFrame:
"""
    Flatten the nested JSON from the statistics service into a DataFrame
    with one row per monitoring location, parameter, and statistic.

Parameters
----------
    body : Dict[str, Any]
        The JSON response body from the statistics service containing
        nested data.
    geopd : bool, optional
        Whether geopandas is available. If True, a GeoDataFrame built from
        the feature geometries is returned instead of a plain DataFrame.

    Returns
    -------
    pd.DataFrame
        A DataFrame (or GeoDataFrame) containing the flattened
        statistical data.
    """
if body is None:
return pd.DataFrame()

    # If geopandas is not installed, flatten everything, including
    # geometries, into a pandas DataFrame; otherwise build a GeoDataFrame.
    if not geopd:
        logger.info(
            "Geopandas not installed. Geometries will be flattened "
            "into pandas DataFrames."
        )
        df = pd.json_normalize(body["features"]).drop(
            columns=["type", "properties.data"]
        )
        df.columns = df.columns.str.split(".").str[-1]
    else:
        df = gpd.GeoDataFrame.from_features(body["features"]).drop(columns=["data"])

# Unnest json features, properties, data, and values while retaining necessary
# metadata to merge with main dataframe.
dat = pd.json_normalize(
body,
record_path=["features", "properties", "data", "values"],
meta=[
["features", "properties", "monitoring_location_id"],
["features", "properties", "data", "parameter_code"],
["features", "properties", "data", "unit_of_measure"],
["features", "properties", "data", "parent_time_series_id"],
#["features", "geometry", "coordinates"],
],
meta_prefix="",
errors="ignore",
)
dat.columns = dat.columns.str.split('.').str[-1]

return df.merge(dat, on='monitoring_location_id', how='left')
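
# Illustrative sketch of the flattening above (hypothetical response
# shape, trimmed to one feature with a single value; field names follow
# the meta paths used in this function):
#
#   >>> body = {
#   ...     "features": [{
#   ...         "type": "Feature",
#   ...         "geometry": {"type": "Point", "coordinates": [-81.4, 28.8]},
#   ...         "properties": {
#   ...             "monitoring_location_id": "USGS-02238500",
#   ...             "data": [{
#   ...                 "parameter_code": "00060",
#   ...                 "unit_of_measure": "ft3/s",
#   ...                 "parent_time_series_id": "abc-123",
#   ...                 "values": [{"month": "01", "value": "152"}],
#   ...             }],
#   ...         },
#   ...     }],
#   ... }
#   >>> _handle_stats_nesting(body, geopd=False)
#   # one row carrying monitoring_location_id, parameter_code,
#   # unit_of_measure, parent_time_series_id, month, and value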


def get_stats_data(
args: Dict[str, Any],
service: str,
client: Optional[requests.Session] = None,
) -> Tuple[pd.DataFrame, BaseMetadata]:
"""
    Retrieve statistical data from a water data statistics endpoint and
    return it as a pandas DataFrame with metadata.

    This function prepares the request arguments, constructs the API
    request, handles pagination via the response's "next" token, and
    concatenates the paged results into a single DataFrame.

Parameters
----------
args : Dict[str, Any]
Dictionary of request arguments for the statistics service.
service : str
        The statistics service type (e.g., "observationNormals",
        "observationIntervals").
    client : requests.Session, optional
        A session to reuse for requests. If not provided, a temporary
        session is created and closed once the request completes.

Returns
-------
pd.DataFrame
A DataFrame containing the retrieved and processed statistical data.
BaseMetadata
A metadata object containing request information including URL and query time.
"""

url = f"{STATISTICS_API_URL}/{service}"

headers = _default_headers()

request = requests.Request(
method="GET",
url=url,
headers=headers,
params=args,
)
req = request.prepare()
logger.info("Request: %s", req.url)

# create temp client if not provided
# and close it after the request is done
close_client = client is None
client = client or requests.Session()

try:
resp = client.send(req)
if resp.status_code != 200:
raise Exception(_error_body(resp))

# Store the initial response for metadata
initial_response = resp

# Grab some aspects of the original request: headers and the
# request type (GET or POST)
method = req.method.upper()
headers = dict(req.headers)

body = resp.json()
dfs = _handle_stats_nesting(body, geopd=GEOPANDAS)

        # Look for a pagination token in the response body; treat a
        # missing or null "next" as the last page.
        next_token = body.get("next")

while next_token:
args['next_token'] = next_token

try:
resp = client.request(
method,
url=url,
params=args,
headers=headers,
)
                body = resp.json()
                # Use the same geopandas setting as the first page so
                # every page concatenates to the same frame type.
                df1 = _handle_stats_nesting(body, geopd=GEOPANDAS)
                dfs = pd.concat([dfs, df1], ignore_index=True)
                next_token = body.get("next")
except Exception:
error_text = _error_body(resp)
logger.error("Request incomplete. %s", error_text)
logger.warning("Request failed for URL: %s. Data download interrupted.", resp.url)
next_token = None
return dfs, BaseMetadata(initial_response)
finally:
if close_client:
client.close()
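

# Sketch of calling get_stats_data directly with a reused session
# (hypothetical query values):
#
#   >>> import requests
#   >>> with requests.Session() as session:
#   ...     df, md = get_stats_data(
#   ...         args={
#   ...             "monitoring_location_id": "USGS-02238500",
#   ...             "parameter_code": "00060",
#   ...         },
#   ...         service="observationIntervals",
#   ...         client=session,
#   ...     )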