From 60fd029649bc8255245c8294fbbe75dbfb41309f Mon Sep 17 00:00:00 2001 From: Haorui Zhou Date: Sat, 6 Jun 2026 16:56:25 +0000 Subject: [PATCH 1/2] uts: stop the base-station feed from silently dying MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two fixes for the "telemetry stops after a few random minutes" failure on the MacBook base station. websocket_bridge.redis_listener: wrap the pub/sub loop in a reconnect loop with health_check_interval. Previously a single Redis connection blip (idle timeout, transient Docker-bridge hiccup, Redis restart) made the listener coroutine return for good while the WebSocket server kept running — PECAN stayed connected but never received another frame, with no error surfaced. ws_relay already reconnects this way; redis_listener now matches. main.py: the child-process monitor only logged "Process X died!" once per second forever and never recovered. Because the parent stayed alive, neither Docker's `restart: unless-stopped` nor systemd's `Restart=always` ever saw the failure. Now a dead child tears down the surviving children and exits non-zero so the supervisor restarts the whole stack cleanly. --- universal-telemetry-software/main.py | 26 +++++-- .../src/websocket_bridge.py | 71 +++++++++++++------ 2 files changed, 70 insertions(+), 27 deletions(-) diff --git a/universal-telemetry-software/main.py b/universal-telemetry-software/main.py index 5dc16b7..fe1b789 100644 --- a/universal-telemetry-software/main.py +++ b/universal-telemetry-software/main.py @@ -1,4 +1,5 @@ import os +import sys import time import uuid import multiprocessing @@ -284,11 +285,26 @@ def start_timescale_bridge(): try: while True: time.sleep(1) - # Monitor children - for p in processes: - if not p.is_alive(): - logger.error(f"Process {p.name} died!") - # Optional: Restart logic + # Monitor children. A dead child means the pipeline is degraded — + # e.g. the telemetry or WebSocket process crashed. The parent stays + # alive in this loop, so neither Docker's `restart: unless-stopped` + # nor systemd's `Restart=always` ever sees the failure and the stack + # silently keeps running half-dead. Fail fast instead: tear down the + # surviving children and exit non-zero so the supervisor restarts the + # whole stack cleanly. + dead = [p for p in processes if not p.is_alive()] + if dead: + for p in dead: + logger.error( + f"Process {p.name} died (exitcode={p.exitcode}). " + "Shutting down for supervisor restart." + ) + for p in processes: + if p.is_alive(): + p.terminate() + for p in processes: + p.join(timeout=5) + sys.exit(1) except KeyboardInterrupt: logger.info("Shutting down...") for p in processes: diff --git a/universal-telemetry-software/src/websocket_bridge.py b/universal-telemetry-software/src/websocket_bridge.py index e0e3004..4d3357a 100644 --- a/universal-telemetry-software/src/websocket_bridge.py +++ b/universal-telemetry-software/src/websocket_bridge.py @@ -1,4 +1,5 @@ import asyncio +import contextlib import redis.asyncio as redis import websockets import os @@ -277,31 +278,57 @@ async def direct_queue_listener(queue: asyncio.Queue): async def redis_listener(): - """Listens to Redis and broadcasts to all WS clients.""" - try: - r = redis.from_url(REDIS_URL) - pubsub = r.pubsub() - await pubsub.subscribe(REDIS_CHANNEL, REDIS_STATS_CHANNEL, REDIS_DIAG_CHANNEL) - logger.info(f"Subscribed to Redis channels: {REDIS_CHANNEL}, {REDIS_STATS_CHANNEL}, {REDIS_DIAG_CHANNEL}") + """Listens to Redis and broadcasts to all WS clients. + + Wrapped in a reconnect loop. A dropped Redis pub/sub connection — an idle + timeout, a transient blip on the Docker bridge network, or a Redis restart — + must not silently kill the data feed. Without this loop the coroutine would + exit on the first ConnectionError while the WebSocket server kept running: + PECAN stays connected but never receives another frame, so the dashboard + goes dead with no error visible anywhere. `health_check_interval` lets + redis-py detect a half-open connection instead of blocking forever in + listen(). + """ + backoff_min, backoff_max = 0.5, 10.0 + delay = backoff_min - async for message in pubsub.listen(): + while not shutdown_event.is_set(): + r = None + try: + r = redis.from_url(REDIS_URL, health_check_interval=30) + pubsub = r.pubsub() + await pubsub.subscribe(REDIS_CHANNEL, REDIS_STATS_CHANNEL, REDIS_DIAG_CHANNEL) + logger.info(f"Subscribed to Redis channels: {REDIS_CHANNEL}, {REDIS_STATS_CHANNEL}, {REDIS_DIAG_CHANNEL}") + delay = backoff_min # reset backoff once a subscribe succeeds + + async for message in pubsub.listen(): + if shutdown_event.is_set(): + break + + if message['type'] == 'message': + data = redis_utils.decode_message(message['data']) + + # Broadcast to all connected clients + if connected_clients: + # Create tasks for sending to each client to avoid blocking + await asyncio.gather( + *[client.send(data) for client in connected_clients], + return_exceptions=True + ) + except asyncio.CancelledError: + raise + except Exception as e: if shutdown_event.is_set(): break - - if message['type'] == 'message': - data = redis_utils.decode_message(message['data']) - - # Broadcast to all connected clients - if connected_clients: - # Create tasks for sending to each client to avoid blocking - await asyncio.gather( - *[client.send(data) for client in connected_clients], - return_exceptions=True - ) - except Exception as e: - logger.error(f"Redis error: {e}") - finally: - logger.info("Redis listener stopping...") + logger.error(f"Redis listener error: {e} — reconnecting in {delay:.1f}s") + await asyncio.sleep(delay) + delay = min(delay * 2, backoff_max) + finally: + if r is not None: + with contextlib.suppress(Exception): + await r.aclose() + + logger.info("Redis listener stopping...") async def _handle_client_message(websocket, raw: str, redis_client): From 62b9a60d8c4a270b718e3382b54b410d420b5adc Mon Sep 17 00:00:00 2001 From: Haorui Zhou Date: Sun, 7 Jun 2026 13:39:42 -0400 Subject: [PATCH 2/2] Install slicks from PyPI; treat critical processes --- server/installer/docker-compose.yml | 13 +---- server/installer/sandbox/Dockerfile.sandbox | 14 +---- .../installer/sandbox/requirements-docker.txt | 10 ++-- universal-telemetry-software/main.py | 55 ++++++++++++++----- 4 files changed, 49 insertions(+), 43 deletions(-) diff --git a/server/installer/docker-compose.yml b/server/installer/docker-compose.yml index 635442a..38b9d36 100644 --- a/server/installer/docker-compose.yml +++ b/server/installer/docker-compose.yml @@ -261,12 +261,8 @@ services: build: context: ./sandbox dockerfile: Dockerfile.sandbox - # slicks lives outside the build context (sibling repo at - # /home/ubuntu/projects/slicks), so we pass it in as an additional - # build context. The Dockerfile uses `COPY --from=slicks ...` to - # pull files from it. Override SLICKS_HOST_PATH to relocate. - additional_contexts: - - slicks=${SLICKS_HOST_PATH:-/home/ubuntu/projects/slicks} + # slicks is installed from PyPI (pinned in sandbox/requirements-docker.txt), + # so no external build context is needed. container_name: sandbox restart: unless-stopped environment: @@ -281,11 +277,6 @@ services: TIMESCALE_TABLE: "${TIMESCALE_TABLE:-${POSTGRES_TABLE:-wfr26}}" TIMESCALE_SEASON: "${TIMESCALE_SEASON:-${POSTGRES_TABLE:-wfr26}}" POSTGRES_TABLE: "${POSTGRES_TABLE:-wfr26}" - volumes: - # slicks source (TimescaleDB-migration branch). Editable-installed at - # image build time; the bind mount below lets live source edits show - # up on the next container recreate without an image rebuild. - - ${SLICKS_HOST_PATH:-/home/ubuntu/projects/slicks}:/slicks_src:rw depends_on: timescaledb: condition: service_healthy diff --git a/server/installer/sandbox/Dockerfile.sandbox b/server/installer/sandbox/Dockerfile.sandbox index 2a4223b..29852d1 100644 --- a/server/installer/sandbox/Dockerfile.sandbox +++ b/server/installer/sandbox/Dockerfile.sandbox @@ -33,22 +33,12 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ # Install uv (replaces pip for dependency management) RUN pip install --no-cache-dir uv -# Install runtime deps via uv. slicks itself is installed editable below. +# Install runtime deps (including slicks, pinned in requirements-docker.txt) +# from PyPI via uv. COPY requirements-docker.txt /tmp/requirements-docker.txt RUN uv pip install --system --no-cache -r /tmp/requirements-docker.txt \ && rm -rf /root/.cache/uv -# Install slicks editable. The `slicks` build context is passed in by -# docker-compose.yml's `additional_contexts:` (lives outside this repo's -# build context). At runtime, the docker-compose `volumes:` entry -# bind-mounts the same host path over /slicks_src so live source edits -# show up on the next container recreate (no image rebuild needed for -# code-only changes; pyproject changes still need a rebuild). -COPY --from=slicks pyproject.toml /slicks_src/pyproject.toml -COPY --from=slicks README.md /slicks_src/README.md -COPY --from=slicks src /slicks_src/src -RUN uv pip install --system --no-cache -e /slicks_src - # Tell Kaleido where Chromium lives ENV CHROME_PATH=/usr/bin/chromium diff --git a/server/installer/sandbox/requirements-docker.txt b/server/installer/sandbox/requirements-docker.txt index 5047f65..0192bbb 100644 --- a/server/installer/sandbox/requirements-docker.txt +++ b/server/installer/sandbox/requirements-docker.txt @@ -1,10 +1,10 @@ # Docker container requirements for sandbox execution environment # These are the dependencies needed inside the Docker container -# -# NOTE: `slicks` is NOT pinned here — it is installed editable from -# /slicks (a host bind mount) in Dockerfile.sandbox so source edits -# land in the running container without a pip rebuild. The published -# 0.2.x line on PyPI is the InfluxDB backend, which is not what we want. + +# WFR data pipeline, installed straight from PyPI. Unpinned so the sandbox +# always picks up the newest release (the TimescaleDB backend; the retired +# 0.2.x InfluxDB line will never be "newest" again). +slicks # SQL access for TimescaleDB (slicks depends on these; listing explicitly # in case slicks changes its extras and to keep the layer order stable) diff --git a/universal-telemetry-software/main.py b/universal-telemetry-software/main.py index fe1b789..d60a654 100644 --- a/universal-telemetry-software/main.py +++ b/universal-telemetry-software/main.py @@ -19,6 +19,16 @@ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger("Main") +# Processes that carry the live telemetry feed. If one of these dies the stack +# is genuinely broken, so we tear everything down and exit non-zero to let the +# supervisor (Docker `restart:`/systemd `Restart=always`) restart cleanly. +# Auxiliary processes (TimescaleBridge, TX bridge, status server, link +# diagnostics, video, audio, LEDs, PoE) are best-effort: if e.g. the optional +# Timescale logging DB is unreachable, that must NOT take the live feed down — +# restarting the whole stack on its death would only crash-loop the feed we are +# trying to keep alive. Those are logged loudly but tolerated. +CRITICAL_PROCESSES = {"Telemetry", "CarServices", "WebSocket"} + def _timescale_dsn_reachable() -> bool: """Return True when the configured Timescale/Postgres DSN accepts a connection.""" @@ -285,26 +295,41 @@ def start_timescale_bridge(): try: while True: time.sleep(1) - # Monitor children. A dead child means the pipeline is degraded — - # e.g. the telemetry or WebSocket process crashed. The parent stays - # alive in this loop, so neither Docker's `restart: unless-stopped` - # nor systemd's `Restart=always` ever sees the failure and the stack - # silently keeps running half-dead. Fail fast instead: tear down the - # surviving children and exit non-zero so the supervisor restarts the - # whole stack cleanly. + # Monitor children. A dead child means the pipeline is degraded. The + # parent stays alive in this loop, so neither Docker's + # `restart: unless-stopped` nor systemd's `Restart=always` ever sees + # the failure and the stack silently keeps running half-dead. + # + # For a critical process (the live-feed path — telemetry / WS bridge) + # fail fast: tear down the surviving children and exit non-zero so the + # supervisor restarts the whole stack cleanly. For an auxiliary + # process, log loudly and stop tracking it, but keep the live feed + # running — nuking the stack because, say, the optional Timescale DB + # is down would only crash-loop the very feed we are protecting. dead = [p for p in processes if not p.is_alive()] if dead: + dead_critical = [p for p in dead if p.name in CRITICAL_PROCESSES] for p in dead: - logger.error( - f"Process {p.name} died (exitcode={p.exitcode}). " + level = logger.error if p in dead_critical else logger.warning + fate = ( "Shutting down for supervisor restart." + if p in dead_critical + else "Auxiliary process — live feed kept running." ) - for p in processes: - if p.is_alive(): - p.terminate() - for p in processes: - p.join(timeout=5) - sys.exit(1) + level(f"Process {p.name} died (exitcode={p.exitcode}). {fate}") + + if dead_critical: + for p in processes: + if p.is_alive(): + p.terminate() + for p in processes: + p.join(timeout=5) + sys.exit(1) + + # Stop tracking dead auxiliary processes so we don't re-log them + # every second. + for p in dead: + processes.remove(p) except KeyboardInterrupt: logger.info("Shutting down...") for p in processes: