Skip to content

fix: graceful stream shutdown on exceptions in streaming actions#680

Open
andreahlert wants to merge 1 commit intoapache:mainfrom
andreahlert:issue-581-repro
Open

fix: graceful stream shutdown on exceptions in streaming actions#680
andreahlert wants to merge 1 commit intoapache:mainfrom
andreahlert:issue-581-repro

Conversation

@andreahlert
Copy link
Contributor

Summary

  • Wraps the generator consumption loop in _run_single_step_streaming_action, _arun_single_step_streaming_action, _run_multi_step_streaming_action, and _arun_multi_step_streaming_action with try/except
  • When a streaming action catches an exception and yields a final state update (e.g. in a finally block), the stream now completes gracefully instead of propagating the exception and killing the connection
  • If no state was yielded before the exception, the original behavior is preserved (exception propagates)

Closes #581

Test plan

  • Reproduced the bug using the exact snippet from broken stream when handling raised exceptions #581 in a Docker container (FastAPI + uvicorn). Confirmed curl: (18) transfer closed with outstanding read data remaining and ValueError stack trace on server
  • Applied fix and verified stream completes with exit code 0, server logs clean (no exceptions)
  • Ran all 29 streaming-related tests in test_application.py (all passing)
  • Ran full test_application.py + test_action.py suite: 175 tests, zero failures

When a streaming action catches an exception and yields a final state
in a try/except/finally block, the stream now completes gracefully
instead of propagating the exception and killing the connection.

If the generator yields a valid state_update before the exception
propagates, the exception is suppressed and the stream terminates
normally. If no state was yielded, the exception propagates as before.

Closes apache#581
@andreahlert
Copy link
Contributor Author

@skrawcz @kajocina This PR fixes the broken stream issue reported in #581. The generator consumption loops now catch exceptions gracefully when the action has already yielded a final state (e.g. in a finally block), instead of letting the exception kill the connection. All existing streaming tests pass. Would appreciate a review when you get a chance.

@skrawcz
Copy link
Contributor

skrawcz commented Mar 21, 2026

pre-commit issue

Copy link
Contributor

@skrawcz skrawcz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good fix for a real issue! The try/except approach in the single-step functions is clean — checking state_update is None correctly distinguishes between "generator crashed before completing" and "generator yielded a final state in its finally block."

Two items to address:

  1. caught_exc should be logged in the multi-step functions — it's currently assigned (caught_exc = e) but never read afterward. Please log it (e.g., logger.warning(...)) so that exceptions aren't silently swallowed. Users need visibility into what went wrong even when the stream completes gracefully.

  2. Tests needed. The reproduction case from #581 translates directly into a test. Please add tests covering:

    • Streaming action with try/except/finally that yields state update in finally → completes gracefully
    • Streaming action that raises without yielding state → exception still propagates
    • Both single-step and multi-step variants

Also worth discussing: the multi-step guard (if result is None: raise) is more permissive than the single-step guard (if state_update is None: raise). In multi-step, if the generator crashes after yielding some items but before producing the "final" result, the reducer silently runs on partial data. Is this the intended behavior?

partition_key=partition_key,
sequence_id=sequence_id,
)
count += 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

caught_exc = e is assigned but never read after this point. Please log it so exceptions aren't silently swallowed — e.g., logger.warning("Streaming action %s caught exception during graceful shutdown: %s", action.name, e). Same issue in the async variant below.

action=action.name,
app_id=app_id,
partition_key=partition_key,
sequence_id=sequence_id,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth discussing: when the generator crashes after yielding some items but before its "final" result, this guard (if result is None) means the reducer runs on whatever partial item was last received. In contrast, the single-step guard (if state_update is None) verifies the generator actually fulfilled its contract. Should this also log a warning when the exception is caught with a non-None result, so users know the reducer ran on potentially incomplete data?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

broken stream when handling raised exceptions

2 participants