Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 95 additions & 18 deletions src/brainlayer/health_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import socket
import sqlite3
import subprocess
import sys
from dataclasses import asdict, dataclass, field
from datetime import UTC, datetime
from pathlib import Path
Expand All @@ -22,6 +23,19 @@
DEFAULT_HOTLANE_LABEL = "com.brainlayer.hotlane-brainbar"
DEFAULT_BRAINBAR_DAEMON_LABEL = "com.brainlayer.brainbar-daemon"
DEFAULT_BACKLOG_BATCH = 128
DEFAULT_HEAL_MIN_CONSECUTIVE_FAILURES = 2
HEAL_MIN_CONSECUTIVE_FAILURES_ENV = "BRAINLAYER_HEAL_MIN_CONSECUTIVE_FAILURES"


def _env_int(name: str, default: int, *, minimum: int = 1) -> int:
raw_value = os.environ.get(name)
if raw_value is None:
return default
try:
value = int(raw_value)
except ValueError:
return default
return max(minimum, value)


@dataclass(frozen=True)
Expand All @@ -42,6 +56,12 @@ class HealthCheckConfig:
heal: bool = False
socket_timeout_seconds: float = 5.0
max_stalled_ticks: int = 2
heal_min_consecutive_failures: int = field(
default_factory=lambda: _env_int(
HEAL_MIN_CONSECUTIVE_FAILURES_ENV,
DEFAULT_HEAL_MIN_CONSECUTIVE_FAILURES,
)
)


@dataclass
Expand Down Expand Up @@ -218,13 +238,62 @@ def _kickstart(label: str, command_runner: CommandRunner) -> str:
return f"kickstart:{label}"


def _kickstart_once(result: HealthCheckResult, label: str, command_runner: CommandRunner) -> None:
def _kickstart_once(
result: HealthCheckResult,
label: str,
issue_code: str,
consecutive_failures: int,
command_runner: CommandRunner,
) -> None:
action = f"kickstart:{label}"
if action in result.actions:
return
print(
f"heal action label={label} issue={issue_code} consecutive_failures={consecutive_failures} action={action}",
file=sys.stderr,
)
result.actions.append(_kickstart(label, command_runner))


def _previous_heal_failures(state: dict[str, Any]) -> dict[str, int]:
raw_failures = state.get("heal_failures")
if not isinstance(raw_failures, dict):
return {}
failures: dict[str, int] = {}
for issue_code, count in raw_failures.items():
if not isinstance(issue_code, str) or not isinstance(count, int):
continue
failures[issue_code] = max(0, count)
return failures


def _updated_heal_failures(
previous_failures: dict[str, int],
current_issue_codes: set[str],
) -> dict[str, int]:
return {issue_code: previous_failures.get(issue_code, 0) + 1 for issue_code in sorted(current_issue_codes)}


def _apply_heals(
*,
result: HealthCheckResult,
issue_labels: dict[str, str],
previous_failures: dict[str, int],
config: HealthCheckConfig,
command_runner: CommandRunner,
) -> dict[str, int]:
current_issue_codes = {issue.code for issue in result.issues}
heal_failures = _updated_heal_failures(previous_failures, current_issue_codes)
if not config.heal:
return heal_failures
threshold = max(1, config.heal_min_consecutive_failures)
for issue_code, label in issue_labels.items():
consecutive_failures = heal_failures.get(issue_code, 0)
if consecutive_failures >= threshold:
_kickstart_once(result, label, issue_code, consecutive_failures, command_runner)
return heal_failures


def run_health_check(
config: HealthCheckConfig,
*,
Expand All @@ -235,6 +304,9 @@ def run_health_check(
) -> HealthCheckResult:
now = now_fn()
result = HealthCheckResult(checked_at=now.isoformat(), ok=True)
state = _load_state(config.state_path)
previous_heal_failures = _previous_heal_failures(state)
heal_issue_labels: dict[str, str] = {}

hotlane_processes = parse_hotlane_processes(ps_output_fn())
result.hotlane_running = bool(hotlane_processes)
Expand All @@ -243,17 +315,16 @@ def run_health_check(
HealthIssue("hotlane_dead", "critical", "hotlane BrainBar embedding daemon is not running")
)
if config.heal:
_kickstart_once(result, config.hotlane_label, command_runner)
heal_issue_labels["hotlane_dead"] = config.hotlane_label
else:
result.backlog_batch = min(process.backlog_batch for process in hotlane_processes)
if any(process.backlog_batch <= 0 for process in hotlane_processes):
result.issues.append(
HealthIssue("hotlane_backlog_disabled", "critical", "--backlog-batch is 0; embeddings will not drain")
)
if config.heal:
_kickstart_once(result, config.hotlane_label, command_runner)
heal_issue_labels["hotlane_backlog_disabled"] = config.hotlane_label

state = _load_state(config.state_path)
previous_missing = state.get("missing_vectors")
result.previous_missing_vectors = int(previous_missing) if isinstance(previous_missing, int) else None
try:
Expand Down Expand Up @@ -281,7 +352,7 @@ def run_health_check(
)
stalled_ticks = 0
if config.heal:
_kickstart_once(result, config.hotlane_label, command_runner)
heal_issue_labels["missing_embeddings_climbing"] = config.hotlane_label
elif result.missing_vectors == result.previous_missing_vectors and result.missing_vectors > 0:
stalled_ticks = prior_stalled_ticks + 1
if stalled_ticks >= config.max_stalled_ticks:
Expand All @@ -293,17 +364,8 @@ def run_health_check(
)
)
if config.heal:
_kickstart_once(result, config.hotlane_label, command_runner)
heal_issue_labels["missing_embeddings_not_draining"] = config.hotlane_label
result.stalled_ticks = stalled_ticks
if result.missing_vectors is not None:
_write_state(
config.state_path,
{
"missing_vectors": result.missing_vectors,
"stalled_ticks": result.stalled_ticks,
"ts": now.isoformat(),
},
)

try:
response = socket_request_fn(config.socket_path, config.canary_query, config.socket_timeout_seconds)
Expand All @@ -320,14 +382,29 @@ def run_health_check(
)
)
if config.heal:
_kickstart_once(result, config.brainbar_daemon_label, command_runner)
heal_issue_labels[code] = config.brainbar_daemon_label
except Exception as exc:
result.canary_ok = False
result.issues.append(
HealthIssue("brain_search_canary_failed", "critical", f"BrainBar brain_search canary failed: {exc}")
)
if config.heal:
_kickstart_once(result, config.brainbar_daemon_label, command_runner)

heal_issue_labels["brain_search_canary_failed"] = config.brainbar_daemon_label

heal_failures = _apply_heals(
result=result,
issue_labels=heal_issue_labels,
previous_failures=previous_heal_failures,
config=config,
command_runner=command_runner,
)
if result.missing_vectors is not None or heal_failures:

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve missing-vector state on count failures

When SQLite counting fails (for example during a transient DB lock), result.missing_vectors is None but _apply_heals still returns a nonempty map for the missing_embeddings_count_failed issue. This new or heal_failures branch then rewrites the state file without missing_vectors/stalled_ticks, so the next successful tick has no previous baseline and cannot detect climbing or stalled embeddings. Please skip this write or merge the existing progress fields when the count is unavailable.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit 83e094b: the state write now merges from the loaded state first, then refreshes heal_failures/ts and only overwrites missing_vectors/stalled_ticks when a fresh count is available. Added regression coverage for count-failure plus heal counter persistence.

state_payload: dict[str, Any] = dict(state)
state_payload["heal_failures"] = heal_failures
state_payload["ts"] = now.isoformat()
if result.missing_vectors is not None:
Comment thread
macroscopeapp[bot] marked this conversation as resolved.
state_payload["missing_vectors"] = result.missing_vectors
state_payload["stalled_ticks"] = result.stalled_ticks
_write_state(config.state_path, state_payload)
Comment thread
cursor[bot] marked this conversation as resolved.
result.ok = not result.issues
return result
104 changes: 92 additions & 12 deletions tests/test_stability_health_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,30 @@ def _ok_canary(_socket_path: Path, _query: str, _timeout_seconds: float) -> dict
}


def test_backlog_batch_zero_alarms_and_kickstarts_hotlane(tmp_path):
def test_backlog_batch_zero_alarms_but_waits_until_repeated_failure_to_kickstart_hotlane(tmp_path, capsys):
db_path = tmp_path / "brainlayer.db"
state_path = tmp_path / "health-state.json"
_make_db(db_path, total=4, vector_rows=3)
commands: list[list[str]] = []

result = run_health_check(
config = HealthCheckConfig(db_path=db_path, state_path=state_path, heal=True)
first_result = run_health_check(
config,
ps_output_fn=lambda: (
"123 /usr/bin/python scripts/hotlane_brainbar_daemon.py --interval 1 --backlog-batch 0 --enrich-limit 25\n"
),
socket_request_fn=_ok_canary,
command_runner=lambda args: commands.append(args),
now_fn=lambda: datetime(2026, 6, 19, 4, 25, tzinfo=UTC),
)

assert first_result.ok is False
assert "hotlane_backlog_disabled" in [issue.code for issue in first_result.issues]
assert first_result.backlog_batch == 0
assert commands == []
assert "kickstart" not in capsys.readouterr().err

second_result = run_health_check(
HealthCheckConfig(db_path=db_path, state_path=state_path, heal=True),
ps_output_fn=lambda: (
"123 /usr/bin/python scripts/hotlane_brainbar_daemon.py --interval 1 --backlog-batch 0 --enrich-limit 25\n"
Expand All @@ -71,10 +88,16 @@ def test_backlog_batch_zero_alarms_and_kickstarts_hotlane(tmp_path):
now_fn=lambda: datetime(2026, 6, 19, 4, 30, tzinfo=UTC),
)

assert result.ok is False
assert "hotlane_backlog_disabled" in [issue.code for issue in result.issues]
assert result.backlog_batch == 0
assert any("com.brainlayer.hotlane-brainbar" in " ".join(command) for command in commands)
assert second_result.ok is False
assert "hotlane_backlog_disabled" in [issue.code for issue in second_result.issues]
assert second_result.backlog_batch == 0
assert len(commands) == 1
assert "com.brainlayer.hotlane-brainbar" in " ".join(commands[0])
stderr = capsys.readouterr().err
assert "heal action" in stderr
assert "label=com.brainlayer.hotlane-brainbar" in stderr
assert "issue=hotlane_backlog_disabled" in stderr
assert "consecutive_failures=2" in stderr


def test_any_zero_backlog_batch_alarms_when_multiple_hotlanes_are_running(tmp_path):
Expand All @@ -84,7 +107,7 @@ def test_any_zero_backlog_batch_alarms_when_multiple_hotlanes_are_running(tmp_pa
commands: list[list[str]] = []

result = run_health_check(
HealthCheckConfig(db_path=db_path, state_path=state_path, heal=True),
HealthCheckConfig(db_path=db_path, state_path=state_path, heal=True, heal_min_consecutive_failures=1),
ps_output_fn=lambda: (
"123 /usr/bin/python scripts/hotlane_brainbar_daemon.py --interval 1 --backlog-batch 128\n"
"456 /usr/bin/python scripts/hotlane_brainbar_daemon.py --interval 1 --backlog-batch 0\n"
Expand Down Expand Up @@ -169,7 +192,42 @@ def test_missing_embeddings_climb_resets_stall_counter(tmp_path):
assert saved["stalled_ticks"] == 0


def test_brainbar_canary_error_alarms_and_kickstarts_brainbar(tmp_path):
def test_heal_state_write_preserves_missing_vector_history_when_count_fails(tmp_path):
state_path = tmp_path / "health-state.json"
state_path.write_text(
json.dumps(
{
"missing_vectors": 7,
"stalled_ticks": 1,
"ts": "2026-06-19T04:25:00+00:00",
}
),
encoding="utf-8",
)

result = run_health_check(
HealthCheckConfig(
db_path=tmp_path / "missing" / "brainlayer.db",
state_path=state_path,
heal=True,
),
ps_output_fn=lambda: (
"123 /usr/bin/python scripts/hotlane_brainbar_daemon.py --interval 1 --backlog-batch 0 --enrich-limit 25\n"
),
socket_request_fn=_ok_canary,
command_runner=lambda _args: None,
now_fn=lambda: datetime(2026, 6, 19, 4, 30, tzinfo=UTC),
)

assert result.ok is False
assert "missing_embeddings_count_failed" in [issue.code for issue in result.issues]
saved = json.loads(state_path.read_text(encoding="utf-8"))
assert saved["missing_vectors"] == 7
assert saved["stalled_ticks"] == 1
assert saved["heal_failures"]["hotlane_backlog_disabled"] == 1


def test_brainbar_canary_error_waits_until_repeated_failure_to_kickstart_brainbar(tmp_path):
db_path = tmp_path / "brainlayer.db"
state_path = tmp_path / "health-state.json"
_make_db(db_path, total=3, vector_rows=3)
Expand All @@ -185,8 +243,24 @@ def failed_canary(_socket_path: Path, _query: str, _timeout_seconds: float) -> d
},
}

result = run_health_check(
HealthCheckConfig(db_path=db_path, state_path=state_path, heal=True),
config = HealthCheckConfig(db_path=db_path, state_path=state_path, heal=True)
first_result = run_health_check(
config,
ps_output_fn=lambda: (
"123 /usr/bin/python scripts/hotlane_brainbar_daemon.py "
"--interval 1 --backlog-batch 128 --enrich-limit 25\n"
),
socket_request_fn=failed_canary,
command_runner=lambda args: commands.append(args),
now_fn=lambda: datetime(2026, 6, 19, 4, 25, tzinfo=UTC),
)

assert first_result.ok is False
assert "brain_search_canary_failed" in [issue.code for issue in first_result.issues]
assert commands == []

second_result = run_health_check(
config,
ps_output_fn=lambda: (
"123 /usr/bin/python scripts/hotlane_brainbar_daemon.py "
"--interval 1 --backlog-batch 128 --enrich-limit 25\n"
Expand All @@ -196,11 +270,17 @@ def failed_canary(_socket_path: Path, _query: str, _timeout_seconds: float) -> d
now_fn=lambda: datetime(2026, 6, 19, 4, 30, tzinfo=UTC),
)

assert result.ok is False
assert "brain_search_canary_failed" in [issue.code for issue in result.issues]
assert second_result.ok is False
assert "brain_search_canary_failed" in [issue.code for issue in second_result.issues]
assert any("com.brainlayer.brainbar-daemon" in " ".join(command) for command in commands)


def test_heal_min_consecutive_failures_can_be_overridden_by_env(monkeypatch):
monkeypatch.setenv("BRAINLAYER_HEAL_MIN_CONSECUTIVE_FAILURES", "3")

assert HealthCheckConfig().heal_min_consecutive_failures == 3


def test_health_check_launchagent_runs_every_five_minutes_and_heals():
plist_path = REPO_ROOT / "scripts/launchd/com.brainlayer.health-check.plist"
plist = plistlib.loads(plist_path.read_bytes())
Expand Down
Loading