From 60fd029649bc8255245c8294fbbe75dbfb41309f Mon Sep 17 00:00:00 2001 From: Haorui Zhou Date: Sat, 6 Jun 2026 16:56:25 +0000 Subject: [PATCH] uts: stop the base-station feed from silently dying MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two fixes for the "telemetry stops after a few random minutes" failure on the MacBook base station. websocket_bridge.redis_listener: wrap the pub/sub loop in a reconnect loop with health_check_interval. Previously a single Redis connection blip (idle timeout, transient Docker-bridge hiccup, Redis restart) made the listener coroutine return for good while the WebSocket server kept running — PECAN stayed connected but never received another frame, with no error surfaced. ws_relay already reconnects this way; redis_listener now matches. main.py: the child-process monitor only logged "Process X died!" once per second forever and never recovered. Because the parent stayed alive, neither Docker's `restart: unless-stopped` nor systemd's `Restart=always` ever saw the failure. Now a dead child tears down the surviving children and exits non-zero so the supervisor restarts the whole stack cleanly. --- universal-telemetry-software/main.py | 26 +++++-- .../src/websocket_bridge.py | 71 +++++++++++++------ 2 files changed, 70 insertions(+), 27 deletions(-) diff --git a/universal-telemetry-software/main.py b/universal-telemetry-software/main.py index 5dc16b7..fe1b789 100644 --- a/universal-telemetry-software/main.py +++ b/universal-telemetry-software/main.py @@ -1,4 +1,5 @@ import os +import sys import time import uuid import multiprocessing @@ -284,11 +285,26 @@ def start_timescale_bridge(): try: while True: time.sleep(1) - # Monitor children - for p in processes: - if not p.is_alive(): - logger.error(f"Process {p.name} died!") - # Optional: Restart logic + # Monitor children. A dead child means the pipeline is degraded — + # e.g. the telemetry or WebSocket process crashed. The parent stays + # alive in this loop, so neither Docker's `restart: unless-stopped` + # nor systemd's `Restart=always` ever sees the failure and the stack + # silently keeps running half-dead. Fail fast instead: tear down the + # surviving children and exit non-zero so the supervisor restarts the + # whole stack cleanly. + dead = [p for p in processes if not p.is_alive()] + if dead: + for p in dead: + logger.error( + f"Process {p.name} died (exitcode={p.exitcode}). " + "Shutting down for supervisor restart." + ) + for p in processes: + if p.is_alive(): + p.terminate() + for p in processes: + p.join(timeout=5) + sys.exit(1) except KeyboardInterrupt: logger.info("Shutting down...") for p in processes: diff --git a/universal-telemetry-software/src/websocket_bridge.py b/universal-telemetry-software/src/websocket_bridge.py index e0e3004..4d3357a 100644 --- a/universal-telemetry-software/src/websocket_bridge.py +++ b/universal-telemetry-software/src/websocket_bridge.py @@ -1,4 +1,5 @@ import asyncio +import contextlib import redis.asyncio as redis import websockets import os @@ -277,31 +278,57 @@ async def direct_queue_listener(queue: asyncio.Queue): async def redis_listener(): - """Listens to Redis and broadcasts to all WS clients.""" - try: - r = redis.from_url(REDIS_URL) - pubsub = r.pubsub() - await pubsub.subscribe(REDIS_CHANNEL, REDIS_STATS_CHANNEL, REDIS_DIAG_CHANNEL) - logger.info(f"Subscribed to Redis channels: {REDIS_CHANNEL}, {REDIS_STATS_CHANNEL}, {REDIS_DIAG_CHANNEL}") + """Listens to Redis and broadcasts to all WS clients. + + Wrapped in a reconnect loop. A dropped Redis pub/sub connection — an idle + timeout, a transient blip on the Docker bridge network, or a Redis restart — + must not silently kill the data feed. Without this loop the coroutine would + exit on the first ConnectionError while the WebSocket server kept running: + PECAN stays connected but never receives another frame, so the dashboard + goes dead with no error visible anywhere. `health_check_interval` lets + redis-py detect a half-open connection instead of blocking forever in + listen(). + """ + backoff_min, backoff_max = 0.5, 10.0 + delay = backoff_min - async for message in pubsub.listen(): + while not shutdown_event.is_set(): + r = None + try: + r = redis.from_url(REDIS_URL, health_check_interval=30) + pubsub = r.pubsub() + await pubsub.subscribe(REDIS_CHANNEL, REDIS_STATS_CHANNEL, REDIS_DIAG_CHANNEL) + logger.info(f"Subscribed to Redis channels: {REDIS_CHANNEL}, {REDIS_STATS_CHANNEL}, {REDIS_DIAG_CHANNEL}") + delay = backoff_min # reset backoff once a subscribe succeeds + + async for message in pubsub.listen(): + if shutdown_event.is_set(): + break + + if message['type'] == 'message': + data = redis_utils.decode_message(message['data']) + + # Broadcast to all connected clients + if connected_clients: + # Create tasks for sending to each client to avoid blocking + await asyncio.gather( + *[client.send(data) for client in connected_clients], + return_exceptions=True + ) + except asyncio.CancelledError: + raise + except Exception as e: if shutdown_event.is_set(): break - - if message['type'] == 'message': - data = redis_utils.decode_message(message['data']) - - # Broadcast to all connected clients - if connected_clients: - # Create tasks for sending to each client to avoid blocking - await asyncio.gather( - *[client.send(data) for client in connected_clients], - return_exceptions=True - ) - except Exception as e: - logger.error(f"Redis error: {e}") - finally: - logger.info("Redis listener stopping...") + logger.error(f"Redis listener error: {e} — reconnecting in {delay:.1f}s") + await asyncio.sleep(delay) + delay = min(delay * 2, backoff_max) + finally: + if r is not None: + with contextlib.suppress(Exception): + await r.aclose() + + logger.info("Redis listener stopping...") async def _handle_client_message(websocket, raw: str, redis_client):