diff --git a/kafka/conn.py b/kafka/conn.py index a213a4c53..bfabde407 100755 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -274,9 +274,7 @@ def __init__(self, host, port, afi, **configs): # can use a simple dictionary of correlation_id => request data self.in_flight_requests = dict() - self._protocol = KafkaProtocol( - client_id=self.config['client_id'], - api_version=self.config['api_version']) + self._protocol = self._new_protocol_parser() self.state = ConnectionStates.DISCONNECTED self._reset_reconnect_backoff() self._sock = None @@ -295,6 +293,12 @@ def __init__(self, host, port, afi, **configs): self.config['metric_group_prefix'], self.node_id) + def _new_protocol_parser(self): + return KafkaProtocol( + ident='%s:%d' % (self.host, self.port), + client_id=self.config['client_id'], + api_version=self.config['api_version']) + def _init_sasl_mechanism(self): if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'): self._sasl_mechanism = get_sasl_mechanism(self.config['sasl_mechanism'])(host=self.host, **self.config) @@ -934,9 +938,7 @@ def close(self, error=None): self._api_versions_future = None self._sasl_auth_future = None self._init_sasl_mechanism() - self._protocol = KafkaProtocol( - client_id=self.config['client_id'], - api_version=self.config['api_version']) + self._protocol = self._new_protocol_parser() self._send_buffer = b'' if error is None: error = Errors.Cancelled(str(self)) diff --git a/kafka/protocol/parser.py b/kafka/protocol/parser.py index 84d9e8718..174178acc 100644 --- a/kafka/protocol/parser.py +++ b/kafka/protocol/parser.py @@ -22,7 +22,8 @@ class KafkaProtocol(object): Currently only used to check for 0.8.2 protocol quirks, but may be used for more in the future. """ - def __init__(self, client_id=None, api_version=None): + def __init__(self, client_id=None, api_version=None, ident=''): + self._ident = ident if client_id is None: client_id = self._gen_client_id() self._client_id = client_id @@ -53,7 +54,7 @@ def send_request(self, request, correlation_id=None): Returns: correlation_id """ - log.debug('Sending request %s', request) + log.debug('Sending request %s', request.__class__.__name__) if correlation_id is None: correlation_id = self._next_correlation_id() @@ -71,6 +72,8 @@ def send_bytes(self): """Retrieve all pending bytes to send on the network""" data = b''.join(self.bytes_to_send) self.bytes_to_send = [] + if data: + log.debug('%s Send: %r', self._ident, data) return data def receive_bytes(self, data): @@ -92,6 +95,8 @@ def receive_bytes(self, data): i = 0 n = len(data) responses = [] + if data: + log.debug('%s Recv: %r', self._ident, data) while i < n: # Not receiving is the state of reading the payload header