diff --git a/rb/clients.py b/rb/clients.py index 703022f..e7c06e2 100644 --- a/rb/clients.py +++ b/rb/clients.py @@ -386,8 +386,11 @@ def join(self, timeout=None): to be hit. """ remaining = timeout + failed = False - while self._cb_poll and (remaining is None or remaining > 0): + while (self._cb_poll and + (remaining is None or remaining > 0) and + not failed): now = time.time() rv = self._cb_poll.poll(remaining) if remaining is not None: @@ -411,8 +414,15 @@ def join(self, timeout=None): elif event in ('read', 'close'): try: command_buffer.wait_for_responses(self) - finally: - self._release_command_buffer(command_buffer) + except Exception: + failed = True + self._release_command_buffer(command_buffer) + + # If anything failed we want to cancel and release all command + # buffers + if failed: + self.cancel() + raise ConnectionError('Connection failure on join') if self._cb_poll and timeout is not None: raise TimeoutError('Did not receive all data in time.') @@ -420,7 +430,10 @@ def join(self, timeout=None): def cancel(self): """Cancels all outstanding requests.""" for command_buffer in self._cb_poll: - self._release_command_buffer(command_buffer) + try: + self._release_command_buffer(command_buffer) + except Exception: + pass class FanoutClient(MappingClient):