Description
confluent-kafka: v2.13.0
We have a use case where we want to start consuming at specific offsets. These offsets are stored outside of Kafka (i.e. not using the consumer group functionality).
The way we do this is as follows (we are aware this is not the recommended approach; however, we think this is a bug you'd like to address anyway):

1. `consumer = Consumer({"auto.offset.reset": "latest"})`
2. `consumer.assign(OFFSET_INVALID)` (`OFFSET_INVALID` forces use of the auto offset reset policy)
3. `time.sleep(x)`
4. `consumer.assign(explicit_offset)`
5. `consumer.consume()`

I.e. we assign with the `OFFSET_INVALID` magic value for all partitions, making the client talk to the broker to get the offsets to actually use. Then we optionally sleep (this makes the issue easier to reproduce, see explanation below), and then assign the offsets we would really like. (We realize this seems like a weird solution out of context; it makes more sense within our abstractions.)
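For clarity, here is a minimal, runnable sketch of the steps above. The topic name, partition count, broker address, group id, explicit offsets, and sleep duration are placeholders for illustration, not our actual values:

```python
import time

from confluent_kafka import Consumer, TopicPartition, OFFSET_INVALID

# Placeholder values: in our setup the topic has 128 partitions and the
# explicit offsets come from storage outside of Kafka.
TOPIC = "my-topic"
NUM_PARTITIONS = 128
explicit_offsets = {p: 1000 for p in range(NUM_PARTITIONS)}

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # placeholder broker address
    "group.id": "example-group",            # placeholder group id
    "auto.offset.reset": "latest",
})

# Step 2: assign with OFFSET_INVALID so the broker resolves the starting
# offsets (committed offsets, falling back to the auto.offset.reset policy).
consumer.assign(
    [TopicPartition(TOPIC, p, OFFSET_INVALID) for p in range(NUM_PARTITIONS)]
)

# Step 3: the sleep duration x controls how often the race is hit.
time.sleep(0.5)

# Step 4: re-assign with the offsets we actually want to start from.
consumer.assign(
    [TopicPartition(TOPIC, p, off) for p, off in explicit_offsets.items()]
)

# Step 5: consume. When the race triggers, some partitions start at the
# broker-inferred offsets instead of the explicit ones.
messages = consumer.consume(num_messages=100, timeout=5.0)
```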
What we sometimes see is that the offsets we assign in step 4 are not the ones we get in step 5: We get the broker-inferred offsets instead. Whether this happens depends on the timing between our second assign() and the broker's response to the first:
- Case 1 (No race): The broker responds to the first `assign(OFFSET_INVALID)` before we issue the second `assign(explicit_offset)`. Our explicit offsets are used correctly.
- Case 2 (Race condition): The broker responds in two batches: one arriving before and one arriving after we issue the second `assign(explicit_offset)`. The second batch overwrites our explicit offsets.
The sleep duration x controls how often we hit each case:
| Sleep duration | Behavior |
|---|---|
| `x = 0s` | Race is rare - the second assign usually happens before the broker responds |
| `x = 0.5s` | Race is frequent - a timing window where both cases can occur |
| `x = 5s` | Race is very rare/never - the first assign fully completes before the second |
The solution for our use case is to just directly assign the offsets we want instead of using OFFSET_INVALID. However, we believe this should be of interest to Confluent since the library does appear to try to handle this situation (x = 0s does mostly behave correctly, after all), but in some cases the timing is such that we have a race condition. We also believe that this is probably an issue in librdkafka and not specific to the Python library, which means it likely affects a lot more users.
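For completeness, the workaround amounts to a single assign with the externally stored offsets. A minimal sketch, reusing the placeholder names from the reproduction sketch above:

```python
# Workaround: skip the OFFSET_INVALID round-trip entirely and assign the
# externally stored offsets directly, so there is no in-flight OffsetFetch
# response left to overwrite them.
consumer.assign(
    [TopicPartition(TOPIC, p, off) for p, off in explicit_offsets.items()]
)
messages = consumer.consume(num_messages=100, timeout=5.0)
```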
Log Evidence
We have captured detailed librdkafka debug logs that clearly show the race condition, using the consumer debug context. We have 1 topic with 128 partitions and three brokers. Below is a trace comparing a successful run with a failing run.
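A minimal sketch of a consumer configuration with this debug context enabled (the broker address and group id are placeholders; the `debug` setting is the only relevant part):

```python
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # placeholder
    "group.id": "example-group",            # placeholder
    "auto.offset.reset": "latest",
    # Enable librdkafka's "consumer" debug context; the %7 (debug-level)
    # lines below are emitted through the client's logging facility.
    "debug": "consumer",
})
```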
Case 1: No Race (broker responds before second assign)
| Timestamp (ss.mmm) | Event | Details |
|---|---|---|
| 034.267 | First `assign(OFFSET_INVALID)` | 128 partitions in pending state |
| 034.341 | OffsetFetchRequest sent to broker | For 128 partitions |
| 034.558 | FETCH starts @ OFFSET_INFERRED (epoch 57) | All 128 partitions resolved from broker |
| 034.875 | Second `assign(explicit_offset)` | Clears and re-assigns 128 partitions |
| 034.878 | FETCH starts @ OFFSET_EXPLICIT (epoch -1) | All 128 partitions with explicit offsets |
| 034.878 | OffsetFetchRequest triggered | For 128 partitions |
| 035.449 | Consumer closed | - |
Result: No issue - the first OffsetFetchRequest completed (all 128 partitions resolved) before the second assign, so there were no pending responses left to overwrite the explicit offsets.
Case 2: Race Condition Triggers (broker responds partly after second assign)
| Timestamp (ss.mmm) | Event | Details |
|---|---|---|
| 035.565 | First `assign(OFFSET_INVALID)` | 128 partitions in pending state |
| 035.565 | OffsetFetchRequest sent to broker | For 128 partitions |
| 035.897 | FETCH starts @ OFFSET_INFERRED (epoch 57) | 96/128 partitions resolved from broker |
| 036.087 | Second `assign(explicit_offset)` | Clears and re-assigns 128 partitions |
| 036.089 | FETCH starts @ OFFSET_EXPLICIT (epoch -1) | All 128 partitions with explicit offsets |
| 036.093 | OffsetFetchRequest triggered | For 128 partitions |
| 036.671 | FETCH starts @ OFFSET_INFERRED (epoch 57) | 32/128 partitions overwritten with stale offsets! |
| 037.241 | Consumer closed | - |
Result: Gap detected! 32 partitions consumed from OFFSET_INFERRED instead of OFFSET_EXPLICIT.
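For reference, the gap check that produces the "Gap detected" line at the end of the logs is conceptually just a comparison of the first message seen on each partition against the offset we explicitly assigned. A hypothetical sketch (not our actual code), reusing the placeholder names from the reproduction sketch above:

```python
# Record the first offset actually consumed per partition and compare it
# with the explicitly assigned starting offset.
first_seen = {}
for msg in consumer.consume(num_messages=1000, timeout=10.0):
    if msg.error() is not None:
        continue
    first_seen.setdefault(msg.partition(), msg.offset())

for partition, expected in explicit_offsets.items():
    got = first_seen.get(partition)
    if got is not None and got != expected:
        print(f"Gap detected at partition {partition}: "
              f"expected offset {expected} but got {got}")
```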
Key Observation
The leader epoch -1 in the FETCH log indicates an explicit offset assignment without epoch validation. The leader epoch 57 indicates an offset that came from the broker's OffsetFetchResponse.
The race occurs when:
- First `assign(OFFSET_INVALID)` triggers an `OffsetFetchRequest` to resolve the actual offset
- Second `assign(explicit_offset)` is issued before the response arrives
- The broker's response arrives after the second assign
- librdkafka incorrectly applies the stale response, overwriting the explicit offset
Timing Sensitivity
The key factor is whether the first OffsetFetchRequest completes before the second assign:
- Case 1: First assign at 034.267, all 128 partitions resolved at 034.558, second assign at 034.875. → 608ms between the first and second assign; the broker response completed in ~291ms, leaving plenty of margin.
- Case 2: First assign at 035.565, only 96/128 partitions resolved at 035.897, second assign at 036.087. → 522ms between the first and second assign; only 96 partitions resolved in ~332ms, and the remaining 32 partitions' responses arrived at 036.671 (~584ms after the second assign) and overwrote the explicit offsets.
This explains why x = 0.5s in the original description triggers the issue frequently - it's enough time for some broker responses to complete but not all, creating a window where the second assign happens while responses are still pending. The 32 partitions affected are likely those handled by a slower broker or network path.
More details for completeness
We have 1 topic with 128 partitions, and three brokers.
Case 1: No Race (broker responds before second assign)
%7|1770120034.099|INIT| [thrd:app]: librdkafka v2.13.0 (0x20d00ff) XXX initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer,http,oidc, STRIP STATIC_LINKING GCC GXX PKGCONFIG INSTALL GNULD LDS C11THREADS LIBDL PLUGINS ZLIB SSL ZSTD CURL HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER OAUTHBEARER_OIDC CRC32C_HW, debug 0x2000)
%7|1770120034.267|ASSIGN| [thrd:main]: Group "d5a0baa0-8338-4689-97d6-a59c0f8d2840": new assignment of 128 partition(s) in join-state init
%7|1770120034.267|CLEARASSIGN| [thrd:main]: No current assignment to clear
%7|1770120034.267|ASSIGNMENT| [thrd:main]: Added 128 partition(s) to assignment which now consists of 128 partition(s) where of 128 are in pending state and 0 are being queried
%7|1770120034.341|OFFSET| [thrd:main]: GroupCoordinator/1: Group d5a0baa0-8338-4689-97d6-a59c0f8d2840 OffsetFetchRequest(v7) for 128/128 partition(s)
%7|1770120034.341|OFFSET| [thrd:main]: GroupCoordinator/1: Fetch committed offsets for 128/128 partition(s)
%7|1770120034.341|OFFSET| [thrd:main]: GroupCoordinator/1: Group d5a0baa0-8338-4689-97d6-a59c0f8d2840 OffsetFetchRequest(v7) for 128/128 partition(s)
%7|1770120034.341|OFFSET| [thrd:main]: GroupCoordinator/1: Fetch committed offsets for 128/128 partition(s)
...
%7|1770120034.454 - 1770120034.575|FETCH| [thrd:main]: Partition XX [128/128 partitions] start fetching at offset OFFSET_INFERRED (leader epoch 57)
...
%7|1770120034.875|ASSIGN| [thrd:main]: Group "d5a0baa0-8338-4689-97d6-a59c0f8d2840": new assignment of 128 partition(s) in join-state init
%7|1770120034.875|CLEARASSIGN| [thrd:main]: Clearing current assignment of 128 partition(s)
%7|1770120034.875|REMOVE| [thrd:main]: Served 128 removed partition(s), with 0 offset(s) to commit
%7|1770120034.875|ASSIGNMENT| [thrd:main]: Added 128 partition(s) to assignment which now consists of 128 partition(s) where of 128 are in pending state and 0 are being queried
...
%7|1770120034.876 - 1770120034.878|FETCH| [thrd:main]: Partition XX [128/128 partitions] start fetching at offset OFFSET_EXPLICIT (leader epoch -1)
...
%7|1770120034.878|OFFSET| [thrd:main]: GroupCoordinator/1: Group d5a0baa0-8338-4689-97d6-a59c0f8d2840 OffsetFetchRequest(v7) for 128/128 partition(s)
%7|1770120034.878|OFFSET| [thrd:main]: GroupCoordinator/1: Fetch committed offsets for 128/128 partition(s)
%7|1770120035.449|CLOSE| [thrd:app]: Closing consumer
%7|1770120035.449|CLOSE| [thrd:app]: Waiting for close events
%7|1770120035.449|CLEARASSIGN| [thrd:main]: Clearing current assignment of 128 partition(s)
%7|1770120035.449|REMOVE| [thrd:main]: Served 128 removed partition(s), with 128 offset(s) to commit
%7|1770120035.450|CLOSE| [thrd:app]: Consumer closed
%7|1770120035.450|DESTROY| [thrd:app]: Terminating instance (destroy flags none (0x0))
%7|1770120035.450|DESTROY| [thrd:main]: Destroy internal
%7|1770120035.450|DESTROY| [thrd:main]: Removing all topics
Case 2: Race Condition Triggers (broker responds partly after second assign)
Here we use partition 127 as an example of a partition that fails, but in fact this happens for a total of 32 partitions, as can be seen in the logs.
%7|1770120035.453|INIT| [thrd:app]: librdkafka v2.13.0 (0x20d00ff) XXX initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer,http,oidc, STRIP STATIC_LINKING GCC GXX PKGCONFIG INSTALL GNULD LDS C11THREADS LIBDL PLUGINS ZLIB SSL ZSTD CURL HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER OAUTHBEARER_OIDC CRC32C_HW, debug 0x2000)
%7|1770120035.565|ASSIGN| [thrd:main]: Group "7b6d2f90-8022-430e-927b-461861fdb95f": new assignment of 128 partition(s) in join-state init
%7|1770120035.565|CLEARASSIGN| [thrd:main]: No current assignment to clear
%7|1770120035.565|ASSIGNMENT| [thrd:main]: Added 128 partition(s) to assignment which now consists of 128 partition(s) where of 128 are in pending state and 0 are being queried
%7|1770120035.565|OFFSET| [thrd:main]: GroupCoordinator/1: Group 7b6d2f90-8022-430e-927b-461861fdb95f OffsetFetchRequest(v7) for 128/128 partition(s)
%7|1770120035.565|OFFSET| [thrd:main]: GroupCoordinator/1: Fetch committed offsets for 128/128 partition(s)
%7|1770120035.566|OFFSET| [thrd:main]: GroupCoordinator/1: Group 7b6d2f90-8022-430e-927b-461861fdb95f OffsetFetchRequest(v7) for 128/128 partition(s)
%7|1770120035.566|OFFSET| [thrd:main]: GroupCoordinator/1: Fetch committed offsets for 128/128 partition(s)
...
%7|1770120035.694 - 1770120036.077|FETCH| [thrd:main]: Partition XX [96 out of 128 partitions (note: this includes some partitions delegated to the broker responsible for partition 127 as well)] start fetching at offset OFFSET_INFERRED (leader epoch 57)
...
%7|1770120036.087|ASSIGN| [thrd:main]: Group "7b6d2f90-8022-430e-927b-461861fdb95f": new assignment of 128 partition(s) in join-state init
%7|1770120036.087|CLEARASSIGN| [thrd:main]: Clearing current assignment of 128 partition(s)
%7|1770120036.088|REMOVE| [thrd:main]: Served 128 removed partition(s), with 0 offset(s) to commit
%7|1770120036.088|ASSIGNMENT| [thrd:main]: Added 128 partition(s) to assignment which now consists of 128 partition(s) where of 128 are in pending state and 0 are being queried
...
%7|1770120036.089 - 1770120036.092|FETCH| [thrd:main]: Partition XX [128/128 partitions, including partition 127] start fetching at offset OFFSET_EXPLICIT (leader epoch -1)
...
%7|1770120036.093|OFFSET| [thrd:main]: GroupCoordinator/1: Group 7b6d2f90-8022-430e-927b-461861fdb95f OffsetFetchRequest(v7) for 128/128 partition(s)
%7|1770120036.093|OFFSET| [thrd:main]: GroupCoordinator/1: Fetch committed offsets for 128/128 partition(s)
...
%7|1770120036.658 - 1770120036.714|FETCH| [thrd:main]: Partition XX [32/128 partitions, incl. partition 127] start fetching at offset OFFSET_INFERRED (leader epoch 57)
...
%7|1770120037.241|CLOSE| [thrd:app]: Closing consumer
%7|1770120037.241|CLOSE| [thrd:app]: Waiting for close events
%7|1770120037.241|CLEARASSIGN| [thrd:main]: Clearing current assignment of 128 partition(s)
%7|1770120037.241|REMOVE| [thrd:main]: Served 128 removed partition(s), with 115 offset(s) to commit
%7|1770120037.243|CLOSE| [thrd:app]: Consumer closed
%7|1770120037.243|DESTROY| [thrd:app]: Terminating instance (destroy flags none (0x0))
%7|1770120037.243|DESTROY| [thrd:main]: Destroy internal
%7|1770120037.243|DESTROY| [thrd:main]: Removing all topics
...
Gap detected at XX 127: expected OFFSET_EXPLICIT but got OFFSET_INFERRED