diff --git a/src/brainlayer/drain.py b/src/brainlayer/drain.py index c75d213a..50f94ac4 100644 --- a/src/brainlayer/drain.py +++ b/src/brainlayer/drain.py @@ -174,6 +174,39 @@ def _open_connection(db_path: Path) -> apsw.Connection: return conn +def _ensure_drain_db_schema(db_path: Path) -> None: + from .vector_store import VectorStore + + store = VectorStore(db_path) + store.close() + + +def _db_needs_initial_schema(db_path: Path) -> bool: + try: + return not db_path.exists() or db_path.stat().st_size == 0 + except OSError: + return False + + +def _ensure_drain_db_schema_preserving_queue( + db_path: Path, + log_path: Path, + *, + context: str, + force: bool = False, +) -> bool: + if not force and not _db_needs_initial_schema(db_path): + return True + try: + _ensure_drain_db_schema(db_path) + return True + except Exception as exc: + if not _is_busy_error(exc): + raise + _log(log_path, f"{context} schema init busy; batch preserved: {exc}") + return False + + def _acquire_queue_lock(queue_dir: Path) -> int: fd = os.open(queue_dir, os.O_RDONLY) try: @@ -312,6 +345,14 @@ def _event_payload(event: dict[str, Any]) -> dict[str, Any]: return {"kind": "unknown", **event} +def _events_include_store(events: list[dict[str, Any]]) -> bool: + return any(_event_payload(event).get("kind") == "store_memory" for event in events) + + +def _is_missing_chunks_error(exc: BaseException) -> bool: + return "no such table: chunks" in str(exc).lower() + + def _event_created_at(event: dict[str, Any]) -> str: raw_created_at = event.get("created_at") if raw_created_at: @@ -441,6 +482,24 @@ def _run_enrichment_provenance_hooks( logger.exception("drain auto_supersede failed for chunk_id=%s", chunk_id) +def _apply_store_supersedes(conn: apsw.Connection, old_chunk_id: str | None, new_chunk_id: str) -> None: + if not old_chunk_id: + return + cols = _columns(conn, "chunks") + if "superseded_by" not in cols: + return + if "status" in cols: + conn.execute( + "UPDATE chunks SET superseded_by = ?, status = 'superseded' WHERE id = ?", + (new_chunk_id, old_chunk_id), + ) + else: + conn.execute("UPDATE chunks SET superseded_by = ? WHERE id = ?", (new_chunk_id, old_chunk_id)) + for vector_table in ("chunk_vectors", "chunk_vectors_binary"): + if conn.execute("SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?", (vector_table,)).fetchone(): + conn.execute(f"DELETE FROM {vector_table} WHERE chunk_id = ?", (old_chunk_id,)) + + def _apply_store(conn: apsw.Connection, event: dict[str, Any]) -> ApplyResult: raw_content = event.get("content") if raw_content is None: @@ -468,6 +527,7 @@ def _apply_store(conn: apsw.Connection, event: dict[str, Any]) -> ApplyResult: metadata.update(raw_metadata) elif raw_metadata: logger.warning("Skipping non-object store metadata for chunk_id=%s", event.get("chunk_id")) + supersedes = event.get("supersedes") or metadata.get("supersedes") tags = event.get("tags") explicit_chunk_origin = event.get("chunk_origin") or metadata.get("chunk_origin") existing = conn.execute("SELECT content FROM chunks WHERE id = ?", (chunk_id,)).fetchone() @@ -485,6 +545,7 @@ def _apply_store(conn: apsw.Connection, event: dict[str, Any]) -> ApplyResult: "last_seen_at": now, }, ) + _apply_store_supersedes(conn, supersedes, chunk_id) return ApplyResult(chunk_id=chunk_id) return ApplyResult(collision_chunk_id=chunk_id) stored_chunk_id = _insert_or_merge_chunk( @@ -509,21 +570,7 @@ def _apply_store(conn: apsw.Connection, event: dict[str, Any]) -> ApplyResult: "chunk_origin": detect_chunk_origin(content, explicit_chunk_origin), }, ) - supersedes = event.get("supersedes") or metadata.get("supersedes") - cols = _columns(conn, "chunks") - if supersedes and "superseded_by" in cols: - if "status" in cols: - conn.execute( - "UPDATE chunks SET superseded_by = ?, status = 'superseded' WHERE id = ?", - (stored_chunk_id, supersedes), - ) - else: - conn.execute("UPDATE chunks SET superseded_by = ? WHERE id = ?", (stored_chunk_id, supersedes)) - for vector_table in ("chunk_vectors", "chunk_vectors_binary"): - if conn.execute( - "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?", (vector_table,) - ).fetchone(): - conn.execute(f"DELETE FROM {vector_table} WHERE chunk_id = ?", (supersedes,)) + _apply_store_supersedes(conn, supersedes, stored_chunk_id) entity_id = event.get("entity_id") or metadata.get("entity_id") if entity_id and {"kg_entities", "kg_entity_chunks"}.issubset(_table_names(conn)): if conn.execute("SELECT id FROM kg_entities WHERE id = ?", (entity_id,)).fetchone(): @@ -758,7 +805,9 @@ def _table_names(conn: apsw.Connection) -> set[str]: def _is_busy_error(exc: BaseException) -> bool: - return _is_sqlite_busy_error(exc) + from .vector_store import WriterInUseError + + return isinstance(exc, WriterInUseError) or _is_sqlite_busy_error(exc) def _default_embed_fn() -> Callable[[str], list[float]]: @@ -979,49 +1028,72 @@ def burn_drain_once( return result all_events = [event for _, events in batch for event in events] - conn = _open_connection(db_path) - try: - _ensure_enrichment_update_schema(conn) - conn.execute("BEGIN IMMEDIATE") - prefetched_state = _prefetch_enrichment_state(conn, all_events) - store_chunk_ids: list[str] = [] - for _, events in batch: - for event in events: - if _is_verified_redundant_enrichment(event, prefetched_state): - payload = _event_payload(event) - enqueue_provenance_resolution_for_entities( - conn, - payload.get("entities"), - chunk_id=payload.get("chunk_id"), - commit=False, - ) - result.skipped_verified_stale += 1 - continue - applied = _apply_event(conn, event) - result.applied_events += 1 - if applied.chunk_id: - store_chunk_ids.append(applied.chunk_id) - if _embedding_enabled(): - _embed_store_chunks(conn, store_chunk_ids, embed_fn) - conn.execute("COMMIT") - conn.setbusytimeout(_checkpoint_busy_timeout_ms()) - try: - conn.execute("PRAGMA wal_checkpoint(TRUNCATE)") - result.checkpoints += 1 - except apsw.Error as exc: - _log(log_path, f"burn drain checkpoint skipped: {exc}") - finally: - conn.setbusytimeout(_drain_busy_timeout_ms()) - except Exception as exc: - try: - conn.execute("ROLLBACK") - except Exception: - pass + batch_includes_store = _events_include_store(all_events) + if batch_includes_store and not _ensure_drain_db_schema_preserving_queue( + db_path, + log_path, + context="burn drain failed before transaction", + ): result.failed_files = len(batch) - _log(log_path, f"burn drain failed; batch preserved: {exc}") return result - finally: - conn.close() + for schema_attempt in range(2): + conn = _open_connection(db_path) + try: + _ensure_enrichment_update_schema(conn) + conn.execute("BEGIN IMMEDIATE") + prefetched_state = _prefetch_enrichment_state(conn, all_events) + store_chunk_ids: list[str] = [] + for _, events in batch: + for event in events: + if _is_verified_redundant_enrichment(event, prefetched_state): + payload = _event_payload(event) + enqueue_provenance_resolution_for_entities( + conn, + payload.get("entities"), + chunk_id=payload.get("chunk_id"), + commit=False, + ) + result.skipped_verified_stale += 1 + continue + applied = _apply_event(conn, event) + result.applied_events += 1 + if applied.chunk_id: + store_chunk_ids.append(applied.chunk_id) + if _embedding_enabled(): + _embed_store_chunks(conn, store_chunk_ids, embed_fn) + conn.execute("COMMIT") + conn.setbusytimeout(_checkpoint_busy_timeout_ms()) + try: + conn.execute("PRAGMA wal_checkpoint(TRUNCATE)") + result.checkpoints += 1 + except apsw.Error as exc: + _log(log_path, f"burn drain checkpoint skipped: {exc}") + finally: + conn.setbusytimeout(_drain_busy_timeout_ms()) + break + except Exception as exc: + try: + conn.execute("ROLLBACK") + except Exception: + pass + if batch_includes_store and _is_missing_chunks_error(exc) and schema_attempt == 0: + conn.close() + conn = None + if not _ensure_drain_db_schema_preserving_queue( + db_path, + log_path, + context="burn drain failed before transaction", + force=True, + ): + result.failed_files = len(batch) + return result + continue + result.failed_files = len(batch) + _log(log_path, f"burn drain failed; batch preserved: {exc}") + return result + finally: + if conn is not None: + conn.close() for path, _events in batch: try: @@ -1080,6 +1152,13 @@ def drain_once( continue events_to_apply = events[: _max_events_per_transaction()] remaining_events = events[len(events_to_apply) :] + events_include_store = _events_include_store(events_to_apply) + if events_include_store and not _ensure_drain_db_schema_preserving_queue( + db_path, + log_path, + context=f"drain failed for {path.name}", + ): + break for attempt in range(5): conn: apsw.Connection | None = None @@ -1140,6 +1219,18 @@ def drain_once( _log(log_path, f"drain busy; retrying in {delay:.2f}s") time.sleep(delay) continue + if events_include_store and _is_missing_chunks_error(exc) and attempt < 4: + if conn is not None: + conn.close() + conn = None + if not _ensure_drain_db_schema_preserving_queue( + db_path, + log_path, + context=f"drain failed for {path.name}", + force=True, + ): + break + continue _log(log_path, f"drain failed for {path.name}: {exc}") break finally: diff --git a/src/brainlayer/mcp/_shared.py b/src/brainlayer/mcp/_shared.py index 0d98b588..ad10ad28 100644 --- a/src/brainlayer/mcp/_shared.py +++ b/src/brainlayer/mcp/_shared.py @@ -74,16 +74,21 @@ def _bootstrap_search_store(db_path) -> None: bootstrap_store.close() -def _get_vector_store(): +def _get_vector_store(timeout: float | None = None): """Get or initialize the global VectorStore (thread-safe).""" global _vector_store if _vector_store is None: - with _store_lock: + acquired = _store_lock.acquire() if timeout is None else _store_lock.acquire(timeout=max(0.0, timeout)) + if not acquired: + raise apsw.BusyError("timed out waiting for vector store initialization") + try: if _vector_store is None: from ..paths import get_db_path from ..vector_store import VectorStore _vector_store = VectorStore(get_db_path()) + finally: + _store_lock.release() return _vector_store diff --git a/src/brainlayer/mcp/store_handler.py b/src/brainlayer/mcp/store_handler.py index bd2bd89e..e85dda76 100644 --- a/src/brainlayer/mcp/store_handler.py +++ b/src/brainlayer/mcp/store_handler.py @@ -5,6 +5,7 @@ import json import os import threading +import time import uuid from contextlib import contextmanager from datetime import datetime, timezone @@ -27,6 +28,28 @@ _RETRY_MAX_ATTEMPTS = 4 _retry_delay = 0.15 # base delay in seconds (exposed for test patching) _QUEUE_MAX_SIZE = 100 +_DEFAULT_STORE_BUSY_BUDGET_MS = 3_000 +_MAX_APSW_BUSY_TIMEOUT_MS = 2_147_483_647 +_STORE_BUSY_TIMEOUT_LOCK = threading.Lock() +_NO_EXEC_TRACE = object() + + +def _positive_int_env(name: str, default: int) -> int: + try: + value = int(os.environ.get(name, str(default))) + except (TypeError, ValueError): + return default + if value <= 0 or value > _MAX_APSW_BUSY_TIMEOUT_MS: + return default + return value + + +def _store_busy_budget_ms() -> int: + return _positive_int_env("BRAINLAYER_STORE_BUSY_BUDGET_MS", _DEFAULT_STORE_BUSY_BUDGET_MS) + + +def _store_busy_deadline() -> float: + return time.monotonic() + (_store_busy_budget_ms() / 1000) def _is_lock_error(exc: BaseException) -> bool: @@ -510,6 +533,96 @@ def _deferred_store_receipt(chunk_id: str, queue_path, *, reason: str = "DB_BUSY } +def _connection_busy_timeout_ms(conn) -> int | None: + if conn is None: + return None + try: + row = conn.cursor().execute("PRAGMA busy_timeout").fetchone() + return int(row[0]) + except Exception: + return None + + +def _set_connection_busy_timeout(conn, timeout_ms: int | None) -> None: + if conn is None or timeout_ms is None: + return + try: + conn.setbusytimeout(max(1, min(timeout_ms, _MAX_APSW_BUSY_TIMEOUT_MS))) + except Exception: + logger.debug("Failed to set store busy timeout", exc_info=True) + + +def _connection_exec_trace(conn): + if conn is None or not hasattr(conn, "getexectrace"): + return _NO_EXEC_TRACE + try: + return conn.getexectrace() + except Exception: + logger.debug("Failed to read store exec trace", exc_info=True) + return _NO_EXEC_TRACE + + +def _set_connection_exec_trace(conn, trace) -> None: + if conn is None or not hasattr(conn, "setexectrace") or trace is _NO_EXEC_TRACE: + return + try: + conn.setexectrace(trace) + except Exception: + logger.debug("Failed to set store exec trace", exc_info=True) + + +def _is_store_busy_cleanup_statement(statement) -> bool: + if not isinstance(statement, str): + return False + sql = statement.lstrip().upper() + return sql.startswith("ROLLBACK") or sql.startswith("RELEASE") + + +def _run_connection_exec_trace(trace, cursor, statement, bindings): + if trace is _NO_EXEC_TRACE or trace is None: + return True + result = trace(cursor, statement, bindings) + return True if result is None else result + + +def _remaining_store_busy_budget_ms(deadline: float) -> int: + remaining_ms = int((deadline - time.monotonic()) * 1000) + if remaining_ms <= 0: + raise apsw.BusyError("brain_store busy budget exceeded") + return max(1, min(remaining_ms, _MAX_APSW_BUSY_TIMEOUT_MS)) + + +@contextmanager +def _temporary_store_busy_timeout(conn, deadline: float): + if conn is None: + yield + return + + timeout_ms = _remaining_store_busy_budget_ms(deadline) + acquired = _STORE_BUSY_TIMEOUT_LOCK.acquire(timeout=max(timeout_ms, 1) / 1000) + if not acquired: + raise apsw.BusyError("brain_store busy_timeout lock wait exceeded") + + original_busy_timeout_ms = _connection_busy_timeout_ms(conn) + original_exec_trace = _connection_exec_trace(conn) + + def refresh_timeout(cursor, statement, bindings): + if _is_store_busy_cleanup_statement(statement): + return _run_connection_exec_trace(original_exec_trace, cursor, statement, bindings) + _set_connection_busy_timeout(conn, _remaining_store_busy_budget_ms(deadline)) + return _run_connection_exec_trace(original_exec_trace, cursor, statement, bindings) + + try: + timeout_ms = _remaining_store_busy_budget_ms(deadline) + _set_connection_busy_timeout(conn, timeout_ms) + _set_connection_exec_trace(conn, refresh_timeout) + yield + finally: + _set_connection_exec_trace(conn, original_exec_trace) + _set_connection_busy_timeout(conn, original_busy_timeout_ms) + _STORE_BUSY_TIMEOUT_LOCK.release() + + def _flush_pending_stores(store, embed_fn) -> int: """Flush pending-stores.jsonl (FIFO). Returns count flushed.""" from ..store import store_memory @@ -577,16 +690,39 @@ def _flush_pending_stores(store, embed_fn) -> int: return flushed -async def _store_memory_with_retries(store_memory, **kwargs): +async def _store_memory_with_retries(store_memory, *, deadline: float | None = None, **kwargs): last_err = None + budget_ms = _store_busy_budget_ms() + if deadline is None: + deadline = _store_busy_deadline() + conn = getattr(kwargs.get("store"), "conn", None) for attempt in range(_RETRY_MAX_ATTEMPTS): + remaining_ms = int((deadline - time.monotonic()) * 1000) + if remaining_ms <= 0 and last_err is not None: + raise last_err try: - return store_memory(**kwargs) + with _temporary_store_busy_timeout(conn, deadline): + return store_memory(**kwargs, busy_deadline=deadline, retry_on_busy=False) except Exception as exc: if not _is_lock_error(exc) or attempt >= _RETRY_MAX_ATTEMPTS - 1: raise last_err = exc + remaining_ms = int((deadline - time.monotonic()) * 1000) + if remaining_ms <= 0: + logger.warning( + "brain_store BusyError exceeded %dms busy budget after attempt %d/%d; deferring", + budget_ms, + attempt + 1, + _RETRY_MAX_ATTEMPTS, + ) + raise delay = _retry_delay * (2**attempt) + if int(delay * 1000) >= remaining_ms: + logger.warning( + "brain_store BusyError has %dms budget left before retry delay; deferring", + remaining_ms, + ) + raise logger.warning( "brain_store BusyError (attempt %d/%d), retrying in %.2fs", attempt + 1, @@ -597,6 +733,32 @@ async def _store_memory_with_retries(store_memory, **kwargs): raise last_err # type: ignore[misc] +def _get_store_vector_store(deadline: float): + timeout = max(0.0, deadline - time.monotonic()) + try: + return _get_vector_store(timeout=timeout) + except TypeError as exc: + if "timeout" not in str(exc): + raise + return _get_vector_store() + + +def _validate_store_request(content: str, memory_type: str) -> str: + from ..ingest_guard import reject_recursive_mcp_output + from ..pipeline.classify import looks_like_system_prompt + from ..store import VALID_MEMORY_TYPES + + if not content or not content.strip(): + raise ValueError("content must be non-empty") + content = content.strip() + if memory_type not in VALID_MEMORY_TYPES: + raise ValueError(f"type must be one of: {', '.join(VALID_MEMORY_TYPES)}") + reject_recursive_mcp_output(content) + if looks_like_system_prompt(content): + raise ValueError("system prompt content is not stored in BrainLayer") + return content + + async def _store( content: str, memory_type: str, @@ -623,20 +785,11 @@ async def _store( promised_chunk_id = _new_manual_chunk_id() reservation_created_at = datetime.now(timezone.utc).isoformat() try: + content = _validate_store_request(content, memory_type) + if os.environ.get("BRAINLAYER_ARBITRATED") == "1": - from ..ingest_guard import reject_recursive_mcp_output - from ..pipeline.classify import looks_like_system_prompt from ..search_repo import clear_hybrid_search_cache - from ..store import VALID_MEMORY_TYPES - - if not content or not content.strip(): - raise ValueError("content must be non-empty") - content = content.strip() - if memory_type not in VALID_MEMORY_TYPES: - raise ValueError(f"type must be one of: {', '.join(VALID_MEMORY_TYPES)}") - reject_recursive_mcp_output(content) - if looks_like_system_prompt(content): - raise ValueError("system prompt content is not stored in BrainLayer") + queue_path = _queue_store( { "chunk_id": promised_chunk_id, @@ -664,13 +817,17 @@ async def _store( return ([TextContent(type="text", text=format_store_result(promised_chunk_id, queued=True))], structured) from ..store import embed_hot_chunk, embed_pending_chunks, store_memory + from ..vector_store import temporary_write_busy_timeout_ms - store = _get_vector_store() + deadline = _store_busy_deadline() + with temporary_write_busy_timeout_ms(_remaining_store_busy_budget_ms(deadline), deadline=deadline): + store = _get_store_vector_store(deadline) normalized_project = _normalize_project_name(project) # Store WITHOUT embedding — returns immediately (no executor needed) result = await _store_memory_with_retries( store_memory, + deadline=deadline, store=store, embed_fn=None, content=content, @@ -697,7 +854,8 @@ async def _store( # If supersedes is set, mark the old chunk as superseded by the new one superseded_ok = None if supersedes: - superseded_ok = store.supersede_chunk(supersedes, chunk_id) + with _temporary_store_busy_timeout(getattr(store, "conn", None), deadline): + superseded_ok = store.supersede_chunk(supersedes, chunk_id) # Schedule background embedding + flush in a single daemon thread. # CRITICAL: must use a separate VectorStore connection — APSW enforces diff --git a/src/brainlayer/store.py b/src/brainlayer/store.py index 77f14e91..281f71f1 100644 --- a/src/brainlayer/store.py +++ b/src/brainlayer/store.py @@ -50,6 +50,32 @@ logger = logging.getLogger(__name__) VALID_MEMORY_TYPES = ["idea", "mistake", "decision", "learning", "todo", "bookmark", "note", "journal", "issue"] +_MAX_APSW_BUSY_TIMEOUT_MS = 2_147_483_647 + + +def _busy_deadline_timeout_ms(busy_deadline: Optional[float]) -> int | None: + if busy_deadline is None: + return None + remaining_ms = int((busy_deadline - time.monotonic()) * 1000) + if remaining_ms <= 0: + raise apsw.BusyError("store busy budget exceeded") + return max(1, min(remaining_ms, _MAX_APSW_BUSY_TIMEOUT_MS)) + + +def _set_busy_timeout_for_deadline(conn, busy_deadline: Optional[float]) -> None: + timeout_ms = _busy_deadline_timeout_ms(busy_deadline) + if timeout_ms is None: + return + conn.setbusytimeout(timeout_ms) + + +def _sleep_before_busy_retry(delay: float, busy_deadline: Optional[float]) -> None: + if busy_deadline is not None: + remaining = busy_deadline - time.monotonic() + if remaining <= 0 or delay >= remaining: + raise apsw.BusyError("store busy budget exceeded") + delay = min(delay, remaining) + time.sleep(delay) def store_memory( @@ -76,6 +102,8 @@ def store_memory( origin_repo_path: Optional[str] = None, replayed_by: Optional[str] = None, chunk_origin: Optional[str] = None, + busy_deadline: Optional[float] = None, + retry_on_busy: bool = True, ) -> Dict[str, Any]: """Persistently store a memory into BrainLayer. @@ -107,6 +135,8 @@ def store_memory( origin_repo_path: Optional git root for the fallback's originating repo. replayed_by: Optional replay worker identifier. chunk_origin: Optional explicit origin classification preserved from queued fallback metadata. + busy_deadline: Optional monotonic deadline for internal BusyError retries. + retry_on_busy: Whether store_memory should run its own synchronous BusyError retry loop. Returns: Dict with 'id' (chunk ID) and 'related' (list of similar existing memories). @@ -190,6 +220,7 @@ def store_memory( cursor = store.conn.cursor() transaction_started = False try: + _set_busy_timeout_for_deadline(store.conn, busy_deadline) cursor.execute("BEGIN IMMEDIATE") transaction_started = True pending_reembed = None @@ -337,9 +368,9 @@ def store_memory( except apsw.BusyError: if transaction_started: cursor.execute("ROLLBACK") - if attempt == 4: + if not retry_on_busy or attempt == 4: raise - time.sleep(0.1 * (2**attempt)) + _sleep_before_busy_retry(0.1 * (2**attempt), busy_deadline) except Exception: if transaction_started: cursor.execute("ROLLBACK") @@ -352,6 +383,7 @@ def store_memory( cursor = store.conn.cursor() transaction_started = False try: + _set_busy_timeout_for_deadline(store.conn, busy_deadline) cursor.execute("BEGIN IMMEDIATE") transaction_started = True store._upsert_chunk_vector(cursor, reembed_chunk_id, merged_embedding) @@ -361,9 +393,9 @@ def store_memory( except apsw.BusyError: if transaction_started: cursor.execute("ROLLBACK") - if attempt == 4: + if not retry_on_busy or attempt == 4: raise - time.sleep(0.1 * (2**attempt)) + _sleep_before_busy_retry(0.1 * (2**attempt), busy_deadline) except Exception: if transaction_started: cursor.execute("ROLLBACK") diff --git a/src/brainlayer/vector_store.py b/src/brainlayer/vector_store.py index 30eb3504..bde69713 100644 --- a/src/brainlayer/vector_store.py +++ b/src/brainlayer/vector_store.py @@ -15,6 +15,7 @@ import subprocess import threading import time +from contextlib import contextmanager from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, List, Optional @@ -63,6 +64,8 @@ _DEFAULT_BUSY_TIMEOUT_MS = 30_000 _DEFAULT_READ_BUSY_TIMEOUT_MS = 5_000 _MAX_APSW_BUSY_TIMEOUT_MS = 2_147_483_647 +_WRITE_BUSY_TIMEOUT_STATE = threading.local() +_NO_EXEC_TRACE = object() def _positive_int_env(name: str, default: int) -> int: @@ -79,6 +82,98 @@ def _read_busy_timeout_ms() -> int: return _positive_int_env("BRAINLAYER_READ_BUSY_TIMEOUT_MS", _DEFAULT_READ_BUSY_TIMEOUT_MS) +def _write_busy_deadline() -> float | None: + deadline = getattr(_WRITE_BUSY_TIMEOUT_STATE, "deadline", None) + if isinstance(deadline, int | float): + return float(deadline) + return None + + +def _remaining_busy_deadline_ms(deadline: float) -> int: + remaining_ms = int((deadline - time.monotonic()) * 1000) + if remaining_ms <= 0: + raise apsw.BusyError("write busy deadline exceeded") + return max(1, min(remaining_ms, _MAX_APSW_BUSY_TIMEOUT_MS)) + + +def _write_busy_timeout_ms() -> int: + timeout_ms = getattr(_WRITE_BUSY_TIMEOUT_STATE, "timeout_ms", _DEFAULT_BUSY_TIMEOUT_MS) + deadline = _write_busy_deadline() + if deadline is not None: + timeout_ms = min(int(timeout_ms), _remaining_busy_deadline_ms(deadline)) + return max(1, min(int(timeout_ms), _MAX_APSW_BUSY_TIMEOUT_MS)) + + +def _default_write_busy_timeout_ms() -> int: + return max(1, min(_DEFAULT_BUSY_TIMEOUT_MS, _MAX_APSW_BUSY_TIMEOUT_MS)) + + +def _refresh_write_busy_timeout_for_deadline(conn: apsw.Connection) -> None: + if _write_busy_deadline() is None: + return + conn.setbusytimeout(_write_busy_timeout_ms()) + + +def _is_write_busy_cleanup_statement(statement) -> bool: + if not isinstance(statement, str): + return False + sql = statement.lstrip().upper() + return sql.startswith("ROLLBACK") or sql.startswith("RELEASE") + + +def _run_write_busy_exec_trace(trace, cursor, statement, bindings): + if trace is _NO_EXEC_TRACE or trace is None: + return True + result = trace(cursor, statement, bindings) + return True if result is None else result + + +def _install_write_busy_deadline_trace(conn: apsw.Connection): + if _write_busy_deadline() is None: + return _NO_EXEC_TRACE + old_trace = conn.getexectrace() + + def refresh_timeout(cursor, statement, bindings): + if _is_write_busy_cleanup_statement(statement): + return _run_write_busy_exec_trace(old_trace, cursor, statement, bindings) + _refresh_write_busy_timeout_for_deadline(conn) + return _run_write_busy_exec_trace(old_trace, cursor, statement, bindings) + + conn.setexectrace(refresh_timeout) + return old_trace + + +def _restore_write_busy_deadline_trace(conn: apsw.Connection, old_trace) -> None: + if old_trace is _NO_EXEC_TRACE: + return + conn.setexectrace(old_trace) + + +@contextmanager +def temporary_write_busy_timeout_ms(timeout_ms: int, *, deadline: float | None = None): + old_timeout = getattr(_WRITE_BUSY_TIMEOUT_STATE, "timeout_ms", None) + old_deadline = getattr(_WRITE_BUSY_TIMEOUT_STATE, "deadline", None) + _WRITE_BUSY_TIMEOUT_STATE.timeout_ms = max(1, min(int(timeout_ms), _MAX_APSW_BUSY_TIMEOUT_MS)) + if deadline is None: + if hasattr(_WRITE_BUSY_TIMEOUT_STATE, "deadline"): + delattr(_WRITE_BUSY_TIMEOUT_STATE, "deadline") + else: + _WRITE_BUSY_TIMEOUT_STATE.deadline = float(deadline) + try: + yield + finally: + if old_timeout is None: + if hasattr(_WRITE_BUSY_TIMEOUT_STATE, "timeout_ms"): + delattr(_WRITE_BUSY_TIMEOUT_STATE, "timeout_ms") + else: + _WRITE_BUSY_TIMEOUT_STATE.timeout_ms = old_timeout + if old_deadline is None: + if hasattr(_WRITE_BUSY_TIMEOUT_STATE, "deadline"): + delattr(_WRITE_BUSY_TIMEOUT_STATE, "deadline") + else: + _WRITE_BUSY_TIMEOUT_STATE.deadline = old_deadline + + def _set_busy_timeout_hook(conn: apsw.Connection) -> None: """Set busy_timeout on every new connection before any other hooks. @@ -86,7 +181,7 @@ def _set_busy_timeout_hook(conn: apsw.Connection) -> None: the Connection() constructor. Without busy_timeout set first, this PRAGMA fails with BusyError when other processes hold the DB lock. """ - timeout_ms = _read_busy_timeout_ms() if conn.readonly("main") else _DEFAULT_BUSY_TIMEOUT_MS + timeout_ms = _read_busy_timeout_ms() if conn.readonly("main") else _write_busy_timeout_ms() conn.setbusytimeout(timeout_ms) @@ -530,9 +625,13 @@ def _init_db_with_retry(self) -> None: last_err = None start = time.monotonic() retry_budget = max(int(self._INIT_MAX_RETRIES), 1) + deadline = _write_busy_deadline() for attempt in range(retry_budget): + if deadline is not None and time.monotonic() >= deadline: + raise last_err or apsw.BusyError("write busy deadline exceeded") try: self._init_db() + self._restore_default_write_busy_timeout() return except apsw.Error as e: if not _is_retryable_init_error(e): @@ -547,15 +646,36 @@ def _init_db_with_retry(self) -> None: f"elapsed {elapsed:.1f}s, retrying in {delay:.1f}s...", file=sys.stderr, ) + if deadline is not None: + remaining = deadline - time.monotonic() + if remaining <= 0 or delay >= remaining: + raise last_err time.sleep(delay) raise last_err # type: ignore[misc] + def _restore_default_write_busy_timeout(self) -> None: + conn = getattr(self, "conn", None) + if conn is None: + return + _restore_write_busy_deadline_trace( + conn, + getattr(self, "_init_write_busy_deadline_trace", _NO_EXEC_TRACE), + ) + self._init_write_busy_deadline_trace = _NO_EXEC_TRACE + try: + if conn.readonly("main"): + return + except apsw.Error: + return + conn.setbusytimeout(_default_write_busy_timeout_ms()) + def _init_db(self) -> None: """Initialize database with vector extension.""" self.conn = apsw.Connection(str(self.db_path)) # Set busy timeout IMMEDIATELY via APSW native method — before any DDL. - self.conn.setbusytimeout(10_000) # 10 seconds + self.conn.setbusytimeout(_write_busy_timeout_ms()) + self._init_write_busy_deadline_trace = _install_write_busy_deadline_trace(self.conn) self.conn.enableloadextension(True) self.conn.loadextension(sqlite_vec.loadable_path()) diff --git a/tests/test_arbitration.py b/tests/test_arbitration.py index 3bea2f75..8eb5391c 100644 --- a/tests/test_arbitration.py +++ b/tests/test_arbitration.py @@ -515,6 +515,53 @@ def test_queue_sanitizes_source_and_drain_preserves_supersedes(tmp_path): assert entity_link == replacement_id +def test_drain_existing_promised_chunk_still_applies_supersedes(tmp_path, monkeypatch): + """A queued replay after post-store supersede contention must finish lifecycle work.""" + from brainlayer.drain import drain_once + + db_path = tmp_path / "brainlayer.db" + queue_dir = tmp_path / "queue" + queue_dir.mkdir() + _create_minimal_db(db_path) + monkeypatch.setenv("BRAINLAYER_DRAIN_EMBED", "0") + with sqlite3.connect(db_path) as conn: + conn.execute( + """ + INSERT INTO chunks (id, content, metadata, source_file) + VALUES ('old-id', 'old content', '{}', 'seed') + """ + ) + conn.execute( + """ + INSERT INTO chunks (id, content, metadata, source_file) + VALUES ('manual-new1234', 'replacement content', '{}', 'mcp') + """ + ) + conn.commit() + + (queue_dir / "store-replay.jsonl").write_text( + json.dumps( + { + "kind": "store_memory", + "chunk_id": "manual-new1234", + "content": "replacement content", + "memory_type": "note", + "project": "arbitration-test", + "supersedes": "old-id", + } + ) + + "\n", + encoding="utf-8", + ) + + assert drain_once(db_path=db_path, queue_dir=queue_dir, batch_size=1) == 1 + + with sqlite3.connect(db_path) as conn: + superseded_by = conn.execute("SELECT superseded_by FROM chunks WHERE id = 'old-id'").fetchone()[0] + + assert superseded_by == "manual-new1234" + + def test_drain_store_events_merge_duplicates_and_write_alias(tmp_path): from brainlayer.drain import drain_once from brainlayer.queue_io import enqueue_store diff --git a/tests/test_store_handler.py b/tests/test_store_handler.py index 160e2667..9ff2ffe7 100644 --- a/tests/test_store_handler.py +++ b/tests/test_store_handler.py @@ -1,7 +1,10 @@ """Tests for MCP store handler responses.""" +import asyncio import json -from unittest.mock import patch +import threading +import time +from unittest.mock import MagicMock, patch import apsw import pytest @@ -68,6 +71,783 @@ async def test_busy_queue_fallback_returns_loud_deferred_receipt(tmp_path): assert any("DEFERRED" in item.text for item in texts) +@pytest.mark.asyncio +async def test_store_validates_before_busy_deferral(tmp_path): + """Busy before store_memory must not queue requests that validation would reject.""" + from brainlayer.mcp.store_handler import _store + + queue_dir = tmp_path / "queue" + + with ( + patch("brainlayer.mcp.store_handler._get_vector_store", side_effect=apsw.BusyError("database is locked")), + patch("brainlayer.queue_io.get_queue_dir", return_value=queue_dir), + ): + result = await _store( + content="invalid memory type should never be queued", + memory_type="invalid_type", + project="test", + ) + + assert result.isError is True + assert "Validation error" in result.content[0].text + assert not list(queue_dir.glob("mcp-*.jsonl")) + + +@pytest.mark.asyncio +async def test_store_busy_budget_defers_promptly_and_pending_flush_replays(tmp_path, monkeypatch): + """A held write lock must hit the bounded store budget, defer, and replay later.""" + from brainlayer.mcp.store_handler import _flush_pending_stores, _store + + pending_path = tmp_path / "pending-stores.jsonl" + attempts = 0 + + def held_write_lock_store_memory(**kwargs): + nonlocal attempts + attempts += 1 + time.sleep(0.075) + raise apsw.BusyError("database is locked") + + monkeypatch.setenv("BRAINLAYER_STORE_BUSY_BUDGET_MS", "100") + monkeypatch.setattr("brainlayer.mcp.store_handler._retry_delay", 0.001) + + with ( + patch("brainlayer.mcp.store_handler._get_vector_store", return_value=MagicMock()), + patch("brainlayer.mcp.store_handler._normalize_project_name", return_value="test"), + patch("brainlayer.queue_io.enqueue_store", side_effect=RuntimeError("force legacy pending queue")), + patch("brainlayer.mcp.store_handler._get_pending_store_path", return_value=pending_path), + patch("brainlayer.store.store_memory", side_effect=held_write_lock_store_memory), + ): + started = time.perf_counter() + texts, structured = await _store( + content="queued within busy budget", + memory_type="note", + project="test", + ) + elapsed = time.perf_counter() - started + + assert elapsed < 0.22 + assert attempts <= 2 + assert structured["status"] == "DEFERRED" + assert structured["deferred"]["action"] == "queued_for_replay" + assert structured["deferred"]["queue_path"] == str(pending_path) + assert any("queued" in item.text.lower() for item in texts) + + queued_event = json.loads(pending_path.read_text()) + assert queued_event["chunk_id"] == structured["chunk_id"] + assert queued_event["content"] == "queued within busy budget" + + with ( + patch("brainlayer.mcp.store_handler._get_pending_store_path", return_value=pending_path), + patch("brainlayer.store.store_memory", return_value={"id": structured["chunk_id"], "related": []}) as replay, + ): + flushed = _flush_pending_stores(MagicMock(), MagicMock()) + + assert flushed == 1 + assert not pending_path.exists() + assert replay.call_args.kwargs["chunk_id"] == structured["chunk_id"] + assert replay.call_args.kwargs["content"] == "queued within busy budget" + + +@pytest.mark.asyncio +async def test_store_busy_budget_restores_timeout_between_concurrent_retries(monkeypatch): + """Overlapping retry sleeps must not snapshot another store call's temporary timeout.""" + from brainlayer.mcp import store_handler + + class FakeCursor: + def __init__(self, conn): + self.conn = conn + + def execute(self, sql): + assert sql == "PRAGMA busy_timeout" + return self + + def fetchone(self): + return (self.conn.timeout_ms,) + + class FakeConn: + def __init__(self): + self.timeout_ms = 5000 + + def cursor(self): + return FakeCursor(self) + + def setbusytimeout(self, timeout_ms): + self.timeout_ms = timeout_ms + + store = MagicMock() + store.conn = FakeConn() + + def locked_store_memory(**kwargs): + raise apsw.BusyError("database is locked") + + monkeypatch.setenv("BRAINLAYER_STORE_BUSY_BUDGET_MS", "1000") + monkeypatch.setattr(store_handler, "_retry_delay", 0.02) + monkeypatch.setattr(store_handler, "_RETRY_MAX_ATTEMPTS", 2) + + results = await asyncio.gather( + store_handler._store_memory_with_retries(locked_store_memory, store=store), + store_handler._store_memory_with_retries(locked_store_memory, store=store), + return_exceptions=True, + ) + + assert all(isinstance(result, apsw.BusyError) for result in results) + assert store.conn.timeout_ms == 5000 + + +@pytest.mark.asyncio +async def test_store_busy_budget_bounds_store_memory_inner_retry_loop(tmp_path, monkeypatch): + """The MCP budget must bound store_memory's internal BEGIN IMMEDIATE retry loop.""" + import brainlayer.store as store_module + from brainlayer.mcp.store_handler import _store + + pending_path = tmp_path / "pending-stores.jsonl" + real_sleep = time.sleep + begin_attempts = 0 + + class FakeCursor: + def __init__(self, conn): + self.conn = conn + + def execute(self, sql, *args): + nonlocal begin_attempts + if sql == "PRAGMA busy_timeout": + return self + if sql == "BEGIN IMMEDIATE": + begin_attempts += 1 + real_sleep(self.conn.timeout_ms / 1000) + raise apsw.BusyError("database is locked") + raise AssertionError(f"unexpected SQL: {sql}") + + def fetchone(self): + return (self.conn.timeout_ms,) + + class FakeConn: + def __init__(self): + self.timeout_ms = 5000 + + def cursor(self): + return FakeCursor(self) + + def setbusytimeout(self, timeout_ms): + self.timeout_ms = timeout_ms + + store = MagicMock() + store.conn = FakeConn() + + monkeypatch.setenv("BRAINLAYER_STORE_BUSY_BUDGET_MS", "80") + monkeypatch.setattr("brainlayer.mcp.store_handler._retry_delay", 0.001) + monkeypatch.setattr(store_module.time, "sleep", lambda delay: real_sleep(min(delay, 0.001))) + + with ( + patch("brainlayer.mcp.store_handler._get_vector_store", return_value=store), + patch("brainlayer.mcp.store_handler._normalize_project_name", return_value="test"), + patch("brainlayer.queue_io.enqueue_store", side_effect=RuntimeError("force legacy pending queue")), + patch("brainlayer.mcp.store_handler._get_pending_store_path", return_value=pending_path), + ): + started = time.perf_counter() + _texts, structured = await _store( + content="inner retries must respect mcp busy budget", + memory_type="note", + project="test", + ) + elapsed = time.perf_counter() - started + + assert elapsed < 0.18 + assert begin_attempts <= 2 + assert structured["status"] == "DEFERRED" + assert structured["deferred"]["action"] == "queued_for_replay" + + +@pytest.mark.asyncio +async def test_store_busy_budget_refreshes_after_timeout_lock_wait(monkeypatch): + """Time spent waiting for the timeout lock must count against the store budget.""" + from brainlayer.mcp import store_handler + + real_sleep = time.sleep + + class FakeCursor: + def __init__(self, conn): + self.conn = conn + + def execute(self, sql): + assert sql == "PRAGMA busy_timeout" + return self + + def fetchone(self): + return (self.conn.timeout_ms,) + + class FakeConn: + def __init__(self): + self.timeout_ms = 5000 + + def cursor(self): + return FakeCursor(self) + + def setbusytimeout(self, timeout_ms): + self.timeout_ms = timeout_ms + + store = MagicMock() + store.conn = FakeConn() + + def locked_store_memory(**kwargs): + real_sleep(store.conn.timeout_ms / 1000) + raise apsw.BusyError("database is locked") + + monkeypatch.setenv("BRAINLAYER_STORE_BUSY_BUDGET_MS", "200") + monkeypatch.setattr(store_handler, "_RETRY_MAX_ATTEMPTS", 1) + + store_handler._STORE_BUSY_TIMEOUT_LOCK.acquire() + timer = threading.Timer(0.15, store_handler._STORE_BUSY_TIMEOUT_LOCK.release) + timer.start() + try: + started = time.perf_counter() + result = await store_handler._store_memory_with_retries(locked_store_memory, store=store) + except apsw.BusyError as exc: + result = exc + elapsed = time.perf_counter() - started + finally: + timer.cancel() + if store_handler._STORE_BUSY_TIMEOUT_LOCK.locked(): + store_handler._STORE_BUSY_TIMEOUT_LOCK.release() + + assert isinstance(result, apsw.BusyError) + assert elapsed < 0.28 + + +@pytest.mark.asyncio +async def test_store_busy_budget_covers_cold_vector_store_init(tmp_path, monkeypatch): + """The store budget must include cold VectorStore init before queuing.""" + import brainlayer.store # noqa: F401 + from brainlayer.mcp import _shared + from brainlayer.mcp.store_handler import _store + from brainlayer.vector_store import VectorStore + + pending_path = tmp_path / "pending-stores.jsonl" + db_path = tmp_path / "busy-init.db" + real_sleep = time.sleep + init_attempts = 0 + + def busy_init(self): + nonlocal init_attempts + init_attempts += 1 + real_sleep(0.075) + raise apsw.BusyError("database is locked") + + monkeypatch.setenv("BRAINLAYER_STORE_BUSY_BUDGET_MS", "100") + monkeypatch.setattr(_shared, "_vector_store", None) + monkeypatch.setattr("brainlayer.paths.get_db_path", lambda: db_path) + monkeypatch.setattr(VectorStore, "_INIT_MAX_RETRIES", 4) + monkeypatch.setattr(VectorStore, "_INIT_BASE_DELAY", 0.001) + monkeypatch.setattr(VectorStore, "_INIT_MAX_DELAY", 0.001) + monkeypatch.setattr(VectorStore, "_acquire_writer_pidfile", lambda self: None) + monkeypatch.setattr(VectorStore, "_release_writer_pidfile", lambda self: None) + monkeypatch.setattr(VectorStore, "_init_db", busy_init) + + with ( + patch("brainlayer.mcp.store_handler._normalize_project_name", return_value="test"), + patch("brainlayer.queue_io.enqueue_store", side_effect=RuntimeError("force legacy pending queue")), + patch("brainlayer.mcp.store_handler._get_pending_store_path", return_value=pending_path), + ): + started = time.perf_counter() + _texts, structured = await _store( + content="cold vector store init must respect busy budget", + memory_type="note", + project="test", + ) + elapsed = time.perf_counter() - started + + assert elapsed < 0.22 + assert init_attempts <= 2 + assert structured["status"] == "DEFERRED" + assert structured["deferred"]["action"] == "queued_for_replay" + + +@pytest.mark.asyncio +async def test_store_busy_budget_restores_cold_vector_store_write_timeout(tmp_path, monkeypatch): + """A cold store init may be capped, but the singleton must return to normal writes.""" + import brainlayer.vector_store as vector_store_module + from brainlayer.mcp import _shared + from brainlayer.mcp.store_handler import _store + from brainlayer.vector_store import VectorStore + + pending_path = tmp_path / "pending-stores.jsonl" + db_path = tmp_path / "cold-timeout.db" + + class FakeCursor: + def __init__(self, conn): + self.conn = conn + + def execute(self, sql): + assert sql == "PRAGMA busy_timeout" + return self + + def fetchone(self): + return (self.conn.timeout_ms,) + + class FakeConn: + def __init__(self): + self.timeout_ms = vector_store_module._DEFAULT_BUSY_TIMEOUT_MS + + def cursor(self): + return FakeCursor(self) + + def setbusytimeout(self, timeout_ms): + self.timeout_ms = timeout_ms + + def readonly(self, _database): + return False + + def close(self): + pass + + def fake_init_db(self): + self.conn = FakeConn() + self.conn.setbusytimeout(vector_store_module._write_busy_timeout_ms()) + + monkeypatch.setenv("BRAINLAYER_STORE_BUSY_BUDGET_MS", "100") + monkeypatch.setattr(_shared, "_vector_store", None) + monkeypatch.setattr("brainlayer.paths.get_db_path", lambda: db_path) + monkeypatch.setattr(VectorStore, "_INIT_MAX_RETRIES", 1) + monkeypatch.setattr(VectorStore, "_acquire_writer_pidfile", lambda self: None) + monkeypatch.setattr(VectorStore, "_release_writer_pidfile", lambda self: None) + monkeypatch.setattr(VectorStore, "_init_db", fake_init_db) + monkeypatch.setattr("brainlayer.mcp.store_handler._retry_delay", 0.001) + + with ( + patch("brainlayer.mcp.store_handler._normalize_project_name", return_value="test"), + patch("brainlayer.queue_io.enqueue_store", side_effect=RuntimeError("force legacy pending queue")), + patch("brainlayer.mcp.store_handler._get_pending_store_path", return_value=pending_path), + patch("brainlayer.store.store_memory", side_effect=apsw.BusyError("database is locked")), + ): + _texts, structured = await _store( + content="cold vector store timeout must restore", + memory_type="note", + project="test", + ) + + assert structured["status"] == "DEFERRED" + assert _shared._vector_store.conn.timeout_ms == vector_store_module._DEFAULT_BUSY_TIMEOUT_MS + + +def test_store_busy_budget_refreshes_cold_init_timeout_between_statements(tmp_path, monkeypatch): + """Cold init must spend one absolute store budget across multiple DDL statements.""" + import brainlayer.vector_store as vector_store_module + from brainlayer.vector_store import VectorStore, temporary_write_busy_timeout_ms + + class StopInit(Exception): + pass + + fake_clock = {"now": 0.0} + seen_timeouts = [] + + class FakeCursor: + def __init__(self, conn): + self.conn = conn + + def execute(self, sql, *args): + if self.conn.exec_trace is not None: + bindings = args[0] if args else None + result = self.conn.exec_trace(self, sql, bindings) + if result is False: + raise apsw.ExecTraceAbort("exec trace aborted") + seen_timeouts.append(self.conn.timeout_ms) + fake_clock["now"] += 0.04 + if len(seen_timeouts) == 3: + raise StopInit + return self + + def fetchone(self): + return None + + def __iter__(self): + return iter(()) + + class FakeConnection: + def __init__(self, path): + self.path = path + self.timeout_ms = None + self.exec_trace = None + + def setbusytimeout(self, timeout_ms): + self.timeout_ms = timeout_ms + + def getexectrace(self): + return self.exec_trace + + def setexectrace(self, trace): + self.exec_trace = trace + + def enableloadextension(self, enabled): + self.load_extension_enabled = enabled + + def loadextension(self, path): + self.loaded_extension = path + + def cursor(self): + return FakeCursor(self) + + def readonly(self, name): + return False + + monkeypatch.setattr(vector_store_module.apsw, "Connection", FakeConnection) + monkeypatch.setattr(vector_store_module.sqlite_vec, "loadable_path", lambda: "sqlite-vec") + monkeypatch.setattr(vector_store_module.time, "monotonic", lambda: fake_clock["now"]) + + store = object.__new__(VectorStore) + store.db_path = tmp_path / "cold-init-deadline.db" + + with temporary_write_busy_timeout_ms(200, deadline=0.2), pytest.raises(StopInit): + store._init_db() + + assert len(seen_timeouts) == 3 + assert seen_timeouts[1] < seen_timeouts[0] + assert seen_timeouts[2] < seen_timeouts[1] + + +def test_vector_store_init_trace_allows_expired_rollback_cleanup(monkeypatch): + """Init deadline trace must not block transaction cleanup after expiry.""" + import brainlayer.vector_store as vector_store_module + from brainlayer.vector_store import temporary_write_busy_timeout_ms + + fake_clock = {"now": 0.0} + executed = [] + + class FakeCursor: + def __init__(self, conn): + self.conn = conn + + def execute(self, sql): + if self.conn.exec_trace is not None: + result = self.conn.exec_trace(self, sql, None) + if result is False: + raise apsw.ExecTraceAbort("exec trace aborted") + executed.append(sql) + return self + + class FakeConn: + def __init__(self): + self.timeout_ms = 100 + self.exec_trace = None + + def setbusytimeout(self, timeout_ms): + self.timeout_ms = timeout_ms + + def getexectrace(self): + return self.exec_trace + + def setexectrace(self, trace): + self.exec_trace = trace + + def cursor(self): + return FakeCursor(self) + + conn = FakeConn() + monkeypatch.setattr(vector_store_module.time, "monotonic", lambda: fake_clock["now"]) + + with temporary_write_busy_timeout_ms(100, deadline=0.1): + vector_store_module._install_write_busy_deadline_trace(conn) + cursor = conn.cursor() + fake_clock["now"] = 0.11 + cursor.execute("ROLLBACK") + cursor.execute("ROLLBACK TO vector_init") + cursor.execute("RELEASE vector_init") + with pytest.raises(apsw.BusyError): + cursor.execute("CREATE TABLE after_deadline(id TEXT)") + + assert executed == ["ROLLBACK", "ROLLBACK TO vector_init", "RELEASE vector_init"] + + +@pytest.mark.asyncio +async def test_store_busy_budget_bounds_singleton_init_lock_wait(tmp_path, monkeypatch): + """Waiting for another cold VectorStore init must not exceed the store budget.""" + from brainlayer.mcp import _shared + from brainlayer.mcp.store_handler import _store + + pending_path = tmp_path / "pending-stores.jsonl" + db_path = tmp_path / "singleton-lock.db" + + class FakeVectorStore: + def __init__(self, path): + self.db_path = path + self.conn = MagicMock() + + monkeypatch.setenv("BRAINLAYER_STORE_BUSY_BUDGET_MS", "80") + monkeypatch.setattr(_shared, "_vector_store", None) + monkeypatch.setattr("brainlayer.paths.get_db_path", lambda: db_path) + monkeypatch.setattr("brainlayer.vector_store.VectorStore", FakeVectorStore) + + with ( + patch("brainlayer.mcp.store_handler._normalize_project_name", return_value="test"), + patch("brainlayer.queue_io.enqueue_store", side_effect=RuntimeError("force legacy pending queue")), + patch("brainlayer.mcp.store_handler._get_pending_store_path", return_value=pending_path), + patch("brainlayer.store.store_memory", side_effect=apsw.BusyError("database is locked")), + ): + _shared._store_lock.acquire() + timer = threading.Timer(0.24, _shared._store_lock.release) + timer.start() + try: + started = time.perf_counter() + _texts, structured = await _store( + content="store init lock wait must respect busy budget", + memory_type="note", + project="test", + ) + elapsed = time.perf_counter() - started + finally: + timer.cancel() + if _shared._store_lock.locked(): + _shared._store_lock.release() + + assert elapsed < 0.18 + assert structured["status"] == "DEFERRED" + assert structured["deferred"]["action"] == "queued_for_replay" + assert _shared._vector_store is None + + +@pytest.mark.asyncio +async def test_store_busy_budget_disables_store_memory_internal_busy_sleep(monkeypatch): + """Outer MCP retries should own busy backoff instead of holding the timeout lock.""" + from brainlayer.mcp import store_handler + + real_sleep = time.sleep + + class FakeCursor: + def __init__(self, conn): + self.conn = conn + + def execute(self, sql): + assert sql == "PRAGMA busy_timeout" + return self + + def fetchone(self): + return (self.conn.timeout_ms,) + + class FakeConn: + def __init__(self): + self.timeout_ms = 5000 + + def cursor(self): + return FakeCursor(self) + + def setbusytimeout(self, timeout_ms): + self.timeout_ms = timeout_ms + + store = MagicMock() + store.conn = FakeConn() + retry_flags = [] + + def internally_retrying_store_memory(**kwargs): + retry_flags.append(kwargs.get("retry_on_busy")) + if kwargs.get("retry_on_busy") is not False: + real_sleep(0.12) + raise apsw.BusyError("database is locked") + + monkeypatch.setenv("BRAINLAYER_STORE_BUSY_BUDGET_MS", "80") + monkeypatch.setattr(store_handler, "_RETRY_MAX_ATTEMPTS", 1) + + started = time.perf_counter() + with pytest.raises(apsw.BusyError): + await store_handler._store_memory_with_retries(internally_retrying_store_memory, store=store) + elapsed = time.perf_counter() - started + + assert elapsed < 0.16 + assert retry_flags == [False] + + +@pytest.mark.asyncio +async def test_store_busy_budget_bounds_supersede_update(tmp_path, monkeypatch): + """The same store budget must bound the post-store supersede write.""" + from brainlayer.mcp.store_handler import _store + + pending_path = tmp_path / "pending-stores.jsonl" + real_sleep = time.sleep + + class FakeCursor: + def __init__(self, conn): + self.conn = conn + + def execute(self, sql): + assert sql == "PRAGMA busy_timeout" + return self + + def fetchone(self): + return (self.conn.timeout_ms,) + + class FakeConn: + def __init__(self): + self.timeout_ms = 250 + + def cursor(self): + return FakeCursor(self) + + def setbusytimeout(self, timeout_ms): + self.timeout_ms = timeout_ms + + store = MagicMock() + store.conn = FakeConn() + + def successful_store_memory(**kwargs): + return {"id": kwargs["chunk_id"], "related": []} + + def locked_supersede(_old_chunk_id, _new_chunk_id): + real_sleep(store.conn.timeout_ms / 1000) + raise apsw.BusyError("database is locked") + + store.supersede_chunk.side_effect = locked_supersede + + monkeypatch.setenv("BRAINLAYER_STORE_BUSY_BUDGET_MS", "80") + + with ( + patch("brainlayer.mcp.store_handler._get_vector_store", return_value=store), + patch("brainlayer.mcp.store_handler._normalize_project_name", return_value="test"), + patch("brainlayer.queue_io.enqueue_store", side_effect=RuntimeError("force legacy pending queue")), + patch("brainlayer.mcp.store_handler._get_pending_store_path", return_value=pending_path), + patch("brainlayer.store.store_memory", side_effect=successful_store_memory), + ): + started = time.perf_counter() + _texts, structured = await _store( + content="supersede update must respect busy budget", + memory_type="note", + project="test", + supersedes="manual-old", + ) + elapsed = time.perf_counter() - started + + assert elapsed < 0.18 + assert structured["status"] == "DEFERRED" + assert structured["deferred"]["action"] == "queued_for_replay" + assert json.loads(pending_path.read_text())["supersedes"] == "manual-old" + + +def test_store_busy_budget_refreshes_supersede_timeout_between_statements(monkeypatch): + """Store busy context must refresh the deadline before each SQL statement.""" + from brainlayer.mcp import store_handler + + fake_clock = {"now": 0.0} + seen_timeouts = [] + + class FakeCursor: + def __init__(self, conn): + self.conn = conn + self._pragma = False + + def execute(self, sql, *args): + if sql == "PRAGMA busy_timeout": + self._pragma = True + return self + if self.conn.exec_trace is not None: + bindings = args[0] if args else None + result = self.conn.exec_trace(self, sql, bindings) + if result is False: + raise apsw.ExecTraceAbort("exec trace aborted") + seen_timeouts.append(self.conn.timeout_ms) + fake_clock["now"] += 0.04 + return self + + def fetchone(self): + if self._pragma: + return (self.conn.timeout_ms,) + return None + + class FakeConn: + def __init__(self): + self.timeout_ms = 250 + self.exec_trace = None + + def cursor(self): + return FakeCursor(self) + + def setbusytimeout(self, timeout_ms): + self.timeout_ms = timeout_ms + + def getexectrace(self): + return self.exec_trace + + def setexectrace(self, trace): + self.exec_trace = trace + + conn = FakeConn() + monkeypatch.setattr(store_handler.time, "monotonic", lambda: fake_clock["now"]) + + with store_handler._temporary_store_busy_timeout(conn, 0.2): + cursor = conn.cursor() + cursor.execute("SELECT 1 FROM chunks WHERE id = ?", ("old",)) + cursor.execute("UPDATE chunks SET superseded_by = ? WHERE id = ?", ("new", "old")) + cursor.execute("DELETE FROM chunk_vectors WHERE chunk_id = ?", ("old",)) + + assert seen_timeouts[1] < seen_timeouts[0] + assert seen_timeouts[2] < seen_timeouts[1] + assert conn.timeout_ms == 250 + assert conn.exec_trace is None + + +def test_store_busy_budget_allows_expired_rollback_cleanup(monkeypatch): + """Rollback cleanup must not be blocked by the expired busy-budget trace.""" + from brainlayer.mcp import store_handler + + fake_clock = {"now": 0.0} + executed = [] + + class FakeCursor: + def __init__(self, conn): + self.conn = conn + self._pragma = False + + def execute(self, sql, *args): + if sql == "PRAGMA busy_timeout": + self._pragma = True + return self + if self.conn.exec_trace is not None: + bindings = args[0] if args else None + result = self.conn.exec_trace(self, sql, bindings) + if result is False: + raise apsw.ExecTraceAbort("exec trace aborted") + executed.append(sql) + return self + + def fetchone(self): + if self._pragma: + return (self.conn.timeout_ms,) + return None + + class FakeConn: + def __init__(self): + self.timeout_ms = 250 + self.exec_trace = None + + def cursor(self): + return FakeCursor(self) + + def setbusytimeout(self, timeout_ms): + self.timeout_ms = timeout_ms + + def getexectrace(self): + return self.exec_trace + + def setexectrace(self, trace): + self.exec_trace = trace + + conn = FakeConn() + monkeypatch.setattr(store_handler.time, "monotonic", lambda: fake_clock["now"]) + + with store_handler._temporary_store_busy_timeout(conn, 0.1): + cursor = conn.cursor() + cursor.execute("BEGIN IMMEDIATE") + fake_clock["now"] = 0.11 + cursor.execute("ROLLBACK") + cursor.execute("ROLLBACK TO store_memory") + cursor.execute("RELEASE store_memory") + + assert executed == [ + "BEGIN IMMEDIATE", + "ROLLBACK", + "ROLLBACK TO store_memory", + "RELEASE store_memory", + ] + assert conn.timeout_ms == 250 + assert conn.exec_trace is None + + @pytest.mark.asyncio async def test_store_preassigns_same_chunk_id_across_busy_retry(tmp_path, monkeypatch): """The MCP handler promises one chunk ID before the first write attempt.""" diff --git a/tests/test_write_queue.py b/tests/test_write_queue.py index 155f27a3..ac1ec22e 100644 --- a/tests/test_write_queue.py +++ b/tests/test_write_queue.py @@ -17,7 +17,7 @@ import apsw import pytest -from brainlayer.drain import _open_connection, drain_once +from brainlayer.drain import _open_connection, burn_drain_once, drain_once from brainlayer.mcp.store_handler import ( _flush_pending_stores, _queue_store, @@ -284,6 +284,116 @@ def test_flush_drains_queue(self, tmp_path): assert flushed == 1 assert not pending_path.exists() # File deleted after full flush + def test_drain_store_event_initializes_missing_schema(self, tmp_path, monkeypatch): + """Queued store replay must recover if the first store deferred before schema init.""" + db_path = tmp_path / "brainlayer.db" + queue_dir = tmp_path / "queue" + log_path = tmp_path / "drain.log" + queue_dir.mkdir() + monkeypatch.setenv("BRAINLAYER_DRAIN_EMBED", "0") + + queued = queue_dir / "store-fresh-schema.jsonl" + queued.write_text( + json.dumps( + { + "kind": "store_memory", + "chunk_id": "manual-fresh-schema", + "content": "fresh queued store should create schema before replay", + "memory_type": "note", + "project": "test", + "created_at": "2026-06-19T08:54:00Z", + } + ) + + "\n" + ) + + assert drain_once(db_path=db_path, queue_dir=queue_dir, batch_size=1, log_path=log_path) == 1 + assert not queued.exists() + with sqlite3.connect(db_path) as conn: + row = conn.execute( + "SELECT id, content, project FROM chunks WHERE id = ?", + ("manual-fresh-schema",), + ).fetchone() + assert row == ( + "manual-fresh-schema", + "fresh queued store should create schema before replay", + "test", + ) + + def test_drain_preserves_store_queue_when_schema_bootstrap_writer_in_use(self, tmp_path, monkeypatch): + """Schema bootstrap contention must preserve queued store files for retry.""" + from brainlayer.vector_store import WriterInUseError + + db_path = tmp_path / "brainlayer.db" + queue_dir = tmp_path / "queue" + log_path = tmp_path / "drain.log" + queue_dir.mkdir() + monkeypatch.setenv("BRAINLAYER_DRAIN_EMBED", "0") + + queued = queue_dir / "store-schema-contended.jsonl" + queued.write_text( + json.dumps( + { + "kind": "store_memory", + "chunk_id": "manual-schema-contended", + "content": "store queue should survive contended schema bootstrap", + "memory_type": "note", + "project": "test", + } + ) + + "\n" + ) + + with patch( + "brainlayer.drain._ensure_drain_db_schema", + side_effect=WriterInUseError("another writer is using brainlayer.db (pid 123)"), + ): + drained = drain_once(db_path=db_path, queue_dir=queue_dir, batch_size=1, log_path=log_path) + + assert drained == 0 + assert queued.exists() + assert "batch preserved" in log_path.read_text() + + def test_burn_drain_preserves_store_queue_when_schema_bootstrap_writer_in_use(self, tmp_path, monkeypatch): + """Burn drain must preserve store files if first-run schema init is contended.""" + from brainlayer.vector_store import WriterInUseError + + db_path = tmp_path / "brainlayer.db" + queue_dir = tmp_path / "queue" + log_path = tmp_path / "drain.log" + queue_dir.mkdir() + monkeypatch.setenv("BRAINLAYER_DRAIN_EMBED", "0") + + queued = queue_dir / "store-schema-contended.jsonl" + queued.write_text( + json.dumps( + { + "kind": "store_memory", + "chunk_id": "manual-schema-contended", + "content": "burn drain should preserve contended schema bootstrap", + "memory_type": "note", + "project": "test", + } + ) + + "\n" + ) + + with patch( + "brainlayer.drain._ensure_drain_db_schema", + side_effect=WriterInUseError("another writer is using brainlayer.db (pid 123)"), + ): + result = burn_drain_once( + db_path=db_path, + queue_dir=queue_dir, + batch_size=1, + log_path=log_path, + ) + + assert result.failed_files == 1 + assert result.files_deleted == 0 + assert queued.exists() + assert "batch preserved" in log_path.read_text() + def test_flush_preserves_legacy_pending_chunk_id(self, tmp_path): """Legacy pending-stores flush must persist the caller-visible queued ID.""" pending_path = tmp_path / "pending-stores.jsonl"