
Integrate long-term memory functionality and update CI configuration#55

Open
prashant4654 wants to merge 6 commits into 10xHub:main from prashant4654:prashant-lib

Conversation

@prashant4654
Contributor

This pull request introduces long-term memory integration into AgentFlow, enabling LLMs and agent graphs to search, store, update, and delete persistent memories. The main additions are a new memory_tool for LLMs, a memory preload node factory, system prompt helpers, and a write tracker to ensure all pending memory writes are completed before shutdown. It also adds support for Google GenAI extras, improves reasoning extraction in LiteLLM responses, and updates exports for the new memory features.

Long-term memory integration:

  • Added agentflow/store/long_term_memory.py, which provides:

    • memory_tool: an LLM-callable tool for interacting with long-term memory stores.
    • create_memory_preload_node: a factory for injecting retrieved memories as context before LLM calls.
    • get_memory_system_prompt: prompt helpers for different memory read modes.
    • MemoryWriteTracker and get_write_tracker: track and await pending memory writes for graceful shutdown.
  • Exported new memory-related utilities in agentflow/store/__init__.py for easier access throughout the codebase.

Shutdown and resource management:

  • Updated agentflow/graph/compiled_graph.py to wait for pending memory writes before shutdown by using the new write tracker, ensuring no data loss.

LiteLLM response parsing improvements:

  • Enhanced reasoning extraction in agentflow/adapters/llm/litellm_converter.py to support providers that use thinking_blocks for reasoning content, improving compatibility and accuracy.
  • Updated token usage parsing to use completion_tokens_details instead of prompt_tokens_details for reasoning tokens.

Dependency and compatibility updates:

  • Updated the CI workflow to install the google-genai extra for tests, enabling Google GenAI support.
  • Added compatibility handling for Google GenAI imports in agentflow/store/embedding/google_embedding.py.
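
Compatibility handling for optional dependencies typically follows a guarded-import pattern like the sketch below (illustrative only; the actual code in google_embedding.py, the helper name, and the pip install command are assumptions — only the `google-genai` extra name comes from the CI change):

```python
# Guarded import of an optional dependency: the module loads even when the
# extra is missing, and fails with a clear message only when it is used.
try:
    from google import genai  # provided by the google-genai extra
except ImportError:  # extra not installed
    genai = None


def require_genai():
    """Return the genai module, or raise a helpful error if it is missing."""
    if genai is None:
        raise ImportError(
            "google-genai is required for Google embeddings; "
            "install it via the google-genai extra "
            "(e.g. pip install '.[google-genai]')"
        )
    return genai
```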

Miscellaneous:

  • Added "task_manager" to the tool node constants for broader tool support in graphs.

prashant4654 and others added 6 commits March 2, 2026 16:47
…oryWriteTracker

fix: Correct token details key in LiteLLMConverter test
test: Add comprehensive tests for long-term memory functionality
Signed-off-by: prashant4654 <ee23btech11218@iith.ac.in>
…ndencies and adjust import handling in GoogleEmbedding
Copilot AI review requested due to automatic review settings March 2, 2026 12:42

Copilot AI left a comment

Pull request overview

This PR adds long-term memory capabilities to AgentFlow (LLM-callable tool + preload node + system prompt helpers) and wires shutdown handling so pending memory writes can be awaited before graph teardown. It also improves LiteLLM reasoning extraction (including thinking_blocks support), updates CI to install Google GenAI extras, and exposes the new memory utilities via the store package.

Changes:

  • Added agentflow.store.long_term_memory with memory_tool, preload node factory, prompt helpers, and a pending-write tracker.
  • Updated shutdown flow in CompiledGraph.aclose() to wait for pending memory writes before shutting down background tasks/store.
  • Improved LiteLLM reasoning + token parsing; updated CI to install google-genai extra.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
agentflow/store/long_term_memory.py New memory tool + preload node + write tracking utilities for persistent memory integration.
agentflow/graph/compiled_graph.py Awaits pending memory writes during aclose() before shutting down task manager/store.
agentflow/store/__init__.py Re-exports new memory utilities for easier access.
agentflow/graph/tool_node/constants.py Adds task_manager to injectable params excluded from tool schema generation.
agentflow/adapters/llm/litellm_converter.py Extracts reasoning from thinking_blocks and updates reasoning token parsing.
tests/store/test_long_term_memory.py Adds unit tests for long-term memory helper functions, tool behavior, preload node, and write tracker.
tests/adapters/test_litellm_converter.py Updates token usage test fixture to match new reasoning token source.
agentflow/store/embedding/google_embedding.py Minor import/formatting adjustment related to Google GenAI compatibility.
.github/workflows/ci.yml Installs .[litellm,google-genai] extras in CI.

Comment on lines +62 to +88
```python
"""Wait for all pending writes. Returns stats dict."""
tasks = list(self._pending)
if not tasks:
    return {"status": "completed", "pending_writes": 0}

count = len(tasks)
logger.info("Waiting for %d pending memory writes to complete...", count)
try:
    if timeout:
        await asyncio.wait_for(
            asyncio.gather(*tasks, return_exceptions=True),
            timeout=timeout,
        )
    else:
        await asyncio.gather(*tasks, return_exceptions=True)
    logger.info("All %d pending memory writes completed.", count)
    return {"status": "completed", "pending_writes": 0, "completed": count}
except TimeoutError:
    remaining = len(self._pending)
    logger.warning(
        "Timeout waiting for memory writes: %d/%d still pending", remaining, count
    )
    return {
        "status": "timeout",
        "pending_writes": remaining,
        "completed": count - remaining,
    }
```

Copilot AI Mar 2, 2026

wait_for_pending() snapshots self._pending into tasks = list(self._pending) without holding the lock or looping until empty. Any writes tracked after this snapshot won’t be awaited, which weakens the “guarantees all pending writes finish before shutdown” behavior. Consider acquiring the lock to snapshot+freeze, or loop until pending_count == 0 (with a timeout), so late-added tasks are also waited on.

Suggested change

```python
"""Wait for all pending writes. Returns stats dict.

This method will:
- repeatedly snapshot the current set of pending tasks under the lock
  (to avoid races with `track`), waiting for them to finish, and
- stop once there are no more pending tasks or the optional timeout
  has elapsed.
"""
# Track total number of tasks we have waited on across iterations.
total_completed = 0
loop = asyncio.get_running_loop()
deadline: float | None = None
if timeout is not None:
    deadline = loop.time() + timeout
while True:
    # Snapshot current pending tasks under the lock to avoid races with `track`.
    async with self._lock:
        tasks = [t for t in self._pending if not t.done()]
    if not tasks:
        logger.info("All pending memory writes completed.")
        return {
            "status": "completed",
            "pending_writes": 0,
            "completed": total_completed,
        }
    count = len(tasks)
    logger.info("Waiting for %d pending memory writes to complete...", count)
    try:
        if deadline is not None:
            remaining_timeout = deadline - loop.time()
            if remaining_timeout <= 0:
                raise asyncio.TimeoutError()
            await asyncio.wait_for(
                asyncio.gather(*tasks, return_exceptions=True),
                timeout=remaining_timeout,
            )
        else:
            await asyncio.gather(*tasks, return_exceptions=True)
        total_completed += count
    except asyncio.TimeoutError:
        # Compute how many tasks are still pending (best-effort, under the lock).
        async with self._lock:
            remaining_tasks = [t for t in self._pending if not t.done()]
        remaining = len(remaining_tasks)
        logger.warning(
            "Timeout waiting for memory writes: %d/%d still pending",
            remaining,
            total_completed + remaining,
        )
        return {
            "status": "timeout",
            "pending_writes": remaining,
            "completed": total_completed,
        }
```

```python
results = await store.asearch(
    cfg,
    query,
    memory_type=mem_type,
```

Copilot AI Mar 2, 2026

In the search action, category (and any potential filter info) is ignored even though the tool exposes a category parameter and BaseStore.asearch(...) supports category. This makes it impossible for callers/LLMs to scope searches to a category. Consider passing category=category (when set) through to asearch.

Suggested change

```python
    memory_type=mem_type,
    category=category,
```

Comment on lines +300 to +306
```python
if memory_types:
    search_kwargs["memory_type"] = memory_types[0]
if max_tokens is not None:
    search_kwargs["max_tokens"] = max_tokens

try:
    results = await store.asearch(config, query, **search_kwargs)
```

Copilot AI Mar 2, 2026

create_memory_preload_node accepts memory_types: list[MemoryType] | None, but only ever uses the first element (memory_types[0]) and BaseStore.asearch only supports a single memory_type. This is misleading for callers and suggests multi-type support that isn’t implemented. Consider changing the parameter to memory_type: MemoryType | None (single) or extending store/search to accept multiple types explicitly.

Suggested change

```python
if max_tokens is not None:
    search_kwargs["max_tokens"] = max_tokens

try:
    # If specific memory types are provided, search each type separately
    # and aggregate the results. Otherwise, perform a single search.
    results: list[MemorySearchResult] = []
    if memory_types:
        for mt in memory_types:
            per_type_results = await store.asearch(
                config,
                query,
                memory_type=mt,
                **search_kwargs,
            )
            if per_type_results:
                results.extend(per_type_results)
    else:
        results = await store.asearch(config, query, **search_kwargs)
```

Comment on lines +535 to +546
```python
# 0. Wait for pending memory writes to complete (no cancellation)
try:
    from agentflow.store.long_term_memory import get_write_tracker

    tracker = get_write_tracker()
    if tracker.pending_count > 0:
        logger.info(
            "Waiting for %d pending memory writes before shutdown...",
            tracker.pending_count,
        )
        write_stats = await tracker.wait_for_pending(timeout=self._shutdown_timeout)
        stats["memory_writes"] = write_stats
```

Copilot AI Mar 2, 2026

The new memory-write wait uses timeout=self._shutdown_timeout and then the rest of aclose() proceeds with additional shutdown steps that each have their own timeouts. This means total shutdown time can now exceed _shutdown_timeout by an extra full timeout window. Consider budgeting the shutdown time across phases (e.g., subtract elapsed time / cap remaining) so the overall shutdown stays within the intended limit.
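
One way to share a single timeout across sequential shutdown phases is to compute each phase's allowance from a common deadline. This is an illustrative sketch of the budgeting idea the reviewer describes, not the PR's code; the phase structure and names are invented:

```python
import asyncio
import time


async def aclose_with_budget(phases, total_timeout: float) -> None:
    """Run shutdown phases sequentially under one shared timeout budget.

    `phases` is a list of zero-argument coroutine functions. Each phase gets
    whatever remains of the total budget; once the budget is exhausted, the
    remaining graceful phases are skipped.
    """
    deadline = time.monotonic() + total_timeout
    for phase in phases:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # budget exhausted; skip remaining graceful phases
        try:
            await asyncio.wait_for(phase(), timeout=remaining)
        except asyncio.TimeoutError:
            break  # a slow phase consumed the rest of the budget


async def demo():
    done = []

    async def wait_memory_writes():  # fast phase
        done.append("writes")

    async def stop_background_tasks():  # slow phase, exceeds the budget
        await asyncio.sleep(10)
        done.append("tasks")

    await aclose_with_budget([wait_memory_writes, stop_background_tasks],
                             total_timeout=0.1)
    return done


print(asyncio.run(demo()))  # ['writes']
```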

Comment on lines 33 to 43
```python
INJECTABLE_PARAMS = {
    "tool_call_id",
    "state",
    "config",
    "generated_id",
    "context_manager",
    "publisher",
    "checkpointer",
    "store",
    "task_manager",
}
```

Copilot AI Mar 2, 2026

INJECTABLE_PARAMS now includes task_manager, but the module docstring’s parameter list above still stops at store and doesn’t mention task_manager. Please update the docstring so schema/exclusion behavior remains accurately documented.

Comment on lines +95 to +99
```python
_write_tracker = MemoryWriteTracker()


def get_write_tracker() -> MemoryWriteTracker:
    """Returns the global write-tracker instance."""
```

Copilot AI Mar 2, 2026

_write_tracker is instantiated at import time, which creates an asyncio.Lock immediately. This can bind the lock to the wrong event loop (or raise at import), and later async with self._lock can fail with “bound to a different event loop”. Consider lazy-initializing the global tracker inside get_write_tracker() (e.g., create it on first call from within an active loop) or avoid asyncio primitives in module-level singletons.

Suggested change

```python
_write_tracker: MemoryWriteTracker | None = None


def get_write_tracker() -> MemoryWriteTracker:
    """Returns the global write-tracker instance."""
    global _write_tracker
    if _write_tracker is None:
        _write_tracker = MemoryWriteTracker()
    return _write_tracker
```
