Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions src/brainlayer/drain.py
Original file line number Diff line number Diff line change
Expand Up @@ -1043,6 +1043,8 @@ def burn_drain_once(
conn.execute("BEGIN IMMEDIATE")
prefetched_state = _prefetch_enrichment_state(conn, all_events)
store_chunk_ids: list[str] = []
attempt_applied_events = 0
attempt_skipped_verified_stale = 0
for _, events in batch:
for event in events:
if _is_verified_redundant_enrichment(event, prefetched_state):
Expand All @@ -1053,15 +1055,17 @@ def burn_drain_once(
chunk_id=payload.get("chunk_id"),
commit=False,
)
result.skipped_verified_stale += 1
attempt_skipped_verified_stale += 1
continue
applied = _apply_event(conn, event)
result.applied_events += 1
attempt_applied_events += 1
if applied.chunk_id:
store_chunk_ids.append(applied.chunk_id)
if _embedding_enabled():
_embed_store_chunks(conn, store_chunk_ids, embed_fn)
conn.execute("COMMIT")
result.applied_events += attempt_applied_events
result.skipped_verified_stale += attempt_skipped_verified_stale
conn.setbusytimeout(_checkpoint_busy_timeout_ms())
try:
conn.execute("PRAGMA wal_checkpoint(TRUNCATE)")
Expand Down Expand Up @@ -1160,6 +1164,7 @@ def drain_once(
):
break

stop_draining = False
for attempt in range(5):
conn: apsw.Connection | None = None
attempt_drained = 0
Expand Down Expand Up @@ -1229,13 +1234,16 @@ def drain_once(
context=f"drain failed for {path.name}",
force=True,
):
stop_draining = True
break
continue
_log(log_path, f"drain failed for {path.name}: {exc}")
break
finally:
if conn is not None:
conn.close()
if stop_draining:
break
finally:
fcntl.flock(lock_fd, fcntl.LOCK_UN)
os.close(lock_fd)
Expand Down
88 changes: 88 additions & 0 deletions tests/test_write_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,50 @@ def test_drain_preserves_store_queue_when_schema_bootstrap_writer_in_use(self, t
assert queued.exists()
assert "batch preserved" in log_path.read_text()

def test_drain_stops_after_forced_schema_bootstrap_writer_in_use(self, tmp_path, monkeypatch):
"""A schema-blocked store file must stop later queue files from draining."""
from brainlayer.vector_store import WriterInUseError

db_path = tmp_path / "brainlayer.db"
queue_dir = tmp_path / "queue"
log_path = tmp_path / "drain.log"
queue_dir.mkdir()
monkeypatch.setenv("BRAINLAYER_DRAIN_EMBED", "0")
with sqlite3.connect(db_path) as conn:
conn.execute("CREATE TABLE marker(id TEXT)")

first = queue_dir / "a-store-schema-contended.jsonl"
second = queue_dir / "b-store-must-wait.jsonl"
first_payload = {
"kind": "store_memory",
"chunk_id": "manual-schema-contended-1",
"content": "first store should preserve queue order",
"memory_type": "note",
"project": "test",
}
second_payload = {
"kind": "store_memory",
"chunk_id": "manual-schema-contended-2",
"content": "second store must not be attempted after first schema block",
"memory_type": "note",
"project": "test",
}
first.write_text(json.dumps(first_payload) + "\n")
second.write_text(json.dumps(second_payload) + "\n")
first_before = first.read_text()
second_before = second.read_text()

with patch(
"brainlayer.drain._ensure_drain_db_schema",
side_effect=WriterInUseError("another writer is using brainlayer.db (pid 123)"),
) as ensure_schema:
drained = drain_once(db_path=db_path, queue_dir=queue_dir, batch_size=2, log_path=log_path)

assert drained == 0
assert ensure_schema.call_count == 1
assert first.read_text() == first_before
assert second.read_text() == second_before

def test_burn_drain_preserves_store_queue_when_schema_bootstrap_writer_in_use(self, tmp_path, monkeypatch):
"""Burn drain must preserve store files if first-run schema init is contended."""
from brainlayer.vector_store import WriterInUseError
Expand Down Expand Up @@ -394,6 +438,50 @@ def test_burn_drain_preserves_store_queue_when_schema_bootstrap_writer_in_use(se
assert queued.exists()
assert "batch preserved" in log_path.read_text()

def test_burn_drain_missing_chunks_retry_counts_events_once(self, tmp_path, monkeypatch):
"""A failed first schema attempt must not inflate committed event counts."""
db_path = tmp_path / "brainlayer.db"
queue_dir = tmp_path / "queue"
log_path = tmp_path / "drain.log"
queue_dir.mkdir()
monkeypatch.setenv("BRAINLAYER_DRAIN_EMBED", "0")
with sqlite3.connect(db_path) as conn:
conn.execute("CREATE TABLE marker(id TEXT)")

queued = queue_dir / "store-missing-chunks-retry.jsonl"
queued.write_text(
json.dumps({"kind": "unknown", "content": "noop counted only after commit"})
+ "\n"
+ json.dumps(
{
"kind": "store_memory",
"chunk_id": "manual-missing-chunks-retry",
"content": "burn drain retry should count committed events once",
"memory_type": "note",
"project": "test",
}
)
+ "\n"
)

result = burn_drain_once(
db_path=db_path,
queue_dir=queue_dir,
batch_size=1,
log_path=log_path,
)

assert result.applied_events == 2
assert result.failed_files == 0
assert result.files_deleted == 1
assert not queued.exists()
with sqlite3.connect(db_path) as conn:
row = conn.execute(
"SELECT id FROM chunks WHERE id = ?",
("manual-missing-chunks-retry",),
).fetchone()
assert row == ("manual-missing-chunks-retry",)

def test_flush_preserves_legacy_pending_chunk_id(self, tmp_path):
"""Legacy pending-stores flush must persist the caller-visible queued ID."""
pending_path = tmp_path / "pending-stores.jsonl"
Expand Down
Loading