From 3995c87f9aca3d51ad602130e1d03bffede82eb3 Mon Sep 17 00:00:00 2001 From: Haorui Zhou Date: Fri, 12 Jun 2026 00:41:09 -0400 Subject: [PATCH 01/12] Add heartbeat key and staleness threshold to config --- universal-telemetry-software/src/config.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/universal-telemetry-software/src/config.py b/universal-telemetry-software/src/config.py index 564014f..3b03bee 100644 --- a/universal-telemetry-software/src/config.py +++ b/universal-telemetry-software/src/config.py @@ -22,6 +22,8 @@ REDIS_STATS_CHANNEL = "system_stats" REDIS_DIAG_CHANNEL = "link_diagnostics" REDIS_WS_CLIENTS_KEY = "websocket_bridge:clients" +REDIS_HEARTBEAT_KEY = "telemetry:heartbeat" +HEARTBEAT_STALE_S = 5.0 # subscribers reconnect if heartbeat older than this # ── Feature flags ───────────────────────────────────────────────────────────── ENABLE_UPLINK = os.getenv("ENABLE_UPLINK", "false").lower() == "true" From 4f2bc3a9cc636a6159645a0daf17250c2885ddc3 Mon Sep 17 00:00:00 2001 From: Haorui Zhou Date: Fri, 12 Jun 2026 00:47:33 -0400 Subject: [PATCH 02/12] Add async heartbeat writer module --- universal-telemetry-software/src/heartbeat.py | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 universal-telemetry-software/src/heartbeat.py diff --git a/universal-telemetry-software/src/heartbeat.py b/universal-telemetry-software/src/heartbeat.py new file mode 100644 index 0000000..b09a2a2 --- /dev/null +++ b/universal-telemetry-software/src/heartbeat.py @@ -0,0 +1,38 @@ +""" +Heartbeat writer — publishes a per-process liveness key to Redis every second +so Redis pub/sub subscribers can detect a half-dead producer (TCP up, subscription +state lost) and reconnect. See docs/superpowers/plans/2026-06-11-stack-resilience.md. +""" +import asyncio +import json +import logging +import time + +from .config import REDIS_HEARTBEAT_KEY +from .redis_utils import get_async_client + +logger = logging.getLogger(__name__) + +HEARTBEAT_INTERVAL_S = 1.0 + + +async def run_heartbeat_writer(redis_client=None) -> None: + """Write a heartbeat key to Redis every HEARTBEAT_INTERVAL_S. + + Stops on cancellation; logs and exits on any other exception so the surrounding + supervisor (systemd / Docker) can restart the process. Skips silently if + no Redis client is available (car mode without Redis). + """ + if redis_client is None: + return + start_mono = time.monotonic() + while True: + try: + payload = json.dumps({"uptime_s": time.monotonic() - start_mono, + "wall_ts": time.time()}) + await redis_client.set(REDIS_HEARTBEAT_KEY, payload, ex=30) + except asyncio.CancelledError: + raise + except Exception as e: + logger.warning(f"Heartbeat write failed: {e}") + await asyncio.sleep(HEARTBEAT_INTERVAL_S) From e2d8fb9730ae194b73201b3c60d42064efea385c Mon Sep 17 00:00:00 2001 From: Haorui Zhou Date: Fri, 12 Jun 2026 00:52:26 -0400 Subject: [PATCH 03/12] Add heartbeat writer unit tests --- .../tests/test_heartbeat.py | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 universal-telemetry-software/tests/test_heartbeat.py diff --git a/universal-telemetry-software/tests/test_heartbeat.py b/universal-telemetry-software/tests/test_heartbeat.py new file mode 100644 index 0000000..b306cd4 --- /dev/null +++ b/universal-telemetry-software/tests/test_heartbeat.py @@ -0,0 +1,56 @@ +""" +Heartbeat writer tests — uses a fake async Redis client to avoid network deps. +""" +import asyncio +import json +import pytest +import pytest_asyncio +from unittest.mock import AsyncMock, MagicMock + +from src import heartbeat +from src.config import REDIS_HEARTBEAT_KEY + +pytestmark = pytest.mark.asyncio + + +@pytest.fixture +def fake_redis(): + """Fake async Redis client whose .set() records calls and timestamps.""" + client = MagicMock() + client.set = AsyncMock() + return client + + +async def test_writes_heartbeat_with_uptime(fake_redis): + task = asyncio.create_task(heartbeat.run_heartbeat_writer(fake_redis)) + await asyncio.sleep(0.05) + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + assert fake_redis.set.call_count >= 1 + last_call = fake_redis.set.call_args_list[-1] + assert last_call.args[0] == REDIS_HEARTBEAT_KEY + payload = json.loads(last_call.args[1]) + assert "uptime_s" in payload and "wall_ts" in payload + assert payload["uptime_s"] >= 0 + # ex=30 sets the expiry so a dead process's heartbeat is naturally GC'd + assert last_call.kwargs.get("ex") == 30 + + +async def test_returns_immediately_when_redis_is_none(): + # Should not block or raise; should be a no-op. + await asyncio.wait_for(heartbeat.run_heartbeat_writer(None), timeout=0.2) + + +async def test_continues_after_set_failure(fake_redis): + fake_redis.set = AsyncMock(side_effect=RuntimeError("redis down")) + task = asyncio.create_task(heartbeat.run_heartbeat_writer(fake_redis)) + await asyncio.sleep(0.05) # give it time to fail at least once + assert not task.done(), "writer should recover and keep running" + task.cancel() + try: + await task + except asyncio.CancelledError: + pass From 374865138e3318a2e503b78afa1e2ffb3498cdb8 Mon Sep 17 00:00:00 2001 From: Haorui Zhou Date: Fri, 12 Jun 2026 01:18:23 -0400 Subject: [PATCH 04/12] Tighten heartbeat tests and align writer docstring with behavior --- universal-telemetry-software/src/heartbeat.py | 7 ++++--- universal-telemetry-software/tests/test_heartbeat.py | 6 ++---- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/universal-telemetry-software/src/heartbeat.py b/universal-telemetry-software/src/heartbeat.py index b09a2a2..cebe0b4 100644 --- a/universal-telemetry-software/src/heartbeat.py +++ b/universal-telemetry-software/src/heartbeat.py @@ -19,9 +19,10 @@ async def run_heartbeat_writer(redis_client=None) -> None: """Write a heartbeat key to Redis every HEARTBEAT_INTERVAL_S. - Stops on cancellation; logs and exits on any other exception so the surrounding - supervisor (systemd / Docker) can restart the process. Skips silently if - no Redis client is available (car mode without Redis). + Stops on cancellation. Logs and continues on any other exception so transient + Redis blips don't take down the writer; if Redis is persistently down, the + surrounding supervisor (systemd / Docker) is expected to restart the process. + Skips silently if no Redis client is available (car mode without Redis). """ if redis_client is None: return diff --git a/universal-telemetry-software/tests/test_heartbeat.py b/universal-telemetry-software/tests/test_heartbeat.py index b306cd4..488e22e 100644 --- a/universal-telemetry-software/tests/test_heartbeat.py +++ b/universal-telemetry-software/tests/test_heartbeat.py @@ -4,14 +4,11 @@ import asyncio import json import pytest -import pytest_asyncio from unittest.mock import AsyncMock, MagicMock from src import heartbeat from src.config import REDIS_HEARTBEAT_KEY -pytestmark = pytest.mark.asyncio - @pytest.fixture def fake_redis(): @@ -47,7 +44,8 @@ async def test_returns_immediately_when_redis_is_none(): async def test_continues_after_set_failure(fake_redis): fake_redis.set = AsyncMock(side_effect=RuntimeError("redis down")) task = asyncio.create_task(heartbeat.run_heartbeat_writer(fake_redis)) - await asyncio.sleep(0.05) # give it time to fail at least once + await asyncio.sleep(1.05) # past the first 1.0s HEARTBEAT_INTERVAL_S so the loop has retried + assert fake_redis.set.call_count >= 2, "writer should retry after failure" assert not task.done(), "writer should recover and keep running" task.cancel() try: From 77a4f2fd6074e39cc8fb786a143c0522de13d35f Mon Sep 17 00:00:00 2001 From: Haorui Zhou Date: Fri, 12 Jun 2026 01:20:56 -0400 Subject: [PATCH 05/12] Add _pump_pubsub_with_heartbeat helper to TelemetryNode --- universal-telemetry-software/src/data.py | 58 ++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/universal-telemetry-software/src/data.py b/universal-telemetry-software/src/data.py index ba8706a..a6fab9b 100644 --- a/universal-telemetry-software/src/data.py +++ b/universal-telemetry-software/src/data.py @@ -89,6 +89,64 @@ def publish(self, channel, data): except asyncio.QueueFull: pass # drop under backpressure rather than block the CAN reader + @staticmethod + async def _pump_pubsub_with_heartbeat(pubsub, redis_client, on_message, *, + stale_s: float = 5.0, + log=None): + """Drain a Redis pubsub, restarting it if the heartbeat goes stale. + + Replaces the naive `async for message in pubsub.listen():` pattern that + silently goes dark when the TCP connection or subscription state is lost + after a car power-cycle. Uses non-blocking get_message(timeout=1.0) so we + get a chance to check the heartbeat key on every iteration. + + `on_message` is an `async` callable invoked with the decoded payload string + for each non-None, non-subscribe-confirmation message. Returns only on + cancellation; outer supervisor is expected to restart the task. + """ + from .config import REDIS_HEARTBEAT_KEY + _log = log or logger + last_hb_mono = 0.0 + while True: + try: + msg = await pubsub.get_message(timeout=1.0, ignore_subscribe_messages=True) + except asyncio.CancelledError: + raise + except Exception as e: + _log.warning(f"pubsub.get_message error, reconnecting: {e}") + await asyncio.sleep(0.5) + return # let the outer while-True re-subscribe + + if msg is not None: + try: + await on_message(msg) + except Exception as e: + _log.error(f"subscriber handler error: {e}") + last_hb_mono = time.monotonic() # data flowing — heartbeat check not needed + continue + + # No message this second — check heartbeat + try: + raw = await redis_client.get(REDIS_HEARTBEAT_KEY) if redis_client else None + except Exception as e: + _log.debug(f"heartbeat read failed: {e}") + raw = None + + if raw is None: + # First-time setup or Redis is down — don't reconnect, just wait. + if last_hb_mono == 0.0: + continue + # Heartbeat missing while we've been running: producer is gone or + # the key expired. Tear down so the outer loop re-subscribes. + if time.monotonic() - last_hb_mono > stale_s: + _log.warning("heartbeat stale, forcing pubsub reconnect") + return + else: + last_hb_mono = time.monotonic() + if time.monotonic() - last_hb_mono > stale_s: + _log.warning("heartbeat stale, forcing pubsub reconnect") + return + def _handle_ecu_timestamp(self, data: bytes) -> None: """Extract epoch_ms from ECU VCU_Timestamp message and update clock offset.""" if len(data) < 8: From e6c65911dd94074c03c550082645cb736f012113 Mon Sep 17 00:00:00 2001 From: Haorui Zhou Date: Fri, 12 Jun 2026 09:41:51 -0400 Subject: [PATCH 06/12] Use heartbeat-aware pump in data.py uplink-relay pubsub --- universal-telemetry-software/src/data.py | 25 ++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/universal-telemetry-software/src/data.py b/universal-telemetry-software/src/data.py index a6fab9b..5285aec 100644 --- a/universal-telemetry-software/src/data.py +++ b/universal-telemetry-software/src/data.py @@ -100,9 +100,11 @@ async def _pump_pubsub_with_heartbeat(pubsub, redis_client, on_message, *, after a car power-cycle. Uses non-blocking get_message(timeout=1.0) so we get a chance to check the heartbeat key on every iteration. - `on_message` is an `async` callable invoked with the decoded payload string - for each non-None, non-subscribe-confirmation message. Returns only on - cancellation; outer supervisor is expected to restart the task. + `on_message` is an `async` callable invoked with the raw pubsub message + dict (whatever `get_message()` returns, typically + `{"type": "message", "channel": ..., "data": ...}`) for each non-None, + non-subscribe-confirmation message. Returns only on cancellation; outer + supervisor is expected to restart the task. """ from .config import REDIS_HEARTBEAT_KEY _log = log or logger @@ -927,12 +929,11 @@ async def uplink_relay(): await pubsub.subscribe(REDIS_UPLINK_CHANNEL) logger.info(f"Subscribed to Redis channel: {REDIS_UPLINK_CHANNEL}") - async for message in pubsub.listen(): - if message['type'] != 'message': - continue - + async def _relay(msg): + if msg['type'] != 'message': + return try: - data = redis_utils.decode_message(message['data']) + data = redis_utils.decode_message(msg['data']) uplink_msg = json.loads(data) can_id = uplink_msg.get("canId") @@ -941,10 +942,10 @@ async def uplink_relay(): if can_id is None or not isinstance(can_id, int) or can_id < 0: logger.warning(f"Uplink relay: invalid canId in ref={ref}") - continue + return if not isinstance(can_data, list) or len(can_data) < 1 or len(can_data) > 8: logger.warning(f"Uplink relay: invalid data in ref={ref}") - continue + return # Pack as uplink UDP packet: 0xCAFE + seq + count(1) + CAN message uplink_seq += 1 @@ -964,6 +965,10 @@ async def uplink_relay(): except Exception as e: logger.error(f"Uplink relay error: {e}") + await TelemetryNode._pump_pubsub_with_heartbeat( + pubsub, r, _relay, log=logger, + ) + except Exception as e: logger.error(f"Uplink relay Redis error: {e}") finally: From 22af36f72d8c3a72ed125bc94e568ffd62d13dff Mon Sep 17 00:00:00 2001 From: Haorui Zhou Date: Fri, 12 Jun 2026 09:50:07 -0400 Subject: [PATCH 07/12] Declare uplink_seq nonlocal in data.py _relay; warn in plan --- universal-telemetry-software/src/data.py | 1 + 1 file changed, 1 insertion(+) diff --git a/universal-telemetry-software/src/data.py b/universal-telemetry-software/src/data.py index 5285aec..28f7670 100644 --- a/universal-telemetry-software/src/data.py +++ b/universal-telemetry-software/src/data.py @@ -930,6 +930,7 @@ async def uplink_relay(): logger.info(f"Subscribed to Redis channel: {REDIS_UPLINK_CHANNEL}") async def _relay(msg): + nonlocal uplink_seq if msg['type'] != 'message': return try: From 5e440d2b6681a88bd2e886821d28e52a366100be Mon Sep 17 00:00:00 2001 From: Haorui Zhou Date: Fri, 12 Jun 2026 09:56:34 -0400 Subject: [PATCH 08/12] Use heartbeat-aware pump in websocket_bridge pubsub --- .../src/websocket_bridge.py | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/universal-telemetry-software/src/websocket_bridge.py b/universal-telemetry-software/src/websocket_bridge.py index 4d3357a..21523ac 100644 --- a/universal-telemetry-software/src/websocket_bridge.py +++ b/universal-telemetry-software/src/websocket_bridge.py @@ -18,6 +18,7 @@ ENABLE_UPLINK, ) from src import redis_utils, utils +from src.data import TelemetryNode logger = logging.getLogger("WebSocketBridge") @@ -301,20 +302,22 @@ async def redis_listener(): logger.info(f"Subscribed to Redis channels: {REDIS_CHANNEL}, {REDIS_STATS_CHANNEL}, {REDIS_DIAG_CHANNEL}") delay = backoff_min # reset backoff once a subscribe succeeds - async for message in pubsub.listen(): + async def _handler(msg): if shutdown_event.is_set(): - break - - if message['type'] == 'message': - data = redis_utils.decode_message(message['data']) - - # Broadcast to all connected clients - if connected_clients: - # Create tasks for sending to each client to avoid blocking - await asyncio.gather( - *[client.send(data) for client in connected_clients], - return_exceptions=True - ) + return + if msg['type'] != 'message': + return + data = redis_utils.decode_message(msg['data']) + + # Broadcast to all connected clients + if connected_clients: + # Create tasks for sending to each client to avoid blocking + await asyncio.gather( + *[client.send(data) for client in connected_clients], + return_exceptions=True + ) + + await TelemetryNode._pump_pubsub_with_heartbeat(pubsub, r, _handler, log=logger) except asyncio.CancelledError: raise except Exception as e: From 81ecae90a7d9fc75f112df6f5fc51e2b487a4240 Mon Sep 17 00:00:00 2001 From: Haorui Zhou Date: Fri, 12 Jun 2026 10:30:57 -0400 Subject: [PATCH 09/12] Register heartbeat writer as an async task --- universal-telemetry-software/src/data.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/universal-telemetry-software/src/data.py b/universal-telemetry-software/src/data.py index 28f7670..2487abd 100644 --- a/universal-telemetry-software/src/data.py +++ b/universal-telemetry-software/src/data.py @@ -17,6 +17,7 @@ REDIS_WS_CLIENTS_KEY, ) from src import redis_utils, utils +from src.heartbeat import run_heartbeat_writer from src.version import get_git_hash BATCH_SIZE = 20 @@ -608,6 +609,8 @@ async def uplink_receiver(): throughput_listener_task(), utils.heartbeat_coro(self.telemetry_event), inject_heartbeat(), + # Car has no Redis — writer is a clean no-op when redis_client is None. + run_heartbeat_writer(None), ] if ENABLE_UPLINK: tasks.append(uplink_receiver()) @@ -1047,7 +1050,11 @@ async def version_checker(): logger.debug(f"Version check error: {e}") await asyncio.sleep(30.0) - tasks = [udp_receiver(), missing_reporter(), stats_publisher(), raw_csv_logger(), car_time_injector(), version_checker(), utils.heartbeat_coro(self.telemetry_event)] + # Base mode has a real Redis server; create an async client for the + # heartbeat writer. The writer writes telemetry:heartbeat every 1s so + # pubsub subscribers can detect a half-dead producer and reconnect. + _async_redis = aioredis.from_url(REDIS_URL) + tasks = [udp_receiver(), missing_reporter(), stats_publisher(), raw_csv_logger(), car_time_injector(), version_checker(), utils.heartbeat_coro(self.telemetry_event), run_heartbeat_writer(_async_redis)] if ENABLE_UPLINK: tasks.append(uplink_relay()) await asyncio.gather(*tasks) From 2fa627d61e8fc680274863d3ba805dfc6508f5c7 Mon Sep 17 00:00:00 2001 From: Haorui Zhou Date: Fri, 12 Jun 2026 10:37:19 -0400 Subject: [PATCH 10/12] Add end-to-end recovery test for stale-heartbeat reconnect --- .../tests/test_heartbeat.py | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/universal-telemetry-software/tests/test_heartbeat.py b/universal-telemetry-software/tests/test_heartbeat.py index 488e22e..ebbbf4d 100644 --- a/universal-telemetry-software/tests/test_heartbeat.py +++ b/universal-telemetry-software/tests/test_heartbeat.py @@ -3,6 +3,7 @@ """ import asyncio import json +import time import pytest from unittest.mock import AsyncMock, MagicMock @@ -52,3 +53,56 @@ async def test_continues_after_set_failure(fake_redis): await task except asyncio.CancelledError: pass + + +async def test_pump_reconnects_when_heartbeat_goes_stale(monkeypatch): + """When the heartbeat key disappears, the pump must return so the outer + while-True loop can re-subscribe. This is the actual failure mode we hit + when the car power-cycles and the base's pubsub sits connected-but-broken. + + The helper's "stale" check is purely time-since-last-seen: it does not + inspect wall_ts / payload age. So the failing scenario is: the heartbeat + key *existed* (last_hb_mono > 0.0), then disappears, and we wait longer + than stale_s. The helper returns and the outer loop re-subscribes. + """ + from src.data import TelemetryNode + + # Fake pubsub: get_message always returns None (no data flowing). + pubsub = MagicMock() + pubsub.get_message = AsyncMock(return_value=None) + + # Fake redis client: heartbeat exists on the first read (so last_hb_mono + # gets set), then disappears forever. That triggers the reconnect path. + fresh_payload = json.dumps({"uptime_s": 1, "wall_ts": time.time()}) + call_count = [0] + + async def get_side_effect(*args, **kwargs): + call_count[0] += 1 + return fresh_payload if call_count[0] == 1 else None + + redis_client = MagicMock() + redis_client.get = AsyncMock(side_effect=get_side_effect) + + # Handler should never be called: no messages ever arrived, only the + # heartbeat-driven reconnect should have happened. + received = [] + + async def handler(msg): + received.append(msg) + + # Tiny stale_s so the test finishes in <2s. + task = asyncio.create_task( + TelemetryNode._pump_pubsub_with_heartbeat( + pubsub, redis_client, handler, stale_s=0.5, + ) + ) + # First iteration: get_message (~1s timeout) returns None, then redis.get + # returns fresh_payload -> last_hb_mono becomes nonzero. Second iteration: + # get_message returns None, redis.get returns None, last_hb_mono > 0 and + # > 0.5s old -> pump returns. Sleep 1.8s to be safe past the 1.0s get_message + # timeout on iteration 1 plus 0.5s stale on iteration 2. + await asyncio.sleep(1.8) + assert task.done(), "pump should have returned after stale heartbeat" + assert received == [], "no messages arrived — only reconnect was triggered" + # Sanity: we exercised the missing-heartbeat path at least once. + assert call_count[0] >= 2, "redis.get should have been polled past the first hit" From 3b2b48d30d033408fc1b1bd7efaacbb8a60e10ee Mon Sep 17 00:00:00 2001 From: Haorui Zhou Date: Fri, 12 Jun 2026 10:51:02 -0400 Subject: [PATCH 11/12] Tighten pump-recovery test: caplog assertion, more headroom --- .../tests/test_heartbeat.py | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/universal-telemetry-software/tests/test_heartbeat.py b/universal-telemetry-software/tests/test_heartbeat.py index ebbbf4d..609f2d5 100644 --- a/universal-telemetry-software/tests/test_heartbeat.py +++ b/universal-telemetry-software/tests/test_heartbeat.py @@ -55,7 +55,7 @@ async def test_continues_after_set_failure(fake_redis): pass -async def test_pump_reconnects_when_heartbeat_goes_stale(monkeypatch): +async def test_pump_reconnects_when_heartbeat_goes_stale(caplog): """When the heartbeat key disappears, the pump must return so the outer while-True loop can re-subscribe. This is the actual failure mode we hit when the car power-cycles and the base's pubsub sits connected-but-broken. @@ -99,10 +99,16 @@ async def handler(msg): # First iteration: get_message (~1s timeout) returns None, then redis.get # returns fresh_payload -> last_hb_mono becomes nonzero. Second iteration: # get_message returns None, redis.get returns None, last_hb_mono > 0 and - # > 0.5s old -> pump returns. Sleep 1.8s to be safe past the 1.0s get_message - # timeout on iteration 1 plus 0.5s stale on iteration 2. - await asyncio.sleep(1.8) - assert task.done(), "pump should have returned after stale heartbeat" - assert received == [], "no messages arrived — only reconnect was triggered" - # Sanity: we exercised the missing-heartbeat path at least once. - assert call_count[0] >= 2, "redis.get should have been polled past the first hit" + # > 0.5s old -> pump returns. Sleep 2.5s to be safe past the 1.0s get_message + # timeout on iteration 1 plus 0.5s stale on iteration 2, with headroom for + # slow CI runners. + with caplog.at_level("WARNING"): + await asyncio.sleep(2.5) + assert task.done(), "pump should have returned after stale heartbeat" + assert received == [], "no messages arrived — only reconnect was triggered" + # Sanity: we exercised the missing-heartbeat path at least once. + assert call_count[0] >= 2, "redis.get should have been polled past the first hit" + assert "heartbeat stale" in caplog.text, ( + "expected the reconnect branch to log a warning, " + "but the pump may have ended for a different reason" + ) From cf64e56510163eb9ac0c14e4187063a6c79272d2 Mon Sep 17 00:00:00 2001 From: Haorui Zhou Date: Fri, 12 Jun 2026 13:48:38 -0400 Subject: [PATCH 12/12] Measure pubsub liveness via a heartbeat channel and self-heal dead subscriptions Publish a heartbeat on the telemetry_heartbeat pubsub channel every second and have each subscriber measure liveness on the pubsub connection itself: if no message arrives for HEARTBEAT_STALE_S, tear down and re-subscribe. An out-of-band key check can't see a half-dead pubsub connection because regular commands use a different pool connection that redis-py silently reconnects. Wrap the uplink relay in a reconnect loop so the pump returning actually re-subscribes instead of exiting, and pass shutdown_event.is_set as should_stop so SIGTERM stops the bridge cleanly. Add the heartbeat tests to CI. --- .github/workflows/telemetry-ci.yml | 1 + universal-telemetry-software/src/config.py | 4 +- universal-telemetry-software/src/data.py | 163 ++++++------------ universal-telemetry-software/src/heartbeat.py | 74 +++++++- .../src/websocket_bridge.py | 13 +- .../tests/test_heartbeat.py | 143 ++++++++------- 6 files changed, 215 insertions(+), 183 deletions(-) diff --git a/.github/workflows/telemetry-ci.yml b/.github/workflows/telemetry-ci.yml index 115e1e2..9d2133a 100644 --- a/.github/workflows/telemetry-ci.yml +++ b/.github/workflows/telemetry-ci.yml @@ -166,6 +166,7 @@ jobs: tests/test_page_lock.py \ tests/test_stats_publisher.py \ tests/test_status_server.py \ + tests/test_heartbeat.py \ -v # ── Integration tests: full docker-compose stack ───────────────────────── diff --git a/universal-telemetry-software/src/config.py b/universal-telemetry-software/src/config.py index 3b03bee..3a803c0 100644 --- a/universal-telemetry-software/src/config.py +++ b/universal-telemetry-software/src/config.py @@ -22,8 +22,8 @@ REDIS_STATS_CHANNEL = "system_stats" REDIS_DIAG_CHANNEL = "link_diagnostics" REDIS_WS_CLIENTS_KEY = "websocket_bridge:clients" -REDIS_HEARTBEAT_KEY = "telemetry:heartbeat" -HEARTBEAT_STALE_S = 5.0 # subscribers reconnect if heartbeat older than this +REDIS_HEARTBEAT_CHANNEL = "telemetry_heartbeat" +HEARTBEAT_STALE_S = 5.0 # subscribers reconnect if no pubsub message arrives for this long # ── Feature flags ───────────────────────────────────────────────────────────── ENABLE_UPLINK = os.getenv("ENABLE_UPLINK", "false").lower() == "true" diff --git a/universal-telemetry-software/src/data.py b/universal-telemetry-software/src/data.py index 2487abd..a358726 100644 --- a/universal-telemetry-software/src/data.py +++ b/universal-telemetry-software/src/data.py @@ -1,4 +1,5 @@ import asyncio +import contextlib import socket import struct import time @@ -14,10 +15,10 @@ from src.config import ( REMOTE_IP, UDP_PORT, TCP_PORT, REDIS_URL, REDIS_CAN_CHANNEL, REDIS_UPLINK_CHANNEL, ENABLE_UPLINK, - REDIS_WS_CLIENTS_KEY, + REDIS_WS_CLIENTS_KEY, REDIS_HEARTBEAT_CHANNEL, ) from src import redis_utils, utils -from src.heartbeat import run_heartbeat_writer +from src.heartbeat import pump_pubsub_with_heartbeat, run_heartbeat_writer from src.version import get_git_hash BATCH_SIZE = 20 @@ -90,66 +91,6 @@ def publish(self, channel, data): except asyncio.QueueFull: pass # drop under backpressure rather than block the CAN reader - @staticmethod - async def _pump_pubsub_with_heartbeat(pubsub, redis_client, on_message, *, - stale_s: float = 5.0, - log=None): - """Drain a Redis pubsub, restarting it if the heartbeat goes stale. - - Replaces the naive `async for message in pubsub.listen():` pattern that - silently goes dark when the TCP connection or subscription state is lost - after a car power-cycle. Uses non-blocking get_message(timeout=1.0) so we - get a chance to check the heartbeat key on every iteration. - - `on_message` is an `async` callable invoked with the raw pubsub message - dict (whatever `get_message()` returns, typically - `{"type": "message", "channel": ..., "data": ...}`) for each non-None, - non-subscribe-confirmation message. Returns only on cancellation; outer - supervisor is expected to restart the task. - """ - from .config import REDIS_HEARTBEAT_KEY - _log = log or logger - last_hb_mono = 0.0 - while True: - try: - msg = await pubsub.get_message(timeout=1.0, ignore_subscribe_messages=True) - except asyncio.CancelledError: - raise - except Exception as e: - _log.warning(f"pubsub.get_message error, reconnecting: {e}") - await asyncio.sleep(0.5) - return # let the outer while-True re-subscribe - - if msg is not None: - try: - await on_message(msg) - except Exception as e: - _log.error(f"subscriber handler error: {e}") - last_hb_mono = time.monotonic() # data flowing — heartbeat check not needed - continue - - # No message this second — check heartbeat - try: - raw = await redis_client.get(REDIS_HEARTBEAT_KEY) if redis_client else None - except Exception as e: - _log.debug(f"heartbeat read failed: {e}") - raw = None - - if raw is None: - # First-time setup or Redis is down — don't reconnect, just wait. - if last_hb_mono == 0.0: - continue - # Heartbeat missing while we've been running: producer is gone or - # the key expired. Tear down so the outer loop re-subscribes. - if time.monotonic() - last_hb_mono > stale_s: - _log.warning("heartbeat stale, forcing pubsub reconnect") - return - else: - last_hb_mono = time.monotonic() - if time.monotonic() - last_hb_mono > stale_s: - _log.warning("heartbeat stale, forcing pubsub reconnect") - return - def _handle_ecu_timestamp(self, data: bytes) -> None: """Extract epoch_ms from ECU VCU_Timestamp message and update clock offset.""" if len(data) < 8: @@ -609,8 +550,6 @@ async def uplink_receiver(): throughput_listener_task(), utils.heartbeat_coro(self.telemetry_event), inject_heartbeat(), - # Car has no Redis — writer is a clean no-op when redis_client is None. - run_heartbeat_writer(None), ] if ENABLE_UPLINK: tasks.append(uplink_receiver()) @@ -926,55 +865,63 @@ async def uplink_relay(): uplink_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) uplink_seq = 0 - try: - r = aioredis.from_url(REDIS_URL) - pubsub = r.pubsub() - await pubsub.subscribe(REDIS_UPLINK_CHANNEL) - logger.info(f"Subscribed to Redis channel: {REDIS_UPLINK_CHANNEL}") - - async def _relay(msg): - nonlocal uplink_seq - if msg['type'] != 'message': - return - try: - data = redis_utils.decode_message(msg['data']) - uplink_msg = json.loads(data) - - can_id = uplink_msg.get("canId") - can_data = uplink_msg.get("data", []) - ref = uplink_msg.get("ref", "unknown") + async def _relay(msg): + nonlocal uplink_seq + if msg['type'] != 'message': + return + try: + data = redis_utils.decode_message(msg['data']) + uplink_msg = json.loads(data) - if can_id is None or not isinstance(can_id, int) or can_id < 0: - logger.warning(f"Uplink relay: invalid canId in ref={ref}") - return - if not isinstance(can_data, list) or len(can_data) < 1 or len(can_data) > 8: - logger.warning(f"Uplink relay: invalid data in ref={ref}") - return + can_id = uplink_msg.get("canId") + can_data = uplink_msg.get("data", []) + ref = uplink_msg.get("ref", "unknown") - # Pack as uplink UDP packet: 0xCAFE + seq + count(1) + CAN message - uplink_seq += 1 - data_bytes = bytes(can_data) + b'\x00' * (8 - len(can_data)) - can_msg = CANMessage(time.time(), can_id, data_bytes) + if can_id is None or not isinstance(can_id, int) or can_id < 0: + logger.warning(f"Uplink relay: invalid canId in ref={ref}") + return + if not isinstance(can_data, list) or len(can_data) < 1 or len(can_data) > 8: + logger.warning(f"Uplink relay: invalid data in ref={ref}") + return - payload = UPLINK_MAGIC - payload += struct.pack("!QH", uplink_seq, 1) - payload += can_msg.pack() + # Pack as uplink UDP packet: 0xCAFE + seq + count(1) + CAN message + uplink_seq += 1 + data_bytes = bytes(can_data) + b'\x00' * (8 - len(can_data)) + can_msg = CANMessage(time.time(), can_id, data_bytes) - try: - uplink_sock.sendto(payload, (REMOTE_IP, UDP_PORT)) - logger.info(f"Uplink relayed to car: canId={can_id} ref={ref} seq={uplink_seq}") - except (PermissionError, OSError) as e: - logger.error(f"Uplink UDP send failed: {e}") + payload = UPLINK_MAGIC + payload += struct.pack("!QH", uplink_seq, 1) + payload += can_msg.pack() - except Exception as e: - logger.error(f"Uplink relay error: {e}") + try: + uplink_sock.sendto(payload, (REMOTE_IP, UDP_PORT)) + logger.info(f"Uplink relayed to car: canId={can_id} ref={ref} seq={uplink_seq}") + except (PermissionError, OSError) as e: + logger.error(f"Uplink UDP send failed: {e}") - await TelemetryNode._pump_pubsub_with_heartbeat( - pubsub, r, _relay, log=logger, - ) + except Exception as e: + logger.error(f"Uplink relay error: {e}") - except Exception as e: - logger.error(f"Uplink relay Redis error: {e}") + # Reconnect loop: the pump returns whenever the pubsub connection + # goes silent past HEARTBEAT_STALE_S, and we re-subscribe here. + try: + while True: + r = None + try: + r = aioredis.from_url(REDIS_URL) + pubsub = r.pubsub() + await pubsub.subscribe(REDIS_UPLINK_CHANNEL, REDIS_HEARTBEAT_CHANNEL) + logger.info(f"Subscribed to Redis channels: {REDIS_UPLINK_CHANNEL}, {REDIS_HEARTBEAT_CHANNEL}") + await pump_pubsub_with_heartbeat(pubsub, _relay, log=logger) + except asyncio.CancelledError: + raise + except Exception as e: + logger.error(f"Uplink relay Redis error: {e}") + await asyncio.sleep(1.0) + finally: + if r is not None: + with contextlib.suppress(Exception): + await r.aclose() finally: uplink_sock.close() @@ -1051,8 +998,8 @@ async def version_checker(): await asyncio.sleep(30.0) # Base mode has a real Redis server; create an async client for the - # heartbeat writer. The writer writes telemetry:heartbeat every 1s so - # pubsub subscribers can detect a half-dead producer and reconnect. + # heartbeat writer. The writer publishes on the heartbeat channel every + # 1s so pubsub subscribers can detect a dead subscription and reconnect. _async_redis = aioredis.from_url(REDIS_URL) tasks = [udp_receiver(), missing_reporter(), stats_publisher(), raw_csv_logger(), car_time_injector(), version_checker(), utils.heartbeat_coro(self.telemetry_event), run_heartbeat_writer(_async_redis)] if ENABLE_UPLINK: diff --git a/universal-telemetry-software/src/heartbeat.py b/universal-telemetry-software/src/heartbeat.py index cebe0b4..88d1262 100644 --- a/universal-telemetry-software/src/heartbeat.py +++ b/universal-telemetry-software/src/heartbeat.py @@ -1,15 +1,21 @@ """ -Heartbeat writer — publishes a per-process liveness key to Redis every second -so Redis pub/sub subscribers can detect a half-dead producer (TCP up, subscription -state lost) and reconnect. See docs/superpowers/plans/2026-06-11-stack-resilience.md. +Pubsub liveness probe — the producer publishes a heartbeat message on a Redis +pubsub channel every second, and every subscriber also subscribes to that +channel. Liveness is measured on the pubsub connection itself: if no message +of any kind (heartbeat or data) arrives for HEARTBEAT_STALE_S, the connection +is presumed half-dead and the subscriber tears down and re-subscribes. + +An out-of-band check (e.g. GET on a heartbeat key) cannot detect this state: +regular commands use a different pool connection that redis-py transparently +reconnects, so the key would look fresh while the pubsub connection is dark. +See docs/superpowers/plans/2026-06-11-stack-resilience.md. """ import asyncio import json import logging import time -from .config import REDIS_HEARTBEAT_KEY -from .redis_utils import get_async_client +from .config import HEARTBEAT_STALE_S, REDIS_HEARTBEAT_CHANNEL logger = logging.getLogger(__name__) @@ -17,7 +23,7 @@ async def run_heartbeat_writer(redis_client=None) -> None: - """Write a heartbeat key to Redis every HEARTBEAT_INTERVAL_S. + """Publish a heartbeat message on REDIS_HEARTBEAT_CHANNEL every HEARTBEAT_INTERVAL_S. Stops on cancellation. Logs and continues on any other exception so transient Redis blips don't take down the writer; if Redis is persistently down, the @@ -30,10 +36,60 @@ async def run_heartbeat_writer(redis_client=None) -> None: while True: try: payload = json.dumps({"uptime_s": time.monotonic() - start_mono, - "wall_ts": time.time()}) - await redis_client.set(REDIS_HEARTBEAT_KEY, payload, ex=30) + "wall_ts": time.time()}) + await redis_client.publish(REDIS_HEARTBEAT_CHANNEL, payload) except asyncio.CancelledError: raise except Exception as e: - logger.warning(f"Heartbeat write failed: {e}") + logger.warning(f"Heartbeat publish failed: {e}") await asyncio.sleep(HEARTBEAT_INTERVAL_S) + + +async def pump_pubsub_with_heartbeat(pubsub, on_message, *, + stale_s: float = HEARTBEAT_STALE_S, + should_stop=None, + log=None): + """Drain a Redis pubsub, returning if the connection goes silent. + + Replaces the naive `async for message in pubsub.listen():` pattern that + silently goes dark when the TCP connection or subscription state is lost + after a car power-cycle. The pubsub must be subscribed to + REDIS_HEARTBEAT_CHANNEL in addition to its data channels; with the producer + publishing every second, more than `stale_s` of silence means the + subscription is dead, so the pump returns and the caller's outer loop + re-subscribes. Heartbeat messages only refresh the liveness clock and are + not forwarded to `on_message`. + + `on_message` is an `async` callable invoked with the raw pubsub message + dict for each non-heartbeat message. `should_stop` is an optional callable + checked every iteration (pass `shutdown_event.is_set` for clean SIGTERM). + Returns on staleness, pubsub errors, or should_stop; raises on cancellation. + """ + _log = log or logger + last_msg_mono = time.monotonic() # armed at subscribe time + while True: + if should_stop is not None and should_stop(): + return + try: + msg = await pubsub.get_message(timeout=1.0, ignore_subscribe_messages=True) + except asyncio.CancelledError: + raise + except Exception as e: + _log.warning(f"pubsub.get_message error, reconnecting: {e}") + await asyncio.sleep(0.5) + return # let the outer while-True re-subscribe + + if msg is not None: + last_msg_mono = time.monotonic() + channel = msg.get("channel") + if isinstance(channel, bytes): + channel = channel.decode("utf-8", errors="replace") + if channel == REDIS_HEARTBEAT_CHANNEL: + continue # liveness signal only — don't forward + try: + await on_message(msg) + except Exception as e: + _log.error(f"subscriber handler error: {e}") + elif time.monotonic() - last_msg_mono > stale_s: + _log.warning("heartbeat stale, forcing pubsub reconnect") + return diff --git a/universal-telemetry-software/src/websocket_bridge.py b/universal-telemetry-software/src/websocket_bridge.py index 21523ac..a3e0945 100644 --- a/universal-telemetry-software/src/websocket_bridge.py +++ b/universal-telemetry-software/src/websocket_bridge.py @@ -15,10 +15,11 @@ REDIS_UPLINK_CHANNEL, REDIS_DIAG_CHANNEL, REDIS_WS_CLIENTS_KEY, + REDIS_HEARTBEAT_CHANNEL, ENABLE_UPLINK, ) from src import redis_utils, utils -from src.data import TelemetryNode +from src.heartbeat import pump_pubsub_with_heartbeat logger = logging.getLogger("WebSocketBridge") @@ -298,13 +299,12 @@ async def redis_listener(): try: r = redis.from_url(REDIS_URL, health_check_interval=30) pubsub = r.pubsub() - await pubsub.subscribe(REDIS_CHANNEL, REDIS_STATS_CHANNEL, REDIS_DIAG_CHANNEL) - logger.info(f"Subscribed to Redis channels: {REDIS_CHANNEL}, {REDIS_STATS_CHANNEL}, {REDIS_DIAG_CHANNEL}") + await pubsub.subscribe(REDIS_CHANNEL, REDIS_STATS_CHANNEL, REDIS_DIAG_CHANNEL, + REDIS_HEARTBEAT_CHANNEL) + logger.info(f"Subscribed to Redis channels: {REDIS_CHANNEL}, {REDIS_STATS_CHANNEL}, {REDIS_DIAG_CHANNEL}, {REDIS_HEARTBEAT_CHANNEL}") delay = backoff_min # reset backoff once a subscribe succeeds async def _handler(msg): - if shutdown_event.is_set(): - return if msg['type'] != 'message': return data = redis_utils.decode_message(msg['data']) @@ -317,7 +317,8 @@ async def _handler(msg): return_exceptions=True ) - await TelemetryNode._pump_pubsub_with_heartbeat(pubsub, r, _handler, log=logger) + await pump_pubsub_with_heartbeat(pubsub, _handler, + should_stop=shutdown_event.is_set, log=logger) except asyncio.CancelledError: raise except Exception as e: diff --git a/universal-telemetry-software/tests/test_heartbeat.py b/universal-telemetry-software/tests/test_heartbeat.py index 609f2d5..061bfef 100644 --- a/universal-telemetry-software/tests/test_heartbeat.py +++ b/universal-telemetry-software/tests/test_heartbeat.py @@ -1,25 +1,30 @@ """ -Heartbeat writer tests — uses a fake async Redis client to avoid network deps. +Heartbeat tests — uses a fake async Redis client / pubsub to avoid network deps. + +Design under test: the producer PUBLISHES a heartbeat on a pubsub channel that +every subscriber also subscribes to. Liveness is then measured on the pubsub +connection itself (time since *any* message arrived), so a half-dead pubsub +connection is detected even while Redis stays reachable for regular commands. """ import asyncio import json -import time import pytest from unittest.mock import AsyncMock, MagicMock from src import heartbeat -from src.config import REDIS_HEARTBEAT_KEY +from src.config import REDIS_HEARTBEAT_CHANNEL @pytest.fixture def fake_redis(): - """Fake async Redis client whose .set() records calls and timestamps.""" + """Fake async Redis client whose .publish() records calls.""" client = MagicMock() - client.set = AsyncMock() + client.publish = AsyncMock() return client -async def test_writes_heartbeat_with_uptime(fake_redis): +async def test_writer_publishes_heartbeat_on_channel(fake_redis, monkeypatch): + monkeypatch.setattr(heartbeat, "HEARTBEAT_INTERVAL_S", 0.01) task = asyncio.create_task(heartbeat.run_heartbeat_writer(fake_redis)) await asyncio.sleep(0.05) task.cancel() @@ -27,26 +32,25 @@ async def test_writes_heartbeat_with_uptime(fake_redis): await task except asyncio.CancelledError: pass - assert fake_redis.set.call_count >= 1 - last_call = fake_redis.set.call_args_list[-1] - assert last_call.args[0] == REDIS_HEARTBEAT_KEY + assert fake_redis.publish.call_count >= 1 + last_call = fake_redis.publish.call_args_list[-1] + assert last_call.args[0] == REDIS_HEARTBEAT_CHANNEL payload = json.loads(last_call.args[1]) assert "uptime_s" in payload and "wall_ts" in payload assert payload["uptime_s"] >= 0 - # ex=30 sets the expiry so a dead process's heartbeat is naturally GC'd - assert last_call.kwargs.get("ex") == 30 -async def test_returns_immediately_when_redis_is_none(): +async def test_writer_returns_immediately_when_redis_is_none(): # Should not block or raise; should be a no-op. await asyncio.wait_for(heartbeat.run_heartbeat_writer(None), timeout=0.2) -async def test_continues_after_set_failure(fake_redis): - fake_redis.set = AsyncMock(side_effect=RuntimeError("redis down")) +async def test_writer_continues_after_publish_failure(fake_redis, monkeypatch): + monkeypatch.setattr(heartbeat, "HEARTBEAT_INTERVAL_S", 0.01) + fake_redis.publish = AsyncMock(side_effect=RuntimeError("redis down")) task = asyncio.create_task(heartbeat.run_heartbeat_writer(fake_redis)) - await asyncio.sleep(1.05) # past the first 1.0s HEARTBEAT_INTERVAL_S so the loop has retried - assert fake_redis.set.call_count >= 2, "writer should retry after failure" + await asyncio.sleep(0.1) + assert fake_redis.publish.call_count >= 2, "writer should retry after failure" assert not task.done(), "writer should recover and keep running" task.cancel() try: @@ -55,60 +59,83 @@ async def test_continues_after_set_failure(fake_redis): pass -async def test_pump_reconnects_when_heartbeat_goes_stale(caplog): - """When the heartbeat key disappears, the pump must return so the outer - while-True loop can re-subscribe. This is the actual failure mode we hit - when the car power-cycles and the base's pubsub sits connected-but-broken. - - The helper's "stale" check is purely time-since-last-seen: it does not - inspect wall_ts / payload age. So the failing scenario is: the heartbeat - key *existed* (last_hb_mono > 0.0), then disappears, and we wait longer - than stale_s. The helper returns and the outer loop re-subscribes. +async def test_pump_reconnects_when_pubsub_goes_silent(caplog): + """The actual power-cycle failure mode: the pubsub connection is dead + (no messages arrive — not even heartbeats) while Redis itself stays + reachable for regular commands. The pump must return within ~stale_s so + the outer while-True loop can re-subscribe. No out-of-band GET is + involved: silence on the subscribed connection IS the signal. """ - from src.data import TelemetryNode - - # Fake pubsub: get_message always returns None (no data flowing). pubsub = MagicMock() pubsub.get_message = AsyncMock(return_value=None) - # Fake redis client: heartbeat exists on the first read (so last_hb_mono - # gets set), then disappears forever. That triggers the reconnect path. - fresh_payload = json.dumps({"uptime_s": 1, "wall_ts": time.time()}) - call_count = [0] + received = [] + + async def handler(msg): + received.append(msg) + + with caplog.at_level("WARNING"): + await asyncio.wait_for( + heartbeat.pump_pubsub_with_heartbeat(pubsub, handler, stale_s=0.3), + timeout=2.0, + ) + assert received == [], "no messages arrived — only reconnect was triggered" + assert "heartbeat stale" in caplog.text + + +async def test_pump_filters_heartbeat_and_forwards_data(): + """Heartbeat messages keep the connection 'fresh' but are not forwarded + to the handler; real data messages are forwarded. + """ + hb_msg = {"type": "message", "channel": REDIS_HEARTBEAT_CHANNEL.encode(), + "data": b'{"uptime_s": 1}'} + data_msg = {"type": "message", "channel": b"can_uplink", "data": b'{"canId": 1}'} + msgs = [hb_msg, data_msg] - async def get_side_effect(*args, **kwargs): - call_count[0] += 1 - return fresh_payload if call_count[0] == 1 else None + async def get_message(**kwargs): + return msgs.pop(0) if msgs else None - redis_client = MagicMock() - redis_client.get = AsyncMock(side_effect=get_side_effect) + pubsub = MagicMock() + pubsub.get_message = AsyncMock(side_effect=get_message) - # Handler should never be called: no messages ever arrived, only the - # heartbeat-driven reconnect should have happened. received = [] async def handler(msg): received.append(msg) - # Tiny stale_s so the test finishes in <2s. - task = asyncio.create_task( - TelemetryNode._pump_pubsub_with_heartbeat( - pubsub, redis_client, handler, stale_s=0.5, - ) + await asyncio.wait_for( + heartbeat.pump_pubsub_with_heartbeat(pubsub, handler, stale_s=0.2), + timeout=2.0, + ) + assert received == [data_msg], "heartbeat filtered, data forwarded" + + +async def test_pump_returns_when_should_stop_set(): + """On shutdown the pump must return promptly instead of draining forever, + so SIGTERM stops the bridge cleanly (no Docker SIGKILL after timeout). + """ + pubsub = MagicMock() + pubsub.get_message = AsyncMock( + return_value={"type": "message", "channel": b"can_uplink", "data": b"x"} + ) + handler = AsyncMock() + await asyncio.wait_for( + heartbeat.pump_pubsub_with_heartbeat( + pubsub, handler, stale_s=5.0, should_stop=lambda: True, + ), + timeout=0.5, ) - # First iteration: get_message (~1s timeout) returns None, then redis.get - # returns fresh_payload -> last_hb_mono becomes nonzero. Second iteration: - # get_message returns None, redis.get returns None, last_hb_mono > 0 and - # > 0.5s old -> pump returns. Sleep 2.5s to be safe past the 1.0s get_message - # timeout on iteration 1 plus 0.5s stale on iteration 2, with headroom for - # slow CI runners. + handler.assert_not_called() + + +async def test_pump_returns_on_get_message_error(caplog): + pubsub = MagicMock() + pubsub.get_message = AsyncMock(side_effect=ConnectionError("broken pipe")) + handler = AsyncMock() with caplog.at_level("WARNING"): - await asyncio.sleep(2.5) - assert task.done(), "pump should have returned after stale heartbeat" - assert received == [], "no messages arrived — only reconnect was triggered" - # Sanity: we exercised the missing-heartbeat path at least once. - assert call_count[0] >= 2, "redis.get should have been polled past the first hit" - assert "heartbeat stale" in caplog.text, ( - "expected the reconnect branch to log a warning, " - "but the pump may have ended for a different reason" + await asyncio.wait_for( + heartbeat.pump_pubsub_with_heartbeat(pubsub, handler, stale_s=5.0), + timeout=2.0, ) + handler.assert_not_called() + assert "reconnecting" in caplog.text