Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions universal-telemetry-software/main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import sys
import time
import uuid
import multiprocessing
Expand Down Expand Up @@ -284,11 +285,26 @@ def start_timescale_bridge():
try:
while True:
time.sleep(1)
# Monitor children
for p in processes:
if not p.is_alive():
logger.error(f"Process {p.name} died!")
# Optional: Restart logic
# Monitor children. A dead child means the pipeline is degraded —
# e.g. the telemetry or WebSocket process crashed. The parent stays
# alive in this loop, so neither Docker's `restart: unless-stopped`
# nor systemd's `Restart=always` ever sees the failure and the stack
# silently keeps running half-dead. Fail fast instead: tear down the
# surviving children and exit non-zero so the supervisor restarts the
# whole stack cleanly.
dead = [p for p in processes if not p.is_alive()]
if dead:
for p in dead:
logger.error(
f"Process {p.name} died (exitcode={p.exitcode}). "
"Shutting down for supervisor restart."
)
for p in processes:
if p.is_alive():
p.terminate()
for p in processes:
p.join(timeout=5)
sys.exit(1)
except KeyboardInterrupt:
logger.info("Shutting down...")
for p in processes:
Expand Down
71 changes: 49 additions & 22 deletions universal-telemetry-software/src/websocket_bridge.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import contextlib
import redis.asyncio as redis
import websockets
import os
Expand Down Expand Up @@ -277,31 +278,57 @@ async def direct_queue_listener(queue: asyncio.Queue):


async def redis_listener():
    """Listen to Redis pub/sub and broadcast each message to all WS clients.

    Runs inside a reconnect loop until ``shutdown_event`` is set. A dropped
    Redis pub/sub connection — an idle timeout, a transient blip on the
    Docker bridge network, or a Redis restart — must not silently kill the
    data feed. Without this loop the coroutine would exit on the first
    connection error while the WebSocket server kept running: PECAN stays
    connected but never receives another frame, so the dashboard goes dead
    with no error visible anywhere.

    Messages are polled with a short timeout instead of awaiting
    ``pubsub.listen()`` indefinitely. That way the loop re-checks
    ``shutdown_event`` even when the channels are quiet, and redis-py gets a
    regular opportunity to issue its ``health_check_interval`` ping rather
    than sitting in a single blocking read.

    Failed connection attempts are retried with exponential backoff
    (0.5s doubling up to 10s); the backoff resets after a successful
    subscribe.

    Raises:
        asyncio.CancelledError: Re-raised untouched so that cancelling the
            task stops the listener instead of triggering a reconnect.
    """
    backoff_min, backoff_max = 0.5, 10.0
    poll_timeout = 1.0  # upper bound on how long a quiet channel delays shutdown
    delay = backoff_min

    while not shutdown_event.is_set():
        r = None
        pubsub = None
        try:
            r = redis.from_url(REDIS_URL, health_check_interval=30)
            pubsub = r.pubsub()
            await pubsub.subscribe(REDIS_CHANNEL, REDIS_STATS_CHANNEL, REDIS_DIAG_CHANNEL)
            logger.info(f"Subscribed to Redis channels: {REDIS_CHANNEL}, {REDIS_STATS_CHANNEL}, {REDIS_DIAG_CHANNEL}")
            delay = backoff_min  # reset backoff once a subscribe succeeds

            while not shutdown_event.is_set():
                # Returns None when nothing arrived within poll_timeout (and
                # for subscribe confirmations), which sends us back to the
                # shutdown check above.
                message = await pubsub.get_message(
                    ignore_subscribe_messages=True,
                    timeout=poll_timeout,
                )
                if message is None or message['type'] != 'message':
                    continue

                data = redis_utils.decode_message(message['data'])

                # Broadcast to all connected clients
                if connected_clients:
                    # Send concurrently; return_exceptions=True keeps one
                    # failing client from aborting delivery to the others.
                    await asyncio.gather(
                        *[client.send(data) for client in connected_clients],
                        return_exceptions=True
                    )
        except asyncio.CancelledError:
            raise
        except Exception as e:
            if shutdown_event.is_set():
                # Errors while shutting down are expected; do not reconnect.
                break
            logger.error(f"Redis listener error: {e} — reconnecting in {delay:.1f}s")
            await asyncio.sleep(delay)
            delay = min(delay * 2, backoff_max)
        finally:
            # Best-effort cleanup: the connection may already be broken, so a
            # failure to close must not mask the original error or stop the
            # reconnect loop.
            for resource in (pubsub, r):
                if resource is not None:
                    with contextlib.suppress(Exception):
                        await resource.aclose()

    logger.info("Redis listener stopping...")


async def _handle_client_message(websocket, raw: str, redis_client):
Expand Down
Loading