From 3d9ebbeb725f7f4f1f3870dbf0e0b86dd85f1c09 Mon Sep 17 00:00:00 2001 From: yuvrajangadsingh Date: Sun, 1 Mar 2026 04:06:05 +0530 Subject: [PATCH 1/4] fix: skip unnecessary FOR UPDATE locks in append_event when no app/user state delta DatabaseSessionService.append_event() unconditionally acquires SELECT ... FOR UPDATE on app_states and user_states even when the event has no delta for those scopes. Since app_states is keyed by app_name alone, all concurrent append_event calls within the same app serialize on this single row lock, even when most events only carry session-scoped state. Pre-analyze the event's state delta before the transaction and only pass use_row_level_locking=True for scopes that actually have pending writes. Fixes #4655 --- .../adk/sessions/database_session_service.py | 14 +- .../sessions/test_session_service.py | 201 ++++++++++++++++++ 2 files changed, 213 insertions(+), 2 deletions(-) diff --git a/src/google/adk/sessions/database_session_service.py b/src/google/adk/sessions/database_session_service.py index 24f525bae0..2bab710197 100644 --- a/src/google/adk/sessions/database_session_service.py +++ b/src/google/adk/sessions/database_session_service.py @@ -531,6 +531,16 @@ async def append_event(self, session: Session, event: Event) -> Event: schema = self._get_schema_classes() is_sqlite = self.db_engine.dialect.name == _SQLITE_DIALECT use_row_level_locking = self._supports_row_level_locking() + + # Pre-analyze which state scopes have deltas so we only acquire + # FOR UPDATE locks on rows that will actually be written. + has_app_delta = False + has_user_delta = False + if event.actions and event.actions.state_delta: + pre_deltas = _session_util.extract_state_delta(event.actions.state_delta) + has_app_delta = bool(pre_deltas.get("app")) + has_user_delta = bool(pre_deltas.get("user")) + async with self._with_session_lock( app_name=session.app_name, user_id=session.user_id, @@ -554,7 +564,7 @@ async def append_event(self, session: Session, event: Event) -> Event: sql_session=sql_session, state_model=schema.StorageAppState, predicates=(schema.StorageAppState.app_name == session.app_name,), - use_row_level_locking=use_row_level_locking, + use_row_level_locking=use_row_level_locking and has_app_delta, missing_message=( "App state missing for app_name=" f"{session.app_name!r}. Session state tables should be " @@ -568,7 +578,7 @@ async def append_event(self, session: Session, event: Event) -> Event: schema.StorageUserState.app_name == session.app_name, schema.StorageUserState.user_id == session.user_id, ), - use_row_level_locking=use_row_level_locking, + use_row_level_locking=use_row_level_locking and has_user_delta, missing_message=( "User state missing for app_name=" f"{session.app_name!r}, user_id={session.user_id!r}. " diff --git a/tests/unittests/sessions/test_session_service.py b/tests/unittests/sessions/test_session_service.py index 25530bed89..e9e9cec661 100644 --- a/tests/unittests/sessions/test_session_service.py +++ b/tests/unittests/sessions/test_session_service.py @@ -1153,3 +1153,204 @@ async def test_prepare_tables_idempotent_after_creation(): assert session.id == 's1' finally: await service.close() + + +@pytest.mark.asyncio +async def test_append_event_skips_for_update_when_no_app_or_user_delta(): + """FOR UPDATE locks on app_states/user_states should be skipped when the + event carries no delta for those scopes.""" + service = DatabaseSessionService('sqlite+aiosqlite:///:memory:') + try: + session = await service.create_session( + app_name='my_app', user_id='user', session_id='s1' + ) + + # Patch _supports_row_level_locking to simulate PostgreSQL behavior + with mock.patch.object( + service, '_supports_row_level_locking', return_value=True + ): + original_fn = database_session_service._select_required_state + calls = [] + + async def spy(**kwargs): + calls.append(kwargs.get('use_row_level_locking')) + return await original_fn(**kwargs) + + with mock.patch.object( + database_session_service, + '_select_required_state', + side_effect=spy, + ): + # Event with only session-scoped state (no app: or user: prefix) + event_no_scoped_delta = Event( + invocation_id='inv1', + author='user', + actions=EventActions(state_delta={'session_key': 'v1'}), + ) + await service.append_event(session, event_no_scoped_delta) + + # Two calls: one for app_states, one for user_states + assert len(calls) == 2 + # Both should be False since there's no app: or user: delta + assert calls[0] is False + assert calls[1] is False + finally: + await service.close() + + +@pytest.mark.asyncio +async def test_append_event_skips_for_update_when_no_state_delta(): + """FOR UPDATE locks should be skipped when event has no state_delta at all.""" + service = DatabaseSessionService('sqlite+aiosqlite:///:memory:') + try: + session = await service.create_session( + app_name='my_app', user_id='user', session_id='s1' + ) + + with mock.patch.object( + service, '_supports_row_level_locking', return_value=True + ): + original_fn = database_session_service._select_required_state + calls = [] + + async def spy(**kwargs): + calls.append(kwargs.get('use_row_level_locking')) + return await original_fn(**kwargs) + + with mock.patch.object( + database_session_service, + '_select_required_state', + side_effect=spy, + ): + event_no_delta = Event( + invocation_id='inv2', + author='user', + ) + await service.append_event(session, event_no_delta) + + assert len(calls) == 2 + assert calls[0] is False + assert calls[1] is False + finally: + await service.close() + + +@pytest.mark.asyncio +async def test_append_event_acquires_for_update_when_app_delta_present(): + """FOR UPDATE lock on app_states should be acquired when event has app: + prefixed state delta.""" + service = DatabaseSessionService('sqlite+aiosqlite:///:memory:') + try: + session = await service.create_session( + app_name='my_app', user_id='user', session_id='s1' + ) + + with mock.patch.object( + service, '_supports_row_level_locking', return_value=True + ): + original_fn = database_session_service._select_required_state + calls = [] + + async def spy(**kwargs): + calls.append(kwargs.get('use_row_level_locking')) + return await original_fn(**kwargs) + + with mock.patch.object( + database_session_service, + '_select_required_state', + side_effect=spy, + ): + event_app_delta = Event( + invocation_id='inv3', + author='user', + actions=EventActions(state_delta={'app:key1': 'v1'}), + ) + await service.append_event(session, event_app_delta) + + assert len(calls) == 2 + # app_states lock: True (has app: delta) + assert calls[0] is True + # user_states lock: False (no user: delta) + assert calls[1] is False + finally: + await service.close() + + +@pytest.mark.asyncio +async def test_append_event_acquires_for_update_when_user_delta_present(): + """FOR UPDATE lock on user_states should be acquired when event has user: + prefixed state delta.""" + service = DatabaseSessionService('sqlite+aiosqlite:///:memory:') + try: + session = await service.create_session( + app_name='my_app', user_id='user', session_id='s1' + ) + + with mock.patch.object( + service, '_supports_row_level_locking', return_value=True + ): + original_fn = database_session_service._select_required_state + calls = [] + + async def spy(**kwargs): + calls.append(kwargs.get('use_row_level_locking')) + return await original_fn(**kwargs) + + with mock.patch.object( + database_session_service, + '_select_required_state', + side_effect=spy, + ): + event_user_delta = Event( + invocation_id='inv4', + author='user', + actions=EventActions(state_delta={'user:key1': 'v1'}), + ) + await service.append_event(session, event_user_delta) + + assert len(calls) == 2 + # app_states lock: False (no app: delta) + assert calls[0] is False + # user_states lock: True (has user: delta) + assert calls[1] is True + finally: + await service.close() + + +@pytest.mark.asyncio +async def test_append_event_acquires_for_update_when_both_deltas_present(): + """FOR UPDATE locks on both app_states and user_states should be acquired + when the event has both app: and user: prefixed state deltas.""" + service = DatabaseSessionService('sqlite+aiosqlite:///:memory:') + try: + session = await service.create_session( + app_name='my_app', user_id='user', session_id='s1' + ) + + with mock.patch.object( + service, '_supports_row_level_locking', return_value=True + ): + original_fn = database_session_service._select_required_state + calls = [] + + async def spy(**kwargs): + calls.append(kwargs.get('use_row_level_locking')) + return await original_fn(**kwargs) + + with mock.patch.object( + database_session_service, + '_select_required_state', + side_effect=spy, + ): + event_both = Event( + invocation_id='inv5', + author='user', + actions=EventActions(state_delta={'app:ak': 'av', 'user:uk': 'uv'}), + ) + await service.append_event(session, event_both) + + assert len(calls) == 2 + assert calls[0] is True + assert calls[1] is True + finally: + await service.close() From 0604d153e24c53a1b4a9f8c3f6d904e3c2cd9769 Mon Sep 17 00:00:00 2001 From: yuvrajangadsingh Date: Sun, 1 Mar 2026 04:10:47 +0530 Subject: [PATCH 2/4] refactor: extract lock spy fixture and parametrize FOR UPDATE tests --- .../sessions/test_session_service.py | 242 ++++-------------- 1 file changed, 55 insertions(+), 187 deletions(-) diff --git a/tests/unittests/sessions/test_session_service.py b/tests/unittests/sessions/test_session_service.py index e9e9cec661..9d326f85d2 100644 --- a/tests/unittests/sessions/test_session_service.py +++ b/tests/unittests/sessions/test_session_service.py @@ -1155,202 +1155,70 @@ async def test_prepare_tables_idempotent_after_creation(): await service.close() -@pytest.mark.asyncio -async def test_append_event_skips_for_update_when_no_app_or_user_delta(): - """FOR UPDATE locks on app_states/user_states should be skipped when the - event carries no delta for those scopes.""" - service = DatabaseSessionService('sqlite+aiosqlite:///:memory:') - try: - session = await service.create_session( - app_name='my_app', user_id='user', session_id='s1' - ) - - # Patch _supports_row_level_locking to simulate PostgreSQL behavior - with mock.patch.object( - service, '_supports_row_level_locking', return_value=True - ): - original_fn = database_session_service._select_required_state - calls = [] - - async def spy(**kwargs): - calls.append(kwargs.get('use_row_level_locking')) - return await original_fn(**kwargs) - - with mock.patch.object( - database_session_service, - '_select_required_state', - side_effect=spy, - ): - # Event with only session-scoped state (no app: or user: prefix) - event_no_scoped_delta = Event( - invocation_id='inv1', - author='user', - actions=EventActions(state_delta={'session_key': 'v1'}), - ) - await service.append_event(session, event_no_scoped_delta) - - # Two calls: one for app_states, one for user_states - assert len(calls) == 2 - # Both should be False since there's no app: or user: delta - assert calls[0] is False - assert calls[1] is False - finally: - await service.close() - - -@pytest.mark.asyncio -async def test_append_event_skips_for_update_when_no_state_delta(): - """FOR UPDATE locks should be skipped when event has no state_delta at all.""" - service = DatabaseSessionService('sqlite+aiosqlite:///:memory:') - try: - session = await service.create_session( - app_name='my_app', user_id='user', session_id='s1' - ) - - with mock.patch.object( - service, '_supports_row_level_locking', return_value=True - ): - original_fn = database_session_service._select_required_state - calls = [] - - async def spy(**kwargs): - calls.append(kwargs.get('use_row_level_locking')) - return await original_fn(**kwargs) - - with mock.patch.object( - database_session_service, - '_select_required_state', - side_effect=spy, - ): - event_no_delta = Event( - invocation_id='inv2', - author='user', - ) - await service.append_event(session, event_no_delta) - - assert len(calls) == 2 - assert calls[0] is False - assert calls[1] is False - finally: - await service.close() - - -@pytest.mark.asyncio -async def test_append_event_acquires_for_update_when_app_delta_present(): - """FOR UPDATE lock on app_states should be acquired when event has app: - prefixed state delta.""" +@pytest.fixture +async def lock_spy_harness(): + """Sets up a DatabaseSessionService with a spy on _select_required_state.""" service = DatabaseSessionService('sqlite+aiosqlite:///:memory:') try: session = await service.create_session( app_name='my_app', user_id='user', session_id='s1' ) - with mock.patch.object( - service, '_supports_row_level_locking', return_value=True + calls = [] + original_fn = database_session_service._select_required_state + + async def spy(**kwargs): + calls.append(kwargs.get('use_row_level_locking')) + return await original_fn(**kwargs) + + with ( + mock.patch.object( + service, '_supports_row_level_locking', return_value=True + ), + mock.patch.object( + database_session_service, + '_select_required_state', + side_effect=spy, + ), ): - original_fn = database_session_service._select_required_state - calls = [] - - async def spy(**kwargs): - calls.append(kwargs.get('use_row_level_locking')) - return await original_fn(**kwargs) - - with mock.patch.object( - database_session_service, - '_select_required_state', - side_effect=spy, - ): - event_app_delta = Event( - invocation_id='inv3', - author='user', - actions=EventActions(state_delta={'app:key1': 'v1'}), - ) - await service.append_event(session, event_app_delta) - - assert len(calls) == 2 - # app_states lock: True (has app: delta) - assert calls[0] is True - # user_states lock: False (no user: delta) - assert calls[1] is False + yield service, session, calls finally: await service.close() @pytest.mark.asyncio -async def test_append_event_acquires_for_update_when_user_delta_present(): - """FOR UPDATE lock on user_states should be acquired when event has user: - prefixed state delta.""" - service = DatabaseSessionService('sqlite+aiosqlite:///:memory:') - try: - session = await service.create_session( - app_name='my_app', user_id='user', session_id='s1' - ) - - with mock.patch.object( - service, '_supports_row_level_locking', return_value=True - ): - original_fn = database_session_service._select_required_state - calls = [] - - async def spy(**kwargs): - calls.append(kwargs.get('use_row_level_locking')) - return await original_fn(**kwargs) - - with mock.patch.object( - database_session_service, - '_select_required_state', - side_effect=spy, - ): - event_user_delta = Event( - invocation_id='inv4', - author='user', - actions=EventActions(state_delta={'user:key1': 'v1'}), - ) - await service.append_event(session, event_user_delta) - - assert len(calls) == 2 - # app_states lock: False (no app: delta) - assert calls[0] is False - # user_states lock: True (has user: delta) - assert calls[1] is True - finally: - await service.close() - - -@pytest.mark.asyncio -async def test_append_event_acquires_for_update_when_both_deltas_present(): - """FOR UPDATE locks on both app_states and user_states should be acquired - when the event has both app: and user: prefixed state deltas.""" - service = DatabaseSessionService('sqlite+aiosqlite:///:memory:') - try: - session = await service.create_session( - app_name='my_app', user_id='user', session_id='s1' - ) - - with mock.patch.object( - service, '_supports_row_level_locking', return_value=True - ): - original_fn = database_session_service._select_required_state - calls = [] - - async def spy(**kwargs): - calls.append(kwargs.get('use_row_level_locking')) - return await original_fn(**kwargs) - - with mock.patch.object( - database_session_service, - '_select_required_state', - side_effect=spy, - ): - event_both = Event( - invocation_id='inv5', - author='user', - actions=EventActions(state_delta={'app:ak': 'av', 'user:uk': 'uv'}), - ) - await service.append_event(session, event_both) - - assert len(calls) == 2 - assert calls[0] is True - assert calls[1] is True - finally: - await service.close() +@pytest.mark.parametrize( + 'state_delta, expected_app_lock, expected_user_lock', + [ + pytest.param( + {'session_key': 'v1'}, + False, + False, + id='session_only_delta', + ), + pytest.param(None, False, False, id='no_state_delta'), + pytest.param({'app:key1': 'v1'}, True, False, id='app_delta_only'), + pytest.param({'user:key1': 'v1'}, False, True, id='user_delta_only'), + pytest.param( + {'app:ak': 'av', 'user:uk': 'uv'}, + True, + True, + id='both_app_and_user_delta', + ), + ], +) +async def test_append_event_conditional_for_update_locking( + lock_spy_harness, state_delta, expected_app_lock, expected_user_lock +): + """FOR UPDATE locks should only be acquired for scopes that have deltas.""" + service, session, calls = lock_spy_harness + + kwargs = {'invocation_id': 'inv', 'author': 'user'} + if state_delta is not None: + kwargs['actions'] = EventActions(state_delta=state_delta) + event = Event(**kwargs) + await service.append_event(session, event) + + assert len(calls) == 2 + assert calls[0] is expected_app_lock + assert calls[1] is expected_user_lock From abf02f2c32b57d35cc205f3f50c315094aafa424 Mon Sep 17 00:00:00 2001 From: yuvrajangadsingh Date: Sun, 1 Mar 2026 04:13:24 +0530 Subject: [PATCH 3/4] refactor: reuse pre-analyzed state_deltas to avoid duplicate extract_state_delta call --- .../adk/sessions/database_session_service.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/src/google/adk/sessions/database_session_service.py b/src/google/adk/sessions/database_session_service.py index 2bab710197..543f5976c6 100644 --- a/src/google/adk/sessions/database_session_service.py +++ b/src/google/adk/sessions/database_session_service.py @@ -534,12 +534,14 @@ async def append_event(self, session: Session, event: Event) -> Event: # Pre-analyze which state scopes have deltas so we only acquire # FOR UPDATE locks on rows that will actually be written. - has_app_delta = False - has_user_delta = False + # The result is reused later to avoid calling extract_state_delta twice. + state_deltas = None if event.actions and event.actions.state_delta: - pre_deltas = _session_util.extract_state_delta(event.actions.state_delta) - has_app_delta = bool(pre_deltas.get("app")) - has_user_delta = bool(pre_deltas.get("user")) + state_deltas = _session_util.extract_state_delta( + event.actions.state_delta + ) + has_app_delta = bool(state_deltas and state_deltas.get("app")) + has_user_delta = bool(state_deltas and state_deltas.get("user")) async with self._with_session_lock( app_name=session.app_name, @@ -609,11 +611,8 @@ async def append_event(self, session: Session, event: Event) -> Event: storage_events = [e async for e in result] session.events = [e.to_event() for e in storage_events] - # Extract state delta - if event.actions and event.actions.state_delta: - state_deltas = _session_util.extract_state_delta( - event.actions.state_delta - ) + # Apply state deltas (reusing pre-analyzed result from above) + if state_deltas: app_state_delta = state_deltas["app"] user_state_delta = state_deltas["user"] session_state_delta = state_deltas["session"] From b40170a03811eae19ea558ab6237ed77709a83ad Mon Sep 17 00:00:00 2001 From: yuvrajangadsingh Date: Sun, 1 Mar 2026 04:18:37 +0530 Subject: [PATCH 4/4] style: move has_app/user_delta checks inside if block for readability --- src/google/adk/sessions/database_session_service.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/google/adk/sessions/database_session_service.py b/src/google/adk/sessions/database_session_service.py index 543f5976c6..f193d40d75 100644 --- a/src/google/adk/sessions/database_session_service.py +++ b/src/google/adk/sessions/database_session_service.py @@ -536,12 +536,13 @@ async def append_event(self, session: Session, event: Event) -> Event: # FOR UPDATE locks on rows that will actually be written. # The result is reused later to avoid calling extract_state_delta twice. state_deltas = None + has_app_delta, has_user_delta = False, False if event.actions and event.actions.state_delta: state_deltas = _session_util.extract_state_delta( event.actions.state_delta ) - has_app_delta = bool(state_deltas and state_deltas.get("app")) - has_user_delta = bool(state_deltas and state_deltas.get("user")) + has_app_delta = bool(state_deltas.get("app")) + has_user_delta = bool(state_deltas.get("user")) async with self._with_session_lock( app_name=session.app_name,