fix: graceful stream shutdown on exceptions in streaming actions #680
andreahlert wants to merge 1 commit into apache:main
When a streaming action catches an exception and yields a final state in a try/except/finally block, the stream now completes gracefully instead of propagating the exception and killing the connection. If the generator yields a valid state_update before the exception propagates, the exception is suppressed and the stream terminates normally. If no state was yielded, the exception propagates as before. Closes apache#581
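The behavior described above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code from this PR: `consume_single_step`, `flaky_action`, and `crashing_action` are hypothetical names, and the `(item, state_update)` tuple shape is an assumption made for the example.

```python
from typing import Callable, Iterator, List, Optional, Tuple

# Assumed shape for this sketch: (item, state_update), where state_update
# is None for intermediate items and a dict for the final one.
StreamItem = Tuple[dict, Optional[dict]]


def flaky_action() -> Iterator[StreamItem]:
    """Hypothetical action: fails midway but yields a final state in `finally`."""
    try:
        yield {"token": "a"}, None
        raise ValueError("upstream failed")
    finally:
        # The final state is yielded while the ValueError is still pending;
        # the error re-raises once the consumer asks for the next item.
        yield {"token": ""}, {"status": "failed"}


def crashing_action() -> Iterator[StreamItem]:
    """Hypothetical action: raises without ever yielding a final state."""
    yield {"token": "a"}, None
    raise ValueError("upstream failed")


def consume_single_step(
    action: Callable[[], Iterator[StreamItem]],
) -> Tuple[List[dict], dict]:
    """Drain the generator; suppress a late exception only if a final state arrived."""
    items: List[dict] = []
    state_update: Optional[dict] = None
    try:
        for item, update in action():
            items.append(item)
            if update is not None:
                state_update = update
    except Exception:
        if state_update is None:
            # No final state was produced, so propagate as before.
            raise
    if state_update is None:
        raise ValueError("generator finished without yielding a state update")
    return items, state_update
```

With these definitions, `consume_single_step(flaky_action)` returns normally with the final state, while `consume_single_step(crashing_action)` still raises `ValueError`.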
@skrawcz @kajocina This PR fixes the broken stream issue reported in #581. The generator consumption loops now catch exceptions gracefully when the action has already yielded a final state (e.g. in a ...
pre-commit issue
skrawcz left a comment
Good fix for a real issue! The try/except approach in the single-step functions is clean — checking state_update is None correctly distinguishes between "generator crashed before completing" and "generator yielded a final state in its finally block."
Two items to address:
- caught_exc should be logged in the multi-step functions. It is currently assigned (caught_exc = e) but never read afterward. Please log it (e.g., logger.warning(...)) so that exceptions aren't silently swallowed. Users need visibility into what went wrong even when the stream completes gracefully.
- Tests needed. The reproduction case from #581 translates directly into a test. Please add tests covering:
  - Streaming action with try/except/finally that yields state update in finally → completes gracefully
  - Streaming action that raises without yielding state → exception still propagates
  - Both single-step and multi-step variants
Also worth discussing: the multi-step guard (if result is None: raise) is more permissive than the single-step guard (if state_update is None: raise). In multi-step, if the generator crashes after yielding some items but before producing the "final" result, the reducer silently runs on partial data. Is this the intended behavior?
    partition_key=partition_key,
    sequence_id=sequence_id,
)
count += 1
caught_exc = e is assigned but never read after this point. Please log it so exceptions aren't silently swallowed — e.g., logger.warning("Streaming action %s caught exception during graceful shutdown: %s", action.name, e). Same issue in the async variant below.
    action=action.name,
    app_id=app_id,
    partition_key=partition_key,
    sequence_id=sequence_id,
Worth discussing: when the generator crashes after yielding some items but before its "final" result, this guard (if result is None) means the reducer runs on whatever partial item was last received. In contrast, the single-step guard (if state_update is None) verifies the generator actually fulfilled its contract. Should this also log a warning when the exception is caught with a non-None result, so users know the reducer ran on potentially incomplete data?
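To make the concern concrete, here is a rough sketch of a multi-step consumer that logs the caught exception instead of discarding it. It is an illustration under assumptions, not the PR's implementation: `run_multi_step`, `partial_stream`, and the `reducer` callable are hypothetical names, and only the `result is None` guard and the `logger.warning` suggestion come from the review.

```python
import logging
from typing import Callable, Iterator, Optional

logger = logging.getLogger(__name__)


def partial_stream() -> Iterator[dict]:
    """Hypothetical stream: crashes after one item, before its final result."""
    yield {"chunk": "partial"}
    raise ValueError("upstream failed")


def run_multi_step(
    stream_fn: Callable[[], Iterator[dict]],
    reducer: Callable[[dict], dict],
    action_name: str,
) -> dict:
    """Drain the stream, then run the reducer on the last item received."""
    result: Optional[dict] = None
    caught_exc: Optional[Exception] = None
    try:
        for item in stream_fn():
            result = item
    except Exception as e:
        if result is None:
            # Nothing was yielded at all, so propagate as before.
            raise
        caught_exc = e
    if caught_exc is not None:
        # Surface the suppressed error: the reducer is about to run on the
        # last item seen, which may not be the intended final result.
        logger.warning(
            "Streaming action %s caught exception during graceful shutdown; "
            "reducer will run on possibly incomplete data: %s",
            action_name,
            caught_exc,
        )
    if result is None:
        raise ValueError("stream finished without yielding a result")
    return reducer(result)
```

Calling `run_multi_step(partial_stream, lambda r: {"reduced": r["chunk"]}, "demo")` returns the reduced partial item and emits one warning, which is exactly the "reducer ran on partial data" case the question above is about.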
Summary
- ... _run_single_step_streaming_action, _arun_single_step_streaming_action, _run_multi_step_streaming_action, and _arun_multi_step_streaming_action with try/except
- ... finally block), the stream now completes gracefully instead of propagating the exception and killing the connection
- Closes #581
Test plan
- ... curl: (18) transfer closed with outstanding read data remaining and ValueError stack trace on server
- ... test_application.py (all passing)
- ... test_application.py + test_action.py suite: 175 tests, zero failures