Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 148 additions & 57 deletions src/brainlayer/drain.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,39 @@ def _open_connection(db_path: Path) -> apsw.Connection:
return conn


def _ensure_drain_db_schema(db_path: Path) -> None:
from .vector_store import VectorStore

store = VectorStore(db_path)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve queued stores when schema bootstrap is contended

When a store batch is drained against a missing or empty DB while another BrainLayer process already owns the VectorStore writer pidfile, this VectorStore(db_path) construction raises WriterInUseError. The helper is called from drain_once/burn_drain_once before their existing failure-preserving retry blocks, and that exception is not treated as an APSW busy error, so the drain daemon exits instead of leaving the JSONL file queued for a later retry; a first-run/startup race can strand queued brain_store events until the daemon is restarted.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in c048857. drain_once and burn_drain_once now treat WriterInUseError as busy-like schema bootstrap contention and preserve queued store batches instead of letting the exception escape. The same preserve path is used for missing-chunks schema bootstrap retries. Added drain and burn-drain regressions that patch schema initialization to raise WriterInUseError and assert the queue file remains for retry.

store.close()


def _db_needs_initial_schema(db_path: Path) -> bool:
try:
return not db_path.exists() or db_path.stat().st_size == 0
except OSError:
return False


def _ensure_drain_db_schema_preserving_queue(
db_path: Path,
log_path: Path,
*,
context: str,
force: bool = False,
) -> bool:
if not force and not _db_needs_initial_schema(db_path):
return True
try:
_ensure_drain_db_schema(db_path)
return True
except Exception as exc:
if not _is_busy_error(exc):
raise
_log(log_path, f"{context} schema init busy; batch preserved: {exc}")
return False


def _acquire_queue_lock(queue_dir: Path) -> int:
fd = os.open(queue_dir, os.O_RDONLY)
try:
Expand Down Expand Up @@ -312,6 +345,14 @@ def _event_payload(event: dict[str, Any]) -> dict[str, Any]:
return {"kind": "unknown", **event}


def _events_include_store(events: list[dict[str, Any]]) -> bool:
return any(_event_payload(event).get("kind") == "store_memory" for event in events)


def _is_missing_chunks_error(exc: BaseException) -> bool:
return "no such table: chunks" in str(exc).lower()


def _event_created_at(event: dict[str, Any]) -> str:
raw_created_at = event.get("created_at")
if raw_created_at:
Expand Down Expand Up @@ -441,6 +482,24 @@ def _run_enrichment_provenance_hooks(
logger.exception("drain auto_supersede failed for chunk_id=%s", chunk_id)


def _apply_store_supersedes(conn: apsw.Connection, old_chunk_id: str | None, new_chunk_id: str) -> None:
if not old_chunk_id:
return
cols = _columns(conn, "chunks")
if "superseded_by" not in cols:
return
if "status" in cols:
conn.execute(
"UPDATE chunks SET superseded_by = ?, status = 'superseded' WHERE id = ?",
(new_chunk_id, old_chunk_id),
)
else:
conn.execute("UPDATE chunks SET superseded_by = ? WHERE id = ?", (new_chunk_id, old_chunk_id))
for vector_table in ("chunk_vectors", "chunk_vectors_binary"):
if conn.execute("SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?", (vector_table,)).fetchone():
conn.execute(f"DELETE FROM {vector_table} WHERE chunk_id = ?", (old_chunk_id,))


def _apply_store(conn: apsw.Connection, event: dict[str, Any]) -> ApplyResult:
raw_content = event.get("content")
if raw_content is None:
Expand Down Expand Up @@ -468,6 +527,7 @@ def _apply_store(conn: apsw.Connection, event: dict[str, Any]) -> ApplyResult:
metadata.update(raw_metadata)
elif raw_metadata:
logger.warning("Skipping non-object store metadata for chunk_id=%s", event.get("chunk_id"))
supersedes = event.get("supersedes") or metadata.get("supersedes")
tags = event.get("tags")
explicit_chunk_origin = event.get("chunk_origin") or metadata.get("chunk_origin")
existing = conn.execute("SELECT content FROM chunks WHERE id = ?", (chunk_id,)).fetchone()
Expand All @@ -485,6 +545,7 @@ def _apply_store(conn: apsw.Connection, event: dict[str, Any]) -> ApplyResult:
"last_seen_at": now,
},
)
_apply_store_supersedes(conn, supersedes, chunk_id)
return ApplyResult(chunk_id=chunk_id)
return ApplyResult(collision_chunk_id=chunk_id)
stored_chunk_id = _insert_or_merge_chunk(
Expand All @@ -509,21 +570,7 @@ def _apply_store(conn: apsw.Connection, event: dict[str, Any]) -> ApplyResult:
"chunk_origin": detect_chunk_origin(content, explicit_chunk_origin),
},
)
supersedes = event.get("supersedes") or metadata.get("supersedes")
cols = _columns(conn, "chunks")
if supersedes and "superseded_by" in cols:
if "status" in cols:
conn.execute(
"UPDATE chunks SET superseded_by = ?, status = 'superseded' WHERE id = ?",
(stored_chunk_id, supersedes),
)
else:
conn.execute("UPDATE chunks SET superseded_by = ? WHERE id = ?", (stored_chunk_id, supersedes))
for vector_table in ("chunk_vectors", "chunk_vectors_binary"):
if conn.execute(
"SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?", (vector_table,)
).fetchone():
conn.execute(f"DELETE FROM {vector_table} WHERE chunk_id = ?", (supersedes,))
_apply_store_supersedes(conn, supersedes, stored_chunk_id)
entity_id = event.get("entity_id") or metadata.get("entity_id")
if entity_id and {"kg_entities", "kg_entity_chunks"}.issubset(_table_names(conn)):
if conn.execute("SELECT id FROM kg_entities WHERE id = ?", (entity_id,)).fetchone():
Expand Down Expand Up @@ -758,7 +805,9 @@ def _table_names(conn: apsw.Connection) -> set[str]:


def _is_busy_error(exc: BaseException) -> bool:
return _is_sqlite_busy_error(exc)
from .vector_store import WriterInUseError

return isinstance(exc, WriterInUseError) or _is_sqlite_busy_error(exc)


def _default_embed_fn() -> Callable[[str], list[float]]:
Expand Down Expand Up @@ -979,49 +1028,72 @@ def burn_drain_once(
return result

all_events = [event for _, events in batch for event in events]
conn = _open_connection(db_path)
try:
_ensure_enrichment_update_schema(conn)
conn.execute("BEGIN IMMEDIATE")
prefetched_state = _prefetch_enrichment_state(conn, all_events)
store_chunk_ids: list[str] = []
for _, events in batch:
for event in events:
if _is_verified_redundant_enrichment(event, prefetched_state):
payload = _event_payload(event)
enqueue_provenance_resolution_for_entities(
conn,
payload.get("entities"),
chunk_id=payload.get("chunk_id"),
commit=False,
)
result.skipped_verified_stale += 1
continue
applied = _apply_event(conn, event)
result.applied_events += 1
if applied.chunk_id:
store_chunk_ids.append(applied.chunk_id)
if _embedding_enabled():
_embed_store_chunks(conn, store_chunk_ids, embed_fn)
conn.execute("COMMIT")
conn.setbusytimeout(_checkpoint_busy_timeout_ms())
try:
conn.execute("PRAGMA wal_checkpoint(TRUNCATE)")
result.checkpoints += 1
except apsw.Error as exc:
_log(log_path, f"burn drain checkpoint skipped: {exc}")
finally:
conn.setbusytimeout(_drain_busy_timeout_ms())
except Exception as exc:
try:
conn.execute("ROLLBACK")
except Exception:
pass
batch_includes_store = _events_include_store(all_events)
if batch_includes_store and not _ensure_drain_db_schema_preserving_queue(
db_path,
log_path,
context="burn drain failed before transaction",
):
result.failed_files = len(batch)
_log(log_path, f"burn drain failed; batch preserved: {exc}")
return result
finally:
conn.close()
for schema_attempt in range(2):
conn = _open_connection(db_path)
Comment thread
macroscopeapp[bot] marked this conversation as resolved.
try:
_ensure_enrichment_update_schema(conn)
conn.execute("BEGIN IMMEDIATE")
prefetched_state = _prefetch_enrichment_state(conn, all_events)
store_chunk_ids: list[str] = []
for _, events in batch:
for event in events:
if _is_verified_redundant_enrichment(event, prefetched_state):
payload = _event_payload(event)
enqueue_provenance_resolution_for_entities(
conn,
payload.get("entities"),
chunk_id=payload.get("chunk_id"),
commit=False,
)
result.skipped_verified_stale += 1
continue
applied = _apply_event(conn, event)
result.applied_events += 1
Comment on lines +1058 to +1059

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P3 Badge Defer burn-drain counters until commit succeeds

This increments the public applied_events count inside the transaction before the new missing-chunks retry path can roll it back. If a batch contains an event that returns before a later store raises no such table: chunks, the forced schema retry applies the batch again and the result/log over-count rolled-back work (for example two queued lines can report applied_events=3 after one retry). Accumulate counts locally per attempt and add them to result only after COMMIT succeeds.

Useful? React with 👍 / 👎.

if applied.chunk_id:
store_chunk_ids.append(applied.chunk_id)
if _embedding_enabled():
_embed_store_chunks(conn, store_chunk_ids, embed_fn)
conn.execute("COMMIT")
conn.setbusytimeout(_checkpoint_busy_timeout_ms())
try:
conn.execute("PRAGMA wal_checkpoint(TRUNCATE)")
result.checkpoints += 1
except apsw.Error as exc:
_log(log_path, f"burn drain checkpoint skipped: {exc}")
finally:
conn.setbusytimeout(_drain_busy_timeout_ms())
break
except Exception as exc:
try:
conn.execute("ROLLBACK")
except Exception:
pass
if batch_includes_store and _is_missing_chunks_error(exc) and schema_attempt == 0:
conn.close()
conn = None
if not _ensure_drain_db_schema_preserving_queue(
db_path,
log_path,
context="burn drain failed before transaction",
force=True,
):
result.failed_files = len(batch)
return result
continue
result.failed_files = len(batch)
_log(log_path, f"burn drain failed; batch preserved: {exc}")
return result
finally:
if conn is not None:
conn.close()

for path, _events in batch:
try:
Expand Down Expand Up @@ -1080,6 +1152,13 @@ def drain_once(
continue
events_to_apply = events[: _max_events_per_transaction()]
remaining_events = events[len(events_to_apply) :]
events_include_store = _events_include_store(events_to_apply)
if events_include_store and not _ensure_drain_db_schema_preserving_queue(
db_path,
log_path,
context=f"drain failed for {path.name}",
):
break

for attempt in range(5):
conn: apsw.Connection | None = None
Expand Down Expand Up @@ -1140,6 +1219,18 @@ def drain_once(
_log(log_path, f"drain busy; retrying in {delay:.2f}s")
time.sleep(delay)
continue
if events_include_store and _is_missing_chunks_error(exc) and attempt < 4:
if conn is not None:
conn.close()
conn = None
if not _ensure_drain_db_schema_preserving_queue(
db_path,
log_path,
context=f"drain failed for {path.name}",
force=True,
):
break
Comment on lines +1231 to +1232

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Stop draining after contended schema bootstrap

When the DB file already exists but lacks chunks, a store file reaches this forced schema-bootstrap path; if bootstrap is busy, this break exits only the retry loop, so drain_once continues to later queue files while preserving the current one. A subsequent file can then initialize the schema and be applied/deleted before the earlier preserved store, violating queue order and potentially applying supersedes/dedupe against the wrong prior state. This should stop the outer drain pass just like the pre-transaction schema-busy path above.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 2ca90cc. drain_once now stops the outer file loop when forced schema bootstrap is busy after a missing-chunks failure, so later queue files are not applied or unlinked while the first store file remains queued. Added test_drain_stops_after_forced_schema_bootstrap_writer_in_use, which failed before the fix by calling schema bootstrap for the second file and now verifies both queued payloads are unchanged.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Stop draining after forced schema bootstrap is busy

When the DB file is non-empty but still lacks chunks, the first store file reaches this forced schema bootstrap path; if another writer owns schema init, this break exits only the retry loop, so the outer for path in files continues and attempts later queue files in the same drain run. That violates FIFO/write-safety under schema contention and can repeatedly open the DB/log failures for subsequent payloads instead of preserving the queue until the writer releases; return or break the outer drain loop here.

Useful? React with 👍 / 👎.

continue
Comment on lines +1226 to +1233

@coderabbitai coderabbitai Bot Jun 19, 2026

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Stop draining after preserving a schema-blocked store file.

When forced schema bootstrap is busy, Line 1232 only exits the retry loop. The outer file loop then advances, so later queue files can be applied/unlinked while this store file remains queued, reordering queued writes. Return or break the outer loop here, matching the pre-transaction path at Line 1161.

As per coding guidelines, **/*.py: “Flag risky DB or concurrency changes explicitly and do not hand-wave lock behavior.”

Suggested stop-on-preserve fix
                         if not _ensure_drain_db_schema_preserving_queue(
                             db_path,
                             log_path,
                             context=f"drain failed for {path.name}",
                             force=True,
                         ):
-                            break
+                            return drained
                         continue
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if not _ensure_drain_db_schema_preserving_queue(
db_path,
log_path,
context=f"drain failed for {path.name}",
force=True,
):
break
continue
if not _ensure_drain_db_schema_preserving_queue(
db_path,
log_path,
context=f"drain failed for {path.name}",
force=True,
):
return drained
continue
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/brainlayer/drain.py` around lines 1226 - 1233, The current code at the
`_ensure_drain_db_schema_preserving_queue` call only breaks from the inner retry
loop when schema bootstrap is busy, allowing the outer file loop to continue
processing other queue files. This reorders queued writes since the current
store file remains queued while later files are processed. Instead of breaking
from the inner loop, break from the outer loop (or return if applicable) to
match the behavior at line 1161 in the pre-transaction path, ensuring the store
file is fully handled before advancing to process other files.

Source: Coding guidelines

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 2ca90cc. drain_once now sets a stop flag when forced schema bootstrap is busy after a missing-chunks failure, breaks the outer file loop, and leaves later queue files untouched until the first preserved store can replay. Added test_drain_stops_after_forced_schema_bootstrap_writer_in_use, which failed before the fix by advancing to the second file and now verifies both queued JSONL payloads are unchanged.

_log(log_path, f"drain failed for {path.name}: {exc}")
break
finally:
Expand Down
9 changes: 7 additions & 2 deletions src/brainlayer/mcp/_shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,21 @@ def _bootstrap_search_store(db_path) -> None:
bootstrap_store.close()


def _get_vector_store():
def _get_vector_store(timeout: float | None = None):
"""Get or initialize the global VectorStore (thread-safe)."""
global _vector_store
if _vector_store is None:
with _store_lock:
acquired = _store_lock.acquire() if timeout is None else _store_lock.acquire(timeout=max(0.0, timeout))
if not acquired:
raise apsw.BusyError("timed out waiting for vector store initialization")
try:
if _vector_store is None:
from ..paths import get_db_path
from ..vector_store import VectorStore

_vector_store = VectorStore(get_db_path())
finally:
_store_lock.release()
return _vector_store


Expand Down
Loading
Loading