fix: Race conditions, memory usage, and Redis cleanup #567
Conversation
…, #562)

- Eliminate stale stream detection race condition with atomic UPDATE WHERE (#551)
- Fix analytics recomputation race condition with PostgreSQL ON CONFLICT upsert (#550)
- Push DVR cutoff filter to database query to avoid O(n) memory usage (#559)
- Add Subscriber.force_close() to prevent Redis pub/sub connection leaks (#562)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Force-pushed from e4df857 to 32c1631.
Pull request overview
This PR targets reliability/performance issues in the live streaming subsystem by making state transitions and analytics writes race-safe, reducing playlist memory usage, and hardening SSE Redis pub/sub cleanup.
Changes:

- Make stale stream transitions atomic via conditional `UPDATE ... WHERE` checks (eliminates check-then-act races).
- Replace analytics "check then insert/update" with a single database-level upsert (`ON CONFLICT DO UPDATE`).
- Push DVR cutoff filtering into the segment query to avoid loading/filtering large segment lists in Python (a sketch follows this list).
- Add `Subscriber.force_close()` and use it in SSE cleanup to prevent Redis pub/sub resource leaks on close timeouts.
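For context on the DVR change, here is a minimal sketch of moving the cutoff into the query; the table, columns, and `dvr_window_seconds` parameter are illustrative stand-ins, not the PR's actual schema:

```python
from datetime import datetime, timedelta, timezone

import sqlalchemy as sa

# Hypothetical segments table; the real schema in this repo may differ.
metadata = sa.MetaData()
segments = sa.Table(
    "live_segments", metadata,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("stream_id", sa.Integer, nullable=False),
    sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
)

def dvr_segment_query(stream_id: int, dvr_window_seconds: int) -> sa.Select:
    """Apply the DVR cutoff in SQL rather than filtering rows in Python.

    Loading every segment and dropping old ones in a list comprehension
    costs O(n) memory for long-running streams; here the database returns
    only rows inside the DVR window.
    """
    cutoff = datetime.now(timezone.utc) - timedelta(seconds=dvr_window_seconds)
    return (
        sa.select(segments)
        .where(segments.c.stream_id == stream_id)
        .where(segments.c.created_at >= cutoff)  # cutoff enforced by the DB
        .order_by(segments.c.created_at)
    )
```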
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `api/live_tasks.py` | Uses conditional updates to prevent stale-detection races when segments arrive concurrently. |
| `api/studio_analytics.py` | Switches analytics summary persistence to an atomic upsert. |
| `api/live_playlist.py` | Applies DVR window cutoff at the database level to reduce memory usage. |
| `api/studio_sse.py` | Uses `force_close()` for subscriber cleanup to mitigate Redis pool exhaustion (sketch below). |
| `api/pubsub.py` | Introduces `Subscriber.force_close()` to enforce cleanup even when graceful close hangs. |
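As a rough illustration of the `api/studio_sse.py` row above, the cleanup pattern might look like the following; the handler name and the 5-second graceful timeout are assumptions, not the PR's code:

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

async def cleanup_subscriber(subscriber, graceful_timeout: float = 5.0) -> None:
    """Attempt a graceful close; fall back to force_close() on timeout.

    Without the fallback, a hung close() leaks the pub/sub connection and
    can eventually exhaust the Redis connection pool.
    """
    try:
        await asyncio.wait_for(subscriber.close(), timeout=graceful_timeout)
    except asyncio.TimeoutError:
        logger.warning("subscriber.close() timed out; forcing cleanup")
        await subscriber.force_close()
```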
api/pubsub.py

```python
async def force_close(self, timeout: float = 2.0) -> None:
    """
    Force cleanup of pub/sub connection even if graceful close hangs.

    Unlike close(), this captures the underlying pub/sub reference before
    clearing state, then attempts a timed close. If that also times out,
    it forcibly disconnects the underlying connection. (Issue #562)
    """
    pubsub = self._pubsub
    self._pubsub = None
    self._subscribed_channels.clear()
    self._subscribed_patterns.clear()
```
`force_close()` clears `_subscribed_channels` / `_subscribed_patterns` and then calls `pubsub.close()` without attempting unsubscribe/punsubscribe the way `close()` does. This makes cleanup semantics inconsistent and can leave server-side subscriptions around until the connection is torn down (it also makes behavior harder to reason about when `pubsub.close()` succeeds). Consider capturing the channel/pattern lists before clearing state and attempting a best-effort unsubscribe/punsubscribe (with a short timeout) before the timed close, similar to `close()`.
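A minimal sketch of what the reviewer suggests, assuming the attribute names visible in the diff and `redis.asyncio`'s `unsubscribe`/`punsubscribe` coroutines; this is one possible shape, not the PR's eventual code:

```python
# Inside class Subscriber; asyncio and logger as imported in api/pubsub.py.
async def force_close(self, timeout: float = 2.0) -> None:
    pubsub = self._pubsub
    self._pubsub = None
    # Capture subscriptions *before* clearing local state so server-side
    # subscriptions can still be torn down with best-effort unsubscribes.
    channels = list(self._subscribed_channels)
    patterns = list(self._subscribed_patterns)
    self._subscribed_channels.clear()
    self._subscribed_patterns.clear()
    if pubsub is None:
        return
    try:
        if channels:
            await asyncio.wait_for(pubsub.unsubscribe(*channels), timeout=timeout)
        if patterns:
            await asyncio.wait_for(pubsub.punsubscribe(*patterns), timeout=timeout)
    except Exception as e:
        # Best effort only; a hung unsubscribe must not block cleanup.
        logger.debug(f"force_close unsubscribe failed: {e}")
    # ...then the timed close / forced disconnect, as in the diff below.
```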
```python
    try:
        await asyncio.wait_for(pubsub.close(), timeout=timeout)
    except asyncio.TimeoutError:
        logger.error("Pub/Sub force_close timed out - disconnecting underlying connection")
        try:
            if hasattr(pubsub, 'connection') and pubsub.connection:
                await pubsub.connection.disconnect()
        except Exception as e:
            logger.error(f"Failed to disconnect pub/sub connection: {e}")
    except Exception as e:
        logger.debug(f"Error during force_close: {e}")
```
There are existing unit tests covering `Subscriber.close()` cleanup behavior, but the newly added `force_close()` (including the timeout path that disconnects the underlying connection) isn't covered. Adding tests for both the normal path (calls `pubsub.close()`) and the timeout path (forces disconnect) would help prevent Redis connection leaks from regressing.
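A rough pytest sketch of the two paths the comment asks for, using `AsyncMock` instead of a live Redis; it assumes pytest-asyncio and pokes at private attributes purely for illustration:

```python
import asyncio
from unittest.mock import AsyncMock

import pytest

from api.pubsub import Subscriber  # assumed import path


@pytest.mark.asyncio
async def test_force_close_normal_path():
    sub = Subscriber.__new__(Subscriber)  # bypass __init__; sketch only
    pubsub = AsyncMock()
    sub._pubsub = pubsub
    sub._subscribed_channels = set()
    sub._subscribed_patterns = set()

    await sub.force_close()

    pubsub.close.assert_awaited_once()
    assert sub._pubsub is None


@pytest.mark.asyncio
async def test_force_close_timeout_disconnects_connection():
    sub = Subscriber.__new__(Subscriber)
    pubsub = AsyncMock()

    async def hang(*args, **kwargs):
        await asyncio.sleep(10)  # never completes within the test timeout

    pubsub.close = AsyncMock(side_effect=hang)
    sub._pubsub = pubsub
    sub._subscribed_channels = set()
    sub._subscribed_patterns = set()

    await sub.force_close(timeout=0.01)

    # The timeout path should disconnect the raw connection as a fallback.
    pubsub.connection.disconnect.assert_awaited_once()
```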
api/studio_analytics.py (outdated)
```python
upsert_stmt = pg_insert(stream_analytics_summary).values(
    stream_id=stream_id,
    **analytics_values,
).on_conflict_do_update(
    index_elements=[stream_analytics_summary.c.stream_id],
```
This upsert is built with the PostgreSQL-specific `pg_insert(...).on_conflict_do_update(...)`. The codebase supports SQLite via `DATABASE_URL=sqlite:///...`, so this will raise at runtime under SQLite. Consider a dialect-agnostic upsert strategy (e.g., transaction + insert with `IntegrityError` fallback to update), or branching on the active dialect to use the appropriate `ON CONFLICT` implementation.
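One dialect-aware shape, as a sketch in SQLAlchemy 2.0 async style with `rowcount` checked on the fallback UPDATE; the function name and the assumption that callers hold a transaction are illustrative:

```python
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncConnection


async def upsert_summary(conn: AsyncConnection, table: sa.Table,
                         stream_id: int, values: dict) -> None:
    """Dialect-aware upsert for the analytics summary row."""
    if conn.dialect.name == "postgresql":
        stmt = pg_insert(table).values(stream_id=stream_id, **values)
        stmt = stmt.on_conflict_do_update(
            index_elements=[table.c.stream_id], set_=values,
        )
        await conn.execute(stmt)
        return
    # Fallback (e.g. SQLite): UPDATE first, INSERT only if nothing matched.
    # Callers should run this inside a transaction so concurrent writers
    # cannot interleave between the two statements.
    result = await conn.execute(
        table.update().where(table.c.stream_id == stream_id).values(**values)
    )
    if result.rowcount == 0:
        await conn.execute(table.insert().values(stream_id=stream_id, **values))
```

Per the follow-up commit below, the PR ultimately made the upsert dialect-aware with an UPDATE-then-INSERT fallback for SQLite, which matches the second branch here.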
…lity

- Add best-effort unsubscribe/punsubscribe to force_close() before closing, consistent with close() semantics
- Make analytics upsert dialect-aware: uses PostgreSQL ON CONFLICT DO UPDATE when available, falls back to UPDATE-then-INSERT for SQLite
- Add 5 tests for force_close() covering normal path, timeout/disconnect fallback, no-op, unsubscribe error, and close error scenarios

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Summary
Batch fix for 4 open reliability and performance issues:

- #551: Stale stream detection now runs as an atomic `UPDATE ... WHERE` that includes status + timestamp conditions, so a segment arriving between SELECT and UPDATE causes the UPDATE to match 0 rows instead of prematurely terminating the stream (see the sketch after this list)
- #550: Analytics summary recomputation now persists through a single upsert inside `database.transaction()` to prevent concurrent requests from both attempting inserts and hitting duplicate key errors
- #559: The DVR window cutoff is applied in the segment query itself, avoiding O(n) memory usage from filtering segment lists in Python
- #562: SSE cleanup falls back to `Subscriber.force_close()` when `subscriber.close()` times out, preventing connection pool exhaustion

Note: #548 (WebSocket broadcast sequential sending) was already fixed in 7a0ce90; `broadcast_to_stream` already uses `asyncio.gather` with bounded concurrency.
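A minimal sketch of the conditional-update pattern the first bullet describes; the table and column names are illustrative, not this repo's schema:

```python
from datetime import datetime, timedelta, timezone

import sqlalchemy as sa

# Hypothetical streams table; the real models live elsewhere in the repo.
metadata = sa.MetaData()
streams = sa.Table(
    "live_streams", metadata,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("status", sa.String, nullable=False),
    sa.Column("last_segment_at", sa.DateTime(timezone=True)),
)


def mark_stale_stmt(stream_id: int, stale_after: timedelta) -> sa.Update:
    """Atomic check-and-transition.

    The status and timestamp conditions live in the UPDATE itself, so a
    segment arriving after a prior SELECT simply makes this UPDATE match
    0 rows instead of ending a live stream.
    """
    cutoff = datetime.now(timezone.utc) - stale_after
    return (
        sa.update(streams)
        .where(streams.c.id == stream_id)
        .where(streams.c.status == "live")
        .where(streams.c.last_segment_at < cutoff)
        .values(status="ended")
    )
```

The caller can then check the statement's `rowcount`: 0 rows matched means a fresh segment won the race and the stream stays live.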
Files Changed

- api/live_tasks.py
- api/studio_analytics.py
- api/live_playlist.py
- api/studio_sse.py

Test plan
Closes #551, closes #550, closes #559, closes #562
🤖 Generated with Claude Code