Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions server/installer/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -261,12 +261,8 @@ services:
build:
context: ./sandbox
dockerfile: Dockerfile.sandbox
# slicks lives outside the build context (sibling repo at
# /home/ubuntu/projects/slicks), so we pass it in as an additional
# build context. The Dockerfile uses `COPY --from=slicks ...` to
# pull files from it. Override SLICKS_HOST_PATH to relocate.
additional_contexts:
- slicks=${SLICKS_HOST_PATH:-/home/ubuntu/projects/slicks}
# slicks is installed from PyPI (listed, unpinned, in
# sandbox/requirements-docker.txt), so no external build context is needed.
container_name: sandbox
restart: unless-stopped
environment:
Expand All @@ -281,11 +277,6 @@ services:
TIMESCALE_TABLE: "${TIMESCALE_TABLE:-${POSTGRES_TABLE:-wfr26}}"
TIMESCALE_SEASON: "${TIMESCALE_SEASON:-${POSTGRES_TABLE:-wfr26}}"
POSTGRES_TABLE: "${POSTGRES_TABLE:-wfr26}"
volumes:
# slicks source (TimescaleDB-migration branch). Editable-installed at
# image build time; the bind mount below lets live source edits show
# up on the next container recreate without an image rebuild.
- ${SLICKS_HOST_PATH:-/home/ubuntu/projects/slicks}:/slicks_src:rw
depends_on:
timescaledb:
condition: service_healthy
Expand Down
14 changes: 2 additions & 12 deletions server/installer/sandbox/Dockerfile.sandbox
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,12 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
# Install uv (replaces pip for dependency management)
RUN pip install --no-cache-dir uv

# Install runtime deps via uv. slicks itself is installed editable below.
# Install runtime deps (including slicks, listed unpinned in
# requirements-docker.txt) from PyPI via uv.
COPY requirements-docker.txt /tmp/requirements-docker.txt
RUN uv pip install --system --no-cache -r /tmp/requirements-docker.txt \
&& rm -rf /root/.cache/uv

# Install slicks editable. The `slicks` build context is passed in by
# docker-compose.yml's `additional_contexts:` (lives outside this repo's
# build context). At runtime, the docker-compose `volumes:` entry
# bind-mounts the same host path over /slicks_src so live source edits
# show up on the next container recreate (no image rebuild needed for
# code-only changes; pyproject changes still need a rebuild).
COPY --from=slicks pyproject.toml /slicks_src/pyproject.toml
COPY --from=slicks README.md /slicks_src/README.md
COPY --from=slicks src /slicks_src/src
RUN uv pip install --system --no-cache -e /slicks_src

# Tell Kaleido where Chromium lives
ENV CHROME_PATH=/usr/bin/chromium

Expand Down
10 changes: 5 additions & 5 deletions server/installer/sandbox/requirements-docker.txt
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# Docker container requirements for sandbox execution environment
# These are the dependencies needed inside the Docker container
#
# NOTE: `slicks` is NOT pinned here — it is installed editable from
# /slicks (a host bind mount) in Dockerfile.sandbox so source edits
# land in the running container without a pip rebuild. The published
# 0.2.x line on PyPI is the InfluxDB backend, which is not what we want.

# WFR data pipeline, installed straight from PyPI. Unpinned so the sandbox
# always picks up the newest release (the TimescaleDB backend; the retired
# 0.2.x InfluxDB line will never be "newest" again).
slicks

# SQL access for TimescaleDB (slicks depends on these; listing explicitly
# in case slicks changes its extras and to keep the layer order stable)
Expand Down
51 changes: 46 additions & 5 deletions universal-telemetry-software/main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import sys
import time
import uuid
import multiprocessing
Expand All @@ -18,6 +19,16 @@
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("Main")

# Processes that carry the live telemetry feed. If one of these dies the stack
# is genuinely broken, so we tear everything down and exit non-zero to let the
# supervisor (Docker `restart:`/systemd `Restart=always`) restart cleanly.
# Auxiliary processes (TimescaleBridge, TX bridge, status server, link
# diagnostics, video, audio, LEDs, PoE) are best-effort: if e.g. the optional
# Timescale logging DB is unreachable, that must NOT take the live feed down —
# restarting the whole stack on its death would only crash-loop the feed we are
# trying to keep alive. Those are logged loudly but tolerated.
CRITICAL_PROCESSES = {"Telemetry", "CarServices", "WebSocket"}


def _timescale_dsn_reachable() -> bool:
"""Return True when the configured Timescale/Postgres DSN accepts a connection."""
Expand Down Expand Up @@ -284,11 +295,41 @@ def start_timescale_bridge():
try:
while True:
time.sleep(1)
# Monitor children
for p in processes:
if not p.is_alive():
logger.error(f"Process {p.name} died!")
# Optional: Restart logic
# Monitor children. A dead child means the pipeline is degraded. If we only
# logged the death, the parent would stay alive in this loop, so neither
# Docker's `restart: unless-stopped` nor systemd's `Restart=always` would
# ever see the failure and the stack would silently keep running half-dead.
#
# For a critical process (the live-feed path — any name in CRITICAL_PROCESSES)
# fail fast: tear down the surviving children and exit non-zero so the
# supervisor restarts the whole stack cleanly. For an auxiliary
# process, log loudly and stop tracking it, but keep the live feed
# running — nuking the stack because, say, the optional Timescale DB
# is down would only crash-loop the very feed we are protecting.
dead = [p for p in processes if not p.is_alive()]

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Avoid restarting telemetry for optional video failures

With this check treating any child death as fatal, an optional media process can now take down the core telemetry stack: main.py starts Video whenever ENABLE_VIDEO is true, the car systemd unit sets ENABLE_VIDEO=true, and run_video() returns when GStreamer reports an error. In that scenario (a camera problem or a misconfigured GStreamer), the parent exits non-zero and systemd restarts the whole service repeatedly, so CAN telemetry never stays up even though only the optional video feed failed. Consider limiting fail-fast to critical children or making optional child failures non-fatal.

Useful? React with 👍 / 👎.

if dead:
dead_critical = [p for p in dead if p.name in CRITICAL_PROCESSES]
for p in dead:
level = logger.error if p in dead_critical else logger.warning
fate = (
"Shutting down for supervisor restart."
if p in dead_critical
else "Auxiliary process — live feed kept running."
)
level(f"Process {p.name} died (exitcode={p.exitcode}). {fate}")

if dead_critical:
for p in processes:
if p.is_alive():
p.terminate()
for p in processes:
p.join(timeout=5)
sys.exit(1)

# Stop tracking dead auxiliary processes so we don't re-log them
# every second.
for p in dead:
processes.remove(p)
except KeyboardInterrupt:
logger.info("Shutting down...")
for p in processes:
Expand Down
71 changes: 49 additions & 22 deletions universal-telemetry-software/src/websocket_bridge.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import contextlib
import redis.asyncio as redis
import websockets
import os
Expand Down Expand Up @@ -277,31 +278,57 @@ async def direct_queue_listener(queue: asyncio.Queue):


async def redis_listener():
"""Listens to Redis and broadcasts to all WS clients."""
try:
r = redis.from_url(REDIS_URL)
pubsub = r.pubsub()
await pubsub.subscribe(REDIS_CHANNEL, REDIS_STATS_CHANNEL, REDIS_DIAG_CHANNEL)
logger.info(f"Subscribed to Redis channels: {REDIS_CHANNEL}, {REDIS_STATS_CHANNEL}, {REDIS_DIAG_CHANNEL}")
"""Listens to Redis and broadcasts to all WS clients.

Wrapped in a reconnect loop. A dropped Redis pub/sub connection — an idle
timeout, a transient blip on the Docker bridge network, or a Redis restart —
must not silently kill the data feed. Without this loop the coroutine would
exit on the first ConnectionError while the WebSocket server kept running:
PECAN stays connected but never receives another frame, so the dashboard
goes dead with no error visible anywhere. `health_check_interval` lets
redis-py detect a half-open connection instead of blocking forever in
listen().
"""
backoff_min, backoff_max = 0.5, 10.0
delay = backoff_min

async for message in pubsub.listen():
while not shutdown_event.is_set():
r = None
try:
r = redis.from_url(REDIS_URL, health_check_interval=30)
pubsub = r.pubsub()
await pubsub.subscribe(REDIS_CHANNEL, REDIS_STATS_CHANNEL, REDIS_DIAG_CHANNEL)
logger.info(f"Subscribed to Redis channels: {REDIS_CHANNEL}, {REDIS_STATS_CHANNEL}, {REDIS_DIAG_CHANNEL}")
delay = backoff_min # reset backoff once a subscribe succeeds

async for message in pubsub.listen():
if shutdown_event.is_set():
break

if message['type'] == 'message':
data = redis_utils.decode_message(message['data'])

# Broadcast to all connected clients
if connected_clients:
# Create tasks for sending to each client to avoid blocking
await asyncio.gather(
*[client.send(data) for client in connected_clients],
return_exceptions=True
)
except asyncio.CancelledError:
raise
except Exception as e:
if shutdown_event.is_set():
break

if message['type'] == 'message':
data = redis_utils.decode_message(message['data'])

# Broadcast to all connected clients
if connected_clients:
# Create tasks for sending to each client to avoid blocking
await asyncio.gather(
*[client.send(data) for client in connected_clients],
return_exceptions=True
)
except Exception as e:
logger.error(f"Redis error: {e}")
finally:
logger.info("Redis listener stopping...")
logger.error(f"Redis listener error: {e} — reconnecting in {delay:.1f}s")
await asyncio.sleep(delay)
delay = min(delay * 2, backoff_max)
finally:
if r is not None:
with contextlib.suppress(Exception):
await r.aclose()

logger.info("Redis listener stopping...")


async def _handle_client_message(websocket, raw: str, redis_client):
Expand Down
Loading