From 53b2fa6c54ad847236325b0a64486cc59593c3fe Mon Sep 17 00:00:00 2001 From: Etan Joseph Heyman Date: Fri, 19 Jun 2026 08:57:25 +0300 Subject: [PATCH 01/14] fix: bound brain_store busy deferral --- src/brainlayer/mcp/store_handler.py | 94 ++++++++++++++++++++++++----- tests/test_store_handler.py | 58 +++++++++++++++++- 2 files changed, 135 insertions(+), 17 deletions(-) diff --git a/src/brainlayer/mcp/store_handler.py b/src/brainlayer/mcp/store_handler.py index bd2bd89e..e75ae9c7 100644 --- a/src/brainlayer/mcp/store_handler.py +++ b/src/brainlayer/mcp/store_handler.py @@ -5,6 +5,7 @@ import json import os import threading +import time import uuid from contextlib import contextmanager from datetime import datetime, timezone @@ -27,6 +28,22 @@ _RETRY_MAX_ATTEMPTS = 4 _retry_delay = 0.15 # base delay in seconds (exposed for test patching) _QUEUE_MAX_SIZE = 100 +_DEFAULT_STORE_BUSY_BUDGET_MS = 3_000 +_MAX_APSW_BUSY_TIMEOUT_MS = 2_147_483_647 + + +def _positive_int_env(name: str, default: int) -> int: + try: + value = int(os.environ.get(name, str(default))) + except (TypeError, ValueError): + return default + if value <= 0 or value > _MAX_APSW_BUSY_TIMEOUT_MS: + return default + return value + + +def _store_busy_budget_ms() -> int: + return _positive_int_env("BRAINLAYER_STORE_BUSY_BUDGET_MS", _DEFAULT_STORE_BUSY_BUDGET_MS) def _is_lock_error(exc: BaseException) -> bool: @@ -510,6 +527,25 @@ def _deferred_store_receipt(chunk_id: str, queue_path, *, reason: str = "DB_BUSY } +def _connection_busy_timeout_ms(conn) -> int | None: + if conn is None: + return None + try: + row = conn.cursor().execute("PRAGMA busy_timeout").fetchone() + return int(row[0]) + except Exception: + return None + + +def _set_connection_busy_timeout(conn, timeout_ms: int | None) -> None: + if conn is None or timeout_ms is None: + return + try: + conn.setbusytimeout(max(1, min(timeout_ms, _MAX_APSW_BUSY_TIMEOUT_MS))) + except Exception: + logger.debug("Failed to set store busy timeout", exc_info=True) + + def _flush_pending_stores(store, embed_fn) -> int: """Flush pending-stores.jsonl (FIFO). Returns count flushed.""" from ..store import store_memory @@ -579,22 +615,48 @@ def _flush_pending_stores(store, embed_fn) -> int: async def _store_memory_with_retries(store_memory, **kwargs): last_err = None - for attempt in range(_RETRY_MAX_ATTEMPTS): - try: - return store_memory(**kwargs) - except Exception as exc: - if not _is_lock_error(exc) or attempt >= _RETRY_MAX_ATTEMPTS - 1: - raise - last_err = exc - delay = _retry_delay * (2**attempt) - logger.warning( - "brain_store BusyError (attempt %d/%d), retrying in %.2fs", - attempt + 1, - _RETRY_MAX_ATTEMPTS, - delay, - ) - await asyncio.sleep(delay) - raise last_err # type: ignore[misc] + budget_ms = _store_busy_budget_ms() + deadline = time.monotonic() + (budget_ms / 1000) + conn = getattr(kwargs.get("store"), "conn", None) + original_busy_timeout_ms = _connection_busy_timeout_ms(conn) + try: + for attempt in range(_RETRY_MAX_ATTEMPTS): + remaining_ms = int((deadline - time.monotonic()) * 1000) + if remaining_ms <= 0 and last_err is not None: + raise last_err + _set_connection_busy_timeout(conn, remaining_ms) + try: + return store_memory(**kwargs) + except Exception as exc: + if not _is_lock_error(exc) or attempt >= _RETRY_MAX_ATTEMPTS - 1: + raise + last_err = exc + remaining_ms = int((deadline - time.monotonic()) * 1000) + if remaining_ms <= 0: + logger.warning( + "brain_store BusyError exceeded %dms busy budget after attempt %d/%d; deferring", + budget_ms, + attempt + 1, + _RETRY_MAX_ATTEMPTS, + ) + raise + delay = _retry_delay * (2**attempt) + if int(delay * 1000) >= remaining_ms: + logger.warning( + "brain_store BusyError has %dms budget left before retry delay; deferring", + remaining_ms, + ) + raise + logger.warning( + "brain_store BusyError (attempt %d/%d), retrying in %.2fs", + attempt + 1, + _RETRY_MAX_ATTEMPTS, + delay, + ) + await asyncio.sleep(delay) + raise last_err # type: ignore[misc] + finally: + _set_connection_busy_timeout(conn, original_busy_timeout_ms) async def _store( diff --git a/tests/test_store_handler.py b/tests/test_store_handler.py index 160e2667..db06af88 100644 --- a/tests/test_store_handler.py +++ b/tests/test_store_handler.py @@ -1,7 +1,8 @@ """Tests for MCP store handler responses.""" import json -from unittest.mock import patch +import time +from unittest.mock import MagicMock, patch import apsw import pytest @@ -68,6 +69,61 @@ async def test_busy_queue_fallback_returns_loud_deferred_receipt(tmp_path): assert any("DEFERRED" in item.text for item in texts) +@pytest.mark.asyncio +async def test_store_busy_budget_defers_promptly_and_pending_flush_replays(tmp_path, monkeypatch): + """A held write lock must hit the bounded store budget, defer, and replay later.""" + from brainlayer.mcp.store_handler import _flush_pending_stores, _store + + pending_path = tmp_path / "pending-stores.jsonl" + attempts = 0 + + def held_write_lock_store_memory(**kwargs): + nonlocal attempts + attempts += 1 + time.sleep(0.075) + raise apsw.BusyError("database is locked") + + monkeypatch.setenv("BRAINLAYER_STORE_BUSY_BUDGET_MS", "100") + monkeypatch.setattr("brainlayer.mcp.store_handler._retry_delay", 0.001) + + with ( + patch("brainlayer.mcp.store_handler._get_vector_store", return_value=MagicMock()), + patch("brainlayer.mcp.store_handler._normalize_project_name", return_value="test"), + patch("brainlayer.queue_io.enqueue_store", side_effect=RuntimeError("force legacy pending queue")), + patch("brainlayer.mcp.store_handler._get_pending_store_path", return_value=pending_path), + patch("brainlayer.store.store_memory", side_effect=held_write_lock_store_memory), + ): + started = time.perf_counter() + texts, structured = await _store( + content="queued within busy budget", + memory_type="note", + project="test", + ) + elapsed = time.perf_counter() - started + + assert elapsed < 0.22 + assert attempts <= 2 + assert structured["status"] == "DEFERRED" + assert structured["deferred"]["action"] == "queued_for_replay" + assert structured["deferred"]["queue_path"] == str(pending_path) + assert any("queued" in item.text.lower() for item in texts) + + queued_event = json.loads(pending_path.read_text()) + assert queued_event["chunk_id"] == structured["chunk_id"] + assert queued_event["content"] == "queued within busy budget" + + with ( + patch("brainlayer.mcp.store_handler._get_pending_store_path", return_value=pending_path), + patch("brainlayer.store.store_memory", return_value={"id": structured["chunk_id"], "related": []}) as replay, + ): + flushed = _flush_pending_stores(MagicMock(), MagicMock()) + + assert flushed == 1 + assert not pending_path.exists() + assert replay.call_args.kwargs["chunk_id"] == structured["chunk_id"] + assert replay.call_args.kwargs["content"] == "queued within busy budget" + + @pytest.mark.asyncio async def test_store_preassigns_same_chunk_id_across_busy_retry(tmp_path, monkeypatch): """The MCP handler promises one chunk ID before the first write attempt.""" From b26be70a872b69c3caf5beb0508fa11731cc3929 Mon Sep 17 00:00:00 2001 From: Etan Joseph Heyman Date: Fri, 19 Jun 2026 09:14:18 +0300 Subject: [PATCH 02/14] fix: restore store busy timeout per attempt --- src/brainlayer/mcp/store_handler.py | 84 +++++++++++++++++------------ tests/test_store_handler.py | 47 ++++++++++++++++ 2 files changed, 97 insertions(+), 34 deletions(-) diff --git a/src/brainlayer/mcp/store_handler.py b/src/brainlayer/mcp/store_handler.py index e75ae9c7..2803af4d 100644 --- a/src/brainlayer/mcp/store_handler.py +++ b/src/brainlayer/mcp/store_handler.py @@ -30,6 +30,7 @@ _QUEUE_MAX_SIZE = 100 _DEFAULT_STORE_BUSY_BUDGET_MS = 3_000 _MAX_APSW_BUSY_TIMEOUT_MS = 2_147_483_647 +_STORE_BUSY_TIMEOUT_LOCK = threading.Lock() def _positive_int_env(name: str, default: int) -> int: @@ -546,6 +547,25 @@ def _set_connection_busy_timeout(conn, timeout_ms: int | None) -> None: logger.debug("Failed to set store busy timeout", exc_info=True) +@contextmanager +def _temporary_store_busy_timeout(conn, timeout_ms: int): + if conn is None: + yield + return + + acquired = _STORE_BUSY_TIMEOUT_LOCK.acquire(timeout=max(timeout_ms, 1) / 1000) + if not acquired: + raise apsw.BusyError("brain_store busy_timeout lock wait exceeded") + + original_busy_timeout_ms = _connection_busy_timeout_ms(conn) + try: + _set_connection_busy_timeout(conn, timeout_ms) + yield + finally: + _set_connection_busy_timeout(conn, original_busy_timeout_ms) + _STORE_BUSY_TIMEOUT_LOCK.release() + + def _flush_pending_stores(store, embed_fn) -> int: """Flush pending-stores.jsonl (FIFO). Returns count flushed.""" from ..store import store_memory @@ -618,45 +638,41 @@ async def _store_memory_with_retries(store_memory, **kwargs): budget_ms = _store_busy_budget_ms() deadline = time.monotonic() + (budget_ms / 1000) conn = getattr(kwargs.get("store"), "conn", None) - original_busy_timeout_ms = _connection_busy_timeout_ms(conn) - try: - for attempt in range(_RETRY_MAX_ATTEMPTS): - remaining_ms = int((deadline - time.monotonic()) * 1000) - if remaining_ms <= 0 and last_err is not None: - raise last_err - _set_connection_busy_timeout(conn, remaining_ms) - try: + for attempt in range(_RETRY_MAX_ATTEMPTS): + remaining_ms = int((deadline - time.monotonic()) * 1000) + if remaining_ms <= 0 and last_err is not None: + raise last_err + try: + with _temporary_store_busy_timeout(conn, remaining_ms): return store_memory(**kwargs) - except Exception as exc: - if not _is_lock_error(exc) or attempt >= _RETRY_MAX_ATTEMPTS - 1: - raise - last_err = exc - remaining_ms = int((deadline - time.monotonic()) * 1000) - if remaining_ms <= 0: - logger.warning( - "brain_store BusyError exceeded %dms busy budget after attempt %d/%d; deferring", - budget_ms, - attempt + 1, - _RETRY_MAX_ATTEMPTS, - ) - raise - delay = _retry_delay * (2**attempt) - if int(delay * 1000) >= remaining_ms: - logger.warning( - "brain_store BusyError has %dms budget left before retry delay; deferring", - remaining_ms, - ) - raise + except Exception as exc: + if not _is_lock_error(exc) or attempt >= _RETRY_MAX_ATTEMPTS - 1: + raise + last_err = exc + remaining_ms = int((deadline - time.monotonic()) * 1000) + if remaining_ms <= 0: logger.warning( - "brain_store BusyError (attempt %d/%d), retrying in %.2fs", + "brain_store BusyError exceeded %dms busy budget after attempt %d/%d; deferring", + budget_ms, attempt + 1, _RETRY_MAX_ATTEMPTS, - delay, ) - await asyncio.sleep(delay) - raise last_err # type: ignore[misc] - finally: - _set_connection_busy_timeout(conn, original_busy_timeout_ms) + raise + delay = _retry_delay * (2**attempt) + if int(delay * 1000) >= remaining_ms: + logger.warning( + "brain_store BusyError has %dms budget left before retry delay; deferring", + remaining_ms, + ) + raise + logger.warning( + "brain_store BusyError (attempt %d/%d), retrying in %.2fs", + attempt + 1, + _RETRY_MAX_ATTEMPTS, + delay, + ) + await asyncio.sleep(delay) + raise last_err # type: ignore[misc] async def _store( diff --git a/tests/test_store_handler.py b/tests/test_store_handler.py index db06af88..a703dced 100644 --- a/tests/test_store_handler.py +++ b/tests/test_store_handler.py @@ -1,5 +1,6 @@ """Tests for MCP store handler responses.""" +import asyncio import json import time from unittest.mock import MagicMock, patch @@ -124,6 +125,52 @@ def held_write_lock_store_memory(**kwargs): assert replay.call_args.kwargs["content"] == "queued within busy budget" +@pytest.mark.asyncio +async def test_store_busy_budget_restores_timeout_between_concurrent_retries(monkeypatch): + """Overlapping retry sleeps must not snapshot another store call's temporary timeout.""" + from brainlayer.mcp import store_handler + + class FakeCursor: + def __init__(self, conn): + self.conn = conn + + def execute(self, sql): + assert sql == "PRAGMA busy_timeout" + return self + + def fetchone(self): + return (self.conn.timeout_ms,) + + class FakeConn: + def __init__(self): + self.timeout_ms = 5000 + + def cursor(self): + return FakeCursor(self) + + def setbusytimeout(self, timeout_ms): + self.timeout_ms = timeout_ms + + store = MagicMock() + store.conn = FakeConn() + + def locked_store_memory(**kwargs): + raise apsw.BusyError("database is locked") + + monkeypatch.setenv("BRAINLAYER_STORE_BUSY_BUDGET_MS", "1000") + monkeypatch.setattr(store_handler, "_retry_delay", 0.02) + monkeypatch.setattr(store_handler, "_RETRY_MAX_ATTEMPTS", 2) + + results = await asyncio.gather( + store_handler._store_memory_with_retries(locked_store_memory, store=store), + store_handler._store_memory_with_retries(locked_store_memory, store=store), + return_exceptions=True, + ) + + assert all(isinstance(result, apsw.BusyError) for result in results) + assert store.conn.timeout_ms == 5000 + + @pytest.mark.asyncio async def test_store_preassigns_same_chunk_id_across_busy_retry(tmp_path, monkeypatch): """The MCP handler promises one chunk ID before the first write attempt.""" From fa6a1ba8761403d011c8b6989b462b47bd6ad002 Mon Sep 17 00:00:00 2001 From: Etan Joseph Heyman Date: Fri, 19 Jun 2026 09:26:04 +0300 Subject: [PATCH 03/14] fix: enforce store busy deadline through inner retries --- src/brainlayer/mcp/store_handler.py | 15 +++- src/brainlayer/store.py | 34 +++++++- tests/test_store_handler.py | 121 ++++++++++++++++++++++++++++ 3 files changed, 165 insertions(+), 5 deletions(-) diff --git a/src/brainlayer/mcp/store_handler.py b/src/brainlayer/mcp/store_handler.py index 2803af4d..a2ceb33e 100644 --- a/src/brainlayer/mcp/store_handler.py +++ b/src/brainlayer/mcp/store_handler.py @@ -547,18 +547,27 @@ def _set_connection_busy_timeout(conn, timeout_ms: int | None) -> None: logger.debug("Failed to set store busy timeout", exc_info=True) +def _remaining_store_busy_budget_ms(deadline: float) -> int: + remaining_ms = int((deadline - time.monotonic()) * 1000) + if remaining_ms <= 0: + raise apsw.BusyError("brain_store busy budget exceeded") + return max(1, min(remaining_ms, _MAX_APSW_BUSY_TIMEOUT_MS)) + + @contextmanager -def _temporary_store_busy_timeout(conn, timeout_ms: int): +def _temporary_store_busy_timeout(conn, deadline: float): if conn is None: yield return + timeout_ms = _remaining_store_busy_budget_ms(deadline) acquired = _STORE_BUSY_TIMEOUT_LOCK.acquire(timeout=max(timeout_ms, 1) / 1000) if not acquired: raise apsw.BusyError("brain_store busy_timeout lock wait exceeded") original_busy_timeout_ms = _connection_busy_timeout_ms(conn) try: + timeout_ms = _remaining_store_busy_budget_ms(deadline) _set_connection_busy_timeout(conn, timeout_ms) yield finally: @@ -643,8 +652,8 @@ async def _store_memory_with_retries(store_memory, **kwargs): if remaining_ms <= 0 and last_err is not None: raise last_err try: - with _temporary_store_busy_timeout(conn, remaining_ms): - return store_memory(**kwargs) + with _temporary_store_busy_timeout(conn, deadline): + return store_memory(**kwargs, busy_deadline=deadline) except Exception as exc: if not _is_lock_error(exc) or attempt >= _RETRY_MAX_ATTEMPTS - 1: raise diff --git a/src/brainlayer/store.py b/src/brainlayer/store.py index 77f14e91..c4e8af69 100644 --- a/src/brainlayer/store.py +++ b/src/brainlayer/store.py @@ -50,6 +50,32 @@ logger = logging.getLogger(__name__) VALID_MEMORY_TYPES = ["idea", "mistake", "decision", "learning", "todo", "bookmark", "note", "journal", "issue"] +_MAX_APSW_BUSY_TIMEOUT_MS = 2_147_483_647 + + +def _busy_deadline_timeout_ms(busy_deadline: Optional[float]) -> int | None: + if busy_deadline is None: + return None + remaining_ms = int((busy_deadline - time.monotonic()) * 1000) + if remaining_ms <= 0: + raise apsw.BusyError("store busy budget exceeded") + return max(1, min(remaining_ms, _MAX_APSW_BUSY_TIMEOUT_MS)) + + +def _set_busy_timeout_for_deadline(conn, busy_deadline: Optional[float]) -> None: + timeout_ms = _busy_deadline_timeout_ms(busy_deadline) + if timeout_ms is None: + return + conn.setbusytimeout(timeout_ms) + + +def _sleep_before_busy_retry(delay: float, busy_deadline: Optional[float]) -> None: + if busy_deadline is not None: + remaining = busy_deadline - time.monotonic() + if remaining <= 0 or delay >= remaining: + raise apsw.BusyError("store busy budget exceeded") + delay = min(delay, remaining) + time.sleep(delay) def store_memory( @@ -76,6 +102,7 @@ def store_memory( origin_repo_path: Optional[str] = None, replayed_by: Optional[str] = None, chunk_origin: Optional[str] = None, + busy_deadline: Optional[float] = None, ) -> Dict[str, Any]: """Persistently store a memory into BrainLayer. @@ -107,6 +134,7 @@ def store_memory( origin_repo_path: Optional git root for the fallback's originating repo. replayed_by: Optional replay worker identifier. chunk_origin: Optional explicit origin classification preserved from queued fallback metadata. + busy_deadline: Optional monotonic deadline for internal BusyError retries. Returns: Dict with 'id' (chunk ID) and 'related' (list of similar existing memories). @@ -190,6 +218,7 @@ def store_memory( cursor = store.conn.cursor() transaction_started = False try: + _set_busy_timeout_for_deadline(store.conn, busy_deadline) cursor.execute("BEGIN IMMEDIATE") transaction_started = True pending_reembed = None @@ -339,7 +368,7 @@ def store_memory( cursor.execute("ROLLBACK") if attempt == 4: raise - time.sleep(0.1 * (2**attempt)) + _sleep_before_busy_retry(0.1 * (2**attempt), busy_deadline) except Exception: if transaction_started: cursor.execute("ROLLBACK") @@ -352,6 +381,7 @@ def store_memory( cursor = store.conn.cursor() transaction_started = False try: + _set_busy_timeout_for_deadline(store.conn, busy_deadline) cursor.execute("BEGIN IMMEDIATE") transaction_started = True store._upsert_chunk_vector(cursor, reembed_chunk_id, merged_embedding) @@ -363,7 +393,7 @@ def store_memory( cursor.execute("ROLLBACK") if attempt == 4: raise - time.sleep(0.1 * (2**attempt)) + _sleep_before_busy_retry(0.1 * (2**attempt), busy_deadline) except Exception: if transaction_started: cursor.execute("ROLLBACK") diff --git a/tests/test_store_handler.py b/tests/test_store_handler.py index a703dced..1992f019 100644 --- a/tests/test_store_handler.py +++ b/tests/test_store_handler.py @@ -2,6 +2,7 @@ import asyncio import json +import threading import time from unittest.mock import MagicMock, patch @@ -171,6 +172,126 @@ def locked_store_memory(**kwargs): assert store.conn.timeout_ms == 5000 +@pytest.mark.asyncio +async def test_store_busy_budget_bounds_store_memory_inner_retry_loop(tmp_path, monkeypatch): + """The MCP budget must bound store_memory's internal BEGIN IMMEDIATE retry loop.""" + import brainlayer.store as store_module + from brainlayer.mcp.store_handler import _store + + pending_path = tmp_path / "pending-stores.jsonl" + real_sleep = time.sleep + begin_attempts = 0 + + class FakeCursor: + def __init__(self, conn): + self.conn = conn + + def execute(self, sql, *args): + nonlocal begin_attempts + if sql == "PRAGMA busy_timeout": + return self + if sql == "BEGIN IMMEDIATE": + begin_attempts += 1 + real_sleep(self.conn.timeout_ms / 1000) + raise apsw.BusyError("database is locked") + raise AssertionError(f"unexpected SQL: {sql}") + + def fetchone(self): + return (self.conn.timeout_ms,) + + class FakeConn: + def __init__(self): + self.timeout_ms = 5000 + + def cursor(self): + return FakeCursor(self) + + def setbusytimeout(self, timeout_ms): + self.timeout_ms = timeout_ms + + store = MagicMock() + store.conn = FakeConn() + + monkeypatch.setenv("BRAINLAYER_STORE_BUSY_BUDGET_MS", "80") + monkeypatch.setattr("brainlayer.mcp.store_handler._retry_delay", 0.001) + monkeypatch.setattr(store_module.time, "sleep", lambda delay: real_sleep(min(delay, 0.001))) + + with ( + patch("brainlayer.mcp.store_handler._get_vector_store", return_value=store), + patch("brainlayer.mcp.store_handler._normalize_project_name", return_value="test"), + patch("brainlayer.queue_io.enqueue_store", side_effect=RuntimeError("force legacy pending queue")), + patch("brainlayer.mcp.store_handler._get_pending_store_path", return_value=pending_path), + ): + started = time.perf_counter() + _texts, structured = await _store( + content="inner retries must respect mcp busy budget", + memory_type="note", + project="test", + ) + elapsed = time.perf_counter() - started + + assert elapsed < 0.18 + assert begin_attempts <= 2 + assert structured["status"] == "DEFERRED" + assert structured["deferred"]["action"] == "queued_for_replay" + + +@pytest.mark.asyncio +async def test_store_busy_budget_refreshes_after_timeout_lock_wait(monkeypatch): + """Time spent waiting for the timeout lock must count against the store budget.""" + from brainlayer.mcp import store_handler + + real_sleep = time.sleep + + class FakeCursor: + def __init__(self, conn): + self.conn = conn + + def execute(self, sql): + assert sql == "PRAGMA busy_timeout" + return self + + def fetchone(self): + return (self.conn.timeout_ms,) + + class FakeConn: + def __init__(self): + self.timeout_ms = 5000 + + def cursor(self): + return FakeCursor(self) + + def setbusytimeout(self, timeout_ms): + self.timeout_ms = timeout_ms + + store = MagicMock() + store.conn = FakeConn() + + def locked_store_memory(**kwargs): + real_sleep(store.conn.timeout_ms / 1000) + raise apsw.BusyError("database is locked") + + monkeypatch.setenv("BRAINLAYER_STORE_BUSY_BUDGET_MS", "200") + monkeypatch.setattr(store_handler, "_RETRY_MAX_ATTEMPTS", 1) + + store_handler._STORE_BUSY_TIMEOUT_LOCK.acquire() + timer = threading.Timer(0.15, store_handler._STORE_BUSY_TIMEOUT_LOCK.release) + timer.start() + try: + started = time.perf_counter() + result = await store_handler._store_memory_with_retries(locked_store_memory, store=store) + except apsw.BusyError as exc: + result = exc + elapsed = time.perf_counter() - started + finally: + timer.cancel() + if store_handler._STORE_BUSY_TIMEOUT_LOCK.locked(): + store_handler._STORE_BUSY_TIMEOUT_LOCK.release() + + assert isinstance(result, apsw.BusyError) + assert elapsed < 0.28 + + @pytest.mark.asyncio async def test_store_preassigns_same_chunk_id_across_busy_retry(tmp_path, monkeypatch): """The MCP handler promises one chunk ID before the first write attempt.""" From f0f50d9bce754608079a63901ca5c235aac25e0a Mon Sep 17 00:00:00 2001 From: Etan Joseph Heyman Date: Fri, 19 Jun 2026 09:43:25 +0300 Subject: [PATCH 04/14] fix: include store init in busy budget --- src/brainlayer/mcp/store_handler.py | 15 ++++++-- src/brainlayer/vector_store.py | 60 ++++++++++++++++++++++++++++- tests/test_store_handler.py | 48 +++++++++++++++++++++++ 3 files changed, 118 insertions(+), 5 deletions(-) diff --git a/src/brainlayer/mcp/store_handler.py b/src/brainlayer/mcp/store_handler.py index a2ceb33e..f84ace6b 100644 --- a/src/brainlayer/mcp/store_handler.py +++ b/src/brainlayer/mcp/store_handler.py @@ -47,6 +47,10 @@ def _store_busy_budget_ms() -> int: return _positive_int_env("BRAINLAYER_STORE_BUSY_BUDGET_MS", _DEFAULT_STORE_BUSY_BUDGET_MS) +def _store_busy_deadline() -> float: + return time.monotonic() + (_store_busy_budget_ms() / 1000) + + def _is_lock_error(exc: BaseException) -> bool: from ..vector_store import WriterInUseError @@ -642,10 +646,11 @@ def _flush_pending_stores(store, embed_fn) -> int: return flushed -async def _store_memory_with_retries(store_memory, **kwargs): +async def _store_memory_with_retries(store_memory, *, deadline: float | None = None, **kwargs): last_err = None budget_ms = _store_busy_budget_ms() - deadline = time.monotonic() + (budget_ms / 1000) + if deadline is None: + deadline = _store_busy_deadline() conn = getattr(kwargs.get("store"), "conn", None) for attempt in range(_RETRY_MAX_ATTEMPTS): remaining_ms = int((deadline - time.monotonic()) * 1000) @@ -751,13 +756,17 @@ async def _store( return ([TextContent(type="text", text=format_store_result(promised_chunk_id, queued=True))], structured) from ..store import embed_hot_chunk, embed_pending_chunks, store_memory + from ..vector_store import temporary_write_busy_timeout_ms - store = _get_vector_store() + deadline = _store_busy_deadline() + with temporary_write_busy_timeout_ms(_remaining_store_busy_budget_ms(deadline), deadline=deadline): + store = _get_vector_store() normalized_project = _normalize_project_name(project) # Store WITHOUT embedding — returns immediately (no executor needed) result = await _store_memory_with_retries( store_memory, + deadline=deadline, store=store, embed_fn=None, content=content, diff --git a/src/brainlayer/vector_store.py b/src/brainlayer/vector_store.py index 30eb3504..9c7be739 100644 --- a/src/brainlayer/vector_store.py +++ b/src/brainlayer/vector_store.py @@ -15,6 +15,7 @@ import subprocess import threading import time +from contextlib import contextmanager from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, List, Optional @@ -63,6 +64,7 @@ _DEFAULT_BUSY_TIMEOUT_MS = 30_000 _DEFAULT_READ_BUSY_TIMEOUT_MS = 5_000 _MAX_APSW_BUSY_TIMEOUT_MS = 2_147_483_647 +_WRITE_BUSY_TIMEOUT_STATE = threading.local() def _positive_int_env(name: str, default: int) -> int: @@ -79,6 +81,53 @@ def _read_busy_timeout_ms() -> int: return _positive_int_env("BRAINLAYER_READ_BUSY_TIMEOUT_MS", _DEFAULT_READ_BUSY_TIMEOUT_MS) +def _write_busy_deadline() -> float | None: + deadline = getattr(_WRITE_BUSY_TIMEOUT_STATE, "deadline", None) + if isinstance(deadline, int | float): + return float(deadline) + return None + + +def _remaining_busy_deadline_ms(deadline: float) -> int: + remaining_ms = int((deadline - time.monotonic()) * 1000) + if remaining_ms <= 0: + raise apsw.BusyError("write busy deadline exceeded") + return max(1, min(remaining_ms, _MAX_APSW_BUSY_TIMEOUT_MS)) + + +def _write_busy_timeout_ms() -> int: + timeout_ms = getattr(_WRITE_BUSY_TIMEOUT_STATE, "timeout_ms", _DEFAULT_BUSY_TIMEOUT_MS) + deadline = _write_busy_deadline() + if deadline is not None: + timeout_ms = min(int(timeout_ms), _remaining_busy_deadline_ms(deadline)) + return max(1, min(int(timeout_ms), _MAX_APSW_BUSY_TIMEOUT_MS)) + + +@contextmanager +def temporary_write_busy_timeout_ms(timeout_ms: int, *, deadline: float | None = None): + old_timeout = getattr(_WRITE_BUSY_TIMEOUT_STATE, "timeout_ms", None) + old_deadline = getattr(_WRITE_BUSY_TIMEOUT_STATE, "deadline", None) + _WRITE_BUSY_TIMEOUT_STATE.timeout_ms = max(1, min(int(timeout_ms), _MAX_APSW_BUSY_TIMEOUT_MS)) + if deadline is None: + if hasattr(_WRITE_BUSY_TIMEOUT_STATE, "deadline"): + delattr(_WRITE_BUSY_TIMEOUT_STATE, "deadline") + else: + _WRITE_BUSY_TIMEOUT_STATE.deadline = float(deadline) + try: + yield + finally: + if old_timeout is None: + if hasattr(_WRITE_BUSY_TIMEOUT_STATE, "timeout_ms"): + delattr(_WRITE_BUSY_TIMEOUT_STATE, "timeout_ms") + else: + _WRITE_BUSY_TIMEOUT_STATE.timeout_ms = old_timeout + if old_deadline is None: + if hasattr(_WRITE_BUSY_TIMEOUT_STATE, "deadline"): + delattr(_WRITE_BUSY_TIMEOUT_STATE, "deadline") + else: + _WRITE_BUSY_TIMEOUT_STATE.deadline = old_deadline + + def _set_busy_timeout_hook(conn: apsw.Connection) -> None: """Set busy_timeout on every new connection before any other hooks. @@ -86,7 +135,7 @@ def _set_busy_timeout_hook(conn: apsw.Connection) -> None: the Connection() constructor. Without busy_timeout set first, this PRAGMA fails with BusyError when other processes hold the DB lock. """ - timeout_ms = _read_busy_timeout_ms() if conn.readonly("main") else _DEFAULT_BUSY_TIMEOUT_MS + timeout_ms = _read_busy_timeout_ms() if conn.readonly("main") else _write_busy_timeout_ms() conn.setbusytimeout(timeout_ms) @@ -530,7 +579,10 @@ def _init_db_with_retry(self) -> None: last_err = None start = time.monotonic() retry_budget = max(int(self._INIT_MAX_RETRIES), 1) + deadline = _write_busy_deadline() for attempt in range(retry_budget): + if deadline is not None and time.monotonic() >= deadline: + raise last_err or apsw.BusyError("write busy deadline exceeded") try: self._init_db() return @@ -547,6 +599,10 @@ def _init_db_with_retry(self) -> None: f"elapsed {elapsed:.1f}s, retrying in {delay:.1f}s...", file=sys.stderr, ) + if deadline is not None: + remaining = deadline - time.monotonic() + if remaining <= 0 or delay >= remaining: + raise last_err time.sleep(delay) raise last_err # type: ignore[misc] @@ -555,7 +611,7 @@ def _init_db(self) -> None: self.conn = apsw.Connection(str(self.db_path)) # Set busy timeout IMMEDIATELY via APSW native method — before any DDL. - self.conn.setbusytimeout(10_000) # 10 seconds + self.conn.setbusytimeout(_write_busy_timeout_ms()) self.conn.enableloadextension(True) self.conn.loadextension(sqlite_vec.loadable_path()) diff --git a/tests/test_store_handler.py b/tests/test_store_handler.py index 1992f019..a7ebdc8f 100644 --- a/tests/test_store_handler.py +++ b/tests/test_store_handler.py @@ -292,6 +292,54 @@ def locked_store_memory(**kwargs): assert elapsed < 0.28 +@pytest.mark.asyncio +async def test_store_busy_budget_covers_cold_vector_store_init(tmp_path, monkeypatch): + """The store budget must include cold VectorStore init before queuing.""" + import brainlayer.store # noqa: F401 + from brainlayer.mcp import _shared + from brainlayer.mcp.store_handler import _store + from brainlayer.vector_store import VectorStore + + pending_path = tmp_path / "pending-stores.jsonl" + db_path = tmp_path / "busy-init.db" + real_sleep = time.sleep + init_attempts = 0 + + def busy_init(self): + nonlocal init_attempts + init_attempts += 1 + real_sleep(0.075) + raise apsw.BusyError("database is locked") + + monkeypatch.setenv("BRAINLAYER_STORE_BUSY_BUDGET_MS", "100") + monkeypatch.setattr(_shared, "_vector_store", None) + monkeypatch.setattr("brainlayer.paths.get_db_path", lambda: db_path) + monkeypatch.setattr(VectorStore, "_INIT_MAX_RETRIES", 4) + monkeypatch.setattr(VectorStore, "_INIT_BASE_DELAY", 0.001) + monkeypatch.setattr(VectorStore, "_INIT_MAX_DELAY", 0.001) + monkeypatch.setattr(VectorStore, "_acquire_writer_pidfile", lambda self: None) + monkeypatch.setattr(VectorStore, "_release_writer_pidfile", lambda self: None) + monkeypatch.setattr(VectorStore, "_init_db", busy_init) + + with ( + patch("brainlayer.mcp.store_handler._normalize_project_name", return_value="test"), + patch("brainlayer.queue_io.enqueue_store", side_effect=RuntimeError("force legacy pending queue")), + patch("brainlayer.mcp.store_handler._get_pending_store_path", return_value=pending_path), + ): + started = time.perf_counter() + _texts, structured = await _store( + content="cold vector store init must respect busy budget", + memory_type="note", + project="test", + ) + elapsed = time.perf_counter() - started + + assert elapsed < 0.22 + assert init_attempts <= 2 + assert structured["status"] == "DEFERRED" + assert structured["deferred"]["action"] == "queued_for_replay" + + @pytest.mark.asyncio async def test_store_preassigns_same_chunk_id_across_busy_retry(tmp_path, monkeypatch): """The MCP handler promises one chunk ID before the first write attempt.""" From d625e737e43c34297c1e562f192aaa8d734d27b4 Mon Sep 17 00:00:00 2001 From: Etan Joseph Heyman Date: Fri, 19 Jun 2026 09:58:07 +0300 Subject: [PATCH 05/14] fix: bound supersede store busy wait --- src/brainlayer/mcp/store_handler.py | 3 +- tests/test_store_handler.py | 65 +++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 1 deletion(-) diff --git a/src/brainlayer/mcp/store_handler.py b/src/brainlayer/mcp/store_handler.py index f84ace6b..e30212fc 100644 --- a/src/brainlayer/mcp/store_handler.py +++ b/src/brainlayer/mcp/store_handler.py @@ -793,7 +793,8 @@ async def _store( # If supersedes is set, mark the old chunk as superseded by the new one superseded_ok = None if supersedes: - superseded_ok = store.supersede_chunk(supersedes, chunk_id) + with _temporary_store_busy_timeout(getattr(store, "conn", None), deadline): + superseded_ok = store.supersede_chunk(supersedes, chunk_id) # Schedule background embedding + flush in a single daemon thread. # CRITICAL: must use a separate VectorStore connection — APSW enforces diff --git a/tests/test_store_handler.py b/tests/test_store_handler.py index a7ebdc8f..337408b1 100644 --- a/tests/test_store_handler.py +++ b/tests/test_store_handler.py @@ -340,6 +340,71 @@ def busy_init(self): assert structured["deferred"]["action"] == "queued_for_replay" +@pytest.mark.asyncio +async def test_store_busy_budget_bounds_supersede_update(tmp_path, monkeypatch): + """The same store budget must bound the post-store supersede write.""" + from brainlayer.mcp.store_handler import _store + + pending_path = tmp_path / "pending-stores.jsonl" + real_sleep = time.sleep + + class FakeCursor: + def __init__(self, conn): + self.conn = conn + + def execute(self, sql): + assert sql == "PRAGMA busy_timeout" + return self + + def fetchone(self): + return (self.conn.timeout_ms,) + + class FakeConn: + def __init__(self): + self.timeout_ms = 250 + + def cursor(self): + return FakeCursor(self) + + def setbusytimeout(self, timeout_ms): + self.timeout_ms = timeout_ms + + store = MagicMock() + store.conn = FakeConn() + + def successful_store_memory(**kwargs): + return {"id": kwargs["chunk_id"], "related": []} + + def locked_supersede(_old_chunk_id, _new_chunk_id): + real_sleep(store.conn.timeout_ms / 1000) + raise apsw.BusyError("database is locked") + + store.supersede_chunk.side_effect = locked_supersede + + monkeypatch.setenv("BRAINLAYER_STORE_BUSY_BUDGET_MS", "80") + + with ( + patch("brainlayer.mcp.store_handler._get_vector_store", return_value=store), + patch("brainlayer.mcp.store_handler._normalize_project_name", return_value="test"), + patch("brainlayer.queue_io.enqueue_store", side_effect=RuntimeError("force legacy pending queue")), + patch("brainlayer.mcp.store_handler._get_pending_store_path", return_value=pending_path), + patch("brainlayer.store.store_memory", side_effect=successful_store_memory), + ): + started = time.perf_counter() + _texts, structured = await _store( + content="supersede update must respect busy budget", + memory_type="note", + project="test", + supersedes="manual-old", + ) + elapsed = time.perf_counter() - started + + assert elapsed < 0.18 + assert structured["status"] == "DEFERRED" + assert structured["deferred"]["action"] == "queued_for_replay" + assert json.loads(pending_path.read_text())["supersedes"] == "manual-old" + + @pytest.mark.asyncio async def test_store_preassigns_same_chunk_id_across_busy_retry(tmp_path, monkeypatch): """The MCP handler promises one chunk ID before the first write attempt.""" From 21eb174683f13aff26d80345749ff8cc12e56171 Mon Sep 17 00:00:00 2001 From: Etan Joseph Heyman Date: Fri, 19 Jun 2026 10:06:44 +0300 Subject: [PATCH 06/14] fix: restore store init busy timeout --- src/brainlayer/vector_store.py | 16 ++++++++ tests/test_store_handler.py | 67 ++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+) diff --git a/src/brainlayer/vector_store.py b/src/brainlayer/vector_store.py index 9c7be739..76da69d3 100644 --- a/src/brainlayer/vector_store.py +++ b/src/brainlayer/vector_store.py @@ -103,6 +103,10 @@ def _write_busy_timeout_ms() -> int: return max(1, min(int(timeout_ms), _MAX_APSW_BUSY_TIMEOUT_MS)) +def _default_write_busy_timeout_ms() -> int: + return max(1, min(_DEFAULT_BUSY_TIMEOUT_MS, _MAX_APSW_BUSY_TIMEOUT_MS)) + + @contextmanager def temporary_write_busy_timeout_ms(timeout_ms: int, *, deadline: float | None = None): old_timeout = getattr(_WRITE_BUSY_TIMEOUT_STATE, "timeout_ms", None) @@ -585,6 +589,7 @@ def _init_db_with_retry(self) -> None: raise last_err or apsw.BusyError("write busy deadline exceeded") try: self._init_db() + self._restore_default_write_busy_timeout() return except apsw.Error as e: if not _is_retryable_init_error(e): @@ -606,6 +611,17 @@ def _init_db_with_retry(self) -> None: time.sleep(delay) raise last_err # type: ignore[misc] + def _restore_default_write_busy_timeout(self) -> None: + conn = getattr(self, "conn", None) + if conn is None: + return + try: + if conn.readonly("main"): + return + except apsw.Error: + return + conn.setbusytimeout(_default_write_busy_timeout_ms()) + def _init_db(self) -> None: """Initialize database with vector extension.""" self.conn = apsw.Connection(str(self.db_path)) diff --git a/tests/test_store_handler.py b/tests/test_store_handler.py index 337408b1..e56d5dca 100644 --- a/tests/test_store_handler.py +++ b/tests/test_store_handler.py @@ -340,6 +340,73 @@ def busy_init(self): assert structured["deferred"]["action"] == "queued_for_replay" +@pytest.mark.asyncio +async def test_store_busy_budget_restores_cold_vector_store_write_timeout(tmp_path, monkeypatch): + """A cold store init may be capped, but the singleton must return to normal writes.""" + import brainlayer.vector_store as vector_store_module + from brainlayer.mcp import _shared + from brainlayer.mcp.store_handler import _store + from brainlayer.vector_store import VectorStore + + pending_path = tmp_path / "pending-stores.jsonl" + db_path = tmp_path / "cold-timeout.db" + + class FakeCursor: + def __init__(self, conn): + self.conn = conn + + def execute(self, sql): + assert sql == "PRAGMA busy_timeout" + return self + + def fetchone(self): + return (self.conn.timeout_ms,) + + class FakeConn: + def __init__(self): + self.timeout_ms = vector_store_module._DEFAULT_BUSY_TIMEOUT_MS + + def cursor(self): + return FakeCursor(self) + + def setbusytimeout(self, timeout_ms): + self.timeout_ms = timeout_ms + + def readonly(self, _database): + return False + + def close(self): + pass + + def fake_init_db(self): + self.conn = FakeConn() + self.conn.setbusytimeout(vector_store_module._write_busy_timeout_ms()) + + monkeypatch.setenv("BRAINLAYER_STORE_BUSY_BUDGET_MS", "100") + monkeypatch.setattr(_shared, "_vector_store", None) + monkeypatch.setattr("brainlayer.paths.get_db_path", lambda: db_path) + monkeypatch.setattr(VectorStore, "_INIT_MAX_RETRIES", 1) + monkeypatch.setattr(VectorStore, "_acquire_writer_pidfile", lambda self: None) + monkeypatch.setattr(VectorStore, "_release_writer_pidfile", lambda self: None) + monkeypatch.setattr(VectorStore, "_init_db", fake_init_db) + monkeypatch.setattr("brainlayer.mcp.store_handler._retry_delay", 0.001) + + with ( + patch("brainlayer.mcp.store_handler._normalize_project_name", return_value="test"), + patch("brainlayer.queue_io.enqueue_store", side_effect=RuntimeError("force legacy pending queue")), + patch("brainlayer.mcp.store_handler._get_pending_store_path", return_value=pending_path), + patch("brainlayer.store.store_memory", side_effect=apsw.BusyError("database is locked")), + ): + _texts, structured = await _store( + content="cold vector store timeout must restore", + memory_type="note", + project="test", + ) + + assert structured["status"] == "DEFERRED" + assert _shared._vector_store.conn.timeout_ms == vector_store_module._DEFAULT_BUSY_TIMEOUT_MS + + @pytest.mark.asyncio async def test_store_busy_budget_bounds_supersede_update(tmp_path, monkeypatch): """The same store budget must bound the post-store supersede write.""" From 73712fc117e641d34c3c67446fc0e8fad8bbef5c Mon Sep 17 00:00:00 2001 From: Etan Joseph Heyman Date: Fri, 19 Jun 2026 10:18:21 +0300 Subject: [PATCH 07/14] fix: bound store singleton waits --- src/brainlayer/mcp/_shared.py | 9 ++- src/brainlayer/mcp/store_handler.py | 14 ++++- src/brainlayer/store.py | 6 +- tests/test_store_handler.py | 97 +++++++++++++++++++++++++++++ 4 files changed, 120 insertions(+), 6 deletions(-) diff --git a/src/brainlayer/mcp/_shared.py b/src/brainlayer/mcp/_shared.py index 0d98b588..ad10ad28 100644 --- a/src/brainlayer/mcp/_shared.py +++ b/src/brainlayer/mcp/_shared.py @@ -74,16 +74,21 @@ def _bootstrap_search_store(db_path) -> None: bootstrap_store.close() -def _get_vector_store(): +def _get_vector_store(timeout: float | None = None): """Get or initialize the global VectorStore (thread-safe).""" global _vector_store if _vector_store is None: - with _store_lock: + acquired = _store_lock.acquire() if timeout is None else _store_lock.acquire(timeout=max(0.0, timeout)) + if not acquired: + raise apsw.BusyError("timed out waiting for vector store initialization") + try: if _vector_store is None: from ..paths import get_db_path from ..vector_store import VectorStore _vector_store = VectorStore(get_db_path()) + finally: + _store_lock.release() return _vector_store diff --git a/src/brainlayer/mcp/store_handler.py b/src/brainlayer/mcp/store_handler.py index e30212fc..c53747ff 100644 --- a/src/brainlayer/mcp/store_handler.py +++ b/src/brainlayer/mcp/store_handler.py @@ -658,7 +658,7 @@ async def _store_memory_with_retries(store_memory, *, deadline: float | None = N raise last_err try: with _temporary_store_busy_timeout(conn, deadline): - return store_memory(**kwargs, busy_deadline=deadline) + return store_memory(**kwargs, busy_deadline=deadline, retry_on_busy=False) except Exception as exc: if not _is_lock_error(exc) or attempt >= _RETRY_MAX_ATTEMPTS - 1: raise @@ -689,6 +689,16 @@ async def _store_memory_with_retries(store_memory, *, deadline: float | None = N raise last_err # type: ignore[misc] +def _get_store_vector_store(deadline: float): + timeout = max(0.0, deadline - time.monotonic()) + try: + return _get_vector_store(timeout=timeout) + except TypeError as exc: + if "timeout" not in str(exc): + raise + return _get_vector_store() + + async def _store( content: str, memory_type: str, @@ -760,7 +770,7 @@ async def _store( deadline = _store_busy_deadline() with temporary_write_busy_timeout_ms(_remaining_store_busy_budget_ms(deadline), deadline=deadline): - store = _get_vector_store() + store = _get_store_vector_store(deadline) normalized_project = _normalize_project_name(project) # Store WITHOUT embedding — returns immediately (no executor needed) diff --git a/src/brainlayer/store.py b/src/brainlayer/store.py index c4e8af69..281f71f1 100644 --- a/src/brainlayer/store.py +++ b/src/brainlayer/store.py @@ -103,6 +103,7 @@ def store_memory( replayed_by: Optional[str] = None, chunk_origin: Optional[str] = None, busy_deadline: Optional[float] = None, + retry_on_busy: bool = True, ) -> Dict[str, Any]: """Persistently store a memory into BrainLayer. @@ -135,6 +136,7 @@ def store_memory( replayed_by: Optional replay worker identifier. chunk_origin: Optional explicit origin classification preserved from queued fallback metadata. busy_deadline: Optional monotonic deadline for internal BusyError retries. + retry_on_busy: Whether store_memory should run its own synchronous BusyError retry loop. Returns: Dict with 'id' (chunk ID) and 'related' (list of similar existing memories). @@ -366,7 +368,7 @@ def store_memory( except apsw.BusyError: if transaction_started: cursor.execute("ROLLBACK") - if attempt == 4: + if not retry_on_busy or attempt == 4: raise _sleep_before_busy_retry(0.1 * (2**attempt), busy_deadline) except Exception: @@ -391,7 +393,7 @@ def store_memory( except apsw.BusyError: if transaction_started: cursor.execute("ROLLBACK") - if attempt == 4: + if not retry_on_busy or attempt == 4: raise _sleep_before_busy_retry(0.1 * (2**attempt), busy_deadline) except Exception: diff --git a/tests/test_store_handler.py b/tests/test_store_handler.py index e56d5dca..79cd74c7 100644 --- a/tests/test_store_handler.py +++ b/tests/test_store_handler.py @@ -407,6 +407,103 @@ def fake_init_db(self): assert _shared._vector_store.conn.timeout_ms == vector_store_module._DEFAULT_BUSY_TIMEOUT_MS +@pytest.mark.asyncio +async def test_store_busy_budget_bounds_singleton_init_lock_wait(tmp_path, monkeypatch): + """Waiting for another cold VectorStore init must not exceed the store budget.""" + from brainlayer.mcp import _shared + from brainlayer.mcp.store_handler import _store + + pending_path = tmp_path / "pending-stores.jsonl" + db_path = tmp_path / "singleton-lock.db" + + class FakeVectorStore: + def __init__(self, path): + self.db_path = path + self.conn = MagicMock() + + monkeypatch.setenv("BRAINLAYER_STORE_BUSY_BUDGET_MS", "80") + monkeypatch.setattr(_shared, "_vector_store", None) + monkeypatch.setattr("brainlayer.paths.get_db_path", lambda: db_path) + monkeypatch.setattr("brainlayer.vector_store.VectorStore", FakeVectorStore) + + with ( + patch("brainlayer.mcp.store_handler._normalize_project_name", return_value="test"), + patch("brainlayer.queue_io.enqueue_store", side_effect=RuntimeError("force legacy pending queue")), + patch("brainlayer.mcp.store_handler._get_pending_store_path", return_value=pending_path), + patch("brainlayer.store.store_memory", side_effect=apsw.BusyError("database is locked")), + ): + _shared._store_lock.acquire() + timer = threading.Timer(0.24, _shared._store_lock.release) + timer.start() + try: + started = time.perf_counter() + _texts, structured = await _store( + content="store init lock wait must respect busy budget", + memory_type="note", + project="test", + ) + elapsed = time.perf_counter() - started + finally: + timer.cancel() + if _shared._store_lock.locked(): + _shared._store_lock.release() + + assert elapsed < 0.18 + assert structured["status"] == "DEFERRED" + assert structured["deferred"]["action"] == "queued_for_replay" + assert _shared._vector_store is None + + +@pytest.mark.asyncio +async def test_store_busy_budget_disables_store_memory_internal_busy_sleep(monkeypatch): + """Outer MCP retries should own busy backoff instead of holding the timeout lock.""" + from brainlayer.mcp import store_handler + + real_sleep = time.sleep + + class FakeCursor: + def __init__(self, conn): + self.conn = conn + + def execute(self, sql): + assert sql == "PRAGMA busy_timeout" + return self + + def fetchone(self): + return (self.conn.timeout_ms,) + + class FakeConn: + def __init__(self): + self.timeout_ms = 5000 + + def cursor(self): + return FakeCursor(self) + + def setbusytimeout(self, timeout_ms): + self.timeout_ms = timeout_ms + + store = MagicMock() + store.conn = FakeConn() + retry_flags = [] + + def internally_retrying_store_memory(**kwargs): + retry_flags.append(kwargs.get("retry_on_busy")) + if kwargs.get("retry_on_busy") is not False: + real_sleep(0.12) + raise apsw.BusyError("database is locked") + + monkeypatch.setenv("BRAINLAYER_STORE_BUSY_BUDGET_MS", "80") + monkeypatch.setattr(store_handler, "_RETRY_MAX_ATTEMPTS", 1) + + started = time.perf_counter() + with pytest.raises(apsw.BusyError): + await store_handler._store_memory_with_retries(internally_retrying_store_memory, store=store) + elapsed = time.perf_counter() - started + + assert elapsed < 0.16 + assert retry_flags == [False] + + @pytest.mark.asyncio async def test_store_busy_budget_bounds_supersede_update(tmp_path, monkeypatch): """The same store budget must bound the post-store supersede write.""" From a97a3229b5abce84a7168055e9bfa7ebd1af4f08 Mon Sep 17 00:00:00 2001 From: Etan Joseph Heyman Date: Fri, 19 Jun 2026 10:30:20 +0300 Subject: [PATCH 08/14] fix: apply supersedes on replayed stores --- src/brainlayer/drain.py | 36 +++++++++++++++++------------- tests/test_arbitration.py | 47 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 15 deletions(-) diff --git a/src/brainlayer/drain.py b/src/brainlayer/drain.py index c75d213a..2213b6bf 100644 --- a/src/brainlayer/drain.py +++ b/src/brainlayer/drain.py @@ -441,6 +441,24 @@ def _run_enrichment_provenance_hooks( logger.exception("drain auto_supersede failed for chunk_id=%s", chunk_id) +def _apply_store_supersedes(conn: apsw.Connection, old_chunk_id: str | None, new_chunk_id: str) -> None: + if not old_chunk_id: + return + cols = _columns(conn, "chunks") + if "superseded_by" not in cols: + return + if "status" in cols: + conn.execute( + "UPDATE chunks SET superseded_by = ?, status = 'superseded' WHERE id = ?", + (new_chunk_id, old_chunk_id), + ) + else: + conn.execute("UPDATE chunks SET superseded_by = ? WHERE id = ?", (new_chunk_id, old_chunk_id)) + for vector_table in ("chunk_vectors", "chunk_vectors_binary"): + if conn.execute("SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?", (vector_table,)).fetchone(): + conn.execute(f"DELETE FROM {vector_table} WHERE chunk_id = ?", (old_chunk_id,)) + + def _apply_store(conn: apsw.Connection, event: dict[str, Any]) -> ApplyResult: raw_content = event.get("content") if raw_content is None: @@ -468,6 +486,7 @@ def _apply_store(conn: apsw.Connection, event: dict[str, Any]) -> ApplyResult: metadata.update(raw_metadata) elif raw_metadata: logger.warning("Skipping non-object store metadata for chunk_id=%s", event.get("chunk_id")) + supersedes = event.get("supersedes") or metadata.get("supersedes") tags = event.get("tags") explicit_chunk_origin = event.get("chunk_origin") or metadata.get("chunk_origin") existing = conn.execute("SELECT content FROM chunks WHERE id = ?", (chunk_id,)).fetchone() @@ -485,6 +504,7 @@ def _apply_store(conn: apsw.Connection, event: dict[str, Any]) -> ApplyResult: "last_seen_at": now, }, ) + _apply_store_supersedes(conn, supersedes, chunk_id) return ApplyResult(chunk_id=chunk_id) return ApplyResult(collision_chunk_id=chunk_id) stored_chunk_id = _insert_or_merge_chunk( @@ -509,21 +529,7 @@ def _apply_store(conn: apsw.Connection, event: dict[str, Any]) -> ApplyResult: "chunk_origin": detect_chunk_origin(content, explicit_chunk_origin), }, ) - supersedes = event.get("supersedes") or metadata.get("supersedes") - cols = _columns(conn, "chunks") - if supersedes and "superseded_by" in cols: - if "status" in cols: - conn.execute( - "UPDATE chunks SET superseded_by = ?, status = 'superseded' WHERE id = ?", - (stored_chunk_id, supersedes), - ) - else: - conn.execute("UPDATE chunks SET superseded_by = ? WHERE id = ?", (stored_chunk_id, supersedes)) - for vector_table in ("chunk_vectors", "chunk_vectors_binary"): - if conn.execute( - "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?", (vector_table,) - ).fetchone(): - conn.execute(f"DELETE FROM {vector_table} WHERE chunk_id = ?", (supersedes,)) + _apply_store_supersedes(conn, supersedes, stored_chunk_id) entity_id = event.get("entity_id") or metadata.get("entity_id") if entity_id and {"kg_entities", "kg_entity_chunks"}.issubset(_table_names(conn)): if conn.execute("SELECT id FROM kg_entities WHERE id = ?", (entity_id,)).fetchone(): diff --git a/tests/test_arbitration.py b/tests/test_arbitration.py index 3bea2f75..8eb5391c 100644 --- a/tests/test_arbitration.py +++ b/tests/test_arbitration.py @@ -515,6 +515,53 @@ def test_queue_sanitizes_source_and_drain_preserves_supersedes(tmp_path): assert entity_link == replacement_id +def test_drain_existing_promised_chunk_still_applies_supersedes(tmp_path, monkeypatch): + """A queued replay after post-store supersede contention must finish lifecycle work.""" + from brainlayer.drain import drain_once + + db_path = tmp_path / "brainlayer.db" + queue_dir = tmp_path / "queue" + queue_dir.mkdir() + _create_minimal_db(db_path) + monkeypatch.setenv("BRAINLAYER_DRAIN_EMBED", "0") + with sqlite3.connect(db_path) as conn: + conn.execute( + """ + INSERT INTO chunks (id, content, metadata, source_file) + VALUES ('old-id', 'old content', '{}', 'seed') + """ + ) + conn.execute( + """ + INSERT INTO chunks (id, content, metadata, source_file) + VALUES ('manual-new1234', 'replacement content', '{}', 'mcp') + """ + ) + conn.commit() + + (queue_dir / "store-replay.jsonl").write_text( + json.dumps( + { + "kind": "store_memory", + "chunk_id": "manual-new1234", + "content": "replacement content", + "memory_type": "note", + "project": "arbitration-test", + "supersedes": "old-id", + } + ) + + "\n", + encoding="utf-8", + ) + + assert drain_once(db_path=db_path, queue_dir=queue_dir, batch_size=1) == 1 + + with sqlite3.connect(db_path) as conn: + superseded_by = conn.execute("SELECT superseded_by FROM chunks WHERE id = 'old-id'").fetchone()[0] + + assert superseded_by == "manual-new1234" + + def test_drain_store_events_merge_duplicates_and_write_alias(tmp_path): from brainlayer.drain import drain_once from brainlayer.queue_io import enqueue_store From e48b2edcdbe520b6c899119f16f9e51f96dc8a16 Mon Sep 17 00:00:00 2001 From: Etan Joseph Heyman Date: Fri, 19 Jun 2026 10:39:43 +0300 Subject: [PATCH 09/14] fix: validate store before busy deferral --- src/brainlayer/mcp/store_handler.py | 31 ++++++++++++++++++----------- tests/test_store_handler.py | 22 ++++++++++++++++++++ 2 files changed, 41 insertions(+), 12 deletions(-) diff --git a/src/brainlayer/mcp/store_handler.py b/src/brainlayer/mcp/store_handler.py index c53747ff..f95767c3 100644 --- a/src/brainlayer/mcp/store_handler.py +++ b/src/brainlayer/mcp/store_handler.py @@ -699,6 +699,22 @@ def _get_store_vector_store(deadline: float): return _get_vector_store() +def _validate_store_request(content: str, memory_type: str) -> str: + from ..ingest_guard import reject_recursive_mcp_output + from ..pipeline.classify import looks_like_system_prompt + from ..store import VALID_MEMORY_TYPES + + if not content or not content.strip(): + raise ValueError("content must be non-empty") + content = content.strip() + if memory_type not in VALID_MEMORY_TYPES: + raise ValueError(f"type must be one of: {', '.join(VALID_MEMORY_TYPES)}") + reject_recursive_mcp_output(content) + if looks_like_system_prompt(content): + raise ValueError("system prompt content is not stored in BrainLayer") + return content + + async def _store( content: str, memory_type: str, @@ -725,20 +741,11 @@ async def _store( promised_chunk_id = _new_manual_chunk_id() reservation_created_at = datetime.now(timezone.utc).isoformat() try: + content = _validate_store_request(content, memory_type) + if os.environ.get("BRAINLAYER_ARBITRATED") == "1": - from ..ingest_guard import reject_recursive_mcp_output - from ..pipeline.classify import looks_like_system_prompt from ..search_repo import clear_hybrid_search_cache - from ..store import VALID_MEMORY_TYPES - - if not content or not content.strip(): - raise ValueError("content must be non-empty") - content = content.strip() - if memory_type not in VALID_MEMORY_TYPES: - raise ValueError(f"type must be one of: {', '.join(VALID_MEMORY_TYPES)}") - reject_recursive_mcp_output(content) - if looks_like_system_prompt(content): - raise ValueError("system prompt content is not stored in BrainLayer") + queue_path = _queue_store( { "chunk_id": promised_chunk_id, diff --git a/tests/test_store_handler.py b/tests/test_store_handler.py index 79cd74c7..0c51ca4b 100644 --- a/tests/test_store_handler.py +++ b/tests/test_store_handler.py @@ -71,6 +71,28 @@ async def test_busy_queue_fallback_returns_loud_deferred_receipt(tmp_path): assert any("DEFERRED" in item.text for item in texts) +@pytest.mark.asyncio +async def test_store_validates_before_busy_deferral(tmp_path): + """Busy before store_memory must not queue requests that validation would reject.""" + from brainlayer.mcp.store_handler import _store + + queue_dir = tmp_path / "queue" + + with ( + patch("brainlayer.mcp.store_handler._get_vector_store", side_effect=apsw.BusyError("database is locked")), + patch("brainlayer.queue_io.get_queue_dir", return_value=queue_dir), + ): + result = await _store( + content="invalid memory type should never be queued", + memory_type="invalid_type", + project="test", + ) + + assert result.isError is True + assert "Validation error" in result.content[0].text + assert not list(queue_dir.glob("mcp-*.jsonl")) + + @pytest.mark.asyncio async def test_store_busy_budget_defers_promptly_and_pending_flush_replays(tmp_path, monkeypatch): """A held write lock must hit the bounded store budget, defer, and replay later.""" From 315c1c235f1049d71895001460f2f9ff1581805d Mon Sep 17 00:00:00 2001 From: Etan Joseph Heyman Date: Fri, 19 Jun 2026 11:02:48 +0300 Subject: [PATCH 10/14] fix: refresh init busy deadline per statement --- src/brainlayer/vector_store.py | 35 ++++++++++++++++ tests/test_store_handler.py | 75 ++++++++++++++++++++++++++++++++++ 2 files changed, 110 insertions(+) diff --git a/src/brainlayer/vector_store.py b/src/brainlayer/vector_store.py index 76da69d3..c7c8d782 100644 --- a/src/brainlayer/vector_store.py +++ b/src/brainlayer/vector_store.py @@ -65,6 +65,7 @@ _DEFAULT_READ_BUSY_TIMEOUT_MS = 5_000 _MAX_APSW_BUSY_TIMEOUT_MS = 2_147_483_647 _WRITE_BUSY_TIMEOUT_STATE = threading.local() +_NO_EXEC_TRACE = object() def _positive_int_env(name: str, default: int) -> int: @@ -107,6 +108,34 @@ def _default_write_busy_timeout_ms() -> int: return max(1, min(_DEFAULT_BUSY_TIMEOUT_MS, _MAX_APSW_BUSY_TIMEOUT_MS)) +def _refresh_write_busy_timeout_for_deadline(conn: apsw.Connection) -> None: + if _write_busy_deadline() is None: + return + conn.setbusytimeout(_write_busy_timeout_ms()) + + +def _install_write_busy_deadline_trace(conn: apsw.Connection): + if _write_busy_deadline() is None: + return _NO_EXEC_TRACE + old_trace = conn.getexectrace() + + def refresh_timeout(cursor, statement, bindings): + _refresh_write_busy_timeout_for_deadline(conn) + if old_trace is None: + return True + result = old_trace(cursor, statement, bindings) + return True if result is None else result + + conn.setexectrace(refresh_timeout) + return old_trace + + +def _restore_write_busy_deadline_trace(conn: apsw.Connection, old_trace) -> None: + if old_trace is _NO_EXEC_TRACE: + return + conn.setexectrace(old_trace) + + @contextmanager def temporary_write_busy_timeout_ms(timeout_ms: int, *, deadline: float | None = None): old_timeout = getattr(_WRITE_BUSY_TIMEOUT_STATE, "timeout_ms", None) @@ -615,6 +644,11 @@ def _restore_default_write_busy_timeout(self) -> None: conn = getattr(self, "conn", None) if conn is None: return + _restore_write_busy_deadline_trace( + conn, + getattr(self, "_init_write_busy_deadline_trace", _NO_EXEC_TRACE), + ) + self._init_write_busy_deadline_trace = _NO_EXEC_TRACE try: if conn.readonly("main"): return @@ -628,6 +662,7 @@ def _init_db(self) -> None: # Set busy timeout IMMEDIATELY via APSW native method — before any DDL. self.conn.setbusytimeout(_write_busy_timeout_ms()) + self._init_write_busy_deadline_trace = _install_write_busy_deadline_trace(self.conn) self.conn.enableloadextension(True) self.conn.loadextension(sqlite_vec.loadable_path()) diff --git a/tests/test_store_handler.py b/tests/test_store_handler.py index 0c51ca4b..6ed62029 100644 --- a/tests/test_store_handler.py +++ b/tests/test_store_handler.py @@ -429,6 +429,81 @@ def fake_init_db(self): assert _shared._vector_store.conn.timeout_ms == vector_store_module._DEFAULT_BUSY_TIMEOUT_MS +def test_store_busy_budget_refreshes_cold_init_timeout_between_statements(tmp_path, monkeypatch): + """Cold init must spend one absolute store budget across multiple DDL statements.""" + import brainlayer.vector_store as vector_store_module + from brainlayer.vector_store import VectorStore, temporary_write_busy_timeout_ms + + class StopInit(Exception): + pass + + fake_clock = {"now": 0.0} + seen_timeouts = [] + + class FakeCursor: + def __init__(self, conn): + self.conn = conn + + def execute(self, sql, *args): + if self.conn.exec_trace is not None: + bindings = args[0] if args else None + result = self.conn.exec_trace(self, sql, bindings) + if result is False: + raise apsw.ExecTraceAbort("exec trace aborted") + seen_timeouts.append(self.conn.timeout_ms) + fake_clock["now"] += 0.04 + if len(seen_timeouts) == 3: + raise StopInit + return self + + def fetchone(self): + return None + + def __iter__(self): + return iter(()) + + class FakeConnection: + def __init__(self, path): + self.path = path + self.timeout_ms = None + self.exec_trace = None + + def setbusytimeout(self, timeout_ms): + self.timeout_ms = timeout_ms + + def getexectrace(self): + return self.exec_trace + + def setexectrace(self, trace): + self.exec_trace = trace + + def enableloadextension(self, enabled): + self.load_extension_enabled = enabled + + def loadextension(self, path): + self.loaded_extension = path + + def cursor(self): + return FakeCursor(self) + + def readonly(self, name): + return False + + monkeypatch.setattr(vector_store_module.apsw, "Connection", FakeConnection) + monkeypatch.setattr(vector_store_module.sqlite_vec, "loadable_path", lambda: "sqlite-vec") + monkeypatch.setattr(vector_store_module.time, "monotonic", lambda: fake_clock["now"]) + + store = object.__new__(VectorStore) + store.db_path = tmp_path / "cold-init-deadline.db" + + with temporary_write_busy_timeout_ms(200, deadline=0.2), pytest.raises(StopInit): + store._init_db() + + assert len(seen_timeouts) == 3 + assert seen_timeouts[1] < seen_timeouts[0] + assert seen_timeouts[2] < seen_timeouts[1] + + @pytest.mark.asyncio async def test_store_busy_budget_bounds_singleton_init_lock_wait(tmp_path, monkeypatch): """Waiting for another cold VectorStore init must not exceed the store budget.""" From 0fa85210c387cf829ead430400c64ad8367c9f02 Mon Sep 17 00:00:00 2001 From: Etan Joseph Heyman Date: Fri, 19 Jun 2026 11:22:52 +0300 Subject: [PATCH 11/14] fix: refresh store busy deadline per statement --- src/brainlayer/mcp/store_handler.py | 31 +++++++++++++++ tests/test_store_handler.py | 62 +++++++++++++++++++++++++++++ 2 files changed, 93 insertions(+) diff --git a/src/brainlayer/mcp/store_handler.py b/src/brainlayer/mcp/store_handler.py index f95767c3..83a5412c 100644 --- a/src/brainlayer/mcp/store_handler.py +++ b/src/brainlayer/mcp/store_handler.py @@ -31,6 +31,7 @@ _DEFAULT_STORE_BUSY_BUDGET_MS = 3_000 _MAX_APSW_BUSY_TIMEOUT_MS = 2_147_483_647 _STORE_BUSY_TIMEOUT_LOCK = threading.Lock() +_NO_EXEC_TRACE = object() def _positive_int_env(name: str, default: int) -> int: @@ -551,6 +552,25 @@ def _set_connection_busy_timeout(conn, timeout_ms: int | None) -> None: logger.debug("Failed to set store busy timeout", exc_info=True) +def _connection_exec_trace(conn): + if conn is None or not hasattr(conn, "getexectrace"): + return _NO_EXEC_TRACE + try: + return conn.getexectrace() + except Exception: + logger.debug("Failed to read store exec trace", exc_info=True) + return _NO_EXEC_TRACE + + +def _set_connection_exec_trace(conn, trace) -> None: + if conn is None or not hasattr(conn, "setexectrace") or trace is _NO_EXEC_TRACE: + return + try: + conn.setexectrace(trace) + except Exception: + logger.debug("Failed to set store exec trace", exc_info=True) + + def _remaining_store_busy_budget_ms(deadline: float) -> int: remaining_ms = int((deadline - time.monotonic()) * 1000) if remaining_ms <= 0: @@ -570,11 +590,22 @@ def _temporary_store_busy_timeout(conn, deadline: float): raise apsw.BusyError("brain_store busy_timeout lock wait exceeded") original_busy_timeout_ms = _connection_busy_timeout_ms(conn) + original_exec_trace = _connection_exec_trace(conn) + + def refresh_timeout(cursor, statement, bindings): + _set_connection_busy_timeout(conn, _remaining_store_busy_budget_ms(deadline)) + if original_exec_trace is _NO_EXEC_TRACE or original_exec_trace is None: + return True + result = original_exec_trace(cursor, statement, bindings) + return True if result is None else result + try: timeout_ms = _remaining_store_busy_budget_ms(deadline) _set_connection_busy_timeout(conn, timeout_ms) + _set_connection_exec_trace(conn, refresh_timeout) yield finally: + _set_connection_exec_trace(conn, original_exec_trace) _set_connection_busy_timeout(conn, original_busy_timeout_ms) _STORE_BUSY_TIMEOUT_LOCK.release() diff --git a/tests/test_store_handler.py b/tests/test_store_handler.py index 6ed62029..f93cd35a 100644 --- a/tests/test_store_handler.py +++ b/tests/test_store_handler.py @@ -666,6 +666,68 @@ def locked_supersede(_old_chunk_id, _new_chunk_id): assert json.loads(pending_path.read_text())["supersedes"] == "manual-old" +def test_store_busy_budget_refreshes_supersede_timeout_between_statements(monkeypatch): + """Store busy context must refresh the deadline before each SQL statement.""" + from brainlayer.mcp import store_handler + + fake_clock = {"now": 0.0} + seen_timeouts = [] + + class FakeCursor: + def __init__(self, conn): + self.conn = conn + self._pragma = False + + def execute(self, sql, *args): + if sql == "PRAGMA busy_timeout": + self._pragma = True + return self + if self.conn.exec_trace is not None: + bindings = args[0] if args else None + result = self.conn.exec_trace(self, sql, bindings) + if result is False: + raise apsw.ExecTraceAbort("exec trace aborted") + seen_timeouts.append(self.conn.timeout_ms) + fake_clock["now"] += 0.04 + return self + + def fetchone(self): + if self._pragma: + return (self.conn.timeout_ms,) + return None + + class FakeConn: + def __init__(self): + self.timeout_ms = 250 + self.exec_trace = None + + def cursor(self): + return FakeCursor(self) + + def setbusytimeout(self, timeout_ms): + self.timeout_ms = timeout_ms + + def getexectrace(self): + return self.exec_trace + + def setexectrace(self, trace): + self.exec_trace = trace + + conn = FakeConn() + monkeypatch.setattr(store_handler.time, "monotonic", lambda: fake_clock["now"]) + + with store_handler._temporary_store_busy_timeout(conn, 0.2): + cursor = conn.cursor() + cursor.execute("SELECT 1 FROM chunks WHERE id = ?", ("old",)) + cursor.execute("UPDATE chunks SET superseded_by = ? WHERE id = ?", ("new", "old")) + cursor.execute("DELETE FROM chunk_vectors WHERE chunk_id = ?", ("old",)) + + assert seen_timeouts[1] < seen_timeouts[0] + assert seen_timeouts[2] < seen_timeouts[1] + assert conn.timeout_ms == 250 + assert conn.exec_trace is None + + @pytest.mark.asyncio async def test_store_preassigns_same_chunk_id_across_busy_retry(tmp_path, monkeypatch): """The MCP handler promises one chunk ID before the first write attempt.""" From 62bb38088909eaafafd7487fdb82214e427d944a Mon Sep 17 00:00:00 2001 From: Etan Joseph Heyman Date: Fri, 19 Jun 2026 11:49:15 +0300 Subject: [PATCH 12/14] fix: allow store busy rollback cleanup --- src/brainlayer/mcp/store_handler.py | 21 +++++++-- tests/test_store_handler.py | 67 +++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+), 4 deletions(-) diff --git a/src/brainlayer/mcp/store_handler.py b/src/brainlayer/mcp/store_handler.py index 83a5412c..e85dda76 100644 --- a/src/brainlayer/mcp/store_handler.py +++ b/src/brainlayer/mcp/store_handler.py @@ -571,6 +571,20 @@ def _set_connection_exec_trace(conn, trace) -> None: logger.debug("Failed to set store exec trace", exc_info=True) +def _is_store_busy_cleanup_statement(statement) -> bool: + if not isinstance(statement, str): + return False + sql = statement.lstrip().upper() + return sql.startswith("ROLLBACK") or sql.startswith("RELEASE") + + +def _run_connection_exec_trace(trace, cursor, statement, bindings): + if trace is _NO_EXEC_TRACE or trace is None: + return True + result = trace(cursor, statement, bindings) + return True if result is None else result + + def _remaining_store_busy_budget_ms(deadline: float) -> int: remaining_ms = int((deadline - time.monotonic()) * 1000) if remaining_ms <= 0: @@ -593,11 +607,10 @@ def _temporary_store_busy_timeout(conn, deadline: float): original_exec_trace = _connection_exec_trace(conn) def refresh_timeout(cursor, statement, bindings): + if _is_store_busy_cleanup_statement(statement): + return _run_connection_exec_trace(original_exec_trace, cursor, statement, bindings) _set_connection_busy_timeout(conn, _remaining_store_busy_budget_ms(deadline)) - if original_exec_trace is _NO_EXEC_TRACE or original_exec_trace is None: - return True - result = original_exec_trace(cursor, statement, bindings) - return True if result is None else result + return _run_connection_exec_trace(original_exec_trace, cursor, statement, bindings) try: timeout_ms = _remaining_store_busy_budget_ms(deadline) diff --git a/tests/test_store_handler.py b/tests/test_store_handler.py index f93cd35a..38a42917 100644 --- a/tests/test_store_handler.py +++ b/tests/test_store_handler.py @@ -728,6 +728,73 @@ def setexectrace(self, trace): assert conn.exec_trace is None +def test_store_busy_budget_allows_expired_rollback_cleanup(monkeypatch): + """Rollback cleanup must not be blocked by the expired busy-budget trace.""" + from brainlayer.mcp import store_handler + + fake_clock = {"now": 0.0} + executed = [] + + class FakeCursor: + def __init__(self, conn): + self.conn = conn + self._pragma = False + + def execute(self, sql, *args): + if sql == "PRAGMA busy_timeout": + self._pragma = True + return self + if self.conn.exec_trace is not None: + bindings = args[0] if args else None + result = self.conn.exec_trace(self, sql, bindings) + if result is False: + raise apsw.ExecTraceAbort("exec trace aborted") + executed.append(sql) + return self + + def fetchone(self): + if self._pragma: + return (self.conn.timeout_ms,) + return None + + class FakeConn: + def __init__(self): + self.timeout_ms = 250 + self.exec_trace = None + + def cursor(self): + return FakeCursor(self) + + def setbusytimeout(self, timeout_ms): + self.timeout_ms = timeout_ms + + def getexectrace(self): + return self.exec_trace + + def setexectrace(self, trace): + self.exec_trace = trace + + conn = FakeConn() + monkeypatch.setattr(store_handler.time, "monotonic", lambda: fake_clock["now"]) + + with store_handler._temporary_store_busy_timeout(conn, 0.1): + cursor = conn.cursor() + cursor.execute("BEGIN IMMEDIATE") + fake_clock["now"] = 0.11 + cursor.execute("ROLLBACK") + cursor.execute("ROLLBACK TO store_memory") + cursor.execute("RELEASE store_memory") + + assert executed == [ + "BEGIN IMMEDIATE", + "ROLLBACK", + "ROLLBACK TO store_memory", + "RELEASE store_memory", + ] + assert conn.timeout_ms == 250 + assert conn.exec_trace is None + + @pytest.mark.asyncio async def test_store_preassigns_same_chunk_id_across_busy_retry(tmp_path, monkeypatch): """The MCP handler promises one chunk ID before the first write attempt.""" From e9402305d9b0bd4dd456b5554622975e8e9396b5 Mon Sep 17 00:00:00 2001 From: Etan Joseph Heyman Date: Fri, 19 Jun 2026 12:28:07 +0300 Subject: [PATCH 13/14] fix: initialize drain schema for queued stores --- src/brainlayer/drain.py | 124 +++++++++++++++++++++++++------------- tests/test_write_queue.py | 36 +++++++++++ 2 files changed, 119 insertions(+), 41 deletions(-) diff --git a/src/brainlayer/drain.py b/src/brainlayer/drain.py index 2213b6bf..34a960cd 100644 --- a/src/brainlayer/drain.py +++ b/src/brainlayer/drain.py @@ -174,6 +174,20 @@ def _open_connection(db_path: Path) -> apsw.Connection: return conn +def _ensure_drain_db_schema(db_path: Path) -> None: + from .vector_store import VectorStore + + store = VectorStore(db_path) + store.close() + + +def _db_needs_initial_schema(db_path: Path) -> bool: + try: + return not db_path.exists() or db_path.stat().st_size == 0 + except OSError: + return False + + def _acquire_queue_lock(queue_dir: Path) -> int: fd = os.open(queue_dir, os.O_RDONLY) try: @@ -312,6 +326,14 @@ def _event_payload(event: dict[str, Any]) -> dict[str, Any]: return {"kind": "unknown", **event} +def _events_include_store(events: list[dict[str, Any]]) -> bool: + return any(_event_payload(event).get("kind") == "store_memory" for event in events) + + +def _is_missing_chunks_error(exc: BaseException) -> bool: + return "no such table: chunks" in str(exc).lower() + + def _event_created_at(event: dict[str, Any]) -> str: raw_created_at = event.get("created_at") if raw_created_at: @@ -985,49 +1007,60 @@ def burn_drain_once( return result all_events = [event for _, events in batch for event in events] - conn = _open_connection(db_path) - try: - _ensure_enrichment_update_schema(conn) - conn.execute("BEGIN IMMEDIATE") - prefetched_state = _prefetch_enrichment_state(conn, all_events) - store_chunk_ids: list[str] = [] - for _, events in batch: - for event in events: - if _is_verified_redundant_enrichment(event, prefetched_state): - payload = _event_payload(event) - enqueue_provenance_resolution_for_entities( - conn, - payload.get("entities"), - chunk_id=payload.get("chunk_id"), - commit=False, - ) - result.skipped_verified_stale += 1 - continue - applied = _apply_event(conn, event) - result.applied_events += 1 - if applied.chunk_id: - store_chunk_ids.append(applied.chunk_id) - if _embedding_enabled(): - _embed_store_chunks(conn, store_chunk_ids, embed_fn) - conn.execute("COMMIT") - conn.setbusytimeout(_checkpoint_busy_timeout_ms()) + batch_includes_store = _events_include_store(all_events) + if batch_includes_store and _db_needs_initial_schema(db_path): + _ensure_drain_db_schema(db_path) + for schema_attempt in range(2): + conn = _open_connection(db_path) try: - conn.execute("PRAGMA wal_checkpoint(TRUNCATE)") - result.checkpoints += 1 - except apsw.Error as exc: - _log(log_path, f"burn drain checkpoint skipped: {exc}") + _ensure_enrichment_update_schema(conn) + conn.execute("BEGIN IMMEDIATE") + prefetched_state = _prefetch_enrichment_state(conn, all_events) + store_chunk_ids: list[str] = [] + for _, events in batch: + for event in events: + if _is_verified_redundant_enrichment(event, prefetched_state): + payload = _event_payload(event) + enqueue_provenance_resolution_for_entities( + conn, + payload.get("entities"), + chunk_id=payload.get("chunk_id"), + commit=False, + ) + result.skipped_verified_stale += 1 + continue + applied = _apply_event(conn, event) + result.applied_events += 1 + if applied.chunk_id: + store_chunk_ids.append(applied.chunk_id) + if _embedding_enabled(): + _embed_store_chunks(conn, store_chunk_ids, embed_fn) + conn.execute("COMMIT") + conn.setbusytimeout(_checkpoint_busy_timeout_ms()) + try: + conn.execute("PRAGMA wal_checkpoint(TRUNCATE)") + result.checkpoints += 1 + except apsw.Error as exc: + _log(log_path, f"burn drain checkpoint skipped: {exc}") + finally: + conn.setbusytimeout(_drain_busy_timeout_ms()) + break + except Exception as exc: + try: + conn.execute("ROLLBACK") + except Exception: + pass + if batch_includes_store and _is_missing_chunks_error(exc) and schema_attempt == 0: + conn.close() + conn = None + _ensure_drain_db_schema(db_path) + continue + result.failed_files = len(batch) + _log(log_path, f"burn drain failed; batch preserved: {exc}") + return result finally: - conn.setbusytimeout(_drain_busy_timeout_ms()) - except Exception as exc: - try: - conn.execute("ROLLBACK") - except Exception: - pass - result.failed_files = len(batch) - _log(log_path, f"burn drain failed; batch preserved: {exc}") - return result - finally: - conn.close() + if conn is not None: + conn.close() for path, _events in batch: try: @@ -1086,6 +1119,9 @@ def drain_once( continue events_to_apply = events[: _max_events_per_transaction()] remaining_events = events[len(events_to_apply) :] + events_include_store = _events_include_store(events_to_apply) + if events_include_store and _db_needs_initial_schema(db_path): + _ensure_drain_db_schema(db_path) for attempt in range(5): conn: apsw.Connection | None = None @@ -1146,6 +1182,12 @@ def drain_once( _log(log_path, f"drain busy; retrying in {delay:.2f}s") time.sleep(delay) continue + if events_include_store and _is_missing_chunks_error(exc) and attempt < 4: + if conn is not None: + conn.close() + conn = None + _ensure_drain_db_schema(db_path) + continue _log(log_path, f"drain failed for {path.name}: {exc}") break finally: diff --git a/tests/test_write_queue.py b/tests/test_write_queue.py index 155f27a3..1b1c0060 100644 --- a/tests/test_write_queue.py +++ b/tests/test_write_queue.py @@ -284,6 +284,42 @@ def test_flush_drains_queue(self, tmp_path): assert flushed == 1 assert not pending_path.exists() # File deleted after full flush + def test_drain_store_event_initializes_missing_schema(self, tmp_path, monkeypatch): + """Queued store replay must recover if the first store deferred before schema init.""" + db_path = tmp_path / "brainlayer.db" + queue_dir = tmp_path / "queue" + log_path = tmp_path / "drain.log" + queue_dir.mkdir() + monkeypatch.setenv("BRAINLAYER_DRAIN_EMBED", "0") + + queued = queue_dir / "store-fresh-schema.jsonl" + queued.write_text( + json.dumps( + { + "kind": "store_memory", + "chunk_id": "manual-fresh-schema", + "content": "fresh queued store should create schema before replay", + "memory_type": "note", + "project": "test", + "created_at": "2026-06-19T08:54:00Z", + } + ) + + "\n" + ) + + assert drain_once(db_path=db_path, queue_dir=queue_dir, batch_size=1, log_path=log_path) == 1 + assert not queued.exists() + with sqlite3.connect(db_path) as conn: + row = conn.execute( + "SELECT id, content, project FROM chunks WHERE id = ?", + ("manual-fresh-schema",), + ).fetchone() + assert row == ( + "manual-fresh-schema", + "fresh queued store should create schema before replay", + "test", + ) + def test_flush_preserves_legacy_pending_chunk_id(self, tmp_path): """Legacy pending-stores flush must persist the caller-visible queued ID.""" pending_path = tmp_path / "pending-stores.jsonl" From c048857219c086ba59ff8a60df62119b403d8b5b Mon Sep 17 00:00:00 2001 From: Etan Joseph Heyman Date: Fri, 19 Jun 2026 12:58:04 +0300 Subject: [PATCH 14/14] fix: preserve queued stores under schema contention --- src/brainlayer/drain.py | 57 +++++++++++++++++++++---- src/brainlayer/vector_store.py | 21 ++++++++-- tests/test_store_handler.py | 53 ++++++++++++++++++++++++ tests/test_write_queue.py | 76 +++++++++++++++++++++++++++++++++- 4 files changed, 195 insertions(+), 12 deletions(-) diff --git a/src/brainlayer/drain.py b/src/brainlayer/drain.py index 34a960cd..50f94ac4 100644 --- a/src/brainlayer/drain.py +++ b/src/brainlayer/drain.py @@ -188,6 +188,25 @@ def _db_needs_initial_schema(db_path: Path) -> bool: return False +def _ensure_drain_db_schema_preserving_queue( + db_path: Path, + log_path: Path, + *, + context: str, + force: bool = False, +) -> bool: + if not force and not _db_needs_initial_schema(db_path): + return True + try: + _ensure_drain_db_schema(db_path) + return True + except Exception as exc: + if not _is_busy_error(exc): + raise + _log(log_path, f"{context} schema init busy; batch preserved: {exc}") + return False + + def _acquire_queue_lock(queue_dir: Path) -> int: fd = os.open(queue_dir, os.O_RDONLY) try: @@ -786,7 +805,9 @@ def _table_names(conn: apsw.Connection) -> set[str]: def _is_busy_error(exc: BaseException) -> bool: - return _is_sqlite_busy_error(exc) + from .vector_store import WriterInUseError + + return isinstance(exc, WriterInUseError) or _is_sqlite_busy_error(exc) def _default_embed_fn() -> Callable[[str], list[float]]: @@ -1008,8 +1029,13 @@ def burn_drain_once( all_events = [event for _, events in batch for event in events] batch_includes_store = _events_include_store(all_events) - if batch_includes_store and _db_needs_initial_schema(db_path): - _ensure_drain_db_schema(db_path) + if batch_includes_store and not _ensure_drain_db_schema_preserving_queue( + db_path, + log_path, + context="burn drain failed before transaction", + ): + result.failed_files = len(batch) + return result for schema_attempt in range(2): conn = _open_connection(db_path) try: @@ -1053,7 +1079,14 @@ def burn_drain_once( if batch_includes_store and _is_missing_chunks_error(exc) and schema_attempt == 0: conn.close() conn = None - _ensure_drain_db_schema(db_path) + if not _ensure_drain_db_schema_preserving_queue( + db_path, + log_path, + context="burn drain failed before transaction", + force=True, + ): + result.failed_files = len(batch) + return result continue result.failed_files = len(batch) _log(log_path, f"burn drain failed; batch preserved: {exc}") @@ -1120,8 +1153,12 @@ def drain_once( events_to_apply = events[: _max_events_per_transaction()] remaining_events = events[len(events_to_apply) :] events_include_store = _events_include_store(events_to_apply) - if events_include_store and _db_needs_initial_schema(db_path): - _ensure_drain_db_schema(db_path) + if events_include_store and not _ensure_drain_db_schema_preserving_queue( + db_path, + log_path, + context=f"drain failed for {path.name}", + ): + break for attempt in range(5): conn: apsw.Connection | None = None @@ -1186,7 +1223,13 @@ def drain_once( if conn is not None: conn.close() conn = None - _ensure_drain_db_schema(db_path) + if not _ensure_drain_db_schema_preserving_queue( + db_path, + log_path, + context=f"drain failed for {path.name}", + force=True, + ): + break continue _log(log_path, f"drain failed for {path.name}: {exc}") break diff --git a/src/brainlayer/vector_store.py b/src/brainlayer/vector_store.py index c7c8d782..bde69713 100644 --- a/src/brainlayer/vector_store.py +++ b/src/brainlayer/vector_store.py @@ -114,17 +114,30 @@ def _refresh_write_busy_timeout_for_deadline(conn: apsw.Connection) -> None: conn.setbusytimeout(_write_busy_timeout_ms()) +def _is_write_busy_cleanup_statement(statement) -> bool: + if not isinstance(statement, str): + return False + sql = statement.lstrip().upper() + return sql.startswith("ROLLBACK") or sql.startswith("RELEASE") + + +def _run_write_busy_exec_trace(trace, cursor, statement, bindings): + if trace is _NO_EXEC_TRACE or trace is None: + return True + result = trace(cursor, statement, bindings) + return True if result is None else result + + def _install_write_busy_deadline_trace(conn: apsw.Connection): if _write_busy_deadline() is None: return _NO_EXEC_TRACE old_trace = conn.getexectrace() def refresh_timeout(cursor, statement, bindings): + if _is_write_busy_cleanup_statement(statement): + return _run_write_busy_exec_trace(old_trace, cursor, statement, bindings) _refresh_write_busy_timeout_for_deadline(conn) - if old_trace is None: - return True - result = old_trace(cursor, statement, bindings) - return True if result is None else result + return _run_write_busy_exec_trace(old_trace, cursor, statement, bindings) conn.setexectrace(refresh_timeout) return old_trace diff --git a/tests/test_store_handler.py b/tests/test_store_handler.py index 38a42917..9ff2ffe7 100644 --- a/tests/test_store_handler.py +++ b/tests/test_store_handler.py @@ -504,6 +504,59 @@ def readonly(self, name): assert seen_timeouts[2] < seen_timeouts[1] +def test_vector_store_init_trace_allows_expired_rollback_cleanup(monkeypatch): + """Init deadline trace must not block transaction cleanup after expiry.""" + import brainlayer.vector_store as vector_store_module + from brainlayer.vector_store import temporary_write_busy_timeout_ms + + fake_clock = {"now": 0.0} + executed = [] + + class FakeCursor: + def __init__(self, conn): + self.conn = conn + + def execute(self, sql): + if self.conn.exec_trace is not None: + result = self.conn.exec_trace(self, sql, None) + if result is False: + raise apsw.ExecTraceAbort("exec trace aborted") + executed.append(sql) + return self + + class FakeConn: + def __init__(self): + self.timeout_ms = 100 + self.exec_trace = None + + def setbusytimeout(self, timeout_ms): + self.timeout_ms = timeout_ms + + def getexectrace(self): + return self.exec_trace + + def setexectrace(self, trace): + self.exec_trace = trace + + def cursor(self): + return FakeCursor(self) + + conn = FakeConn() + monkeypatch.setattr(vector_store_module.time, "monotonic", lambda: fake_clock["now"]) + + with temporary_write_busy_timeout_ms(100, deadline=0.1): + vector_store_module._install_write_busy_deadline_trace(conn) + cursor = conn.cursor() + fake_clock["now"] = 0.11 + cursor.execute("ROLLBACK") + cursor.execute("ROLLBACK TO vector_init") + cursor.execute("RELEASE vector_init") + with pytest.raises(apsw.BusyError): + cursor.execute("CREATE TABLE after_deadline(id TEXT)") + + assert executed == ["ROLLBACK", "ROLLBACK TO vector_init", "RELEASE vector_init"] + + @pytest.mark.asyncio async def test_store_busy_budget_bounds_singleton_init_lock_wait(tmp_path, monkeypatch): """Waiting for another cold VectorStore init must not exceed the store budget.""" diff --git a/tests/test_write_queue.py b/tests/test_write_queue.py index 1b1c0060..ac1ec22e 100644 --- a/tests/test_write_queue.py +++ b/tests/test_write_queue.py @@ -17,7 +17,7 @@ import apsw import pytest -from brainlayer.drain import _open_connection, drain_once +from brainlayer.drain import _open_connection, burn_drain_once, drain_once from brainlayer.mcp.store_handler import ( _flush_pending_stores, _queue_store, @@ -320,6 +320,80 @@ def test_drain_store_event_initializes_missing_schema(self, tmp_path, monkeypatc "test", ) + def test_drain_preserves_store_queue_when_schema_bootstrap_writer_in_use(self, tmp_path, monkeypatch): + """Schema bootstrap contention must preserve queued store files for retry.""" + from brainlayer.vector_store import WriterInUseError + + db_path = tmp_path / "brainlayer.db" + queue_dir = tmp_path / "queue" + log_path = tmp_path / "drain.log" + queue_dir.mkdir() + monkeypatch.setenv("BRAINLAYER_DRAIN_EMBED", "0") + + queued = queue_dir / "store-schema-contended.jsonl" + queued.write_text( + json.dumps( + { + "kind": "store_memory", + "chunk_id": "manual-schema-contended", + "content": "store queue should survive contended schema bootstrap", + "memory_type": "note", + "project": "test", + } + ) + + "\n" + ) + + with patch( + "brainlayer.drain._ensure_drain_db_schema", + side_effect=WriterInUseError("another writer is using brainlayer.db (pid 123)"), + ): + drained = drain_once(db_path=db_path, queue_dir=queue_dir, batch_size=1, log_path=log_path) + + assert drained == 0 + assert queued.exists() + assert "batch preserved" in log_path.read_text() + + def test_burn_drain_preserves_store_queue_when_schema_bootstrap_writer_in_use(self, tmp_path, monkeypatch): + """Burn drain must preserve store files if first-run schema init is contended.""" + from brainlayer.vector_store import WriterInUseError + + db_path = tmp_path / "brainlayer.db" + queue_dir = tmp_path / "queue" + log_path = tmp_path / "drain.log" + queue_dir.mkdir() + monkeypatch.setenv("BRAINLAYER_DRAIN_EMBED", "0") + + queued = queue_dir / "store-schema-contended.jsonl" + queued.write_text( + json.dumps( + { + "kind": "store_memory", + "chunk_id": "manual-schema-contended", + "content": "burn drain should preserve contended schema bootstrap", + "memory_type": "note", + "project": "test", + } + ) + + "\n" + ) + + with patch( + "brainlayer.drain._ensure_drain_db_schema", + side_effect=WriterInUseError("another writer is using brainlayer.db (pid 123)"), + ): + result = burn_drain_once( + db_path=db_path, + queue_dir=queue_dir, + batch_size=1, + log_path=log_path, + ) + + assert result.failed_files == 1 + assert result.files_deleted == 0 + assert queued.exists() + assert "batch preserved" in log_path.read_text() + def test_flush_preserves_legacy_pending_chunk_id(self, tmp_path): """Legacy pending-stores flush must persist the caller-visible queued ID.""" pending_path = tmp_path / "pending-stores.jsonl"