diff --git a/.github/workflows/telemetry-ci.yml b/.github/workflows/telemetry-ci.yml index 115e1e2..9d2133a 100644 --- a/.github/workflows/telemetry-ci.yml +++ b/.github/workflows/telemetry-ci.yml @@ -166,6 +166,7 @@ jobs: tests/test_page_lock.py \ tests/test_stats_publisher.py \ tests/test_status_server.py \ + tests/test_heartbeat.py \ -v # ── Integration tests: full docker-compose stack ───────────────────────── diff --git a/universal-telemetry-software/src/config.py b/universal-telemetry-software/src/config.py index 564014f..3a803c0 100644 --- a/universal-telemetry-software/src/config.py +++ b/universal-telemetry-software/src/config.py @@ -22,6 +22,8 @@ REDIS_STATS_CHANNEL = "system_stats" REDIS_DIAG_CHANNEL = "link_diagnostics" REDIS_WS_CLIENTS_KEY = "websocket_bridge:clients" +REDIS_HEARTBEAT_CHANNEL = "telemetry_heartbeat" +HEARTBEAT_STALE_S = 5.0 # subscribers reconnect if no pubsub message arrives for this long # ── Feature flags ───────────────────────────────────────────────────────────── ENABLE_UPLINK = os.getenv("ENABLE_UPLINK", "false").lower() == "true" diff --git a/universal-telemetry-software/src/data.py b/universal-telemetry-software/src/data.py index ba8706a..a358726 100644 --- a/universal-telemetry-software/src/data.py +++ b/universal-telemetry-software/src/data.py @@ -1,4 +1,5 @@ import asyncio +import contextlib import socket import struct import time @@ -14,9 +15,10 @@ from src.config import ( REMOTE_IP, UDP_PORT, TCP_PORT, REDIS_URL, REDIS_CAN_CHANNEL, REDIS_UPLINK_CHANNEL, ENABLE_UPLINK, - REDIS_WS_CLIENTS_KEY, + REDIS_WS_CLIENTS_KEY, REDIS_HEARTBEAT_CHANNEL, ) from src import redis_utils, utils +from src.heartbeat import pump_pubsub_with_heartbeat, run_heartbeat_writer from src.version import get_git_hash BATCH_SIZE = 20 @@ -863,51 +865,63 @@ async def uplink_relay(): uplink_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) uplink_seq = 0 - try: - r = aioredis.from_url(REDIS_URL) - pubsub = r.pubsub() - await pubsub.subscribe(REDIS_UPLINK_CHANNEL) - logger.info(f"Subscribed to Redis channel: {REDIS_UPLINK_CHANNEL}") - - async for message in pubsub.listen(): - if message['type'] != 'message': - continue + async def _relay(msg): + nonlocal uplink_seq + if msg['type'] != 'message': + return + try: + data = redis_utils.decode_message(msg['data']) + uplink_msg = json.loads(data) - try: - data = redis_utils.decode_message(message['data']) - uplink_msg = json.loads(data) + can_id = uplink_msg.get("canId") + can_data = uplink_msg.get("data", []) + ref = uplink_msg.get("ref", "unknown") - can_id = uplink_msg.get("canId") - can_data = uplink_msg.get("data", []) - ref = uplink_msg.get("ref", "unknown") + if can_id is None or not isinstance(can_id, int) or can_id < 0: + logger.warning(f"Uplink relay: invalid canId in ref={ref}") + return + if not isinstance(can_data, list) or len(can_data) < 1 or len(can_data) > 8: + logger.warning(f"Uplink relay: invalid data in ref={ref}") + return - if can_id is None or not isinstance(can_id, int) or can_id < 0: - logger.warning(f"Uplink relay: invalid canId in ref={ref}") - continue - if not isinstance(can_data, list) or len(can_data) < 1 or len(can_data) > 8: - logger.warning(f"Uplink relay: invalid data in ref={ref}") - continue + # Pack as uplink UDP packet: 0xCAFE + seq + count(1) + CAN message + uplink_seq += 1 + data_bytes = bytes(can_data) + b'\x00' * (8 - len(can_data)) + can_msg = CANMessage(time.time(), can_id, data_bytes) - # Pack as uplink UDP packet: 0xCAFE + seq + count(1) + CAN message - uplink_seq += 1 - data_bytes = bytes(can_data) + b'\x00' * (8 - len(can_data)) - can_msg = CANMessage(time.time(), can_id, data_bytes) + payload = UPLINK_MAGIC + payload += struct.pack("!QH", uplink_seq, 1) + payload += can_msg.pack() - payload = UPLINK_MAGIC - payload += struct.pack("!QH", uplink_seq, 1) - payload += can_msg.pack() + try: + uplink_sock.sendto(payload, (REMOTE_IP, UDP_PORT)) + logger.info(f"Uplink relayed to car: canId={can_id} ref={ref} seq={uplink_seq}") + except (PermissionError, OSError) as e: + logger.error(f"Uplink UDP send failed: {e}") - try: - uplink_sock.sendto(payload, (REMOTE_IP, UDP_PORT)) - logger.info(f"Uplink relayed to car: canId={can_id} ref={ref} seq={uplink_seq}") - except (PermissionError, OSError) as e: - logger.error(f"Uplink UDP send failed: {e}") + except Exception as e: + logger.error(f"Uplink relay error: {e}") + # Reconnect loop: the pump returns whenever the pubsub connection + # goes silent past HEARTBEAT_STALE_S, and we re-subscribe here. + try: + while True: + r = None + try: + r = aioredis.from_url(REDIS_URL) + pubsub = r.pubsub() + await pubsub.subscribe(REDIS_UPLINK_CHANNEL, REDIS_HEARTBEAT_CHANNEL) + logger.info(f"Subscribed to Redis channels: {REDIS_UPLINK_CHANNEL}, {REDIS_HEARTBEAT_CHANNEL}") + await pump_pubsub_with_heartbeat(pubsub, _relay, log=logger) + except asyncio.CancelledError: + raise except Exception as e: - logger.error(f"Uplink relay error: {e}") - - except Exception as e: - logger.error(f"Uplink relay Redis error: {e}") + logger.error(f"Uplink relay Redis error: {e}") + await asyncio.sleep(1.0) + finally: + if r is not None: + with contextlib.suppress(Exception): + await r.aclose() finally: uplink_sock.close() @@ -983,7 +997,11 @@ async def version_checker(): logger.debug(f"Version check error: {e}") await asyncio.sleep(30.0) - tasks = [udp_receiver(), missing_reporter(), stats_publisher(), raw_csv_logger(), car_time_injector(), version_checker(), utils.heartbeat_coro(self.telemetry_event)] + # Base mode has a real Redis server; create an async client for the + # heartbeat writer. The writer publishes on the heartbeat channel every + # 1s so pubsub subscribers can detect a dead subscription and reconnect. + _async_redis = aioredis.from_url(REDIS_URL) + tasks = [udp_receiver(), missing_reporter(), stats_publisher(), raw_csv_logger(), car_time_injector(), version_checker(), utils.heartbeat_coro(self.telemetry_event), run_heartbeat_writer(_async_redis)] if ENABLE_UPLINK: tasks.append(uplink_relay()) await asyncio.gather(*tasks) diff --git a/universal-telemetry-software/src/heartbeat.py b/universal-telemetry-software/src/heartbeat.py new file mode 100644 index 0000000..88d1262 --- /dev/null +++ b/universal-telemetry-software/src/heartbeat.py @@ -0,0 +1,95 @@ +""" +Pubsub liveness probe — the producer publishes a heartbeat message on a Redis +pubsub channel every second, and every subscriber also subscribes to that +channel. Liveness is measured on the pubsub connection itself: if no message +of any kind (heartbeat or data) arrives for HEARTBEAT_STALE_S, the connection +is presumed half-dead and the subscriber tears down and re-subscribes. + +An out-of-band check (e.g. GET on a heartbeat key) cannot detect this state: +regular commands use a different pool connection that redis-py transparently +reconnects, so the key would look fresh while the pubsub connection is dark. +See docs/superpowers/plans/2026-06-11-stack-resilience.md. +""" +import asyncio +import json +import logging +import time + +from .config import HEARTBEAT_STALE_S, REDIS_HEARTBEAT_CHANNEL + +logger = logging.getLogger(__name__) + +HEARTBEAT_INTERVAL_S = 1.0 + + +async def run_heartbeat_writer(redis_client=None) -> None: + """Publish a heartbeat message on REDIS_HEARTBEAT_CHANNEL every HEARTBEAT_INTERVAL_S. + + Stops on cancellation. Logs and continues on any other exception so transient + Redis blips don't take down the writer; if Redis is persistently down, the + surrounding supervisor (systemd / Docker) is expected to restart the process. + Skips silently if no Redis client is available (car mode without Redis). + """ + if redis_client is None: + return + start_mono = time.monotonic() + while True: + try: + payload = json.dumps({"uptime_s": time.monotonic() - start_mono, + "wall_ts": time.time()}) + await redis_client.publish(REDIS_HEARTBEAT_CHANNEL, payload) + except asyncio.CancelledError: + raise + except Exception as e: + logger.warning(f"Heartbeat publish failed: {e}") + await asyncio.sleep(HEARTBEAT_INTERVAL_S) + + +async def pump_pubsub_with_heartbeat(pubsub, on_message, *, + stale_s: float = HEARTBEAT_STALE_S, + should_stop=None, + log=None): + """Drain a Redis pubsub, returning if the connection goes silent. + + Replaces the naive `async for message in pubsub.listen():` pattern that + silently goes dark when the TCP connection or subscription state is lost + after a car power-cycle. The pubsub must be subscribed to + REDIS_HEARTBEAT_CHANNEL in addition to its data channels; with the producer + publishing every second, more than `stale_s` of silence means the + subscription is dead, so the pump returns and the caller's outer loop + re-subscribes. Heartbeat messages only refresh the liveness clock and are + not forwarded to `on_message`. + + `on_message` is an `async` callable invoked with the raw pubsub message + dict for each non-heartbeat message. `should_stop` is an optional callable + checked every iteration (pass `shutdown_event.is_set` for clean SIGTERM). + Returns on staleness, pubsub errors, or should_stop; raises on cancellation. + """ + _log = log or logger + last_msg_mono = time.monotonic() # armed at subscribe time + while True: + if should_stop is not None and should_stop(): + return + try: + msg = await pubsub.get_message(timeout=1.0, ignore_subscribe_messages=True) + except asyncio.CancelledError: + raise + except Exception as e: + _log.warning(f"pubsub.get_message error, reconnecting: {e}") + await asyncio.sleep(0.5) + return # let the outer while-True re-subscribe + + if msg is not None: + last_msg_mono = time.monotonic() + channel = msg.get("channel") + if isinstance(channel, bytes): + channel = channel.decode("utf-8", errors="replace") + if channel == REDIS_HEARTBEAT_CHANNEL: + continue # liveness signal only — don't forward + try: + await on_message(msg) + except Exception as e: + _log.error(f"subscriber handler error: {e}") + elif time.monotonic() - last_msg_mono > stale_s: + _log.warning("heartbeat stale, forcing pubsub reconnect") + return diff --git a/universal-telemetry-software/src/websocket_bridge.py b/universal-telemetry-software/src/websocket_bridge.py index 4d3357a..a3e0945 100644 --- a/universal-telemetry-software/src/websocket_bridge.py +++ b/universal-telemetry-software/src/websocket_bridge.py @@ -15,9 +15,11 @@ REDIS_UPLINK_CHANNEL, REDIS_DIAG_CHANNEL, REDIS_WS_CLIENTS_KEY, + REDIS_HEARTBEAT_CHANNEL, ENABLE_UPLINK, ) from src import redis_utils, utils +from src.heartbeat import pump_pubsub_with_heartbeat logger = logging.getLogger("WebSocketBridge") @@ -297,24 +299,26 @@ async def redis_listener(): try: r = redis.from_url(REDIS_URL, health_check_interval=30) pubsub = r.pubsub() - await pubsub.subscribe(REDIS_CHANNEL, REDIS_STATS_CHANNEL, REDIS_DIAG_CHANNEL) - logger.info(f"Subscribed to Redis channels: {REDIS_CHANNEL}, {REDIS_STATS_CHANNEL}, {REDIS_DIAG_CHANNEL}") + await pubsub.subscribe(REDIS_CHANNEL, REDIS_STATS_CHANNEL, REDIS_DIAG_CHANNEL, + REDIS_HEARTBEAT_CHANNEL) + logger.info(f"Subscribed to Redis channels: {REDIS_CHANNEL}, {REDIS_STATS_CHANNEL}, {REDIS_DIAG_CHANNEL}, {REDIS_HEARTBEAT_CHANNEL}") delay = backoff_min # reset backoff once a subscribe succeeds - async for message in pubsub.listen(): - if shutdown_event.is_set(): - break - - if message['type'] == 'message': - data = redis_utils.decode_message(message['data']) - - # Broadcast to all connected clients - if connected_clients: - # Create tasks for sending to each client to avoid blocking - await asyncio.gather( - *[client.send(data) for client in connected_clients], - return_exceptions=True - ) + async def _handler(msg): + if msg['type'] != 'message': + return + data = redis_utils.decode_message(msg['data']) + + # Broadcast to all connected clients + if connected_clients: + # Create tasks for sending to each client to avoid blocking + await asyncio.gather( + *[client.send(data) for client in connected_clients], + return_exceptions=True + ) + + await pump_pubsub_with_heartbeat(pubsub, _handler, + should_stop=shutdown_event.is_set, log=logger) except asyncio.CancelledError: raise except Exception as e: diff --git a/universal-telemetry-software/tests/test_heartbeat.py b/universal-telemetry-software/tests/test_heartbeat.py new file mode 100644 index 0000000..061bfef --- /dev/null +++ b/universal-telemetry-software/tests/test_heartbeat.py @@ -0,0 +1,141 @@ +""" +Heartbeat tests — uses a fake async Redis client / pubsub to avoid network deps. + +Design under test: the producer PUBLISHES a heartbeat on a pubsub channel that +every subscriber also subscribes to. Liveness is then measured on the pubsub +connection itself (time since *any* message arrived), so a half-dead pubsub +connection is detected even while Redis stays reachable for regular commands. +""" +import asyncio +import json +import pytest +from unittest.mock import AsyncMock, MagicMock + +from src import heartbeat +from src.config import REDIS_HEARTBEAT_CHANNEL + + +@pytest.fixture +def fake_redis(): + """Fake async Redis client whose .publish() records calls.""" + client = MagicMock() + client.publish = AsyncMock() + return client + + +async def test_writer_publishes_heartbeat_on_channel(fake_redis, monkeypatch): + monkeypatch.setattr(heartbeat, "HEARTBEAT_INTERVAL_S", 0.01) + task = asyncio.create_task(heartbeat.run_heartbeat_writer(fake_redis)) + await asyncio.sleep(0.05) + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + assert fake_redis.publish.call_count >= 1 + last_call = fake_redis.publish.call_args_list[-1] + assert last_call.args[0] == REDIS_HEARTBEAT_CHANNEL + payload = json.loads(last_call.args[1]) + assert "uptime_s" in payload and "wall_ts" in payload + assert payload["uptime_s"] >= 0 + + +async def test_writer_returns_immediately_when_redis_is_none(): + # Should not block or raise; should be a no-op. + await asyncio.wait_for(heartbeat.run_heartbeat_writer(None), timeout=0.2) + + +async def test_writer_continues_after_publish_failure(fake_redis, monkeypatch): + monkeypatch.setattr(heartbeat, "HEARTBEAT_INTERVAL_S", 0.01) + fake_redis.publish = AsyncMock(side_effect=RuntimeError("redis down")) + task = asyncio.create_task(heartbeat.run_heartbeat_writer(fake_redis)) + await asyncio.sleep(0.1) + assert fake_redis.publish.call_count >= 2, "writer should retry after failure" + assert not task.done(), "writer should recover and keep running" + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + +async def test_pump_reconnects_when_pubsub_goes_silent(caplog): + """The actual power-cycle failure mode: the pubsub connection is dead + (no messages arrive — not even heartbeats) while Redis itself stays + reachable for regular commands. The pump must return within ~stale_s so + the outer while-True loop can re-subscribe. No out-of-band GET is + involved: silence on the subscribed connection IS the signal. + """ + pubsub = MagicMock() + pubsub.get_message = AsyncMock(return_value=None) + + received = [] + + async def handler(msg): + received.append(msg) + + with caplog.at_level("WARNING"): + await asyncio.wait_for( + heartbeat.pump_pubsub_with_heartbeat(pubsub, handler, stale_s=0.3), + timeout=2.0, + ) + assert received == [], "no messages arrived — only reconnect was triggered" + assert "heartbeat stale" in caplog.text + + +async def test_pump_filters_heartbeat_and_forwards_data(): + """Heartbeat messages keep the connection 'fresh' but are not forwarded + to the handler; real data messages are forwarded. + """ + hb_msg = {"type": "message", "channel": REDIS_HEARTBEAT_CHANNEL.encode(), + "data": b'{"uptime_s": 1}'} + data_msg = {"type": "message", "channel": b"can_uplink", "data": b'{"canId": 1}'} + msgs = [hb_msg, data_msg] + + async def get_message(**kwargs): + return msgs.pop(0) if msgs else None + + pubsub = MagicMock() + pubsub.get_message = AsyncMock(side_effect=get_message) + + received = [] + + async def handler(msg): + received.append(msg) + + await asyncio.wait_for( + heartbeat.pump_pubsub_with_heartbeat(pubsub, handler, stale_s=0.2), + timeout=2.0, + ) + assert received == [data_msg], "heartbeat filtered, data forwarded" + + +async def test_pump_returns_when_should_stop_set(): + """On shutdown the pump must return promptly instead of draining forever, + so SIGTERM stops the bridge cleanly (no Docker SIGKILL after timeout). + """ + pubsub = MagicMock() + pubsub.get_message = AsyncMock( + return_value={"type": "message", "channel": b"can_uplink", "data": b"x"} + ) + handler = AsyncMock() + await asyncio.wait_for( + heartbeat.pump_pubsub_with_heartbeat( + pubsub, handler, stale_s=5.0, should_stop=lambda: True, + ), + timeout=0.5, + ) + handler.assert_not_called() + + +async def test_pump_returns_on_get_message_error(caplog): + pubsub = MagicMock() + pubsub.get_message = AsyncMock(side_effect=ConnectionError("broken pipe")) + handler = AsyncMock() + with caplog.at_level("WARNING"): + await asyncio.wait_for( + heartbeat.pump_pubsub_with_heartbeat(pubsub, handler, stale_s=5.0), + timeout=2.0, + ) + handler.assert_not_called() + assert "reconnecting" in caplog.text