diff --git a/dataretrieval/waterdata/__init__.py b/dataretrieval/waterdata/__init__.py
index 39b758f..c7c1daa 100644
--- a/dataretrieval/waterdata/__init__.py
+++ b/dataretrieval/waterdata/__init__.py
@@ -20,6 +20,7 @@
     get_latest_daily,
     get_monitoring_locations,
     get_samples,
+    get_statistics,
     get_time_series_metadata,
 )
 from .types import (
@@ -38,6 +39,7 @@
     "get_latest_daily",
     "get_monitoring_locations",
     "get_samples",
+    "get_statistics",
     "get_time_series_metadata",
     "_check_profiles",
     "CODE_SERVICES",
diff --git a/dataretrieval/waterdata/api.py b/dataretrieval/waterdata/api.py
index 63f7b81..5dea669 100644
--- a/dataretrieval/waterdata/api.py
+++ b/dataretrieval/waterdata/api.py
@@ -19,8 +19,13 @@
     PROFILE_LOOKUP,
     PROFILES,
     SERVICES,
+    STATISTICS_SERVICES,
+)
+from dataretrieval.waterdata.utils import (
+    SAMPLES_URL,
+    get_ogc_data,
+    get_stats_data,
 )
-from dataretrieval.waterdata.utils import SAMPLES_URL, get_ogc_data
 
 # Set up logger for this module
 logger = logging.getLogger(__name__)
@@ -1641,6 +1646,109 @@ def get_samples(
     return df, BaseMetadata(response)
 
 
+def get_statistics(
+    service: STATISTICS_SERVICES = "observationNormals",
+    approval_status: Optional[str] = None,
+    computation_type: Optional[Union[str, list[str]]] = None,
+    country_code: Optional[Union[str, list[str]]] = None,
+    state_code: Optional[Union[str, list[str]]] = None,
+    county_code: Optional[Union[str, list[str]]] = None,
+    start_date: Optional[str] = None,
+    end_date: Optional[str] = None,
+    monitoring_location_id: Optional[Union[str, list[str]]] = None,
+    page_size: int = 1000,
+    parent_timeseries_id: Optional[Union[str, list[str]]] = None,
+    site_type_code: Optional[Union[str, list[str]]] = None,
+    site_type_name: Optional[Union[str, list[str]]] = None,
+    parameter_code: Optional[Union[str, list[str]]] = None,
+) -> Tuple[pd.DataFrame, BaseMetadata]:
+    """Get water data statistics from the USGS Water Data API.
+
+    This service provides access to computations on the historical
+    record of water conditions, including minimum, maximum, mean,
+    median, and percentiles for day of year, month, month-year, and
+    water/calendar years. For more information regarding the
+    calculation of statistics and other details, please visit the
+    Statistics documentation page:
+    https://waterdata.usgs.gov/statistics-documentation/.
+
+    Note: This API is under active beta development and subject to
+    change. Improved handling of significant figures will be
+    addressed in a future release.
+
+    Parameters
+    ----------
+    service : string, optional
+        One of "observationNormals" (the default) or
+        "observationIntervals". "observationNormals" returns
+        day-of-year and month-of-year statistics matching your query,
+        while "observationIntervals" returns monthly and annual
+        statistics matching your query.
+    approval_status : string, optional
+        Whether to include approved and/or provisional observations.
+        At this time, only approved observations are returned.
+    computation_type : string or list of strings, optional
+        Desired statistical computation method. Available values are:
+        arithmetic_mean, maximum, median, minimum, percentile.
+    country_code : string or list of strings, optional
+        Country query parameter. API defaults to "US".
+    state_code : string or list of strings, optional
+        State query parameter. Takes the format "US:XX", where XX is
+        the two-digit state FIPS code. API defaults to "US:42"
+        (Pennsylvania).
+    county_code : string or list of strings, optional
+        County query parameter. Takes the format "US:XX:YYY", where XX
+        is the two-digit state FIPS code and YYY is the three-digit
+        county FIPS code.
+        API defaults to "US:42:103" (Pennsylvania, Pike County).
+    start_date : string, optional
+        Start date for the query. Its format depends upon the service:
+        for "observationNormals" it is month-day (MM-DD); for
+        "observationIntervals" it is year-month-day (YYYY-MM-DD).
+    end_date : string, optional
+        End date for the query. Its format depends upon the service:
+        for "observationNormals" it is month-day (MM-DD); for
+        "observationIntervals" it is year-month-day (YYYY-MM-DD).
+    monitoring_location_id : string or list of strings, optional
+        A unique identifier representing a single monitoring location.
+        This corresponds to the id field in the monitoring-locations
+        endpoint. Monitoring location IDs are created by combining the
+        agency code of the agency responsible for the monitoring
+        location (e.g. USGS) with the ID number of the monitoring
+        location (e.g. 02238500), separated by a hyphen
+        (e.g. USGS-02238500).
+    page_size : int, optional
+        The number of results to return per page, where one result
+        represents a monitoring location. The default is 1000.
+    parent_timeseries_id : string or list of strings, optional
+        Restricts results to statistics tied to a particular parent
+        time series database entry.
+    site_type_code : string or list of strings, optional
+        Site type code query parameter. A list of valid site type
+        codes is available at
+        https://api.waterdata.usgs.gov/ogcapi/v0/collections/site-types/items.
+        Example: "GW" (Groundwater site)
+    site_type_name : string or list of strings, optional
+        Site type name query parameter. A list of valid site type
+        names is available at
+        https://api.waterdata.usgs.gov/ogcapi/v0/collections/site-types/items.
+        Example: "Well"
+    parameter_code : string or list of strings, optional
+        Parameter codes are 5-digit codes used to identify the
+        constituent measured and the units of measure. A complete list
+        of parameter codes and associated groupings can be found at
+        https://help.waterdata.usgs.gov/codes-and-parameters/parameters.
+
+    Returns
+    -------
+    df : pd.DataFrame
+        Formatted data returned from the statistics service.
+    md : BaseMetadata
+        Custom metadata object pertaining to the query.
+    """
+    valid_services = get_args(STATISTICS_SERVICES)
+    if service not in valid_services:
+        raise ValueError(
+            f"Invalid service: '{service}'. Valid options are: {valid_services}."
+        )
+
+    # Collect every non-None keyword argument as a query parameter.
+    params = {
+        k: v
+        for k, v in locals().items()
+        if k not in ["service", "valid_services"] and v is not None
+    }
+
+    return get_stats_data(
+        args=params,
+        service=service,
+    )
+
+
 def _check_profiles(
     service: SERVICES,
diff --git a/dataretrieval/waterdata/types.py b/dataretrieval/waterdata/types.py
index 65e7339..e5b3926 100644
--- a/dataretrieval/waterdata/types.py
+++ b/dataretrieval/waterdata/types.py
@@ -11,6 +11,11 @@
     "states",
 ]
 
+STATISTICS_SERVICES = Literal[
+    "observationNormals",
+    "observationIntervals",
+]
+
 SERVICES = Literal[
     "activities",
     "locations",
diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py
index 46d58b6..9ab473e 100644
--- a/dataretrieval/waterdata/utils.py
+++ b/dataretrieval/waterdata/utils.py
@@ -27,6 +27,8 @@
 OGC_API_VERSION = "v0"
 OGC_API_URL = f"{BASE_URL}/ogcapi/{OGC_API_VERSION}"
 SAMPLES_URL = f"{BASE_URL}/samples-data"
+STATISTICS_API_VERSION = "v0"
+STATISTICS_API_URL = f"{BASE_URL}/statistics/{STATISTICS_API_VERSION}"
 
 
 def _switch_arg_id(ls: Dict[str, Any], id_name: str, service: str):
@@ -542,7 +544,7 @@ def _walk_pages(
     Raises
     ------
     Exception
-        If a request fails or returns a non-200 status code.
+        If the initial request fails or returns a non-200 status code.
""" logger.info("Requesting: %s", req.url) @@ -579,14 +581,11 @@ def _walk_pages( headers=headers, data=content if method == "POST" else None, ) - if resp.status_code != 200: - error_text = _error_body(resp) - raise Exception(error_text) df1 = _get_resp_data(resp, geopd=geopd) dfs = pd.concat([dfs, df1], ignore_index=True) curr_url = _next_req_url(resp) except Exception: - warnings.warn(f"{error_text}. Data request incomplete.") + error_text = _error_body(resp) logger.error("Request incomplete. %s", error_text) logger.warning("Request failed for URL: %s. Data download interrupted.", curr_url) curr_url = None @@ -820,4 +819,145 @@ def get_ogc_data( metadata = BaseMetadata(response) return return_list, metadata +def _handle_stats_nesting( + body: Dict[str, Any], + geopd: bool = False, +) -> pd.DataFrame: + """ + Takes nested json from stats service and flattens into a dataframe with + one row per monitoring location, parameter, and statistic. + + Parameters + ---------- + body : Dict[str, Any] + The JSON response body from the statistics service containing nested data. + + Returns + ------- + pd.DataFrame + A DataFrame containing the flattened statistical data. + """ + if body is None: + return pd.DataFrame() + + if not geopd: + logger.info( + "Geopandas not installed. Geometries will be flattened into pandas DataFrames." + ) + + # If geopandas not installed, return a pandas dataframe + # otherwise return a geodataframe + if not geopd: + df = pd.json_normalize(body['features']).drop(columns=['type', 'properties.data']) + df.columns = df.columns.str.split('.').str[-1] + else: + df = gpd.GeoDataFrame.from_features(body["features"]).drop(columns=['data']) + + # Unnest json features, properties, data, and values while retaining necessary + # metadata to merge with main dataframe. + dat = pd.json_normalize( + body, + record_path=["features", "properties", "data", "values"], + meta=[ + ["features", "properties", "monitoring_location_id"], + ["features", "properties", "data", "parameter_code"], + ["features", "properties", "data", "unit_of_measure"], + ["features", "properties", "data", "parent_time_series_id"], + #["features", "geometry", "coordinates"], + ], + meta_prefix="", + errors="ignore", + ) + dat.columns = dat.columns.str.split('.').str[-1] + + return df.merge(dat, on='monitoring_location_id', how='left') + + +def get_stats_data( + args: Dict[str, Any], + service: str, + client: Optional[requests.Session] = None, + ) -> Tuple[pd.DataFrame, BaseMetadata]: + """ + Retrieves statistical data from a specified water data endpoint and returns it as a pandas DataFrame with metadata. + + This function prepares request arguments, constructs API requests, handles pagination, processes the results, + and formats the output DataFrame according to the specified parameters. + + Parameters + ---------- + args : Dict[str, Any] + Dictionary of request arguments for the statistics service. + service : str + The statistics service type (e.g., "observationNormals", "observationIntervals"). + + Returns + ------- + pd.DataFrame + A DataFrame containing the retrieved and processed statistical data. + BaseMetadata + A metadata object containing request information including URL and query time. 
+    """
+    url = f"{STATISTICS_API_URL}/{service}"
+
+    headers = _default_headers()
+
+    request = requests.Request(
+        method="GET",
+        url=url,
+        headers=headers,
+        params=args,
+    )
+    req = request.prepare()
+    logger.info("Request: %s", req.url)
+
+    # Create a temporary client if one is not provided and close it
+    # after the request completes.
+    close_client = client is None
+    client = client or requests.Session()
+
+    try:
+        resp = client.send(req)
+        if resp.status_code != 200:
+            raise Exception(_error_body(resp))
+
+        # Store the initial response for metadata
+        initial_response = resp
+
+        # Reuse the method and headers of the prepared request for
+        # subsequent pages.
+        method = req.method.upper()
+        headers = dict(req.headers)
+
+        body = resp.json()
+        dfs = _handle_stats_nesting(body, geopd=GEOPANDAS)
+
+        # Look for a next token in the response body
+        next_token = body.get("next")
+
+        while next_token:
+            args["next_token"] = next_token
+
+            try:
+                resp = client.request(
+                    method,
+                    url=url,
+                    params=args,
+                    headers=headers,
+                )
+                if resp.status_code != 200:
+                    raise Exception(_error_body(resp))
+                body = resp.json()
+                df1 = _handle_stats_nesting(body, geopd=GEOPANDAS)
+                dfs = pd.concat([dfs, df1], ignore_index=True)
+                next_token = body.get("next")
+            except Exception:
+                error_text = _error_body(resp)
+                logger.error("Request incomplete. %s", error_text)
+                logger.warning(
+                    "Request failed for URL: %s. Data download interrupted.",
+                    resp.url,
+                )
+                next_token = None
+        return dfs, BaseMetadata(initial_response)
+    finally:
+        if close_client:
+            client.close()
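A minimal usage sketch of the new endpoint, for reviewers; it is illustrative and not part of the diff. It assumes this branch is installed, reuses the example site from the docstrings with a common discharge parameter code (00060), and assumes the url attribute exposed by dataretrieval's existing BaseMetadata; whether statistics exist for that site is not guaranteed.

# Illustrative only: day-of-year and month-of-year normals for
# discharge (parameter code 00060) at the docstring's example site.
from dataretrieval.waterdata import get_statistics

df, meta = get_statistics(
    service="observationNormals",
    monitoring_location_id="USGS-02238500",
    parameter_code="00060",
)
print(df.head())  # one row per location, parameter, and statistic value
print(meta.url)   # request URL captured by BaseMetadata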