-
Notifications
You must be signed in to change notification settings - Fork 7
fix: bound brain_store busy deferral #516
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
53b2fa6
b26be70
fa6a1ba
f0f50d9
ecc4834
d625e73
21eb174
73712fc
a97a322
e48b2ed
315c1c2
0fa8521
62bb380
e940230
c048857
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -174,6 +174,39 @@ def _open_connection(db_path: Path) -> apsw.Connection: | |||||||||||||||||||||||||||||||||
| return conn | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| def _ensure_drain_db_schema(db_path: Path) -> None: | ||||||||||||||||||||||||||||||||||
| from .vector_store import VectorStore | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| store = VectorStore(db_path) | ||||||||||||||||||||||||||||||||||
| store.close() | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| def _db_needs_initial_schema(db_path: Path) -> bool: | ||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||
| return not db_path.exists() or db_path.stat().st_size == 0 | ||||||||||||||||||||||||||||||||||
| except OSError: | ||||||||||||||||||||||||||||||||||
| return False | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| def _ensure_drain_db_schema_preserving_queue( | ||||||||||||||||||||||||||||||||||
| db_path: Path, | ||||||||||||||||||||||||||||||||||
| log_path: Path, | ||||||||||||||||||||||||||||||||||
| *, | ||||||||||||||||||||||||||||||||||
| context: str, | ||||||||||||||||||||||||||||||||||
| force: bool = False, | ||||||||||||||||||||||||||||||||||
| ) -> bool: | ||||||||||||||||||||||||||||||||||
| if not force and not _db_needs_initial_schema(db_path): | ||||||||||||||||||||||||||||||||||
| return True | ||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||
| _ensure_drain_db_schema(db_path) | ||||||||||||||||||||||||||||||||||
| return True | ||||||||||||||||||||||||||||||||||
| except Exception as exc: | ||||||||||||||||||||||||||||||||||
| if not _is_busy_error(exc): | ||||||||||||||||||||||||||||||||||
| raise | ||||||||||||||||||||||||||||||||||
| _log(log_path, f"{context} schema init busy; batch preserved: {exc}") | ||||||||||||||||||||||||||||||||||
| return False | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| def _acquire_queue_lock(queue_dir: Path) -> int: | ||||||||||||||||||||||||||||||||||
| fd = os.open(queue_dir, os.O_RDONLY) | ||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||
|
|
@@ -312,6 +345,14 @@ def _event_payload(event: dict[str, Any]) -> dict[str, Any]: | |||||||||||||||||||||||||||||||||
| return {"kind": "unknown", **event} | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| def _events_include_store(events: list[dict[str, Any]]) -> bool: | ||||||||||||||||||||||||||||||||||
| return any(_event_payload(event).get("kind") == "store_memory" for event in events) | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| def _is_missing_chunks_error(exc: BaseException) -> bool: | ||||||||||||||||||||||||||||||||||
| return "no such table: chunks" in str(exc).lower() | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| def _event_created_at(event: dict[str, Any]) -> str: | ||||||||||||||||||||||||||||||||||
| raw_created_at = event.get("created_at") | ||||||||||||||||||||||||||||||||||
| if raw_created_at: | ||||||||||||||||||||||||||||||||||
|
|
@@ -441,6 +482,24 @@ def _run_enrichment_provenance_hooks( | |||||||||||||||||||||||||||||||||
| logger.exception("drain auto_supersede failed for chunk_id=%s", chunk_id) | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| def _apply_store_supersedes(conn: apsw.Connection, old_chunk_id: str | None, new_chunk_id: str) -> None: | ||||||||||||||||||||||||||||||||||
| if not old_chunk_id: | ||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||
| cols = _columns(conn, "chunks") | ||||||||||||||||||||||||||||||||||
| if "superseded_by" not in cols: | ||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||
| if "status" in cols: | ||||||||||||||||||||||||||||||||||
| conn.execute( | ||||||||||||||||||||||||||||||||||
| "UPDATE chunks SET superseded_by = ?, status = 'superseded' WHERE id = ?", | ||||||||||||||||||||||||||||||||||
| (new_chunk_id, old_chunk_id), | ||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||
| conn.execute("UPDATE chunks SET superseded_by = ? WHERE id = ?", (new_chunk_id, old_chunk_id)) | ||||||||||||||||||||||||||||||||||
| for vector_table in ("chunk_vectors", "chunk_vectors_binary"): | ||||||||||||||||||||||||||||||||||
| if conn.execute("SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?", (vector_table,)).fetchone(): | ||||||||||||||||||||||||||||||||||
| conn.execute(f"DELETE FROM {vector_table} WHERE chunk_id = ?", (old_chunk_id,)) | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| def _apply_store(conn: apsw.Connection, event: dict[str, Any]) -> ApplyResult: | ||||||||||||||||||||||||||||||||||
| raw_content = event.get("content") | ||||||||||||||||||||||||||||||||||
| if raw_content is None: | ||||||||||||||||||||||||||||||||||
|
|
@@ -468,6 +527,7 @@ def _apply_store(conn: apsw.Connection, event: dict[str, Any]) -> ApplyResult: | |||||||||||||||||||||||||||||||||
| metadata.update(raw_metadata) | ||||||||||||||||||||||||||||||||||
| elif raw_metadata: | ||||||||||||||||||||||||||||||||||
| logger.warning("Skipping non-object store metadata for chunk_id=%s", event.get("chunk_id")) | ||||||||||||||||||||||||||||||||||
| supersedes = event.get("supersedes") or metadata.get("supersedes") | ||||||||||||||||||||||||||||||||||
| tags = event.get("tags") | ||||||||||||||||||||||||||||||||||
| explicit_chunk_origin = event.get("chunk_origin") or metadata.get("chunk_origin") | ||||||||||||||||||||||||||||||||||
| existing = conn.execute("SELECT content FROM chunks WHERE id = ?", (chunk_id,)).fetchone() | ||||||||||||||||||||||||||||||||||
|
|
@@ -485,6 +545,7 @@ def _apply_store(conn: apsw.Connection, event: dict[str, Any]) -> ApplyResult: | |||||||||||||||||||||||||||||||||
| "last_seen_at": now, | ||||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||
| _apply_store_supersedes(conn, supersedes, chunk_id) | ||||||||||||||||||||||||||||||||||
| return ApplyResult(chunk_id=chunk_id) | ||||||||||||||||||||||||||||||||||
| return ApplyResult(collision_chunk_id=chunk_id) | ||||||||||||||||||||||||||||||||||
| stored_chunk_id = _insert_or_merge_chunk( | ||||||||||||||||||||||||||||||||||
|
|
@@ -509,21 +570,7 @@ def _apply_store(conn: apsw.Connection, event: dict[str, Any]) -> ApplyResult: | |||||||||||||||||||||||||||||||||
| "chunk_origin": detect_chunk_origin(content, explicit_chunk_origin), | ||||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||
| supersedes = event.get("supersedes") or metadata.get("supersedes") | ||||||||||||||||||||||||||||||||||
| cols = _columns(conn, "chunks") | ||||||||||||||||||||||||||||||||||
| if supersedes and "superseded_by" in cols: | ||||||||||||||||||||||||||||||||||
| if "status" in cols: | ||||||||||||||||||||||||||||||||||
| conn.execute( | ||||||||||||||||||||||||||||||||||
| "UPDATE chunks SET superseded_by = ?, status = 'superseded' WHERE id = ?", | ||||||||||||||||||||||||||||||||||
| (stored_chunk_id, supersedes), | ||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||
| conn.execute("UPDATE chunks SET superseded_by = ? WHERE id = ?", (stored_chunk_id, supersedes)) | ||||||||||||||||||||||||||||||||||
| for vector_table in ("chunk_vectors", "chunk_vectors_binary"): | ||||||||||||||||||||||||||||||||||
| if conn.execute( | ||||||||||||||||||||||||||||||||||
| "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?", (vector_table,) | ||||||||||||||||||||||||||||||||||
| ).fetchone(): | ||||||||||||||||||||||||||||||||||
| conn.execute(f"DELETE FROM {vector_table} WHERE chunk_id = ?", (supersedes,)) | ||||||||||||||||||||||||||||||||||
| _apply_store_supersedes(conn, supersedes, stored_chunk_id) | ||||||||||||||||||||||||||||||||||
| entity_id = event.get("entity_id") or metadata.get("entity_id") | ||||||||||||||||||||||||||||||||||
| if entity_id and {"kg_entities", "kg_entity_chunks"}.issubset(_table_names(conn)): | ||||||||||||||||||||||||||||||||||
| if conn.execute("SELECT id FROM kg_entities WHERE id = ?", (entity_id,)).fetchone(): | ||||||||||||||||||||||||||||||||||
|
|
@@ -758,7 +805,9 @@ def _table_names(conn: apsw.Connection) -> set[str]: | |||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| def _is_busy_error(exc: BaseException) -> bool: | ||||||||||||||||||||||||||||||||||
| return _is_sqlite_busy_error(exc) | ||||||||||||||||||||||||||||||||||
| from .vector_store import WriterInUseError | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| return isinstance(exc, WriterInUseError) or _is_sqlite_busy_error(exc) | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| def _default_embed_fn() -> Callable[[str], list[float]]: | ||||||||||||||||||||||||||||||||||
|
|
@@ -979,49 +1028,72 @@ def burn_drain_once( | |||||||||||||||||||||||||||||||||
| return result | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| all_events = [event for _, events in batch for event in events] | ||||||||||||||||||||||||||||||||||
| conn = _open_connection(db_path) | ||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||
| _ensure_enrichment_update_schema(conn) | ||||||||||||||||||||||||||||||||||
| conn.execute("BEGIN IMMEDIATE") | ||||||||||||||||||||||||||||||||||
| prefetched_state = _prefetch_enrichment_state(conn, all_events) | ||||||||||||||||||||||||||||||||||
| store_chunk_ids: list[str] = [] | ||||||||||||||||||||||||||||||||||
| for _, events in batch: | ||||||||||||||||||||||||||||||||||
| for event in events: | ||||||||||||||||||||||||||||||||||
| if _is_verified_redundant_enrichment(event, prefetched_state): | ||||||||||||||||||||||||||||||||||
| payload = _event_payload(event) | ||||||||||||||||||||||||||||||||||
| enqueue_provenance_resolution_for_entities( | ||||||||||||||||||||||||||||||||||
| conn, | ||||||||||||||||||||||||||||||||||
| payload.get("entities"), | ||||||||||||||||||||||||||||||||||
| chunk_id=payload.get("chunk_id"), | ||||||||||||||||||||||||||||||||||
| commit=False, | ||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||
| result.skipped_verified_stale += 1 | ||||||||||||||||||||||||||||||||||
| continue | ||||||||||||||||||||||||||||||||||
| applied = _apply_event(conn, event) | ||||||||||||||||||||||||||||||||||
| result.applied_events += 1 | ||||||||||||||||||||||||||||||||||
| if applied.chunk_id: | ||||||||||||||||||||||||||||||||||
| store_chunk_ids.append(applied.chunk_id) | ||||||||||||||||||||||||||||||||||
| if _embedding_enabled(): | ||||||||||||||||||||||||||||||||||
| _embed_store_chunks(conn, store_chunk_ids, embed_fn) | ||||||||||||||||||||||||||||||||||
| conn.execute("COMMIT") | ||||||||||||||||||||||||||||||||||
| conn.setbusytimeout(_checkpoint_busy_timeout_ms()) | ||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||
| conn.execute("PRAGMA wal_checkpoint(TRUNCATE)") | ||||||||||||||||||||||||||||||||||
| result.checkpoints += 1 | ||||||||||||||||||||||||||||||||||
| except apsw.Error as exc: | ||||||||||||||||||||||||||||||||||
| _log(log_path, f"burn drain checkpoint skipped: {exc}") | ||||||||||||||||||||||||||||||||||
| finally: | ||||||||||||||||||||||||||||||||||
| conn.setbusytimeout(_drain_busy_timeout_ms()) | ||||||||||||||||||||||||||||||||||
| except Exception as exc: | ||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||
| conn.execute("ROLLBACK") | ||||||||||||||||||||||||||||||||||
| except Exception: | ||||||||||||||||||||||||||||||||||
| pass | ||||||||||||||||||||||||||||||||||
| batch_includes_store = _events_include_store(all_events) | ||||||||||||||||||||||||||||||||||
| if batch_includes_store and not _ensure_drain_db_schema_preserving_queue( | ||||||||||||||||||||||||||||||||||
| db_path, | ||||||||||||||||||||||||||||||||||
| log_path, | ||||||||||||||||||||||||||||||||||
| context="burn drain failed before transaction", | ||||||||||||||||||||||||||||||||||
| ): | ||||||||||||||||||||||||||||||||||
| result.failed_files = len(batch) | ||||||||||||||||||||||||||||||||||
| _log(log_path, f"burn drain failed; batch preserved: {exc}") | ||||||||||||||||||||||||||||||||||
| return result | ||||||||||||||||||||||||||||||||||
| finally: | ||||||||||||||||||||||||||||||||||
| conn.close() | ||||||||||||||||||||||||||||||||||
| for schema_attempt in range(2): | ||||||||||||||||||||||||||||||||||
| conn = _open_connection(db_path) | ||||||||||||||||||||||||||||||||||
|
macroscopeapp[bot] marked this conversation as resolved.
|
||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||
| _ensure_enrichment_update_schema(conn) | ||||||||||||||||||||||||||||||||||
| conn.execute("BEGIN IMMEDIATE") | ||||||||||||||||||||||||||||||||||
| prefetched_state = _prefetch_enrichment_state(conn, all_events) | ||||||||||||||||||||||||||||||||||
| store_chunk_ids: list[str] = [] | ||||||||||||||||||||||||||||||||||
| for _, events in batch: | ||||||||||||||||||||||||||||||||||
| for event in events: | ||||||||||||||||||||||||||||||||||
| if _is_verified_redundant_enrichment(event, prefetched_state): | ||||||||||||||||||||||||||||||||||
| payload = _event_payload(event) | ||||||||||||||||||||||||||||||||||
| enqueue_provenance_resolution_for_entities( | ||||||||||||||||||||||||||||||||||
| conn, | ||||||||||||||||||||||||||||||||||
| payload.get("entities"), | ||||||||||||||||||||||||||||||||||
| chunk_id=payload.get("chunk_id"), | ||||||||||||||||||||||||||||||||||
| commit=False, | ||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||
| result.skipped_verified_stale += 1 | ||||||||||||||||||||||||||||||||||
| continue | ||||||||||||||||||||||||||||||||||
| applied = _apply_event(conn, event) | ||||||||||||||||||||||||||||||||||
| result.applied_events += 1 | ||||||||||||||||||||||||||||||||||
|
Comment on lines
+1058
to
+1059
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This increments the public Useful? React with 👍 / 👎. |
||||||||||||||||||||||||||||||||||
| if applied.chunk_id: | ||||||||||||||||||||||||||||||||||
| store_chunk_ids.append(applied.chunk_id) | ||||||||||||||||||||||||||||||||||
| if _embedding_enabled(): | ||||||||||||||||||||||||||||||||||
| _embed_store_chunks(conn, store_chunk_ids, embed_fn) | ||||||||||||||||||||||||||||||||||
| conn.execute("COMMIT") | ||||||||||||||||||||||||||||||||||
| conn.setbusytimeout(_checkpoint_busy_timeout_ms()) | ||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||
| conn.execute("PRAGMA wal_checkpoint(TRUNCATE)") | ||||||||||||||||||||||||||||||||||
| result.checkpoints += 1 | ||||||||||||||||||||||||||||||||||
| except apsw.Error as exc: | ||||||||||||||||||||||||||||||||||
| _log(log_path, f"burn drain checkpoint skipped: {exc}") | ||||||||||||||||||||||||||||||||||
| finally: | ||||||||||||||||||||||||||||||||||
| conn.setbusytimeout(_drain_busy_timeout_ms()) | ||||||||||||||||||||||||||||||||||
| break | ||||||||||||||||||||||||||||||||||
| except Exception as exc: | ||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||
| conn.execute("ROLLBACK") | ||||||||||||||||||||||||||||||||||
| except Exception: | ||||||||||||||||||||||||||||||||||
| pass | ||||||||||||||||||||||||||||||||||
| if batch_includes_store and _is_missing_chunks_error(exc) and schema_attempt == 0: | ||||||||||||||||||||||||||||||||||
| conn.close() | ||||||||||||||||||||||||||||||||||
| conn = None | ||||||||||||||||||||||||||||||||||
| if not _ensure_drain_db_schema_preserving_queue( | ||||||||||||||||||||||||||||||||||
| db_path, | ||||||||||||||||||||||||||||||||||
| log_path, | ||||||||||||||||||||||||||||||||||
| context="burn drain failed before transaction", | ||||||||||||||||||||||||||||||||||
| force=True, | ||||||||||||||||||||||||||||||||||
| ): | ||||||||||||||||||||||||||||||||||
| result.failed_files = len(batch) | ||||||||||||||||||||||||||||||||||
| return result | ||||||||||||||||||||||||||||||||||
| continue | ||||||||||||||||||||||||||||||||||
| result.failed_files = len(batch) | ||||||||||||||||||||||||||||||||||
| _log(log_path, f"burn drain failed; batch preserved: {exc}") | ||||||||||||||||||||||||||||||||||
| return result | ||||||||||||||||||||||||||||||||||
| finally: | ||||||||||||||||||||||||||||||||||
| if conn is not None: | ||||||||||||||||||||||||||||||||||
| conn.close() | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| for path, _events in batch: | ||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||
|
|
@@ -1080,6 +1152,13 @@ def drain_once( | |||||||||||||||||||||||||||||||||
| continue | ||||||||||||||||||||||||||||||||||
| events_to_apply = events[: _max_events_per_transaction()] | ||||||||||||||||||||||||||||||||||
| remaining_events = events[len(events_to_apply) :] | ||||||||||||||||||||||||||||||||||
| events_include_store = _events_include_store(events_to_apply) | ||||||||||||||||||||||||||||||||||
| if events_include_store and not _ensure_drain_db_schema_preserving_queue( | ||||||||||||||||||||||||||||||||||
| db_path, | ||||||||||||||||||||||||||||||||||
| log_path, | ||||||||||||||||||||||||||||||||||
| context=f"drain failed for {path.name}", | ||||||||||||||||||||||||||||||||||
| ): | ||||||||||||||||||||||||||||||||||
| break | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| for attempt in range(5): | ||||||||||||||||||||||||||||||||||
| conn: apsw.Connection | None = None | ||||||||||||||||||||||||||||||||||
|
|
@@ -1140,6 +1219,18 @@ def drain_once( | |||||||||||||||||||||||||||||||||
| _log(log_path, f"drain busy; retrying in {delay:.2f}s") | ||||||||||||||||||||||||||||||||||
| time.sleep(delay) | ||||||||||||||||||||||||||||||||||
| continue | ||||||||||||||||||||||||||||||||||
| if events_include_store and _is_missing_chunks_error(exc) and attempt < 4: | ||||||||||||||||||||||||||||||||||
| if conn is not None: | ||||||||||||||||||||||||||||||||||
| conn.close() | ||||||||||||||||||||||||||||||||||
| conn = None | ||||||||||||||||||||||||||||||||||
| if not _ensure_drain_db_schema_preserving_queue( | ||||||||||||||||||||||||||||||||||
| db_path, | ||||||||||||||||||||||||||||||||||
| log_path, | ||||||||||||||||||||||||||||||||||
| context=f"drain failed for {path.name}", | ||||||||||||||||||||||||||||||||||
| force=True, | ||||||||||||||||||||||||||||||||||
| ): | ||||||||||||||||||||||||||||||||||
| break | ||||||||||||||||||||||||||||||||||
|
Comment on lines
+1231
to
+1232
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When the DB file already exists but lacks Useful? React with 👍 / 👎.
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in 2ca90cc. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When the DB file is non-empty but still lacks Useful? React with 👍 / 👎. |
||||||||||||||||||||||||||||||||||
| continue | ||||||||||||||||||||||||||||||||||
|
Comment on lines
+1226
to
+1233
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Stop draining after preserving a schema-blocked store file. When forced schema bootstrap is busy, Line 1232 only exits the retry loop. The outer file loop then advances, so later queue files can be applied/unlinked while this store file remains queued, reordering queued writes. Return or break the outer loop here, matching the pre-transaction path at Line 1161. As per coding guidelines, Suggested stop-on-preserve fix if not _ensure_drain_db_schema_preserving_queue(
db_path,
log_path,
context=f"drain failed for {path.name}",
force=True,
):
- break
+ return drained
continue📝 Committable suggestion
Suggested change
🤖 Prompt for AI AgentsSource: Coding guidelines
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in 2ca90cc. |
||||||||||||||||||||||||||||||||||
| _log(log_path, f"drain failed for {path.name}: {exc}") | ||||||||||||||||||||||||||||||||||
| break | ||||||||||||||||||||||||||||||||||
| finally: | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When a store batch is drained against a missing or empty DB while another BrainLayer process already owns the VectorStore writer pidfile, this
VectorStore(db_path)construction raisesWriterInUseError. The helper is called fromdrain_once/burn_drain_oncebefore their existing failure-preserving retry blocks, and that exception is not treated as an APSW busy error, so the drain daemon exits instead of leaving the JSONL file queued for a later retry; a first-run/startup race can strand queuedbrain_storeevents until the daemon is restarted.Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in c048857.
drain_onceandburn_drain_oncenow treatWriterInUseErroras busy-like schema bootstrap contention and preserve queued store batches instead of letting the exception escape. The same preserve path is used for missing-chunks schema bootstrap retries. Added drain and burn-drain regressions that patch schema initialization to raiseWriterInUseErrorand assert the queue file remains for retry.