Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 9 additions & 12 deletions api/live_playlist.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,30 +233,27 @@ async def update_variant_playlist(
if dvr_window_seconds > 0:
cutoff = now.timestamp() - dvr_window_seconds

# Fetch segments for this quality
# Fetch segments for this quality, pushing DVR cutoff filter to the
# database to avoid loading all segments into memory. (Issue #559)
query = (
live_stream_segments.select()
.where(live_stream_segments.c.stream_id == stream_id)
.where(live_stream_segments.c.quality == quality)
.order_by(live_stream_segments.c.sequence_number)
)

if cutoff:
cutoff_dt = datetime.fromtimestamp(cutoff, tz=timezone.utc)
query = query.where(live_stream_segments.c.received_at >= cutoff_dt)

query = query.order_by(live_stream_segments.c.sequence_number)

segments = await fetch_all_with_retry(query)

if not segments:
logger.debug(f"No segments yet for {slug}/{quality}")
return

# Apply DVR window filter
if cutoff:
filtered_segments = []
for seg in segments:
received_at = seg["received_at"]
if received_at and received_at.timestamp() >= cutoff:
filtered_segments.append(dict(seg))
segments = filtered_segments
else:
segments = [dict(seg) for seg in segments]
segments = [dict(seg) for seg in segments]

if not segments:
return
Expand Down
56 changes: 33 additions & 23 deletions api/live_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,20 +125,26 @@ async def detect_stale_streams() -> int:
)

for stream in live_streams_query:
# Check if there's been a recent segment (race condition check)
if stream["last_segment_at"] and stream["last_segment_at"] >= stale_cutoff:
continue

# Transition to ending
await db_execute_with_retry(
# Atomic UPDATE with WHERE conditions on status AND timestamp.
# This eliminates the race condition where a segment arrives between
# our SELECT and UPDATE — the UPDATE simply won't match any rows
# if the stream is no longer stale. (Issue #551)
result = await db_execute_with_retry(
live_streams.update()
.where(live_streams.c.id == stream["id"])
.where(live_streams.c.status == "live")
.where(
sa.or_(
live_streams.c.last_segment_at < stale_cutoff,
live_streams.c.last_segment_at.is_(None),
)
)
.values(status="ending")
)

logger.info(f"Stream {stream['slug']} marked as ending (no segments received)")
transitions += 1
if result > 0:
logger.info(f"Stream {stream['slug']} marked as ending (no segments received)")
transitions += 1

# Check ending streams for final timeout
ending_streams = await fetch_all_with_retry(
Expand All @@ -153,29 +159,33 @@ async def detect_stale_streams() -> int:
)

for stream in ending_streams:
# Check if there's been a recent segment (grace period)
if stream["last_segment_at"] and stream["last_segment_at"] >= final_cutoff:
continue

# Finalize stream
await db_execute_with_retry(
# Atomic UPDATE with WHERE conditions on status AND timestamp.
# Same race-condition fix as the live->ending transition. (Issue #551)
result = await db_execute_with_retry(
live_streams.update()
.where(live_streams.c.id == stream["id"])
.where(live_streams.c.status == "ending")
.where(
sa.or_(
live_streams.c.last_segment_at < final_cutoff,
live_streams.c.last_segment_at.is_(None),
)
)
.values(status="ended", ended_at=now)
)

logger.info(f"Stream {stream['slug']} marked as ended (stale timeout)")
if result > 0:
logger.info(f"Stream {stream['slug']} marked as ended (stale timeout)")

# Finalize playlists and trigger VOD recording
if stream["auto_record_vod"]:
try:
await finalize_playlists_for_vod(stream["id"])
await trigger_vod_recording(stream["id"])
except Exception as e:
logger.error(f"Failed to create VOD for stream {stream['slug']}: {e}")
# Finalize playlists and trigger VOD recording
if stream["auto_record_vod"]:
try:
await finalize_playlists_for_vod(stream["id"])
await trigger_vod_recording(stream["id"])
except Exception as e:
logger.error(f"Failed to create VOD for stream {stream['slug']}: {e}")

transitions += 1
transitions += 1

return transitions

Expand Down
39 changes: 39 additions & 0 deletions api/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,45 @@ async def close(self) -> None:
except Exception as e:
logger.debug(f"Error closing pub/sub: {e}")

async def force_close(self, timeout: float = 2.0) -> None:
    """
    Tear down the pub/sub connection even if a graceful close would hang.

    Differences from close(): local subscription state is dropped first
    (so the subscriber is immediately unusable/inactive), unsubscribe is
    strictly best-effort, and if the timed close() stalls the underlying
    connection is forcibly disconnected to avoid leaking it. (Issue #562)

    Args:
        timeout: Seconds to wait for pubsub.close() before forcing a
            disconnect of the underlying connection.
    """
    # Capture what we need, then clear instance state up front so the
    # subscriber reads as closed regardless of how cleanup goes below.
    target = self._pubsub
    pending_channels = list(self._subscribed_channels)
    pending_patterns = list(self._subscribed_patterns)
    self._pubsub = None
    self._subscribed_channels.clear()
    self._subscribed_patterns.clear()

    if not target:
        return

    # Best-effort unsubscribe before closing (consistent with close()).
    # Failures here are expected when Redis is already gone.
    try:
        if pending_channels:
            await asyncio.wait_for(target.unsubscribe(*pending_channels), timeout=1.0)
        if pending_patterns:
            await asyncio.wait_for(target.punsubscribe(*pending_patterns), timeout=1.0)
    except Exception as e:
        logger.debug(f"Best-effort unsubscribe during force_close failed (non-critical): {e}")

    try:
        await asyncio.wait_for(target.close(), timeout=timeout)
    except asyncio.TimeoutError:
        # Graceful close hung: sever the underlying connection directly
        # so the pooled connection is not leaked.
        logger.error("Pub/Sub force_close timed out - disconnecting underlying connection")
        try:
            conn = getattr(target, 'connection', None)
            if conn:
                await conn.disconnect()
        except Exception as e:
            logger.error(f"Failed to disconnect pub/sub connection: {e}")
    except Exception as e:
        logger.debug(f"Error during force_close: {e}")

Comment on lines +441 to +452
Copy link

Copilot AI Feb 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are existing unit tests covering Subscriber.close() cleanup behavior, but the newly added force_close() (including the timeout path that disconnects the underlying connection) isn't covered. Adding tests for both the normal path (calls pubsub.close()) and timeout path (forces disconnect) would help prevent Redis connection leaks from regressing.

Copilot uses AI. Check for mistakes.
@property
def is_active(self) -> bool:
"""Check if subscription is active."""
Expand Down
70 changes: 37 additions & 33 deletions api/studio_analytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
)
from api.studio import require_csrf
from config import (
DATABASE_URL,
LIVE_ENABLED,
RATE_LIMIT_ENABLED,
RATE_LIMIT_STORAGE_URL,
Expand Down Expand Up @@ -329,44 +330,47 @@ async def recompute_analytics(
total_watch_minutes = 0.0 # Would need session tracking
average_watch_time_seconds = 0.0 # Would need session tracking

# Upsert analytics summary
# Check if exists
existing_query = sa.select(stream_analytics_summary.c.stream_id).where(
stream_analytics_summary.c.stream_id == stream_id
# Atomic upsert to eliminate the TOCTOU race condition where concurrent
# requests could both see "not exists" and attempt inserts. (Issue #550)
# Uses PostgreSQL INSERT ... ON CONFLICT DO UPDATE when available,
# falls back to UPDATE-then-INSERT for other dialects (e.g., SQLite).
analytics_values = dict(
peak_viewers=peak_viewers,
average_viewers=average_viewers,
total_unique_viewers=total_unique_viewers,
total_chat_messages=total_chat_messages,
total_watch_minutes=total_watch_minutes,
average_watch_time_seconds=average_watch_time_seconds,
stream_duration_seconds=stream_duration_seconds,
computed_at=now,
)
existing = await fetch_one_with_retry(existing_query)

if existing:
# Update
update_query = (
stream_analytics_summary.update()
.where(stream_analytics_summary.c.stream_id == stream_id)
.values(
peak_viewers=peak_viewers,
average_viewers=average_viewers,
total_unique_viewers=total_unique_viewers,
total_chat_messages=total_chat_messages,
total_watch_minutes=total_watch_minutes,
average_watch_time_seconds=average_watch_time_seconds,
stream_duration_seconds=stream_duration_seconds,
computed_at=now,
)
if DATABASE_URL.startswith("postgresql"):
from sqlalchemy.dialects.postgresql import insert as pg_insert

upsert_stmt = pg_insert(stream_analytics_summary).values(
stream_id=stream_id,
**analytics_values,
).on_conflict_do_update(
index_elements=[stream_analytics_summary.c.stream_id],
set_=analytics_values,
)
await db_execute_with_retry(update_query)
await db_execute_with_retry(upsert_stmt)
else:
# Insert
insert_query = stream_analytics_summary.insert().values(
stream_id=stream_id,
peak_viewers=peak_viewers,
average_viewers=average_viewers,
total_unique_viewers=total_unique_viewers,
total_chat_messages=total_chat_messages,
total_watch_minutes=total_watch_minutes,
average_watch_time_seconds=average_watch_time_seconds,
stream_duration_seconds=stream_duration_seconds,
computed_at=now,
# Fallback for non-PostgreSQL: try UPDATE first, INSERT if no rows matched.
# Not fully atomic but acceptable for SQLite (single-writer).
result = await db_execute_with_retry(
stream_analytics_summary.update()
.where(stream_analytics_summary.c.stream_id == stream_id)
.values(**analytics_values)
)
await db_execute_with_retry(insert_query)
if result == 0:
await db_execute_with_retry(
stream_analytics_summary.insert().values(
stream_id=stream_id,
**analytics_values,
)
)

logger.info(
f"Recomputed analytics for stream {stream_slug}",
Expand Down
14 changes: 7 additions & 7 deletions api/studio_sse.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,17 +291,17 @@ async def event_generator():
# Cleanup connection tracking FIRST - must succeed even if subscriber fails
await _remove_connection(stream_id, connection_id)

# Then cleanup subscriber with timeout - allowed to fail
# Cleanup subscriber using force_close() which handles its own
# timeouts internally. Unlike close(), force_close() captures the
# underlying pub/sub reference before clearing state, so if the
# close operation hangs it can still forcibly disconnect the
# connection and prevent Redis pool exhaustion. (Issue #562)
if subscriber:
try:
await asyncio.wait_for(subscriber.close(), timeout=5.0)
except asyncio.TimeoutError:
logger.warning(
f"SSE subscriber close timed out for {connection_id}"
)
await subscriber.force_close(timeout=5.0)
except Exception as e:
logger.warning(
f"SSE subscriber close error for {connection_id}: {e}"
f"SSE subscriber cleanup error for {connection_id}: {e}"
)

logger.debug(f"SSE connection {connection_id} closed")
Expand Down
88 changes: 88 additions & 0 deletions tests/test_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- Cleanup and resource management
"""

import asyncio
import json
from unittest.mock import AsyncMock, MagicMock, patch

Expand Down Expand Up @@ -449,6 +450,93 @@ def test_is_active_without_pubsub(self):
assert subscriber.is_active is False


class TestSubscriberForceClose:
    """Tests for Subscriber.force_close method (Issue #562).

    Covers the normal cleanup path, the timeout fallback that forcibly
    disconnects the underlying connection, and error tolerance when
    unsubscribe or close raise. All Redis interaction is mocked with
    AsyncMock; no real connection is made.
    """

    @pytest.mark.asyncio
    async def test_force_close_cleans_up_normally(self):
        """Should unsubscribe and close when close() completes in time."""
        subscriber = Subscriber()
        mock_pubsub = AsyncMock()
        subscriber._pubsub = mock_pubsub
        subscriber._subscribed_channels = {"channel1", "channel2"}
        subscriber._subscribed_patterns = {"pattern:*"}

        await subscriber.force_close(timeout=2.0)

        # Should attempt unsubscribe before close
        mock_pubsub.unsubscribe.assert_called_once()
        # Channel order is unspecified (set-backed), so compare as a set.
        call_args = set(mock_pubsub.unsubscribe.call_args[0])
        assert call_args == {"channel1", "channel2"}
        mock_pubsub.punsubscribe.assert_called_once_with("pattern:*")
        mock_pubsub.close.assert_called_once()
        # Local state must be fully cleared after a successful force_close.
        assert subscriber._pubsub is None
        assert len(subscriber._subscribed_channels) == 0
        assert len(subscriber._subscribed_patterns) == 0

    @pytest.mark.asyncio
    async def test_force_close_disconnects_on_timeout(self):
        """Should forcibly disconnect when close() hangs."""
        subscriber = Subscriber()
        mock_pubsub = AsyncMock()
        mock_connection = AsyncMock()
        mock_pubsub.connection = mock_connection

        # Make close() hang forever
        async def hanging_close():
            await asyncio.sleep(999)

        mock_pubsub.close = hanging_close
        subscriber._pubsub = mock_pubsub
        subscriber._subscribed_channels = {"channel1"}

        # Short timeout so the test stays fast while still exercising
        # the asyncio.TimeoutError branch.
        await subscriber.force_close(timeout=0.1)

        # Should have fallen back to disconnect
        mock_connection.disconnect.assert_called_once()
        assert subscriber._pubsub is None
        assert len(subscriber._subscribed_channels) == 0

    @pytest.mark.asyncio
    async def test_force_close_noop_without_pubsub(self):
        """Should be a no-op when no pubsub object exists."""
        subscriber = Subscriber()
        subscriber._pubsub = None

        # Should not raise
        await subscriber.force_close()

        assert subscriber._pubsub is None

    @pytest.mark.asyncio
    async def test_force_close_handles_unsubscribe_error(self):
        """Should still close even if unsubscribe fails."""
        subscriber = Subscriber()
        mock_pubsub = AsyncMock()
        # Simulate Redis already being unreachable during unsubscribe.
        mock_pubsub.unsubscribe = AsyncMock(side_effect=Exception("Redis gone"))
        subscriber._pubsub = mock_pubsub
        subscriber._subscribed_channels = {"channel1"}

        await subscriber.force_close(timeout=2.0)

        # Should still close the connection despite unsubscribe failure
        mock_pubsub.close.assert_called_once()
        assert subscriber._pubsub is None

    @pytest.mark.asyncio
    async def test_force_close_handles_close_error(self):
        """Should not raise when close() raises a non-timeout error."""
        subscriber = Subscriber()
        mock_pubsub = AsyncMock()
        # Non-timeout errors go through the generic except branch.
        mock_pubsub.close = AsyncMock(side_effect=ConnectionError("Already closed"))
        subscriber._pubsub = mock_pubsub

        # Should not raise
        await subscriber.force_close(timeout=2.0)

        assert subscriber._pubsub is None

class TestSubscriberListen:
"""Tests for Subscriber.listen method."""

Expand Down
Loading