Integrate long-term memory functionality and enhance tests #53
prashant4654 wants to merge 4 commits into 10xHub:main from
Conversation
- …oryWriteTracker
- fix: Correct token details key in LiteLLMConverter test
- test: Add comprehensive tests for long-term memory functionality
Signed-off-by: prashant4654 <ee23btech11218@iith.ac.in>
Pull request overview
This PR adds long-term memory (LTM) integration to AgentFlow via a new LLM-callable tool and preload node, updates shutdown behavior to await pending memory writes, and improves LiteLLM response conversion (reasoning + token accounting) with corresponding test updates.
Changes:
- Introduces `agentflow.store.long_term_memory` with `memory_tool`, a preload-node factory, a system prompt helper, and a pending-write tracker.
- Updates `CompiledGraph.aclose()` to wait for pending memory writes before shutdown.
- Improves LiteLLM conversion for reasoning extraction and reasoning token usage, and updates tests accordingly.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| `agentflow/store/long_term_memory.py` | Adds LTM tool + preload node + write tracking infrastructure. |
| `agentflow/graph/compiled_graph.py` | Awaits pending memory writes during `aclose()` for graceful shutdown. |
| `agentflow/graph/tool_node/constants.py` | Adds `task_manager` to injectable params so tool schemas exclude it. |
| `agentflow/store/__init__.py` | Exports new long-term memory APIs from the store package. |
| `agentflow/adapters/llm/litellm_converter.py` | Improves reasoning extraction + reasoning token parsing robustness. |
| `tests/store/test_long_term_memory.py` | Adds test coverage for the new long-term memory module behaviors. |
| `tests/adapters/test_litellm_converter.py` | Updates test fixtures to match new reasoning token field location. |
```python
tasks = list(self._pending)
if not tasks:
    return {"status": "completed", "pending_writes": 0}

count = len(tasks)
logger.info("Waiting for %d pending memory writes to complete...", count)
try:
    if timeout:
        await asyncio.wait_for(
            asyncio.gather(*tasks, return_exceptions=True),
            timeout=timeout,
        )
    else:
        await asyncio.gather(*tasks, return_exceptions=True)
    logger.info("All %d pending memory writes completed.", count)
    return {"status": "completed", "pending_writes": 0, "completed": count}
except TimeoutError:
    remaining = len(self._pending)
    logger.warning(
        "Timeout waiting for memory writes: %d/%d still pending", remaining, count
    )
    return {
        "status": "timeout",
        "pending_writes": remaining,
        "completed": count - remaining,
    }
```
MemoryWriteTracker.wait_for_pending() snapshots self._pending without acquiring self._lock. If a new write task is tracked concurrently (e.g., a tool call races with shutdown), it can be missed and not awaited, breaking the “guaranteed completion on shutdown” behavior. Consider taking the lock when reading/snapshotting _pending (and optionally looping until the set is stable/empty) so wait_for_pending() can’t return while new tasks are being added.
Suggested change:

```python
# Loop until there are no more pending tasks (or we hit the timeout),
# always snapshotting self._pending under self._lock.
loop = asyncio.get_running_loop()
start_time = loop.time() if timeout is not None else None
total_completed = 0
while True:
    # Snapshot pending (not-yet-done) tasks under the lock for consistency.
    async with self._lock:
        tasks = [t for t in self._pending if not t.done()]
    count = len(tasks)
    if not tasks:
        # Nothing left to wait for.
        if total_completed == 0:
            return {"status": "completed", "pending_writes": 0}
        logger.info("All %d pending memory writes completed.", total_completed)
        return {
            "status": "completed",
            "pending_writes": 0,
            "completed": total_completed,
        }
    logger.info("Waiting for %d pending memory writes to complete...", count)
    try:
        if timeout is not None:
            # Compute remaining timeout budget across iterations.
            elapsed = loop.time() - start_time
            remaining_timeout = timeout - elapsed
            if remaining_timeout <= 0:
                raise TimeoutError
            await asyncio.wait_for(
                asyncio.gather(*tasks, return_exceptions=True),
                timeout=remaining_timeout,
            )
        else:
            await asyncio.gather(*tasks, return_exceptions=True)
        total_completed += count
    except TimeoutError:
        # On timeout, report how many tasks are still pending under the lock.
        async with self._lock:
            remaining = len([t for t in self._pending if not t.done()])
        logger.warning(
            "Timeout waiting for memory writes: %d/%d still pending",
            remaining,
            total_completed + count,
        )
        return {
            "status": "timeout",
            "pending_writes": remaining,
            "completed": total_completed + count - remaining,
        }
```
```python
_write_tracker = MemoryWriteTracker()


def get_write_tracker() -> MemoryWriteTracker:
    """Returns the global write-tracker instance."""
```
The module-level singleton _write_tracker = MemoryWriteTracker() creates an asyncio.Lock() during module import. Asyncio locks are loop-bound; if this tracker is first used in one event loop and later used from a different loop (common in test suites or apps that create multiple loops), it can raise “attached to a different loop”. Consider making the tracker loop-scoped (one per running loop) or lazily creating the lock/tracker within the active loop instead of at import time.
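One way to address this, sketched below with a minimal stand-in class (not AgentFlow's actual `MemoryWriteTracker`), is to create the lock lazily and keep one tracker per running event loop, so nothing loop-bound is constructed at import time:

```python
# Illustrative sketch only: MemoryWriteTracker here is a toy stand-in. The point
# is that the asyncio.Lock is created lazily inside a running loop, and each
# event loop gets its own tracker instead of a module-level singleton.
import asyncio


class MemoryWriteTracker:
    def __init__(self) -> None:
        self._lock = None  # created lazily, never at import time
        self._pending = set()

    @property
    def lock(self) -> asyncio.Lock:
        # First access happens inside a running loop, so the lock binds to it.
        if self._lock is None:
            self._lock = asyncio.Lock()
        return self._lock


# One tracker per event loop instead of a single import-time instance.
_trackers: dict = {}


def get_write_tracker() -> MemoryWriteTracker:
    """Return the tracker bound to the current running loop, creating it on demand."""
    loop = asyncio.get_running_loop()
    if loop not in _trackers:
        _trackers[loop] = MemoryWriteTracker()
    return _trackers[loop]
```

With this shape, test suites that spin up a fresh loop per test each see their own tracker, and the "attached to a different loop" failure mode disappears.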
```python
if memory_types:
    search_kwargs["memory_type"] = memory_types[0]
if max_tokens is not None:
    search_kwargs["max_tokens"] = max_tokens

try:
    results = await store.asearch(config, query, **search_kwargs)
```
create_memory_preload_node accepts memory_types: list[MemoryType] | None, but only the first element is ever used (memory_types[0]). This is misleading for callers and makes it easy to assume multiple types are supported when they aren’t. Consider changing the parameter to a single memory_type: MemoryType | None (matching BaseStore.asearch) or implementing multi-type retrieval semantics explicitly.
Suggested change:

```python
if max_tokens is not None:
    search_kwargs["max_tokens"] = max_tokens
try:
    # If multiple memory types are provided, perform one search per type
    # and aggregate the results. This preserves existing behavior for
    # 0- or 1-element lists while enabling true multi-type semantics.
    results: list[MemorySearchResult] = []
    if memory_types:
        for memory_type in memory_types:
            per_type_kwargs = dict(search_kwargs)
            per_type_kwargs["memory_type"] = memory_type
            per_type_results = await store.asearch(
                config,
                query,
                **per_type_kwargs,
            )
            if per_type_results:
                results.extend(per_type_results)
    else:
        results = await store.asearch(config, query, **search_kwargs)
```
```python
    "publisher",
    "checkpointer",
    "store",
    "task_manager",
}
```
INJECTABLE_PARAMS now includes task_manager, but the docstring above still documents injectable params only up through store. Please update the docstring to mention task_manager so documentation matches behavior.
This pull request introduces a comprehensive long-term memory integration for AgentFlow, enabling LLMs to interact with persistent memory through a new tool and supporting infrastructure. The changes include a new memory_tool for search/store/update/delete operations, a system for tracking pending memory writes to guarantee graceful shutdown, and enhancements to memory retrieval and prompt handling. A LiteLLM reasoning-block and token-accounting issue is also fixed.
Long-term memory integration:
Added agentflow/store/long_term_memory.py, providing:
memory_tool: an LLM-callable tool for searching, storing, updating, and deleting long-term memories.
create_memory_preload_node: a factory for injecting retrieved memories into the agent state before LLM calls.
get_memory_system_prompt: helper for system prompt fragments tailored to different retrieval modes.
MemoryWriteTracker: tracks pending asynchronous memory writes to ensure they complete during shutdown.
Updated agentflow/store/__init__.py to export the new long-term memory components, making them available for import throughout the codebase.
Graceful shutdown improvements:
Modified agentflow/graph/compiled_graph.py to await pending memory writes before shutting down, using the new MemoryWriteTracker for robust resource management.
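The shutdown ordering can be sketched with a toy tracker that mirrors the `wait_for_pending()` contract shown in this PR (the class below is a stand-in for illustration, not AgentFlow's code):

```python
# Minimal sketch: aclose() drains pending background memory writes before
# releasing anything else. ToyWriteTracker is illustrative only; its
# wait_for_pending() return shape mirrors the status dict used in this PR.
import asyncio


class ToyWriteTracker:
    def __init__(self) -> None:
        self._pending = set()

    def track(self, coro) -> asyncio.Task:
        # Register a background write so shutdown can await it later.
        task = asyncio.ensure_future(coro)
        self._pending.add(task)
        task.add_done_callback(self._pending.discard)
        return task

    async def wait_for_pending(self, timeout=None) -> dict:
        tasks = list(self._pending)
        if not tasks:
            return {"status": "completed", "pending_writes": 0}
        await asyncio.wait_for(
            asyncio.gather(*tasks, return_exceptions=True), timeout
        )
        return {"status": "completed", "pending_writes": 0, "completed": len(tasks)}


async def aclose(tracker: ToyWriteTracker) -> dict:
    # Await pending memory writes first, then tear down other resources.
    return await tracker.wait_for_pending(timeout=5.0)


async def demo() -> dict:
    tracker = ToyWriteTracker()
    tracker.track(asyncio.sleep(0.01))  # stand-in for a background memory write
    return await aclose(tracker)
```

Draining before teardown matters because a write task that outlives its store connection would otherwise fail silently during interpreter shutdown.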
LLM response and content block handling:
Improved robustness in agentflow/adapters/llm/litellm_converter.py by:
Ensuring token usage fields are always integers, even if underlying data is missing or null.
Extracting reasoning content from thinking_blocks if not directly present, supporting more provider formats.
Test and schema updates:
Updated tests in tests/adapters/test_litellm_converter.py to use the correct field (completion_tokens_details) for reasoning tokens, reflecting upstream API changes.
Added tests/store/test_long_term_memory.py to verify long-term memory behavior.
Constants and configuration:
Added "task_manager" to the set of injectable node names in agentflow/graph/tool_node/constants.py, supporting new memory tool dependencies.