Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 113 additions & 92 deletions burr/core/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,31 +338,35 @@ def _run_single_step_streaming_action(
result = None
state_update = None
count = 0
for item in generator:
if not isinstance(item, tuple):
# TODO -- consider adding support for just returning a result.
raise ValueError(
f"Action {action.name} must yield a tuple of (result, state_update). "
f"For all non-final results (intermediate),"
f"the state update must be None"
)
result, state_update = item
count += 1
try:
for item in generator:
if not isinstance(item, tuple):
# TODO -- consider adding support for just returning a result.
raise ValueError(
f"Action {action.name} must yield a tuple of (result, state_update). "
f"For all non-final results (intermediate),"
f"the state update must be None"
)
result, state_update = item
count += 1
if state_update is None:
if first_stream_start_time is None:
first_stream_start_time = system.now()
lifecycle_adapters.call_all_lifecycle_hooks_sync(
"post_stream_item",
item=result,
item_index=count,
stream_initialize_time=stream_initialize_time,
first_stream_item_start_time=first_stream_start_time,
action=action.name,
app_id=app_id,
partition_key=partition_key,
sequence_id=sequence_id,
)
yield result, None
except Exception:
if state_update is None:
if first_stream_start_time is None:
first_stream_start_time = system.now()
lifecycle_adapters.call_all_lifecycle_hooks_sync(
"post_stream_item",
item=result,
item_index=count,
stream_initialize_time=stream_initialize_time,
first_stream_item_start_time=first_stream_start_time,
action=action.name,
app_id=app_id,
partition_key=partition_key,
sequence_id=sequence_id,
)
yield result, None
raise

if state_update is None:
raise ValueError(
Expand Down Expand Up @@ -391,31 +395,36 @@ async def _arun_single_step_streaming_action(
result = None
state_update = None
count = 0
async for item in generator:
if not isinstance(item, tuple):
# TODO -- consider adding support for just returning a result.
raise ValueError(
f"Action {action.name} must yield a tuple of (result, state_update). "
f"For all non-final results (intermediate),"
f"the state update must be None"
)
result, state_update = item
try:
async for item in generator:
if not isinstance(item, tuple):
# TODO -- consider adding support for just returning a result.
raise ValueError(
f"Action {action.name} must yield a tuple of (result, state_update). "
f"For all non-final results (intermediate),"
f"the state update must be None"
)
result, state_update = item
if state_update is None:
if first_stream_start_time is None:
first_stream_start_time = system.now()
await lifecycle_adapters.call_all_lifecycle_hooks_sync_and_async(
"post_stream_item",
item=result,
item_index=count,
stream_initialize_time=stream_initialize_time,
first_stream_item_start_time=first_stream_start_time,
action=action.name,
app_id=app_id,
partition_key=partition_key,
sequence_id=sequence_id,
)
count += 1
yield result, None
except Exception:
if state_update is None:
if first_stream_start_time is None:
first_stream_start_time = system.now()
await lifecycle_adapters.call_all_lifecycle_hooks_sync_and_async(
"post_stream_item",
item=result,
item_index=count,
stream_initialize_time=stream_initialize_time,
first_stream_item_start_time=first_stream_start_time,
action=action.name,
app_id=app_id,
partition_key=partition_key,
sequence_id=sequence_id,
)
count += 1
yield result, None
raise

if state_update is None:
raise ValueError(
f"Action {action.name} did not return a state update. For async actions, the last yield "
Expand Down Expand Up @@ -450,28 +459,34 @@ def _run_multi_step_streaming_action(
result = None
first_stream_start_time = None
count = 0
for item in generator:
# We want to peek ahead so we can return the last one
# This is slightly eager, but only in the case in which we
# are using a multi-step streaming action
next_result = result
result = item
if next_result is not None:
if first_stream_start_time is None:
first_stream_start_time = system.now()
lifecycle_adapters.call_all_lifecycle_hooks_sync(
"post_stream_item",
item=next_result,
item_index=count,
stream_initialize_time=stream_initialize_time,
first_stream_item_start_time=first_stream_start_time,
action=action.name,
app_id=app_id,
partition_key=partition_key,
sequence_id=sequence_id,
)
count += 1
yield next_result, None
caught_exc = None
try:
for item in generator:
# We want to peek ahead so we can return the last one
# This is slightly eager, but only in the case in which we
# are using a multi-step streaming action
next_result = result
result = item
if next_result is not None:
if first_stream_start_time is None:
first_stream_start_time = system.now()
lifecycle_adapters.call_all_lifecycle_hooks_sync(
"post_stream_item",
item=next_result,
item_index=count,
stream_initialize_time=stream_initialize_time,
first_stream_item_start_time=first_stream_start_time,
action=action.name,
app_id=app_id,
partition_key=partition_key,
sequence_id=sequence_id,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth discussing: when the generator crashes after yielding some items but before its "final" result, this guard (if result is None) means the reducer runs on whatever partial item was last received. In contrast, the single-step guard (if state_update is None) verifies the generator actually fulfilled its contract. Should this also log a warning when the exception is caught with a non-None result, so users know the reducer ran on potentially incomplete data?

)
count += 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

caught_exc = e is assigned but never read after this point. Please log it so exceptions aren't silently swallowed — e.g., logger.warning("Streaming action %s caught exception during graceful shutdown: %s", action.name, e). Same issue in the async variant below.

yield next_result, None
except Exception as e:
if result is None:
raise
caught_exc = e
state_update = _run_reducer(action, state, result, action.name)
_validate_result(result, action.name, action.schema)
_validate_reducer_writes(action, state_update, action.name)
Expand All @@ -494,28 +509,34 @@ async def _arun_multi_step_streaming_action(
result = None
first_stream_start_time = None
count = 0
async for item in generator:
# We want to peek ahead so we can return the last one
# This is slightly eager, but only in the case in which we
# are using a multi-step streaming action
next_result = result
result = item
if next_result is not None:
if first_stream_start_time is None:
first_stream_start_time = system.now()
await lifecycle_adapters.call_all_lifecycle_hooks_sync_and_async(
"post_stream_item",
item=next_result,
stream_initialize_time=stream_initialize_time,
item_index=count,
first_stream_item_start_time=first_stream_start_time,
action=action.name,
app_id=app_id,
partition_key=partition_key,
sequence_id=sequence_id,
)
count += 1
yield next_result, None
caught_exc = None
try:
async for item in generator:
# We want to peek ahead so we can return the last one
# This is slightly eager, but only in the case in which we
# are using a multi-step streaming action
next_result = result
result = item
if next_result is not None:
if first_stream_start_time is None:
first_stream_start_time = system.now()
await lifecycle_adapters.call_all_lifecycle_hooks_sync_and_async(
"post_stream_item",
item=next_result,
stream_initialize_time=stream_initialize_time,
item_index=count,
first_stream_item_start_time=first_stream_start_time,
action=action.name,
app_id=app_id,
partition_key=partition_key,
sequence_id=sequence_id,
)
count += 1
yield next_result, None
except Exception as e:
if result is None:
raise
caught_exc = e
state_update = _run_reducer(action, state, result, action.name)
_validate_result(result, action.name, action.schema)
_validate_reducer_writes(action, state_update, action.name)
Expand Down
Loading