## Description
`Producer.poll(timeout)` no longer returns early after processing delivery callbacks when `timeout > 200ms`. Instead it blocks for the entire timeout duration, regardless of whether events were served.

This is a regression introduced in 2.13.0 by the "wakeable poll" refactor (PR #2126). The original single-call implementation of `rd_kafka_poll()` returned as soon as events were processed. The new chunked loop replicates this correctly for `Consumer.poll()` but not for `Producer.poll()`.

The producer loop accumulates `chunk_result` but never breaks when `chunk_result > 0`, causing it to exhaust all remaining chunks even after the delivery callback has fired.
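The two loop shapes can be sketched in Python for illustration. This is a simplified model, not the actual C code: `chunked_poll`, `serve_events`, and the 200 ms chunk size are assumptions made for the sketch.

```python
import time

CHUNK_MS = 200  # assumed chunk granularity of the wakeable poll loop (illustrative)

def chunked_poll(serve_events, timeout_s, early_exit):
    """Simplified model of the chunked poll loop (not the actual C code).

    serve_events() stands in for rd_kafka_poll(): it returns the number of
    events served during one chunk, or a negative value on error.
    """
    deadline = time.monotonic() + timeout_s
    total = 0
    while time.monotonic() < deadline:
        chunk = serve_events()
        if chunk < 0:
            return chunk
        total += chunk
        if early_exit and chunk > 0:
            break  # the guard that Producer_poll0 is missing
        time.sleep(CHUNK_MS / 1000)  # wait out the rest of the chunk
    return total
```

With `early_exit=True` the call returns as soon as the first chunk serves an event (the 2.12.x behaviour); with `early_exit=False` it keeps cycling through chunks until the full timeout elapses, which is the behaviour reported here.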
## Steps to reproduce

```python
import time

from confluent_kafka import Producer, __version__

KAFKA_BOOTSTRAP = "localhost:9092"
POLL_TIMEOUT = 5.0

print(f"confluent-kafka version: {__version__}")

producer = Producer({"bootstrap.servers": KAFKA_BOOTSTRAP})

def on_delivery(err, msg):
    print(f"  [{time.time():.3f}] on_delivery fired — err={err}, topic={msg.topic()}")

print(f"[{time.time():.3f}] produce()")
producer.produce("test-topic", value=b"hello", on_delivery=on_delivery)

print(f"[{time.time():.3f}] poll({POLL_TIMEOUT}) start")
t0 = time.time()
producer.poll(POLL_TIMEOUT)
elapsed = time.time() - t0
print(f"[{time.time():.3f}] poll() returned after {elapsed:.3f}s")
```
## Output in 2.12.2

```
confluent-kafka version: 2.12.2
[1733305268.206] produce()
[1733305268.206] poll(5.0) start
[1733305268.532] on_delivery fired — err=None, topic=test-topic
[1733305268.532] poll() returned after 0.325s
```
## Output in 2.13.2

```
confluent-kafka version: 2.13.2
[1733305281.264] produce()
[1733305281.265] poll(5.0) start
[1733305281.276] on_delivery fired — err=None, topic=test-topic
[1733305286.078] poll() returned after 4.813s
```
## Potential Cause

In `Producer_poll0` (`src/confluent_kafka/src/Producer.c`), the wakeable loop does not break when `chunk_result > 0`.
## Potential fix

Add an early exit in `Producer_poll0` after events are served, matching the original `rd_kafka_poll()` single-call behaviour and the consumer implementation:

```c
int chunk_result = rd_kafka_poll(self->rk, chunk_timeout_ms);
if (chunk_result < 0) { r = chunk_result; break; }
r += chunk_result;
if (chunk_result > 0) break; // ← return early once events are served
```
The new chunked implementation was intended to preserve this early-exit behaviour while adding signal interruptibility. It does so correctly for `Consumer.poll()` (`if (rkm) break;`), but the equivalent guard was omitted in `Producer_poll0`, making `poll(5.0)` in 2.13.x strictly worse than `poll(5.0)` in 2.12.x when the broker is fast.
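Until a fix lands, affected callers can approximate the old behaviour from Python by polling in short chunks themselves. A minimal sketch: `poll_until_served` is a hypothetical helper (not part of the library), and the 50 ms chunk size is an arbitrary choice.

```python
import time

def poll_until_served(poll, timeout_s, chunk_s=0.05):
    """Caller-side workaround sketch for the missing early exit.

    `poll` is the bound Producer.poll method. Polls in short chunks and
    returns as soon as any events were served, so a fast delivery callback
    no longer pins the caller for the full timeout. Returns 0 on timeout.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return 0
        served = poll(min(chunk_s, remaining))
        if served > 0:
            return served
```

Usage would be `poll_until_served(producer.poll, 5.0)` in place of `producer.poll(5.0)`, at the cost of slightly higher wakeup overhead while waiting.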
## Environment
- confluent-kafka-python: 2.13.0, 2.13.1, 2.13.2
- Python: 3.11+
- OS: Linux
Thanks to the maintainers for the great work on confluent-kafka-python. Happy to open a PR with the proposed fix if the issue is confirmed.