diff --git a/src/google/adk/flows/llm_flows/base_llm_flow.py b/src/google/adk/flows/llm_flows/base_llm_flow.py index d4af69378d..8bfaa31f68 100644 --- a/src/google/adk/flows/llm_flows/base_llm_flow.py +++ b/src/google/adk/flows/llm_flows/base_llm_flow.py @@ -369,14 +369,6 @@ def get_author_for_event(llm_response): while True: async with Aclosing(llm_connection.receive()) as agen: async for llm_response in agen: - if llm_response.live_session_resumption_update: - logger.info( - 'Update session resumption handle:' - f' {llm_response.live_session_resumption_update}.' - ) - invocation_context.live_session_resumption_handle = ( - llm_response.live_session_resumption_update.new_handle - ) model_response_event = Event( id=Event.new_id(), invocation_id=invocation_context.invocation_id, @@ -739,6 +731,25 @@ async def _postprocess_live( async for event in agen: yield event + # Handle session resumption updates for cross-connection resumption. + # Must be before skip condition - resumption updates have no content. + if llm_response.live_session_resumption_update: + # Update internal handle for auto-resumption within run_live() + logger.info( + 'Update session resumption handle: %s', + llm_response.live_session_resumption_update, + ) + invocation_context.live_session_resumption_handle = ( + llm_response.live_session_resumption_update.new_handle + ) + + # Expose update in event for application-level cross-connection resumption + model_response_event.live_session_resumption_update = ( + llm_response.live_session_resumption_update + ) + yield model_response_event + return + # Skip the model response event if there is no content and no error code. # This is needed for the code executor to trigger another loop. # But don't skip control events like turn_complete or transcription events. diff --git a/tests/unittests/flows/llm_flows/test_base_llm_flow_session_resumption.py b/tests/unittests/flows/llm_flows/test_base_llm_flow_session_resumption.py new file mode 100644 index 0000000000..e10b70ef8a --- /dev/null +++ b/tests/unittests/flows/llm_flows/test_base_llm_flow_session_resumption.py @@ -0,0 +1,114 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from google.adk.agents.llm_agent import Agent +from google.adk.events.event import Event +from google.adk.flows.llm_flows.base_llm_flow import BaseLlmFlow +from google.adk.models.llm_request import LlmRequest +from google.adk.models.llm_response import LlmResponse +from google.genai import types +import pytest + +from ... import testing_utils + + +class BaseLlmFlowForTesting(BaseLlmFlow): + """Test implementation of BaseLlmFlow for testing purposes.""" + + pass + + +@pytest.mark.asyncio +async def test_postprocess_live_yields_session_resumption_event(): + """Test _postprocess_live yields event for session resumption update.""" + # Create a resumption update as received from Gemini Live API + resumption_update = types.LiveServerSessionResumptionUpdate( + new_handle='test-handle-abc123', + resumable=True, + ) + + # Create LlmResponse with only a resumption update (no content) + llm_response = LlmResponse(live_session_resumption_update=resumption_update) + + # Set up invocation context + agent = Agent(name='test_agent', model='mock') + invocation_context = await testing_utils.create_invocation_context( + agent=agent, user_content='' + ) + + # Create the mutable event that _postprocess_live populates + model_response_event = Event( + id=Event.new_id(), + invocation_id=invocation_context.invocation_id, + author=agent.name, + ) + + flow = BaseLlmFlowForTesting() + llm_request = LlmRequest() + events = [] + + # Collect events from _postprocess_live + async for event in flow._postprocess_live( + invocation_context, llm_request, llm_response, model_response_event + ): + events.append(event) + + # Verify event is yielded with resumption update + assert len(events) == 1 + assert events[0].live_session_resumption_update == resumption_update + assert ( + events[0].live_session_resumption_update.new_handle + == 'test-handle-abc123' + ) + assert events[0].live_session_resumption_update.resumable is True + + # Verify invocation context handle is updated for auto-resumption + assert ( + invocation_context.live_session_resumption_handle == 'test-handle-abc123' + ) + + +@pytest.mark.asyncio +async def test_postprocess_live_skips_empty_response(): + """Test _postprocess_live skips response with no content or resumption.""" + # Create LlmResponse with no content and no resumption update + llm_response = LlmResponse() + + # Set up invocation context + agent = Agent(name='test_agent', model='mock') + invocation_context = await testing_utils.create_invocation_context( + agent=agent, user_content='' + ) + + model_response_event = Event( + id=Event.new_id(), + invocation_id=invocation_context.invocation_id, + author=agent.name, + ) + + flow = BaseLlmFlowForTesting() + llm_request = LlmRequest() + events = [] + + # Collect events from _postprocess_live + async for event in flow._postprocess_live( + invocation_context, llm_request, llm_response, model_response_event + ): + events.append(event) + + # Verify no event is yielded for empty response + assert len(events) == 0 + + # Verify handle remains unset + assert invocation_context.live_session_resumption_handle is None