diff --git a/src/brainlayer/health_check.py b/src/brainlayer/health_check.py index 0fc04e97..16e257a3 100644 --- a/src/brainlayer/health_check.py +++ b/src/brainlayer/health_check.py @@ -9,6 +9,7 @@ import socket import sqlite3 import subprocess +import sys from dataclasses import asdict, dataclass, field from datetime import UTC, datetime from pathlib import Path @@ -22,6 +23,19 @@ DEFAULT_HOTLANE_LABEL = "com.brainlayer.hotlane-brainbar" DEFAULT_BRAINBAR_DAEMON_LABEL = "com.brainlayer.brainbar-daemon" DEFAULT_BACKLOG_BATCH = 128 +DEFAULT_HEAL_MIN_CONSECUTIVE_FAILURES = 2 +HEAL_MIN_CONSECUTIVE_FAILURES_ENV = "BRAINLAYER_HEAL_MIN_CONSECUTIVE_FAILURES" + + +def _env_int(name: str, default: int, *, minimum: int = 1) -> int: + raw_value = os.environ.get(name) + if raw_value is None: + return default + try: + value = int(raw_value) + except ValueError: + return default + return max(minimum, value) @dataclass(frozen=True) @@ -42,6 +56,12 @@ class HealthCheckConfig: heal: bool = False socket_timeout_seconds: float = 5.0 max_stalled_ticks: int = 2 + heal_min_consecutive_failures: int = field( + default_factory=lambda: _env_int( + HEAL_MIN_CONSECUTIVE_FAILURES_ENV, + DEFAULT_HEAL_MIN_CONSECUTIVE_FAILURES, + ) + ) @dataclass @@ -218,13 +238,62 @@ def _kickstart(label: str, command_runner: CommandRunner) -> str: return f"kickstart:{label}" -def _kickstart_once(result: HealthCheckResult, label: str, command_runner: CommandRunner) -> None: +def _kickstart_once( + result: HealthCheckResult, + label: str, + issue_code: str, + consecutive_failures: int, + command_runner: CommandRunner, +) -> None: action = f"kickstart:{label}" if action in result.actions: return + print( + f"heal action label={label} issue={issue_code} consecutive_failures={consecutive_failures} action={action}", + file=sys.stderr, + ) result.actions.append(_kickstart(label, command_runner)) +def _previous_heal_failures(state: dict[str, Any]) -> dict[str, int]: + raw_failures = state.get("heal_failures") + if not isinstance(raw_failures, dict): + return {} + failures: dict[str, int] = {} + for issue_code, count in raw_failures.items(): + if not isinstance(issue_code, str) or not isinstance(count, int): + continue + failures[issue_code] = max(0, count) + return failures + + +def _updated_heal_failures( + previous_failures: dict[str, int], + current_issue_codes: set[str], +) -> dict[str, int]: + return {issue_code: previous_failures.get(issue_code, 0) + 1 for issue_code in sorted(current_issue_codes)} + + +def _apply_heals( + *, + result: HealthCheckResult, + issue_labels: dict[str, str], + previous_failures: dict[str, int], + config: HealthCheckConfig, + command_runner: CommandRunner, +) -> dict[str, int]: + current_issue_codes = {issue.code for issue in result.issues} + heal_failures = _updated_heal_failures(previous_failures, current_issue_codes) + if not config.heal: + return heal_failures + threshold = max(1, config.heal_min_consecutive_failures) + for issue_code, label in issue_labels.items(): + consecutive_failures = heal_failures.get(issue_code, 0) + if consecutive_failures >= threshold: + _kickstart_once(result, label, issue_code, consecutive_failures, command_runner) + return heal_failures + + def run_health_check( config: HealthCheckConfig, *, @@ -235,6 +304,9 @@ def run_health_check( ) -> HealthCheckResult: now = now_fn() result = HealthCheckResult(checked_at=now.isoformat(), ok=True) + state = _load_state(config.state_path) + previous_heal_failures = _previous_heal_failures(state) + heal_issue_labels: dict[str, str] = {} hotlane_processes = parse_hotlane_processes(ps_output_fn()) result.hotlane_running = bool(hotlane_processes) @@ -243,7 +315,7 @@ def run_health_check( HealthIssue("hotlane_dead", "critical", "hotlane BrainBar embedding daemon is not running") ) if config.heal: - _kickstart_once(result, config.hotlane_label, command_runner) + heal_issue_labels["hotlane_dead"] = config.hotlane_label else: result.backlog_batch = min(process.backlog_batch for process in hotlane_processes) if any(process.backlog_batch <= 0 for process in hotlane_processes): @@ -251,9 +323,8 @@ def run_health_check( HealthIssue("hotlane_backlog_disabled", "critical", "--backlog-batch is 0; embeddings will not drain") ) if config.heal: - _kickstart_once(result, config.hotlane_label, command_runner) + heal_issue_labels["hotlane_backlog_disabled"] = config.hotlane_label - state = _load_state(config.state_path) previous_missing = state.get("missing_vectors") result.previous_missing_vectors = int(previous_missing) if isinstance(previous_missing, int) else None try: @@ -281,7 +352,7 @@ def run_health_check( ) stalled_ticks = 0 if config.heal: - _kickstart_once(result, config.hotlane_label, command_runner) + heal_issue_labels["missing_embeddings_climbing"] = config.hotlane_label elif result.missing_vectors == result.previous_missing_vectors and result.missing_vectors > 0: stalled_ticks = prior_stalled_ticks + 1 if stalled_ticks >= config.max_stalled_ticks: @@ -293,17 +364,8 @@ def run_health_check( ) ) if config.heal: - _kickstart_once(result, config.hotlane_label, command_runner) + heal_issue_labels["missing_embeddings_not_draining"] = config.hotlane_label result.stalled_ticks = stalled_ticks - if result.missing_vectors is not None: - _write_state( - config.state_path, - { - "missing_vectors": result.missing_vectors, - "stalled_ticks": result.stalled_ticks, - "ts": now.isoformat(), - }, - ) try: response = socket_request_fn(config.socket_path, config.canary_query, config.socket_timeout_seconds) @@ -320,14 +382,29 @@ def run_health_check( ) ) if config.heal: - _kickstart_once(result, config.brainbar_daemon_label, command_runner) + heal_issue_labels[code] = config.brainbar_daemon_label except Exception as exc: result.canary_ok = False result.issues.append( HealthIssue("brain_search_canary_failed", "critical", f"BrainBar brain_search canary failed: {exc}") ) if config.heal: - _kickstart_once(result, config.brainbar_daemon_label, command_runner) - + heal_issue_labels["brain_search_canary_failed"] = config.brainbar_daemon_label + + heal_failures = _apply_heals( + result=result, + issue_labels=heal_issue_labels, + previous_failures=previous_heal_failures, + config=config, + command_runner=command_runner, + ) + if result.missing_vectors is not None or heal_failures: + state_payload: dict[str, Any] = dict(state) + state_payload["heal_failures"] = heal_failures + state_payload["ts"] = now.isoformat() + if result.missing_vectors is not None: + state_payload["missing_vectors"] = result.missing_vectors + state_payload["stalled_ticks"] = result.stalled_ticks + _write_state(config.state_path, state_payload) result.ok = not result.issues return result diff --git a/tests/test_stability_health_check.py b/tests/test_stability_health_check.py index f8f51289..c045823b 100644 --- a/tests/test_stability_health_check.py +++ b/tests/test_stability_health_check.py @@ -55,13 +55,30 @@ def _ok_canary(_socket_path: Path, _query: str, _timeout_seconds: float) -> dict } -def test_backlog_batch_zero_alarms_and_kickstarts_hotlane(tmp_path): +def test_backlog_batch_zero_alarms_but_waits_until_repeated_failure_to_kickstart_hotlane(tmp_path, capsys): db_path = tmp_path / "brainlayer.db" state_path = tmp_path / "health-state.json" _make_db(db_path, total=4, vector_rows=3) commands: list[list[str]] = [] - result = run_health_check( + config = HealthCheckConfig(db_path=db_path, state_path=state_path, heal=True) + first_result = run_health_check( + config, + ps_output_fn=lambda: ( + "123 /usr/bin/python scripts/hotlane_brainbar_daemon.py --interval 1 --backlog-batch 0 --enrich-limit 25\n" + ), + socket_request_fn=_ok_canary, + command_runner=lambda args: commands.append(args), + now_fn=lambda: datetime(2026, 6, 19, 4, 25, tzinfo=UTC), + ) + + assert first_result.ok is False + assert "hotlane_backlog_disabled" in [issue.code for issue in first_result.issues] + assert first_result.backlog_batch == 0 + assert commands == [] + assert "kickstart" not in capsys.readouterr().err + + second_result = run_health_check( HealthCheckConfig(db_path=db_path, state_path=state_path, heal=True), ps_output_fn=lambda: ( "123 /usr/bin/python scripts/hotlane_brainbar_daemon.py --interval 1 --backlog-batch 0 --enrich-limit 25\n" @@ -71,10 +88,16 @@ def test_backlog_batch_zero_alarms_and_kickstarts_hotlane(tmp_path): now_fn=lambda: datetime(2026, 6, 19, 4, 30, tzinfo=UTC), ) - assert result.ok is False - assert "hotlane_backlog_disabled" in [issue.code for issue in result.issues] - assert result.backlog_batch == 0 - assert any("com.brainlayer.hotlane-brainbar" in " ".join(command) for command in commands) + assert second_result.ok is False + assert "hotlane_backlog_disabled" in [issue.code for issue in second_result.issues] + assert second_result.backlog_batch == 0 + assert len(commands) == 1 + assert "com.brainlayer.hotlane-brainbar" in " ".join(commands[0]) + stderr = capsys.readouterr().err + assert "heal action" in stderr + assert "label=com.brainlayer.hotlane-brainbar" in stderr + assert "issue=hotlane_backlog_disabled" in stderr + assert "consecutive_failures=2" in stderr def test_any_zero_backlog_batch_alarms_when_multiple_hotlanes_are_running(tmp_path): @@ -84,7 +107,7 @@ def test_any_zero_backlog_batch_alarms_when_multiple_hotlanes_are_running(tmp_pa commands: list[list[str]] = [] result = run_health_check( - HealthCheckConfig(db_path=db_path, state_path=state_path, heal=True), + HealthCheckConfig(db_path=db_path, state_path=state_path, heal=True, heal_min_consecutive_failures=1), ps_output_fn=lambda: ( "123 /usr/bin/python scripts/hotlane_brainbar_daemon.py --interval 1 --backlog-batch 128\n" "456 /usr/bin/python scripts/hotlane_brainbar_daemon.py --interval 1 --backlog-batch 0\n" @@ -169,7 +192,42 @@ def test_missing_embeddings_climb_resets_stall_counter(tmp_path): assert saved["stalled_ticks"] == 0 -def test_brainbar_canary_error_alarms_and_kickstarts_brainbar(tmp_path): +def test_heal_state_write_preserves_missing_vector_history_when_count_fails(tmp_path): + state_path = tmp_path / "health-state.json" + state_path.write_text( + json.dumps( + { + "missing_vectors": 7, + "stalled_ticks": 1, + "ts": "2026-06-19T04:25:00+00:00", + } + ), + encoding="utf-8", + ) + + result = run_health_check( + HealthCheckConfig( + db_path=tmp_path / "missing" / "brainlayer.db", + state_path=state_path, + heal=True, + ), + ps_output_fn=lambda: ( + "123 /usr/bin/python scripts/hotlane_brainbar_daemon.py --interval 1 --backlog-batch 0 --enrich-limit 25\n" + ), + socket_request_fn=_ok_canary, + command_runner=lambda _args: None, + now_fn=lambda: datetime(2026, 6, 19, 4, 30, tzinfo=UTC), + ) + + assert result.ok is False + assert "missing_embeddings_count_failed" in [issue.code for issue in result.issues] + saved = json.loads(state_path.read_text(encoding="utf-8")) + assert saved["missing_vectors"] == 7 + assert saved["stalled_ticks"] == 1 + assert saved["heal_failures"]["hotlane_backlog_disabled"] == 1 + + +def test_brainbar_canary_error_waits_until_repeated_failure_to_kickstart_brainbar(tmp_path): db_path = tmp_path / "brainlayer.db" state_path = tmp_path / "health-state.json" _make_db(db_path, total=3, vector_rows=3) @@ -185,8 +243,24 @@ def failed_canary(_socket_path: Path, _query: str, _timeout_seconds: float) -> d }, } - result = run_health_check( - HealthCheckConfig(db_path=db_path, state_path=state_path, heal=True), + config = HealthCheckConfig(db_path=db_path, state_path=state_path, heal=True) + first_result = run_health_check( + config, + ps_output_fn=lambda: ( + "123 /usr/bin/python scripts/hotlane_brainbar_daemon.py " + "--interval 1 --backlog-batch 128 --enrich-limit 25\n" + ), + socket_request_fn=failed_canary, + command_runner=lambda args: commands.append(args), + now_fn=lambda: datetime(2026, 6, 19, 4, 25, tzinfo=UTC), + ) + + assert first_result.ok is False + assert "brain_search_canary_failed" in [issue.code for issue in first_result.issues] + assert commands == [] + + second_result = run_health_check( + config, ps_output_fn=lambda: ( "123 /usr/bin/python scripts/hotlane_brainbar_daemon.py " "--interval 1 --backlog-batch 128 --enrich-limit 25\n" @@ -196,11 +270,17 @@ def failed_canary(_socket_path: Path, _query: str, _timeout_seconds: float) -> d now_fn=lambda: datetime(2026, 6, 19, 4, 30, tzinfo=UTC), ) - assert result.ok is False - assert "brain_search_canary_failed" in [issue.code for issue in result.issues] + assert second_result.ok is False + assert "brain_search_canary_failed" in [issue.code for issue in second_result.issues] assert any("com.brainlayer.brainbar-daemon" in " ".join(command) for command in commands) +def test_heal_min_consecutive_failures_can_be_overridden_by_env(monkeypatch): + monkeypatch.setenv("BRAINLAYER_HEAL_MIN_CONSECUTIVE_FAILURES", "3") + + assert HealthCheckConfig().heal_min_consecutive_failures == 3 + + def test_health_check_launchagent_runs_every_five_minutes_and_heals(): plist_path = REPO_ROOT / "scripts/launchd/com.brainlayer.health-check.plist" plist = plistlib.loads(plist_path.read_bytes())