diff --git a/.console/log.md b/.console/log.md index b2a5dcbb..949239fb 100644 --- a/.console/log.md +++ b/.console/log.md @@ -1,3 +1,13 @@ +## 2026-06-12 — #270 rescoped to the query layer (clean on reverted main) + +After reverting #269 (b82b944d), #270 is rebuilt as green-main + the genuinely-new flaky-test +query layer only: query_flaky.py (FlakyTestQueryMixin + FlakyTest/FlakyTestMetrics/RepositoryHealth +query-result projections), the TestSignalQuery mixin hookup in query.py, the __init__ export, and +test_signal_query.py. Includes the #270-review fixes: flaky_test_percent computes a real percentage +(flaky/total*100, zero-guarded); critical_tests derives from the deduplicated set; +3 regression +tests; docstring disambiguating this query view from flaky_test_models.py detection models. The +stale edge-case/integration test files that targeted an unbuilt metric API are gone with the revert. + ## 2026-06-12 — Revert #269 (merged red, broke main CI ~5h) #269 ("parametrized edge-case tests") was merged with 4 failing CI checks. 
Its ~2,700 lines of diff --git a/src/operations_center/observer/__init__.py b/src/operations_center/observer/__init__.py index eeb09f88..8e8a25f1 100644 --- a/src/operations_center/observer/__init__.py +++ b/src/operations_center/observer/__init__.py @@ -1,8 +1,20 @@ # SPDX-License-Identifier: AGPL-3.0-or-later # Copyright (C) 2026 ProtocolWarden +from operations_center.observer.alert_channels import ( + AlertChannel, + AlertChannelFactory, + AlertChannelResult, + EmailChannel, + GitHubChannel, + SlackChannel, +) from operations_center.observer.collectors.flaky_test_collector import FlakyTestCollector from operations_center.observer.dashboard import DashboardProvider, DashboardSnapshot from operations_center.observer.flaky_test_aggregator import FlakyTestAggregator +from operations_center.observer.flaky_test_alert_config import ( + AlertThreshold, + FlakyTestAlertConfig, +) from operations_center.observer.flaky_test_alerts import ( AlertSeverity, FlakyTestAlert, @@ -49,12 +61,18 @@ ) __all__ = [ + "AlertChannel", + "AlertChannelFactory", + "AlertChannelResult", "AlertSeverity", + "AlertThreshold", "DashboardProvider", "DashboardSnapshot", + "EmailChannel", "FlakyTestAggregationReport", "FlakyTestAggregator", "FlakyTestAlert", + "FlakyTestAlertConfig", "FlakyTestAlertManager", "FlakyTestCollector", "FlakyTestConfig", @@ -65,6 +83,7 @@ "FlakyTestSignal", "FlakyTestStorageManager", "FlakynessCategory", + "GitHubChannel", "HealthChecker", "HTTPSnapshotRepository", "LocalSnapshotRepository", @@ -74,6 +93,7 @@ "RepoObserverService", "RepoStateSnapshot", "S3SnapshotRepository", + "SlackChannel", "SnapshotManager", "SnapshotRepository", "SnapshotValidator", diff --git a/src/operations_center/observer/query.py b/src/operations_center/observer/query.py index ce9fec4c..5874432a 100644 --- a/src/operations_center/observer/query.py +++ b/src/operations_center/observer/query.py @@ -48,6 +48,12 @@ from pathlib import Path from operations_center.observer.models import 
RepoStateSnapshot, TestSignal +from operations_center.observer.query_flaky import ( + FlakyTest, # noqa: F401 — re-exported for existing importers + FlakyTestMetrics, # noqa: F401 — re-exported for existing importers + FlakyTestQueryMixin, + RepositoryHealth, # noqa: F401 — re-exported for existing importers +) logger = logging.getLogger(__name__) @@ -165,7 +171,7 @@ def is_concerning(self) -> bool: return self.failing_rate >= 0.2 -class TestSignalQuery: +class TestSignalQuery(FlakyTestQueryMixin): """Query API for test signal visibility in observer snapshots. Provides read-only access to historical test signals and aggregated metrics diff --git a/src/operations_center/observer/query_flaky.py b/src/operations_center/observer/query_flaky.py new file mode 100644 index 00000000..874a3c93 --- /dev/null +++ b/src/operations_center/observer/query_flaky.py @@ -0,0 +1,266 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +# Copyright (C) 2026 ProtocolWarden +"""Flaky-test query types and mixin for TestSignalQuery. + +Extracted from query.py to keep that module under the 500-line limit. +FlakyTestQueryMixin adds get_flaky_tests, get_test_metrics, get_repository_health, +and filter_by_category to any class that exposes _load_snapshots_in_range and +_get_recent_snapshots helpers. + +Note: FlakyTest / FlakyTestMetrics / RepositoryHealth here are lightweight +query-result projections read from snapshot signals. They are NOT the flaky-test +*detection* subsystem's domain models — for those (FlakyTestMetric, FlakyTestResult, +FlakyTestSessionReport, with flakiness_score / confidence / pattern_entropy and +serialization) see flaky_test_models.py. Mind the singular/plural: FlakyTestMetric +(detection) vs FlakyTestMetrics (this aggregate view). 
+""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from dataclasses import dataclass, field as dataclass_field +from datetime import datetime +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from operations_center.observer.models import RepoStateSnapshot + + +@dataclass +class FlakyTest: + """A single flaky test result from a query. + + Attributes: + name: Test node ID (full path including parameters) + failure_rate: Fraction of runs where test failed (0.0-1.0) + run_count: Number of runs analyzed + category: Flakiness category (INTERMITTENT, ENVIRONMENT, INFRASTRUCTURE, UNKNOWN) + last_failed: Timestamp of most recent failure (or None) + """ + + name: str + failure_rate: float + run_count: int + category: str | None = None + last_failed: datetime | None = None + + +@dataclass +class FlakyTestMetrics: + """Aggregated metrics for flaky tests in a repository. + + Attributes: + total_flaky_tests: Number of tests with failure_rate > 10% + unstable_tests: Number of tests with 5-10% failure rate + critical_tests: Number of distinct most-problematic tests with failure_rate > 50% + affected_modules: List of modules/packages with flaky tests + average_failure_rate: Mean failure rate across the distinct most-problematic tests + most_problematic: Top 5 flakiest tests + trend: Failure-rate trend reported by the snapshot signal (failure_rate_trend) + """ + + total_flaky_tests: int = 0 + unstable_tests: int = 0 + critical_tests: int = 0 + affected_modules: list[str] = dataclass_field(default_factory=list) + average_failure_rate: float = 0.0 + most_problematic: list[FlakyTest] = dataclass_field(default_factory=list) + trend: float = 0.0 + + +@dataclass +class RepositoryHealth: + """Overall repository health derived from the flaky-test and test signals.
+ + Attributes: + status: Overall health status (HEALTHY, NOMINAL, DEGRADED, CRITICAL) + flaky_test_percent: Percentage (0-100) of the suite that is flaky; 0.0 if suite size unknown + recovery_rate: Percentage of previously flaky tests now stable + failure_rate_trend: Change in failure rate vs previous period + affected_modules_count: Number of modules listed as affected by the flaky-test signal + estimated_ci_impact_percent: Estimated CI slowdown due to flakiness + last_improved: Timestamp of most recent improvement (not set by get_repository_health) + """ + + status: str = "NOMINAL" + flaky_test_percent: float = 0.0 + recovery_rate: float = 0.0 + failure_rate_trend: float = 0.0 + affected_modules_count: int = 0 + estimated_ci_impact_percent: float = 0.0 + last_improved: datetime | None = None + + +class FlakyTestQueryMixin(ABC): + """Mixin that adds flaky-test query methods to TestSignalQuery. + + Requires the host class to expose: + _load_snapshots_in_range(timerange) -> list[RepoStateSnapshot] + _get_recent_snapshots(count) -> list[RepoStateSnapshot] + """ + + @abstractmethod + def _load_snapshots_in_range(self, timerange: Any) -> list[RepoStateSnapshot]: ... + + @abstractmethod + def _get_recent_snapshots(self, count: int) -> list[RepoStateSnapshot]: ... + + def get_flaky_tests(self, timerange=None) -> list[FlakyTest]: + """Get the most-problematic flaky tests recorded in a time range. + + Args: + timerange: TimeRange for analysis. If None, uses most recent snapshot only. + + Returns: + FlakyTest objects sorted by failure_rate descending; a test present in several + snapshots appears once per snapshot. Empty list if no flaky tests or snapshots.
+ """ + if timerange: + snapshots = self._load_snapshots_in_range(timerange) + else: + snapshots = self._get_recent_snapshots(1) + + flaky_tests: list[FlakyTest] = [] + for snapshot in snapshots: + signal = snapshot.signals.flaky_test_signal + if signal.status == "unavailable" or not signal.most_problematic_tests: + continue + + for test_dict in signal.most_problematic_tests: + flaky_test = FlakyTest( + name=test_dict.get("name", "unknown"), + failure_rate=test_dict.get("failure_rate", 0.0), + run_count=test_dict.get("run_count", 0), + category=test_dict.get("category"), + last_failed=None, + ) + flaky_tests.append(flaky_test) + + flaky_tests.sort(key=lambda t: t.failure_rate, reverse=True) + return flaky_tests + + def get_test_metrics(self, timerange=None) -> FlakyTestMetrics | None: + """Get aggregated flaky test metrics for a repository. + + Args: + timerange: TimeRange for analysis. If None, uses most recent snapshot. + + Returns: + FlakyTestMetrics, or None if there are no snapshots or no flaky tests are reported. + """ + if timerange: + snapshots = self._load_snapshots_in_range(timerange) + else: + snapshots = self._get_recent_snapshots(1) + + if not snapshots: + return None + + metrics = FlakyTestMetrics() + seen_tests: set[str] = set() + all_flaky: list[FlakyTest] = [] + + for snapshot in snapshots: + signal = snapshot.signals.flaky_test_signal + if signal.status == "unavailable": + continue + + # Current-state scalars come from the last available snapshot iterated (last + # write wins; assumes the helpers return oldest-first — TODO confirm). Counts + # cannot be summed across snapshots without double-counting shared tests.
+ metrics.total_flaky_tests = signal.flaky_test_count or 0 + metrics.unstable_tests = signal.unstable_test_count or 0 + metrics.affected_modules = signal.affected_modules or [] + metrics.trend = signal.failure_rate_trend or 0.0 + + if signal.most_problematic_tests: + for test_dict in signal.most_problematic_tests: + test_name = test_dict.get("name", "unknown") + if test_name not in seen_tests: + seen_tests.add(test_name) + all_flaky.append( + FlakyTest( + name=test_name, + failure_rate=test_dict.get("failure_rate", 0.0), + run_count=test_dict.get("run_count", 0), + category=test_dict.get("category"), + ) + ) + + # critical_tests derives from the same deduplicated set as most_problematic + # (first occurrence of a name wins), so a test seen across snapshots counts once. + metrics.critical_tests = sum(1 for t in all_flaky if t.failure_rate > 0.5) + + if all_flaky: + metrics.average_failure_rate = sum(t.failure_rate for t in all_flaky) / len(all_flaky) + all_flaky.sort(key=lambda t: t.failure_rate, reverse=True) + metrics.most_problematic = all_flaky[:5] + + return metrics if metrics.total_flaky_tests > 0 else None + + def get_repository_health(self, timerange=None) -> RepositoryHealth: + """Get overall repository health assessment. + + Args: + timerange: TimeRange; only its last loaded snapshot is used. None = most recent. + + Returns: + RepositoryHealth with status and key metrics; defaults (NOMINAL) if no snapshots. + """ + if timerange: + snapshots = self._load_snapshots_in_range(timerange) + else: + snapshots = self._get_recent_snapshots(1) + + health = RepositoryHealth() + + if not snapshots: + health.status = "NOMINAL" + return health + + latest = snapshots[-1] + flaky_signal = latest.signals.flaky_test_signal + + if flaky_signal.status != "unavailable": + # flaky_test_percent is a true percentage of the suite (0-100): + # flaky_test_count / total_test_count * 100, per the Stage 0 spec. + # Falls back to 0.0 when the suite size is unknown (no division by zero).
+ flaky_count = flaky_signal.flaky_test_count or 0 + test_signal = latest.signals.test_signal + total_tests = (test_signal.test_count or 0) if test_signal is not None else 0 + health.flaky_test_percent = ( + (flaky_count / total_tests) * 100.0 if total_tests > 0 else 0.0 + ) + health.recovery_rate = flaky_signal.recovery_rate or 0.0 + health.failure_rate_trend = flaky_signal.failure_rate_trend or 0.0 + health.affected_modules_count = len(flaky_signal.affected_modules or []) + + estimated_impact = flaky_signal.estimated_impact or {} + health.estimated_ci_impact_percent = estimated_impact.get("ci_slowdown_percent", 0.0) + + # Percentage-point thresholds: >5% critical; >2% (or trend > 1.0) degraded; 0 healthy. + if health.flaky_test_percent > 5.0: + health.status = "CRITICAL" + elif health.flaky_test_percent > 2.0 or health.failure_rate_trend > 1.0: + health.status = "DEGRADED" + else: + health.status = "HEALTHY" if health.flaky_test_percent == 0 else "NOMINAL" + + return health + + def filter_by_category(self, category: str, timerange=None) -> list[FlakyTest]: + """Get flaky tests filtered by flakiness category. + + Args: + category: Flakiness category to filter by (case-insensitive) + timerange: TimeRange for analysis. If None, uses most recent snapshot. + + Returns: + List of FlakyTest objects matching the category, sorted by failure_rate descending. + Empty list if no tests match (uncategorized tests never do) or no snapshots exist.
+ """ + flaky_tests = self.get_flaky_tests(timerange) + category_upper = category.upper() + + filtered = [t for t in flaky_tests if t.category and t.category.upper() == category_upper] + return sorted(filtered, key=lambda t: t.failure_rate, reverse=True) diff --git a/tests/unit/observer/test_signal_query.py b/tests/unit/observer/test_signal_query.py index 17fb83aa..51edc486 100644 --- a/tests/unit/observer/test_signal_query.py +++ b/tests/unit/observer/test_signal_query.py @@ -7,6 +7,7 @@ from operations_center.observer.models import ( DependencyDriftSignal, + FlakyTestSignal, RepoContextSnapshot, RepoSignalsSnapshot, RepoStateSnapshot, @@ -571,3 +572,241 @@ def test_coverage_monitoring_workflow( history = query.list_test_signal_history(TimeRange.last_days(6)) assert len(history) == 5 + + +# Flaky test query tests + + +class TestFlakyTestQueries: + def _make_flaky_snapshot( + self, + run_id: str, + observed_at: datetime, + flaky_count: int = 5, + unstable_count: int = 3, + most_problematic: list[dict] | None = None, + root: Path | None = None, + total_test_count: int | None = None, + ) -> Path: + """Write a flaky-signal snapshot JSON under root/run_id (root must be given).""" + if most_problematic is None: + most_problematic = [ + {"name": "tests/test_a.py::test_1", "failure_rate": 0.6, "run_count": 10}, + {"name": "tests/test_b.py::test_2", "failure_rate": 0.4, "run_count": 10}, + {"name": "tests/test_c.py::test_3", "failure_rate": 0.3, "run_count": 10}, + ] + + snapshot = RepoStateSnapshot( + run_id=run_id, + observed_at=observed_at, + source_command="test observe", + repo=RepoContextSnapshot( + name="test-repo", + path=Path("/test"), + current_branch="main", + is_dirty=False, + ), + signals=RepoSignalsSnapshot( + test_signal=TestSignal(status="passing", test_count=total_test_count), + dependency_drift=DependencyDriftSignal(status="unavailable"), + todo_signal=TodoSignal(), + flaky_test_signal=FlakyTestSignal( + status="measured", + flaky_test_count=flaky_count,
+ unstable_test_count=unstable_count, + most_problematic_tests=most_problematic, + affected_modules=["tests/unit", "tests/integration"], + category_breakdown={"INTERMITTENT": 3, "ENVIRONMENT": 2}, + recovery_rate=0.5, + failure_rate_trend=1.2, + estimated_impact={"ci_slowdown_percent": 15.0}, + ), + ), + ) + + run_dir = root / run_id + run_dir.mkdir(parents=True, exist_ok=True) + json_path = run_dir / "repo_state_snapshot.json" + json_path.write_text(snapshot.model_dump_json(), encoding="utf-8") + return json_path + + def test_get_flaky_tests(self, tmp_snapshot_root: Path) -> None: + """get_flaky_tests() returns the latest snapshot's tests, flakiest first.""" + now = datetime.now(UTC) + self._make_flaky_snapshot("run_1", now - timedelta(hours=2), root=tmp_snapshot_root) + + query = TestSignalQuery(root=tmp_snapshot_root) + flaky_tests = query.get_flaky_tests() + + assert len(flaky_tests) == 3 + assert flaky_tests[0].name == "tests/test_a.py::test_1" + assert flaky_tests[0].failure_rate == 0.6 + assert flaky_tests[0].run_count == 10 + + def test_get_flaky_tests_sorted_by_failure_rate(self, tmp_snapshot_root: Path) -> None: + """Test flaky tests are sorted by failure rate descending.""" + now = datetime.now(UTC) + self._make_flaky_snapshot("run_1", now, root=tmp_snapshot_root) + + query = TestSignalQuery(root=tmp_snapshot_root) + flaky_tests = query.get_flaky_tests() + + failure_rates = [t.failure_rate for t in flaky_tests] + assert failure_rates == sorted(failure_rates, reverse=True) + + def test_get_flaky_tests_empty(self, tmp_snapshot_root: Path) -> None: + """get_flaky_tests returns [] when the signal lists no problematic tests.""" + now = datetime.now(UTC) + self._make_flaky_snapshot( + "run_1", now, flaky_count=0, most_problematic=[], root=tmp_snapshot_root + ) + + query = TestSignalQuery(root=tmp_snapshot_root) + flaky_tests = query.get_flaky_tests() + + assert len(flaky_tests) == 0 + + def test_get_test_metrics(self, tmp_snapshot_root: Path) -> None: + """get_test_metrics mirrors the latest snapshot's flaky-signal scalars.""" + now = datetime.now(UTC)
+ self._make_flaky_snapshot("run_1", now, root=tmp_snapshot_root) + + query = TestSignalQuery(root=tmp_snapshot_root) + metrics = query.get_test_metrics() + + assert metrics is not None + assert metrics.total_flaky_tests == 5 + assert metrics.unstable_tests == 3 + assert len(metrics.affected_modules) == 2 + assert metrics.trend == 1.2 + assert len(metrics.most_problematic) > 0 + + def test_get_repository_health(self, tmp_snapshot_root: Path) -> None: + """get_repository_health yields a known status and the affected-module count.""" + now = datetime.now(UTC) + self._make_flaky_snapshot("run_1", now, flaky_count=2, root=tmp_snapshot_root) + + query = TestSignalQuery(root=tmp_snapshot_root) + health = query.get_repository_health() + + assert health.status in ["HEALTHY", "NOMINAL", "DEGRADED", "CRITICAL"] + assert health.flaky_test_percent >= 0.0 + assert health.affected_modules_count == 2 + + def test_repository_health_percent_is_real_percentage_not_count( + self, tmp_snapshot_root: Path + ) -> None: + """flaky_test_percent must be flaky_count/total_tests*100, not the raw count.""" + now = datetime.now(UTC) + # 10 flaky tests out of 200 total = 5.0% (NOT 10.0, the raw count).
+ self._make_flaky_snapshot( + "run_1", now, flaky_count=10, total_test_count=200, root=tmp_snapshot_root + ) + + query = TestSignalQuery(root=tmp_snapshot_root) + health = query.get_repository_health() + + assert health.flaky_test_percent == pytest.approx(5.0) + assert health.flaky_test_percent != 10.0 # would be the count under the old bug + + def test_repository_health_percent_zero_when_suite_size_unknown( + self, tmp_snapshot_root: Path + ) -> None: + """No total_test_count -> 0.0, never a division-by-zero or a raw count.""" + now = datetime.now(UTC) + self._make_flaky_snapshot( + "run_1", now, flaky_count=7, total_test_count=None, root=tmp_snapshot_root + ) + + query = TestSignalQuery(root=tmp_snapshot_root) + health = query.get_repository_health() + + assert health.flaky_test_percent == 0.0 + + def test_get_test_metrics_critical_tests_deduped_across_snapshots( + self, tmp_snapshot_root: Path + ) -> None: + """The same critical test in two snapshots counts once, never exceeding totals.""" + now = datetime.now(UTC) + crit = [{"name": "tests/t.py::crit", "failure_rate": 0.8, "run_count": 10}] + self._make_flaky_snapshot( + "run_1", now - timedelta(hours=1), flaky_count=1, most_problematic=crit, + root=tmp_snapshot_root, + ) + self._make_flaky_snapshot( + "run_2", now, flaky_count=1, most_problematic=crit, root=tmp_snapshot_root + ) + + query = TestSignalQuery(root=tmp_snapshot_root) + metrics = query.get_test_metrics( + timerange=TimeRange(start=now - timedelta(hours=2), end=now + timedelta(hours=1)) + ) + + assert metrics is not None + # Under the old per-snapshot accumulation this was 2 and exceeded total_flaky_tests=1.
+ assert metrics.critical_tests == 1 + assert metrics.critical_tests <= metrics.total_flaky_tests + + def test_filter_by_category(self, tmp_snapshot_root: Path) -> None: + """filter_by_category returns only tests whose category matches.""" + now = datetime.now(UTC) + self._make_flaky_snapshot( + "run_1", + now, + most_problematic=[ + { + "name": "tests/test_intermittent.py::test", + "failure_rate": 0.5, + "run_count": 10, + "category": "INTERMITTENT", + }, + { + "name": "tests/test_env.py::test", + "failure_rate": 0.3, + "run_count": 10, + "category": "ENVIRONMENT", + }, + ], + root=tmp_snapshot_root, + ) + + query = TestSignalQuery(root=tmp_snapshot_root) + intermittent = query.filter_by_category("INTERMITTENT") + + assert len(intermittent) == 1 + assert intermittent[0].category == "INTERMITTENT" + + def test_filter_by_category_case_insensitive(self, tmp_snapshot_root: Path) -> None: + """Test category filtering is case-insensitive.""" + now = datetime.now(UTC) + self._make_flaky_snapshot( + "run_1", + now, + most_problematic=[ + { + "name": "tests/test.py::test", + "failure_rate": 0.5, + "run_count": 10, + "category": "ENVIRONMENT", + } + ], + root=tmp_snapshot_root, + ) + + query = TestSignalQuery(root=tmp_snapshot_root) + env_tests = query.filter_by_category("environment") + + assert len(env_tests) == 1 + + def test_query_with_time_range(self, tmp_snapshot_root: Path) -> None: + """A last_days(1) range should return only the recent run's three tests.""" + now = datetime.now(UTC) + self._make_flaky_snapshot("run_old", now - timedelta(days=10), root=tmp_snapshot_root) + self._make_flaky_snapshot("run_recent", now - timedelta(hours=2), root=tmp_snapshot_root) + + query = TestSignalQuery(root=tmp_snapshot_root) + recent = query.get_flaky_tests(TimeRange.last_days(1)) + + assert len(recent) == 3 + first_test_name = recent[0].name + assert first_test_name.startswith("tests/")