Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,9 +274,7 @@ def __init__(self, host, port, afi, **configs):
# can use a simple dictionary of correlation_id => request data
self.in_flight_requests = dict()

self._protocol = KafkaProtocol(
client_id=self.config['client_id'],
api_version=self.config['api_version'])
self._protocol = self._new_protocol_parser()
self.state = ConnectionStates.DISCONNECTED
self._reset_reconnect_backoff()
self._sock = None
Expand All @@ -295,6 +293,12 @@ def __init__(self, host, port, afi, **configs):
self.config['metric_group_prefix'],
self.node_id)

def _new_protocol_parser(self):
return KafkaProtocol(
ident='%s:%d' % (self.host, self.port),
client_id=self.config['client_id'],
api_version=self.config['api_version'])

def _init_sasl_mechanism(self):
if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'):
self._sasl_mechanism = get_sasl_mechanism(self.config['sasl_mechanism'])(host=self.host, **self.config)
Expand Down Expand Up @@ -934,9 +938,7 @@ def close(self, error=None):
self._api_versions_future = None
self._sasl_auth_future = None
self._init_sasl_mechanism()
self._protocol = KafkaProtocol(
client_id=self.config['client_id'],
api_version=self.config['api_version'])
self._protocol = self._new_protocol_parser()
self._send_buffer = b''
if error is None:
error = Errors.Cancelled(str(self))
Expand Down
9 changes: 7 additions & 2 deletions kafka/protocol/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ class KafkaProtocol(object):
Currently only used to check for 0.8.2 protocol quirks, but
may be used for more in the future.
"""
def __init__(self, client_id=None, api_version=None):
def __init__(self, client_id=None, api_version=None, ident=''):
self._ident = ident
if client_id is None:
client_id = self._gen_client_id()
self._client_id = client_id
Expand Down Expand Up @@ -53,7 +54,7 @@ def send_request(self, request, correlation_id=None):
Returns:
correlation_id
"""
log.debug('Sending request %s', request)
log.debug('Sending request %s', request.__class__.__name__)
if correlation_id is None:
correlation_id = self._next_correlation_id()

Expand All @@ -71,6 +72,8 @@ def send_bytes(self):
"""Retrieve all pending bytes to send on the network"""
data = b''.join(self.bytes_to_send)
self.bytes_to_send = []
if data:
log.debug('%s Send: %r', self._ident, data)
return data

def receive_bytes(self, data):
Expand All @@ -92,6 +95,8 @@ def receive_bytes(self, data):
i = 0
n = len(data)
responses = []
if data:
log.debug('%s Recv: %r', self._ident, data)
while i < n:

# Not receiving is the state of reading the payload header
Expand Down
Loading