Skip to content

Add Redis heartbeat liveness probe so pubsub subscribers self-recover after car power-cycles#75

Merged
haoruizhou merged 12 commits into
mainfrom
stack-resilience
Jun 12, 2026
Merged

Add Redis heartbeat liveness probe so pubsub subscribers self-recover after car power-cycles#75
haoruizhou merged 12 commits into
mainfrom
stack-resilience

Conversation

@haoruizhou

@haoruizhou haoruizhou commented Jun 12, 2026

Copy link
Copy Markdown
Contributor

What

The base-station DAQ stack doesn't self-recover when the car power-cycles. The Redis pubsub subscribers (data.py uplink relay, websocket_bridge.py bridge) sit in async for message in pubsub.listen(): loops with no liveness probe, so a half-dead TCP connection or lost subscription state makes them silently stop receiving until a manual docker restart brings the system back.

This adds a heartbeat published on a Redis pubsub channel (telemetry_heartbeat). The base data process publishes to it every 1s; every subscriber also subscribes to it alongside its data channels. The subscriber loop uses non-blocking pubsub.get_message(timeout=1.0) and measures liveness on the pubsub connection itself — if no message of any kind (heartbeat or data) arrives for HEARTBEAT_STALE_S = 5s, the connection is presumed half-dead, the pump returns, and the outer while True: re-subscribes. Dead pubsub connections self-heal in ~5s without operator action.

Why publish on a channel instead of SET-ing a key

An out-of-band liveness check (e.g. GET telemetry:heartbeat) cannot detect the failure we actually hit. The "connected-but-broken" state is specific to the pubsub connection; regular commands like GET travel over a different pool connection that redis-py transparently reconnects. So the key would always look fresh while the pubsub socket sits dark — the probe would never fire. Publishing the heartbeat over the subscribed channel measures the real end-to-end pubsub path, which is the thing that breaks.

Files

  • src/config.py — add REDIS_HEARTBEAT_CHANNEL and HEARTBEAT_STALE_S
  • src/heartbeat.py (new) — run_heartbeat_writer (publishes the heartbeat) and pump_pubsub_with_heartbeat (the shared subscriber drain/reconnect helper, with should_stop hook and heartbeat-message filtering)
  • src/data.py — wire the writer into run_base(); wrap the uplink relay in a reconnect loop using the pump (with nonlocal uplink_seq — see "Bug caught in review")
  • src/websocket_bridge.py — refactor the bridge subscriber to use the pump; subscribe to the heartbeat channel; pass shutdown_event.is_set as should_stop so SIGTERM stops the bridge cleanly
  • .github/workflows/telemetry-ci.yml — add tests/test_heartbeat.py to the unit-test job (it was not being run in CI)
  • tests/test_heartbeat.py (new) — 7 tests: writer behavior, writer error recovery, the pump's stale-reconnect path, heartbeat-message filtering, should_stop shutdown, and the get_message error path

Test coverage

83 passed, 1 skipped on the telemetry-ci unit set (the existing set plus the new tests/test_heartbeat.py). test_pump_reconnects_when_pubsub_goes_silent drives the real failure mode (pubsub silent while Redis stays otherwise reachable) and asserts the heartbeat stale warning fires via caplog. Tests monkeypatch the heartbeat interval so the file runs in ~1.2s.

Bug caught in review

The plan said "copy the handler body verbatim" into a new nested _relay function in data.py. That broke the uplink_seq += 1 because Python's scope rules treat any name assigned in a function as local — UnboundLocalError on the first message, silently swallowed by the helper's handler error catch. Fixed with nonlocal uplink_seq.

A second bug surfaced during this review: the uplink relay had no outer reconnect loop, so once the pump returned to force a re-subscribe, the relay would exit permanently and close its socket. It now has a real reconnect loop with client cleanup, matching the bridge.

Shutdown correctness

The bridge's old async for loop broke out of listen() on shutdown_event, letting the listener task finish. The pump loops until should_stop() returns true, so the bridge passes shutdown_event.is_set and SIGTERM stops it promptly instead of hanging until Docker SIGKILLs it.

Out of scope (intentional)

  • ws_relay.py:208 subscribes over a WebSocket, not Redis pubsub, so the heartbeat pattern doesn't apply there.
  • The macOS-UDP-socket half of the original brittleness is not addressed by this plan; only the Redis pubsub path is.
  • No integration test for the relay path itself against a live Redis. Worth a follow-up.

Comment thread universal-telemetry-software/tests/test_heartbeat.py Fixed
Comment thread universal-telemetry-software/tests/test_heartbeat.py Fixed
Comment thread universal-telemetry-software/src/heartbeat.py Fixed

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 3b2b48d30d

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines +148 to +150
last_hb_mono = time.monotonic()
if time.monotonic() - last_hb_mono > stale_s:
_log.warning("heartbeat stale, forcing pubsub reconnect")

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Avoid treating Redis GET success as pubsub liveness

When the heartbeat key is readable, this immediately resets last_hb_mono and then compares it to the same timestamp, so the stale branch can never fire while the key exists. In the half-dead pubsub/lost-subscription scenario this change is meant to recover from, regular Redis commands can still succeed and run_base() keeps telemetry:heartbeat fresh, so the pump stays in this loop without ever returning to re-subscribe even though no pubsub messages are arriving.

Useful? React with 👍 / 👎.

Comment on lines +972 to +974
await TelemetryNode._pump_pubsub_with_heartbeat(
pubsub, r, _relay, log=logger,
)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Re-subscribe the uplink relay after the pump returns

With ENABLE_UPLINK=true, _pump_pubsub_with_heartbeat() is documented to return so an outer loop can re-subscribe, but this call is not inside any loop. If the helper returns because get_message() errored or the heartbeat went stale, uplink_relay() falls through to finally, closes the UDP socket, and the task completes under asyncio.gather() with no new Redis subscription, so future WebSocket can_send messages published to can_uplink are no longer forwarded until the process is restarted.

Useful? React with 👍 / 👎.

…bscriptions

Publish a heartbeat on the telemetry_heartbeat pubsub channel every second and
have each subscriber measure liveness on the pubsub connection itself: if no
message arrives for HEARTBEAT_STALE_S, tear down and re-subscribe. An
out-of-band key check can't see a half-dead pubsub connection because regular
commands use a different pool connection that redis-py silently reconnects.

Wrap the uplink relay in a reconnect loop so the pump returning actually
re-subscribes instead of exiting, and pass shutdown_event.is_set as should_stop
so SIGTERM stops the bridge cleanly. Add the heartbeat tests to CI.
task.cancel()
try:
await task
except asyncio.CancelledError:
task.cancel()
try:
await task
except asyncio.CancelledError:
@haoruizhou haoruizhou merged commit f40656f into main Jun 12, 2026
40 checks passed
@haoruizhou haoruizhou deleted the stack-resilience branch June 12, 2026 18:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant