-
Notifications
You must be signed in to change notification settings - Fork 3
fix: Race conditions, memory usage, and Redis cleanup #567
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -411,6 +411,45 @@ async def close(self) -> None: | |
| except Exception as e: | ||
| logger.debug(f"Error closing pub/sub: {e}") | ||
|
|
||
| async def force_close(self, timeout: float = 2.0) -> None: | ||
| """ | ||
| Force cleanup of pub/sub connection even if graceful close hangs. | ||
|
|
||
| Unlike close(), this captures the underlying pub/sub reference before | ||
| clearing state, then attempts a timed close. If that also times out, | ||
| it forcibly disconnects the underlying connection. (Issue #562) | ||
| """ | ||
| pubsub = self._pubsub | ||
| channels = list(self._subscribed_channels) | ||
| patterns = list(self._subscribed_patterns) | ||
| self._pubsub = None | ||
| self._subscribed_channels.clear() | ||
| self._subscribed_patterns.clear() | ||
|
|
||
| if not pubsub: | ||
| return | ||
|
|
||
| # Best-effort unsubscribe before closing (consistent with close()) | ||
| try: | ||
| if channels: | ||
| await asyncio.wait_for(pubsub.unsubscribe(*channels), timeout=1.0) | ||
| if patterns: | ||
| await asyncio.wait_for(pubsub.punsubscribe(*patterns), timeout=1.0) | ||
| except Exception as e: | ||
| logger.debug(f"Best-effort unsubscribe during force_close failed (non-critical): {e}") | ||
|
|
||
| try: | ||
| await asyncio.wait_for(pubsub.close(), timeout=timeout) | ||
| except asyncio.TimeoutError: | ||
| logger.error("Pub/Sub force_close timed out - disconnecting underlying connection") | ||
| try: | ||
| if hasattr(pubsub, 'connection') and pubsub.connection: | ||
| await pubsub.connection.disconnect() | ||
| except Exception as e: | ||
| logger.error(f"Failed to disconnect pub/sub connection: {e}") | ||
| except Exception as e: | ||
| logger.debug(f"Error during force_close: {e}") | ||
|
|
||
|
Comment on lines
+441
to
+452
|
||
| @property | ||
| def is_active(self) -> bool: | ||
| """Check if subscription is active.""" | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
force_close()clears_subscribed_channels/_subscribed_patternsand then callspubsub.close()without attemptingunsubscribe/punsubscribelikeclose()does. This makes cleanup semantics inconsistent and can leave server-side subscriptions around until the connection is torn down (and also makes it harder to reason about behavior whenpubsub.close()succeeds). Consider capturing the channel/pattern lists before clearing state and attempting a best-effort unsubscribe/punsubscribe (with a short timeout) before the timed close, similar toclose().