From c06759756270a71858ad5aeeb86e7bb0936e4685 Mon Sep 17 00:00:00 2001
From: Elise Hinman
Date: Mon, 29 Dec 2025 21:10:37 -0600
Subject: [PATCH 1/6] initial code for stats service

---
 dataretrieval/waterdata/api.py   | 110 ++++++++++++++++++++++-
 dataretrieval/waterdata/types.py |   5 ++
 dataretrieval/waterdata/utils.py | 150 +++++++++++++++++++++++++++++++
 3 files changed, 264 insertions(+), 1 deletion(-)

diff --git a/dataretrieval/waterdata/api.py b/dataretrieval/waterdata/api.py
index 63f7b81..03c9d6b 100644
--- a/dataretrieval/waterdata/api.py
+++ b/dataretrieval/waterdata/api.py
@@ -19,8 +19,13 @@
     PROFILE_LOOKUP,
     PROFILES,
     SERVICES,
+    STATISTICS_SERVICES,
+)
+from dataretrieval.waterdata.utils import (
+    SAMPLES_URL,
+    get_ogc_data,
+    get_stats_data
 )
-from dataretrieval.waterdata.utils import SAMPLES_URL, get_ogc_data
 
 # Set up logger for this module
 logger = logging.getLogger(__name__)
@@ -1641,6 +1646,109 @@ def get_samples(
     return df, BaseMetadata(response)
 
 
+def get_statistics(
+    service: STATISTICS_SERVICES = "observationNormals",
+    approval_status: Optional[str] = None,
+    computation_type: Optional[str] = None,
+    country_code: Optional[str] = None,
+    state_code: Optional[str] = None,
+    county_code: Optional[str] = None,
+    start_date: Optional[Union[str, datetime]] = None,
+    end_date: Optional[Union[str, datetime]] = None,
+    monitoring_location_id: Optional[str] = None,
+    page_size: int = 1000,
+    parent_timeseries_id: Optional[str] = None,
+    site_type_code: Optional[str] = None,
+    site_type_name: Optional[str] = None,
+    parameter_code: Optional[str] = None,
+    ) -> Tuple[pd.DataFrame, BaseMetadata]:
+    """Get water data statistics from the USGS Water Data API.
+    This service provides endpoints for access to computations on the
+    historical record regarding water conditions, including minimum,
+    maximum, mean, median, and percentiles for day of year, month,
+    month-year, and water/calendar years. For more information regarding
+    the calculation of statistics and other details, please visit the
+    Statistics documentation page:
+    https://waterdata.usgs.gov/statistics-documentation/.
+
+    Note: This API is under active beta development and subject to
+    change. Improved handling of significant figures will be
+    addressed in a future release.
+
+    Parameters
+    ----------
+    service : string
+        One of the following options: "observationNormals" or
+        "observationIntervals". "observationNormals" returns day-of-year
+        and month-of-year statistics matching your query, while
+        "observationIntervals" returns monthly and annual statistics
+        matching your query.
+    approval_status : string, optional
+        Whether to include approved and/or provisional observations.
+        At this time, only approved observations are returned.
+    computation_type : string, optional
+        Desired statistical computation method. Available values are:
+        arithmetic_mean, maximum, median, minimum, percentile.
+    country_code : string, optional
+        Country query parameter. API defaults to "US".
+    state_code : string, optional
+        State query parameter. Takes the format "US:XX", where XX is
+        the two-digit state code. API defaults to "US:42" (Pennsylvania).
+    county_code : string, optional
+        County query parameter. Takes the format "US:XX:YYY", where XX is
+        the two-digit state code and YYY is the three-digit county code.
+        API defaults to "US:42:103" (Pennsylvania, Pike County).
+    start_date : string or datetime, optional
+        Start date for the query.
+        Its format depends upon the service:
+        for "observationNormals", it is in month-day format (MM-DD);
+        for "observationIntervals", it is in year-month-day format
+        (YYYY-MM-DD).
+    end_date : string or datetime, optional
+        End date for the query. Its format depends upon the service:
+        for "observationNormals", it is in month-day format (MM-DD);
+        for "observationIntervals", it is in year-month-day format
+        (YYYY-MM-DD).
+    monitoring_location_id : string or list of strings, optional
+        A unique identifier representing a single monitoring location. This
+        corresponds to the id field in the monitoring-locations endpoint.
+        Monitoring location IDs are created by combining the agency code of
+        the agency responsible for the monitoring location (e.g. USGS) with
+        the ID number of the monitoring location (e.g. 02238500), separated
+        by a hyphen (e.g. USGS-02238500).
+    page_size : int, optional
+        The number of results to return per page, where one result
+        represents a monitoring location. The default is 1000.
+    parent_timeseries_id : string, optional
+        Returns statistics tied to a particular time series database entry.
+    site_type_code : string, optional
+        Site type code query parameter. A list of valid site type codes is
+        available here:
+        https://api.waterdata.usgs.gov/ogcapi/v0/collections/site-types/items.
+        Example: "GW" (Groundwater site)
+    site_type_name : string, optional
+        Site type name query parameter. A list of valid site type names is
+        available here:
+        https://api.waterdata.usgs.gov/ogcapi/v0/collections/site-types/items.
+        Example: "Well"
+    parameter_code : string or list of strings, optional
+        Parameter codes are 5-digit codes used to identify the constituent
+        measured and the units of measure. A complete list of parameter
+        codes and associated groupings can be found at
+        https://help.waterdata.usgs.gov/codes-and-parameters/parameters.
+    """
+    valid_services = get_args(STATISTICS_SERVICES)
+    if service not in valid_services:
+        raise ValueError(
+            f"Invalid service: '{service}'. Valid options are: {valid_services}."
+        )
+
+    params = {
+        k: v
+        for k, v in locals().items()
+        if k not in ["service"] and v is not None
+    }
+
+    return get_stats_data(
+        args=params,
+        service=service,
+    )
+
 
 def _check_profiles(
     service: SERVICES,
diff --git a/dataretrieval/waterdata/types.py b/dataretrieval/waterdata/types.py
index 65e7339..e5b3926 100644
--- a/dataretrieval/waterdata/types.py
+++ b/dataretrieval/waterdata/types.py
@@ -11,6 +11,11 @@
     "states",
 ]
 
+STATISTICS_SERVICES = Literal[
+    "observationNormals",
+    "observationIntervals"
+]
+
 SERVICES = Literal[
     "activities",
     "locations",
diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py
index 46d58b6..1236bd0 100644
--- a/dataretrieval/waterdata/utils.py
+++ b/dataretrieval/waterdata/utils.py
@@ -27,6 +27,8 @@
 OGC_API_VERSION = "v0"
 OGC_API_URL = f"{BASE_URL}/ogcapi/{OGC_API_VERSION}"
 SAMPLES_URL = f"{BASE_URL}/samples-data"
+STATISTICS_API_VERSION = "v0"
+STATISTICS_API_URL = f"{BASE_URL}/statistics/{STATISTICS_API_VERSION}"
 
 
 def _switch_arg_id(ls: Dict[str, Any], id_name: str, service: str):
@@ -820,4 +822,152 @@ def get_ogc_data(
     metadata = BaseMetadata(response)
     return return_list, metadata
 
+
+def get_stats_data(
+    args: Dict[str, Any],
+    service: str,
+    geopd: bool,
+    client: Optional[requests.Session] = None,
+    ) -> Tuple[pd.DataFrame, BaseMetadata]:
+    """
+    Retrieves statistical data from a specified water data endpoint and
+    returns it as a pandas DataFrame with metadata.
+
+    This function prepares request arguments, constructs API requests,
+    handles pagination, processes the results, and formats the output
+    DataFrame according to the specified parameters.
+
+    Parameters
+    ----------
+    args : Dict[str, Any]
+        Dictionary of request arguments for the statistics service.
+    service : str
+        The statistics service type (e.g., "observationNormals",
+        "observationIntervals").
+    geopd : bool, optional
+        If True, returns a GeoDataFrame if geometries are present;
+        otherwise, returns a pandas DataFrame. Defaults to False.
+
+    Returns
+    -------
+    pd.DataFrame
+        A DataFrame containing the retrieved and processed statistical data.
+    BaseMetadata
+        A metadata object containing request information including URL and
+        query time.
+    """
+
+    url = f"{STATISTICS_API_URL}/{service}"
+
+    if not geopd:
+        logger.info(
+            "Geopandas not installed. Geometries will be flattened into pandas DataFrames."
+        )
+
+    headers = _default_headers()
+
+    request = requests.Request(
+        method="GET",
+        url=url,
+        headers=headers,
+        params=args,
+    )
+
+    req = request.prepare()
+    logger.info("Request: %s", req.url)
+
+    # Get first response from client
+    # using GET or POST call
+    close_client = client is None
+    client = client or requests.Session()
+
+    try:
+        resp = client.send(req)
+        if resp.status_code != 200:
+            raise Exception(_error_body(resp))
+
+        # Store the initial response for metadata
+        initial_response = resp
+
+        # Grab some aspects of the original request: headers and the
+        # request type (GET or POST)
+        method = req.method.upper()
+        headers = dict(req.headers)
+
+        # Check if it's an empty response
+        body = resp.json()
+        if body is None:
+            return pd.DataFrame()
+
+        # If geopandas not installed, return a pandas dataframe
+        # otherwise return a geodataframe
+        if not geopd:
+            df = pd.json_normalize(resp['features'])
+        else:
+            df = gpd.GeoDataFrame.from_features(resp["features"]).drop(columns=['data'])
+
+        dat = pd.json_normalize(
+            resp,
+            record_path=["features", "properties", "data", "values"],
+            meta=[
+                ["features", "properties", "monitoring_location_id"],
+                ["features", "properties", "data", "parameter_code"],
+                ["features", "properties", "data", "unit_of_measure"],
+                ["features", "properties", "data", "parent_time_series_id"],
+                ["features", "geometry", "coordinates"],
+            ],
+            meta_prefix="",
+            errors="ignore",
+        )
+        dat.columns = dat.columns.str.split('.').str[-1]
+
+        dfs = df.merge(dat, on='monitoring_location_id', how='left')
+
+        curr_url = body['next']
+
+        while curr_url:
+            try:
+                resp = client.request(
+                    method,
+                    curr_url,
+                    headers=headers,
+                )
+                if resp.status_code != 200:
+                    error_text = _error_body(resp)
+                    raise Exception(error_text)
+                # Check if it's an empty response
+                body = resp.json()
+                if body is None:
+                    return pd.DataFrame()
+
+                # If geopandas not installed, return a pandas dataframe
+                # otherwise return a geodataframe
+                if not geopd:
+                    df1 = pd.json_normalize(resp['features'])
+                else:
+                    df1 = gpd.GeoDataFrame.from_features(resp["features"]).drop(columns=['data'])
+
+                dat = pd.json_normalize(
+                    resp,
+                    record_path=["features", "properties", "data", "values"],
+                    meta=[
+                        ["features", "properties", "monitoring_location_id"],
+                        ["features", "properties", "data", "parameter_code"],
+                        ["features", "properties", "data", "unit_of_measure"],
+                        ["features", "properties", "data", "parent_time_series_id"],
+                        ["features", "geometry", "coordinates"],
+                    ],
+                    meta_prefix="",
+                    errors="ignore",
+                )
+                dat.columns = dat.columns.str.split('.').str[-1]
+
+                df1 = df1.merge(dat, on='monitoring_location_id', how='left')
+                dfs = pd.concat([dfs, df1], ignore_index=True)
+                curr_url = body['next']
+            except Exception:
+                warnings.warn(f"{error_text}. Data request incomplete.")
+                logger.error("Request incomplete. %s", error_text)
+                logger.warning("Request failed for URL: %s. Data download interrupted.", curr_url)
+                curr_url = None
+        return dfs, BaseMetadata(initial_response)
+    finally:
+        if close_client:
+            client.close()
+
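
For orientation while reviewing, here is a minimal sketch of how the function added above might be called (the site and parameter values are illustrative, not service defaults; the import is from the api module directly, since the package-level export only lands in the next commit):

    from dataretrieval.waterdata.api import get_statistics

    # Day-of-year normals for discharge (parameter code 00060) at an
    # illustrative site; any valid site/parameter combination works.
    df, md = get_statistics(
        service="observationNormals",
        monitoring_location_id="USGS-02238500",
        parameter_code="00060",
    )
    print(md.url)  # request URL recorded in the metadata object, per the docstring
    print(df.head())
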
From e00bb698da38e77351210952c4f523b942f22a07 Mon Sep 17 00:00:00 2001
From: Elise Hinman
Date: Tue, 30 Dec 2025 10:58:37 -0600
Subject: [PATCH 2/6] modularize stats function, fix paging

---
 dataretrieval/waterdata/__init__.py |   2 +
 dataretrieval/waterdata/api.py      |   8 +-
 dataretrieval/waterdata/utils.py    | 142 +++++++++++++---------------
 3 files changed, 74 insertions(+), 78 deletions(-)

diff --git a/dataretrieval/waterdata/__init__.py b/dataretrieval/waterdata/__init__.py
index 39b758f..c7c1daa 100644
--- a/dataretrieval/waterdata/__init__.py
+++ b/dataretrieval/waterdata/__init__.py
@@ -20,6 +20,7 @@
     get_latest_daily,
     get_monitoring_locations,
     get_samples,
+    get_statistics,
     get_time_series_metadata,
 )
 from .types import (
@@ -38,6 +39,7 @@
     "get_latest_daily",
     "get_monitoring_locations",
     "get_samples",
+    "get_statistics",
     "get_time_series_metadata",
     "_check_profiles",
     "CODE_SERVICES",
diff --git a/dataretrieval/waterdata/api.py b/dataretrieval/waterdata/api.py
index 03c9d6b..79858ce 100644
--- a/dataretrieval/waterdata/api.py
+++ b/dataretrieval/waterdata/api.py
@@ -1653,8 +1653,8 @@ def get_statistics(
     country_code: Optional[str] = None,
     state_code: Optional[str] = None,
     county_code: Optional[str] = None,
-    start_date: Optional[Union[str, datetime]] = None,
-    end_date: Optional[Union[str, datetime]] = None,
+    start_date: Optional[Union[str, pd.datetime]] = None,
+    end_date: Optional[Union[str, pd.datetime]] = None,
     monitoring_location_id: Optional[str] = None,
     page_size: int = 1000,
     parent_timeseries_id: Optional[str] = None,
@@ -1741,12 +1741,12 @@ def get_statistics(
     params = {
         k: v
         for k, v in locals().items()
-        if k not in ["service"] and v is not None
+        if k not in ["service", "valid_services"] and v is not None
     }
 
     return get_stats_data(
         args=params,
-        service=service,
+        service=service
     )
 
 
diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py
index 1236bd0..73bb7ae 100644
--- a/dataretrieval/waterdata/utils.py
+++ b/dataretrieval/waterdata/utils.py
@@ -588,6 +588,7 @@ def _walk_pages(
             dfs = pd.concat([dfs, df1], ignore_index=True)
             curr_url = _next_req_url(resp)
         except Exception:
+            error_text = _error_body(resp)
             warnings.warn(f"{error_text}. Data request incomplete.")
             logger.error("Request incomplete. %s", error_text)
             logger.warning("Request failed for URL: %s. Data download interrupted.", curr_url)
@@ -820,10 +823,62 @@ def get_ogc_data(
     metadata = BaseMetadata(response)
     return return_list, metadata
 
+
+def _handle_stats_nesting(
+    body: Dict[str, Any],
+    geopd: bool = False,
+) -> pd.DataFrame:
+    """
+    Takes nested json from the stats service and flattens it into a
+    dataframe with one row per monitoring location, parameter, and
+    statistic.
+
+    Parameters
+    ----------
+    body : Dict[str, Any]
+        The JSON response body from the statistics service containing
+        nested data.
+    geopd : bool, optional
+        If True, flatten into a GeoDataFrame; otherwise flatten into a
+        plain pandas DataFrame. Defaults to False.
+
+    Returns
+    -------
+    pd.DataFrame
+        A DataFrame containing the flattened statistical data.
+    """
+    if body is None:
+        return pd.DataFrame()
+
+    if not geopd:
+        logger.info(
+            "Geopandas not installed. Geometries will be flattened into pandas DataFrames."
+        )
+
+    # If geopandas not installed, return a pandas dataframe
+    # otherwise return a geodataframe
+    if not geopd:
+        df = pd.json_normalize(body['features'])
+    else:
+        df = gpd.GeoDataFrame.from_features(body["features"]).drop(columns=['data'])
+
+    # Unnest json features, properties, data, and values while retaining necessary
+    # metadata to merge with main dataframe.
+    dat = pd.json_normalize(
+        body,
+        record_path=["features", "properties", "data", "values"],
+        meta=[
+            ["features", "properties", "monitoring_location_id"],
+            ["features", "properties", "data", "parameter_code"],
+            ["features", "properties", "data", "unit_of_measure"],
+            ["features", "properties", "data", "parent_time_series_id"],
+            ["features", "geometry", "coordinates"],
+        ],
+        meta_prefix="",
+        errors="ignore",
+    )
+    dat.columns = dat.columns.str.split('.').str[-1]
+
+    return df.merge(dat, on='monitoring_location_id', how='left')
+
+
 def get_stats_data(
     args: Dict[str, Any],
     service: str,
-    geopd: bool,
     client: Optional[requests.Session] = None,
     ) -> Tuple[pd.DataFrame, BaseMetadata]:
     """
@@ -840,9 +893,6 @@ def get_stats_data(
         Dictionary of request arguments for the statistics service.
     service : str
         The statistics service type (e.g., "observationNormals",
         "observationIntervals").
-    geopd : bool, optional
-        If True, returns a GeoDataFrame if geometries are present;
-        otherwise, returns a pandas DataFrame. Defaults to False.
 
     Returns
     -------
@@ -854,11 +904,6 @@ def get_stats_data(
 
     url = f"{STATISTICS_API_URL}/{service}"
 
-    if not geopd:
-        logger.info(
-            "Geopandas not installed. Geometries will be flattened into pandas DataFrames."
-        )
-
     headers = _default_headers()
 
     request = requests.Request(
@@ -867,12 +912,11 @@ def get_stats_data(
         headers=headers,
         params=args,
     )
-
     req = request.prepare()
     logger.info("Request: %s", req.url)
 
-    # Get first response from client
-    # using GET or POST call
+    # create temp client if not provided
+    # and close it after the request is done
     close_client = client is None
     client = client or requests.Session()
 
@@ -889,82 +933,32 @@ def get_stats_data(
         method = req.method.upper()
         headers = dict(req.headers)
 
-        # Check if it's an empty response
         body = resp.json()
-        if body is None:
-            return pd.DataFrame()
-
-        # If geopandas not installed, return a pandas dataframe
-        # otherwise return a geodataframe
-        if not geopd:
-            df = pd.json_normalize(resp['features'])
-        else:
-            df = gpd.GeoDataFrame.from_features(resp["features"]).drop(columns=['data'])
-
-        dat = pd.json_normalize(
-            resp,
-            record_path=["features", "properties", "data", "values"],
-            meta=[
-                ["features", "properties", "monitoring_location_id"],
-                ["features", "properties", "data", "parameter_code"],
-                ["features", "properties", "data", "unit_of_measure"],
-                ["features", "properties", "data", "parent_time_series_id"],
-                ["features", "geometry", "coordinates"],
-            ],
-            meta_prefix="",
-            errors="ignore",
-        )
-        dat.columns = dat.columns.str.split('.').str[-1]
+        dfs = _handle_stats_nesting(body, geopd=GEOPANDAS)
 
-        dfs = df.merge(dat, on='monitoring_location_id', how='left')
+        # Look for a next token in the response body
+        next_token = body['next']
 
-        curr_url = body['next']
+        while next_token:
+            args['next_token'] = next_token
 
-        while curr_url:
             try:
                 resp = client.request(
                     method,
-                    curr_url,
+                    url=url,
+                    params=args,
                     headers=headers,
                 )
-                if resp.status_code != 200:
-                    error_text = _error_body(resp)
-                    raise Exception(error_text)
-                # Check if it's an empty response
                 body = resp.json()
-                if body is None:
-                    return pd.DataFrame()
-
-                # If geopandas not installed, return a pandas dataframe
-                # otherwise return a geodataframe
-                if not geopd:
-                    df1 = pd.json_normalize(resp['features'])
-                else:
-                    df1 = gpd.GeoDataFrame.from_features(resp["features"]).drop(columns=['data'])
-
-                dat = pd.json_normalize(
-                    resp,
-                    record_path=["features", "properties", "data", "values"],
-                    meta=[
-                        ["features", "properties", "monitoring_location_id"],
-                        ["features", "properties", "data", "parameter_code"],
-                        ["features", "properties", "data", "unit_of_measure"],
-                        ["features", "properties", "data", "parent_time_series_id"],
-                        ["features", "geometry", "coordinates"],
-                    ],
-                    meta_prefix="",
-                    errors="ignore",
-                )
-                dat.columns = dat.columns.str.split('.').str[-1]
-
-                df1 = df1.merge(dat, on='monitoring_location_id', how='left')
+                df1 = _handle_stats_nesting(body, geopd=GEOPANDAS)
                 dfs = pd.concat([dfs, df1], ignore_index=True)
-                curr_url = body['next']
+                next_token = body['next']
             except Exception:
+                error_text = _error_body(resp)
                 warnings.warn(f"{error_text}. Data request incomplete.")
                 logger.error("Request incomplete. %s", error_text)
-                logger.warning("Request failed for URL: %s. Data download interrupted.", curr_url)
-                curr_url = None
+                logger.warning("Request failed for URL: %s. Data download interrupted.", resp.url)
+                next_token = None
         return dfs, BaseMetadata(initial_response)
     finally:
         if close_client:
             client.close()
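
Since the flattening logic is the heart of this commit, here is a self-contained sketch of what the record_path/meta pattern in _handle_stats_nesting does, run against an assumed, trimmed response body (the field names mirror the meta paths above, but the real service payload may carry more fields):

    import pandas as pd

    # Assumed shape of one page of a statistics response (illustrative).
    body = {
        "features": [
            {
                "type": "Feature",
                "geometry": {"type": "Point", "coordinates": [-81.64, 28.95]},
                "properties": {
                    "monitoring_location_id": "USGS-02238500",
                    "data": [
                        {
                            "parameter_code": "00060",
                            "unit_of_measure": "ft^3/s",
                            "parent_time_series_id": "abc-123",
                            "values": [
                                {"month_day": "01-01", "value": 1230.0},
                                {"month_day": "01-02", "value": 1215.0},
                            ],
                        }
                    ],
                },
            }
        ],
        "next": None,
    }

    # One row per entry in "values", tagged with its location and parameter.
    dat = pd.json_normalize(
        body,
        record_path=["features", "properties", "data", "values"],
        meta=[
            ["features", "properties", "monitoring_location_id"],
            ["features", "properties", "data", "parameter_code"],
        ],
        errors="ignore",
    )
    dat.columns = dat.columns.str.split(".").str[-1]
    # -> columns: month_day, value, monitoring_location_id, parameter_code
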
From 00d0cf92c39bd743d23cc585c6fc585b9beddf6f Mon Sep 17 00:00:00 2001
From: Elise Hinman
Date: Tue, 30 Dec 2025 11:12:37 -0600
Subject: [PATCH 3/6] fix input types

---
 dataretrieval/waterdata/api.py | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/dataretrieval/waterdata/api.py b/dataretrieval/waterdata/api.py
index 79858ce..5dea669 100644
--- a/dataretrieval/waterdata/api.py
+++ b/dataretrieval/waterdata/api.py
@@ -1649,18 +1649,18 @@ def get_samples(
 def get_statistics(
     service: STATISTICS_SERVICES = "observationNormals",
     approval_status: Optional[str] = None,
-    computation_type: Optional[str] = None,
-    country_code: Optional[str] = None,
-    state_code: Optional[str] = None,
-    county_code: Optional[str] = None,
-    start_date: Optional[Union[str, pd.datetime]] = None,
-    end_date: Optional[Union[str, pd.datetime]] = None,
-    monitoring_location_id: Optional[str] = None,
+    computation_type: Optional[Union[str, list[str]]] = None,
+    country_code: Optional[Union[str, list[str]]] = None,
+    state_code: Optional[Union[str, list[str]]] = None,
+    county_code: Optional[Union[str, list[str]]] = None,
+    start_date: Optional[str] = None,
+    end_date: Optional[str] = None,
+    monitoring_location_id: Optional[Union[str, list[str]]] = None,
     page_size: int = 1000,
-    parent_timeseries_id: Optional[str] = None,
-    site_type_code: Optional[str] = None,
-    site_type_name: Optional[str] = None,
-    parameter_code: Optional[str] = None,
+    parent_timeseries_id: Optional[Union[str, list[str]]] = None,
+    site_type_code: Optional[Union[str, list[str]]] = None,
+    site_type_name: Optional[Union[str, list[str]]] = None,
+    parameter_code: Optional[Union[str, list[str]]] = None,
     ) -> Tuple[pd.DataFrame, BaseMetadata]:
     """Get water data statistics from the USGS Water Data API.
     This service provides endpoints for access to computations on the
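
With the widened signatures, callers should be able to pass several values per filter in one call. A sketch (IDs and codes illustrative; this assumes the statistics endpoints accept repeated query parameters, which is how requests encodes list values):

    from dataretrieval.waterdata import get_statistics

    # Monthly/annual intervals for two illustrative sites and two
    # parameters over one calendar year.
    df, md = get_statistics(
        service="observationIntervals",
        monitoring_location_id=["USGS-02238500", "USGS-01646500"],
        parameter_code=["00060", "00065"],
        start_date="2020-01-01",
        end_date="2020-12-31",
    )
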
From 65dff7fac07c3345fbe2711a2e2a919a27639ffd Mon Sep 17 00:00:00 2001
From: Elise Hinman
Date: Tue, 30 Dec 2025 11:30:21 -0600
Subject: [PATCH 4/6] I don't think this is necessary here, as non-200s are
 treated as errors

---
 dataretrieval/waterdata/utils.py | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py
index 73bb7ae..5efa85e 100644
--- a/dataretrieval/waterdata/utils.py
+++ b/dataretrieval/waterdata/utils.py
@@ -544,7 +544,7 @@ def _walk_pages(
     Raises
     ------
     Exception
-        If a request fails or returns a non-200 status code.
+        If a request fails/returns a non-200 status code.
     """
 
     logger.info("Requesting: %s", req.url)
@@ -581,9 +581,6 @@ def _walk_pages(
                 headers=headers,
                 data=content if method == "POST" else None,
             )
-            if resp.status_code != 200:
-                error_text = _error_body(resp)
-                raise Exception(error_text)
             df1 = _get_resp_data(resp, geopd=geopd)
             dfs = pd.concat([dfs, df1], ignore_index=True)
             curr_url = _next_req_url(resp)

From c17a07f35bc760d08466dfcf156498800f684131 Mon Sep 17 00:00:00 2001
From: Elise Hinman
Date: Tue, 30 Dec 2025 11:32:16 -0600
Subject: [PATCH 5/6] get rid of warnings.warn

---
 dataretrieval/waterdata/utils.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py
index 5efa85e..55ac5e6 100644
--- a/dataretrieval/waterdata/utils.py
+++ b/dataretrieval/waterdata/utils.py
@@ -586,7 +586,6 @@ def _walk_pages(
             curr_url = _next_req_url(resp)
         except Exception:
             error_text = _error_body(resp)
-            warnings.warn(f"{error_text}. Data request incomplete.")
             logger.error("Request incomplete. %s", error_text)
             logger.warning("Request failed for URL: %s. Data download interrupted.", curr_url)
             curr_url = None
@@ -952,7 +951,6 @@ def get_stats_data(
             next_token = body['next']
         except Exception:
             error_text = _error_body(resp)
-            warnings.warn(f"{error_text}. Data request incomplete.")
             logger.error("Request incomplete. %s", error_text)
             logger.warning("Request failed for URL: %s. Data download interrupted.", resp.url)
             next_token = None
From f7ee05344215a2118abaf1af3345388f56144079 Mon Sep 17 00:00:00 2001
From: Elise Hinman
Date: Tue, 30 Dec 2025 15:17:32 -0600
Subject: [PATCH 6/6] fix non geopandas flattening

---
 dataretrieval/waterdata/utils.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py
index 55ac5e6..9ab473e 100644
--- a/dataretrieval/waterdata/utils.py
+++ b/dataretrieval/waterdata/utils.py
@@ -848,7 +848,8 @@ def _handle_stats_nesting(
     # If geopandas not installed, return a pandas dataframe
     # otherwise return a geodataframe
     if not geopd:
-        df = pd.json_normalize(body['features'])
+        df = pd.json_normalize(body['features']).drop(columns=['type', 'properties.data'])
+        df.columns = df.columns.str.split('.').str[-1]
     else:
         df = gpd.GeoDataFrame.from_features(body["features"]).drop(columns=['data'])
 
@@ -862,7 +863,7 @@ def _handle_stats_nesting(
             ["features", "properties", "data", "parameter_code"],
             ["features", "properties", "data", "unit_of_measure"],
             ["features", "properties", "data", "parent_time_series_id"],
-            ["features", "geometry", "coordinates"],
+            #["features", "geometry", "coordinates"],
         ],
         meta_prefix="",
         errors="ignore",
@@ -946,7 +947,7 @@ def get_stats_data(
                 headers=headers,
             )
             body = resp.json()
-            df1 = _handle_stats_nesting(body, geopd=GEOPANDAS)
+            df1 = _handle_stats_nesting(body, geopd=False)
             dfs = pd.concat([dfs, df1], ignore_index=True)
             next_token = body['next']
         except Exception:
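
Note: get_stats_data and _handle_stats_nesting rely on a module-level GEOPANDAS flag and a gpd alias that this series never defines, so both are presumably already present in utils.py for the OGC helpers. If not, the usual optional-import pattern would look along these lines (a hypothetical sketch, not the project's actual code):

    # Optional geopandas import guard (assumed; verify against utils.py).
    try:
        import geopandas as gpd
        GEOPANDAS = True
    except ImportError:
        GEOPANDAS = False
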