Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2095,6 +2095,17 @@ transactionBufferClientMaxConcurrentRequests=1000
# The max active transactions per transaction coordinator, default value 0 indicates no limit.
maxActiveTransactionsPerCoordinator=0

# How long to retain final transaction statuses in memory after the active transaction metadata is removed.
# The retained status is only used to return precise errors for duplicate end transaction requests.
# Setting this to 0 disables retention. A negative value means the retention time dimension is not limited.
# This and transactionEndedStatusMaxRecordCount cannot both be negative.
transactionEndedStatusRetentionTimeMs=3600000

# The maximum number of final transaction statuses retained in memory.
# Setting this to 0 disables retention. A negative value means the record count dimension is not limited.
# This and transactionEndedStatusRetentionTimeMs cannot both be negative.
transactionEndedStatusMaxRecordCount=100000

# MLPendingAckStore maintains a ConcurrentSkipListMap pendingAckLogIndex,
# It stores the position in pendingAckStore as its value and saves a position used to determine
# whether the previous data can be cleaned up as a key.
Expand Down
11 changes: 11 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1385,6 +1385,17 @@ transactionBufferSnapshotSegmentSize=262144
# The transaction buffer client's operation timeout in milliseconds.
transactionBufferClientOperationTimeoutInMills=3000

# How long to retain final transaction statuses in memory after the active transaction metadata is removed.
# The retained status is only used to return precise errors for duplicate end transaction requests.
# Setting this to 0 disables retention. A negative value means the retention time dimension is not limited.
# This and transactionEndedStatusMaxRecordCount cannot both be negative.
transactionEndedStatusRetentionTimeMs=3600000

# The maximum number of final transaction statuses retained in memory.
# Setting this to 0 disables retention. A negative value means the record count dimension is not limited.
# This and transactionEndedStatusRetentionTimeMs cannot both be negative.
transactionEndedStatusMaxRecordCount=100000

# Provide a mechanism allowing the Transaction Log Store to aggregate multiple records into a batched record and
# persist into a single BK entry. This will make Pulsar transactions work more efficiently, aka batched log.
# see: https://github.com/apache/pulsar/issues/15370
Expand Down
140 changes: 140 additions & 0 deletions pip/pip-481.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# PIP-481: Retain Ended Transaction Status for Precise Error Reporting

# Background knowledge

Pulsar Transaction Coordinator (TC) manages the lifecycle of transactions. Each transaction transitions through states: OPEN -> COMMITTING/ABORTING -> COMMITTED/ABORTED. Once a transaction reaches a final state (COMMITTED or ABORTED), its in-memory metadata (`TxnMeta`) and transaction logs are cleaned up. After cleanup, any query for that transaction's metadata returns a `TransactionNotFoundException`, making it impossible for the caller to determine whether the transaction never existed or had already finished with a known outcome.

In real production scenarios, clients often encounter "commit transaction" request timeouts. In this case, the client cannot determine whether the transaction was actually committed successfully — the request may have reached the TC but the response was lost before being returned. Without this feature, the client can only retry the commit, but receives a `TransactionNotFoundException` and still cannot determine the actual result.

Additionally, **Flink-connector-Pulsar** has a high dependency on this feature. When a Flink checkpoint partially succeeds (some operators committed the transaction while others failed), Flink will try to re-commit the already-completed transaction. This feature can clearly inform Flink that the transaction has already been committed successfully during the retry, thus avoiding data duplication or loss.

# Motivation

When a producer/consumer or admin operation queries a transaction's status after it has ended, the TC only throws a generic `TransactionNotFoundException`. This forces clients to rely on timeouts or heuristic retries. For example:

1. A commit request succeeds on the TC, but the response is lost due to network issues.
2. The client retries the commit operation.
3. The TC no longer holds the transaction metadata and returns `TransactionNotFoundException`.
4. The client cannot tell whether the commit actually succeeded.

This behavior leads to uncertainty and forces clients to adopt overly conservative retry logic. Retaining the ended transaction status within a configurable time window allows the TC to return `TransactionAlreadyCommittedException` (or `TransactionAlreadyAbortedException`, `TransactionAlreadyTimedOutException`), providing unambiguous feedback to the client.

# Goals

## In Scope

- Retain the final status (COMMITTED, ABORTED, TIMEOUT) in memory after a transaction is removed from the active store.
- Introduce three new exception subtypes for precise error reporting: the three final statuses (COMMITTED, ABORTED, TIMEOUT) will each correspond to a different error code returned to the client.
- Provide configuration knobs for retention time and maximum cached record count.
- Persist the `isTimeout` flag in the transaction log, allowing users to determine "whether the client was too slow" when investigating failed transactions.

## Out of Scope

N/A

# High Level Design

When a transaction reaches its final state, instead of deleting all related transaction logs as before, the new behavior is:
- **Immediately delete**: transaction logs representing the "start" and "intermediate" states of the transaction.
- **Retain**: the transaction log representing the "end" state (the COMMITTED/ABORTED entry).

This end log is retained until it reaches the `transactionEndedStatusRetentionTimeMs` or `transactionEndedStatusMaxRecordCount` limit, at which point it is deleted. The transaction itself is removed from the active `txnMetaMap`, and its final status is written to the `EndedTxnStatusCache`.

The benefit of this approach is that when the TC restarts, it can fully reconstruct the in-memory ended transaction state from before the restart by replaying the transaction logs, because the end logs still exist.

When a query arrives and the transaction is not found in the active store, the cache is checked. If found, a specific exception (`TransactionAlreadyCommittedException`, etc.) is thrown. If not found, the original `TransactionNotFoundException` is returned.

# Detailed Design

## Design & Implementation Details

**`EndedTxnStatus` enum** (`coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/EndedTxnStatus.java`):
- `COMMITTED`, `ABORTED`, `TIMEOUT`

**`EndedTxnStatusCache`** (`coordinator/impl/EndedTxnStatusCache.java`):
- Wraps a Caffeine `Cache<TxnID, EndedTxnMetadata>`.
- `EndedTxnMetadata` is a record: `(TxnID txnID, EndedTxnStatus status, Position logPosition, long endedAtMs)`.
- On creation:
- If `retentionTimeMs` or `maxRecordCount` is 0, the cache is disabled (all queries return null).
- If both `retentionTimeMs` and `maxRecordCount` are negative, an `IllegalArgumentException` is thrown (would cause unbounded growth -> OOM).
- TTL-based expiry uses `Expiry.expireAfterCreate` to compute remaining time based on `endedAtMs + retentionTimeMs - now`.
- Optional `removalListener` callback: when a cache entry is evicted due to reaching the `retentionTimeMs` or `maxRecordCount` limit, the corresponding end transaction log is deleted. This ensures the end log is retained for a configurable period, allowing the TC to recover ended transaction state through replay after a restart.

**`TransactionMetadataStoreConfig`** (`coordinator/TransactionMetadataStoreConfig.java`):
- Internal configuration class (not publicly exposed), containing two new config fields: `transactionEndedStatusRetentionTimeMs` and `transactionEndedStatusMaxRecordCount`.
- Default retention time: 1 hour. Default max record count: 100,000.
- Validation: retention and maxRecordCount cannot both be negative simultaneously.

**Changes in `MLTransactionMetadataStore`:**

- **During recovery (replay)**: when encountering a record with status COMMITTED or ABORTED, record it in the `EndedTxnStatusCache`.
- **During normal operation (endTransaction)**: after appending the end log, `positionsToDelete` is taken from the previously accumulated list of old log positions (not including the current end log entry). Call `endedTxnStatusCache.record(...)`:
- If it returns true, the `EndedTxnStatusCache` has recorded the transaction's final status, and the end transaction log will be deleted by the `EndedTxnStatusCache` (when the cache entry expires or is evicted).
- If it returns false, the cache has not recorded this transaction, and the end transaction log must be deleted immediately.
- **Comparison with the old logic**: the old code deleted all logs at once at the end of a transaction (start + intermediate + end). The new code only deletes the start and intermediate logs, while the end log is retained and timely deleted according to the retention policy.

**Protobuf change (`PulsarTransactionMetadata.proto`)**:
- Added `optional bool is_timeout = 13 [default = false]` to `TransactionMetadataEntry`.
- Used to accurately distinguish whether a transaction was aborted due to timeout (TIMEOUT) or explicitly aborted by the client (ABORTED) during TC restart replay. This field also provides more observable information, allowing users to easily investigate the proportion of "transaction commit timeouts".

**New exception types** (`coordinator/exceptions/CoordinatorException.java`):
- `TransactionAlreadyCommittedException(TxnID)`
- `TransactionAlreadyAbortedException(TxnID)`
- `TransactionAlreadyTimedOutException(TxnID)`
- On the coordinator side, these exceptions extend `TransactionNotFoundException`. On the client side, they are mapped to dedicated `TransactionCoordinatorClientException` types. Backward compatibility for old clients is maintained by the feature flag fallback described below.

## Public-facing Changes

### Configuration

Two new fields added to `ServiceConfiguration`, `broker.conf`, and `standalone.conf`:

| Name | Type | Default | Description |
|------|------|---------|-------------|
| `transactionEndedStatusRetentionTimeMs` | long | 3600000 (1 hour) | How long to retain ended transaction status in memory after the transaction ends. Set to 0 to disable. |
| `transactionEndedStatusMaxRecordCount` | long | 100000 | Maximum number of ended transaction status records to retain. Set to 0 to disable. |

### Feature Flag

Added `FeatureFlags.supportsTransactionEndStatusErrors`: the client declares whether it supports the new error codes during the Connect handshake. If the client does not support them, the Broker converts the new exceptions to `TransactionNotFoundException` to ensure compatibility.

# Monitoring

- A sudden increase in `TransactionAlreadyCommittedException` / `TransactionAlreadyAbortedException` / `TransactionAlreadyTimedOutException` suggests clients are frequently retrying requests to already-finished transactions. This may indicate network issues or overly aggressive retry policies.
- Conversely, a high rate of `TransactionNotFoundException` for recently-ended transactions may indicate that the retention time or max record count are set too low.

# Security Considerations

No new security concerns. The existing authorization model still applies: a client must have the appropriate transaction coordinator permission to execute operations.

# Backward & Forward Compatibility

## Upgrade

- New configuration fields default to 1 hour / 100,000, which is safe for existing deployments.
- Added `FeatureFlags.supportsTransactionEndStatusErrors`: the client declares whether it supports the new error codes during the Connect handshake. If the client does not support them, the Broker converts the new exceptions to `TransactionNotFoundException` to ensure compatibility.
- The new protobuf field `is_timeout` is optional and defaults to `false`, so old transaction logs remain readable.
- No changes to on-disk data format or broker-to-broker protocol.

## Downgrade / Rollback

- Roll back to the previous version. The new `EndedTxnStatusCache` will no longer exist, and all queries for ended transactions will return `TransactionNotFoundException`.
- The end transaction logs retained by the new version will be treated as uncleaned logs by the old version. The old version will replay and handle these logs normally during startup (the `is_timeout` field is ignored by the old version).
- After rollback, the old Broker will not recognize the `supportsTransactionEndStatusErrors` flag and will not return new error codes, maintaining compatibility with old clients.

## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations

Geo-replication for transactions is not currently supported. No special considerations.

# Alternatives

N/A

# General Notes

N/A

# Links

* Mailing List discussion thread:
* Mailing List voting thread:
Original file line number Diff line number Diff line change
Expand Up @@ -3887,6 +3887,22 @@ public double getLoadBalancerBandwidthOutResourceWeight() {
)
private long maxActiveTransactionsPerCoordinator = 0L;

@FieldContext(
category = CATEGORY_TRANSACTION,
doc = "How long to retain final transaction statuses in memory after the active transaction metadata is "
+ "removed. The retained status is only used to return precise errors for duplicate end "
+ "transaction requests. Setting this to 0 disables retention. A negative value means the "
+ "retention time dimension is not limited."
)
private long transactionEndedStatusRetentionTimeMs = TimeUnit.HOURS.toMillis(1);

@FieldContext(
category = CATEGORY_TRANSACTION,
doc = "The maximum number of final transaction statuses retained in memory. Setting this to 0 disables "
+ "retention. A negative value means the record count dimension is not limited."
)
private long transactionEndedStatusMaxRecordCount = 100_000L;

@FieldContext(
category = CATEGORY_TRANSACTION,
doc = "MLPendingAckStore maintain a ConcurrentSkipListMap pendingAckLogIndex`,"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@
import org.apache.pulsar.packages.management.core.impl.DefaultPackagesStorageConfiguration;
import org.apache.pulsar.packages.management.core.impl.PackagesManagementImpl;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreConfig;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
Expand Down Expand Up @@ -363,6 +364,8 @@ public PulsarService(ServiceConfiguration config,
// Validate correctness of configuration
PulsarConfigurationLoader.isComplete(config);
TransactionBatchedWriteValidator.validate(config);
TransactionMetadataStoreConfig.validateEndedStatusConfig(config.getTransactionEndedStatusRetentionTimeMs(),
config.getTransactionEndedStatusMaxRecordCount());
this.config = config;
this.validateCustomMetricLabelKeys(config.getAllowedTopicPropertyKeysForMetrics());
this.clock = Clock.systemUTC();
Expand Down
Loading
Loading