Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/telemetry-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ jobs:
tests/test_page_lock.py \
tests/test_stats_publisher.py \
tests/test_status_server.py \
tests/test_heartbeat.py \
-v

# ── Integration tests: full docker-compose stack ─────────────────────────
Expand Down
2 changes: 2 additions & 0 deletions universal-telemetry-software/src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
REDIS_STATS_CHANNEL = "system_stats"
REDIS_DIAG_CHANNEL = "link_diagnostics"
REDIS_WS_CLIENTS_KEY = "websocket_bridge:clients"
REDIS_HEARTBEAT_CHANNEL = "telemetry_heartbeat"
HEARTBEAT_STALE_S = 5.0 # subscribers reconnect if no pubsub message arrives for this long

# ── Feature flags ─────────────────────────────────────────────────────────────
ENABLE_UPLINK = os.getenv("ENABLE_UPLINK", "false").lower() == "true"
Expand Down
96 changes: 57 additions & 39 deletions universal-telemetry-software/src/data.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import contextlib
import socket
import struct
import time
Expand All @@ -14,9 +15,10 @@
from src.config import (
REMOTE_IP, UDP_PORT, TCP_PORT,
REDIS_URL, REDIS_CAN_CHANNEL, REDIS_UPLINK_CHANNEL, ENABLE_UPLINK,
REDIS_WS_CLIENTS_KEY,
REDIS_WS_CLIENTS_KEY, REDIS_HEARTBEAT_CHANNEL,
)
from src import redis_utils, utils
from src.heartbeat import pump_pubsub_with_heartbeat, run_heartbeat_writer
from src.version import get_git_hash

BATCH_SIZE = 20
Expand Down Expand Up @@ -863,51 +865,63 @@ async def uplink_relay():
uplink_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
uplink_seq = 0

try:
r = aioredis.from_url(REDIS_URL)
pubsub = r.pubsub()
await pubsub.subscribe(REDIS_UPLINK_CHANNEL)
logger.info(f"Subscribed to Redis channel: {REDIS_UPLINK_CHANNEL}")

async for message in pubsub.listen():
if message['type'] != 'message':
continue
async def _relay(msg):
nonlocal uplink_seq
if msg['type'] != 'message':
return
try:
data = redis_utils.decode_message(msg['data'])
uplink_msg = json.loads(data)

try:
data = redis_utils.decode_message(message['data'])
uplink_msg = json.loads(data)
can_id = uplink_msg.get("canId")
can_data = uplink_msg.get("data", [])
ref = uplink_msg.get("ref", "unknown")

can_id = uplink_msg.get("canId")
can_data = uplink_msg.get("data", [])
ref = uplink_msg.get("ref", "unknown")
if can_id is None or not isinstance(can_id, int) or can_id < 0:
logger.warning(f"Uplink relay: invalid canId in ref={ref}")
return
if not isinstance(can_data, list) or len(can_data) < 1 or len(can_data) > 8:
logger.warning(f"Uplink relay: invalid data in ref={ref}")
return

if can_id is None or not isinstance(can_id, int) or can_id < 0:
logger.warning(f"Uplink relay: invalid canId in ref={ref}")
continue
if not isinstance(can_data, list) or len(can_data) < 1 or len(can_data) > 8:
logger.warning(f"Uplink relay: invalid data in ref={ref}")
continue
# Pack as uplink UDP packet: 0xCAFE + seq + count(1) + CAN message
uplink_seq += 1
data_bytes = bytes(can_data) + b'\x00' * (8 - len(can_data))
can_msg = CANMessage(time.time(), can_id, data_bytes)

# Pack as uplink UDP packet: 0xCAFE + seq + count(1) + CAN message
uplink_seq += 1
data_bytes = bytes(can_data) + b'\x00' * (8 - len(can_data))
can_msg = CANMessage(time.time(), can_id, data_bytes)
payload = UPLINK_MAGIC
payload += struct.pack("!QH", uplink_seq, 1)
payload += can_msg.pack()

payload = UPLINK_MAGIC
payload += struct.pack("!QH", uplink_seq, 1)
payload += can_msg.pack()
try:
uplink_sock.sendto(payload, (REMOTE_IP, UDP_PORT))
logger.info(f"Uplink relayed to car: canId={can_id} ref={ref} seq={uplink_seq}")
except (PermissionError, OSError) as e:
logger.error(f"Uplink UDP send failed: {e}")

try:
uplink_sock.sendto(payload, (REMOTE_IP, UDP_PORT))
logger.info(f"Uplink relayed to car: canId={can_id} ref={ref} seq={uplink_seq}")
except (PermissionError, OSError) as e:
logger.error(f"Uplink UDP send failed: {e}")
except Exception as e:
logger.error(f"Uplink relay error: {e}")

# Reconnect loop: the pump returns whenever the pubsub connection
# goes silent past HEARTBEAT_STALE_S, and we re-subscribe here.
try:
while True:
r = None
try:
r = aioredis.from_url(REDIS_URL)
pubsub = r.pubsub()
await pubsub.subscribe(REDIS_UPLINK_CHANNEL, REDIS_HEARTBEAT_CHANNEL)
logger.info(f"Subscribed to Redis channels: {REDIS_UPLINK_CHANNEL}, {REDIS_HEARTBEAT_CHANNEL}")
await pump_pubsub_with_heartbeat(pubsub, _relay, log=logger)
except asyncio.CancelledError:
raise
except Exception as e:
logger.error(f"Uplink relay error: {e}")

except Exception as e:
logger.error(f"Uplink relay Redis error: {e}")
logger.error(f"Uplink relay Redis error: {e}")
await asyncio.sleep(1.0)
finally:
if r is not None:
with contextlib.suppress(Exception):
await r.aclose()
finally:
uplink_sock.close()

Expand Down Expand Up @@ -983,7 +997,11 @@ async def version_checker():
logger.debug(f"Version check error: {e}")
await asyncio.sleep(30.0)

tasks = [udp_receiver(), missing_reporter(), stats_publisher(), raw_csv_logger(), car_time_injector(), version_checker(), utils.heartbeat_coro(self.telemetry_event)]
# Base mode has a real Redis server; create an async client for the
# heartbeat writer. The writer publishes on the heartbeat channel every
# 1s so pubsub subscribers can detect a dead subscription and reconnect.
_async_redis = aioredis.from_url(REDIS_URL)
tasks = [udp_receiver(), missing_reporter(), stats_publisher(), raw_csv_logger(), car_time_injector(), version_checker(), utils.heartbeat_coro(self.telemetry_event), run_heartbeat_writer(_async_redis)]
if ENABLE_UPLINK:
tasks.append(uplink_relay())
await asyncio.gather(*tasks)
Expand Down
95 changes: 95 additions & 0 deletions universal-telemetry-software/src/heartbeat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
"""
Pubsub liveness probe — the producer publishes a heartbeat message on a Redis
pubsub channel every second, and every subscriber also subscribes to that
channel. Liveness is measured on the pubsub connection itself: if no message
of any kind (heartbeat or data) arrives for HEARTBEAT_STALE_S, the connection
is presumed half-dead and the subscriber tears down and re-subscribes.

An out-of-band check (e.g. GET on a heartbeat key) cannot detect this state:
regular commands use a different pool connection that redis-py transparently
reconnects, so the key would look fresh while the pubsub connection is dark.
See docs/superpowers/plans/2026-06-11-stack-resilience.md.
"""
import asyncio
import json
import logging
import time

from .config import HEARTBEAT_STALE_S, REDIS_HEARTBEAT_CHANNEL

logger = logging.getLogger(__name__)

HEARTBEAT_INTERVAL_S = 1.0


async def run_heartbeat_writer(redis_client=None) -> None:
    """Emit a liveness beacon on REDIS_HEARTBEAT_CHANNEL, forever.

    Each beacon is a JSON object with two fields: ``uptime_s`` (seconds on the
    monotonic clock since this coroutine started) and ``wall_ts`` (the current
    ``time.time()`` value). After every publish attempt, successful or not,
    the coroutine sleeps HEARTBEAT_INTERVAL_S before trying again.

    Args:
        redis_client: Object exposing an awaitable ``publish(channel, payload)``.
            When ``None`` the coroutine returns immediately without publishing
            (e.g. car mode, which runs without Redis).

    Raises:
        asyncio.CancelledError: Propagated untouched so task cancellation works.
            Every other exception raised while publishing is logged as a
            warning and the loop keeps going, so a transient Redis blip does
            not kill the writer.
    """
    if redis_client is None:
        # No client to publish through; nothing to do.
        return

    started_at = time.monotonic()
    while True:
        try:
            beacon = {
                "uptime_s": time.monotonic() - started_at,
                "wall_ts": time.time(),
            }
            await redis_client.publish(REDIS_HEARTBEAT_CHANNEL, json.dumps(beacon))
        except asyncio.CancelledError:
            # Never swallow cancellation.
            raise
        except Exception as exc:
            logger.warning(f"Heartbeat publish failed: {exc}")
        # Sleep sits outside the try so a failed publish is also paced.
        await asyncio.sleep(HEARTBEAT_INTERVAL_S)


async def pump_pubsub_with_heartbeat(pubsub, on_message, *,
                                     stale_s: float = HEARTBEAT_STALE_S,
                                     should_stop=None,
                                     log=None):
    """Poll a Redis pubsub until it goes quiet, errors out, or is told to stop.

    Intended as a drop-in for ``async for message in pubsub.listen():``, which
    gives no signal when the underlying subscription dies. The caller is
    expected to have subscribed ``pubsub`` to REDIS_HEARTBEAT_CHANNEL alongside
    its data channels and to wrap this call in its own re-subscribe loop.

    Every received message, heartbeat or data, resets the silence timer.
    Messages whose channel equals REDIS_HEARTBEAT_CHANNEL are consumed here and
    never reach ``on_message``.

    Args:
        pubsub: Pubsub object providing an awaitable
            ``get_message(timeout=..., ignore_subscribe_messages=...)``.
        on_message: Async callable receiving the raw message dict for each
            non-heartbeat message. Exceptions it raises are logged and do not
            end the pump.
        stale_s: Seconds of total silence tolerated before giving up.
        should_stop: Optional zero-argument callable polled once per iteration;
            a truthy result ends the pump (e.g. ``shutdown_event.is_set``).
        log: Logger to use instead of this module's logger.

    Returns:
        None, on staleness, on a ``get_message`` error, or when ``should_stop``
        fires.

    Raises:
        asyncio.CancelledError: Propagated when the task is cancelled while
            waiting for a message.
    """
    active_log = log if log else logger
    silence_started = time.monotonic()  # the clock starts when pumping begins

    while should_stop is None or not should_stop():
        try:
            message = await pubsub.get_message(timeout=1.0, ignore_subscribe_messages=True)
        except asyncio.CancelledError:
            raise
        except Exception as exc:
            active_log.warning(f"pubsub.get_message error, reconnecting: {exc}")
            await asyncio.sleep(0.5)
            return  # hand control back so the caller can re-subscribe

        if message is None:
            # Poll timed out with nothing to read; check how long it has been quiet.
            if time.monotonic() - silence_started > stale_s:
                active_log.warning("heartbeat stale, forcing pubsub reconnect")
                return
            continue

        # Any traffic at all proves the subscription is still alive.
        silence_started = time.monotonic()

        topic = message.get("channel")
        if isinstance(topic, bytes):
            topic = topic.decode("utf-8", errors="replace")

        if topic != REDIS_HEARTBEAT_CHANNEL:
            try:
                await on_message(message)
            except Exception as exc:
                active_log.error(f"subscriber handler error: {exc}")
36 changes: 20 additions & 16 deletions universal-telemetry-software/src/websocket_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
REDIS_UPLINK_CHANNEL,
REDIS_DIAG_CHANNEL,
REDIS_WS_CLIENTS_KEY,
REDIS_HEARTBEAT_CHANNEL,
ENABLE_UPLINK,
)
from src import redis_utils, utils
from src.heartbeat import pump_pubsub_with_heartbeat

logger = logging.getLogger("WebSocketBridge")

Expand Down Expand Up @@ -297,24 +299,26 @@ async def redis_listener():
try:
r = redis.from_url(REDIS_URL, health_check_interval=30)
pubsub = r.pubsub()
await pubsub.subscribe(REDIS_CHANNEL, REDIS_STATS_CHANNEL, REDIS_DIAG_CHANNEL)
logger.info(f"Subscribed to Redis channels: {REDIS_CHANNEL}, {REDIS_STATS_CHANNEL}, {REDIS_DIAG_CHANNEL}")
await pubsub.subscribe(REDIS_CHANNEL, REDIS_STATS_CHANNEL, REDIS_DIAG_CHANNEL,
REDIS_HEARTBEAT_CHANNEL)
logger.info(f"Subscribed to Redis channels: {REDIS_CHANNEL}, {REDIS_STATS_CHANNEL}, {REDIS_DIAG_CHANNEL}, {REDIS_HEARTBEAT_CHANNEL}")
delay = backoff_min # reset backoff once a subscribe succeeds

async for message in pubsub.listen():
if shutdown_event.is_set():
break

if message['type'] == 'message':
data = redis_utils.decode_message(message['data'])

# Broadcast to all connected clients
if connected_clients:
# Create tasks for sending to each client to avoid blocking
await asyncio.gather(
*[client.send(data) for client in connected_clients],
return_exceptions=True
)
async def _handler(msg):
if msg['type'] != 'message':
return
data = redis_utils.decode_message(msg['data'])

# Broadcast to all connected clients
if connected_clients:
# Create tasks for sending to each client to avoid blocking
await asyncio.gather(
*[client.send(data) for client in connected_clients],
return_exceptions=True
)

await pump_pubsub_with_heartbeat(pubsub, _handler,
should_stop=shutdown_event.is_set, log=logger)
except asyncio.CancelledError:
raise
except Exception as e:
Expand Down
Loading
Loading