
fix: Race conditions, memory usage, and Redis cleanup #567

Merged
filthyrake merged 2 commits into dev from fix/reliability-and-performance-batch-551-550-559-562-548
Feb 7, 2026

Conversation

@filthyrake
Owner

Summary

Batch fix for 4 open reliability and performance issues:

Note: #548 (WebSocket broadcast sequential sending) was already fixed in 7a0ce90; broadcast_to_stream already uses asyncio.gather with bounded concurrency.
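The pattern referenced for #548 can be sketched as follows. This is a minimal illustration of asyncio.gather with bounded concurrency, not the actual broadcast_to_stream implementation; the connection object and its send() method are hypothetical stand-ins:

```python
import asyncio

async def broadcast_to_stream(connections, message, max_concurrency=50):
    # Bound concurrent sends with a semaphore so one slow client
    # cannot monopolize the event loop or exhaust resources.
    sem = asyncio.Semaphore(max_concurrency)

    async def send_one(conn):
        async with sem:
            await conn.send(message)

    # gather fans out all sends concurrently; return_exceptions=True
    # keeps one failing client from cancelling the rest of the batch.
    return await asyncio.gather(
        *(send_one(c) for c in connections), return_exceptions=True
    )
```

Compared with a sequential for-loop over connections, total broadcast latency is bounded by the slowest client in each concurrency window rather than the sum of all send times.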

Files Changed

| File | Issue | Change |
|------|-------|--------|
| api/live_tasks.py | #551 | Atomic UPDATE with WHERE on status + timestamp |
| api/studio_analytics.py | #550 | Transaction-wrapped upsert |
| api/live_playlist.py | #559 | Database-level DVR cutoff filtering |
| api/studio_sse.py | #562 | Force pub/sub reset on close timeout |

Test plan

  • All 4 modified files compile cleanly
  • Existing analytics test suite passes (47/47 — 6 pre-existing integration failures unrelated to changes)
  • Verify stale stream detection doesn't prematurely terminate streams under load
  • Verify analytics recomputation handles concurrent requests without duplicate key errors
  • Verify playlist generation memory usage is reduced for long streams
  • Verify SSE connections don't leak Redis connections over time

Closes #551, closes #550, closes #559, closes #562

🤖 Generated with Claude Code

…, #562)

- Eliminate stale stream detection race condition with atomic UPDATE WHERE (#551)
- Fix analytics recomputation race condition with PostgreSQL ON CONFLICT upsert (#550)
- Push DVR cutoff filter to database query to avoid O(n) memory usage (#559)
- Add Subscriber.force_close() to prevent Redis pub/sub connection leaks (#562)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@filthyrake filthyrake force-pushed the fix/reliability-and-performance-batch-551-550-559-562-548 branch from e4df857 to 32c1631 Compare February 7, 2026 14:46
@filthyrake filthyrake requested a review from Copilot February 7, 2026 14:52
Contributor

Copilot AI left a comment


Pull request overview

This PR targets reliability/performance issues in the live streaming subsystem by making state transitions and analytics writes race-safe, reducing playlist memory usage, and hardening SSE Redis pub/sub cleanup.

Changes:

  • Make stale stream transitions atomic via conditional UPDATE ... WHERE checks (eliminates check-then-act races).
  • Replace analytics “check then insert/update” with a single database-level upsert (ON CONFLICT DO UPDATE).
  • Push DVR cutoff filtering into the segment query to avoid loading/filtering large segment lists in Python.
  • Add Subscriber.force_close() and use it in SSE cleanup to prevent Redis pub/sub resource leaks on close timeouts.
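The DVR cutoff change described above amounts to moving a Python-side filter into the WHERE clause. A self-contained sketch with sqlite3 (segment schema and column names hypothetical, not the actual api/live_playlist.py query):

```python
import sqlite3
import time

def dvr_segments(conn, stream_id, dvr_window=3600.0, limit=1000):
    """Apply the DVR window cutoff in the query itself. Filtering in
    SQL (created_at >= cutoff) means only segments inside the DVR
    window are ever materialized in Python, instead of loading the
    stream's full segment history and slicing it afterwards, which
    costs O(n) memory for long-running streams."""
    cutoff = time.time() - dvr_window
    cur = conn.execute(
        "SELECT id, uri, duration FROM segments "
        "WHERE stream_id = ? AND created_at >= ? "
        "ORDER BY created_at LIMIT ?",
        (stream_id, cutoff, limit),
    )
    return cur.fetchall()
```

An index on (stream_id, created_at) would let the database serve this as a range scan rather than a full table scan.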

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.

| File | Description |
|------|-------------|
| api/live_tasks.py | Uses conditional updates to prevent stale-detection races when segments arrive concurrently. |
| api/studio_analytics.py | Switches analytics summary persistence to an atomic upsert. |
| api/live_playlist.py | Applies DVR window cutoff at the database level to reduce memory usage. |
| api/studio_sse.py | Uses force_close() for subscriber cleanup to mitigate Redis pool exhaustion. |
| api/pubsub.py | Introduces Subscriber.force_close() to enforce cleanup even when graceful close hangs. |


Comment on lines 414 to 426
```python
async def force_close(self, timeout: float = 2.0) -> None:
    """
    Force cleanup of pub/sub connection even if graceful close hangs.

    Unlike close(), this captures the underlying pub/sub reference before
    clearing state, then attempts a timed close. If that also times out,
    it forcibly disconnects the underlying connection. (Issue #562)
    """
    pubsub = self._pubsub
    self._pubsub = None
    self._subscribed_channels.clear()
    self._subscribed_patterns.clear()
```

Copilot AI Feb 7, 2026


force_close() clears _subscribed_channels / _subscribed_patterns and then calls pubsub.close() without attempting unsubscribe/punsubscribe like close() does. This makes cleanup semantics inconsistent and can leave server-side subscriptions around until the connection is torn down (and also makes it harder to reason about behavior when pubsub.close() succeeds). Consider capturing the channel/pattern lists before clearing state and attempting a best-effort unsubscribe/punsubscribe (with a short timeout) before the timed close, similar to close().
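One way to address this suggestion, sketched as a self-contained minimal Subscriber (the real class in api/pubsub.py has more state; the fields and redis.asyncio-style pub/sub methods here are assumptions based on the excerpts above):

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

class Subscriber:
    def __init__(self, pubsub):
        self._pubsub = pubsub
        self._subscribed_channels = set()
        self._subscribed_patterns = set()

    async def force_close(self, timeout: float = 2.0) -> None:
        # Capture everything before clearing state, so concurrent
        # callers immediately see the subscriber as closed.
        pubsub = self._pubsub
        channels = list(self._subscribed_channels)
        patterns = list(self._subscribed_patterns)
        self._pubsub = None
        self._subscribed_channels.clear()
        self._subscribed_patterns.clear()
        if pubsub is None:
            return
        # Best-effort server-side unsubscribe, each with a short
        # timeout so a hung connection cannot stall cleanup.
        for op, names in ((pubsub.unsubscribe, channels),
                          (pubsub.punsubscribe, patterns)):
            if names:
                try:
                    await asyncio.wait_for(op(*names), timeout=0.5)
                except Exception as e:
                    logger.debug("force_close unsubscribe failed: %s", e)
        try:
            await asyncio.wait_for(pubsub.close(), timeout=timeout)
        except asyncio.TimeoutError:
            logger.error("force_close timed out; disconnecting connection")
            conn = getattr(pubsub, "connection", None)
            if conn is not None:
                await conn.disconnect()
        except Exception as e:
            logger.debug("error during force_close: %s", e)
```

Clearing state first makes the method idempotent: a second concurrent force_close() sees pubsub as None and returns without double-closing.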

Comment on lines +430 to +441
```python
    try:
        await asyncio.wait_for(pubsub.close(), timeout=timeout)
    except asyncio.TimeoutError:
        logger.error("Pub/Sub force_close timed out - disconnecting underlying connection")
        try:
            if hasattr(pubsub, 'connection') and pubsub.connection:
                await pubsub.connection.disconnect()
        except Exception as e:
            logger.error(f"Failed to disconnect pub/sub connection: {e}")
    except Exception as e:
        logger.debug(f"Error during force_close: {e}")
```

Copilot AI Feb 7, 2026


There are existing unit tests covering Subscriber.close() cleanup behavior, but the newly added force_close() (including the timeout path that disconnects the underlying connection) isn't covered. Adding tests for both the normal path (calls pubsub.close()) and timeout path (forces disconnect) would help prevent Redis connection leaks from regressing.
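A sketch of how the timeout path could be exercised, using a fake pub/sub whose close() hangs. The names here are hypothetical illustrations, not the repo's actual test suite, and force_close_pubsub is a minimal stand-in for the timeout branch of Subscriber.force_close():

```python
import asyncio

class HangingPubSub:
    """Fake whose close() never completes, forcing the timeout path."""
    def __init__(self):
        self.disconnected = False
        self.connection = self  # stand-in for the underlying connection
    async def close(self):
        await asyncio.Event().wait()  # hangs until cancelled
    async def disconnect(self):
        self.disconnected = True

async def force_close_pubsub(pubsub, timeout=0.05):
    # Minimal reproduction of the timeout branch under test: a timed
    # close that falls back to a hard disconnect.
    try:
        await asyncio.wait_for(pubsub.close(), timeout=timeout)
    except asyncio.TimeoutError:
        await pubsub.connection.disconnect()

def test_force_close_timeout_disconnects():
    ps = HangingPubSub()
    asyncio.run(force_close_pubsub(ps))
    assert ps.disconnected  # timeout path forced a hard disconnect
```

The normal path would use a fake whose close() returns immediately and assert that disconnect() was never called.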

Comment on lines 348 to 352
```python
upsert_stmt = pg_insert(stream_analytics_summary).values(
    stream_id=stream_id,
    **analytics_values,
).on_conflict_do_update(
    index_elements=[stream_analytics_summary.c.stream_id],
```

Copilot AI Feb 7, 2026


This upsert is built with PostgreSQL-specific pg_insert(...).on_conflict_do_update(...). The codebase supports SQLite via DATABASE_URL=sqlite:///..., so this will raise at runtime under SQLite. Consider a dialect-agnostic upsert strategy (e.g., transaction + insert with IntegrityError fallback to update), or branching on the active dialect to use the appropriate ON CONFLICT implementation.
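The dialect-branching strategy this comment suggests (and which the follow-up commit adopted) can be sketched with plain DB-API calls; the table and column names below are hypothetical, not the actual stream_analytics_summary schema:

```python
import sqlite3

def upsert_summary(conn, dialect, stream_id, values):
    """Dialect-aware upsert: PostgreSQL gets a single atomic
    INSERT ... ON CONFLICT DO UPDATE; other dialects fall back to
    UPDATE-then-INSERT, relying on the unique constraint (inside the
    caller's transaction) to reject concurrent duplicate inserts."""
    if dialect == "postgresql":
        conn.execute(
            "INSERT INTO stream_analytics_summary (stream_id, views, watch_seconds) "
            "VALUES (%s, %s, %s) "
            "ON CONFLICT (stream_id) DO UPDATE SET "
            "views = EXCLUDED.views, watch_seconds = EXCLUDED.watch_seconds",
            (stream_id, values["views"], values["watch_seconds"]),
        )
        return
    # Fallback path: try UPDATE first; if no row matched, INSERT.
    cur = conn.execute(
        "UPDATE stream_analytics_summary SET views = ?, watch_seconds = ? "
        "WHERE stream_id = ?",
        (values["views"], values["watch_seconds"], stream_id),
    )
    if cur.rowcount == 0:
        conn.execute(
            "INSERT INTO stream_analytics_summary (stream_id, views, watch_seconds) "
            "VALUES (?, ?, ?)",
            (stream_id, values["views"], values["watch_seconds"]),
        )
```

The fallback is not atomic on its own, so callers on the non-PostgreSQL path should catch the integrity error from a lost insert race and retry the UPDATE.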

…lity

- Add best-effort unsubscribe/punsubscribe to force_close() before closing,
  consistent with close() semantics
- Make analytics upsert dialect-aware: uses PostgreSQL ON CONFLICT DO UPDATE
  when available, falls back to UPDATE-then-INSERT for SQLite
- Add 5 tests for force_close() covering normal path, timeout/disconnect
  fallback, no-op, unsubscribe error, and close error scenarios

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@filthyrake filthyrake merged commit 32d5770 into dev Feb 7, 2026
2 of 3 checks passed
@filthyrake filthyrake deleted the fix/reliability-and-performance-batch-551-550-559-562-548 branch February 7, 2026 15:01