Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3806,6 +3806,34 @@ public double getLoadBalancerBandwidthOutResourceWeight() {
)
private boolean transactionCoordinatorScalableTopicsEnabled = false;

@FieldContext(
category = CATEGORY_TRANSACTION,
doc = "Interval, in seconds, at which the scalable-topics transaction coordinator sweeps"
+ " for timed-out open transactions and aborts them. Only the broker that owns"
+ " partition 0 of the transaction-coordinator-assign topic runs the sweep."
+ " Only relevant when transactionCoordinatorScalableTopicsEnabled = true."
)
private int transactionCoordinatorScalableTopicsTimeoutSweepIntervalSeconds = 60;

@FieldContext(
category = CATEGORY_TRANSACTION,
doc = "Interval, in seconds, at which the scalable-topics transaction coordinator sweeps"
+ " for finalized transactions whose retention has elapsed and garbage-collects"
+ " their metadata. Only relevant when transactionCoordinatorScalableTopicsEnabled"
+ " = true."
)
private int transactionCoordinatorScalableTopicsGcIntervalSeconds = 300;

@FieldContext(
category = CATEGORY_TRANSACTION,
doc = "How long, in seconds, a finalized (committed/aborted) transaction's metadata is"
+ " retained before the scalable-topics transaction coordinator's GC sweep is"
+ " allowed to delete it. Gives participants time to observe the outcome via the"
+ " durable per-segment visibility state. Only relevant when"
+ " transactionCoordinatorScalableTopicsEnabled = true."
)
private int transactionCoordinatorScalableTopicsGcRetentionSeconds = 900;

@FieldContext(
category = CATEGORY_TRANSACTION,
doc = "Class name for transaction metadata store provider"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,10 @@ public CompletableFuture<Void> closeAsync(boolean waitForWebServiceToStop) {
if (transactionTimer != null) {
transactionTimer.stop();
}
if (transactionCoordinatorV5 != null) {
transactionCoordinatorV5.close();
transactionCoordinatorV5 = null;
}
MLPendingAckStoreProvider.closeBufferedWriterMetrics();
MLTransactionMetadataStoreProvider.closeBufferedWriterMetrics();
if (this.offloaderStats != null) {
Expand Down Expand Up @@ -1047,6 +1051,7 @@ public void start() throws PulsarServerException {

if (config.isTransactionCoordinatorScalableTopicsEnabled()) {
transactionCoordinatorV5 = new TransactionCoordinatorV5(this);
transactionCoordinatorV5.start();
}

transactionBufferProvider = TransactionBufferProvider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,20 @@
*/
package org.apache.pulsar.broker.transaction.coordinator.v5;

import io.netty.util.concurrent.DefaultThreadFactory;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import lombok.CustomLog;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.transaction.exception.coordinator.TransactionCoordinatorException;
Expand All @@ -33,6 +41,7 @@
import org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore;
import org.apache.pulsar.broker.transaction.metadata.TxnOp;
import org.apache.pulsar.broker.transaction.metadata.TxnOpKind;
import org.apache.pulsar.broker.transaction.metadata.TxnPaths;
import org.apache.pulsar.broker.transaction.metadata.TxnState;
import org.apache.pulsar.broker.transaction.metadata.Versioned;
import org.apache.pulsar.client.api.transaction.TxnID;
Expand All @@ -46,9 +55,9 @@
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;

/**
* PIP-473 v5 transaction coordinator — broker-side service.
* Metadata-driven transaction coordinator for scalable topics — broker-side service.
*
* <p>Per-partition coordinator. A broker runs the v5 TC for partition {@code N} iff it owns
* <p>Per-partition coordinator. A broker runs the TC for partition {@code N} iff it owns
* partition {@code N} of {@code SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN} — same
* leader-election mechanism the legacy {@code TransactionMetadataStoreService} uses; reusing
* it keeps the client-side discovery surface unchanged.
Expand All @@ -58,9 +67,9 @@
* <ul>
* <li>{@code TC_CLIENT_CONNECT} → {@link #handleClientConnect}</li>
* <li>{@code NEW_TXN} → {@link #newTransaction}</li>
* <li>{@code ADD_PARTITION_TO_TXN}, {@code ADD_SUBSCRIPTION_TO_TXN} — no-ops per PIP; the v5
* participants advertise themselves by writing {@code /txn/op} records, so the TC doesn't
* need a pre-registration step.</li>
* <li>{@code ADD_PARTITION_TO_TXN}, {@code ADD_SUBSCRIPTION_TO_TXN} — no-ops; participants
* advertise themselves by writing {@code /txn/op} records, so the TC doesn't need a
* pre-registration step.</li>
* <li>{@code END_TXN} → {@link #endTransaction}</li>
* </ul>
*
Expand All @@ -70,18 +79,96 @@
* {@code (segment, subscription)} pair. The fan-out is metadata-store writes (not RPCs) and
* is bounded by the txn's participant count.
*
* <p>P5.1 scope: happy-path newTxn / endTxn. No timeout sweep, no GC sweep — those land in
* P5.2.
* <p>Background sweeps: a single elected broker — the owner of partition 0 of
* {@code transaction_coordinator_assign} — periodically (a) aborts timed-out open transactions
* ({@link #sweepTimeouts}) and (b) garbage-collects finalized transactions whose retention has
* elapsed ({@link #sweepGc}). Concurrent sweeps from a stale owner are still safe — every state
* transition is a header CAS — so the single-sweeper election is an efficiency measure, not a
* correctness one.
*/
@CustomLog
public class TransactionCoordinatorV5 {

private final PulsarService pulsar;
private final TxnMetadataStore txnStore;

private final long timeoutSweepIntervalMs;
private final long gcSweepIntervalMs;
private final long gcRetentionMs;
private volatile ScheduledExecutorService sweepExecutor;
private volatile boolean closed;
private final AtomicBoolean timeoutSweepRunning = new AtomicBoolean(false);
private final AtomicBoolean gcSweepRunning = new AtomicBoolean(false);

public TransactionCoordinatorV5(PulsarService pulsar) {
this.pulsar = pulsar;
this.txnStore = new TxnMetadataStore(pulsar.getLocalMetadataStore());
var config = pulsar.getConfiguration();
this.timeoutSweepIntervalMs = TimeUnit.SECONDS.toMillis(
config.getTransactionCoordinatorScalableTopicsTimeoutSweepIntervalSeconds());
this.gcSweepIntervalMs = TimeUnit.SECONDS.toMillis(
config.getTransactionCoordinatorScalableTopicsGcIntervalSeconds());
this.gcRetentionMs = TimeUnit.SECONDS.toMillis(
config.getTransactionCoordinatorScalableTopicsGcRetentionSeconds());
}

// ---- Lifecycle --------------------------------------------------------

/**
* Start the periodic timeout / GC sweeps on a dedicated single-thread scheduler. Each tick is
* gated by {@link #ifElectedSweeper} so only the partition-0 owner does the scan. Idempotent —
* a second call is ignored.
*/
public synchronized void start() {
if (closed || sweepExecutor != null) {
return;
}
sweepExecutor = Executors.newSingleThreadScheduledExecutor(
new DefaultThreadFactory("pulsar-txn-v5-sweep"));
sweepExecutor.scheduleWithFixedDelay(
() -> runSweep("timeout", timeoutSweepRunning, this::sweepTimeouts),
timeoutSweepIntervalMs, timeoutSweepIntervalMs, TimeUnit.MILLISECONDS);
sweepExecutor.scheduleWithFixedDelay(
() -> runSweep("gc", gcSweepRunning, this::sweepGc),
gcSweepIntervalMs, gcSweepIntervalMs, TimeUnit.MILLISECONDS);
}

/** Stop the sweeps. Idempotent. */
public synchronized void close() {
closed = true;
if (sweepExecutor != null) {
sweepExecutor.shutdownNow();
sweepExecutor = null;
}
}

/**
* Run one sweep cycle on the scheduler thread and block until it completes, so the
* fixed-delay scheduling never overlaps two cycles. The {@code running} flag is a
* defense-in-depth guard: the single-thread scheduler plus the blocking {@code get()} already
* serialise cycles, but the flag makes overlap impossible even if the scheduling were later
* changed (e.g. to a fixed-rate or multi-threaded executor). Errors are logged and swallowed —
* the next tick retries.
*/
private void runSweep(String name, AtomicBoolean running, Supplier<CompletableFuture<Void>> sweep) {
if (closed || !running.compareAndSet(false, true)) {
return;
}
try {
sweep.get().get();
} catch (InterruptedException ie) {
// shutdownNow() interrupted the sweep thread mid-wait — restore the flag and exit
// quietly; this is the expected shutdown signal, not a failure.
Thread.currentThread().interrupt();
} catch (Throwable t) {
if (closed) {
// close() raced with an in-flight async chain; not worth a WARN.
return;
}
log.warn().attr("sweep", name).exception(t).log("v5 TC sweep cycle failed; will retry");
} finally {
running.set(false);
}
Comment thread
merlimat marked this conversation as resolved.
}

// ---- TC client connect ------------------------------------------------
Expand Down Expand Up @@ -232,19 +319,163 @@ public void onCompleted() {
}
}).thenCompose(__ -> {
TxnEvent event = new TxnEvent(txnIdKey, decision);
CompletableFuture<?>[] publishes = new CompletableFuture<?>[
writeSegments.size() + ackParticipants.size()];
int i = 0;
List<CompletableFuture<Void>> publishes = new ArrayList<>(
writeSegments.size() + ackParticipants.size());
for (String segment : writeSegments) {
publishes[i++] = txnStore.publishSegmentEvent(segment, event);
publishes.add(txnStore.publishSegmentEvent(segment, event).thenApply(s -> null));
}
for (AckParticipant p : ackParticipants) {
publishes[i++] = txnStore.publishSubscriptionEvent(p.segment(), p.subscription(), event);
publishes.add(txnStore.publishSubscriptionEvent(p.segment(), p.subscription(), event)
.thenApply(s -> null));
}
return CompletableFuture.allOf(publishes);
return FutureUtil.waitForAll(publishes);
});
}

// ---- Sweeps -----------------------------------------------------------

/**
* Abort transactions whose deadline has passed. Scans the by-deadline index up to "now" and
* drives each through {@link #endTransaction} with {@code ABORT}, which re-reads and CAS-guards
* the header — so a txn the client commits in the same window is left alone (the CAS loses and
* the resulting InvalidTxnStatus / BadVersion is treated as a benign race).
*/
CompletableFuture<Void> sweepTimeouts() {
return ifElectedSweeper(() -> {
long now = System.currentTimeMillis();
List<TxnID> expired = Collections.synchronizedList(new ArrayList<>());
return txnStore.listOpenByDeadlineRange(null, now, new ScanConsumer() {
@Override
public void onNext(GetResult r) {
String txnIdKey = TxnPaths.txnIdFromHeaderPath(r.getStat().getPath());
if (txnIdKey != null) {
expired.add(TxnIds.fromKey(txnIdKey));
}
}

@Override
public void onError(Throwable throwable) {
log.warn().exception(throwable).log("Timeout-sweep deadline scan errored");
}

@Override
public void onCompleted() {
}
}).thenCompose(__ -> {
List<CompletableFuture<Void>> aborts = new ArrayList<>(expired.size());
for (TxnID txnId : expired) {
aborts.add(endTransaction(txnId, TxnAction.ABORT_VALUE)
.exceptionally(ex -> {
// Benign: the client may have committed/aborted it between the scan
// and our CAS, or another sweeper got there first.
log.debug().attr("txnId", txnId).exception(ex)
.log("Timeout-sweep abort skipped");
return null;
}));
}
return FutureUtil.waitForAll(aborts);
});
});
}

/**
* Garbage-collect finalized transactions whose retention window has elapsed. For each terminal
* state, scans the by-final-state index up to {@code now - retention} and applies
* {@link #gcOneTxn}.
*/
CompletableFuture<Void> sweepGc() {
return ifElectedSweeper(() -> {
long cutoff = System.currentTimeMillis() - gcRetentionMs;
return gcFinalized(TxnState.COMMITTED, cutoff)
.thenCompose(__ -> gcFinalized(TxnState.ABORTED, cutoff));
});
}

private CompletableFuture<Void> gcFinalized(TxnState state, long cutoffMs) {
List<Versioned<String>> candidates = Collections.synchronizedList(new ArrayList<>());
return txnStore.listFinalizedByStateAndTimeRange(state, null, cutoffMs, new ScanConsumer() {
@Override
public void onNext(GetResult r) {
String txnIdKey = TxnPaths.txnIdFromHeaderPath(r.getStat().getPath());
if (txnIdKey != null) {
candidates.add(new Versioned<>(txnIdKey, r.getStat().getVersion()));
}
}

@Override
public void onError(Throwable throwable) {
log.warn().attr("state", state).exception(throwable).log("GC-sweep scan errored");
}

@Override
public void onCompleted() {
}
}).thenCompose(__ -> {
List<CompletableFuture<Void>> gcs = new ArrayList<>(candidates.size());
for (Versioned<String> c : candidates) {
gcs.add(gcOneTxn(c.value(), c.version(), state));
}
return FutureUtil.waitForAll(gcs);
});
}

/**
* GC one finalized txn. If it still has {@code /txn/op} records, some participant hasn't applied
* the outcome yet — or never received the event (e.g. the TC crashed between the header CAS and
* the fan-out). Re-drive the fan-out and leave the header in place so the participant re-reads
* the true outcome; it removes its op records once it applies them, and a later GC pass — seeing
* no op records — deletes the header. We never delete a header while a participant might still
* re-read it, so a committed txn's data is never stranded as "unknown".
*/
private CompletableFuture<Void> gcOneTxn(String txnIdKey, long version, TxnState state) {
TxnID txnId = TxnIds.fromKey(txnIdKey);
boolean[] hasOps = {false};
return txnStore.listOpsByTxn(txnIdKey, new ScanConsumer() {
@Override
public void onNext(GetResult r) {
hasOps[0] = true;
}

@Override
public void onError(Throwable throwable) {
// Treat a scan error as "ops may exist" — safer to retry the repair than to delete.
hasOps[0] = true;
log.warn().attr("txnId", txnId).exception(throwable).log("GC-sweep op scan errored");
}

@Override
public void onCompleted() {
}
}).thenCompose(__ -> {
if (hasOps[0]) {
return fanOutEvents(txnId, txnIdKey, state);
}
return txnStore.deleteHeader(txnIdKey, version).exceptionally(ex -> {
// Benign: header changed or was already deleted since the scan.
log.debug().attr("txnId", txnId).exception(ex).log("GC-sweep header delete skipped");
return null;
});
});
}

/**
* Run {@code action} only on the elected sweeper — the broker that owns partition 0 of
* {@code transaction_coordinator_assign}. Not owning it (or any error checking ownership) means
* "skip this cycle". Correctness doesn't depend on the election: every transition is a header
* CAS, so a stale owner sweeping concurrently is harmless.
*/
private CompletableFuture<Void> ifElectedSweeper(Supplier<CompletableFuture<Void>> action) {
Comment thread
merlimat marked this conversation as resolved.
if (closed) {
return CompletableFuture.completedFuture(null);
}
String assignPartition0 = SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN
.getPartition(0).toString();
return pulsar.getBrokerService().checkTopicNsOwnership(assignPartition0)
.handle((v, ex) -> ex == null)
.thenCompose(owned -> (owned && !closed)
? action.get() : CompletableFuture.completedFuture(null));
}

/** A {@code (segment, subscription)} ack participant; keys the ack fan-out de-dup set. */
private record AckParticipant(String segment, String subscription) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,5 +294,19 @@ public static String txnIdFromOpPath(String opPath) {
return name.substring(0, dash);
}

/**
* Extract the {@code txnId} key from a header path under {@link #TXN_HEADER_PREFIX}. The layout
* is {@code /txn/id/<txnId>}, so the txnId key is the trailing path component.
*
* @return the txnId key, or {@code null} if {@code headerPath} doesn't have the expected shape
*/
public static String txnIdFromHeaderPath(String headerPath) {
int lastSlash = headerPath.lastIndexOf('/');
if (lastSlash < 0 || lastSlash == headerPath.length() - 1) {
return null;
}
return headerPath.substring(lastSlash + 1);
}

private TxnPaths() {}
}
Loading
Loading