Integrate long-term memory functionality and update CI configuration #55

prashant4654 wants to merge 6 commits into 10xHub:main from
Conversation
- …oryWriteTracker
- fix: Correct token details key in LiteLLMConverter test
- test: Add comprehensive tests for long-term memory functionality

Signed-off-by: prashant4654 <ee23btech11218@iith.ac.in>

- …ndencies and adjust import handling in GoogleEmbedding
Pull request overview
This PR adds long-term memory capabilities to AgentFlow (LLM-callable tool + preload node + system prompt helpers) and wires shutdown handling so pending memory writes can be awaited before graph teardown. It also improves LiteLLM reasoning extraction (including thinking_blocks support), updates CI to install Google GenAI extras, and exposes the new memory utilities via the store package.
Changes:
- Added `agentflow.store.long_term_memory` with `memory_tool`, preload node factory, prompt helpers, and a pending-write tracker.
- Updated shutdown flow in `CompiledGraph.aclose()` to wait for pending memory writes before shutting down background tasks/store.
- Improved LiteLLM reasoning + token parsing; updated CI to install the `google-genai` extra.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| `agentflow/store/long_term_memory.py` | New memory tool + preload node + write tracking utilities for persistent memory integration. |
| `agentflow/graph/compiled_graph.py` | Awaits pending memory writes during `aclose()` before shutting down task manager/store. |
| `agentflow/store/__init__.py` | Re-exports new memory utilities for easier access. |
| `agentflow/graph/tool_node/constants.py` | Adds `task_manager` to injectable params excluded from tool schema generation. |
| `agentflow/adapters/llm/litellm_converter.py` | Extracts reasoning from `thinking_blocks` and updates reasoning token parsing. |
| `tests/store/test_long_term_memory.py` | Adds unit tests for long-term memory helper functions, tool behavior, preload node, and write tracker. |
| `tests/adapters/test_litellm_converter.py` | Updates token usage test fixture to match new reasoning token source. |
| `agentflow/store/embedding/google_embedding.py` | Minor import/formatting adjustment related to Google GenAI compatibility. |
| `.github/workflows/ci.yml` | Installs `.[litellm,google-genai]` extras in CI. |
```python
"""Wait for all pending writes. Returns stats dict."""
tasks = list(self._pending)
if not tasks:
    return {"status": "completed", "pending_writes": 0}

count = len(tasks)
logger.info("Waiting for %d pending memory writes to complete...", count)
try:
    if timeout:
        await asyncio.wait_for(
            asyncio.gather(*tasks, return_exceptions=True),
            timeout=timeout,
        )
    else:
        await asyncio.gather(*tasks, return_exceptions=True)
    logger.info("All %d pending memory writes completed.", count)
    return {"status": "completed", "pending_writes": 0, "completed": count}
except TimeoutError:
    remaining = len(self._pending)
    logger.warning(
        "Timeout waiting for memory writes: %d/%d still pending", remaining, count
    )
    return {
        "status": "timeout",
        "pending_writes": remaining,
        "completed": count - remaining,
    }
```
`wait_for_pending()` snapshots `self._pending` into `tasks = list(self._pending)` without holding the lock or looping until empty. Any writes tracked after this snapshot won't be awaited, which weakens the "guarantees all pending writes finish before shutdown" behavior. Consider acquiring the lock to snapshot+freeze, or loop until `pending_count == 0` (with a timeout), so late-added tasks are also waited on.
Suggested change:

```python
"""Wait for all pending writes. Returns stats dict.

This method will:
- repeatedly snapshot the current set of pending tasks under the lock
  (to avoid races with `track`), waiting for them to finish, and
- stop once there are no more pending tasks or the optional timeout
  has elapsed.
"""
# Track total number of tasks we have waited on across iterations.
total_completed = 0
loop = asyncio.get_running_loop()
deadline: float | None = None
if timeout is not None:
    deadline = loop.time() + timeout
while True:
    # Snapshot current pending tasks under the lock to avoid races with `track`.
    async with self._lock:
        tasks = [t for t in self._pending if not t.done()]
    if not tasks:
        logger.info("All pending memory writes completed.")
        return {
            "status": "completed",
            "pending_writes": 0,
            "completed": total_completed,
        }
    count = len(tasks)
    logger.info("Waiting for %d pending memory writes to complete...", count)
    try:
        if deadline is not None:
            remaining_timeout = deadline - loop.time()
            if remaining_timeout <= 0:
                raise asyncio.TimeoutError()
            await asyncio.wait_for(
                asyncio.gather(*tasks, return_exceptions=True),
                timeout=remaining_timeout,
            )
        else:
            await asyncio.gather(*tasks, return_exceptions=True)
        total_completed += count
    except asyncio.TimeoutError:
        # Compute how many tasks are still pending (best-effort, under the lock).
        async with self._lock:
            remaining_tasks = [t for t in self._pending if not t.done()]
        remaining = len(remaining_tasks)
        logger.warning(
            "Timeout waiting for memory writes: %d/%d still pending",
            remaining,
            total_completed + remaining,
        )
        return {
            "status": "timeout",
            "pending_writes": remaining,
            "completed": total_completed,
        }
```
```python
results = await store.asearch(
    cfg,
    query,
    memory_type=mem_type,
```
In the search action, `category` (and any potential filter info) is ignored even though the tool exposes a `category` parameter and `BaseStore.asearch(...)` supports `category`. This makes it impossible for callers/LLMs to scope searches to a category. Consider passing `category=category` (when set) through to `asearch`.
Suggested change:

```python
memory_type=mem_type,
category=category,
```
```python
if memory_types:
    search_kwargs["memory_type"] = memory_types[0]
if max_tokens is not None:
    search_kwargs["max_tokens"] = max_tokens

try:
    results = await store.asearch(config, query, **search_kwargs)
```
`create_memory_preload_node` accepts `memory_types: list[MemoryType] | None`, but only ever uses the first element (`memory_types[0]`), and `BaseStore.asearch` only supports a single `memory_type`. This is misleading for callers and suggests multi-type support that isn't implemented. Consider changing the parameter to `memory_type: MemoryType | None` (single) or extending store/search to accept multiple types explicitly.
Suggested change:

```python
if max_tokens is not None:
    search_kwargs["max_tokens"] = max_tokens
try:
    # If specific memory types are provided, search each type separately
    # and aggregate the results. Otherwise, perform a single search.
    results: list[MemorySearchResult] = []
    if memory_types:
        for mt in memory_types:
            per_type_results = await store.asearch(
                config,
                query,
                memory_type=mt,
                **search_kwargs,
            )
            if per_type_results:
                results.extend(per_type_results)
    else:
        results = await store.asearch(config, query, **search_kwargs)
```
```python
# 0. Wait for pending memory writes to complete (no cancellation)
try:
    from agentflow.store.long_term_memory import get_write_tracker

    tracker = get_write_tracker()
    if tracker.pending_count > 0:
        logger.info(
            "Waiting for %d pending memory writes before shutdown...",
            tracker.pending_count,
        )
        write_stats = await tracker.wait_for_pending(timeout=self._shutdown_timeout)
        stats["memory_writes"] = write_stats
```
The new memory-write wait uses `timeout=self._shutdown_timeout`, and then the rest of `aclose()` proceeds with additional shutdown steps that each have their own timeouts. This means total shutdown time can now exceed `_shutdown_timeout` by an extra full timeout window. Consider budgeting the shutdown time across phases (e.g., subtract elapsed time / cap the remainder) so the overall shutdown stays within the intended limit.
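One way to implement the budgeting suggested above is to derive each phase's timeout from a single shared deadline rather than reusing the full `_shutdown_timeout` per phase. A minimal sketch; the `Deadline` helper and the phase variable names are hypothetical, not part of this PR:

```python
import time


class Deadline:
    """Shares one overall time budget across sequential shutdown phases."""

    def __init__(self, total: float) -> None:
        self._end = time.monotonic() + total

    def remaining(self) -> float:
        # Clamp at zero so later phases get an immediate timeout
        # rather than a negative one.
        return max(0.0, self._end - time.monotonic())


# Each phase draws from the same budget instead of a fresh full window:
deadline = Deadline(total=10.0)
budget_for_writes = deadline.remaining()
# ... e.g. await tracker.wait_for_pending(timeout=deadline.remaining()) ...
budget_for_tasks = deadline.remaining()
assert budget_for_tasks <= budget_for_writes <= 10.0
```

With this pattern the memory-write wait and the task-manager/store shutdown together stay inside one window, at the cost of later phases possibly getting a very small (or zero) timeout when earlier phases run long.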
```python
INJECTABLE_PARAMS = {
    "tool_call_id",
    "state",
    "config",
    "generated_id",
    "context_manager",
    "publisher",
    "checkpointer",
    "store",
    "task_manager",
}
```
`INJECTABLE_PARAMS` now includes `task_manager`, but the module docstring's parameter list above still stops at `store` and doesn't mention `task_manager`. Please update the docstring so schema/exclusion behavior remains accurately documented.
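A lightweight way to keep the docstring's parameter list from drifting again is a consistency check in the test suite. A sketch only; `DOCUMENTED_PARAMS` is a hypothetical name, and the real test would import `INJECTABLE_PARAMS` from `agentflow.graph.tool_node.constants` instead of redefining it:

```python
# Mirror of the set from agentflow/graph/tool_node/constants.py
# (redefined here so the sketch is self-contained).
INJECTABLE_PARAMS = {
    "tool_call_id", "state", "config", "generated_id",
    "context_manager", "publisher", "checkpointer", "store",
    "task_manager",
}

# Names listed in the module docstring, kept as data so a test can compare.
DOCUMENTED_PARAMS = [
    "tool_call_id", "state", "config", "generated_id",
    "context_manager", "publisher", "checkpointer", "store",
    "task_manager",
]


def test_injectable_params_documented() -> None:
    # Fails whenever the set and the documented list diverge.
    assert set(DOCUMENTED_PARAMS) == INJECTABLE_PARAMS


test_injectable_params_documented()
```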
```python
_write_tracker = MemoryWriteTracker()


def get_write_tracker() -> MemoryWriteTracker:
    """Returns the global write-tracker instance."""
```
`_write_tracker` is instantiated at import time, which creates an `asyncio.Lock` immediately. This can bind the lock to the wrong event loop (or raise at import), and a later `async with self._lock` can fail with "bound to a different event loop". Consider lazy-initializing the global tracker inside `get_write_tracker()` (e.g., create it on first call from within an active loop) or avoid asyncio primitives in module-level singletons.
Suggested change:

```python
_write_tracker: MemoryWriteTracker | None = None


def get_write_tracker() -> MemoryWriteTracker:
    """Returns the global write-tracker instance."""
    global _write_tracker
    if _write_tracker is None:
        _write_tracker = MemoryWriteTracker()
```
This pull request introduces long-term memory integration into AgentFlow, enabling LLMs and agent graphs to search, store, update, and delete persistent memories. The main additions are a new `memory_tool` for LLMs, a memory preload node factory, system prompt helpers, and a write tracker to ensure all pending memory writes are completed before shutdown. It also adds support for Google GenAI extras, improves reasoning extraction in LiteLLM responses, and updates exports for the new memory features.

Long-term memory integration:

- Added `agentflow/store/long_term_memory.py`, which provides:
  - `memory_tool`: an LLM-callable tool for interacting with long-term memory stores.
  - `create_memory_preload_node`: a factory for injecting retrieved memories as context before LLM calls.
  - `get_memory_system_prompt`: prompt helpers for different memory read modes.
  - `MemoryWriteTracker` and `get_write_tracker`: track and await pending memory writes for graceful shutdown.
- Exported new memory-related utilities in `agentflow/store/__init__.py` for easier access throughout the codebase.

Shutdown and resource management:

- Updated `agentflow/graph/compiled_graph.py` to wait for pending memory writes before shutdown by using the new write tracker, ensuring no data loss.

LiteLLM response parsing improvements:

- Updated `agentflow/adapters/llm/litellm_converter.py` to support providers that use `thinking_blocks` for reasoning content, improving compatibility and accuracy.
- Reads `completion_tokens_details` instead of `prompt_tokens_details` for reasoning tokens.

Dependency and compatibility updates:

- CI now installs the `google-genai` extra for tests, enabling Google GenAI support.
- Minor import/formatting adjustments in `agentflow/store/embedding/google_embedding.py`.

Miscellaneous:

- Added `"task_manager"` to the tool node constants for broader tool support in graphs.
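The pending-write tracking pattern at the core of this PR can be illustrated standalone. A simplified sketch of the track/await semantics, not the actual agentflow implementation (no lock, no timeout handling):

```python
import asyncio


class WriteTracker:
    """Minimal version of the pending-write tracking pattern."""

    def __init__(self) -> None:
        self._pending: set[asyncio.Task] = set()

    def track(self, coro) -> asyncio.Task:
        task = asyncio.ensure_future(coro)
        self._pending.add(task)
        # Drop finished tasks so the set does not grow without bound.
        task.add_done_callback(self._pending.discard)
        return task

    async def wait_for_pending(self) -> dict:
        # Loop until empty so writes tracked mid-wait are also awaited.
        completed = 0
        while True:
            tasks = [t for t in self._pending if not t.done()]
            if not tasks:
                return {"status": "completed", "pending_writes": 0, "completed": completed}
            await asyncio.gather(*tasks, return_exceptions=True)
            completed += len(tasks)


async def main() -> dict:
    tracker = WriteTracker()

    async def write(delay: float) -> None:
        await asyncio.sleep(delay)

    for d in (0.01, 0.02):
        tracker.track(write(d))
    return await tracker.wait_for_pending()


print(asyncio.run(main()))
# → {'status': 'completed', 'pending_writes': 0, 'completed': 2}
```

The loop-until-empty shape is the same fix the reviewer suggests above for `wait_for_pending`: a one-shot snapshot misses writes tracked after the snapshot, while re-snapshotting until nothing is pending does not.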