
[improve][broker] PIP-473 P5.2: scalable-topics TC timeout + GC sweeps#25884

Merged
merlimat merged 4 commits into apache:master from merlimat:mmerli/pip-473-tc-sweeps on Jun 2, 2026

@merlimat
Contributor

Summary

Adds the periodic background sweeps for the scalable-topics transaction coordinator landed in #25863 (P5.1). Both run on a dedicated single-thread scheduler started by PulsarService and gated on assign-partition-0 ownership — only the elected broker sweeps each cycle. Concurrent sweeps from a stale owner remain safe because every state transition is a header CAS; the election is purely an efficiency measure (per-partition scoping comes with the partitioned TC in a later phase).

Timeout sweep

Default cadence 60s. Scans the by-deadline index up to now and drives each expired open txn through endTransaction(ABORT), which re-reads and CAS-guards the header — so a txn the client commits in the same window is left alone (the resulting InvalidTxnStatusException / BadVersionException is treated as a benign race and logged at debug).
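
To make the CAS-guarded abort concrete, here is a minimal in-memory sketch. Every name in it (`TimeoutSweepSketch`, the map-based header store, the `byDeadline` map) is a hypothetical stand-in rather than the coordinator's actual code, and the state-based `replace` only approximates the versioned header CAS the PR describes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical in-memory model of the timeout sweep; not Pulsar code.
class TimeoutSweepSketch {
    enum State { OPEN, COMMITTED, ABORTED }

    // txnId -> header state. The real store is assumed to use a versioned CAS;
    // here a compare-and-replace on the state value plays that role.
    final Map<String, State> headers = new ConcurrentHashMap<>();

    // Stand-in for the by-deadline index: deadline (ms) -> txn ids.
    final ConcurrentSkipListMap<Long, List<String>> byDeadline = new ConcurrentSkipListMap<>();

    void open(String txnId, long deadlineMs) {
        headers.put(txnId, State.OPEN);
        byDeadline.computeIfAbsent(deadlineMs, k -> new ArrayList<>()).add(txnId);
    }

    // Succeeds only if the header is still OPEN at the moment of the swap.
    boolean endTransaction(String txnId, State target) {
        return headers.replace(txnId, State.OPEN, target);
    }

    // Scans the index up to and including nowMs and tries to abort each entry.
    // A failed CAS means the txn already reached a terminal state: a benign race.
    int sweepTimeouts(long nowMs) {
        int aborted = 0;
        for (List<String> ids : byDeadline.headMap(nowMs, true).values()) {
            for (String id : ids) {
                if (endTransaction(id, State.ABORTED)) {
                    aborted++;
                }
            }
        }
        return aborted;
    }

    public static void main(String[] args) {
        TimeoutSweepSketch tc = new TimeoutSweepSketch();
        tc.open("t1", 1_000);
        tc.open("t2", 1_000);
        tc.endTransaction("t2", State.COMMITTED); // client commit wins the race
        System.out.println("aborted=" + tc.sweepTimeouts(2_000) + " headers=" + tc.headers);
    }
}
```

In this sketch the sweep never removes index entries; re-scanning a terminal txn is harmless because the CAS simply fails.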

GC sweep

Default cadence 300s, retention 900s. For each terminal state, scans the by-final-state index up to now - retention. For each candidate:

  • If leftover /txn/op records remain (some participant hasn't applied the outcome yet, or never received the event, e.g. because the TC crashed between the header CAS and the fan-out), re-drive fanOutEvents and leave the header in place so the participant can re-read the true outcome. The participant removes its op records once it applies them, and a later GC pass, seeing no op records, deletes the header.
  • If no op records remain, delete the header.

This ordering closes the fan-out-durability gap lhotari raised on #25863 without ever stranding a committed txn's data: we never delete a header while a participant might still re-read it (which would default the outcome to ABORTED).
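
The per-candidate decision above can be sketched as follows. This is an illustrative in-memory model only: `GcSweepSketch`, its maps, and the `refanned` list are hypothetical names, and the real sweep works against the metadata store and the by-final-state index rather than plain collections.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of the GC sweep ordering; not Pulsar code.
class GcSweepSketch {
    final Map<String, String> headers = new LinkedHashMap<>();      // txnId -> final outcome
    final Map<String, Long> finalizedAtMs = new LinkedHashMap<>();  // stand-in for the by-final-state index
    final Map<String, Set<String>> opRecords = new LinkedHashMap<>(); // txnId -> participants with a pending op record
    final List<String> refanned = new ArrayList<>();                // txns whose fan-out was re-driven

    void finalizeTxn(String txnId, String outcome, long atMs, String... participants) {
        headers.put(txnId, outcome);
        finalizedAtMs.put(txnId, atMs);
        Set<String> ops = new HashSet<>();
        for (String p : participants) {
            ops.add(p);
        }
        opRecords.put(txnId, ops);
    }

    // A participant applies the outcome and removes its own op record.
    void participantApplied(String txnId, String participant) {
        Set<String> ops = opRecords.get(txnId);
        if (ops != null) {
            ops.remove(participant);
        }
    }

    void sweepGc(long nowMs, long retentionMs) {
        long cutoff = nowMs - retentionMs;
        for (String txnId : new ArrayList<>(finalizedAtMs.keySet())) {
            if (finalizedAtMs.get(txnId) > cutoff) {
                continue; // still inside the retention window
            }
            Set<String> ops = opRecords.get(txnId);
            if (ops != null && !ops.isEmpty()) {
                // Op records remain: re-drive the fan-out and keep the header
                // so the participant can still read the true outcome.
                refanned.add(txnId);
            } else {
                // No op records remain: the header is safe to delete.
                headers.remove(txnId);
                finalizedAtMs.remove(txnId);
                opRecords.remove(txnId);
            }
        }
    }
}
```

With a retention of 900 s, a committed txn that still has one op record survives the first pass and is only removed by a later pass, after the participant has applied the outcome and cleared its record.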

Config

| Key | Default |
| --- | --- |
| transactionCoordinatorScalableTopicsTimeoutSweepIntervalSeconds | 60 |
| transactionCoordinatorScalableTopicsGcIntervalSeconds | 300 |
| transactionCoordinatorScalableTopicsGcRetentionSeconds | 900 |

All three are only meaningful when transactionCoordinatorScalableTopicsEnabled = true (still off by default).

Drive-by

Refactored fanOutEvents to use FutureUtil.waitForAll(List<CompletableFuture<Void>>) — matches the new sweep methods and addresses the same comment lhotari left on P5.1.
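
For readers unfamiliar with the pattern, the sketch below contrasts with the older `CompletableFuture<?>[]` + `allOf` shape by collecting typed futures in a list and waiting on the list. The local `waitForAll` is a stand-in written so the example compiles without Pulsar on the classpath; it is not the actual `FutureUtil` implementation, and `fanOut` and `sink` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical sketch of the "typed list of futures" fan-out shape; not Pulsar code.
class WaitForAllSketch {

    // Local stand-in for a list-based wait helper: completes when every future completes.
    static CompletableFuture<Void> waitForAll(List<CompletableFuture<Void>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }

    // Publishes one event per participant and returns a future for the whole batch.
    static CompletableFuture<Void> fanOut(List<String> participants, List<String> sink) {
        List<CompletableFuture<Void>> sends = participants.stream()
                .map(p -> CompletableFuture.runAsync(() -> {
                    synchronized (sink) {
                        sink.add(p); // simulated delivery to one participant
                    }
                }))
                .collect(Collectors.toList());
        return waitForAll(sends);
    }

    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        fanOut(List.of("seg-a", "seg-b"), delivered).join();
        System.out.println("delivered to " + delivered.size() + " participants");
    }
}
```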

Test plan

  • pulsar-broker:test --tests TransactionCoordinatorV5Test — 5 new sweep cases plus all P5.1 cases:
    • sweepTimeouts_abortsExpiredOpenTxnAndFansOut
    • sweepTimeouts_leavesUnexpiredOpenTxnAlone
    • sweepGc_deletesHeaderWhenNoOpsRemain
    • sweepGc_repairsAndRetainsHeaderWhenOpsRemain (the fan-out-durability scenario)
    • sweeps_skipWhenNotElected
  • pulsar-broker:test --tests TxnMetadataStoreTest / MetadataTransactionBufferTest / MetadataPendingAckStoreTest — green.
  • Checkstyle clean (main + test).

Deferred / follow-ups

  • Per-partition sweep scoping lands with the partitioned TC (P5.3), replacing the single-elected-sweeper interim.
  • Pure metadata-store leader election also belongs to P5.3.
  • A leftover op record from a permanently gone participant (segment deleted) currently keeps its header alive forever; the GC sweep keeps re-publishing, which is harmless. A future phase can add a liveness check to force cleanup, but doing so safely needs a participant-liveness signal that doesn't exist yet.

merlimat added 2 commits May 28, 2026 10:33
Adds the periodic background sweeps for the scalable-topics transaction
coordinator. Both run on a dedicated single-thread scheduler started by
PulsarService and gated on assign-partition-0 ownership — only the elected
broker sweeps each cycle. Concurrent sweeps from a stale owner remain safe
because every state transition is a header CAS; the election is purely an
efficiency measure.

Timeout sweep (default 60s): scans the by-deadline index up to `now` and drives
each expired open txn through endTransaction(ABORT), which re-reads and CAS-
guards the header — so a txn the client commits in the same window is left
alone (the InvalidTxnStatus / BadVersion is treated as a benign race).

GC sweep (default 300s, retention 900s): for each terminal state, scans the
by-final-state index up to `now - retention`. For each candidate, if leftover
/txn/op records remain — some participant hasn't applied the outcome, or never
received the event (e.g. TC crashed between header CAS and fan-out) — re-drives
the fan-out and leaves the header in place so the participant can re-read the
true outcome. The header is deleted only once no op records remain, so a
committed txn's data is never stranded as "unknown".

Config: transactionCoordinatorScalableTopicsTimeoutSweepIntervalSeconds (60),
transactionCoordinatorScalableTopicsGcIntervalSeconds (300),
transactionCoordinatorScalableTopicsGcRetentionSeconds (900).
…ture>)

Match the same pattern used by the new sweep methods (and lhotari's review
feedback on this code shape). Drops the CompletableFuture<?>[] + allOf in
favour of the typed list helper; no behavioural change.
Member

@lhotari lhotari left a comment

LGTM, just a minor comment about handling "closed" state for GC operations.

merlimat added 2 commits May 30, 2026 09:58
- runSweep: catch InterruptedException separately and exit quietly (executor
  shutdownNow() interrupts an in-flight sweep.get() — that's the expected
  shutdown signal, not a failure). For other Throwables, downgrade to a
  no-op when closed is already set; only WARN for genuine sweep errors.

- ifElectedSweeper: short-circuit at entry and after the async ownership
  check resolves, so operations don't continue once close() has been called.
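
The shutdown handling described in this commit can be sketched roughly as below. The method names `runSweep` and `ifElectedSweeper` and the `closed` flag come from the commit message; everything else (the class, the signatures, the `warnings` list standing in for WARN logging, and restoring the interrupt flag) is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical sketch of close-aware sweep scheduling; not Pulsar code.
class SweepShutdownSketch {
    final AtomicBoolean closed = new AtomicBoolean(false);
    final List<String> warnings = new ArrayList<>(); // stands in for WARN-level logging

    // Runs the sweep body only if this broker is the elected sweeper and
    // close() has not been called, checking both at entry and after the
    // asynchronous ownership check resolves.
    CompletableFuture<Void> ifElectedSweeper(Supplier<CompletableFuture<Boolean>> ownershipCheck,
                                             Runnable sweepBody) {
        if (closed.get()) {
            return CompletableFuture.completedFuture(null);
        }
        return ownershipCheck.get().thenAccept(owner -> {
            if (owner && !closed.get()) {
                sweepBody.run();
            }
        });
    }

    // Blocks on one sweep cycle, as a scheduler task would.
    void runSweep(String name, Supplier<CompletableFuture<Void>> sweep) {
        try {
            sweep.get().get();
        } catch (InterruptedException e) {
            // Expected when the executor is shut down mid-sweep: exit quietly.
            // Restoring the flag is a common convention, assumed here.
            Thread.currentThread().interrupt();
        } catch (Throwable t) {
            if (!closed.get()) {
                warnings.add(name + " failed: " + t); // genuine sweep error
            }
            // After close(), failures are treated as shutdown noise and dropped.
        }
    }

    void close() {
        closed.set(true);
    }
}
```
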
Defense-in-depth: the single-thread scheduler plus the blocking get() already
serialise sweep cycles, but a per-sweep AtomicBoolean (compareAndSet on entry,
reset in finally) makes overlap impossible even if the scheduling is later
changed to fixed-rate or a multi-threaded executor.
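
A minimal sketch of such a per-sweep guard, assuming a hypothetical `trySweep` wrapper (the commit message names only the `AtomicBoolean`, the `compareAndSet` on entry, and the reset in `finally`):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of a per-sweep overlap guard; not Pulsar code.
class SweepOverlapGuardSketch {
    private final AtomicBoolean running = new AtomicBoolean(false);

    // Returns false without running the body if a sweep is already in flight.
    boolean trySweep(Runnable body) {
        if (!running.compareAndSet(false, true)) {
            return false; // another cycle of this sweep is still running
        }
        try {
            body.run();
            return true;
        } finally {
            running.set(false); // always release, even if the body throws
        }
    }
}
```

The guard is independent of how the scheduler is configured, so it keeps holding if the executor later gains more threads.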
@merlimat merlimat merged commit 90c81d7 into apache:master Jun 2, 2026
43 checks passed
@merlimat merlimat deleted the mmerli/pip-473-tc-sweeps branch June 2, 2026 01:45