diff --git a/src/brainlayer/drain.py b/src/brainlayer/drain.py index 50f94ac4..143b207e 100644 --- a/src/brainlayer/drain.py +++ b/src/brainlayer/drain.py @@ -1043,6 +1043,8 @@ def burn_drain_once( conn.execute("BEGIN IMMEDIATE") prefetched_state = _prefetch_enrichment_state(conn, all_events) store_chunk_ids: list[str] = [] + attempt_applied_events = 0 + attempt_skipped_verified_stale = 0 for _, events in batch: for event in events: if _is_verified_redundant_enrichment(event, prefetched_state): @@ -1053,15 +1055,17 @@ def burn_drain_once( chunk_id=payload.get("chunk_id"), commit=False, ) - result.skipped_verified_stale += 1 + attempt_skipped_verified_stale += 1 continue applied = _apply_event(conn, event) - result.applied_events += 1 + attempt_applied_events += 1 if applied.chunk_id: store_chunk_ids.append(applied.chunk_id) if _embedding_enabled(): _embed_store_chunks(conn, store_chunk_ids, embed_fn) conn.execute("COMMIT") + result.applied_events += attempt_applied_events + result.skipped_verified_stale += attempt_skipped_verified_stale conn.setbusytimeout(_checkpoint_busy_timeout_ms()) try: conn.execute("PRAGMA wal_checkpoint(TRUNCATE)") @@ -1160,6 +1164,7 @@ def drain_once( ): break + stop_draining = False for attempt in range(5): conn: apsw.Connection | None = None attempt_drained = 0 @@ -1229,6 +1234,7 @@ def drain_once( context=f"drain failed for {path.name}", force=True, ): + stop_draining = True break continue _log(log_path, f"drain failed for {path.name}: {exc}") @@ -1236,6 +1242,8 @@ def drain_once( finally: if conn is not None: conn.close() + if stop_draining: + break finally: fcntl.flock(lock_fd, fcntl.LOCK_UN) os.close(lock_fd) diff --git a/tests/test_write_queue.py b/tests/test_write_queue.py index ac1ec22e..13c3f979 100644 --- a/tests/test_write_queue.py +++ b/tests/test_write_queue.py @@ -354,6 +354,50 @@ def test_drain_preserves_store_queue_when_schema_bootstrap_writer_in_use(self, t assert queued.exists() assert "batch preserved" in log_path.read_text() + def test_drain_stops_after_forced_schema_bootstrap_writer_in_use(self, tmp_path, monkeypatch): + """A schema-blocked store file must stop later queue files from draining.""" + from brainlayer.vector_store import WriterInUseError + + db_path = tmp_path / "brainlayer.db" + queue_dir = tmp_path / "queue" + log_path = tmp_path / "drain.log" + queue_dir.mkdir() + monkeypatch.setenv("BRAINLAYER_DRAIN_EMBED", "0") + with sqlite3.connect(db_path) as conn: + conn.execute("CREATE TABLE marker(id TEXT)") + + first = queue_dir / "a-store-schema-contended.jsonl" + second = queue_dir / "b-store-must-wait.jsonl" + first_payload = { + "kind": "store_memory", + "chunk_id": "manual-schema-contended-1", + "content": "first store should preserve queue order", + "memory_type": "note", + "project": "test", + } + second_payload = { + "kind": "store_memory", + "chunk_id": "manual-schema-contended-2", + "content": "second store must not be attempted after first schema block", + "memory_type": "note", + "project": "test", + } + first.write_text(json.dumps(first_payload) + "\n") + second.write_text(json.dumps(second_payload) + "\n") + first_before = first.read_text() + second_before = second.read_text() + + with patch( + "brainlayer.drain._ensure_drain_db_schema", + side_effect=WriterInUseError("another writer is using brainlayer.db (pid 123)"), + ) as ensure_schema: + drained = drain_once(db_path=db_path, queue_dir=queue_dir, batch_size=2, log_path=log_path) + + assert drained == 0 + assert ensure_schema.call_count == 1 + assert first.read_text() == first_before + assert second.read_text() == second_before + def test_burn_drain_preserves_store_queue_when_schema_bootstrap_writer_in_use(self, tmp_path, monkeypatch): """Burn drain must preserve store files if first-run schema init is contended.""" from brainlayer.vector_store import WriterInUseError @@ -394,6 +438,50 @@ def test_burn_drain_preserves_store_queue_when_schema_bootstrap_writer_in_use(se assert queued.exists() assert "batch preserved" in log_path.read_text() + def test_burn_drain_missing_chunks_retry_counts_events_once(self, tmp_path, monkeypatch): + """A failed first schema attempt must not inflate committed event counts.""" + db_path = tmp_path / "brainlayer.db" + queue_dir = tmp_path / "queue" + log_path = tmp_path / "drain.log" + queue_dir.mkdir() + monkeypatch.setenv("BRAINLAYER_DRAIN_EMBED", "0") + with sqlite3.connect(db_path) as conn: + conn.execute("CREATE TABLE marker(id TEXT)") + + queued = queue_dir / "store-missing-chunks-retry.jsonl" + queued.write_text( + json.dumps({"kind": "unknown", "content": "noop counted only after commit"}) + + "\n" + + json.dumps( + { + "kind": "store_memory", + "chunk_id": "manual-missing-chunks-retry", + "content": "burn drain retry should count committed events once", + "memory_type": "note", + "project": "test", + } + ) + + "\n" + ) + + result = burn_drain_once( + db_path=db_path, + queue_dir=queue_dir, + batch_size=1, + log_path=log_path, + ) + + assert result.applied_events == 2 + assert result.failed_files == 0 + assert result.files_deleted == 1 + assert not queued.exists() + with sqlite3.connect(db_path) as conn: + row = conn.execute( + "SELECT id FROM chunks WHERE id = ?", + ("manual-missing-chunks-retry",), + ).fetchone() + assert row == ("manual-missing-chunks-retry",) + def test_flush_preserves_legacy_pending_chunk_id(self, tmp_path): """Legacy pending-stores flush must persist the caller-visible queued ID.""" pending_path = tmp_path / "pending-stores.jsonl"