From f40034e1ee657b652a815e1a1c88e9d53ff6e0b2 Mon Sep 17 00:00:00 2001 From: matrix Date: Tue, 21 Apr 2026 03:32:33 +0800 Subject: [PATCH 1/2] [Doc] Introduce Mem0 based Long-Term Memory Add documentation for integrating Mem0 as a Long-Term Memory backend in Flink Agents, including: - Mem0 overview, concepts, and prerequisites - BaseLongTermMemory implementation example - Agent usage examples with correct API - Configuration options for multiple LLM providers - Multi-user memory management and async operations - Best practices and limitations Closes #621 --- .../memory/mem0_long_term_memory.md | 434 ++++++++++++++++++ 1 file changed, 434 insertions(+) create mode 100644 docs/content/docs/development/memory/mem0_long_term_memory.md diff --git a/docs/content/docs/development/memory/mem0_long_term_memory.md b/docs/content/docs/development/memory/mem0_long_term_memory.md new file mode 100644 index 000000000..8969c3998 --- /dev/null +++ b/docs/content/docs/development/memory/mem0_long_term_memory.md @@ -0,0 +1,434 @@ +--- +title: Mem0-based Long-Term Memory +weight: 6 +type: docs +--- + + +## Overview + +Flink Agents provides built-in support for [Mem0](https://github.com/mem0ai/mem0) as a Long-Term Memory backend. Mem0 is an intelligent memory layer for AI agents that provides automatic memory extraction, consolidation, and semantic retrieval. + +{{< hint info >}} +As of Flink Agents 0.3.0, Mem0 replaces the previous VectorStore-based Long-Term Memory implementation. +{{< /hint >}} + +## Architecture + +The integration uses a three-layer adapter pattern. `Mem0LongTermMemory` orchestrates a Mem0 `Memory` instance, and three adapters (`FlinkAgentsLLM`, `FlinkAgentsEmbedding`, `FlinkAgentsMem0VectorStore`) bridge Flink Agents resources (ChatModel, EmbeddingModel, VectorStore) to Mem0's factory system under the `flink_agents` provider. See [Adapter Mechanism](#adapter-mechanism-advanced) for details. + +## Prerequisites + +1. 
**Install Mem0 Python SDK**: + ```bash + pip install mem0ai + ``` + +2. **Declare required resources** in your agent plan: + - A [ChatModel]({{< ref "docs/development/chat_models" >}}) for Mem0's fact extraction and memory management + - An [EmbeddingModel]({{< ref "docs/development/embedding_models" >}}) for vector generation + - A [VectorStore]({{< ref "docs/development/vector_stores" >}}) for persistent storage (any `CollectionManageableVectorStore`) + +## Configuration + +Mem0 Long-Term Memory is enabled by setting three configuration options: + +| Key | Type | Description | +|------------------------------------------------------------|--------|---------------------------------| +| `long-term-memory.mem0.chat-model-setup` | String | Resource name of the chat model | +| `long-term-memory.mem0.embedding-model-setup` | String | Resource name of the embedding model | +| `long-term-memory.mem0.vector-store` | String | Resource name of the vector store | + +When all three options are configured, the framework automatically creates a `Mem0LongTermMemory` instance and attaches it to the `RunnerContext`. 
+ +### Configuration Example + +{{< tabs "Mem0 Config" >}} + +{{< tab "Python" >}} + +```python +from flink_agents.api.execution_environment import AgentsExecutionEnvironment +from flink_agents.api.core_options import AgentConfigOptions +from flink_agents.api.memory.long_term_memory import LongTermMemoryOptions + +env = AgentsExecutionEnvironment.get_execution_environment() +agents_config = env.get_config() + +# Set job identifier (maps to Mem0 user_id) +agents_config.set(AgentConfigOptions.JOB_IDENTIFIER, "my_job") + +# Configure Mem0 Long-Term Memory +agents_config.set( + LongTermMemoryOptions.Mem0.CHAT_MODEL_SETUP, + "my_chat_model" +) +agents_config.set( + LongTermMemoryOptions.Mem0.EMBEDDING_MODEL_SETUP, + "my_embedding_model" +) +agents_config.set( + LongTermMemoryOptions.Mem0.VECTOR_STORE, + "my_vector_store" +) +``` + +{{< /tab >}} + +{{< /tabs >}} + +## Data Model + +### MemorySetItem + +Represents a single memory item stored by Mem0: + +| Field | Type | Description | +|---------------------|-----------------------|--------------------------------------------| +| `memory_set_name` | String | Name of the memory set this item belongs to | +| `id` | String | Unique identifier of the item | +| `value` | String | The memory content (extracted by Mem0) | +| `created_at` | Optional[datetime] | When the item was created | +| `updated_at` | Optional[datetime] | When the item was last updated | +| `additional_metadata` | Optional[Dict] | Additional metadata associated with the item | + +### MemorySet + +A named collection of memory items. Memory sets are isolated through Mem0's hierarchical scoping (`user_id`, `agent_id`, `run_id`). 
In Flink Agents: +- `user_id` = job identifier +- `agent_id` = keyed partition key +- `run_id` = memory set name + +## Operations + +### Getting a Memory Set + +Unlike the VectorStore-based LTM, Mem0 does not require explicit `get_or_create_memory_set` with capacity and compaction config — simply call `get_memory_set`: + +{{< tabs "Get Memory Set" >}} + +{{< tab "Python" >}} + +```python +from flink_agents.api.decorators import action +from flink_agents.api.events.event import InputEvent +from flink_agents.api.runner_context import RunnerContext + +@action(InputEvent) +def process_event(event: InputEvent, ctx: RunnerContext) -> None: + ltm = ctx.long_term_memory + + # Get (or create) a memory set + memory_set = ltm.get_memory_set(name="conversations") +``` + +{{< /tab >}} + +{{< /tabs >}} + +### Adding Items + +Add text items to a memory set. Mem0 automatically extracts and consolidates facts: + +{{< tabs "Adding Items" >}} + +{{< tab "Python" >}} + +```python +# Add a single item +ids = memory_set.add(items="The user prefers Python over Java.") + +# Add multiple items +ids = memory_set.add(items=[ + "User likes coffee in the morning.", + "User works from home on Fridays.", +]) + +# Add with metadata +ids = memory_set.add( + items="Important meeting tomorrow.", + metadatas={"category": "work"} +) + +# Add multiple items with metadata +ids = memory_set.add( + items=[ + "Python is great for data science.", + "The weather in Paris is lovely in spring.", + ], + metadatas=[ + {"topic": "programming"}, + {"topic": "travel"}, + ] +) +``` + +{{< /tab >}} + +{{< /tabs >}} + +### Retrieving Items + +Retrieve items by ID or list all items with optional filters: + +{{< tabs "Retrieving Items" >}} + +{{< tab "Python" >}} + +```python +# Get a specific item by ID +items = memory_set.get(ids="mem_123abc") + +# Get multiple items by IDs +items = memory_set.get(ids=["mem_123abc", "mem_456def"]) + +# Get all items (up to 100 by default) +all_items = memory_set.get() + +# Get all 
items with metadata filter +work_items = memory_set.get(filters={"category": "work"}) + +# Get all items with custom limit +items = memory_set.get(limit=50) + +# Access item properties +for item in items: + print(f"ID: {item.id}") + print(f"Value: {item.value}") + print(f"Created: {item.created_at}") + print(f"Updated: {item.updated_at}") + print(f"Metadata: {item.additional_metadata}") +``` + +{{< /tab >}} + +{{< /tabs >}} + +### Semantic Search + +Search for relevant memories using natural language: + +{{< tabs "Semantic Search" >}} + +{{< tab "Python" >}} + +```python +# Basic search +results = memory_set.search( + query="What does the user like?", + limit=5, +) + +# Search with metadata filter +results = memory_set.search( + query="programming languages", + limit=5, + filters={"topic": "programming"}, +) +``` + +{{< /tab >}} + +{{< /tabs >}} + +### Deleting Items + +{{< tabs "Deleting Items" >}} + +{{< tab "Python" >}} + +```python +# Delete specific items by ID +memory_set.delete(ids="mem_123abc") + +# Delete multiple items +memory_set.delete(ids=["mem_123abc", "mem_456def"]) + +# Delete all items in the memory set +memory_set.delete() +``` + +{{< /tab >}} + +{{< /tabs >}} + +### Deleting a Memory Set + +{{< tabs "Delete Memory Set" >}} + +{{< tab "Python" >}} + +```python +ltm = ctx.long_term_memory +deleted = ltm.delete_memory_set(name="conversations") +# Returns True +``` + +{{< /tab >}} + +{{< /tabs >}} + +## Usage in Agent + +### Complete Example + +Here's a complete example of a personalized assistant using Mem0 Long-Term Memory: + +{{< tabs "Complete Example" >}} + +{{< tab "Python" >}} + +```python +from flink_agents.api.decorators import action +from flink_agents.api.execution_environment import AgentsExecutionEnvironment +from flink_agents.api.core_options import AgentConfigOptions +from flink_agents.api.events.event import InputEvent, OutputEvent +from flink_agents.api.memory.long_term_memory import LongTermMemoryOptions +from 
flink_agents.api.runner_context import RunnerContext + +class PersonalizedAssistant: + + @action(InputEvent) + @staticmethod + def process_event(event: InputEvent, ctx: RunnerContext) -> None: + """Respond to user using Mem0 long-term memory.""" + ltm = ctx.long_term_memory + user_query = event.input + + # Get memory set for this session + memory_set = ltm.get_memory_set(name="assistant_memories") + + # Search for relevant context from past interactions + relevant = memory_set.search(query=user_query, limit=5) + memory_context = "\n".join([f"- {m.value}" for m in relevant]) + + # Generate response using your Agent logic + # (e.g., with a chat model) + prompt = f"Known context:\n{memory_context}\n\nUser: {user_query}" + # ... call your LLM ... + response = f"Response to: {user_query}" + + # Store the interaction + memory_set.add(items=f"User asked about: {user_query}") + + ctx.send_event(OutputEvent(output=response)) + +# Setup +env = AgentsExecutionEnvironment.get_execution_environment() +agents_config = env.get_config() +agents_config.set(AgentConfigOptions.JOB_IDENTIFIER, "personalized_assistant") +agents_config.set(LongTermMemoryOptions.Mem0.CHAT_MODEL_SETUP, "my_chat_model") +agents_config.set(LongTermMemoryOptions.Mem0.EMBEDDING_MODEL_SETUP, "my_embedding_model") +agents_config.set(LongTermMemoryOptions.Mem0.VECTOR_STORE, "my_vector_store") +``` + +{{< /tab >}} + +{{< /tabs >}} + +## Context Isolation + +Mem0 Long-Term Memory supports Flink's keyed partition model through `agent_id` scoping: + +{{< tabs "Context Isolation" >}} + +{{< tab "Python" >}} + +```python +from flink_agents.api.decorators import action +from flink_agents.api.events.event import InputEvent +from flink_agents.api.runner_context import RunnerContext + +# Each keyed partition gets isolated memories +# user_id = job_id, agent_id = partition key, run_id = memory_set name + +@action(InputEvent) +@staticmethod +def process_event(event: InputEvent, ctx: RunnerContext) -> None: + ltm = 
ctx.long_term_memory + memory_set = ltm.get_memory_set(name="user_data") + + # Add memory — automatically scoped to the current key + memory_set.add(items=f"User said: {event.input}") + + # Search — only returns memories for the current key + results = memory_set.search(query=event.input, limit=10) +``` + +{{< /tab >}} + +{{< /tabs >}} + +## Metadata Filtering + +Add metadata when storing memories and use filters during retrieval/searches: + +{{< tabs "Metadata Filtering" >}} + +{{< tab "Python" >}} + +```python +# Store with metadata +memory_set.add( + items="User prefers functional programming.", + metadatas={"topic": "programming", "confidence": "high"} +) + +# Retrieve with filter +results = memory_set.get(filters={"topic": "programming"}) + +# Search with filter +results = memory_set.search( + query="what programming language", + limit=5, + filters={"confidence": "high"} +) +``` + +{{< /tab >}} + +{{< /tabs >}} + +## Adapter Mechanism (Advanced) + +Mem0 integration relies on three custom adapters registered under the `flink_agents` provider: + +- **FlinkAgentsLLM**: Wraps Flink Agents' `BaseChatModelSetup` — maps Mem0's LLM calls to the Flink Agents chat model API +- **FlinkAgentsEmbedding**: Wraps Flink Agents' `BaseEmbeddingModelSetup` — provides embeddings through Flink Agents' embedding model +- **FlinkAgentsMem0VectorStore**: Wraps any `CollectionManageableVectorStore` registered in Flink Agents — routes Mem0's vector operations to the configured store + +These adapters are automatically registered with Mem0's factory system when the `Mem0LongTermMemory` instance is created. + +## Best Practices + +1. **Use metadata for organization**: Add relevant metadata when storing memories to enable precise filtering +2. **Be specific in queries**: More specific search queries yield better semantic results +3. **Mem0 handles extraction**: Unlike the old VectorStore LTM, Mem0 automatically extracts facts — no manual compaction needed +4. 
**Monitor token usage**: Mem0 makes two LLM calls per `add` operation (fact extraction + memory update), which impacts cost +5. **Choose appropriate vector store**: Use Chroma for development, Elasticsearch/OpenSearch for production + +## Limitations + +- **Python-only**: Mem0 integration is currently available only for Python agents +- **LLM dependency**: Every memory `add` requires LLM calls, adding latency and cost +- **No capacity-based compaction**: Mem0 manages memory internally; compaction is not configurable through Flink Agents +- **External dependency**: Requires the `mem0ai` Python package to be installed + +For more details, refer to the [Mem0 Documentation](https://docs.mem0.ai/), including the [Python Quickstart](https://docs.mem0.ai/open-source/python-quickstart) and [Configuration Guide](https://docs.mem0.ai/open-source/configuration). From b00a1d2b225c4493b802a1ebcf83c2517b7b5834 Mon Sep 17 00:00:00 2001 From: matrix Date: Sat, 9 May 2026 05:08:13 +0800 Subject: [PATCH 2/2] [Doc] Rewrite Long-Term Memory docs per reviewer feedback - Consolidate mem0_long_term_memory.md into long_term_memory.md - Remove Architecture, Best Practices, Limitations sections - Remove pip install instruction (mem0ai is auto-installed) - Add JOB_IDENTIFIER default behavior hint - Move scope explanation to Context Isolation section - Add Java code examples based on PR #647 - Fix @action/@Action usage to match framework convention - Use Event base type with fromEvent()/from_event() pattern --- .../development/memory/long_term_memory.md | 591 ++++++++++-------- .../memory/mem0_long_term_memory.md | 434 ------------- 2 files changed, 330 insertions(+), 695 deletions(-) delete mode 100644 docs/content/docs/development/memory/mem0_long_term_memory.md diff --git a/docs/content/docs/development/memory/long_term_memory.md b/docs/content/docs/development/memory/long_term_memory.md index 8c018689d..e4cbd0296 100644 --- a/docs/content/docs/development/memory/long_term_memory.md +++ 
b/docs/content/docs/development/memory/long_term_memory.md @@ -24,447 +24,516 @@ under the License. ## Overview -Long-Term Memory is a persistent storage mechanism in Flink Agents designed for storing large amounts of data across multiple agent runs with semantic search capabilities. It provides efficient storage, retrieval, and automatic compaction to manage memory capacity. +Long-Term Memory is a persistent storage mechanism in Flink Agents for storing information across multiple agent runs with semantic search capabilities. It provides automatic memory extraction, consolidation, and retrieval. -{{< hint info >}} -Long-Term Memory is built on vector stores, enabling semantic search to find relevant information based on meaning rather than exact matches. -{{< /hint >}} +Long-Term Memory currently supports the [Mem0](https://github.com/mem0ai/mem0) backend. Mem0 is an intelligent memory layer that automatically extracts facts from conversations, consolidates related memories, and provides semantic retrieval — eliminating the need for manual memory management. -## When to Use Long-Term Memory +## Prerequisites -Long-Term Memory is ideal for: +Declare the following resources in your agent plan: +- A [ChatModel]({{< ref "docs/development/chat_models" >}}) for memory extraction and management +- An [EmbeddingModel]({{< ref "docs/development/embedding_models" >}}) for vector generation +- A [VectorStore]({{< ref "docs/development/vector_stores" >}}) for persistent storage -- **Large Document Collections**: Storing and searching through large amounts of text. -- **Conversation History**: Maintaining long conversation histories with semantic search. -- **Context Retrieval**: Finding relevant context from past interactions. +## Configuration -{{< hint warning >}} -Long-Term Memory is designed for retrieve concise and highly related context. 
For complete original data retrieval, consider using [Short-Term Memory]({{< ref "docs/development/memory/sensory_and_short_term_memory" >}}) instead. -{{< /hint >}} +Mem0 Long-Term Memory is enabled by setting three configuration options: + +| Key | Type | Description | +|--------------------------------------------|--------|----------------------------------| +| `long-term-memory.mem0.chat-model-setup` | String | Resource name of the chat model | +| `long-term-memory.mem0.embedding-model-setup` | String | Resource name of the embedding model | +| `long-term-memory.mem0.vector-store` | String | Resource name of the vector store | + +When all three options are configured, the framework automatically creates a Mem0-based Long-Term Memory instance and attaches it to the `RunnerContext`. + +### Configuration Example + +{{< tabs "LTM Configuration" >}} + +{{< tab "Python" >}} + +```python +from flink_agents.api.execution_environment import AgentsExecutionEnvironment +from flink_agents.api.core_options import AgentConfigOptions +from flink_agents.api.memory.long_term_memory import LongTermMemoryOptions + +env = AgentsExecutionEnvironment.get_execution_environment() +agents_config = env.get_config() + +# Set job identifier (maps to Mem0 user_id) +agents_config.set(AgentConfigOptions.JOB_IDENTIFIER, "my_job") + +# Configure Mem0 Long-Term Memory +agents_config.set( + LongTermMemoryOptions.Mem0.CHAT_MODEL_SETUP, + "my_chat_model" +) +agents_config.set( + LongTermMemoryOptions.Mem0.EMBEDDING_MODEL_SETUP, + "my_embedding_model" +) +agents_config.set( + LongTermMemoryOptions.Mem0.VECTOR_STORE, + "my_vector_store" +) +``` + +{{< /tab >}} + +{{< tab "Java" >}} + +```java +AgentsExecutionEnvironment agentsEnv = + AgentsExecutionEnvironment.getExecutionEnvironment(env); +Configuration agentsConfig = agentsEnv.getConfig(); -## Data Structure +// Set job identifier (maps to Mem0 user_id) +agentsConfig.set(AgentConfigOptions.JOB_IDENTIFIER, "my_job"); + +// Configure Mem0 Long-Term 
Memory +agentsConfig.set( + LongTermMemoryOptions.Mem0.CHAT_MODEL_SETUP, + "my_chat_model" +); +agentsConfig.set( + LongTermMemoryOptions.Mem0.EMBEDDING_MODEL_SETUP, + "my_embedding_model" +); +agentsConfig.set( + LongTermMemoryOptions.Mem0.VECTOR_STORE, + "my_vector_store" +); +``` -### Memory Item +{{< /tab >}} -`MemorySetItem` is the abstraction for representing an item stored in long-term memory. The item can be a piece of text, a chat message, a java or python object, semi-structured document, image, audio and video. +{{< /tabs >}} {{< hint info >}} -Currently, item can only be string and `ChatMessage`. +If `JOB_IDENTIFIER` is not configured, the Flink job ID will be used by default. {{< /hint >}} -`MemorySetItem` has the following properties: -* **memory_set_name**: The name of the memory set this item belong to. -* **id**: The unique identifier of the memory item. -* **value**: The value of the memory item. -* **compacted**: Whether this item has been compacted. -* **created_time**: Timestamp or timestamp range for when this memory item was created. -* **last_accessed_time**: Timestamp for the last time this memory item was accessed. +## Data Model + +### MemorySetItem -### Memory Set +Represents a single memory item stored in Long-Term Memory: -`MemorySet` is a set of memory items, which can be maintained and searched separately. 
+| Field | Type | Description | +|---------------------|-----------------------|--------------------------------------------| +| `memory_set_name` | String | Name of the memory set this item belongs to | +| `id` | String | Unique identifier of the item | +| `value` | String | The memory content (extracted by Mem0) | +| `created_at` | Optional[DateTime] | When the item was created | +| `updated_at` | Optional[DateTime] | When the item was last updated | +| `additional_metadata` | Optional[Map] | Additional metadata associated with the item | -`MemorySet` has the following properties: -- **Name**: Unique identifier for the memory set -- **Item Type**: Type of items stored -- **Capacity**: Maximum number of items before compaction is triggered -- **Compaction Config**: Configuration for compaction +### MemorySet + +A named collection of memory items. Memory sets provide logical grouping and isolation of memories. See [Context Isolation](#context-isolation) for details on how memories are scoped and isolated. 
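To make the shape of the data model concrete, the field table above can be pictured as a simple record type. The following is an illustrative Python sketch only; the class name `MemorySetItemSketch` is hypothetical, and the real `MemorySetItem` class is provided by the Flink Agents API:

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Optional


@dataclass
class MemorySetItemSketch:
    """Illustrative stand-in mirroring the MemorySetItem field table above."""

    memory_set_name: str
    id: str
    value: str
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None
    additional_metadata: Optional[Dict[str, str]] = None


# A single extracted memory, as it might look after a call to add()
item = MemorySetItemSketch(
    memory_set_name="conversations",
    id="mem_123abc",
    value="User prefers Python over Java.",
    additional_metadata={"category": "preferences"},
)
```

The timestamp fields default to `None` because, per the table, they are optional and filled in by the backend rather than by the caller.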
## Operations -### Creating and Getting Memory Set +### Getting a Memory Set -{{< tabs "Memory Set Management" >}} +{{< tabs "Get Memory Set" >}} {{< tab "Python" >}} ```python -ltm = ctx.long_term_memory -# Get or create a memory set -memory_set: MemorySet = ltm.get_or_create_memory_set( - name="my_memory_set", - item_type=str, # or ChatMessage - capacity=50, - compaction_config=CompactionConfig( - model="my_chat_model", - limit=1 # Number of summaries to generate - ) -) +from flink_agents.api.decorators import action +from flink_agents.api.events.event import InputEvent, Event +from flink_agents.api.runner_context import RunnerContext -# Get an existing memory set -memory_set: MemorySet = ltm.get_memory_set(name="my_memory_set") - -# Delete a memory set -deleted: bool = ltm.delete_memory_set(name="my_memory_set") +@action(InputEvent.EVENT_TYPE) +@staticmethod +def process_event(event: Event, ctx: RunnerContext) -> None: + ltm = ctx.long_term_memory + + # Get (or create) a memory set + memory_set = ltm.get_memory_set(name="conversations") ``` + {{< /tab >}} {{< tab "Java" >}} + ```java -BaseLongTermMemory ltm = ctx.getLongTermMemory(); -// Get or create a memory set -MemorySet memorySet = - ltm.getOrCreateMemorySet( - "my_memory_set", - String.class, - 50, - new CompactionConfig("my_chat_model", 1)); - -// Get an existing memory set -memorySet = ltm.getMemorySet("my_memory_set"); - -// Delete a memory set -boolean deleted = ltm.deleteMemorySet("my_memory_set"); +@Action(listenEventTypes = {InputEvent.EVENT_TYPE}) +public static void processEvent(Event event, RunnerContext ctx) throws Exception { + InputEvent inputEvent = InputEvent.fromEvent(event); + BaseLongTermMemory ltm = ctx.getLongTermMemory(); + + // Get (or create) a memory set + MemorySet memorySet = ltm.getMemorySet("conversations"); +} ``` + {{< /tab >}} {{< /tabs >}} ### Adding Items -Add items to a memory set. 
When capacity is reached, compaction is automatically triggered: - {{< tabs "Adding Items" >}} {{< tab "Python" >}} + ```python # Add a single item -item_id: List[str] = memory_set.add("This is a conversation message") +ids = memory_set.add(items="The user prefers Python over Java.") # Add multiple items -item_ids: List[str] = memory_set.add([ - "First message", - "Second message", - "Third message" +ids = memory_set.add(items=[ + "User likes coffee in the morning.", + "User works from home on Fridays.", ]) -# Add with custom IDs -item_ids = memory_set.add( - items=["Message 1", "Message 2"], - ids=["msg_1", "msg_2"] +# Add with metadata +ids = memory_set.add( + items="Important meeting tomorrow.", + metadatas={"category": "work"} ) ``` + {{< /tab >}} {{< tab "Java" >}} + ```java // Add a single item -String itemId = memorySet.add(List.of("This is a conversation message"), null, null).get(0); +List<String> ids = memorySet.add( + List.of("The user prefers Python over Java."), null); // Add multiple items -List<String> itemIds = memorySet.add(List.of( - "First message", - "Second message", - "Third message" -), null, null); - -// Add with custom IDs -itemIds = memorySet.add( - List.of("Message 1", "Message 2"), - List.of("msg_1", "msg_2"), - null +ids = memorySet.add(List.of( + "User likes coffee in the morning.", + "User works from home on Fridays." +), null); + +// Add with metadata +ids = memorySet.add( + List.of("Important meeting tomorrow."), + List.of(Map.of("category", "work")) ); ``` + {{< /tab >}} {{< /tabs >}} -{{< hint info >}} -If no custom ids are provided, random id will be generated for each item. 
-{{< /hint >}} ### Retrieving Items -Retrieve items by ID or get all items: - {{< tabs "Retrieving Items" >}} {{< tab "Python" >}} + ```python -# Get a single item by ID -item: MemorySetItem = memory_set.get(ids="item_id_1") +# Get a specific item by ID +items = memory_set.get(ids="mem_123abc") # Get multiple items by IDs -items: List[MemorySetItem] = memory_set.get(ids=["item_id_1", "item_id_2"]) +items = memory_set.get(ids=["mem_123abc", "mem_456def"]) + +# Get all items +all_items = memory_set.get() -# Get all items if no IDs provided -all_items: List[MemorySetItem] = memory_set.get() +# Get with metadata filter +work_items = memory_set.get(filters={"category": "work"}) # Access item properties for item in items: print(f"ID: {item.id}") print(f"Value: {item.value}") - print(f"Compacted: {item.compacted}") - print(f"Created: {item.created_time}") - print(f"Last Accessed: {item.last_accessed_time}") + print(f"Created: {item.created_at}") + print(f"Updated: {item.updated_at}") + print(f"Metadata: {item.additional_metadata}") ``` + {{< /tab >}} {{< tab "Java" >}} + ```java -// Get a single item by ID -MemorySetItem item = memorySet.get(List.of("item_id_1")).get(0); +// Get a specific item by ID +List<MemorySetItem> items = memorySet.get(List.of("item_id_1"), null, null); // Get multiple items by IDs -List<MemorySetItem> items = memorySet.get(List.of("item_id_1", "item_id_2")); +items = memorySet.get(List.of("item_id_1", "item_id_2"), null, null); -// Get all items if no IDs provided -List<MemorySetItem> allItems = memorySet.get(null); +// Get all items +List<MemorySetItem> allItems = memorySet.get(null, null, null); + +// Get with metadata filter +List<MemorySetItem> workItems = memorySet.get(null, Map.of("category", "work"), null); // Access item properties -for (MemorySetItem myItem : items) { +for (MemorySetItem item : items) { System.out.println("ID: " + item.getId()); System.out.println("Value: " + item.getValue()); - System.out.println("Compacted: " + item.isCompacted()); - System.out.println("Created: " + item.getCreatedTime()); - System.out.println("Last Accessed: " + item.getLastAccessedTime()); + System.out.println("Created: " + item.getCreatedAt()); + System.out.println("Updated: " + item.getUpdatedAt()); + System.out.println("Metadata: " + item.getAdditionalMetadata()); } ``` + {{< /tab >}} {{< /tabs >}} ### Semantic Search -Search for relevant items using natural language queries: - {{< tabs "Semantic Search" >}} {{< tab "Python" >}} + ```python -# Search for relevant items -results: List[MemorySetItem] = memory_set.search( - query="What did the user ask about?", - limit=5 +# Basic search +results = memory_set.search( + query="What does the user like?", + limit=5, +) + +# Search with metadata filter +results = memory_set.search( + query="programming languages", + limit=5, + filters={"topic": "programming"}, ) ``` + {{< /tab >}} {{< tab "Java" >}} + ```java -// Search for relevant items +// Basic search List<MemorySetItem> results = memorySet.search( - "What did the user ask about?", - 5, // limit - null // additional kwargs passed to vector store query + "What does the user like?", + 5, + null, + Map.of() +); + +// Search with metadata filter +results = memorySet.search( + "programming languages", + 5, + Map.of("topic", "programming"), + Map.of() ); ``` + {{< /tab >}} {{< /tabs >}} -### Count Size - -Check the current size of a memory set: +### Deleting Items -{{< tabs "Checking Size" >}} +{{< tabs "Deleting Items" >}} {{< tab "Python" >}} + ```python -# Get the current size -current_size = memory_set.size +# Delete specific items by ID +memory_set.delete(ids="mem_123abc") -# Check if capacity is reached -if memory_set.size >= memory_set.capacity: - print("Capacity reached, compaction will be triggered on next add") +# Delete multiple items +memory_set.delete(ids=["mem_123abc", "mem_456def"]) + +# Delete all items in the memory set +memory_set.delete() ``` + {{< /tab >}} {{< tab "Java" >}} + ```java -// Get the current size -int currentSize = memorySet.size(); +// Delete specific items by ID 
+memorySet.delete(List.of("item_id_1"));
-// Check if capacity is reached
-if (currentSize >= memorySet.getCapacity()) {
-    System.out.println("Capacity reached, compaction will be triggered on next add");
-}
+
+// Delete multiple items
+memorySet.delete(List.of("item_id_1", "item_id_2"));
+
+// Delete all items in the memory set
+memorySet.delete(null);
 ```
+
 {{< /tab >}}
 {{< /tabs >}}
-## Usage in Agent
-
-### Prerequisites
+### Deleting a Memory Set
-To use Long-Term Memory, you need:
-
-1. **Vector Store**: A configured vector store (e.g., ChromaDB) - see [Vector Stores]({{< ref "docs/development/vector_stores" >}})
-2. **Embedding Model**: An embedding model for converting text to vectors - see [Embedding Models]({{< ref "docs/development/embedding_models" >}})
-3. **Chat Model** : Used for summarizing and combining related items.
-
-### Configuration
-
-Before using Long-Term Memory, you need to configure it in your agent execution environment.
-
-| Key                                              | Default | Type                  | Description                                                                                    |
-|--------------------------------------------------|---------|-----------------------|------------------------------------------------------------------------------------------------|
-| AgentConfigOptions.JOB_IDENTIFIER                | job id  | String                | The unique identifier of the agent job, remaining consistent after restoring from a savepoint. |
-| LongTermMemoryOptions.BACKEND                    | none    | LongTermMemoryBackend | The backend of the long-term memory.                                                           |
-| LongTermMemoryOptions.EXTERNAL_VECTOR_STORE_NAME | none    | String                | The name of the vector store used as backend.                                                  |
-| LongTermMemoryOptions.ASYNC_COMPACTION           | true    | boolean               | Execute compaction asynchronously.                                                             |
-
-{{< tabs "Long-Term Memory Configuration" >}}
+{{< tabs "Delete Memory Set" >}}

 {{< tab "Python" >}}

-```python
-agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env)
-agents_config = agents_env.get_config()
-
-# Set job identifier
-agents_config.set(AgentConfigOptions.JOB_IDENTIFIER, "my_job")
-
-# Configure long-term memory backend
-agents_config.set(
-    LongTermMemoryOptions.BACKEND,
-    LongTermMemoryBackend.EXTERNAL_VECTOR_STORE
-)
-
-# Specify the vector store to use
-agents_config.set(
-    LongTermMemoryOptions.EXTERNAL_VECTOR_STORE_NAME,
-    "my_vector_store"
-)
-# Enable async compaction
-agents_config.set(LongTermMemoryOptions.ASYNC_COMPACTION, True)
+```python
+ltm = ctx.long_term_memory
+deleted = ltm.delete_memory_set(name="conversations")
 ```
+
 {{< /tab >}}

 {{< tab "Java" >}}

-```java
-AgentsExecutionEnvironment agentsEnv =
-        AgentsExecutionEnvironment.getExecutionEnvironment(env);
-Configuration agentsConfig = agentsEnv.getConfig();
-
-// Set job identifier
-agentsConfig.set(AgentConfigOptions.JOB_IDENTIFIER, "my_job");
-
-// Configure long-term memory backend
-agentsConfig.set(
-    LongTermMemoryOptions.BACKEND,
-    LongTermMemoryBackend.EXTERNAL_VECTOR_STORE
-);
-// Specify the vector store to use
-agentsConfig.set(
-    LongTermMemoryOptions.EXTERNAL_VECTOR_STORE_NAME,
-    "my_vector_store"
-);
-
-// Enable async compaction
-agentsConfig.set(LongTermMemoryOptions.ASYNC_COMPACTION, true);
+```java
+BaseLongTermMemory ltm = ctx.getLongTermMemory();
+boolean deleted = ltm.deleteMemorySet("conversations");
 ```
+
 {{< /tab >}}
 {{< /tabs >}}

-### Accessing Long-Term Memory
+## Usage in Agent

-Long-Term Memory is accessed through the `RunnerContext` object:
+### Complete Example

-{{< tabs "Accessing Long-Term Memory" >}}
+{{< tabs "Complete Example" >}}

 {{< tab "Python" >}}

 ```python
-@action(InputEvent.EVENT_TYPE)
-def process_event(event: Event, ctx: RunnerContext) -> None:
-    # Access long-term memory
-    ltm = ctx.long_term_memory
+from flink_agents.api.decorators import action
+from flink_agents.api.execution_environment import AgentsExecutionEnvironment
+from flink_agents.api.core_options import AgentConfigOptions
+from flink_agents.api.events.event import InputEvent, OutputEvent, Event
+from flink_agents.api.memory.long_term_memory import LongTermMemoryOptions
+from flink_agents.api.runner_context import RunnerContext
+
+class PersonalizedAssistant:

-    # Get or create a memory set
-    memory_set = ltm.get_or_create_memory_set(
-        name="conversations",
-        item_type=str,
-        capacity=100,
-        compaction_config=CompactionConfig(model="my_chat_model")
-    )
+    @action(InputEvent.EVENT_TYPE)
+    @staticmethod
+    def process_event(event: Event, ctx: RunnerContext) -> None:
+        """Respond to the user using long-term memory."""
+        ltm = ctx.long_term_memory
+        user_query = InputEvent.from_event(event).input
+
+        # Get memory set
+        memory_set = ltm.get_memory_set(name="assistant_memories")
+
+        # Search for relevant context from past interactions
+        relevant = memory_set.search(query=user_query, limit=5)
+        memory_context = "\n".join([f"- {m.value}" for m in relevant])
+
+        # Generate response using your Agent logic
+        # (e.g., pass `prompt` to a chat model)
+        prompt = f"Known context:\n{memory_context}\n\nUser: {user_query}"
+        response = f"Response to: {user_query}"
+
+        # Store the interaction
+        memory_set.add(items=f"User asked about: {user_query}")
+
+        ctx.send_event(OutputEvent(output=response))
+
+# Setup
+env = AgentsExecutionEnvironment.get_execution_environment()
+agents_config = env.get_config()
+agents_config.set(AgentConfigOptions.JOB_IDENTIFIER, "personalized_assistant")
+agents_config.set(LongTermMemoryOptions.Mem0.CHAT_MODEL_SETUP, "my_chat_model")
+agents_config.set(LongTermMemoryOptions.Mem0.EMBEDDING_MODEL_SETUP, "my_embedding_model")
+agents_config.set(LongTermMemoryOptions.Mem0.VECTOR_STORE, "my_vector_store")
 ```
+
 {{< /tab >}}

 {{< tab "Java" >}}
+
 ```java
 @Action(listenEventTypes = {InputEvent.EVENT_TYPE})
 public static void processEvent(Event event, RunnerContext ctx) throws Exception {
-    // Access long-term memory
+    InputEvent inputEvent = InputEvent.fromEvent(event);
     BaseLongTermMemory ltm = ctx.getLongTermMemory();
+    String userQuery = inputEvent.getInput();
+
+    // Get memory set
+    MemorySet memorySet = ltm.getMemorySet("assistant_memories");

-    // Get or create a memory set
-    MemorySet memorySet = ltm.getOrCreateMemorySet(
-        "conversations",
-        String.class,
-        100,
-        new CompactionConfig("my_chat_model")
-    );
+    // Search for relevant context from past interactions
+    List<MemorySetItem> relevant = memorySet.search(userQuery, 5, null, Map.of());
+    StringBuilder memoryContext = new StringBuilder();
+    for (MemorySetItem item : relevant) {
+        memoryContext.append("- ").append(item.getValue()).append("\n");
+    }
+
+    // Generate response using your Agent logic (e.g., with a chat model)
+    String response = "Response to: " + userQuery;
+
+    // Store the interaction
+    memorySet.add(List.of("User asked about: " + userQuery), null);
+
+    ctx.sendEvent(new OutputEvent(response));
 }
 ```
+
 {{< /tab >}}
 {{< /tabs >}}

-## Compaction
+## Context Isolation
+
+Long-Term Memory automatically provides context isolation through Flink's keyed partition model. Each keyed partition maintains its own isolated set of memories, ensuring that memories from one user or session do not leak into another.

-When capacity is reached, long-term memory will use LLM to summarize and combine related items.
+The isolation hierarchy works as follows:
+- **Job-level** (`JOB_IDENTIFIER`): Separates memories between different Flink jobs
+- **Partition-level** (keyed partition key): Separates memories between different keys within the same job
+- **Set-level** (memory set name): Separates memories between different logical categories within the same partition

-User can configure the compaction config when create the `MemorySet`.
-{{< tabs "Compaction Config" >}}
+This means you can safely use the same memory set name across different partitions — each partition will only access its own memories.
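The scoping above can be sketched with a small, self-contained model. This is illustrative only — the class and method names below are hypothetical and not part of the Flink Agents API; the real backend persists memories to the configured vector store:

```python
from collections import defaultdict


class ScopedMemoryModel:
    """Toy model of (job id, partition key, set name) memory scoping."""

    def __init__(self):
        # Each fully-qualified scope tuple gets its own isolated item list.
        self._store = defaultdict(list)

    def add(self, job_id: str, partition_key: str, set_name: str, item: str) -> None:
        self._store[(job_id, partition_key, set_name)].append(item)

    def get(self, job_id: str, partition_key: str, set_name: str) -> list:
        return list(self._store[(job_id, partition_key, set_name)])


model = ScopedMemoryModel()
# Same job, same memory set name, different partition keys (e.g., one per user)
model.add("my_job", "user-1", "conversations", "prefers coffee")
model.add("my_job", "user-2", "conversations", "prefers tea")

# Each partition only sees its own memories, despite the shared set name
assert model.get("my_job", "user-1", "conversations") == ["prefers coffee"]
assert model.get("my_job", "user-2", "conversations") == ["prefers tea"]
```

The same lookup with a different job id or set name returns an empty list, mirroring the job-level and set-level isolation described above.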
+
+## Metadata Filtering
+
+Add metadata when storing memories and use filters during retrieval and search:
+
+{{< tabs "Metadata Filtering" >}}

 {{< tab "Python" >}}

 ```python
-# Create memory set with compaction configuration.
-memory_set = ltm.get_or_create_memory_set(
-    name="conversations",
-    item_type=str,
-    capacity=10, # The framework will automatically trigger compactions and try to maintain the
-                 # size of the memory set not exceeding the given capacity with best efforts
-    compaction_config=CompactionConfig(model="my_chat_model", limit=1)
+# Store with metadata
+memory_set.add(
+    items="User prefers functional programming.",
+    metadatas={"topic": "programming", "confidence": "high"}
+)
+
+# Retrieve with filter
+results = memory_set.get(filters={"topic": "programming"})
+
+# Search with filter
+results = memory_set.search(
+    query="what programming language",
+    limit=5,
+    filters={"confidence": "high"}
 )
 ```
+
 {{< /tab >}}

 {{< tab "Java" >}}
+
 ```java
-// Create memory set with compaction configuration.
-MemorySet memorySet = ltm.getOrCreateMemorySet(
-    "conversations",
-    String.class,
-    10, // The framework will automatically trigger compactions and try to maintain the
-        // size of the memory set not exceeding the given capacity with best efforts
-    new CompactionConfig("my_chat_model", 1)
+// Store with metadata
+memorySet.add(
+    List.of("User prefers functional programming."),
+    List.of(Map.of("topic", "programming", "confidence", "high"))
 );
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-### Async Compaction
-
-Compactions are by default asynchronously performed, to avoid blocking the agent execution. You can also explicitly disable this, so that the agent execution will be paused during the compaction.
-{{< tabs "Async Compaction" >}}
+// Retrieve with filter
+List<MemorySetItem> results = memorySet.get(null, Map.of("topic", "programming"), null);

-{{< tab "Python" >}}
-```python
-# Explicitly disable async compaction in configuration
-agents_config.set(LongTermMemoryOptions.ASYNC_COMPACTION, False)
+// Search with filter
+results = memorySet.search(
+    "what programming language",
+    5,
+    Map.of("confidence", "high"),
+    Map.of()
+);
 ```
-{{< /tab >}}
-{{< tab "Java" >}}
-```java
-// Explicitly disable async compaction in configuration
-agentsConfig.set(LongTermMemoryOptions.ASYNC_COMPACTION, false);
-```

 {{< /tab >}}
 {{< /tabs >}}
-
-{{< hint info >}}
-When async compaction is enabled, compaction runs in a background thread. If compaction fails, errors are logged but don't cause the Flink job to fail.
-{{< /hint >}}
-
-{{< hint info >}}
-When async compaction is enabled, compaction won't block user adding items to the memory set. The size of the memory set may exceed capacity temporarily.
-{{< /hint >}}
\ No newline at end of file
diff --git a/docs/content/docs/development/memory/mem0_long_term_memory.md b/docs/content/docs/development/memory/mem0_long_term_memory.md
deleted file mode 100644
index 8969c3998..000000000
--- a/docs/content/docs/development/memory/mem0_long_term_memory.md
+++ /dev/null
@@ -1,434 +0,0 @@
----
-title: Mem0-based Long-Term Memory
-weight: 6
-type: docs
----
-
-
-## Overview
-
-Flink Agents provides built-in support for [Mem0](https://github.com/mem0ai/mem0) as a Long-Term Memory backend. Mem0 is an intelligent memory layer for AI agents that provides automatic memory extraction, consolidation, and semantic retrieval.
-
-{{< hint info >}}
-Mem0 replaces the previous VectorStore-based Long-Term Memory implementation since Flink Agents 0.3.0.
-{{< /hint >}}
-
-## Architecture
-
-The integration uses a three-layer adapter pattern.
`Mem0LongTermMemory` orchestrates a Mem0 `Memory` instance, and three adapters (`FlinkAgentsLLM`, `FlinkAgentsEmbedding`, `FlinkAgentsMem0VectorStore`) bridge Flink Agents resources (ChatModel, EmbeddingModel, VectorStore) to Mem0's factory system under the `flink_agents` provider. See [Adapter Mechanism](#adapter-mechanism-advanced) for details. - -## Prerequisites - -1. **Install Mem0 Python SDK**: - ```bash - pip install mem0ai - ``` - -2. **Declare required resources** in your agent plan: - - A [ChatModel]({{< ref "docs/development/chat_models" >}}) for Mem0's fact extraction and memory management - - An [EmbeddingModel]({{< ref "docs/development/embedding_models" >}}) for vector generation - - A [VectorStore]({{< ref "docs/development/vector_stores" >}}) for persistent storage (any `CollectionManageableVectorStore`) - -## Configuration - -Mem0 Long-Term Memory is enabled by setting three configuration options: - -| Key | Type | Description | -|------------------------------------------------------------|--------|---------------------------------| -| `long-term-memory.mem0.chat-model-setup` | String | Resource name of the chat model | -| `long-term-memory.mem0.embedding-model-setup` | String | Resource name of the embedding model | -| `long-term-memory.mem0.vector-store` | String | Resource name of the vector store | - -When all three options are configured, the framework automatically creates a `Mem0LongTermMemory` instance and attaches it to the `RunnerContext`. 
- -### Configuration Example - -{{< tabs "Mem0 Config" >}} - -{{< tab "Python" >}} - -```python -from flink_agents.api.execution_environment import AgentsExecutionEnvironment -from flink_agents.api.core_options import AgentConfigOptions -from flink_agents.api.memory.long_term_memory import LongTermMemoryOptions - -env = AgentsExecutionEnvironment.get_execution_environment() -agents_config = env.get_config() - -# Set job identifier (maps to Mem0 user_id) -agents_config.set(AgentConfigOptions.JOB_IDENTIFIER, "my_job") - -# Configure Mem0 Long-Term Memory -agents_config.set( - LongTermMemoryOptions.Mem0.CHAT_MODEL_SETUP, - "my_chat_model" -) -agents_config.set( - LongTermMemoryOptions.Mem0.EMBEDDING_MODEL_SETUP, - "my_embedding_model" -) -agents_config.set( - LongTermMemoryOptions.Mem0.VECTOR_STORE, - "my_vector_store" -) -``` - -{{< /tab >}} - -{{< /tabs >}} - -## Data Model - -### MemorySetItem - -Represents a single memory item stored by Mem0: - -| Field | Type | Description | -|---------------------|-----------------------|--------------------------------------------| -| `memory_set_name` | String | Name of the memory set this item belongs to | -| `id` | String | Unique identifier of the item | -| `value` | String | The memory content (extracted by Mem0) | -| `created_at` | Optional[datetime] | When the item was created | -| `updated_at` | Optional[datetime] | When the item was last updated | -| `additional_metadata` | Optional[Dict] | Additional metadata associated with the item | - -### MemorySet - -A named collection of memory items. Memory sets are isolated through Mem0's hierarchical scoping (`user_id`, `agent_id`, `run_id`). 
In Flink Agents: -- `user_id` = job identifier -- `agent_id` = keyed partition key -- `run_id` = memory set name - -## Operations - -### Getting a Memory Set - -Unlike the VectorStore-based LTM, Mem0 does not require explicit `get_or_create_memory_set` with capacity and compaction config — simply call `get_memory_set`: - -{{< tabs "Get Memory Set" >}} - -{{< tab "Python" >}} - -```python -from flink_agents.api.decorators import action -from flink_agents.api.events.event import InputEvent -from flink_agents.api.runner_context import RunnerContext - -@action(InputEvent) -def process_event(event: InputEvent, ctx: RunnerContext) -> None: - ltm = ctx.long_term_memory - - # Get (or create) a memory set - memory_set = ltm.get_memory_set(name="conversations") -``` - -{{< /tab >}} - -{{< /tabs >}} - -### Adding Items - -Add text items to a memory set. Mem0 automatically extracts and consolidates facts: - -{{< tabs "Adding Items" >}} - -{{< tab "Python" >}} - -```python -# Add a single item -ids = memory_set.add(items="The user prefers Python over Java.") - -# Add multiple items -ids = memory_set.add(items=[ - "User likes coffee in the morning.", - "User works from home on Fridays.", -]) - -# Add with metadata -ids = memory_set.add( - items="Important meeting tomorrow.", - metadatas={"category": "work"} -) - -# Add multiple items with metadata -ids = memory_set.add( - items=[ - "Python is great for data science.", - "The weather in Paris is lovely in spring.", - ], - metadatas=[ - {"topic": "programming"}, - {"topic": "travel"}, - ] -) -``` - -{{< /tab >}} - -{{< /tabs >}} - -### Retrieving Items - -Retrieve items by ID or list all items with optional filters: - -{{< tabs "Retrieving Items" >}} - -{{< tab "Python" >}} - -```python -# Get a specific item by ID -items = memory_set.get(ids="mem_123abc") - -# Get multiple items by IDs -items = memory_set.get(ids=["mem_123abc", "mem_456def"]) - -# Get all items (up to 100 by default) -all_items = memory_set.get() - -# Get all 
items with metadata filter -work_items = memory_set.get(filters={"category": "work"}) - -# Get all items with custom limit -items = memory_set.get(limit=50) - -# Access item properties -for item in items: - print(f"ID: {item.id}") - print(f"Value: {item.value}") - print(f"Created: {item.created_at}") - print(f"Updated: {item.updated_at}") - print(f"Metadata: {item.additional_metadata}") -``` - -{{< /tab >}} - -{{< /tabs >}} - -### Semantic Search - -Search for relevant memories using natural language: - -{{< tabs "Semantic Search" >}} - -{{< tab "Python" >}} - -```python -# Basic search -results = memory_set.search( - query="What does the user like?", - limit=5, -) - -# Search with metadata filter -results = memory_set.search( - query="programming languages", - limit=5, - filters={"topic": "programming"}, -) -``` - -{{< /tab >}} - -{{< /tabs >}} - -### Deleting Items - -{{< tabs "Deleting Items" >}} - -{{< tab "Python" >}} - -```python -# Delete specific items by ID -memory_set.delete(ids="mem_123abc") - -# Delete multiple items -memory_set.delete(ids=["mem_123abc", "mem_456def"]) - -# Delete all items in the memory set -memory_set.delete() -``` - -{{< /tab >}} - -{{< /tabs >}} - -### Deleting a Memory Set - -{{< tabs "Delete Memory Set" >}} - -{{< tab "Python" >}} - -```python -ltm = ctx.long_term_memory -deleted = ltm.delete_memory_set(name="conversations") -# Returns True -``` - -{{< /tab >}} - -{{< /tabs >}} - -## Usage in Agent - -### Complete Example - -Here's a complete example of a personalized assistant using Mem0 Long-Term Memory: - -{{< tabs "Complete Example" >}} - -{{< tab "Python" >}} - -```python -from flink_agents.api.decorators import action -from flink_agents.api.execution_environment import AgentsExecutionEnvironment -from flink_agents.api.core_options import AgentConfigOptions -from flink_agents.api.events.event import InputEvent, OutputEvent -from flink_agents.api.memory.long_term_memory import LongTermMemoryOptions -from 
flink_agents.api.runner_context import RunnerContext - -class PersonalizedAssistant: - - @action(InputEvent) - @staticmethod - def process_event(event: InputEvent, ctx: RunnerContext) -> None: - """Respond to user using Mem0 long-term memory.""" - ltm = ctx.long_term_memory - user_query = event.input - - # Get memory set for this session - memory_set = ltm.get_memory_set(name="assistant_memories") - - # Search for relevant context from past interactions - relevant = memory_set.search(query=user_query, limit=5) - memory_context = "\n".join([f"- {m.value}" for m in relevant]) - - # Generate response using your Agent logic - # (e.g., with a chat model) - prompt = f"Known context:\n{memory_context}\n\nUser: {user_query}" - # ... call your LLM ... - response = f"Response to: {user_query}" - - # Store the interaction - memory_set.add(items=f"User asked about: {user_query}") - - ctx.send_event(OutputEvent(output=response)) - -# Setup -env = AgentsExecutionEnvironment.get_execution_environment() -agents_config = env.get_config() -agents_config.set(AgentConfigOptions.JOB_IDENTIFIER, "personalized_assistant") -agents_config.set(LongTermMemoryOptions.Mem0.CHAT_MODEL_SETUP, "my_chat_model") -agents_config.set(LongTermMemoryOptions.Mem0.EMBEDDING_MODEL_SETUP, "my_embedding_model") -agents_config.set(LongTermMemoryOptions.Mem0.VECTOR_STORE, "my_vector_store") -``` - -{{< /tab >}} - -{{< /tabs >}} - -## Context Isolation - -Mem0 Long-Term Memory supports Flink's keyed partition model through `agent_id` scoping: - -{{< tabs "Context Isolation" >}} - -{{< tab "Python" >}} - -```python -from flink_agents.api.decorators import action -from flink_agents.api.events.event import InputEvent -from flink_agents.api.runner_context import RunnerContext - -# Each keyed partition gets isolated memories -# user_id = job_id, agent_id = partition key, run_id = memory_set name - -@action(InputEvent) -@staticmethod -def process_event(event: InputEvent, ctx: RunnerContext) -> None: - ltm = 
ctx.long_term_memory - memory_set = ltm.get_memory_set(name="user_data") - - # Add memory — automatically scoped to the current key - memory_set.add(items=f"User said: {event.input}") - - # Search — only returns memories for the current key - results = memory_set.search(query=event.input, limit=10) -``` - -{{< /tab >}} - -{{< /tabs >}} - -## Metadata Filtering - -Add metadata when storing memories and use filters during retrieval/searches: - -{{< tabs "Metadata Filtering" >}} - -{{< tab "Python" >}} - -```python -# Store with metadata -memory_set.add( - items="User prefers functional programming.", - metadatas={"topic": "programming", "confidence": "high"} -) - -# Retrieve with filter -results = memory_set.get(filters={"topic": "programming"}) - -# Search with filter -results = memory_set.search( - query="what programming language", - limit=5, - filters={"confidence": "high"} -) -``` - -{{< /tab >}} - -{{< /tabs >}} - -## Adapter Mechanism (Advanced) - -Mem0 integration relies on three custom adapters registered under the `flink_agents` provider: - -- **FlinkAgentsLLM**: Wraps Flink Agents' `BaseChatModelSetup` — maps Mem0's LLM calls to the Flink Agents chat model API -- **FlinkAgentsEmbedding**: Wraps Flink Agents' `BaseEmbeddingModelSetup` — provides embeddings through Flink Agents' embedding model -- **FlinkAgentsMem0VectorStore**: Wraps any `CollectionManageableVectorStore` registered in Flink Agents — routes Mem0's vector operations to the configured store - -These adapters are automatically registered with Mem0's factory system when the `Mem0LongTermMemory` instance is created. - -## Best Practices - -1. **Use metadata for organization**: Add relevant metadata when storing memories to enable precise filtering -2. **Be specific in queries**: More specific search queries yield better semantic results -3. **Mem0 handles extraction**: Unlike the old VectorStore LTM, Mem0 automatically extracts facts — no manual compaction needed -4. 
**Monitor token usage**: Mem0 makes two LLM calls per `add` operation (fact extraction + memory update), which impacts cost -5. **Choose appropriate vector store**: Use Chroma for development, Elasticsearch/OpenSearch for production - -## Limitations - -- **Python-only**: Mem0 integration is currently available only for Python agents -- **LLM dependency**: Every memory `add` requires LLM calls, adding latency and cost -- **No capacity-based compaction**: Mem0 manages memory internally; compaction is not configurable through Flink Agents -- **External dependency**: Requires the `mem0ai` Python package to be installed - -For more details, refer to the [Mem0 Documentation](https://docs.mem0.ai/), including the [Python Quickstart](https://docs.mem0.ai/open-source/python-quickstart) and [Configuration Guide](https://docs.mem0.ai/open-source/configuration).