Integrate long-term memory functionality and enhance tests#53

Closed
prashant4654 wants to merge 4 commits into 10xHub:main from prashant4654:prashant-lib

Conversation

@prashant4654
Contributor

This pull request introduces a comprehensive long-term memory integration for AgentFlow, enabling LLMs to interact with persistent memory through a new tool and supporting infrastructure. The changes include a new memory_tool for search/store/update/delete operations, a system for tracking pending memory writes to guarantee graceful shutdown, and enhancements to memory retrieval and prompt handling. A LiteLLM reasoning-block and token-usage issue is also fixed.

Long-term memory integration:

Added agentflow/store/long_term_memory.py, providing:

memory_tool: an LLM-callable tool for searching, storing, updating, and deleting long-term memories.
create_memory_preload_node: a factory for injecting retrieved memories into the agent state before LLM calls.
get_memory_system_prompt: helper for system prompt fragments tailored to different retrieval modes.
MemoryWriteTracker: tracks pending asynchronous memory writes to ensure they complete during shutdown.
Updated agentflow/store/__init__.py to export the new long-term memory components, making them available for import throughout the codebase.
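To make the operation set concrete, here is a toy, dict-backed sketch of the four operations the new memory_tool exposes (search / store / update / delete). This is NOT AgentFlow's implementation or its real call signature; every name below is illustrative only.

```python
# Toy illustration of a search/store/update/delete memory tool.
# A real store would use semantic search, not substring matching.
_memories: dict[str, str] = {}


def memory_tool(action: str, key: str = "", content: str = "", query: str = ""):
    if action == "store":
        _memories[key] = content
        return {"status": "stored", "key": key}
    if action == "update":
        if key not in _memories:
            return {"status": "not_found", "key": key}
        _memories[key] = content
        return {"status": "updated", "key": key}
    if action == "delete":
        _memories.pop(key, None)
        return {"status": "deleted", "key": key}
    if action == "search":
        # Naive substring match over stored memory contents.
        return [k for k, v in _memories.items() if query.lower() in v.lower()]
    raise ValueError(f"unknown action: {action}")


memory_tool("store", key="pref:editor", content="User prefers vim keybindings")
print(memory_tool("search", query="vim"))  # ['pref:editor']
```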
Graceful shutdown improvements:

Modified agentflow/graph/compiled_graph.py to await pending memory writes before shutting down, using the new MemoryWriteTracker for robust resource management.
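The tracking pattern described above can be sketched in a few lines: keep a strong reference to each in-flight write task, drop it when the task finishes, and have shutdown await whatever remains. This is a minimal sketch, not AgentFlow's MemoryWriteTracker; the real class adds locking and timeout reporting (visible in the review excerpt below).

```python
import asyncio


class MemoryWriteTracker:
    """Minimal sketch: tracks in-flight write tasks so shutdown can await them."""

    def __init__(self) -> None:
        self._pending: set = set()

    def track(self, task: asyncio.Task) -> None:
        # Hold a strong reference until the task finishes, then drop it.
        self._pending.add(task)
        task.add_done_callback(self._pending.discard)

    async def wait_for_pending(self, timeout=None) -> int:
        tasks = list(self._pending)
        if not tasks:
            return 0
        # return_exceptions=True so one failed write doesn't mask the rest.
        await asyncio.wait_for(
            asyncio.gather(*tasks, return_exceptions=True), timeout=timeout
        )
        return len(tasks)


async def demo() -> int:
    tracker = MemoryWriteTracker()
    tracker.track(asyncio.create_task(asyncio.sleep(0.01)))
    tracker.track(asyncio.create_task(asyncio.sleep(0.02)))
    # aclose() would call this before tearing down store connections.
    return await tracker.wait_for_pending(timeout=1.0)


print(asyncio.run(demo()))  # 2
```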

LLM response and content block handling:

Improved robustness in agentflow/adapters/llm/litellm_converter.py by:
Ensuring token usage fields are always integers, even if underlying data is missing or null.
Extracting reasoning content from thinking_blocks if not directly present, supporting more provider formats.
Test and schema updates:

Updated tests in tests/adapters/test_litellm_converter.py to use the correct field (completion_tokens_details) for reasoning tokens, reflecting upstream API changes.
Added tests/store/test_long_term_memory.py to verify the long-term memory functionality.
Constants and configuration:

Added "task_manager" to the set of injectable node names in agentflow/graph/tool_node/constants.py, supporting new memory tool dependencies.
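Why this matters: parameters listed as injectable are supplied by the framework at call time, so they are excluded from the tool schema the LLM sees. A simplified sketch (the tool signature below is hypothetical; AgentFlow's real schema builder differs):

```python
import inspect

# Mirrors the set in agentflow/graph/tool_node/constants.py after this PR.
INJECTABLE_PARAMS = {"publisher", "checkpointer", "store", "task_manager"}


def llm_visible_params(func) -> list:
    # Only non-injected parameters should appear in the LLM-facing schema.
    sig = inspect.signature(func)
    return [name for name in sig.parameters if name not in INJECTABLE_PARAMS]


async def save_memory(content: str, memory_type: str = "fact",
                      store=None, task_manager=None):
    """Hypothetical tool; store/task_manager are framework-injected."""


print(llm_visible_params(save_memory))  # ['content', 'memory_type']
```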

prashant4654 and others added 4 commits March 2, 2026 16:47
…oryWriteTracker

fix: Correct token details key in LiteLLMConverter test
test: Add comprehensive tests for long-term memory functionality
Signed-off-by: prashant4654 <ee23btech11218@iith.ac.in>
Copilot AI review requested due to automatic review settings March 2, 2026 12:13

Copilot AI left a comment


Pull request overview

This PR adds long-term memory (LTM) integration to AgentFlow via a new LLM-callable tool and preload node, updates shutdown behavior to await pending memory writes, and improves LiteLLM response conversion (reasoning + token accounting) with corresponding test updates.

Changes:

  • Introduces agentflow.store.long_term_memory with memory_tool, preload-node factory, system prompt helper, and a pending-write tracker.
  • Updates CompiledGraph.aclose() to wait for pending memory writes before shutdown.
  • Improves LiteLLM conversion for reasoning extraction and reasoning token usage, and updates tests accordingly.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 4 comments.

Summary per file:
agentflow/store/long_term_memory.py: Adds LTM tool + preload node + write tracking infrastructure.
agentflow/graph/compiled_graph.py: Awaits pending memory writes during aclose() for graceful shutdown.
agentflow/graph/tool_node/constants.py: Adds task_manager to injectable params so tool schemas exclude it.
agentflow/store/__init__.py: Exports new long-term memory APIs from the store package.
agentflow/adapters/llm/litellm_converter.py: Improves reasoning extraction + reasoning token parsing robustness.
tests/store/test_long_term_memory.py: Adds test coverage for the new long-term memory module behaviors.
tests/adapters/test_litellm_converter.py: Updates test fixtures to match new reasoning token field location.

Comment on lines +63 to +89
tasks = list(self._pending)
if not tasks:
    return {"status": "completed", "pending_writes": 0}

count = len(tasks)
logger.info("Waiting for %d pending memory writes to complete...", count)
try:
    if timeout:
        await asyncio.wait_for(
            asyncio.gather(*tasks, return_exceptions=True),
            timeout=timeout,
        )
    else:
        await asyncio.gather(*tasks, return_exceptions=True)
    logger.info("All %d pending memory writes completed.", count)
    return {"status": "completed", "pending_writes": 0, "completed": count}
except TimeoutError:
    remaining = len(self._pending)
    logger.warning(
        "Timeout waiting for memory writes: %d/%d still pending", remaining, count
    )
    return {
        "status": "timeout",
        "pending_writes": remaining,
        "completed": count - remaining,
    }


Copilot AI Mar 2, 2026


MemoryWriteTracker.wait_for_pending() snapshots self._pending without acquiring self._lock. If a new write task is tracked concurrently (e.g., a tool call races with shutdown), it can be missed and not awaited, breaking the “guaranteed completion on shutdown” behavior. Consider taking the lock when reading/snapshotting _pending (and optionally looping until the set is stable/empty) so wait_for_pending() can’t return while new tasks are being added.

Suggested change (replacing the quoted snippet):

# Loop until there are no more pending tasks (or we hit the timeout),
# always snapshotting self._pending under self._lock.
loop = asyncio.get_running_loop()
start_time = loop.time() if timeout is not None else None
total_completed = 0
while True:
    # Snapshot pending (not-yet-done) tasks under the lock for consistency.
    async with self._lock:
        tasks = [t for t in self._pending if not t.done()]
        count = len(tasks)
    if not tasks:
        # Nothing left to wait for.
        if total_completed == 0:
            return {"status": "completed", "pending_writes": 0}
        logger.info("All %d pending memory writes completed.", total_completed)
        return {
            "status": "completed",
            "pending_writes": 0,
            "completed": total_completed,
        }
    logger.info("Waiting for %d pending memory writes to complete...", count)
    try:
        if timeout is not None:
            # Compute remaining timeout budget across iterations.
            elapsed = loop.time() - start_time
            remaining_timeout = timeout - elapsed
            if remaining_timeout <= 0:
                raise TimeoutError
            await asyncio.wait_for(
                asyncio.gather(*tasks, return_exceptions=True),
                timeout=remaining_timeout,
            )
        else:
            await asyncio.gather(*tasks, return_exceptions=True)
        total_completed += count
    except TimeoutError:
        # On timeout, report how many tasks are still pending under the lock.
        async with self._lock:
            remaining = len([t for t in self._pending if not t.done()])
        logger.warning(
            "Timeout waiting for memory writes: %d/%d still pending",
            remaining,
            total_completed + count,
        )
        return {
            "status": "timeout",
            "pending_writes": remaining,
            "completed": total_completed + count - remaining,
        }

Comment on lines +95 to +99
_write_tracker = MemoryWriteTracker()


def get_write_tracker() -> MemoryWriteTracker:
    """Returns the global write-tracker instance."""

Copilot AI Mar 2, 2026


The module-level singleton _write_tracker = MemoryWriteTracker() creates an asyncio.Lock() during module import. Asyncio locks are loop-bound; if this tracker is first used in one event loop and later used from a different loop (common in test suites or apps that create multiple loops), it can raise “attached to a different loop”. Consider making the tracker loop-scoped (one per running loop) or lazily creating the lock/tracker within the active loop instead of at import time.
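One way to apply the reviewer's suggestion is to create the lock lazily inside the currently running loop, recreating it if the loop changes. A sketch under those assumptions, not AgentFlow's code:

```python
import asyncio


class LazyLockTracker:
    """Sketch: avoid binding an asyncio.Lock to the import-time event loop."""

    def __init__(self) -> None:
        self._lock = None
        self._loop = None

    def _get_lock(self) -> asyncio.Lock:
        loop = asyncio.get_running_loop()
        if self._lock is None or self._loop is not loop:
            # (Re)create the lock for the currently running loop, so reuse
            # across loops never raises "attached to a different loop".
            self._lock = asyncio.Lock()
            self._loop = loop
        return self._lock

    async def guarded(self) -> str:
        async with self._get_lock():
            return "ok"


tracker = LazyLockTracker()
print(asyncio.run(tracker.guarded()))  # ok
print(asyncio.run(tracker.guarded()))  # ok (second, fresh event loop)
```

Each asyncio.run() call spins up a fresh loop, which is exactly the cross-loop reuse scenario the review warns about; the lazy recreation keeps it safe.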

Comment on lines +300 to +306
if memory_types:
    search_kwargs["memory_type"] = memory_types[0]
if max_tokens is not None:
    search_kwargs["max_tokens"] = max_tokens

try:
    results = await store.asearch(config, query, **search_kwargs)

Copilot AI Mar 2, 2026


create_memory_preload_node accepts memory_types: list[MemoryType] | None, but only the first element is ever used (memory_types[0]). This is misleading for callers and makes it easy to assume multiple types are supported when they aren’t. Consider changing the parameter to a single memory_type: MemoryType | None (matching BaseStore.asearch) or implementing multi-type retrieval semantics explicitly.

Suggested change (replacing the quoted snippet):

if max_tokens is not None:
    search_kwargs["max_tokens"] = max_tokens

try:
    # If multiple memory types are provided, perform one search per type
    # and aggregate the results. This preserves existing behavior for
    # 0- or 1-element lists while enabling true multi-type semantics.
    results: list[MemorySearchResult] = []
    if memory_types:
        for memory_type in memory_types:
            per_type_kwargs = dict(search_kwargs)
            per_type_kwargs["memory_type"] = memory_type
            per_type_results = await store.asearch(
                config,
                query,
                **per_type_kwargs,
            )
            if per_type_results:
                results.extend(per_type_results)
    else:
        results = await store.asearch(config, query, **search_kwargs)

Comment on lines 39 to 43
    "publisher",
    "checkpointer",
    "store",
    "task_manager",
}

Copilot AI Mar 2, 2026


INJECTABLE_PARAMS now includes task_manager, but the docstring above still documents injectable params only up through store. Please update the docstring to mention task_manager so documentation matches behavior.
