
Producer.poll(timeout) blocks for full timeout after delivery callback fires (regression in 2.13.0) #2215

@launay12u

Description

Producer.poll(timeout) no longer returns early after processing delivery callbacks when timeout > 200ms. Instead it blocks for the entire timeout duration, regardless of whether events were served.

This is a regression introduced in 2.13.0 by the "wakeable poll" refactor (PR #2126). The original implementation made a single rd_kafka_poll() call, which returned as soon as events were processed. The new chunked loop replicates this behaviour correctly for Consumer.poll() but not for Producer.poll().

The producer loop accumulates chunk_result but never breaks when chunk_result > 0, causing it to exhaust all remaining chunks even after the delivery callback has fired.
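To make the mechanism concrete, here is a pure-Python model of the chunked loop described above. It is a simulation, not the C code in Producer.c. FakeEventQueue is a hypothetical stand-in for rd_kafka_poll() running on a simulated millisecond clock. The 200 ms chunk size is inferred from the > 200ms threshold, and decrementing the remaining time by a full chunk on every iteration is an assumption. The signal check between chunks is left out.

```python
from dataclasses import dataclass
from typing import List

# Assumption: chunk size inferred from the "> 200ms" threshold in this report.
CHUNK_MS = 200


@dataclass
class FakeEventQueue:
    """Hypothetical stand-in for rd_kafka_poll(), driven by a simulated clock."""

    ready_at_ms: List[int]  # simulated times at which delivery reports become ready
    now_ms: int = 0

    def poll(self, timeout_ms: int) -> int:
        """Serve ready events, waiting up to timeout_ms; return the count served."""
        deadline = self.now_ms + timeout_ms
        due = [t for t in self.ready_at_ms if t <= deadline]
        if not due:
            self.now_ms = deadline  # nothing arrived: the whole chunk elapses
            return 0
        # Wake up as soon as the earliest event is ready.
        self.now_ms = max(self.now_ms, min(due))
        served = [t for t in self.ready_at_ms if t <= self.now_ms]
        self.ready_at_ms = [t for t in self.ready_at_ms if t > self.now_ms]
        return len(served)


def producer_poll_chunked_buggy(q: FakeEventQueue, timeout_ms: int) -> int:
    """Model of the 2.13.x loop: accumulates chunk_result but never exits early."""
    r = 0
    remaining = timeout_ms
    while remaining > 0:
        chunk_timeout_ms = min(remaining, CHUNK_MS)
        chunk_result = q.poll(chunk_timeout_ms)
        if chunk_result < 0:
            return chunk_result
        r += chunk_result
        # Missing here: `if chunk_result > 0: break`
        remaining -= chunk_timeout_ms
    return r


# A delivery report ready 11 ms into poll(5.0), as in the 2.13.2 output.
q = FakeEventQueue(ready_at_ms=[11])
served = producer_poll_chunked_buggy(q, 5000)
print(served, q.now_ms)  # 1 4811
```

The model returns after 4811 simulated ms: the first chunk ends at 11 ms when the report is served, then 24 more full 200 ms chunks run. That is close to the 4.813 s measured on 2.13.2, which is consistent with (though not proof of) the real loop charging a full chunk per iteration.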

Steps to reproduce

```python
import time
from confluent_kafka import Producer, __version__

KAFKA_BOOTSTRAP = "localhost:9092"
POLL_TIMEOUT = 5.0  # seconds; any value above 200 ms triggers the regression

print(f"confluent-kafka version: {__version__}")

producer = Producer({"bootstrap.servers": KAFKA_BOOTSTRAP})

def on_delivery(err, msg):
    # Delivery report callback, served from inside producer.poll()
    print(f"  [{time.time():.3f}] on_delivery fired — err={err}, topic={msg.topic()}")

print(f"[{time.time():.3f}] produce()")
producer.produce("test-topic", value=b"hello", on_delivery=on_delivery)

# Expected: poll() returns shortly after on_delivery fires, not after POLL_TIMEOUT
print(f"[{time.time():.3f}] poll({POLL_TIMEOUT}) start")
t0 = time.time()
producer.poll(POLL_TIMEOUT)
elapsed = time.time() - t0
print(f"[{time.time():.3f}] poll() returned after {elapsed:.3f}s")
```

Output in 2.12.2

```
confluent-kafka version: 2.12.2
[1733305268.206] produce()
[1733305268.206] poll(5.0) start
  [1733305268.532] on_delivery fired — err=None, topic=test-topic
[1733305268.532] poll() returned after 0.325s
```

Output in 2.13.2

```
confluent-kafka version: 2.13.2
[1733305281.264] produce()
[1733305281.265] poll(5.0) start
  [1733305281.276] on_delivery fired — err=None, topic=test-topic
[1733305286.078] poll() returned after 4.813s
```
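Until a fix lands, one possible mitigation on 2.13.x is to poll in short slices and stop as soon as anything has been served. This is a sketch, not part of the original report: it relies on Producer.poll() returning the number of events served, and it assumes, based on the > 200ms threshold above, that calls with a timeout of 200 ms or less still return early. poll_until and its step parameter are hypothetical names.

```python
import time
from typing import Protocol


class Pollable(Protocol):
    """Anything with a confluent_kafka.Producer-style poll(timeout) -> int."""

    def poll(self, timeout: float) -> int: ...


def poll_until(producer: Pollable, timeout: float, step: float = 0.1) -> int:
    """Hypothetical helper: poll in slices of at most `step` seconds and
    return as soon as any events have been served, or when `timeout` expires."""
    deadline = time.monotonic() + timeout
    served = 0
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return served
        # Each call stays under the assumed 200 ms chunk, so it can return early.
        served += producer.poll(min(step, remaining))
        if served > 0:
            return served
```

In the reproducer, replacing producer.poll(POLL_TIMEOUT) with poll_until(producer, POLL_TIMEOUT) should return shortly after on_delivery fires, at the cost of a few extra Python-level calls per second while idle.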

Potential cause

In Producer_poll0 (src/confluent_kafka/src/Producer.c), the wakeable loop does not break when chunk_result > 0, so it keeps polling chunk after chunk until the full timeout has elapsed.

Potential fix

Add an early exit in Producer_poll0 after events are served, matching the original rd_kafka_poll() single-call behaviour and the consumer implementation:

```c
/* Inside the chunked poll loop in Producer_poll0 */
int chunk_result = rd_kafka_poll(self->rk, chunk_timeout_ms);
if (chunk_result < 0) { r = chunk_result; break; }

r += chunk_result;
if (chunk_result > 0) break;  // return early once events are served
```

The new chunked implementation was intended to preserve this early-exit behaviour while adding signal interruptibility. It does so correctly for Consumer.poll() (if (rkm) break;), but the equivalent guard was omitted in Producer_poll0, making poll(5.0) in 2.13.x strictly worse than poll(5.0) in 2.12.x when the broker is fast.
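Applying the proposed guard to a simulated loop illustrates the intended behaviour. As with any model here, this is hypothetical Python, not the C implementation: FakeEventQueue, the 200 ms chunk size and the per-chunk decrement are assumptions. The early break plays the same role that if (rkm) break; plays in Consumer.poll().

```python
from dataclasses import dataclass
from typing import List

CHUNK_MS = 200  # Assumption: chunk size inferred from the "> 200ms" threshold.


@dataclass
class FakeEventQueue:
    """Hypothetical stand-in for rd_kafka_poll(), driven by a simulated clock."""

    ready_at_ms: List[int]  # simulated times at which delivery reports become ready
    now_ms: int = 0

    def poll(self, timeout_ms: int) -> int:
        """Serve ready events, waiting up to timeout_ms; return the count served."""
        deadline = self.now_ms + timeout_ms
        due = [t for t in self.ready_at_ms if t <= deadline]
        if not due:
            self.now_ms = deadline  # nothing arrived: the whole chunk elapses
            return 0
        self.now_ms = max(self.now_ms, min(due))  # wake at the earliest ready event
        served = [t for t in self.ready_at_ms if t <= self.now_ms]
        self.ready_at_ms = [t for t in self.ready_at_ms if t > self.now_ms]
        return len(served)


def producer_poll_chunked_fixed(q: FakeEventQueue, timeout_ms: int) -> int:
    """Model of Producer_poll0 with the proposed early exit."""
    r = 0
    remaining = timeout_ms
    while remaining > 0:
        chunk_timeout_ms = min(remaining, CHUNK_MS)
        chunk_result = q.poll(chunk_timeout_ms)
        if chunk_result < 0:
            r = chunk_result
            break
        r += chunk_result
        if chunk_result > 0:
            break  # return early once events are served
        remaining -= chunk_timeout_ms
    return r


q = FakeEventQueue(ready_at_ms=[11])
print(producer_poll_chunked_fixed(q, 5000), q.now_ms)  # 1 11
```

With no events queued, the loop still waits out the full timeout (5000 simulated ms here), so the guard only changes behaviour once something has been served, which matches what the 2.12.x single-call path did.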

Environment

  • confluent-kafka-python: 2.13.0, 2.13.1, 2.13.2
  • Python: 3.11+
  • OS: Linux

Thanks to the maintainers for the great work on confluent-kafka. Happy to open a PR with the proposed fix if the issue is confirmed.

Metadata

  • Assignees: no one assigned
  • Labels: component:producer (issues tied specifically to producer logic or code paths)
  • Type: none
  • Projects: none
  • Milestone: none
  • Relationships: none yet
  • Development: no branches or pull requests