Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 115 additions & 9 deletions src/google/adk/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from pathlib import Path
import queue
import sys
import threading
from typing import Any
from typing import AsyncGenerator
from typing import Callable
Expand Down Expand Up @@ -118,6 +119,19 @@ class Runner:
resumability_config: The resumability config for the application.
"""

# Default maximum number of concurrent compaction tasks allowed.
DEFAULT_MAX_CONCURRENT_COMPACTIONS = 10

# Semaphore to limit concurrent event compaction tasks to prevent resource
# exhaustion under high concurrency. Limits concurrent LLM calls and DB writes.
# Shared across all Runner instances for global concurrency control.
# Initialized lazily in async context to avoid RuntimeError when no event loop.
_compaction_semaphore: Optional[asyncio.Semaphore] = None
# Thread lock to protect semaphore initialization in multi-threaded scenarios
_compaction_semaphore_lock = threading.Lock()
# Stores the configured max concurrent compactions limit for lazy initialization
_max_concurrent_compactions_limit: int = DEFAULT_MAX_CONCURRENT_COMPACTIONS

app_name: str
"""The app name of the runner."""
agent: BaseAgent
Expand Down Expand Up @@ -150,6 +164,7 @@ def __init__(
credential_service: Optional[BaseCredentialService] = None,
plugin_close_timeout: float = 5.0,
auto_create_session: bool = False,
max_concurrent_compactions: int = DEFAULT_MAX_CONCURRENT_COMPACTIONS,
):
"""Initializes the Runner.

Expand Down Expand Up @@ -179,6 +194,12 @@ def __init__(
auto_create_session: Whether to automatically create a session when
not found. Defaults to False. If False, a missing session raises
ValueError with a helpful message.
max_concurrent_compactions: Maximum number of concurrent event
compaction tasks allowed. Defaults to
DEFAULT_MAX_CONCURRENT_COMPACTIONS (10). This limit is shared across
all Runner instances to prevent resource exhaustion. Higher values
allow more concurrent compactions but consume more resources (LLM
API calls, database connections).

Raises:
ValueError: If `app` is provided along with `agent` or `plugins`, or if
Expand Down Expand Up @@ -206,6 +227,31 @@ def __init__(
) = self._infer_agent_origin(self.agent)
self._app_name_alignment_hint: Optional[str] = None
self._enforce_app_name_alignment()
# Store the configured max concurrent compactions limit for lazy initialization
# Validate the limit here to fail fast on invalid configuration
if max_concurrent_compactions <= 0:
raise ValueError(
'max_concurrent_compactions must be positive, got'
f' {max_concurrent_compactions}'
)
# Warn if limit is changed after semaphore is already created, as the
# semaphore will retain its original value until recreated
if (
Runner._compaction_semaphore is not None
and Runner._max_concurrent_compactions_limit
!= max_concurrent_compactions
):
logger.warning(
'max_concurrent_compactions changed from %d to %d, but compaction'
' semaphore already exists with the old limit. The new limit will'
' take effect after the semaphore is recreated (e.g., in a new'
' process).',
Runner._max_concurrent_compactions_limit,
max_concurrent_compactions,
)
Runner._max_concurrent_compactions_limit = max_concurrent_compactions
Comment on lines +230 to +252
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To improve thread safety, it's a good practice to wrap the logic that configures the shared _max_concurrent_compactions_limit within the _compaction_semaphore_lock. This prevents potential race conditions when multiple Runner instances are initialized concurrently in different threads. While the GIL might mitigate some issues, explicitly locking ensures that checking for the semaphore's existence and updating the limit are performed as an atomic operation, making the behavior more predictable and robust in multi-threaded scenarios.

    # Validate and store the configured max concurrent compactions limit.
    # This is done under a lock to ensure thread safety when multiple Runner
    # instances are created concurrently.
    with Runner._compaction_semaphore_lock:
      # Validate the limit here to fail fast on invalid configuration.
      if max_concurrent_compactions <= 0:
        raise ValueError(
            'max_concurrent_compactions must be positive, got'
            f' {max_concurrent_compactions}'
        )
      # Warn if limit is changed after semaphore is already created, as the
      # semaphore will retain its original value until recreated.
      if (
          Runner._compaction_semaphore is not None
          and Runner._max_concurrent_compactions_limit
          != max_concurrent_compactions
      ):
        logger.warning(
            'max_concurrent_compactions changed from %d to %d, but compaction'
            ' semaphore already exists with the old limit. The new limit will'
            ' take effect after the semaphore is recreated (e.g., in a new'
            ' process).',
            Runner._max_concurrent_compactions_limit,
            max_concurrent_compactions,
        )
      Runner._max_concurrent_compactions_limit = max_concurrent_compactions

# Track background tasks to prevent premature garbage collection
self._background_tasks: set[asyncio.Task] = set()

def _validate_runner_params(
self,
Expand Down Expand Up @@ -320,6 +366,32 @@ def _infer_agent_origin(
return None, origin_dir
return origin_name, origin_dir

@classmethod
def _get_or_create_compaction_semaphore(cls) -> asyncio.Semaphore:
"""Gets or lazily creates the shared compaction semaphore.

This method ensures the class-level semaphore is initialized with the
configured limit. The semaphore is created lazily on first use within
an async context to avoid RuntimeError when no event loop is running.

Thread-safe: Uses double-checked locking pattern for efficient access.
The first check avoids lock overhead in the common case (semaphore exists).

Returns:
The shared asyncio.Semaphore for compaction concurrency control.
"""
# Fast path: semaphore already exists, no lock needed
if cls._compaction_semaphore is not None:
return cls._compaction_semaphore
# Slow path: need to create semaphore, use lock for thread safety
with cls._compaction_semaphore_lock:
# Double-check after acquiring lock to prevent multiple creations
if cls._compaction_semaphore is None:
cls._compaction_semaphore = asyncio.Semaphore(
cls._max_concurrent_compactions_limit
)
return cls._compaction_semaphore

def _enforce_app_name_alignment(self) -> None:
origin_name = self._agent_origin_app_name
origin_dir = self._agent_origin_dir
Expand Down Expand Up @@ -401,8 +473,8 @@ def run(

If event compaction is enabled in the App configuration, it will be
performed after all agent events for the current invocation have been
yielded. The generator will only finish iterating after event
compaction is complete.
yielded. Compaction runs as a background task and does not block the
generator from completing.

Args:
user_id: The user ID of the session.
Expand Down Expand Up @@ -464,9 +536,10 @@ async def run_async(

If event compaction is enabled in the App configuration, it will be
performed after all agent events for the current invocation have been
yielded. The async generator will only finish iterating after event
compaction is complete. However, this does not block new `run_async`
calls for subsequent user queries, which can be started concurrently.
yielded. Compaction runs as a background task and does not block the
generator from completing, allowing the frontend to receive responses
without delay. However, this does not block new `run_async` calls for
subsequent user queries, which can be started concurrently.

Args:
user_id: The user ID of the session.
Expand Down Expand Up @@ -552,11 +625,35 @@ async def execute(ctx: InvocationContext) -> AsyncGenerator[Event]:
# Run compaction after all events are yielded from the agent.
# (We don't compact in the middle of an invocation, we only compact at
# the end of an invocation.)
# Run compaction as a background task to avoid blocking the generator
# completion, which causes delays on the frontend. Use a semaphore to
# limit concurrent compactions and prevent resource exhaustion under
# high concurrency.
if self.app and self.app.events_compaction_config:
logger.debug('Running event compactor.')
await _run_compaction_for_sliding_window(
self.app, session, self.session_service
)
logger.debug('Scheduling event compactor in background.')

async def _run_compaction_with_error_handling():
try:
# Get or lazily create the semaphore within async context to avoid
# RuntimeError when Runner is created before event loop starts
semaphore = self._get_or_create_compaction_semaphore()
async with semaphore:
await _run_compaction_for_sliding_window(
self.app, session, self.session_service
)
except asyncio.CancelledError:
logger.debug('Event compaction cancelled.')
raise
except Exception as e:
logger.error(
'Event compaction failed but not blocking response: %s',
e,
exc_info=True,
)

task = asyncio.create_task(_run_compaction_with_error_handling())
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)

async with Aclosing(_run_with_trace(new_message, invocation_id)) as agen:
async for event in agen:
Expand Down Expand Up @@ -1535,6 +1632,15 @@ async def close(self):
if self.plugin_manager:
await self.plugin_manager.close()

# Wait for background compaction tasks to complete
if self._background_tasks:
logger.debug(
'Waiting for %d background compaction tasks to complete...',
len(self._background_tasks),
)
await asyncio.gather(*self._background_tasks, return_exceptions=True)
self._background_tasks.clear()

logger.info('Runner closed.')

if sys.version_info < (3, 11):
Expand Down
Loading