
Race condition when doing .assign(OFFSET_INVALID) followed by .assign(explicit_offset) #2184

@solutionseekeras

Description


confluent-kafka: v2.13.0

We have a use case where we want to start consuming at specific offsets. These offsets are stored outside of Kafka (i.e. not using the consumer group functionality).

The way we do this is as follows (we are aware this is not the recommended approach; however, we think this is a bug you'd like to address anyway):

  1. consumer = Consumer({"auto.offset.reset": "latest"})
  2. consumer.assign(OFFSET_INVALID) (OFFSET_INVALID forces use of the auto offset reset policy)
  3. time.sleep(x)
  4. consumer.assign(explicit_offset)
  5. consumer.consume()

I.e. we assign with the OFFSET_INVALID magic value for all partitions, making the client talk to the broker to get the offsets to actually use. Then we optionally sleep (this makes the issue easier to reproduce; see the explanation below), and finally assign the offsets we actually want. (We realize this seems like a weird solution out of context; it makes more sense within our abstractions.)
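For concreteness, here is a minimal sketch of the sequence above. The topic name, broker address, group id, partition count constant and explicit offsets are placeholders, not our real values; the only configuration that matters for the repro is auto.offset.reset from step 1.

    import time

    from confluent_kafka import Consumer, TopicPartition, OFFSET_INVALID

    TOPIC = "my-topic"                                           # placeholder
    NUM_PARTITIONS = 128                                         # matches our setup
    EXPLICIT_OFFSETS = {p: 1000 for p in range(NUM_PARTITIONS)}  # placeholders; really from our external store

    consumer = Consumer({
        "bootstrap.servers": "localhost:9092",   # placeholder
        "group.id": "external-offsets-demo",     # placeholder; we do not rely on committed offsets
        "auto.offset.reset": "latest",
    })

    # Step 2: assign with OFFSET_INVALID so the client resolves offsets via the broker.
    consumer.assign([TopicPartition(TOPIC, p, OFFSET_INVALID) for p in range(NUM_PARTITIONS)])

    # Step 3: optional sleep; x around 0.5s reproduces the race most reliably for us.
    time.sleep(0.5)

    # Step 4: re-assign with the offsets we actually want.
    consumer.assign([TopicPartition(TOPIC, p, EXPLICIT_OFFSETS[p]) for p in range(NUM_PARTITIONS)])

    # Step 5: consume; in the racy case some partitions start at the broker-inferred
    # offsets instead of EXPLICIT_OFFSETS.
    msgs = consumer.consume(num_messages=100, timeout=5.0)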

What we sometimes see is that the offsets we assign in step 4 are not the ones we get in step 5: We get the broker-inferred offsets instead. Whether this happens depends on the timing between our second assign() and the broker's response to the first:

  • Case 1 (No race): The broker responds to the first assign(OFFSET_INVALID) before we issue the second assign(explicit_offset). Our explicit offsets are used correctly.
  • Case 2 (Race condition): The broker responds in two batches: One arriving before, and one arriving after we issue the second assign(explicit_offset). The second batch response overwrites our explicit offsets.

The sleep duration x controls how often we hit each case:

Sleep duration  Behavior
x = 0s          Race is rare - the second assign usually happens before the broker responds
x = 0.5s        Race is frequent - timing window where both cases can occur
x = 5s          Race is very rare/never - the first assign fully completes before the second

The solution for our use case is to just directly assign the offsets we want instead of using OFFSET_INVALID. However, we believe this should be of interest to Confluent since the library does appear to try to handle this situation (x = 0s does mostly behave correctly, after all), but in some cases the timing is such that we have a race condition. We also believe that this is probably an issue in librdkafka and not specific to the Python library, which means it likely affects a lot more users.
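Concretely, the workaround on our side is just to drop step 2 (sketch, same placeholder names as above):

    # Workaround: assign the externally stored offsets directly, with no intermediate
    # assign(OFFSET_INVALID), so there is no in-flight OffsetFetch response left that
    # could overwrite them.
    consumer.assign([TopicPartition(TOPIC, p, EXPLICIT_OFFSETS[p]) for p in range(NUM_PARTITIONS)])
    msgs = consumer.consume(num_messages=100, timeout=5.0)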

Log Evidence

We have captured detailed librdkafka debug logs, using the consumer debug context, that clearly show the race condition. The setup is one topic with 128 partitions and three brokers. Here's a trace comparing a successful run with a failing run:

Case 1: No Race (broker responds before second assign)

Timestamp (ss.mmm)  Event                                      Details
034.267             First assign(OFFSET_INVALID)               128 partitions in pending state
034.341             OffsetFetchRequest sent to broker          For 128 partitions
034.558             FETCH starts @ OFFSET_INFERRED (epoch 57)  All 128 partitions resolved from broker
034.875             Second assign(explicit_offset)             Clears and re-assigns 128 partitions
034.878             FETCH starts @ OFFSET_EXPLICIT (epoch -1)  All 128 partitions with explicit offsets
034.878             OffsetFetchRequest triggered               For 128 partitions
035.449             Consumer closed                            -

Result: No issue - the first OffsetFetchRequest completed (all 128 partitions resolved) before the second assign, so there were no pending responses left to overwrite the explicit offsets.

Case 2: Race Condition Triggers (broker responds partly after second assign)

Timestamp (ss.mmm)  Event                                      Details
035.565             First assign(OFFSET_INVALID)               128 partitions in pending state
035.565             OffsetFetchRequest sent to broker          For 128 partitions
035.897             FETCH starts @ OFFSET_INFERRED (epoch 57)  96/128 partitions resolved from broker
036.087             Second assign(explicit_offset)             Clears and re-assigns 128 partitions
036.089             FETCH starts @ OFFSET_EXPLICIT (epoch -1)  All 128 partitions with explicit offsets
036.093             OffsetFetchRequest triggered               For 128 partitions
036.671             FETCH starts @ OFFSET_INFERRED (epoch 57)  32/128 partitions overwritten with stale offsets!
037.241             Consumer closed                            -

Result: Gap detected! 32 partitions consumed from OFFSET_INFERRED instead of OFFSET_EXPLICIT.

Key Observation

The leader epoch -1 in the FETCH log indicates an explicit offset assignment without epoch validation. The leader epoch 57 indicates an offset that came from the broker's OffsetFetchResponse.

The race occurs when:

  1. First assign(OFFSET_INVALID) triggers an OffsetFetchRequest to resolve the actual offset
  2. Second assign(explicit_offset) is issued before the response arrives
  3. The broker's response arrives after the second assign
  4. librdkafka incorrectly applies the stale response, overwriting the explicit offset
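From the application side, the overwrite can be made visible by checking the offset of the first message received on each partition against the explicitly assigned one. The sketch below (placeholder names as in the earlier snippet) mirrors how we detect the gap conceptually; it is not our exact test code.

    # Check where each partition actually started fetching after the second assign.
    first_seen = {}
    for _ in range(20):  # bounded number of consume rounds for the sketch
        for msg in consumer.consume(num_messages=500, timeout=1.0):
            if msg.error() is not None:
                continue
            p = msg.partition()
            if p not in first_seen:
                first_seen[p] = msg.offset()
                if msg.offset() != EXPLICIT_OFFSETS[p]:
                    print(f"Gap: partition {p} started at offset {msg.offset()}, "
                          f"expected {EXPLICIT_OFFSETS[p]}")
        if len(first_seen) == NUM_PARTITIONS:
            break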

Timing Sensitivity

The key factor is whether the first OffsetFetchRequest completes before the second assign:

  • Case 1: First assign at 034.267, all 128 partitions resolved at 034.558, second assign at 034.875.
    608ms between first assign and second assign. The broker response completed in ~291ms, leaving plenty of margin.

  • Case 2: First assign at 035.565, only 96/128 partitions resolved at 035.897, second assign at 036.087.
    522ms between first assign and second assign. Only 96 partitions resolved in ~332ms; the remaining 32 partitions' responses arrived at 036.671 (~584ms after the second assign) and overwrote the explicit offsets.

This explains why x = 0.5s in the original description triggers the issue frequently - it's enough time for some broker responses to complete but not all, creating a window where the second assign happens while responses are still pending. The 32 partitions affected are likely those handled by a slower broker or network path.

More details for completeness


Case 1: No Race (broker responds before second assign)

%7|1770120034.099|INIT| [thrd:app]: librdkafka v2.13.0 (0x20d00ff) XXX initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer,http,oidc, STRIP STATIC_LINKING GCC GXX PKGCONFIG INSTALL GNULD LDS C11THREADS LIBDL PLUGINS ZLIB SSL ZSTD CURL HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER OAUTHBEARER_OIDC CRC32C_HW, debug 0x2000)
%7|1770120034.267|ASSIGN| [thrd:main]: Group "d5a0baa0-8338-4689-97d6-a59c0f8d2840": new assignment of 128 partition(s) in join-state init
%7|1770120034.267|CLEARASSIGN| [thrd:main]: No current assignment to clear
%7|1770120034.267|ASSIGNMENT| [thrd:main]: Added 128 partition(s) to assignment which now consists of 128 partition(s) where of 128 are in pending state and 0 are being queried
%7|1770120034.341|OFFSET| [thrd:main]: GroupCoordinator/1: Group d5a0baa0-8338-4689-97d6-a59c0f8d2840 OffsetFetchRequest(v7) for 128/128 partition(s)
%7|1770120034.341|OFFSET| [thrd:main]: GroupCoordinator/1: Fetch committed offsets for 128/128 partition(s)
%7|1770120034.341|OFFSET| [thrd:main]: GroupCoordinator/1: Group d5a0baa0-8338-4689-97d6-a59c0f8d2840 OffsetFetchRequest(v7) for 128/128 partition(s)
%7|1770120034.341|OFFSET| [thrd:main]: GroupCoordinator/1: Fetch committed offsets for 128/128 partition(s)
...
%7|1770120034.454 - 1770120034.575|FETCH| [thrd:main]: Partition XX [128/128 partitions] start fetching at offset OFFSET_INFERRED (leader epoch 57)
...
%7|1770120034.875|ASSIGN| [thrd:main]: Group "d5a0baa0-8338-4689-97d6-a59c0f8d2840": new assignment of 128 partition(s) in join-state init
%7|1770120034.875|CLEARASSIGN| [thrd:main]: Clearing current assignment of 128 partition(s)
%7|1770120034.875|REMOVE| [thrd:main]: Served 128 removed partition(s), with 0 offset(s) to commit
%7|1770120034.875|ASSIGNMENT| [thrd:main]: Added 128 partition(s) to assignment which now consists of 128 partition(s) where of 128 are in pending state and 0 are being queried
...
%7|1770120034.876 - 1770120034.878|FETCH| [thrd:main]: Partition XX [128/128 partitions] start fetching at offset OFFSET_EXPLICIT (leader epoch -1)
...
%7|1770120034.878|OFFSET| [thrd:main]: GroupCoordinator/1: Group d5a0baa0-8338-4689-97d6-a59c0f8d2840 OffsetFetchRequest(v7) for 128/128 partition(s)
%7|1770120034.878|OFFSET| [thrd:main]: GroupCoordinator/1: Fetch committed offsets for 128/128 partition(s)
%7|1770120035.449|CLOSE| [thrd:app]: Closing consumer
%7|1770120035.449|CLOSE| [thrd:app]: Waiting for close events
%7|1770120035.449|CLEARASSIGN| [thrd:main]: Clearing current assignment of 128 partition(s)
%7|1770120035.449|REMOVE| [thrd:main]: Served 128 removed partition(s), with 128 offset(s) to commit
%7|1770120035.450|CLOSE| [thrd:app]: Consumer closed
%7|1770120035.450|DESTROY| [thrd:app]: Terminating instance (destroy flags none (0x0))
%7|1770120035.450|DESTROY| [thrd:main]: Destroy internal
%7|1770120035.450|DESTROY| [thrd:main]: Removing all topics

Case 2: Race Condition Triggers (broker responds partly after second assign)

Here we use partition 127 as an example of a failing partition; in fact this happens for 32 partitions in total, as can be seen in the logs.

%7|1770120035.453|INIT| [thrd:app]: librdkafka v2.13.0 (0x20d00ff) XXX initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer,http,oidc, STRIP STATIC_LINKING GCC GXX PKGCONFIG INSTALL GNULD LDS C11THREADS LIBDL PLUGINS ZLIB SSL ZSTD CURL HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER OAUTHBEARER_OIDC CRC32C_HW, debug 0x2000)
%7|1770120035.565|ASSIGN| [thrd:main]: Group "7b6d2f90-8022-430e-927b-461861fdb95f": new assignment of 128 partition(s) in join-state init
%7|1770120035.565|CLEARASSIGN| [thrd:main]: No current assignment to clear
%7|1770120035.565|ASSIGNMENT| [thrd:main]: Added 128 partition(s) to assignment which now consists of 128 partition(s) where of 128 are in pending state and 0 are being queried
%7|1770120035.565|OFFSET| [thrd:main]: GroupCoordinator/1: Group 7b6d2f90-8022-430e-927b-461861fdb95f OffsetFetchRequest(v7) for 128/128 partition(s)
%7|1770120035.565|OFFSET| [thrd:main]: GroupCoordinator/1: Fetch committed offsets for 128/128 partition(s)
%7|1770120035.566|OFFSET| [thrd:main]: GroupCoordinator/1: Group 7b6d2f90-8022-430e-927b-461861fdb95f OffsetFetchRequest(v7) for 128/128 partition(s)
%7|1770120035.566|OFFSET| [thrd:main]: GroupCoordinator/1: Fetch committed offsets for 128/128 partition(s)
...
%7|1770120035.694 - 1770120036.077|FETCH| [thrd:main]: Partition XX [96 out of 128 partitions (note: this includes some partitions delegated to the broker responsible for partition 27 as well)] start fetching at offset OFFSET_INFERRED (leader epoch 57)
...
%7|1770120036.087|ASSIGN| [thrd:main]: Group "7b6d2f90-8022-430e-927b-461861fdb95f": new assignment of 128 partition(s) in join-state init
%7|1770120036.087|CLEARASSIGN| [thrd:main]: Clearing current assignment of 128 partition(s)
%7|1770120036.088|REMOVE| [thrd:main]: Served 128 removed partition(s), with 0 offset(s) to commit
%7|1770120036.088|ASSIGNMENT| [thrd:main]: Added 128 partition(s) to assignment which now consists of 128 partition(s) where of 128 are in pending state and 0 are being queried
...
%7|1770120036.089 - 1770120036.092|FETCH| [thrd:main]: Partition XX [128/128 partitions, including partition 27] start fetching at offset OFFSET_EXPLICIT (leader epoch -1)
...
%7|1770120036.093|OFFSET| [thrd:main]: GroupCoordinator/1: Group 7b6d2f90-8022-430e-927b-461861fdb95f OffsetFetchRequest(v7) for 128/128 partition(s)
%7|1770120036.093|OFFSET| [thrd:main]: GroupCoordinator/1: Fetch committed offsets for 128/128 partition(s)
...
%7|1770120036.658 - 1770120036.714|FETCH| [thrd:main]: Partition XX [32/128 partitions, incl. partition 127] start fetching at offset OFFSET_INFERRED (leader epoch 57)
...
%7|1770120037.241|CLOSE| [thrd:app]: Closing consumer
%7|1770120037.241|CLOSE| [thrd:app]: Waiting for close events
%7|1770120037.241|CLEARASSIGN| [thrd:main]: Clearing current assignment of 128 partition(s)
%7|1770120037.241|REMOVE| [thrd:main]: Served 128 removed partition(s), with 115 offset(s) to commit
%7|1770120037.243|CLOSE| [thrd:app]: Consumer closed
%7|1770120037.243|DESTROY| [thrd:app]: Terminating instance (destroy flags none (0x0))
%7|1770120037.243|DESTROY| [thrd:main]: Destroy internal
%7|1770120037.243|DESTROY| [thrd:main]: Removing all topics
...
Gap detected at XX 27: expected OFFSET_EXPLICIT but got OFFSET_INFERRED
