Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion hooks/brainlayer-prompt-search.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@

import json
import os
import queue
import re
import sqlite3
import sys
import threading
import time
from datetime import datetime, timedelta, timezone
from hashlib import sha256
Expand All @@ -34,6 +36,7 @@
MAX_ADAPTIVE_INJECTION = 3
MAX_HYBRID_CANDIDATES = 8
DEGRADED_PREFIX = "⚠️ DEGRADED: BrainLayer"
DEFAULT_EMBED_TIMEOUT_MS = 1000.0


def degraded_notice(reason):
Expand All @@ -44,6 +47,38 @@ def emit_degraded(reason):
print(degraded_notice(reason))


def embed_timeout_ms():
    """Return the hybrid-search time budget in milliseconds.

    The value is read from the ``BRAINLAYER_EMBED_TIMEOUT_MS`` environment
    variable. Unparseable, zero, negative, and NaN values fall back to
    ``DEFAULT_EMBED_TIMEOUT_MS``; anything above 30 seconds (including
    ``inf``) is capped at ``30_000.0``.
    """
    raw = os.environ.get("BRAINLAYER_EMBED_TIMEOUT_MS", str(DEFAULT_EMBED_TIMEOUT_MS))
    try:
        value = float(raw)
    except (TypeError, ValueError):
        return DEFAULT_EMBED_TIMEOUT_MS
    # ``not value > 0`` rather than ``value <= 0`` on purpose: NaN compares
    # False against everything, so this form rejects NaN together with zero
    # and negatives instead of letting it leak into a join/wait timeout.
    if not value > 0:
        return DEFAULT_EMBED_TIMEOUT_MS
    return min(value, 30_000.0)


def run_with_timeout(func, timeout_ms, *args, **kwargs):
results = queue.Queue(maxsize=1)

def target():
try:
results.put((True, func(*args, **kwargs)))
except BaseException as exc:
results.put((False, exc))

thread = threading.Thread(target=target, name="brainlayer-hook-hybrid", daemon=True)
thread.start()
thread.join(timeout_ms / 1000.0)
if thread.is_alive():
raise TimeoutError(f"hybrid search exceeded {timeout_ms:.0f}ms")

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Timeout leaves background work running

Medium Severity

When the embed/hybrid deadline is exceeded, the caller falls back to FTS, but the timed-out work keeps running in a daemon thread or default executor. Stalled embedding and hybrid work can continue to hold the model and database, competing with the FTS path and later searches in the same process.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 2ae7109. Configure here.


ok, payload = results.get_nowait()
if ok:
return payload
raise payload


# Prompts shorter than this are probably greetings/commands — skip search
MIN_PROMPT_LENGTH = 15
HEBREW_CANDIDATE_RE = re.compile(r"[\u0590-\u05FF]{2,}")
Expand Down Expand Up @@ -944,7 +979,7 @@ def run_fts_search(db_path, keywords, limit):
def search_prompt_chunks(prompt, db_path, keywords, limit):
    """Search with hybrid first, then fall back to FTS-only behavior.

    The hybrid search is run through ``run_with_timeout`` using the budget
    from ``embed_timeout_ms()``, so a stalled hybrid call cannot hold up the
    lexical fallback.

    Returns:
        A ``(rows, used_hybrid)`` tuple. ``used_hybrid`` is ``True`` when the
        rows came from the hybrid search and ``False`` when they came from
        the FTS fallback.
    """
    try:
        rows = run_with_timeout(run_hybrid_search, embed_timeout_ms(), prompt, db_path, keywords, limit)
    except Exception:
        # Deliberate best-effort: any hybrid failure, including the
        # TimeoutError raised by run_with_timeout, degrades to FTS-only.
        return run_fts_search(db_path, keywords, limit), False
    return rows, True

Expand Down
69 changes: 63 additions & 6 deletions src/brainlayer/mcp/search_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
_ORIGIN_ORDER_LABEL = "- Order: origin (earliest among expanded hybrid candidates)"
_HELPER_SOCKET_GLOB = "/tmp/brainbar-hybrid-*.sock"
_HELPER_SOCKET_TIMEOUT_SECONDS = 2.0
_DEFAULT_EMBED_TIMEOUT_MS = 1000.0

from ._format import format_kg_search, format_recalled_context, format_search_results, format_stats
from ._shared import (
Expand All @@ -60,6 +61,17 @@ def _get_vector_store():
return _get_search_vector_store()


def _embed_timeout_ms() -> float:
    """Return the query-embedding time budget in milliseconds.

    The value is read from the ``BRAINLAYER_EMBED_TIMEOUT_MS`` environment
    variable. Unparseable, zero, negative, and NaN values fall back to
    ``_DEFAULT_EMBED_TIMEOUT_MS``; anything above 30 seconds (including
    ``inf``) is capped at ``30_000.0``.
    """
    raw = os.environ.get("BRAINLAYER_EMBED_TIMEOUT_MS", str(_DEFAULT_EMBED_TIMEOUT_MS))
    try:
        value = float(raw)
    except (TypeError, ValueError):
        return _DEFAULT_EMBED_TIMEOUT_MS
    # NaN compares False against everything, so ``not value > 0`` rejects it
    # along with zero and negatives; a NaN timeout must never reach the
    # awaiting code.
    if not value > 0:
        return _DEFAULT_EMBED_TIMEOUT_MS
    return min(value, 30_000.0)


def _origin_candidate_count(num_results: int) -> int:
    """Return the candidate count for origin ordering.

    The requested ``num_results`` is raised to at least
    ``_ORIGIN_CANDIDATE_LIMIT`` and then capped at ``_MAX_PUBLIC_NUM_RESULTS``.
    """
    widened = max(num_results, _ORIGIN_CANDIDATE_LIMIT)
    return min(_MAX_PUBLIC_NUM_RESULTS, widened)

Expand Down Expand Up @@ -1767,20 +1779,44 @@ async def _search(

normalized_project = _normalize_project_name(project)
loop = asyncio.get_running_loop()
model = _get_embedding_model()
embed_started = search_profile.now()
query_embedding = None
search_mode = "hybrid"
fallback_reason = None
try:
query_embedding = await loop.run_in_executor(None, model.embed_query, query)
embed_timeout_ms = _embed_timeout_ms()
query_embedding = await asyncio.wait_for(
loop.run_in_executor(
None,
lambda: _get_embedding_model().embed_query(query),
),
timeout=embed_timeout_ms / 1000.0,
Comment on lines +1789 to +1793

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Avoid leaving timed-out embeddings in the shared pool

When the embedding backend/model load is the thing that stalls, this wait_for only times out the asyncio wrapper; the callable submitted via run_in_executor(None, ...) keeps occupying a worker in the shared default executor. Repeated timed-out brain_search calls can therefore exhaust that pool, and with the helper route enabled subsequent searches can block on the helper route's own default-executor call before reaching the FTS fallback. Please isolate/bound the embedding worker or trip a circuit breaker instead of submitting another uncancellable shared-pool job on every search.

Useful? React with 👍 / 👎.

Comment on lines +1788 to +1793

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Bound timed-out embedding calls

When the embedding backend hangs in the long-lived MCP server, asyncio.wait_for times out the asyncio wrapper but cannot stop the run_in_executor thread that is already running embed_query. Each brain_search during that outage can leave another default-executor worker occupied, eventually saturating the executor or growing its queue even though the request falls back to FTS. Gate concurrent embed attempts or use a bounded/circuit-breaker path before scheduling more workers.

Useful? React with 👍 / 👎.

)
except TimeoutError as exc:
search_mode = "fts_only"
fallback_reason = "embed_timeout"
search_profile.emit(
profile_scope,
"embed",
profile_query_id,
search_profile.dur_ms(embed_started),
error=exc.__class__.__name__,
timeout_ms=embed_timeout_ms,
fallback="fts_only",
)
except Exception as exc:
search_mode = "fts_only"
fallback_reason = f"embed_error:{exc.__class__.__name__}"
search_profile.emit(
profile_scope,
"embed",
profile_query_id,
search_profile.dur_ms(embed_started),
error=exc.__class__.__name__,
fallback="fts_only",
)
raise
search_profile.emit(profile_scope, "embed", profile_query_id, search_profile.dur_ms(embed_started))
else:
search_profile.emit(profile_scope, "embed", profile_query_id, search_profile.dur_ms(embed_started))

if source == "all":
source_filter = None
Expand Down Expand Up @@ -1890,7 +1926,14 @@ async def _search(
}
)
structured_results.append(item)
structured = {"query": query, "total": len(structured_results), "results": structured_results}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Empty results omit fallback metadata

Low Severity

After an embedding timeout or error, successful code paths attach search_mode, fallback_reason, and FTS-only messaging to responses. The early return when hybrid search finds no documents still returns a bare empty payload, so clients cannot tell an FTS-only fallback occurred and users do not see the FTS-only notice.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 2ae7109. Configure here.

structured = {
"query": query,
"total": len(structured_results),
"results": structured_results,
"search_mode": search_mode,
}
if fallback_reason:
structured["fallback_reason"] = fallback_reason
if order == "origin":
structured["order"] = order
structured["order_scope"] = _ORIGIN_ORDER_SCOPE
Expand All @@ -1900,9 +1943,16 @@ async def _search(
len(structured_results),
order=order if order == "origin" else None,
)
if search_mode == "fts_only":
formatted_text = (
f"{formatted_text}\n\n"
f"Search mode: FTS-only fallback ({fallback_reason}); vector embedding was skipped."
)
return ([TextContent(type="text", text=formatted_text)], structured)

output_parts = [f"## Search Results for: {query}\n"]
if search_mode == "fts_only":
output_parts.append(f"Search mode: FTS-only fallback ({fallback_reason}); vector embedding was skipped.")
if order == "origin":
output_parts.append(_ORIGIN_ORDER_LABEL)
structured_results = []
Expand Down Expand Up @@ -1973,7 +2023,14 @@ async def _search(
output_parts.append(doc)
output_parts.append("\n---")

structured = {"query": query, "total": len(structured_results), "results": structured_results}
structured = {
"query": query,
"total": len(structured_results),
"results": structured_results,
"search_mode": search_mode,
}
if fallback_reason:
structured["fallback_reason"] = fallback_reason
if order == "origin":
structured["order"] = order
structured["order_scope"] = _ORIGIN_ORDER_SCOPE
Expand Down
15 changes: 13 additions & 2 deletions src/brainlayer/search_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -1711,7 +1711,7 @@ def _rerank_binary_results_with_float(

def hybrid_search(
self,
query_embedding: List[float],
query_embedding: Optional[List[float]],
query_text: str,
fts_query_override: Optional[str] = None,
n_results: int = 10,
Expand Down Expand Up @@ -1810,8 +1810,19 @@ def hybrid_search(

# 1. Semantic search leg — prefer binary vectors, fall back to float vectors
# when the binary index is unavailable (for example readonly live DBs).
# A None embedding is an intentional FTS-only fallback: query embedding
# must never block lexical reads.
candidate_fetch_count = max(n_results * 3, _MMR_CANDIDATE_LIMIT)
if getattr(self, "_binary_index_available", False):
if query_embedding is None:
semantic = {"ids": [[]], "documents": [[]], "metadatas": [[]], "distances": [[]]}
search_profile.emit(
profile_scope,
"semantic_skip",
profile_query_id,
0.0,
reason="missing_query_embedding",
)
elif getattr(self, "_binary_index_available", False):
binary_started = search_profile.now()
semantic = self._binary_search(
query_embedding=query_embedding,
Expand Down
43 changes: 43 additions & 0 deletions tests/test_adaptive_injection.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Tests for score-based adaptive prompt injection in the BrainLayer hook."""

import importlib.util
import time
from pathlib import Path

import pytest
Expand Down Expand Up @@ -155,6 +156,48 @@ def test_fallback_to_fts_only(self, prompt_search, monkeypatch):
assert used_hybrid is False
assert [row["id"] for row in rows] == ["fts-best"]

def test_slow_hybrid_search_falls_back_to_fts_only_within_timeout(self, prompt_search, monkeypatch):
    """A hybrid search slower than the configured timeout yields the FTS rows quickly."""
    expected_rows = [_row("fts-timeout", 0.0)]

    def stalled_hybrid(*_args, **_kwargs):
        # Sleeps well past the 1 ms budget configured below.
        time.sleep(0.05)
        return [_row("late-hybrid", 0.02)]

    def fts_stub(*_args, **_kwargs):
        return expected_rows

    monkeypatch.setenv("BRAINLAYER_EMBED_TIMEOUT_MS", "1")
    monkeypatch.setattr(prompt_search, "run_hybrid_search", stalled_hybrid)
    monkeypatch.setattr(prompt_search, "run_fts_search", fts_stub)

    t0 = time.monotonic()
    rows, used_hybrid = prompt_search.search_prompt_chunks(
        prompt="keyword fallback query",
        db_path="/tmp/test.db",
        keywords=["keyword", "fallback"],
        limit=8,
    )
    duration = time.monotonic() - t0

    assert duration < 0.5
    assert used_hybrid is False
    assert [row["id"] for row in rows] == ["fts-timeout"]

def test_fast_hybrid_search_stays_on_hybrid_path(self, prompt_search, monkeypatch):
    """A hybrid search that finishes inside the timeout is returned as the result."""
    expected_rows = [_row("hybrid-best", 0.02)]
    unused_rows = [_row("fts-unused", 0.0)]

    def hybrid_stub(*_args, **_kwargs):
        return expected_rows

    def fts_stub(*_args, **_kwargs):
        return unused_rows

    monkeypatch.setenv("BRAINLAYER_EMBED_TIMEOUT_MS", "1000")
    monkeypatch.setattr(prompt_search, "run_hybrid_search", hybrid_stub)
    monkeypatch.setattr(prompt_search, "run_fts_search", fts_stub)

    rows, used_hybrid = prompt_search.search_prompt_chunks(
        prompt="keyword fallback query",
        db_path="/tmp/test.db",
        keywords=["keyword", "fallback"],
        limit=8,
    )

    assert used_hybrid is True
    assert [row["id"] for row in rows] == ["hybrid-best"]

def test_hybrid_search_opens_vector_store_readonly(self, prompt_search, monkeypatch, tmp_path):
opened = []

Expand Down
17 changes: 17 additions & 0 deletions tests/test_hybrid_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,23 @@ def test_hybrid_search_fts_only_fallback(self, store):

assert "fts-hit" in results["ids"][0]

def test_hybrid_search_accepts_none_embedding_for_fts_only_fallback(self, store):
    """Passing ``query_embedding=None`` still returns the lexically matching chunk."""
    expected_id = "fts-none-embedding-hit"
    _insert_chunk(
        store,
        chunk_id=expected_id,
        content="exact keyword fallback when embedding is unavailable",
        embedding=_embed("distant vector"),
    )
    store.build_binary_index()

    outcome = store.hybrid_search(
        query_embedding=None,
        query_text="embedding unavailable",
        n_results=5,
    )
    top_ids = outcome["ids"][0]

    assert top_ids == [expected_id]

def test_hybrid_search_fts_only_returns_provenance_metadata(self, store):
cursor = store.conn.cursor()
columns = {row[1] for row in cursor.execute("PRAGMA table_info(chunks)")}
Expand Down
Loading
Loading