From cfce1be1936a50dfe3b47285c87eecf4a75b271c Mon Sep 17 00:00:00 2001 From: Armin Ronacher Date: Thu, 14 Sep 2017 18:46:39 +0200 Subject: [PATCH 1/2] fix: Catch errors for connection releases on cancel --- rb/clients.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/rb/clients.py b/rb/clients.py index 703022f..7ce9548 100644 --- a/rb/clients.py +++ b/rb/clients.py @@ -420,7 +420,10 @@ def join(self, timeout=None): def cancel(self): """Cancels all outstanding requests.""" for command_buffer in self._cb_poll: - self._release_command_buffer(command_buffer) + try: + self._release_command_buffer(command_buffer) + except Exception: + pass class FanoutClient(MappingClient): From 5feb3cdc7a799022d734c616cd8982feee42f704 Mon Sep 17 00:00:00 2001 From: Armin Ronacher Date: Thu, 14 Sep 2017 19:01:40 +0200 Subject: [PATCH 2/2] fix: Cancel command buffers on join failure --- rb/clients.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/rb/clients.py b/rb/clients.py index 7ce9548..e7c06e2 100644 --- a/rb/clients.py +++ b/rb/clients.py @@ -386,8 +386,11 @@ def join(self, timeout=None): to be hit. """ remaining = timeout + failed = False - while self._cb_poll and (remaining is None or remaining > 0): + while (self._cb_poll and + (remaining is None or remaining > 0) and + not failed): now = time.time() rv = self._cb_poll.poll(remaining) if remaining is not None: @@ -411,8 +414,15 @@ def join(self, timeout=None): elif event in ('read', 'close'): try: command_buffer.wait_for_responses(self) - finally: - self._release_command_buffer(command_buffer) + except Exception: + failed = True + self._release_command_buffer(command_buffer) + + # If anything failed we want to cancel and release all command + # buffers + if failed: + self.cancel() + raise ConnectionError('Connection failure on join') if self._cb_poll and timeout is not None: raise TimeoutError('Did not receive all data in time.')