Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 19 additions & 18 deletions cwms/timeseries/timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -669,29 +669,30 @@ def store_timeseries(

# Store chunks concurrently
responses: List[Dict[str, Any]] = []
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# Initialize an empty list to store futures
futures = []
# Submit each chunk as a separate task to the executor
for chunk in chunks:
future = executor.submit(
api.post, # The function to execute
endpoint,
chunk, # The chunk of data to store
params,
)
futures.append(future) # Add the future to the list
errors: List[str] = []

with concurrent.futures.ThreadPoolExecutor(max_workers=actual_workers) as executor:
future_to_chunk = {
executor.submit(api.post, endpoint, chunk, params): chunk
for chunk in chunks
}

for future in concurrent.futures.as_completed(futures):
for future in concurrent.futures.as_completed(future_to_chunk):
chunk = future_to_chunk[future]
try:
responses.append({"success:": future.result()})
responses.append({"success": future.result()})
except Exception as e:
start_time = chunk["values"][0][0]
end_time = chunk["values"][-1][0]
logging.error(
f"Error storing chunk from {start_time} to {end_time}: {e}"
)
responses.append({"error": str(e)})
error_msg = f"Error storing chunk from {start_time} to {end_time}: {e}"
logging.error(error_msg)
errors.append(error_msg)
responses.append({"error": error_msg})

if errors:
raise RuntimeError(
f"{len(errors)} chunk(s) failed to store:\n" + "\n".join(errors)
)

return

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "cwms-python"
repository = "https://github.com/HydrologicEngineeringCenter/cwms-python"

version = "1.0.7"
version = "1.0.8"

packages = [
{ include = "cwms" },
Expand Down
39 changes: 38 additions & 1 deletion tests/cda/timeseries/timeseries_CDA_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
TEST_TSID_CHUNK_NULLS = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Multi-Nulls"
TEST_TSID_COPY_NULLS = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Copy-Nulls"
TS_ID_REV_TEST = TEST_TSID_MULTI.replace("Raw-Multi", "Raw-Rev-Test")
TEST_TSID_CHUNK_PARTIAL = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Multi-Partial"
# Generate 15-minute interval timestamps
START_DATE_CHUNK_MULTI = datetime(2025, 7, 31, 0, 0, tzinfo=timezone.utc)
END_DATE_CHUNK_MULTI = datetime(2025, 9, 30, 23, 45, tzinfo=timezone.utc)
Expand All @@ -35,6 +36,8 @@
TEST_TSID_COPY,
TEST_TSID_CHUNK_NULLS,
TEST_TSID_COPY_NULLS,
TEST_TSID_CHUNK_PARTIAL,
TEST_TSID_DELETE,
]


Expand Down Expand Up @@ -123,7 +126,10 @@ def setup_data():
)
except Exception as e:
print(f"Failed to delete tsid {ts_id}: {e}")
cwms.delete_location(TEST_LOCATION_ID, TEST_OFFICE, cascade_delete=True)
try:
cwms.delete_location(TEST_LOCATION_ID, TEST_OFFICE, cascade_delete=True)
except Exception as e:
print(f"Failed to delete location {TEST_LOCATION_ID}: {e}")


@pytest.fixture(autouse=True)
Expand Down Expand Up @@ -296,6 +302,37 @@ def test_store_timeseries_chunk_ts():
), f"Data frames do not match: original = {DF_CHUNK_MULTI.describe()}, stored = {df.describe()}"


def test_store_timeseries_partial_chunk_fail_real_api():
    """Store a series where exactly one chunk carries a corrupt value.

    The real API rejects only that chunk while the others succeed, so
    ``store_timeseries`` must raise RuntimeError reporting one failure.
    """
    two_weeks = 2 * 7 * 24 * 4  # count of 15-minute values in two weeks

    payload = ts.timeseries_df_to_json(
        DF_CHUNK_MULTI, TEST_TSID_CHUNK_PARTIAL, "m", TEST_OFFICE
    )

    # The multithreaded path is only exercised when chunking yields more
    # than one chunk — verify that precondition up front.
    assert (
        len(ts.chunk_timeseries_data(payload, two_weeks)) > 1
    ), "Test requires multiple chunks — increase DF_CHUNK_MULTI range"

    # Indices 0..two_weeks-1 fall in chunk 0, so index two_weeks is the
    # first value of chunk 1. Corrupt only that row so exactly one chunk
    # is rejected by the API.
    bad_row = payload["values"][two_weeks]
    payload["values"][two_weeks] = [bad_row[0], "not_a_number", bad_row[2]]

    with pytest.raises(RuntimeError) as exc_info:
        ts.store_timeseries(payload, multithread=True, chunk_size=two_weeks)

    message = str(exc_info.value)
    print(message)
    assert "1 chunk(s) failed to store" in message
    assert "Error storing chunk from" in message


def test_store_timesereis_chunk_to_with_null_values():
# Define parameters
ts_id = TEST_TSID_CHUNK_NULLS
Expand Down
Loading