From 3087d3537ab342514d362a781a4e2b3604b4982f Mon Sep 17 00:00:00 2001 From: msweier Date: Mon, 6 Apr 2026 14:46:57 -0500 Subject: [PATCH] fix error suppression in store_timeseries chunk --- cwms/timeseries/timeseries.py | 37 +++++++++---------- pyproject.toml | 2 +- tests/cda/timeseries/timeseries_CDA_test.py | 39 ++++++++++++++++++++- 3 files changed, 58 insertions(+), 20 deletions(-) diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py index 563b485..070d866 100644 --- a/cwms/timeseries/timeseries.py +++ b/cwms/timeseries/timeseries.py @@ -669,29 +669,30 @@ def store_timeseries( # Store chunks concurrently responses: List[Dict[str, Any]] = [] - with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: - # Initialize an empty list to store futures - futures = [] - # Submit each chunk as a separate task to the executor - for chunk in chunks: - future = executor.submit( - api.post, # The function to execute - endpoint, - chunk, # The chunk of data to store - params, - ) - futures.append(future) # Add the future to the list + errors: List[str] = [] + + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + future_to_chunk = { + executor.submit(api.post, endpoint, chunk, params): chunk + for chunk in chunks + } - for future in concurrent.futures.as_completed(futures): + for future in concurrent.futures.as_completed(future_to_chunk): + chunk = future_to_chunk[future] try: - responses.append({"success:": future.result()}) + responses.append({"success": future.result()}) except Exception as e: start_time = chunk["values"][0][0] end_time = chunk["values"][-1][0] - logging.error( - f"Error storing chunk from {start_time} to {end_time}: {e}" - ) - responses.append({"error": str(e)}) + error_msg = f"Error storing chunk from {start_time} to {end_time}: {e}" + logging.error(error_msg) + errors.append(error_msg) + responses.append({"error": error_msg}) + + if errors: + raise RuntimeError(
f"{len(errors)} chunk(s) failed to store:\n" + "\n".join(errors) + ) return diff --git a/pyproject.toml b/pyproject.toml index 358d35f..45e7ed9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,7 +2,7 @@ name = "cwms-python" repository = "https://github.com/HydrologicEngineeringCenter/cwms-python" -version = "1.0.7" +version = "1.0.8" packages = [ { include = "cwms" }, diff --git a/tests/cda/timeseries/timeseries_CDA_test.py b/tests/cda/timeseries/timeseries_CDA_test.py index c0f9cc9..ca5ea92 100644 --- a/tests/cda/timeseries/timeseries_CDA_test.py +++ b/tests/cda/timeseries/timeseries_CDA_test.py @@ -21,6 +21,7 @@ TEST_TSID_CHUNK_NULLS = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Multi-Nulls" TEST_TSID_COPY_NULLS = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Copy-Nulls" TS_ID_REV_TEST = TEST_TSID_MULTI.replace("Raw-Multi", "Raw-Rev-Test") +TEST_TSID_CHUNK_PARTIAL = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Multi-Partial" # Generate 15-minute interval timestamps START_DATE_CHUNK_MULTI = datetime(2025, 7, 31, 0, 0, tzinfo=timezone.utc) END_DATE_CHUNK_MULTI = datetime(2025, 9, 30, 23, 45, tzinfo=timezone.utc) @@ -35,6 +36,8 @@ TEST_TSID_COPY, TEST_TSID_CHUNK_NULLS, TEST_TSID_COPY_NULLS, + TEST_TSID_CHUNK_PARTIAL, + TEST_TSID_DELETE, ] @@ -123,7 +126,10 @@ def setup_data(): ) except Exception as e: print(f"Failed to delete tsid {ts_id}: {e}") - cwms.delete_location(TEST_LOCATION_ID, TEST_OFFICE, cascade_delete=True) + try: + cwms.delete_location(TEST_LOCATION_ID, TEST_OFFICE, cascade_delete=True) + except Exception as e: + print(f"Failed to delete location {TEST_LOCATION_ID}: {e}") @pytest.fixture(autouse=True) @@ -296,6 +302,37 @@ def test_store_timeseries_chunk_ts(): ), f"Data frames do not match: original = {DF_CHUNK_MULTI.describe()}, stored = {df.describe()}" +def test_store_timeseries_partial_chunk_fail_real_api(): + """One chunk with a corrupt value is rejected by the real API; + the rest succeed. 
RuntimeError must surface with exactly 1 failure.""" + chunk_size = 2 * 7 * 24 * 4 # two weeks of 15-min data + + ts_json = ts.timeseries_df_to_json( + DF_CHUNK_MULTI, TEST_TSID_CHUNK_PARTIAL, "m", TEST_OFFICE + ) + + # Confirm multiple chunks exist so the multithreaded path is taken + chunks = ts.chunk_timeseries_data(ts_json, chunk_size) + assert ( + len(chunks) > 1 + ), "Test requires multiple chunks — increase DF_CHUNK_MULTI range" + + # Corrupt the first value of the second chunk so only that chunk is rejected. + # chunk_size values fit in chunk 0 (indices 0..chunk_size-1), + # so index chunk_size is the first value of chunk 1. + corrupt_index = chunk_size + original = ts_json["values"][corrupt_index] + ts_json["values"][corrupt_index] = [original[0], "not_a_number", original[2]] + + with pytest.raises(RuntimeError) as exc_info: + ts.store_timeseries(ts_json, multithread=True, chunk_size=chunk_size) + + error_msg = str(exc_info.value) + print(error_msg) + assert "1 chunk(s) failed to store" in error_msg + assert "Error storing chunk from" in error_msg + + def test_store_timesereis_chunk_to_with_null_values(): # Define parameters ts_id = TEST_TSID_CHUNK_NULLS