Add Redis heartbeat liveness probe so pubsub subscribers self-recover after car power-cycles #75
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 3b2b48d30d
    last_hb_mono = time.monotonic()
    if time.monotonic() - last_hb_mono > stale_s:
        _log.warning("heartbeat stale, forcing pubsub reconnect")
Avoid treating Redis GET success as pubsub liveness
When the heartbeat key is readable, this immediately resets last_hb_mono and then compares it to the same timestamp, so the stale branch can never fire while the key exists. In the half-dead pubsub/lost-subscription scenario this change is meant to recover from, regular Redis commands can still succeed and run_base() keeps telemetry:heartbeat fresh, so the pump stays in this loop without ever returning to re-subscribe even though no pubsub messages are arriving.
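The timing flaw described in this comment can be reproduced without Redis. The following is a minimal sketch, not the PR's code: `PubsubLiveness` and `buggy_is_stale` are hypothetical names, and the injectable `clock` parameter exists only so the behavior can be checked deterministically. It contrasts a timestamp that is refreshed right before the comparison with one that is refreshed only when a message actually arrives on the pubsub connection.

```python
import time
from typing import Callable


def buggy_is_stale(stale_s: float, clock: Callable[[], float] = time.monotonic) -> bool:
    """Mirrors the reviewed pattern: refresh the timestamp, then compare at once."""
    last_hb_mono = clock()
    # Elapsed time here is effectively zero, so this is False for any
    # realistic stale_s and the stale branch never runs.
    return clock() - last_hb_mono > stale_s


class PubsubLiveness:
    """Hypothetical helper: tracks how long the pubsub connection has been silent."""

    def __init__(self, stale_s: float, clock: Callable[[], float] = time.monotonic):
        self._stale_s = stale_s
        self._clock = clock
        self._last_msg = clock()

    def saw_message(self) -> None:
        # Call only when a message arrived on the pubsub connection itself,
        # never because an unrelated Redis command succeeded.
        self._last_msg = self._clock()

    def is_stale(self) -> bool:
        return self._clock() - self._last_msg > self._stale_s
```

With this shape, a successful `GET` on a separate connection has no way to reset the timer; only traffic on the subscribed connection does.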
    await TelemetryNode._pump_pubsub_with_heartbeat(
        pubsub, r, _relay, log=logger,
    )
Re-subscribe the uplink relay after the pump returns
With ENABLE_UPLINK=true, _pump_pubsub_with_heartbeat() is documented to return so an outer loop can re-subscribe, but this call is not inside any loop. If the helper returns because get_message() errored or the heartbeat went stale, uplink_relay() falls through to finally, closes the UDP socket, and the task completes under asyncio.gather() with no new Redis subscription, so future WebSocket can_send messages published to can_uplink are no longer forwarded until the process is restarted.
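One way to address this is an outer loop that re-subscribes each time the pump returns. The sketch below is illustrative only: `run_with_resubscribe` and its `subscribe`, `pump`, `unsubscribe`, and `backoff_s` parameters are hypothetical stand-ins rather than names from this PR or from redis-py, and the real relay would also keep its UDP socket open across iterations.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_with_resubscribe(
    subscribe: Callable[[], Awaitable[Any]],
    pump: Callable[[Any], Awaitable[None]],
    unsubscribe: Callable[[Any], Awaitable[None]],
    should_stop: Callable[[], bool],
    backoff_s: float = 1.0,
) -> int:
    """Re-subscribe whenever the pump returns; returns how many subscriptions were made."""
    subscriptions = 0
    while not should_stop():
        pubsub = await subscribe()
        subscriptions += 1
        try:
            # Expected to return when get_message() errors or the heartbeat goes stale.
            await pump(pubsub)
        finally:
            # Release the old subscription before creating a new one.
            await unsubscribe(pubsub)
        if not should_stop():
            # Small pause so a persistent failure does not become a hot loop.
            await asyncio.sleep(backoff_s)
    return subscriptions
```

Any resource that must outlive a single subscription, such as the UDP socket mentioned above, would be created before this loop and closed after it.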
…bscriptions

Publish a heartbeat on the telemetry_heartbeat pubsub channel every second and have each subscriber measure liveness on the pubsub connection itself: if no message arrives for HEARTBEAT_STALE_S, tear down and re-subscribe. An out-of-band key check can't see a half-dead pubsub connection because regular commands use a different pool connection that redis-py silently reconnects.

Wrap the uplink relay in a reconnect loop so the pump returning actually re-subscribes instead of exiting, and pass shutdown_event.is_set as should_stop so SIGTERM stops the bridge cleanly.

Add the heartbeat tests to CI.
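The subscriber side of this commit message can be sketched roughly as follows. This is not the PR's `pump_pubsub_with_heartbeat`: `pump_sketch`, its string return values, and the `clock` and `poll_s` parameters are assumptions made for illustration. The only thing assumed about `pubsub` is an awaitable `get_message(timeout=...)` that returns a message dict or `None`, which is the shape redis-py's asyncio `PubSub` provides.

```python
import asyncio
import logging
import time

HEARTBEAT_CHANNEL = "telemetry_heartbeat"  # channel name taken from the PR text
_log = logging.getLogger("pump_sketch")


async def pump_sketch(pubsub, handler, *, stale_s=5.0, poll_s=1.0,
                      should_stop=lambda: False, clock=time.monotonic):
    """Drain pubsub until it errors, goes silent, or should_stop() is true.

    Returns "error", "stale", or "stopped" (values invented for this sketch)
    so the caller's outer loop can decide whether to re-subscribe.
    """
    last_msg = clock()
    while not should_stop():
        try:
            msg = await pubsub.get_message(timeout=poll_s)
        except Exception:
            _log.warning("get_message failed, forcing pubsub reconnect")
            return "error"
        if msg is None:
            # Nothing arrived on this connection during the poll window.
            if clock() - last_msg > stale_s:
                _log.warning("heartbeat stale, forcing pubsub reconnect")
                return "stale"
            continue
        # Any traffic on the pubsub connection counts as proof of life.
        last_msg = clock()
        channel = msg.get("channel")
        if isinstance(channel, bytes):
            channel = channel.decode()
        if msg.get("type") != "message" or channel == HEARTBEAT_CHANNEL:
            continue  # heartbeats and subscribe confirmations are not data
        try:
            await handler(msg)
        except Exception:
            _log.exception("handler failed; continuing")
    return "stopped"
```

Because the timestamp is updated only inside this loop, a healthy command connection elsewhere in the process cannot mask a silent subscription.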
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
What
The base-station DAQ stack doesn't self-recover when the car power-cycles. The Redis pubsub subscribers (`data.py` uplink relay, `websocket_bridge.py` bridge) sit in `async for message in pubsub.listen():` loops with no liveness probe, so a half-dead TCP connection or lost subscription state makes them silently stop receiving until a manual `docker restart` brings the system back.

This adds a heartbeat published on a Redis pubsub channel (`telemetry_heartbeat`). The base data process publishes to it every 1s; every subscriber also subscribes to it alongside its data channels. The subscriber loop uses non-blocking `pubsub.get_message(timeout=1.0)` and measures liveness on the pubsub connection itself: if no message of any kind (heartbeat or data) arrives for `HEARTBEAT_STALE_S = 5s`, the connection is presumed half-dead, the pump returns, and the outer `while True:` re-subscribes. Dead pubsub connections self-heal in ~5s without operator action.

Why publish on a channel instead of SET-ing a key
An out-of-band liveness check (e.g. `GET telemetry:heartbeat`) cannot detect the failure we actually hit. The "connected-but-broken" state is specific to the pubsub connection; regular commands like `GET` travel over a different pool connection that redis-py transparently reconnects. So the key would always look fresh while the pubsub socket sits dark, and the probe would never fire. Publishing the heartbeat over the subscribed channel measures the real end-to-end pubsub path, which is the thing that breaks.

Files
- `src/config.py`: add `REDIS_HEARTBEAT_CHANNEL` and `HEARTBEAT_STALE_S`
- `src/heartbeat.py` (new): `run_heartbeat_writer` (publishes the heartbeat) and `pump_pubsub_with_heartbeat` (the shared subscriber drain/reconnect helper, with `should_stop` hook and heartbeat-message filtering)
- `src/data.py`: wire the writer into `run_base()`; wrap the uplink relay in a reconnect loop using the pump (with `nonlocal uplink_seq`; see "Bug caught in review")
- `src/websocket_bridge.py`: refactor the bridge subscriber to use the pump; subscribe to the heartbeat channel; pass `shutdown_event.is_set` as `should_stop` so SIGTERM stops the bridge cleanly
- `.github/workflows/telemetry-ci.yml`: add `tests/test_heartbeat.py` to the unit-test job (it was not being run in CI)
- `tests/test_heartbeat.py` (new): 7 tests: writer behavior, writer error recovery, the pump's stale-reconnect path, heartbeat-message filtering, `should_stop` shutdown, and the `get_message` error path

Test coverage
83 passed, 1 skipped on the telemetry-ci unit set (the existing set plus the new `tests/test_heartbeat.py`). `test_pump_reconnects_when_pubsub_goes_silent` drives the real failure mode (pubsub silent while Redis stays otherwise reachable) and asserts the `heartbeat stale` warning fires via `caplog`. Tests monkeypatch the heartbeat interval so the file runs in ~1.2s.

Bug caught in review
The plan said "copy the handler body verbatim" into a new nested `_relay` function in `data.py`. That broke `uplink_seq += 1`: Python's scope rules treat any name assigned in a function as local, so the first message raised `UnboundLocalError`, which the helper's handler error catch silently swallowed. Fixed with `nonlocal uplink_seq`.

A second bug surfaced during this review: the uplink relay had no outer reconnect loop, so once the pump returned to force a re-subscribe, the relay would exit permanently and close its socket. It now has a real reconnect loop with client cleanup, matching the bridge.
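The scoping problem behind the first bug is easy to reproduce in isolation. In this minimal sketch, `make_relay_buggy` and `make_relay_fixed` are hypothetical factories standing in for the enclosing function in `data.py`; only the `uplink_seq` counter and the nested `_relay` name come from the description above.

```python
def make_relay_buggy():
    uplink_seq = 0

    def _relay(payload):
        # The augmented assignment makes uplink_seq local to _relay, so reading
        # it before it is bound raises UnboundLocalError on the first call.
        uplink_seq += 1
        return uplink_seq

    return _relay


def make_relay_fixed():
    uplink_seq = 0

    def _relay(payload):
        # nonlocal rebinds the counter in the enclosing scope instead.
        nonlocal uplink_seq
        uplink_seq += 1
        return uplink_seq

    return _relay
```

Calling the buggy relay raises immediately, which is why a broad handler-level `except` made the failure invisible; the fixed relay counts 1, 2, 3 across calls.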
Shutdown correctness
The bridge's old `async for` loop broke out of `listen()` on `shutdown_event`, letting the listener task finish. The pump loops until `should_stop()` returns true, so the bridge passes `shutdown_event.is_set` and SIGTERM stops it promptly instead of hanging until Docker SIGKILLs it.

Out of scope (intentional)
`ws_relay.py:208` subscribes over a WebSocket, not Redis pubsub, so the heartbeat pattern doesn't apply there.