Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
e9f44cb
feat: add Mimir as persistent cross-session memory backend
SessantaNove Jun 17, 2026
01b3bb5
feat: wire MimirStorage into factory resolution
SessantaNove Jun 17, 2026
2d7815f
fix: address CodeRabbit review comments on MimirStorage integration
SessantaNove Jun 17, 2026
aa62068
fix: adjust SDK calls to async format and fix return types
SessantaNove Jun 17, 2026
bd02734
fix: switch to MimirSyncClient to satisfy synchronous StorageBackend …
SessantaNove Jun 17, 2026
4fa46f7
fix: sanitize MimirSyncClient config keys and fix comment typo
SessantaNove Jun 17, 2026
21d7956
refactor: upgrade Mimir memory backend to official MCP standard
SessantaNove Jun 18, 2026
b9e4349
refactor: complete Mimir integration update in readme and factory
SessantaNove Jun 18, 2026
711e379
fix: address coderabbit recommendations on db flag, expansion and sco…
SessantaNove Jun 18, 2026
d6b95cf
Merge branch 'main' into main
Jacopos311 Jun 18, 2026
67b8320
Address maintainer feedback for Mimir storage module
SessantaNove Jun 19, 2026
76dc276
Fix Mimir storage implementation to fully conform with StorageBackend…
SessantaNove Jun 19, 2026
0b2c880
Enhance MimirStorage stability with direct imports and subprocess tim…
SessantaNove Jun 19, 2026
cd0fbaf
fix(task): centralize token tracking and preserve usage during guardr…
SessantaNove Jun 19, 2026
bba8015
fix(task): address token delta calculation for dict types and explici…
SessantaNove Jun 19, 2026
539d2f9
fix(task,memory): address token tracking lifecycle and sync storage l…
SessantaNove Jun 19, 2026
d5c3468
fix(memory): align search limit and min_score protocol defaults in mi…
SessantaNove Jun 19, 2026
030fff6
feat: implement deterministic tool permission gating for agents (#6232)
SessantaNove Jun 21, 2026
6bfc9a8
fix(security): propagate capability gating to structured tools
SessantaNove Jun 21, 2026
be6e070
Merge branch 'main' into fix/task-token-tracking
Jacopos311 Jun 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions lib/crewai/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,35 @@ This example demonstrates how to:
3. Use Flow decorators to manage the sequence of operations
4. Implement conditional branching based on Crew results

### Mimir Memory Backend

CrewAI now supports **Mimir** as a persistent memory engine, integrated seamlessly via the Model Context Protocol (MCP).

#### Prerequisites
Ensure you have the official Mimir binary installed and accessible in your system environment. For detailed setup and installation instructions, please visit the official repository:
👉 [Perseus-Computing-LLC/mimir](https://github.com/Perseus-Computing-LLC/mimir)

#### Setup
CrewAI communicates with Mimir over MCP through a standard I/O subprocess, which requires the `mcp` Python package. This package is installed automatically as a CrewAI dependency, so no extra installation step is needed.

To use `MimirStorage` as your memory backend, initialize it within your Crew setup by providing the configuration dictionary containing your custom database path:

```python
from crewai import Crew
from crewai.memory.storage.mimir_storage import MimirStorage

mimir_config = {
    "db_path": "~/.mimir/custom_mimir.db"
}

crew = Crew(
    agents=[...],
    tasks=[...],
    memory=True,
    storage=MimirStorage(config=mimir_config)
)
```

## Connecting Your Crew to a Model

CrewAI supports using various LLMs through a variety of connection options. By default your agents will use the OpenAI API when querying the model. However, there are several other ways to allow your agents to connect to models. For example, you can configure your agents to use a local model via the Ollama tool.
Expand Down
10 changes: 9 additions & 1 deletion lib/crewai/src/crewai/agents/agent_builder/base_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from hashlib import md5
from pathlib import Path
import re
from typing import TYPE_CHECKING, Annotated, Any, Final, Literal
from typing import TYPE_CHECKING, Annotated, Any, Final, List, Literal, Optional
import uuid

from pydantic import (
Expand Down Expand Up @@ -248,6 +248,14 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
Set private attributes.
"""

# Optional list of capability/permission names attached to the agent.
# Defaults to None. How the list is enforced is not visible in this hunk.
capabilities: Optional[List[str]] = Field(
default=None,
description="List of deterministic capabilities or permissions assigned to the agent."
)
# Optional list of tool or capability names that need manual human approval
# before execution (per the field description). Defaults to None.
# NOTE(review): the approval check itself lives elsewhere — confirm where it is applied.
require_approval_for: Optional[List[str]] = Field(
default=None,
description="List of tools or capabilities that require manual human approval before execution."
)
entity_type: Literal["agent"] = "agent"

__hash__ = object.__hash__
Expand Down
23 changes: 19 additions & 4 deletions lib/crewai/src/crewai/memory/storage/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
"""

from __future__ import annotations

from collections.abc import Callable
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Optional


if TYPE_CHECKING:
Expand Down Expand Up @@ -45,11 +44,27 @@ def set_memory_storage_factory(factory: MemoryStorageFactory | None) -> None:
_factory = factory


def _factory_accepts_config(factory: Callable[..., object]) -> bool:
    """Return True when ``factory`` can be called with a ``config`` keyword."""
    import inspect

    try:
        parameters = inspect.signature(factory).parameters
    except (TypeError, ValueError):
        # The signature cannot be introspected (e.g. some C callables);
        # fall back to the original single-argument calling convention.
        return False

    config_param = parameters.get("config")
    if config_param is not None and config_param.kind in (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    ):
        return True
    # A ``**kwargs`` catch-all also accepts ``config=``.
    return any(
        param.kind is inspect.Parameter.VAR_KEYWORD for param in parameters.values()
    )


def resolve_memory_storage(spec: str, config: Optional[dict] = None) -> StorageBackend | None:
    """Return the storage backend for ``spec``, or ``None``.

    Resolution order:

    1. The user-registered factory, if any. It is called exactly once, with
       ``config`` only when its signature accepts a ``config`` keyword.
    2. The built-in Mimir backend when ``spec == "mimir"``.

    ``None`` means no factory is registered (or it declined this spec) and the
    spec is not a built-in one; the caller then falls back to the built-in
    selection.

    Args:
        spec: Storage specifier string.
        config: Optional backend configuration forwarded to config-aware
            factories and to ``MimirStorage``.

    Raises:
        Any exception raised by the registered factory or by ``MimirStorage``.
    """
    # First, respect user-registered custom factories if available.
    factory = _factory
    if factory is not None:
        # Inspect the signature instead of probing with try/except TypeError:
        # a TypeError raised *inside* a config-aware factory must propagate,
        # and the factory must never be invoked twice for one resolution.
        if _factory_accepts_config(factory):
            custom_backend = factory(spec, config=config)  # type: ignore[call-arg]
        else:
            custom_backend = factory(spec)

        if custom_backend is not None:
            return custom_backend

    # Built-in fallback to Mimir storage if the spec matches. Imported lazily
    # so the Mimir module is only loaded when it is actually requested.
    if spec == "mimir":
        from crewai.memory.storage.mimir_storage import MimirStorage

        return MimirStorage(config=config)

    return None
152 changes: 152 additions & 0 deletions lib/crewai/src/crewai/memory/storage/mimir_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
import os
import shutil
import subprocess
import json
import logging
import hashlib
import re
from typing import List, Dict, Any, Optional, Tuple

from crewai.memory.storage.backend import StorageBackend
# CodeRabbit Fix: Direct import to fail-fast and avoid masking integration issues
from crewai.memory.storage.interface import MemoryRecord # type: ignore

logger = logging.getLogger(__name__)

class MimirStorage(StorageBackend):
    """Storage backend that persists memories by invoking the ``mimir`` CLI.

    Every operation runs the ``mimir`` binary as a subprocess with an argument
    list (never through a shell) and a timeout.

    Recognised ``config`` keys:
        db_path: Path passed to ``mimir --db``. ``~`` is expanded.
            Defaults to ``"~/mimir_db"``.
        timeout: Seconds to wait for each ``mimir`` invocation. Defaults to 10.
    """

    # Seconds to wait for one mimir invocation unless config["timeout"] is set.
    _DEFAULT_TIMEOUT_SECONDS = 10
    # Number of results requested by search() when the caller gives no limit.
    _DEFAULT_SEARCH_LIMIT = 3
    # Characters allowed in a category; applied with fullmatch (see _validate_inputs).
    _CATEGORY_PATTERN = re.compile(r"[A-Za-z0-9_-]+")

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Resolve the database path and locate the ``mimir`` binary.

        Args:
            config: Optional configuration dictionary (see class docstring).

        Raises:
            FileNotFoundError: If no ``mimir`` executable can be found.
        """
        self.config = config or {}

        # Resolve db_path from config dictionary, expanding '~' to home directory.
        raw_db_path = self.config.get("db_path", "~/mimir_db")
        self.db_path = os.path.expanduser(raw_db_path)

        # Create the *parent* directory only. db_path is handed to `--db` and is
        # treated here as a database file (e.g. "~/.mimir/custom_mimir.db"), so
        # creating a directory at db_path itself would block the file.
        # NOTE(review): confirm against the Mimir CLI docs that --db takes a file.
        parent_dir = os.path.dirname(self.db_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        self.timeout = self.config.get("timeout", self._DEFAULT_TIMEOUT_SECONDS)

        # Verify mimir binary availability in common paths or system PATH.
        self.mimir_path = self._find_mimir_binary()
        if not self.mimir_path:
            raise FileNotFoundError(
                "The 'mimir' binary could not be found. Please ensure it is installed "
                "and available in PATH or at common locations (~/.cargo/bin/mimir, /usr/local/bin/mimir)."
            )

    def _find_mimir_binary(self) -> Optional[str]:
        """Return the path of the ``mimir`` executable, or ``None`` if absent.

        The system ``PATH`` is consulted first, then a short list of common
        install locations.
        """
        on_path = shutil.which("mimir")
        if on_path:
            return on_path

        candidates = (
            os.path.expanduser("~/.cargo/bin/mimir"),
            "/usr/local/bin/mimir",
            "/usr/bin/mimir",
        )
        return next(
            (
                candidate
                for candidate in candidates
                if os.path.isfile(candidate) and os.access(candidate, os.X_OK)
            ),
            None,
        )

    def _validate_inputs(self, category: str, query: Optional[str] = None) -> None:
        """Reject arguments that could be interpreted as CLI flags.

        Args:
            category: Scope/category name; when non-empty it may contain only
                ASCII letters, digits, ``_`` and ``-``.
            query: Optional search query; must not start with ``-``.

        Raises:
            ValueError: If either argument fails validation.
        """
        # fullmatch, not match-with-"$": "$" also matches before a trailing
        # newline, which would let "name\n" through.
        if category and not self._CATEGORY_PATTERN.fullmatch(category):
            raise ValueError(f"Malicious characters detected in scope/category: '{category}'")
        if query and query.startswith("-"):
            raise ValueError("Query string cannot start with a hyphen to prevent flag injection.")

    def save(self, records: List[MemoryRecord]) -> None:
        """Store each record through ``mimir store``.

        The key is ``memory_`` plus the first 12 hex digits of the MD5 of the
        record's stringified value, so identical values map to the same key.
        The category is the record's ``agent_id`` metadata entry, or
        ``"default"`` when it is missing or empty.

        Raises:
            ValueError: If a category fails validation.
            subprocess.TimeoutExpired: If ``mimir`` exceeds the timeout.
            subprocess.CalledProcessError: If ``mimir`` exits non-zero.
        """
        for record in records:
            value_str = str(record.value)

            # Deterministic key derived from the content (MD5 is used as a
            # fingerprint here, not for security).
            hash_suffix = hashlib.md5(value_str.encode("utf-8")).hexdigest()[:12]
            key = f"memory_{hash_suffix}"

            # Tolerate records without metadata and non-string agent ids.
            metadata = record.metadata or {}
            category = str(metadata.get("agent_id") or "default")
            self._validate_inputs(category)

            payload = {
                "key": key,
                "value": value_str,
                "category": category,
                "metadata": metadata,
            }

            # Call the subprocess using '--db' flag per Mimir CLI docs.
            cmd = [self.mimir_path, "--db", self.db_path, "store", json.dumps(payload)]
            try:
                subprocess.run(
                    cmd, check=True, capture_output=True, text=True, timeout=self.timeout
                )
            except subprocess.TimeoutExpired as te:
                logger.error("Mimir store operation timed out: %s", te)
                raise
            except subprocess.CalledProcessError as e:
                logger.error("Failed to store memory in Mimir: %s", e.stderr)
                raise
            logger.info("Successfully stored memory with key: %s", key)

    def search(
        self,
        query: Any,
        limit: Optional[int] = None,
        scope_prefix: Optional[str] = None,
        categories: Optional[List[str]] = None,
        min_score: Optional[float] = None,
        metadata_filter: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> List[Tuple[MemoryRecord, float]]:
        """Search memories through ``mimir search``.

        Args:
            query: Search query; non-strings are converted with ``str()``.
            limit: Maximum number of results requested from mimir (default 3).
            scope_prefix: Category to search when ``categories`` is empty.
            categories: If non-empty, its first element is the category used.
            min_score: Drop results whose score is below this value.
            metadata_filter: Keep only results whose metadata equals every
                given key/value pair.
            **kwargs: Accepted for protocol compatibility and ignored.

        Returns:
            A list of ``(MemoryRecord, score)`` tuples; empty when mimir
            prints nothing.

        Raises:
            ValueError: If validation fails or mimir's output is not a JSON list
                (``json.JSONDecodeError`` is a ``ValueError`` subclass).
            subprocess.TimeoutExpired: If ``mimir`` exceeds the timeout.
            subprocess.CalledProcessError: If ``mimir`` exits non-zero.
        """
        query_str = query if isinstance(query, str) else str(query)
        actual_limit = limit if limit is not None else self._DEFAULT_SEARCH_LIMIT

        # An explicit category list takes precedence over the scope prefix.
        category = scope_prefix if scope_prefix else "default"
        if categories:
            category = categories[0]

        self._validate_inputs(category, query_str)

        cmd = [
            self.mimir_path,
            "--db", self.db_path,
            "search",
            query_str,
            "--limit", str(actual_limit),
            "--category", category,
        ]
        try:
            result = subprocess.run(
                cmd, check=True, capture_output=True, text=True, timeout=self.timeout
            )
        except subprocess.TimeoutExpired as te:
            logger.error("Mimir search operation timed out: %s", te)
            raise
        except subprocess.CalledProcessError as e:
            logger.error("Search failed in Mimir: %s", e.stderr)
            raise

        return self._parse_search_output(result.stdout, min_score, metadata_filter)

    def _parse_search_output(
        self,
        stdout: Optional[str],
        min_score: Optional[float],
        metadata_filter: Optional[Dict[str, Any]],
    ) -> List[Tuple[MemoryRecord, float]]:
        """Convert mimir's JSON search output into filtered ``(record, score)`` pairs."""
        text = (stdout or "").strip()
        if not text:
            # No output is treated as "no matches" rather than a JSON error.
            return []

        try:
            raw_results = json.loads(text)
        except json.JSONDecodeError as exc:
            logger.error("Mimir search returned invalid JSON: %s", exc)
            raise
        if not isinstance(raw_results, list):
            raise ValueError(
                f"Unexpected Mimir search output: expected a JSON list, got {type(raw_results).__name__}"
            )

        formatted_results: List[Tuple[MemoryRecord, float]] = []
        for res in raw_results:
            if not isinstance(res, dict):
                logger.debug("Skipping non-object Mimir search result: %r", res)
                continue

            content_text = res.get("value", res.get("text", ""))
            score = float(res.get("score") or 0.0)
            # A JSON null for metadata is treated as empty metadata.
            meta = res.get("metadata") or {}

            if min_score is not None and score < min_score:
                continue
            if metadata_filter and any(
                meta.get(name) != expected for name, expected in metadata_filter.items()
            ):
                continue

            # Construct official MemoryRecord instances.
            formatted_results.append((MemoryRecord(value=content_text, metadata=meta), score))

        return formatted_results
45 changes: 41 additions & 4 deletions lib/crewai/src/crewai/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ async def _aexecute_core(
raise Exception(
f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical."
)

tokens_before = self._get_agent_token_usage(agent)
self.prompt_context = context
tools = tools or self.tools or []

Expand Down Expand Up @@ -688,6 +688,9 @@ async def _aexecute_core(
raw = result
pydantic_output, json_output = None, None

tokens_after = self._get_agent_token_usage(agent)
token_delta = self._calculate_token_delta(tokens_before, tokens_after)

task_output = TaskOutput(
name=self.name or self.description,
description=self.description,
Expand All @@ -698,6 +701,7 @@ async def _aexecute_core(
agent=agent.role,
output_format=self._get_output_format(),
messages=agent.last_messages, # type: ignore[attr-defined]
token_usage=token_delta,
)

if self._guardrails:
Expand Down Expand Up @@ -775,7 +779,7 @@ def _execute_core(
raise Exception(
f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical."
)

tokens_before = self._get_agent_token_usage(agent)
self.prompt_context = context
tools = tools or self.tools or []

Expand Down Expand Up @@ -813,6 +817,9 @@ def _execute_core(
raw = result
pydantic_output, json_output = None, None

tokens_after = self._get_agent_token_usage(agent)
token_delta = self._calculate_token_delta(tokens_before, tokens_after)

task_output = TaskOutput(
name=self.name or self.description,
description=self.description,
Expand All @@ -823,6 +830,7 @@ def _execute_core(
agent=agent.role,
output_format=self._get_output_format(),
messages=agent.last_messages, # type: ignore[attr-defined]
token_usage=token_delta,
)

if self._guardrails:
Expand Down Expand Up @@ -1149,7 +1157,30 @@ async def _aexport_output(
pydantic_output, json_output = self._unpack_model_output(model_output)

return pydantic_output, json_output

def _get_agent_token_usage(self, agent: BaseAgent) -> Any:
"""Capture the current snapshot of tokens consumed by the agent's LLM."""
if hasattr(agent, "llm") and hasattr(agent.llm, "token_usage"):
return shallow_copy(agent.llm.token_usage)
return None

def _calculate_token_delta(self, before: Any, after: Any) -> Any:
"""Calculate the delta of token usage."""
if isinstance(before, dict) and isinstance(after, dict):
delta_dict = {}
for key, after_val in after.items():
before_val = before.get(key, 0)
if isinstance(after_val, (int, float)) and isinstance(before_val, (int, float)):
delta_dict[key] = after_val - before_val
else:
delta_dict[key] = after_val
return delta_dict

try:
return after - before
except Exception:
return after
Comment thread
coderabbitai[bot] marked this conversation as resolved.

@staticmethod
def _unpack_model_output(
model_output: dict[str, Any] | BaseModel | str,
Expand Down Expand Up @@ -1262,6 +1293,7 @@ def _invoke_guardrail_function(
max_attempts = self.guardrail_max_retries + 1

for attempt in range(max_attempts):
current_token_usage = getattr(task_output, 'token_usage', None)
guardrail_result = process_guardrail(
output=task_output,
guardrail=guardrail,
Expand All @@ -1286,7 +1318,8 @@ def _invoke_guardrail_function(
task_output.json_dict = json_output
elif isinstance(guardrail_result.result, TaskOutput):
task_output = guardrail_result.result

if getattr(task_output, 'token_usage', None) is None and current_token_usage is not None:
task_output.token_usage = current_token_usage
return task_output

if attempt >= self.guardrail_max_retries:
Expand Down Expand Up @@ -1348,6 +1381,7 @@ def _invoke_guardrail_function(
agent=agent.role,
output_format=self._get_output_format(),
messages=agent.last_messages, # type: ignore[attr-defined]
token_usage=current_token_usage,
)

return task_output
Expand All @@ -1372,6 +1406,7 @@ async def _ainvoke_guardrail_function(
max_attempts = self.guardrail_max_retries + 1

for attempt in range(max_attempts):
current_token_usage = getattr(task_output, 'token_usage', None)
guardrail_result = process_guardrail(
output=task_output,
guardrail=guardrail,
Expand All @@ -1396,7 +1431,8 @@ async def _ainvoke_guardrail_function(
task_output.json_dict = json_output
elif isinstance(guardrail_result.result, TaskOutput):
task_output = guardrail_result.result

if getattr(task_output, 'token_usage', None) is None and current_token_usage is not None:
task_output.token_usage = current_token_usage
return task_output

if attempt >= self.guardrail_max_retries:
Expand Down Expand Up @@ -1458,6 +1494,7 @@ async def _ainvoke_guardrail_function(
agent=agent.role,
output_format=self._get_output_format(),
messages=agent.last_messages, # type: ignore[attr-defined]
token_usage=current_token_usage,
)

return task_output
Loading