Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/bedrock_agentcore/memory/integrations/strands/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ class AgentCoreMemoryConfig(BaseModel):
retrieval_config: Optional dictionary mapping namespaces to retrieval configurations
batch_size: Number of messages to batch before sending to AgentCore Memory.
Default of 1 means immediate sending (no batching). Max 100.
flush_interval_seconds: Optional interval in seconds for automatic buffer flushing.
Useful for long-running agents to ensure messages are persisted regularly.
Default is None (disabled).
context_tag: XML tag name used to wrap retrieved memory context injected into messages.
Default is "user_context".
"""
Expand All @@ -40,4 +43,5 @@ class AgentCoreMemoryConfig(BaseModel):
actor_id: str = Field(min_length=1)
retrieval_config: Optional[Dict[str, RetrievalConfig]] = None
batch_size: int = Field(default=1, ge=1, le=100)
flush_interval_seconds: Optional[float] = Field(default=None, gt=0)
context_tag: str = Field(default="user_context", min_length=1)
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import boto3
from botocore.config import Config as BotocoreConfig
from strands.hooks import MessageAddedEvent
from strands.hooks import AfterInvocationEvent, MessageAddedEvent
from strands.hooks.registry import HookRegistry
from strands.session.repository_session_manager import RepositorySessionManager
from strands.session.session_repository import SessionRepository
Expand Down Expand Up @@ -125,6 +125,11 @@ def __init__(
# Cache for agent created_at timestamps to avoid fetching on every update
self._agent_created_at_cache: dict[str, datetime] = {}

# Interval-based flushing support
self._flush_timer: Optional[threading.Timer] = None
self._timer_lock = threading.Lock()
self._shutdown = False

# Add strands-agents to the request user agent
if boto_client_config:
existing_user_agent = getattr(boto_client_config, "user_agent_extra", None)
Expand All @@ -145,6 +150,10 @@ def __init__(
)
super().__init__(session_id=self.config.session_id, session_repository=self)

# Start interval-based flush timer if configured
if self.config.flush_interval_seconds:
self._start_flush_timer()

# region SessionRepository interface implementation
def create_session(self, session: Session, **kwargs: Any) -> Session:
"""Create a new session in AgentCore Memory.
Expand Down Expand Up @@ -672,6 +681,10 @@ def register_hooks(self, registry: HookRegistry, **kwargs) -> None:
RepositorySessionManager.register_hooks(self, registry, **kwargs)
registry.add_callback(MessageAddedEvent, lambda event: self.retrieve_customer_context(event))

# Only register AfterInvocationEvent hook when batching is enabled
if self.config.batch_size > 1:
registry.add_callback(AfterInvocationEvent, lambda event: self._flush_messages())

@override
def initialize(self, agent: "Agent", **kwargs: Any) -> None:
if self.has_existing_agent:
Expand Down Expand Up @@ -784,6 +797,7 @@ def close(self) -> None:
messages are sent to AgentCore Memory. Alternatively, use the context
manager protocol (with statement) for automatic cleanup.
"""
self._stop_flush_timer()
self._flush_messages()

def __enter__(self) -> "AgentCoreMemorySessionManager":
Expand All @@ -803,6 +817,7 @@ def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
exc_tb: Exception traceback if an exception occurred.
"""
try:
self._stop_flush_timer()
self._flush_messages()
except Exception as e:
if exc_type is not None:
Expand All @@ -811,3 +826,70 @@ def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
raise

# endregion Batching support

# region Interval-based flushing support

def _start_flush_timer(self) -> None:
"""Start the interval-based flush timer.
This method schedules a recurring timer that flushes the message buffer
at regular intervals if flush_interval_seconds is configured.
"""
with self._timer_lock:
if self._shutdown:
return

# Cancel existing timer if any
if self._flush_timer is not None:
self._flush_timer.cancel()

# Schedule next flush
self._flush_timer = threading.Timer(
self.config.flush_interval_seconds,
self._interval_flush_callback,
)
self._flush_timer.daemon = True
self._flush_timer.start()
logger.debug(
"Scheduled interval flush in %.1f seconds",
self.config.flush_interval_seconds,
)

def _interval_flush_callback(self) -> None:
"""Callback executed by the flush timer.
Flushes the buffer if it contains messages, then reschedules the timer.
"""
try:
# Only flush if there are messages in the buffer
pending = self.pending_message_count()
if pending > 0:
logger.debug("Interval flush triggered: %d message(s) pending", pending)
self._flush_messages()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: __flush_messages will return early if the buffer is empty. Therefore, we don't need to precheck the buffer.

else:
logger.debug("Interval flush skipped: buffer is empty")

# Reschedule the timer (unless shutdown)
if not self._shutdown and self.config.flush_interval_seconds:
self._start_flush_timer()

except Exception as e:
logger.error("Error during interval flush: %s", e)
# Attempt to reschedule even after error
if not self._shutdown and self.config.flush_interval_seconds:
self._start_flush_timer()

def _stop_flush_timer(self) -> None:
"""Stop the interval-based flush timer.
This method cancels the timer and prevents it from rescheduling.
Should be called during cleanup (close() or __exit__).
"""
with self._timer_lock:
self._shutdown = True
if self._flush_timer is not None:
self._flush_timer.cancel()
self._flush_timer = None
logger.debug("Stopped interval flush timer")

# endregion Interval-based flushing support
Loading
Loading