From 62c0e63f4d5091180ffc3cce0ca0fb735d185f0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Ahlert?= Date: Wed, 18 Mar 2026 07:57:42 -0300 Subject: [PATCH] fix: allow streaming actions to gracefully handle raised exceptions When a streaming action catches an exception and yields a final state in a try/except/finally block, the stream now completes gracefully instead of propagating the exception and killing the connection. If the generator yields a valid state_update before the exception propagates, the exception is suppressed and the stream terminates normally. If no state was yielded, the exception propagates as before. Closes #581 --- burr/core/application.py | 205 +++++++++++++++++++++------------------ 1 file changed, 113 insertions(+), 92 deletions(-) diff --git a/burr/core/application.py b/burr/core/application.py index dc8067c4b..44d4a462f 100644 --- a/burr/core/application.py +++ b/burr/core/application.py @@ -338,31 +338,35 @@ def _run_single_step_streaming_action( result = None state_update = None count = 0 - for item in generator: - if not isinstance(item, tuple): - # TODO -- consider adding support for just returning a result. - raise ValueError( - f"Action {action.name} must yield a tuple of (result, state_update). " - f"For all non-final results (intermediate)," - f"the state update must be None" - ) - result, state_update = item - count += 1 + try: + for item in generator: + if not isinstance(item, tuple): + # TODO -- consider adding support for just returning a result. + raise ValueError( + f"Action {action.name} must yield a tuple of (result, state_update). " + f"For all non-final results (intermediate)," + f"the state update must be None" + ) + result, state_update = item + count += 1 + if state_update is None: + if first_stream_start_time is None: + first_stream_start_time = system.now() + lifecycle_adapters.call_all_lifecycle_hooks_sync( + "post_stream_item", + item=result, + item_index=count, + stream_initialize_time=stream_initialize_time, + first_stream_item_start_time=first_stream_start_time, + action=action.name, + app_id=app_id, + partition_key=partition_key, + sequence_id=sequence_id, + ) + yield result, None + except Exception: if state_update is None: - if first_stream_start_time is None: - first_stream_start_time = system.now() - lifecycle_adapters.call_all_lifecycle_hooks_sync( - "post_stream_item", - item=result, - item_index=count, - stream_initialize_time=stream_initialize_time, - first_stream_item_start_time=first_stream_start_time, - action=action.name, - app_id=app_id, - partition_key=partition_key, - sequence_id=sequence_id, - ) - yield result, None + raise if state_update is None: raise ValueError( @@ -391,31 +395,36 @@ async def _arun_single_step_streaming_action( result = None state_update = None count = 0 - async for item in generator: - if not isinstance(item, tuple): - # TODO -- consider adding support for just returning a result. - raise ValueError( - f"Action {action.name} must yield a tuple of (result, state_update). " - f"For all non-final results (intermediate)," - f"the state update must be None" - ) - result, state_update = item + try: + async for item in generator: + if not isinstance(item, tuple): + # TODO -- consider adding support for just returning a result. + raise ValueError( + f"Action {action.name} must yield a tuple of (result, state_update). " + f"For all non-final results (intermediate)," + f"the state update must be None" + ) + result, state_update = item + if state_update is None: + if first_stream_start_time is None: + first_stream_start_time = system.now() + await lifecycle_adapters.call_all_lifecycle_hooks_sync_and_async( + "post_stream_item", + item=result, + item_index=count, + stream_initialize_time=stream_initialize_time, + first_stream_item_start_time=first_stream_start_time, + action=action.name, + app_id=app_id, + partition_key=partition_key, + sequence_id=sequence_id, + ) + count += 1 + yield result, None + except Exception: if state_update is None: - if first_stream_start_time is None: - first_stream_start_time = system.now() - await lifecycle_adapters.call_all_lifecycle_hooks_sync_and_async( - "post_stream_item", - item=result, - item_index=count, - stream_initialize_time=stream_initialize_time, - first_stream_item_start_time=first_stream_start_time, - action=action.name, - app_id=app_id, - partition_key=partition_key, - sequence_id=sequence_id, - ) - count += 1 - yield result, None + raise + if state_update is None: raise ValueError( f"Action {action.name} did not return a state update. For async actions, the last yield " @@ -450,28 +459,34 @@ def _run_multi_step_streaming_action( result = None first_stream_start_time = None count = 0 - for item in generator: - # We want to peek ahead so we can return the last one - # This is slightly eager, but only in the case in which we - # are using a multi-step streaming action - next_result = result - result = item - if next_result is not None: - if first_stream_start_time is None: - first_stream_start_time = system.now() - lifecycle_adapters.call_all_lifecycle_hooks_sync( - "post_stream_item", - item=next_result, - item_index=count, - stream_initialize_time=stream_initialize_time, - first_stream_item_start_time=first_stream_start_time, - action=action.name, - app_id=app_id, - partition_key=partition_key, - sequence_id=sequence_id, - ) - count += 1 - yield next_result, None + caught_exc = None + try: + for item in generator: + # We want to peek ahead so we can return the last one + # This is slightly eager, but only in the case in which we + # are using a multi-step streaming action + next_result = result + result = item + if next_result is not None: + if first_stream_start_time is None: + first_stream_start_time = system.now() + lifecycle_adapters.call_all_lifecycle_hooks_sync( + "post_stream_item", + item=next_result, + item_index=count, + stream_initialize_time=stream_initialize_time, + first_stream_item_start_time=first_stream_start_time, + action=action.name, + app_id=app_id, + partition_key=partition_key, + sequence_id=sequence_id, + ) + count += 1 + yield next_result, None + except Exception as e: + if result is None: + raise + caught_exc = e state_update = _run_reducer(action, state, result, action.name) _validate_result(result, action.name, action.schema) _validate_reducer_writes(action, state_update, action.name) @@ -494,28 +509,34 @@ async def _arun_multi_step_streaming_action( result = None first_stream_start_time = None count = 0 - async for item in generator: - # We want to peek ahead so we can return the last one - # This is slightly eager, but only in the case in which we - # are using a multi-step streaming action - next_result = result - result = item - if next_result is not None: - if first_stream_start_time is None: - first_stream_start_time = system.now() - await lifecycle_adapters.call_all_lifecycle_hooks_sync_and_async( - "post_stream_item", - item=next_result, - stream_initialize_time=stream_initialize_time, - item_index=count, - first_stream_item_start_time=first_stream_start_time, - action=action.name, - app_id=app_id, - partition_key=partition_key, - sequence_id=sequence_id, - ) - count += 1 - yield next_result, None + caught_exc = None + try: + async for item in generator: + # We want to peek ahead so we can return the last one + # This is slightly eager, but only in the case in which we + # are using a multi-step streaming action + next_result = result + result = item + if next_result is not None: + if first_stream_start_time is None: + first_stream_start_time = system.now() + await lifecycle_adapters.call_all_lifecycle_hooks_sync_and_async( + "post_stream_item", + item=next_result, + stream_initialize_time=stream_initialize_time, + item_index=count, + first_stream_item_start_time=first_stream_start_time, + action=action.name, + app_id=app_id, + partition_key=partition_key, + sequence_id=sequence_id, + ) + count += 1 + yield next_result, None + except Exception as e: + if result is None: + raise + caught_exc = e state_update = _run_reducer(action, state, result, action.name) _validate_result(result, action.name, action.schema) _validate_reducer_writes(action, state_update, action.name)