diff --git a/conf/broker.conf b/conf/broker.conf index 8ed01cc865613..07138b2fd959e 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -2095,6 +2095,17 @@ transactionBufferClientMaxConcurrentRequests=1000 # The max active transactions per transaction coordinator, default value 0 indicates no limit. maxActiveTransactionsPerCoordinator=0 +# How long to retain final transaction statuses in memory after the active transaction metadata is removed. +# The retained status is only used to return precise errors for duplicate end transaction requests. +# Setting this to 0 disables retention. A negative value means the retention time dimension is not limited. +# This and transactionEndedStatusMaxRecordCount cannot both be negative. +transactionEndedStatusRetentionTimeMs=3600000 + +# The maximum number of final transaction statuses retained in memory. +# Setting this to 0 disables retention. A negative value means the record count dimension is not limited. +# This and transactionEndedStatusRetentionTimeMs cannot both be negative. +transactionEndedStatusMaxRecordCount=100000 + # MLPendingAckStore maintains a ConcurrentSkipListMap pendingAckLogIndex, # It stores the position in pendingAckStore as its value and saves a position used to determine # whether the previous data can be cleaned up as a key. diff --git a/conf/standalone.conf b/conf/standalone.conf index 38c5d1a5b890e..5d52d161ec94f 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -1385,6 +1385,17 @@ transactionBufferSnapshotSegmentSize=262144 # The transaction buffer client's operation timeout in milliseconds. transactionBufferClientOperationTimeoutInMills=3000 +# How long to retain final transaction statuses in memory after the active transaction metadata is removed. +# The retained status is only used to return precise errors for duplicate end transaction requests. +# Setting this to 0 disables retention. A negative value means the retention time dimension is not limited. 
+# This and transactionEndedStatusMaxRecordCount cannot both be negative. +transactionEndedStatusRetentionTimeMs=3600000 + +# The maximum number of final transaction statuses retained in memory. +# Setting this to 0 disables retention. A negative value means the record count dimension is not limited. +# This and transactionEndedStatusRetentionTimeMs cannot both be negative. +transactionEndedStatusMaxRecordCount=100000 + # Provide a mechanism allowing the Transaction Log Store to aggregate multiple records into a batched record and # persist into a single BK entry. This will make Pulsar transactions work more efficiently, aka batched log. # see: https://github.com/apache/pulsar/issues/15370 diff --git a/pip/pip-481.md b/pip/pip-481.md new file mode 100644 index 0000000000000..7815cdfc54242 --- /dev/null +++ b/pip/pip-481.md @@ -0,0 +1,140 @@ +# PIP-481: Retain Ended Transaction Status for Precise Error Reporting + +# Background knowledge + +Pulsar Transaction Coordinator (TC) manages the lifecycle of transactions. Each transaction transitions through states: OPEN -> COMMITTING/ABORTING -> COMMITTED/ABORTED. Once a transaction reaches a final state (COMMITTED or ABORTED), its in-memory metadata (`TxnMeta`) and transaction logs are cleaned up. After cleanup, any query for that transaction's metadata returns a `TransactionNotFoundException`, making it impossible for the caller to determine whether the transaction never existed or had already finished with a known outcome. + +In real production scenarios, clients often encounter "commit transaction" request timeouts. In this case, the client cannot determine whether the transaction was actually committed successfully — the request may have reached the TC but the response was lost before being returned. Without this feature, the client can only retry the commit, but receives a `TransactionNotFoundException` and still cannot determine the actual result. 
+ +Additionally, **Flink-connector-Pulsar** has a high dependency on this feature. When a Flink checkpoint partially succeeds (some operators committed the transaction while others failed), Flink will try to re-commit the already-completed transaction. This feature can clearly inform Flink that the transaction has already been committed successfully during the retry, thus avoiding data duplication or loss. + +# Motivation + +When a producer/consumer or admin operation queries a transaction's status after it has ended, the TC only throws a generic `TransactionNotFoundException`. This forces clients to rely on timeouts or heuristic retries. For example: + +1. A commit request succeeds on the TC, but the response is lost due to network issues. +2. The client retries the commit operation. +3. The TC no longer holds the transaction metadata and returns `TransactionNotFoundException`. +4. The client cannot tell whether the commit actually succeeded. + +This behavior leads to uncertainty and forces clients to adopt overly conservative retry logic. Retaining the ended transaction status within a configurable time window allows the TC to return `TransactionAlreadyCommittedException` (or `TransactionAlreadyAbortedException`, `TransactionAlreadyTimedOutException`), providing unambiguous feedback to the client. + +# Goals + +## In Scope + +- Retain the final status (COMMITTED, ABORTED, TIMEOUT) in memory after a transaction is removed from the active store. +- Introduce three new exception subtypes for precise error reporting: the three final statuses (COMMITTED, ABORTED, TIMEOUT) will each correspond to a different error code returned to the client. +- Provide configuration knobs for retention time and maximum cached record count. +- Persist the `isTimeout` flag in the transaction log, allowing users to determine "whether the client was too slow" when investigating failed transactions. 
+ +## Out of Scope + +N/A + +# High Level Design + +When a transaction reaches its final state, instead of deleting all related transaction logs as before, the new behavior is: +- **Immediately delete**: transaction logs representing the "start" and "intermediate" states of the transaction. +- **Retain**: the transaction log representing the "end" state (the COMMITTED/ABORTED entry). + +This end log is retained until it reaches the `transactionEndedStatusRetentionTimeMs` or `transactionEndedStatusMaxRecordCount` limit, at which point it is deleted. The transaction itself is removed from the active `txnMetaMap`, and its final status is written to the `EndedTxnStatusCache`. + +The benefit of this approach is that when the TC restarts, it can fully reconstruct the in-memory ended transaction state from before the restart by replaying the transaction logs, because the end logs still exist. + +When a query arrives and the transaction is not found in the active store, the cache is checked. If found, a specific exception (`TransactionAlreadyCommittedException`, etc.) is thrown. If not found, the original `TransactionNotFoundException` is returned. + +# Detailed Design + +## Design & Implementation Details + +**`EndedTxnStatus` enum** (`coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/EndedTxnStatus.java`): +- `COMMITTED`, `ABORTED`, `TIMEOUT` + +**`EndedTxnStatusCache`** (`coordinator/impl/EndedTxnStatusCache.java`): +- Wraps a Caffeine `Cache`. +- `EndedTxnMetadata` is a record: `(TxnID txnID, EndedTxnStatus status, Position logPosition, long endedAtMs)`. +- On creation: + - If `retentionTimeMs` or `maxRecordCount` is 0, the cache is disabled (all queries return null). + - If both `retentionTimeMs` and `maxRecordCount` are negative, an `IllegalArgumentException` is thrown (would cause unbounded growth -> OOM). +- TTL-based expiry uses `Expiry.expireAfterCreate` to compute remaining time based on `endedAtMs + retentionTimeMs - now`. 
+- Optional `removalListener` callback: when a cache entry is evicted due to reaching the `retentionTimeMs` or `maxRecordCount` limit, the corresponding end transaction log is deleted. This ensures the end log is retained for a configurable period, allowing the TC to recover ended transaction state through replay after a restart. + +**`TransactionMetadataStoreConfig`** (`coordinator/TransactionMetadataStoreConfig.java`): +- Internal configuration class (not publicly exposed), containing two new config fields: `transactionEndedStatusRetentionTimeMs` and `transactionEndedStatusMaxRecordCount`. +- Default retention time: 1 hour. Default max record count: 100,000. +- Validation: `transactionEndedStatusRetentionTimeMs` and `transactionEndedStatusMaxRecordCount` cannot both be negative. + +**Changes in `MLTransactionMetadataStore`:** + +- **During recovery (replay)**: when encountering a record with status COMMITTED or ABORTED, record it in the `EndedTxnStatusCache`. +- **During normal operation (endTransaction)**: after appending the end log, `positionsToDelete` is taken from the previously accumulated list of old log positions (not including the current end log entry). Call `endedTxnStatusCache.record(...)`: + - If it returns true, the `EndedTxnStatusCache` has recorded the transaction's final status, and the end transaction log will be deleted by the `EndedTxnStatusCache` (when the cache entry expires or is evicted). + - If it returns false, the cache has not recorded this transaction, and the end transaction log must be deleted immediately. +- **Comparison with the old logic**: the old code deleted all logs at once at the end of a transaction (start + intermediate + end). The new code deletes only the start and intermediate logs immediately; the end log is retained and deleted later, according to the retention policy. + +**Protobuf change (`PulsarTransactionMetadata.proto`)**: +- Added `optional bool is_timeout = 13 [default = false]` to `TransactionMetadataEntry`. 
+- Used to accurately distinguish whether a transaction was aborted due to timeout (TIMEOUT) or explicitly aborted by the client (ABORTED) during TC restart replay. This field also provides more observable information, allowing users to easily investigate the proportion of "transaction commit timeouts". + +**New exception types** (`coordinator/exceptions/CoordinatorException.java`): +- `TransactionAlreadyCommittedException(TxnID)` +- `TransactionAlreadyAbortedException(TxnID)` +- `TransactionAlreadyTimedOutException(TxnID)` +- On the coordinator side, these exceptions extend `TransactionNotFoundException`. On the client side, they are mapped to dedicated `TransactionCoordinatorClientException` types. Backward compatibility for old clients is maintained by the feature flag fallback described below. + +## Public-facing Changes + +### Configuration + +Two new fields added to `ServiceConfiguration`, `broker.conf`, and `standalone.conf`: + +| Name | Type | Default | Description | +|------|------|---------|-------------| +| `transactionEndedStatusRetentionTimeMs` | long | 3600000 (1 hour) | How long to retain ended transaction status in memory after the transaction ends. Set to 0 to disable. | +| `transactionEndedStatusMaxRecordCount` | long | 100000 | Maximum number of ended transaction status records to retain. Set to 0 to disable. | + +### Feature Flag + +Added `FeatureFlags.supportsTransactionEndStatusErrors`: the client declares whether it supports the new error codes during the Connect handshake. If the client does not support them, the Broker converts the new exceptions to `TransactionNotFoundException` to ensure compatibility. + +# Monitoring + +- A sudden increase in `TransactionAlreadyCommittedException` / `TransactionAlreadyAbortedException` / `TransactionAlreadyTimedOutException` suggests clients are frequently retrying requests to already-finished transactions. This may indicate network issues or overly aggressive retry policies. 
+- Conversely, a high rate of `TransactionNotFoundException` for recently-ended transactions may indicate that the retention time or max record count is set too low. + +# Security Considerations + +No new security concerns. The existing authorization model still applies: a client must have the appropriate transaction coordinator permission to execute operations. + +# Backward & Forward Compatibility + +## Upgrade + +- New configuration fields default to 1 hour / 100,000, which is safe for existing deployments. +- Added `FeatureFlags.supportsTransactionEndStatusErrors`: the client declares whether it supports the new error codes during the Connect handshake. If the client does not support them, the Broker converts the new exceptions to `TransactionNotFoundException` to ensure compatibility. +- The new protobuf field `is_timeout` is optional and defaults to `false`, so old transaction logs remain readable. +- The only on-disk changes are additive: the optional `is_timeout` field in the transaction log entry, and end log entries that are now retained until the retention policy removes them. There are no changes to the broker-to-broker protocol. + +## Downgrade / Rollback + +- Roll back to the previous version. The new `EndedTxnStatusCache` will no longer exist, and all queries for ended transactions will return `TransactionNotFoundException`. +- The end transaction logs retained by the new version will be treated as uncleaned logs by the old version. The old version will replay and handle these logs normally during startup (the `is_timeout` field is ignored by the old version). +- After rollback, the old Broker will not recognize the `supportsTransactionEndStatusErrors` flag and will not return new error codes, maintaining compatibility with old clients. + +## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations + +Geo-replication for transactions is not currently supported. No special considerations. 
+ +# Alternatives + +N/A + +# General Notes + +N/A + +# Links + +* Mailing List discussion thread: +* Mailing List voting thread: diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index e5ecea2c44b7a..23e789cde5708 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -3887,6 +3887,22 @@ public double getLoadBalancerBandwidthOutResourceWeight() { ) private long maxActiveTransactionsPerCoordinator = 0L; + @FieldContext( + category = CATEGORY_TRANSACTION, + doc = "How long to retain final transaction statuses in memory after the active transaction metadata is " + + "removed. The retained status is only used to return precise errors for duplicate end " + + "transaction requests. Setting this to 0 disables retention. A negative value means the " + + "retention time dimension is not limited." + ) + private long transactionEndedStatusRetentionTimeMs = TimeUnit.HOURS.toMillis(1); + + @FieldContext( + category = CATEGORY_TRANSACTION, + doc = "The maximum number of final transaction statuses retained in memory. Setting this to 0 disables " + + "retention. A negative value means the record count dimension is not limited." 
+ ) + private long transactionEndedStatusMaxRecordCount = 100_000L; + @FieldContext( category = CATEGORY_TRANSACTION, doc = "MLPendingAckStore maintain a ConcurrentSkipListMap pendingAckLogIndex`," diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 1af1d3a39f5e3..82612eb73ad83 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -199,6 +199,7 @@ import org.apache.pulsar.packages.management.core.impl.DefaultPackagesStorageConfiguration; import org.apache.pulsar.packages.management.core.impl.PackagesManagementImpl; import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreConfig; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider; import org.apache.pulsar.websocket.WebSocketConsumerServlet; @@ -363,6 +364,8 @@ public PulsarService(ServiceConfiguration config, // Validate correctness of configuration PulsarConfigurationLoader.isComplete(config); TransactionBatchedWriteValidator.validate(config); + TransactionMetadataStoreConfig.validateEndedStatusConfig(config.getTransactionEndedStatusRetentionTimeMs(), + config.getTransactionEndedStatusMaxRecordCount()); this.config = config; this.validateCustomMetricLabelKeys(config.getAllowedTopicPropertyKeysForMetrics()); this.clock = Clock.systemUTC(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java index 6ee730f948e51..9fb3c22efdf16 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java @@ -57,8 +57,10 @@ import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; +import org.apache.pulsar.transaction.coordinator.EndedTxnStatus; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreConfig; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider; import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker; import org.apache.pulsar.transaction.coordinator.TransactionSubscription; @@ -67,7 +69,11 @@ import org.apache.pulsar.transaction.coordinator.TxnMeta; import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.CoordinatorNotFoundException; import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException; +import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionAlreadyAbortedException; +import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionAlreadyCommittedException; +import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionAlreadyTimedOutException; import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionMetadataStoreStateException; +import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException; import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig; import org.apache.pulsar.transaction.coordinator.proto.TxnStatus; @@ -230,7 +236,11 @@ public CompletableFuture handleTcClientConnect(TransactionCoordinatorID tc 
pulsarService.getManagedLedgerStorage().getManagedLedgerStorageClass(v.getStorageClassName()) .get().getManagedLedgerFactory(), v, timeoutTracker, recoverTracker, - pulsarService.getConfig().getMaxActiveTransactionsPerCoordinator(), txnLogBufferedWriterConfig, + new TransactionMetadataStoreConfig( + serviceConfiguration.getMaxActiveTransactionsPerCoordinator(), + serviceConfiguration.getTransactionEndedStatusRetentionTimeMs(), + serviceConfiguration.getTransactionEndedStatusMaxRecordCount()), + txnLogBufferedWriterConfig, brokerClientSharedTimer)); } @@ -340,10 +350,10 @@ public void endTransaction(TxnID txnID, int txnAction, boolean isTimeout, .thenCompose(txnMeta -> { if (txnMeta.status() == TxnStatus.OPEN) { return updateTxnStatus(txnID, newStatus, TxnStatus.OPEN, isTimeout) - .thenCompose(__ -> endTxnInTransactionBuffer(txnID, txnAction)); + .thenCompose(__ -> endTxnInTransactionBuffer(txnID, txnAction, isTimeout)); } return fakeAsyncCheckTxnStatus(txnMeta.status(), txnAction, txnID, newStatus) - .thenCompose(__ -> endTxnInTransactionBuffer(txnID, txnAction)); + .thenCompose(__ -> endTxnInTransactionBuffer(txnID, txnAction, isTimeout)); }).whenComplete((__, ex)-> { if (ex == null) { future.complete(null); @@ -414,7 +424,7 @@ public void endTransactionForTimeout(TxnID txnID) { }); } - private CompletableFuture endTxnInTransactionBuffer(TxnID txnID, int txnAction) { + private CompletableFuture endTxnInTransactionBuffer(TxnID txnID, int txnAction, boolean isTimeout) { return getTxnMeta(txnID) .thenCompose(txnMeta -> { long lowWaterMark = getLowWaterMark(txnID); @@ -449,7 +459,7 @@ private CompletableFuture endTxnInTransactionBuffer(TxnID txnID, int txnAc }); return FutureUtil.waitForAll(Stream.concat(onSubFutureStream, onTopicFutureStream) .collect(Collectors.toList())) - .thenCompose(__ -> endTxnInTransactionMetadataStore(txnID, txnAction)); + .thenCompose(__ -> endTxnInTransactionMetadataStore(txnID, txnAction, isTimeout)); }); } @@ -465,16 +475,33 @@ private 
static boolean isRetryableException(Throwable ex) { && !(realCause instanceof ManagedLedgerException.ManagedLedgerFencedException); } - private CompletableFuture endTxnInTransactionMetadataStore(TxnID txnID, int txnAction) { + private CompletableFuture endTxnInTransactionMetadataStore(TxnID txnID, int txnAction, boolean isTimeout) { if (TxnAction.COMMIT.getValue() == txnAction) { return updateTxnStatus(txnID, TxnStatus.COMMITTED, COMMITTING, false); } else if (TxnAction.ABORT.getValue() == txnAction) { - return updateTxnStatus(txnID, TxnStatus.ABORTED, ABORTING, false); + return updateTxnStatus(txnID, TxnStatus.ABORTED, ABORTING, isTimeout); } else { return FutureUtil.failedFuture(new InvalidTxnStatusException("Unsupported txnAction " + txnAction)); } } + // TODO review tests, and to add more tests. + public TransactionNotFoundException getEndedTxnException(TxnID txnID) { + TransactionMetadataStore store = stores.get(getTcIdFromTxnId(txnID)); + if (store == null) { + return null; + } + EndedTxnStatus status = store.getEndedTxnStatus(txnID); + if (status == null) { + return null; + } + return switch (status) { + case COMMITTED -> new TransactionAlreadyCommittedException(txnID); + case ABORTED -> new TransactionAlreadyAbortedException(txnID); + case TIMEOUT -> new TransactionAlreadyTimedOutException(txnID); + }; + } + private TransactionCoordinatorID getTcIdFromTxnId(TxnID txnId) { return new TransactionCoordinatorID(txnId.getMostSigBits()); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java index 43a95cc86016f..d7896dabb3097 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java @@ -273,6 +273,12 @@ private static ServerError getClientErrorCode(Throwable t, boolean checkCauseIfU return 
ServerError.ConsumerAssignError; } else if (t instanceof CoordinatorException.CoordinatorNotFoundException) { return ServerError.TransactionCoordinatorNotFound; + } else if (t instanceof CoordinatorException.TransactionAlreadyCommittedException) { + return ServerError.TransactionAlreadyCommitted; + } else if (t instanceof CoordinatorException.TransactionAlreadyAbortedException) { + return ServerError.TransactionAlreadyAborted; + } else if (t instanceof CoordinatorException.TransactionAlreadyTimedOutException) { + return ServerError.TransactionTimedOut; } else if (t instanceof CoordinatorException.InvalidTxnStatusException) { return ServerError.InvalidTxnStatus; } else if (t instanceof NotAllowedException) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 11ce47e06dbcf..3bc111e7e1550 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1581,6 +1581,7 @@ protected void handleConnect(CommandConnect connect) { int clientProtocolVersion = connect.getProtocolVersion(); features = new FeatureFlags(); if (connect.hasFeatureFlags()) { + // TODO add test for new feature flag "supportsTransactionEndStatusErrors". 
features.copyFrom(connect.getFeatureFlags()); } @@ -3408,6 +3409,17 @@ private Throwable handleTxnException(Throwable ex, String op, long requestId) { return new CoordinatorException.CoordinatorNotFoundException(cause.getMessage()); } + if (cause instanceof CoordinatorException.TransactionAlreadyCommittedException + || cause instanceof CoordinatorException.TransactionAlreadyAbortedException + || cause instanceof CoordinatorException.TransactionAlreadyTimedOutException) { + log.debug().attr("op", op).exception(cause).log("The transaction has already ended"); + if (supportsTransactionEndStatusErrors()) { + return cause; + } else { + // To guarantee compatibility of the old version client, do not throw new error to them. + return new CoordinatorException.TransactionNotFoundException(cause.getMessage()); + } + } log.error() .attr("op", op) .attr("requestId", requestId) @@ -3500,7 +3512,7 @@ protected void handleAddPartitionToTxn(CommandAddPartitionToTxn command) { // v5: TC doesn't need pre-registration — participants advertise themselves by writing // /txn/op records when they actually apply ops. Still verify ownership before acking, // matching the legacy authorization surface. - verifyTxnOwnership(txnID) + verifyTxnOwnershipAndTxnState(txnID) .thenCompose(isOwner -> isOwner ? 
CompletableFuture.completedFuture(null) : failedFutureTxnNotOwned(txnID)) .whenComplete((v, ex) -> { @@ -3520,7 +3532,7 @@ protected void handleAddPartitionToTxn(CommandAddPartitionToTxn command) { TransactionMetadataStoreService transactionMetadataStoreService = service.pulsar().getTransactionMetadataStoreService(); - verifyTxnOwnership(txnID) + verifyTxnOwnershipAndTxnState(txnID) .thenCompose(isOwner -> { if (!isOwner) { return failedFutureTxnNotOwned(txnID); @@ -3579,7 +3591,7 @@ protected void handleEndTxn(CommandEndTxn command) { } if (service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled()) { - verifyTxnOwnership(txnID) + verifyTxnOwnershipAndTxnState(txnID) .thenCompose(isOwner -> { if (!isOwner) { return failedFutureTxnNotOwned(txnID); @@ -3602,7 +3614,7 @@ protected void handleEndTxn(CommandEndTxn command) { TransactionMetadataStoreService transactionMetadataStoreService = service.pulsar().getTransactionMetadataStoreService(); - verifyTxnOwnership(txnID) + verifyTxnOwnershipAndTxnState(txnID) .thenCompose(isOwner -> { if (!isOwner) { return failedFutureTxnNotOwned(txnID);
@@ -3641,25 +3653,62 @@ private CompletableFuture isSuperUser() {
         }
     }
 
-    private CompletableFuture verifyTxnOwnership(TxnID txnID) {
+    /**
+     * Verifies that the principal of this connection may operate on the transaction, and reports the
+     * retained final status of a transaction that has already ended.
+     *
+     * <p>When the ownership check fails with {@code TransactionNotFoundException} and the transaction
+     * metadata store service still returns an ended-status exception for {@code txnID}, the returned future
+     * fails with that more specific exception. Any other failure is propagated after being unwrapped.
+     *
+     * @param txnID the transaction to check
+     * @return a future completed with {@code true} when the principal owns the transaction or is a super
+     *         user, {@code false} otherwise
+     */
+    private CompletableFuture verifyTxnOwnershipAndTxnState(TxnID txnID) {
         assert ctx.executor().inEventLoop();
+        TransactionMetadataStoreService txnMetadataService = service.pulsar().getTransactionMetadataStoreService();
         CompletableFuture ownerCheck =
                 service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled()
                         ? service.pulsar().getTransactionCoordinatorV5()
                                 .verifyTxnOwnership(txnID, getPrincipal())
-                        : service.pulsar().getTransactionMetadataStoreService()
-                                .verifyTxnOwnership(txnID, getPrincipal());
-        return ownerCheck
-                .thenComposeAsync(isOwner -> {
+                        : txnMetadataService.verifyTxnOwnership(txnID, getPrincipal());
+        CompletableFuture future = new CompletableFuture<>();
+        ownerCheck.whenCompleteAsync((isOwner, ex) -> {
+                if (ex == null) {
                     if (isOwner) {
-                        return CompletableFuture.completedFuture(true);
+                        future.complete(true);
+                        // The owner needs no super-user lookup; stop before the fallback below.
+                        return;
                     }
                     if (service.isAuthenticationEnabled() && service.isAuthorizationEnabled()) {
-                        return isSuperUser();
+                        isSuperUser().whenCompleteAsync((isSuperUser, ex2) -> {
+                            // Check ex2, the lookup's own failure; the outer ex is always null in this branch.
+                            if (ex2 == null) {
+                                future.complete(isSuperUser);
+                            } else {
+                                future.completeExceptionally(ex2);
+                            }
+                        }, ctx.executor());
                     } else {
-                        return CompletableFuture.completedFuture(false);
+                        future.complete(false);
                     }
-                }, ctx.executor());
+                    return;
+                }
+                Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                if (cause instanceof CoordinatorException.TransactionNotFoundException) {
+                    CoordinatorException.TransactionNotFoundException notFoundException =
+                            txnMetadataService.getEndedTxnException(txnID);
+                    if (notFoundException != null) {
+                        future.completeExceptionally(notFoundException);
+                    } else {
+                        future.completeExceptionally(cause);
+                    }
+                } else {
+                    future.completeExceptionally(cause);
+                }
+        }, ctx.executor());
+        return future;
     }
 
     @Override
@@ -3922,7 +3956,7 @@ protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) { // v5: TC doesn't need pre-registration — participants advertise themselves by writing // /txn/op records when they actually apply ops. Still verify ownership before acking, // matching the legacy authorization surface. - verifyTxnOwnership(txnID) + verifyTxnOwnershipAndTxnState(txnID) .thenCompose(isOwner -> isOwner ? 
CompletableFuture.completedFuture(null) : failedFutureTxnNotOwned(txnID)) .whenComplete((v, ex) -> { @@ -3943,7 +3977,7 @@ protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) { TransactionMetadataStoreService transactionMetadataStoreService = service.pulsar().getTransactionMetadataStoreService(); - verifyTxnOwnership(txnID) + verifyTxnOwnershipAndTxnState(txnID) .thenCompose(isOwner -> { if (!isOwner) { return failedFutureTxnNotOwned(txnID); @@ -4394,6 +4428,10 @@ boolean supportsPartialProducer() { return features != null && features.isSupportsPartialProducer(); } + boolean supportsTransactionEndStatusErrors() { + return features != null && features.isSupportsTransactionEndStatusErrors(); + } + @Override public String getClientVersion() { return clientVersion; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 7de4c198f30df..a0786db7925d3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -168,6 +168,7 @@ import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; import org.apache.pulsar.transaction.coordinator.TxnMeta; +import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException; import org.awaitility.Awaitility; import org.mockito.ArgumentCaptor; import org.mockito.MockedStatic; @@ -3952,6 +3953,58 @@ public void sendEndTxnResponseFailed() throws Exception { channel.finish(); } + @Test(timeOut = 30000) + public void sendEndTxnResponseFailedWithEndedStatusError() throws Exception { + final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class); + when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class))); + 
when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true)); + when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean())) + .thenReturn(CompletableFuture.failedFuture( + new CoordinatorException.TransactionAlreadyCommittedException(new TxnID(12L, 1L)))); + when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore); + svcConfig.setTransactionCoordinatorEnabled(true); + resetChannel(); + channel.writeInbound(Commands.newConnect("none", "", null)); + assertTrue(getResponse() instanceof CommandConnected); + ByteBuf clientCommand = Commands.serializeWithSize(Commands.newEndTxn(89L, 1L, 12L, + TxnAction.COMMIT)); + channel.writeInbound(clientCommand); + CommandEndTxnResponse response = (CommandEndTxnResponse) getResponse(); + + assertEquals(response.getRequestId(), 89L); + assertEquals(response.getTxnidLeastBits(), 1L); + assertEquals(response.getTxnidMostBits(), 12L); + assertEquals(response.getError(), ServerError.TransactionAlreadyCommitted); + + channel.finish(); + } + + @Test(timeOut = 30000) + public void sendEndTxnResponseFailedWithEndedStatusErrorFallsBackWithoutFeatureFlag() throws Exception { + final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class); + when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class))); + when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true)); + when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean())) + .thenReturn(CompletableFuture.failedFuture( + new CoordinatorException.TransactionAlreadyCommittedException(new TxnID(12L, 1L)))); + when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore); + svcConfig.setTransactionCoordinatorEnabled(true); + resetChannel(); + channel.writeInbound(newConnect(AuthMethod.AuthMethodNone, "", Commands.getCurrentProtocolVersion())); + assertTrue(getResponse() instanceof CommandConnected); + 
ByteBuf clientCommand = Commands.serializeWithSize(Commands.newEndTxn(89L, 1L, 12L, + TxnAction.COMMIT)); + channel.writeInbound(clientCommand); + CommandEndTxnResponse response = (CommandEndTxnResponse) getResponse(); + + assertEquals(response.getRequestId(), 89L); + assertEquals(response.getTxnidLeastBits(), 1L); + assertEquals(response.getTxnidMostBits(), 12L); + assertEquals(response.getError(), ServerError.TransactionNotFound); + + channel.finish(); + } + @Test(timeOut = 30000) public void sendEndTxnOnPartitionResponse() throws Exception { final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java index 9f3776ca7cb34..6a39f2dee7d8f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java @@ -117,6 +117,50 @@ public void testNewTransaction() throws Exception { Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 0); } + @Test + public void testDuplicateEndTransactionReturnsRetainedStatus() throws Exception { + TransactionMetadataStoreService transactionMetadataStoreService = pulsar.getTransactionMetadataStoreService(); + transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get(0)); + Awaitility.await().until(() -> + transactionMetadataStoreService.getStores().size() == 1); + checkTransactionMetadataStoreReady((MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService() + .getStores().get(TransactionCoordinatorID.get(0))); + + TxnID committedTxnID = + transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 5000, null).get(); + 
transactionMetadataStoreService.endTransaction(committedTxnID, TxnAction.COMMIT.getValue(), false).get(); + try { + transactionMetadataStoreService.endTransaction(committedTxnID, TxnAction.ABORT.getValue(), false).get(); + fail("Duplicate end transaction should fail with the retained committed status"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof CoordinatorException.TransactionAlreadyCommittedException); + } + + TxnID abortedTxnID = + transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 5000, null).get(); + transactionMetadataStoreService.endTransaction(abortedTxnID, TxnAction.ABORT.getValue(), false).get(); + try { + transactionMetadataStoreService.endTransaction(abortedTxnID, TxnAction.COMMIT.getValue(), false).get(); + fail("Duplicate end transaction should fail with the retained aborted status"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof CoordinatorException.TransactionAlreadyAbortedException); + } + + TxnID timeoutTxnID = + transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 5000, null).get(); + transactionMetadataStoreService.endTransaction(timeoutTxnID, TxnAction.ABORT.getValue(), true).get(); + transactionMetadataStoreService.endTransaction(timeoutTxnID, TxnAction.ABORT.getValue(), false).get(); + try { + transactionMetadataStoreService.endTransaction(timeoutTxnID, TxnAction.COMMIT.getValue(), false).get(); + fail("Duplicate end transaction should fail with the retained timeout status"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof CoordinatorException.TransactionAlreadyTimedOutException); + } + + transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get(0)); + Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 0); + } + @Test public void testAddProducedPartitionToTxn() throws Exception { TransactionMetadataStoreService transactionMetadataStoreService = 
pulsar.getTransactionMetadataStoreService(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java index 16467fabd1599..637fb252a4141 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java @@ -66,7 +66,9 @@ import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient; import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException; -import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionNotFoundException; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionAlreadyAbortedException; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionAlreadyCommittedException; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionTimedOutException; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.client.internal.DefaultImplementation; @@ -644,6 +646,17 @@ protected void txnAckTest(boolean batchEnable, int maxBatchSize, // 1) txn abort txn.abort().get(); + Field field = TransactionImpl.class.getDeclaredField("state"); + field.setAccessible(true); + field.set(txn, TransactionImpl.State.OPEN); + try { + txn.abort().get(); + fail("reabort one transaction should be failed."); + } catch (Exception reAbortError) { + log.info("expected exception for reabort one transaction."); + Assert.assertNotNull(reAbortError); + Assert.assertTrue(reAbortError.getCause() instanceof TransactionAlreadyAbortedException); + } // after transaction abort, the messages could be 
received Transaction commitTxn = getTxn(); @@ -661,8 +674,6 @@ protected void txnAckTest(boolean batchEnable, int maxBatchSize, message = consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS); Assert.assertNull(message); - Field field = TransactionImpl.class.getDeclaredField("state"); - field.setAccessible(true); field.set(commitTxn, TransactionImpl.State.OPEN); try { commitTxn.commit().get(); @@ -671,7 +682,7 @@ protected void txnAckTest(boolean batchEnable, int maxBatchSize, // recommit one transaction should be failed log.info("expected exception for recommit one transaction."); Assert.assertNotNull(reCommitError); - Assert.assertTrue(reCommitError.getCause() instanceof TransactionNotFoundException); + Assert.assertTrue(reCommitError.getCause() instanceof TransactionAlreadyCommittedException); } } @@ -918,7 +929,7 @@ private void txnCumulativeAckTest(boolean batchEnable, int maxBatchSize, Subscri // recommit one transaction should be failed log.info("expected exception for recommit one transaction."); Assert.assertNotNull(reCommitError); - Assert.assertTrue(reCommitError.getCause() instanceof TransactionNotFoundException); + Assert.assertTrue(reCommitError.getCause() instanceof TransactionAlreadyCommittedException); } message = consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS); @@ -1133,7 +1144,7 @@ public void produceAndConsumeCloseStateTxnTest() throws Exception { timeoutTxnSkipClientTimeout.commit().get(); fail(); } catch (Exception e) { - assertTrue(e.getCause() instanceof TransactionNotFoundException); + assertTrue(e.getCause() instanceof TransactionTimedOutException); } Field field = TransactionImpl.class.getDeclaredField("state"); field.setAccessible(true); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java index cc0087c0cc6c3..5ec8ef915dc4c 100644 --- 
a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java @@ -21,6 +21,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.beans.Introspector; @@ -38,6 +39,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Properties; +import java.util.concurrent.TimeUnit; import lombok.Cleanup; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; @@ -45,6 +47,7 @@ import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.topics.TopicsPattern; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreConfig; import org.testng.annotations.Test; @Test(groups = "broker-naming") @@ -275,6 +278,65 @@ public void testBookKeeperClientIoThreads() throws Exception { assertEquals(conf.getBookkeeperClientNumIoThreads(), 1); } } + + @Test + public void testTransactionEndedStatusConfigurations() throws Exception { + ServiceConfiguration defaultConfig = new ServiceConfiguration(); + assertEquals(defaultConfig.getTransactionEndedStatusRetentionTimeMs(), TimeUnit.HOURS.toMillis(1)); + assertEquals(defaultConfig.getTransactionEndedStatusMaxRecordCount(), 100_000L); + + assertTransactionEndedStatusConfigFile("../conf/broker.conf"); + assertTransactionEndedStatusConfigFile("../conf/standalone.conf"); + + Properties properties = new Properties(); + properties.setProperty("transactionEndedStatusRetentionTimeMs", "12345"); + properties.setProperty("transactionEndedStatusMaxRecordCount", "678"); + ServiceConfiguration 
configured = PulsarConfigurationLoader.create(properties, ServiceConfiguration.class); + assertEquals(configured.getTransactionEndedStatusRetentionTimeMs(), 12345L); + assertEquals(configured.getTransactionEndedStatusMaxRecordCount(), 678L); + TransactionMetadataStoreConfig.validateEndedStatusConfig(configured.getTransactionEndedStatusRetentionTimeMs(), + configured.getTransactionEndedStatusMaxRecordCount()); + + properties.setProperty("transactionEndedStatusRetentionTimeMs", "-1"); + properties.setProperty("transactionEndedStatusMaxRecordCount", "678"); + configured = PulsarConfigurationLoader.create(properties, ServiceConfiguration.class); + assertEquals(configured.getTransactionEndedStatusRetentionTimeMs(), -1L); + assertEquals(configured.getTransactionEndedStatusMaxRecordCount(), 678L); + TransactionMetadataStoreConfig.validateEndedStatusConfig(configured.getTransactionEndedStatusRetentionTimeMs(), + configured.getTransactionEndedStatusMaxRecordCount()); + + properties.setProperty("transactionEndedStatusRetentionTimeMs", "12345"); + properties.setProperty("transactionEndedStatusMaxRecordCount", "-1"); + configured = PulsarConfigurationLoader.create(properties, ServiceConfiguration.class); + assertEquals(configured.getTransactionEndedStatusRetentionTimeMs(), 12345L); + assertEquals(configured.getTransactionEndedStatusMaxRecordCount(), -1L); + TransactionMetadataStoreConfig.validateEndedStatusConfig(configured.getTransactionEndedStatusRetentionTimeMs(), + configured.getTransactionEndedStatusMaxRecordCount()); + + properties.setProperty("transactionEndedStatusRetentionTimeMs", "-1"); + properties.setProperty("transactionEndedStatusMaxRecordCount", "-1"); + configured = PulsarConfigurationLoader.create(properties, ServiceConfiguration.class); + ServiceConfiguration invalidConfig = configured; + assertThrows(IllegalArgumentException.class, () -> TransactionMetadataStoreConfig.validateEndedStatusConfig( + invalidConfig.getTransactionEndedStatusRetentionTimeMs(), + 
invalidConfig.getTransactionEndedStatusMaxRecordCount())); + } + + /** Asserts that the file at {@code configFilePath} sets transactionEndedStatusRetentionTimeMs to 3600000 and transactionEndedStatusMaxRecordCount to 100000. The file is read twice: first as plain Properties to check the raw text, then through PulsarConfigurationLoader to check the values exposed by the ServiceConfiguration getters. */ private static void assertTransactionEndedStatusConfigFile(String configFilePath) throws Exception { + Properties fileProperties = new Properties(); + try (FileInputStream stream = new FileInputStream(configFilePath)) { + fileProperties.load(stream); + } + assertEquals(fileProperties.getProperty("transactionEndedStatusRetentionTimeMs"), "3600000"); + assertEquals(fileProperties.getProperty("transactionEndedStatusMaxRecordCount"), "100000"); + + try (FileInputStream stream = new FileInputStream(configFilePath)) { + ServiceConfiguration fileConfig = PulsarConfigurationLoader.create(stream, ServiceConfiguration.class); + assertEquals(fileConfig.getTransactionEndedStatusRetentionTimeMs(), TimeUnit.HOURS.toMillis(1)); + assertEquals(fileConfig.getTransactionEndedStatusMaxRecordCount(), 100_000L); + } + } + @SuppressWarnings("deprecation") @Test diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java index 4510e8b8539a7..f1054ce1bf475 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java @@ -93,6 +93,39 @@ public TransactionNotFoundException(String message) { } } + /** + * Thrown when transaction has already been committed in transaction coordinator. + */ + public static class TransactionAlreadyCommittedException extends TransactionCoordinatorClientException { + private static final long serialVersionUID = 1L; + + public TransactionAlreadyCommittedException(String message) { + super(message); + } + } + + /** + * Thrown when transaction has already been aborted in transaction coordinator. 
+ */ + public static class TransactionAlreadyAbortedException extends TransactionCoordinatorClientException { + private static final long serialVersionUID = 1L; + + public TransactionAlreadyAbortedException(String message) { + super(message); + } + } + + /** + * Thrown when transaction has already timed out in transaction coordinator. + */ + public static class TransactionTimedOutException extends TransactionCoordinatorClientException { + private static final long serialVersionUID = 1L; + + public TransactionTimedOutException(String message) { + super(message); + } + } + /** * Thrown when transaction meta store handler not exists. */ diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java index f2731c6b74bc8..32522c5e2921f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java @@ -680,6 +680,12 @@ public static TransactionCoordinatorClientException getExceptionByServerError(Se return new TransactionCoordinatorClientException.InvalidTxnStatusException(msg); case TransactionNotFound: return new TransactionCoordinatorClientException.TransactionNotFoundException(msg); + case TransactionAlreadyCommitted: + return new TransactionCoordinatorClientException.TransactionAlreadyCommittedException(msg); + case TransactionAlreadyAborted: + return new TransactionCoordinatorClientException.TransactionAlreadyAbortedException(msg); + case TransactionTimedOut: + return new TransactionCoordinatorClientException.TransactionTimedOutException(msg); default: return new TransactionCoordinatorClientException(msg); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java index 737a91aa9d4aa..afbda5cae2ed8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java @@ -35,7 +35,10 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.InvalidTxnStatusException; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionAlreadyAbortedException; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionAlreadyCommittedException; import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionNotFoundException; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionTimedOutException; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.util.FutureUtil; @@ -199,8 +202,7 @@ public CompletableFuture commit() { tcClient.commitAsync(txnId) .whenComplete((vx, ex) -> { if (ex != null) { - if (ex instanceof TransactionNotFoundException - || ex instanceof InvalidTxnStatusException) { + if (isEndTxnFailedBecauseTransactionFinished(ex)) { this.state = State.ERROR; } commitFuture.completeExceptionally(ex); @@ -228,8 +230,7 @@ private CompletableFuture internalAbort() { tcClient.abortAsync(txnId).whenComplete((vx, ex) -> { if (ex != null) { - if (ex instanceof TransactionNotFoundException - || ex instanceof InvalidTxnStatusException) { + if (isEndTxnFailedBecauseTransactionFinished(ex)) { this.state = State.ERROR; } abortFuture.completeExceptionally(ex); @@ -244,6 +245,14 @@ private CompletableFuture internalAbort() { return 
abortFuture; } + /** Returns true when {@code ex} is an instance of TransactionNotFoundException, InvalidTxnStatusException, TransactionAlreadyCommittedException, TransactionAlreadyAbortedException or TransactionTimedOutException. commit() and internalAbort() use this to decide whether a failed end-transaction call moves the transaction to State.ERROR. Only {@code ex} itself is tested; its cause is not unwrapped. */ private static boolean isEndTxnFailedBecauseTransactionFinished(Throwable ex) { + return ex instanceof TransactionNotFoundException + || ex instanceof InvalidTxnStatusException + || ex instanceof TransactionAlreadyCommittedException + || ex instanceof TransactionAlreadyAbortedException + || ex instanceof TransactionTimedOutException; + } + @Override public TxnID getTxnID() { return this.txnId; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index e6cb2605315ad..d597ae01e64e7 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -204,6 +204,7 @@ private static void setFeatureFlags(FeatureFlags flags) { flags.setSupportsPartialProducer(true); flags.setSupportsGetPartitionedMetadataWithoutAutoCreation(true); flags.setSupportsReplDedupByLidAndEid(true); + flags.setSupportsTransactionEndStatusErrors(true); } public static ByteBuf newConnect(String authMethodName, String authData, int protocolVersion, String libVersion, @@ -333,6 +334,7 @@ public static BaseCommand newConnectedCommand(int clientProtocolVersion, int max connected.setFeatureFlags().setSupportsReplDedupByLidAndEid(true); connected.setFeatureFlags().setSupportsTopicWatcherReconcile(supportsTopicWatchers); connected.setFeatureFlags().setSupportsScalableTopics(supportsScalableTopics); + connected.setFeatureFlags().setSupportsTransactionEndStatusErrors(true); return cmd; } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 28501c8bc6637..cde4d4508c5e9 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -241,6 +241,10 @@ enum ServerError { // use this error to indicate that this producer is now permanently // fenced. 
Applications are now supposed to close it and create a // new producer + + TransactionAlreadyCommitted = 26; // Transaction has already been committed + TransactionAlreadyAborted = 27; // Transaction has already been aborted + TransactionTimedOut = 28; // Transaction has already timed out } enum AuthMethod { @@ -317,6 +321,7 @@ message FeatureFlags { optional bool supports_repl_dedup_by_lid_and_eid = 6 [default = false]; optional bool supports_topic_watcher_reconcile = 7 [default = false]; optional bool supports_scalable_topics = 8 [default = false]; + optional bool supports_transaction_end_status_errors = 9 [default = false]; } message CommandConnected { diff --git a/pulsar-transaction/coordinator/build.gradle.kts b/pulsar-transaction/coordinator/build.gradle.kts index 9223abfc71211..6f4cc21878397 100644 --- a/pulsar-transaction/coordinator/build.gradle.kts +++ b/pulsar-transaction/coordinator/build.gradle.kts @@ -29,6 +29,7 @@ dependencies { implementation(project(":managed-ledger")) implementation(libs.commons.lang3) implementation(libs.commons.collections4) + implementation(libs.caffeine) implementation(libs.netty.buffer) implementation(libs.netty.common) implementation(libs.jctools.core.jdk11) diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/EndedTxnStatus.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/EndedTxnStatus.java new file mode 100644 index 0000000000000..489c6da676029 --- /dev/null +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/EndedTxnStatus.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.transaction.coordinator; + +/** + * Final status retained briefly after an active transaction is removed from memory. + */ +public enum EndedTxnStatus { + COMMITTED, + ABORTED, + TIMEOUT +} diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java index 850fcfb4d19ec..fc9a424a12e86 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java @@ -52,6 +52,16 @@ default CompletableFuture getTxnStatus(TxnID txnid) { */ CompletableFuture getTxnMeta(TxnID txnid); + /** + * Query the retained final status for a transaction that is no longer active. + * + * @param txnid transaction id + * @return retained final status, or {@code null} if the transaction is active, unknown, or expired. + */ + default EndedTxnStatus getEndedTxnStatus(TxnID txnid) { + return null; + } + /** * Create a new transaction in the transaction metadata store. 
* @@ -148,4 +158,14 @@ default TransactionMetadataStoreAttributes getAttributes() { * @return {@link TxnMeta} the txnMetas of slow transactions */ List getSlowTransactions(long timeout); + + /** Maps a transaction status to the final status to retain. COMMITTED maps to EndedTxnStatus.COMMITTED. ABORTED maps to EndedTxnStatus.TIMEOUT when {@code isTimeout} is true and to EndedTxnStatus.ABORTED otherwise. Any other status returns {@code null}. */ default EndedTxnStatus getEndedTxnStatus(TxnStatus status, boolean isTimeout) { + if (status == TxnStatus.COMMITTED) { + return EndedTxnStatus.COMMITTED; + } + if (status == TxnStatus.ABORTED) { + return isTimeout ? EndedTxnStatus.TIMEOUT : EndedTxnStatus.ABORTED; + } + return null; + } } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreConfig.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreConfig.java new file mode 100644 index 0000000000000..55e796de7b4a4 --- /dev/null +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreConfig.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.transaction.coordinator; + +import java.util.concurrent.TimeUnit; + +/** + * Configuration owned by a transaction metadata store instance. 
+ */ +public class TransactionMetadataStoreConfig { + + public static final long DEFAULT_TRANSACTION_ENDED_STATUS_RETENTION_TIME_MS = TimeUnit.HOURS.toMillis(1); + public static final long DEFAULT_TRANSACTION_ENDED_STATUS_MAX_RECORD_COUNT = 100_000L; + + private final long maxActiveTransactionsPerCoordinator; + private final long transactionEndedStatusRetentionTimeMs; + private final long transactionEndedStatusMaxRecordCount; + + /** Creates the config after running validateEndedStatusConfig on the two ended-status limits, so an IllegalArgumentException is thrown when both are negative. The active-transaction limit is stored without validation. */ public TransactionMetadataStoreConfig(long maxActiveTransactionsPerCoordinator, + long transactionEndedStatusRetentionTimeMs, + long transactionEndedStatusMaxRecordCount) { + validateEndedStatusConfig(transactionEndedStatusRetentionTimeMs, transactionEndedStatusMaxRecordCount); + this.maxActiveTransactionsPerCoordinator = maxActiveTransactionsPerCoordinator; + this.transactionEndedStatusRetentionTimeMs = transactionEndedStatusRetentionTimeMs; + this.transactionEndedStatusMaxRecordCount = transactionEndedStatusMaxRecordCount; + } + + /** Builds a config with the given active-transaction limit and the DEFAULT_TRANSACTION_ENDED_STATUS_* constants (one hour, 100000 records) for the ended-status limits. */ public static TransactionMetadataStoreConfig defaultConfig(long maxActiveTransactionsPerCoordinator) { + return new TransactionMetadataStoreConfig(maxActiveTransactionsPerCoordinator, + DEFAULT_TRANSACTION_ENDED_STATUS_RETENTION_TIME_MS, + DEFAULT_TRANSACTION_ENDED_STATUS_MAX_RECORD_COUNT); + } + + /** Rejects only the combination in which both arguments are negative; every other combination, including zero values, passes. Throws IllegalArgumentException when retentionTimeMs and maxRecordCount are both below zero. */ public static void validateEndedStatusConfig(long retentionTimeMs, long maxRecordCount) { + if (retentionTimeMs < 0 && maxRecordCount < 0) { + throw new IllegalArgumentException("Configuration fields 'transactionEndedStatusRetentionTimeMs' and " + + "'transactionEndedStatusMaxRecordCount' cannot both be negative because that would retain " + + "ended transaction statuses without a time or size limit, eventually, it will cause OOM error"); + } + } + + public long getMaxActiveTransactionsPerCoordinator() { + return maxActiveTransactionsPerCoordinator; + } + + public long getTransactionEndedStatusRetentionTimeMs() { + return transactionEndedStatusRetentionTimeMs; + } + + public long 
getTransactionEndedStatusMaxRecordCount() { + return transactionEndedStatusMaxRecordCount; + } +} diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java index 7145ea1214f95..84493f8d14211 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java @@ -63,6 +63,7 @@ static TransactionMetadataStoreProvider newProvider(String providerClassName) th * @param managedLedgerConfig {@link ManagedLedgerConfig} the managedLedgerConfig to create managedLedger. * @param timeoutTracker {@link TransactionTimeoutTracker} the timeoutTracker to handle transaction time out. * @param recoverTracker {@link TransactionRecoverTracker} the recoverTracker to handle transaction recover. + * @param metadataStoreConfig {@link TransactionMetadataStoreConfig} the metadata store level config. * @return a future represents the result of the operation. * an instance of {@link TransactionMetadataStore} is returned * if the operation succeeds. 
@@ -70,6 +71,6 @@ static TransactionMetadataStoreProvider newProvider(String providerClassName) th
     CompletableFuture<TransactionMetadataStore> openStore(
             TransactionCoordinatorID transactionCoordinatorId, ManagedLedgerFactory managedLedgerFactory,
             ManagedLedgerConfig managedLedgerConfig, TransactionTimeoutTracker timeoutTracker,
-            TransactionRecoverTracker recoverTracker, long maxActiveTransactionsPerCoordinator,
+            TransactionRecoverTracker recoverTracker, TransactionMetadataStoreConfig metadataStoreConfig,
             TxnLogBufferedWriterConfig txnLogBufferedWriterConfig, Timer timer);
 }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java
index 9ca3d549abc60..070dc1f3afe5c 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java
@@ -100,6 +100,48 @@ public TransactionNotFoundException(Throwable cause) {
         }
     }
 
+    /**
+     * Exception is thrown when a transaction has already been committed.
+     *
+     * <p>It is a {@link TransactionNotFoundException}, so handlers of that type still catch it.
+     */
+    public static class TransactionAlreadyCommittedException extends TransactionNotFoundException {
+
+        private static final long serialVersionUID = 0L;
+
+        public TransactionAlreadyCommittedException(TxnID txnID) {
+            super("The transaction `" + txnID + "` has already been committed.");
+        }
+    }
+
+    /**
+     * Exception is thrown when a transaction has already been aborted.
+     *
+     * <p>It is a {@link TransactionNotFoundException}, so handlers of that type still catch it.
+     */
+    public static class TransactionAlreadyAbortedException extends TransactionNotFoundException {
+
+        private static final long serialVersionUID = 0L;
+
+        public TransactionAlreadyAbortedException(TxnID txnID) {
+            super("The transaction `" + txnID + "` has already been aborted.");
+        }
+    }
+
+    /**
+     * Exception is thrown when a transaction has already timed out.
+     *
+     * <p>It is a {@link TransactionNotFoundException}, so handlers of that type still catch it.
+     */
+    public static class TransactionAlreadyTimedOutException extends TransactionNotFoundException {
+
+        private static final long serialVersionUID = 0L;
+
+        public TransactionAlreadyTimedOutException(TxnID txnID) {
+            super("The transaction `" + txnID + "` has already timed out.");
+        }
+    }
+
     /**
      * Exception is thrown when a operation of transaction is executed in a error transaction metadata store state.
      */
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/EndedTxnStatusCache.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/EndedTxnStatusCache.java
new file mode 100644
index 0000000000000..df90f74a1b1c1
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/EndedTxnStatusCache.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Expiry;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.transaction.coordinator.EndedTxnStatus;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreConfig;
+import org.jspecify.annotations.Nullable;
+
+/**
+ * Bounded in-memory record of the final status of transactions that have already ended.
+ *
+ * <p>Entries are limited by a retention time counted from the moment the transaction ended, by a maximum
+ * record count, or by both. A limit of {@code 0} disables the cache; a negative limit leaves that dimension
+ * unbounded.
+ */
+class EndedTxnStatusCache {
+
+    /** Backing cache, or {@code null} when retention is disabled. */
+    private final Cache<TxnID, EndedTxnMetadata> cache;
+    private final long retentionTimeMs;
+    /** Set by {@link #close()} so the shutdown invalidation is not reported to the removal listener. */
+    private final AtomicBoolean closed;
+
+    private EndedTxnStatusCache(Cache<TxnID, EndedTxnMetadata> cache, long retentionTimeMs,
+                                AtomicBoolean closed) {
+        this.cache = cache;
+        this.retentionTimeMs = retentionTimeMs;
+        this.closed = closed;
+    }
+
+    /**
+     * Creates a cache of ended transaction statuses.
+     *
+     * @param retentionTimeMs how long a status is kept, counted from the time the transaction ended;
+     *                        {@code 0} disables the cache and a negative value means no time limit
+     * @param maxRecordCount maximum number of retained statuses; {@code 0} disables the cache and a
+     *                       negative value means no size limit
+     * @param removalListener receives every entry removed from the cache before {@link #close()} is called,
+     *                        whatever the removal cause; may be {@code null}
+     * @return the new cache, which records nothing when either limit is {@code 0}
+     * @throws IllegalArgumentException if both limits are negative
+     */
+    static EndedTxnStatusCache create(long retentionTimeMs, long maxRecordCount,
+                                      @Nullable Consumer<EndedTxnMetadata> removalListener) {
+        TransactionMetadataStoreConfig.validateEndedStatusConfig(retentionTimeMs, maxRecordCount);
+        AtomicBoolean closed = new AtomicBoolean(false);
+        if (retentionTimeMs == 0 || maxRecordCount == 0) {
+            return new EndedTxnStatusCache(null, retentionTimeMs, closed);
+        }
+        Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder().executor(Runnable::run);
+        if (retentionTimeMs > 0) {
+            cacheBuilder.expireAfter(new Expiry<>() {
+                @Override
+                public long expireAfterCreate(Object key, Object rawValue, long currentTime) {
+                    // The deadline is anchored to the time the transaction ended, not to the insertion time.
+                    EndedTxnMetadata value = (EndedTxnMetadata) rawValue;
+                    long remainingMs = expireAtMs(value.endedAtMs(), retentionTimeMs) - System.currentTimeMillis();
+                    return remainingMs <= 0 ? 0 : TimeUnit.MILLISECONDS.toNanos(remainingMs);
+                }
+
+                @Override
+                public long expireAfterUpdate(Object key, Object value, long currentTime,
+                                              long currentDuration) {
+                    return expireAfterCreate(key, value, currentTime);
+                }
+
+                @Override
+                public long expireAfterRead(Object key, Object value, long currentTime,
+                                            long currentDuration) {
+                    // Reading an entry never extends its lifetime.
+                    return currentDuration;
+                }
+            });
+        }
+        if (maxRecordCount > 0) {
+            cacheBuilder.maximumSize(maxRecordCount);
+        }
+        if (removalListener != null) {
+            cacheBuilder.removalListener((txnID, value, cause) -> {
+                if (!closed.get() && value != null) {
+                    removalListener.accept((EndedTxnMetadata) value);
+                }
+            });
+        }
+        Cache<TxnID, EndedTxnMetadata> cache = cacheBuilder.build();
+        return new EndedTxnStatusCache(cache, retentionTimeMs, closed);
+    }
+
+    /**
+     * Looks up the retained final status of a transaction.
+     *
+     * @param txnID the transaction to look up
+     * @return the retained status, or {@code null} if none is retained or the cache is disabled
+     */
+    EndedTxnStatus get(TxnID txnID) {
+        if (cache == null) {
+            return null;
+        }
+        EndedTxnMetadata metadata = cache.getIfPresent(txnID);
+        return metadata == null ? null : metadata.status();
+    }
+
+    /**
+     * Retains the final status of a transaction.
+     *
+     * @param txnID the ended transaction
+     * @param status the final status; {@code null} is not recorded
+     * @param logPositions the position stored with the entry and handed to the removal listener
+     * @param endedAtMs the time the transaction ended, in epoch milliseconds
+     * @return {@code true} if the status was stored, {@code false} if the cache is disabled, the status is
+     *         {@code null} or the retention time has already elapsed
+     */
+    boolean record(TxnID txnID, EndedTxnStatus status, Position logPositions, long endedAtMs) {
+        if (cache == null || status == null || !isWithinRetention(endedAtMs)) {
+            return false;
+        }
+        cache.put(txnID, new EndedTxnMetadata(txnID, status, logPositions, endedAtMs));
+        return true;
+    }
+
+    /** Drops all entries without notifying the removal listener. */
+    void close() {
+        if (cache != null) {
+            closed.set(true);
+            cache.invalidateAll();
+            cache.cleanUp();
+        }
+    }
+
+    private boolean isWithinRetention(long endedAtMs) {
+        return retentionTimeMs < 0 || expireAtMs(endedAtMs, retentionTimeMs) > System.currentTimeMillis();
+    }
+
+    /**
+     * Returns {@code endedAtMs + retentionTimeMs}, saturated at {@link Long#MAX_VALUE}.
+     *
+     * <p>A very large retention time would otherwise wrap to a negative deadline and make every entry look
+     * expired. Only called with a non-negative {@code retentionTimeMs}.
+     */
+    private static long expireAtMs(long endedAtMs, long retentionTimeMs) {
+        long deadlineMs = endedAtMs + retentionTimeMs;
+        // With a non-negative addend the sum can only drop below endedAtMs if it overflowed.
+        return deadlineMs < endedAtMs ? Long.MAX_VALUE : deadlineMs;
+    }
+
+    /** A retained final status together with the position and end time it was recorded with. */
+    record EndedTxnMetadata(TxnID txnID, EndedTxnStatus status, Position logPosition, long endedAtMs) {}
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
index 7817d48487568..bb370c2e2d186 100644
---
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java @@ -28,9 +28,11 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats; +import org.apache.pulsar.transaction.coordinator.EndedTxnStatus; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreAttributes; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreConfig; import org.apache.pulsar.transaction.coordinator.TransactionSubscription; import org.apache.pulsar.transaction.coordinator.TxnMeta; import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException; @@ -50,13 +52,14 @@ class InMemTransactionMetadataStore implements TransactionMetadataStore { private final LongAdder commitTransactionCount; private final LongAdder abortTransactionCount; private final LongAdder transactionTimeoutCount; + private final EndedTxnStatusCache endedTxnStatusCache; private volatile TransactionMetadataStoreAttributes attributes = null; private static final AtomicReferenceFieldUpdater ATTRIBUTES_FIELD_UPDATER = AtomicReferenceFieldUpdater.newUpdater( InMemTransactionMetadataStore.class, TransactionMetadataStoreAttributes.class, "attributes"); - InMemTransactionMetadataStore(TransactionCoordinatorID tcID) { + InMemTransactionMetadataStore(TransactionCoordinatorID tcID, TransactionMetadataStoreConfig metadataStoreConfig) { this.tcID = tcID; this.localID = new AtomicLong(0L); this.transactions = new ConcurrentHashMap<>(); @@ -65,6 +68,9 @@ class InMemTransactionMetadataStore 
implements TransactionMetadataStore { this.commitTransactionCount = new LongAdder(); this.abortTransactionCount = new LongAdder(); this.transactionTimeoutCount = new LongAdder(); + this.endedTxnStatusCache = EndedTxnStatusCache.create( + metadataStoreConfig.getTransactionEndedStatusRetentionTimeMs(), + metadataStoreConfig.getTransactionEndedStatusMaxRecordCount(), null); } @@ -80,6 +86,11 @@ public CompletableFuture getTxnMeta(TxnID txnid) { return getFuture; } + @Override + public EndedTxnStatus getEndedTxnStatus(TxnID txnid) { + return endedTxnStatusCache.get(txnid); + } + @Override public CompletableFuture newTransaction(long timeoutInMills, String owner) { if (owner != null) { @@ -130,9 +141,19 @@ public CompletableFuture updateTxnStatus(TxnID txnid, TxnStatus newStatus, return getTxnMeta(txnid).thenCompose(txn -> { try { txn.updateTxnStatus(newStatus, expectedStatus); - if (isTimeout && expectedStatus == TxnStatus.ABORTING) { + if (isTimeout && newStatus == TxnStatus.ABORTING) { transactionTimeoutCount.increment(); } + if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) { + if (newStatus == TxnStatus.COMMITTED) { + commitTransactionCount.increment(); + } else { + abortTransactionCount.increment(); + } + EndedTxnStatus status = getEndedTxnStatus(newStatus, isTimeout); + endedTxnStatusCache.record(txnid, status, null, System.currentTimeMillis()); + transactions.remove(txnid); + } return CompletableFuture.completedFuture(null); } catch (InvalidTxnStatusException e) { CompletableFuture error = new CompletableFuture<>(); @@ -155,6 +176,7 @@ public TransactionCoordinatorStats getCoordinatorStats() { @Override public CompletableFuture closeAsync() { transactions.clear(); + endedTxnStatusCache.close(); return CompletableFuture.completedFuture(null); } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java index f3f5a3e432904..f3901e1342baa 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java @@ -24,6 +24,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreConfig; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider; import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker; import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker; @@ -39,10 +40,10 @@ public CompletableFuture openStore(TransactionCoordina ManagedLedgerConfig managedLedgerConfig, TransactionTimeoutTracker timeoutTracker, TransactionRecoverTracker recoverTracker, - long maxActiveTransactionsPerCoordinator, + TransactionMetadataStoreConfig metadataStoreConfig, TxnLogBufferedWriterConfig txnLogBufferedWriterConfig, Timer timer) { return CompletableFuture.completedFuture( - new InMemTransactionMetadataStore(transactionCoordinatorId)); + new InMemTransactionMetadataStore(transactionCoordinatorId, metadataStoreConfig)); } } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java index 780ef47687f49..5d5cdc54bf8a3 100644 --- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java @@ -45,10 +45,12 @@ import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.RecoverTimeRecord; +import org.apache.pulsar.transaction.coordinator.EndedTxnStatus; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TransactionLogReplayCallback; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreAttributes; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreConfig; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker; import org.apache.pulsar.transaction.coordinator.TransactionSubscription; @@ -86,6 +88,7 @@ public class MLTransactionMetadataStore private final ExecutorService internalPinnedExecutor; public final RecoverTimeRecord recoverTime = new RecoverTimeRecord(); private final long maxActiveTransactionsPerCoordinator; + private final EndedTxnStatusCache endedTxnStatusCache; private volatile TransactionMetadataStoreAttributes attributes = null; private static final AtomicReferenceFieldUpdater @@ -97,6 +100,15 @@ public MLTransactionMetadataStore(TransactionCoordinatorID tcID, TransactionTimeoutTracker timeoutTracker, MLTransactionSequenceIdGenerator sequenceIdGenerator, long maxActiveTransactionsPerCoordinator) { + this(tcID, mlTransactionLog, timeoutTracker, sequenceIdGenerator, + TransactionMetadataStoreConfig.defaultConfig(maxActiveTransactionsPerCoordinator)); + } + + public 
MLTransactionMetadataStore(TransactionCoordinatorID tcID, + MLTransactionLogImpl mlTransactionLog, + TransactionTimeoutTracker timeoutTracker, + MLTransactionSequenceIdGenerator sequenceIdGenerator, + TransactionMetadataStoreConfig metadataStoreConfig) { super(State.None); this.sequenceIdGenerator = sequenceIdGenerator; this.tcID = tcID; @@ -104,13 +116,20 @@ public MLTransactionMetadataStore(TransactionCoordinatorID tcID, this.timeoutTracker = timeoutTracker; this.transactionMetadataStoreStats = new TransactionMetadataStoreStats(); - this.maxActiveTransactionsPerCoordinator = maxActiveTransactionsPerCoordinator; + this.maxActiveTransactionsPerCoordinator = metadataStoreConfig.getMaxActiveTransactionsPerCoordinator(); this.createdTransactionCount = new LongAdder(); this.committedTransactionCount = new LongAdder(); this.abortedTransactionCount = new LongAdder(); this.transactionTimeoutCount = new LongAdder(); this.appendLogCount = new LongAdder(); this.onGoingTxnCount = new LongAdder(); + this.endedTxnStatusCache = EndedTxnStatusCache.create( + metadataStoreConfig.getTransactionEndedStatusRetentionTimeMs(), + metadataStoreConfig.getTransactionEndedStatusMaxRecordCount(), + endedTxnMetadata -> { + deleteTxnLogPositions(endedTxnMetadata.txnID(), + Collections.singletonList(endedTxnMetadata.logPosition())); + }); DefaultThreadFactory threadFactory = new DefaultThreadFactory("transaction_coordinator_" + tcID.toString() + "thread_factory"); this.internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory); @@ -191,22 +210,44 @@ public void handleMetadataEntry(Position position, TransactionMetadataEntry tran break; case UPDATE: if (!txnMetaMap.containsKey(transactionId)) { - transactionLog.deletePosition(Collections.singletonList(position)); + TxnStatus newStatus = transactionMetadataEntry.getNewStatus(); + if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) { + long endTime = transactionMetadataEntry.getLastModificationTime(); + 
EndedTxnStatus endedTxnStatus = getEndedTxnStatus(newStatus, + transactionMetadataEntry.hasIsTimeout() + && transactionMetadataEntry.isIsTimeout()); + if (!endedTxnStatusCache.record(txnID, endedTxnStatus, position, endTime)) { + transactionLog.deletePosition(Collections.singletonList(position)); + } + } else { + log.warn().attr("txnId", + tcID.toString() + ":" + transactionMetadataEntry.getMaxLocalTxnId()) + .attr("txnStatus", newStatus) + .log("Out-of-order transaction logs"); + transactionLog.deletePosition(Collections.singletonList(position)); + } } else { TxnStatus newStatus = transactionMetadataEntry.getNewStatus(); - txnMetaMap.get(transactionId).getLeft() + Pair> txnMetaListPair = txnMetaMap.get(transactionId); + List positionsToDelete = txnMetaListPair.getRight(); + txnMetaListPair.getLeft() .updateTxnStatus(transactionMetadataEntry.getNewStatus(), transactionMetadataEntry.getExpectedStatus()); - txnMetaMap.get(transactionId).getRight().add(position); recoverTracker.updateTransactionStatus(txnID.getLeastSigBits(), newStatus); if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) { - transactionLog.deletePosition(txnMetaMap - .get(transactionId).getRight()).thenAccept(v -> { - if (txnMetaMap.remove(transactionId) != null) { - onGoingTxnCount.decrement(); - } - } - ); + long endTime = transactionMetadataEntry.getLastModificationTime(); + EndedTxnStatus endedTxnStatus = getEndedTxnStatus(newStatus, + transactionMetadataEntry.hasIsTimeout() + && transactionMetadataEntry.isIsTimeout()); + if (!endedTxnStatusCache.record(txnID, endedTxnStatus, position, endTime)) { + positionsToDelete.add(position); + } + deleteTxnLogPositions(txnID, positionsToDelete); + if (txnMetaMap.remove(transactionId) != null) { + onGoingTxnCount.decrement(); + } + } else { + positionsToDelete.add(position); } } break; @@ -242,6 +283,11 @@ public CompletableFuture getTxnMeta(TxnID txnID) { return completableFuture; } + @Override + public EndedTxnStatus 
getEndedTxnStatus(TxnID txnID) { + return endedTxnStatusCache.get(txnID); + } + @Override public CompletableFuture newTransaction(long timeOut, String owner) { if (this.maxActiveTransactionsPerCoordinator == 0 @@ -404,22 +450,27 @@ public CompletableFuture updateTxnStatus(TxnID txnID, TxnStatus newStatus, promise.complete(null); return promise; } + long modifyTime = System.currentTimeMillis(); TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry() .setTxnidMostBits(txnID.getMostSigBits()) .setTxnidLeastBits(txnID.getLeastSigBits()) .setExpectedStatus(expectedStatus) .setMetadataOp(TransactionMetadataOp.UPDATE) - .setLastModificationTime(System.currentTimeMillis()) + .setLastModificationTime(modifyTime) .setNewStatus(newStatus) + .setIsTimeout(isTimeout) .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId()); return transactionLog.append(transactionMetadataEntry) .thenAccept(position -> { appendLogCount.increment(); try { + List txnLogs = txnMetaListPair.getRight(); synchronized (txnMetaListPair.getLeft()) { txnMetaListPair.getLeft().updateTxnStatus(newStatus, expectedStatus); - txnMetaListPair.getRight().add(position); + if (newStatus != TxnStatus.COMMITTED && newStatus != TxnStatus.ABORTED) { + txnLogs.add(position); + } } if (newStatus == TxnStatus.ABORTING && isTimeout) { this.transactionTimeoutCount.increment(); @@ -433,15 +484,14 @@ public CompletableFuture updateTxnStatus(TxnID txnID, TxnStatus newStatus, } else { abortedTransactionCount.increment(); } + EndedTxnStatus endedTxnStatus = getEndedTxnStatus(newStatus, isTimeout); + if (!endedTxnStatusCache.record(txnID, endedTxnStatus, position, modifyTime)) { + txnLogs.add(position); + } + deleteTxnLogPositions(txnID, txnLogs); if (txnMetaMap.remove(txnID.getLeastSigBits()) != null) { onGoingTxnCount.decrement(); } - transactionLog.deletePosition(txnMetaListPair.getRight()).exceptionally(ex -> { - log.warn().attr("txnId", txnID) - .log("Failed to delete transaction log 
position" - + " at end transaction"); - return null; - }); } promise.complete(null); } catch (InvalidTxnStatusException e) { @@ -498,6 +548,20 @@ private CompletableFuture>> getTxnPositionPair(TxnI return completableFuture; } + private void deleteTxnLogPositions(TxnID txnID, List positions) { + if (positions.isEmpty()) { + return; + } + transactionLog.deletePosition(positions).exceptionally(ex -> { + log.error().attr("txnId", txnID) + .attr("ML", transactionLog.getManagedLedger().getName()) + .exception(ex) + .log("Failed to delete transaction log position at end transaction, this may result in a large" + + " backlog and consume excessive BK of disk resources"); + return null; + }); + } + @Override public CompletableFuture closeAsync() { if (changeToClosingState()) { @@ -505,6 +569,7 @@ public CompletableFuture closeAsync() { internalPinnedExecutor.shutdown(); return transactionLog.closeAsync().thenCompose(v -> { txnMetaMap.clear(); + endedTxnStatusCache.close(); onGoingTxnCount.reset(); this.timeoutTracker.close(); if (!this.changeToCloseState()) { diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java index eb12a3dfb77e4..892a4b37c1023 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java @@ -25,6 +25,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreConfig; import 
org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider; import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker; import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker; @@ -66,7 +67,7 @@ public CompletableFuture openStore(TransactionCoordina ManagedLedgerConfig managedLedgerConfig, TransactionTimeoutTracker timeoutTracker, TransactionRecoverTracker recoverTracker, - long maxActiveTransactionsPerCoordinator, + TransactionMetadataStoreConfig metadataStoreConfig, TxnLogBufferedWriterConfig txnLogBufferedWriterConfig, Timer timer) { MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator(); @@ -77,7 +78,7 @@ public CompletableFuture openStore(TransactionCoordina // MLTransactionLogInterceptor will init sequenceId and update the sequenceId to managedLedger properties. return txnLog.initialize().thenCompose(__ -> new MLTransactionMetadataStore(transactionCoordinatorId, txnLog, timeoutTracker, - mlTransactionSequenceIdGenerator, maxActiveTransactionsPerCoordinator).init(recoverTracker)); + mlTransactionSequenceIdGenerator, metadataStoreConfig).init(recoverTracker)); } private static class MLTransactionMetadataStoreBufferedWriterMetrics extends TxnLogBufferedWriterMetricsStats { diff --git a/pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto b/pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto index 134d1cf3b51a1..dd6da21e878c8 100644 --- a/pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto +++ b/pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto @@ -52,6 +52,7 @@ message TransactionMetadataEntry { optional uint64 last_modification_time = 10; optional uint64 max_local_txn_id = 11; optional string owner = 12; + optional bool is_timeout = 13 [default = false]; } message BatchedTransactionMetadataEntry{ diff --git 
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java index eee5494ba2da1..00e837b1b85a0 100644 --- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java +++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java @@ -49,6 +49,7 @@ import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator; import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig; +import org.apache.pulsar.transaction.coordinator.proto.TransactionMetadataEntry; import org.apache.pulsar.transaction.coordinator.proto.TxnStatus; import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase; import org.awaitility.Awaitility; @@ -377,7 +378,8 @@ public void testDeleteLog(TxnLogBufferedWriterConfig txnLogBufferedWriterConfig) @Cleanup("closeAsync") MLTransactionMetadataStore transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, - new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L); + new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, + new TransactionMetadataStoreConfig(0L, 0L, 100_000L)); transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get(); int checkReplayRetryCount = 0; while (true) { @@ -439,6 +441,119 @@ public void testDeleteLog(TxnLogBufferedWriterConfig txnLogBufferedWriterConfig) } } + @Test(dataProvider = "bufferedWriterConfigDataProvider") + public void testRecoverEndedTxnStatus(TxnLogBufferedWriterConfig txnLogBufferedWriterConfig) throws Exception { + ManagedLedgerFactoryConfig factoryConf = new 
ManagedLedgerFactoryConfig(); + factoryConf.setMaxCacheSize(0); + + @Cleanup("shutdown") + ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf); + TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1); + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator(); + managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator); + MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory, + managedLedgerConfig, txnLogBufferedWriterConfig, transactionTimer, DISABLED_BUFFERED_WRITER_METRICS); + mlTransactionLog.initialize().get(2, TimeUnit.SECONDS); + MLTransactionMetadataStore transactionMetadataStore = + new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, + new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L); + transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get(); + + Awaitility.await().until(transactionMetadataStore::checkIfReady); + TxnID txnID = transactionMetadataStore.newTransaction(1000, null).get(); + transactionMetadataStore.updateTxnStatus(txnID, TxnStatus.ABORTING, TxnStatus.OPEN, true).get(); + transactionMetadataStore.updateTxnStatus(txnID, TxnStatus.ABORTED, TxnStatus.ABORTING, true).get(); + assertEquals(transactionMetadataStore.getEndedTxnStatus(txnID), EndedTxnStatus.TIMEOUT); + + transactionMetadataStore.closeAsync().get(2, TimeUnit.SECONDS); + Assert.assertNull(transactionMetadataStore.getEndedTxnStatus(txnID)); + + MLTransactionLogImpl recoveredTxnLog = new MLTransactionLogImpl(transactionCoordinatorID, factory, + managedLedgerConfig, txnLogBufferedWriterConfig, transactionTimer, DISABLED_BUFFERED_WRITER_METRICS); + recoveredTxnLog.initialize().get(2, TimeUnit.SECONDS); + @Cleanup("closeAsync") + MLTransactionMetadataStore recoveredStore 
= + new MLTransactionMetadataStore(transactionCoordinatorID, recoveredTxnLog, + new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L); + recoveredStore.init(new TransactionRecoverTrackerImpl()).get(); + + Awaitility.await().until(recoveredStore::checkIfReady); + assertEquals(recoveredStore.getEndedTxnStatus(txnID), EndedTxnStatus.TIMEOUT); + try { + recoveredStore.getTxnMeta(txnID).get(); + fail(); + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof TransactionNotFoundException); + } + } + + @Test(dataProvider = "bufferedWriterConfigDataProvider") + public void testReplayKeepsEndLogWhenPreviousLogsRemain(TxnLogBufferedWriterConfig txnLogBufferedWriterConfig) + throws Exception { + ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig(); + factoryConf.setMaxCacheSize(0); + + @Cleanup("shutdown") + ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf); + TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1); + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator(); + managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator); + + MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory, + managedLedgerConfig, txnLogBufferedWriterConfig, transactionTimer, DISABLED_BUFFERED_WRITER_METRICS); + mlTransactionLog.initialize().get(2, TimeUnit.SECONDS); + + TxnID txnID = new TxnID(transactionCoordinatorID.getId(), 0L); + long now = System.currentTimeMillis(); + Position newPosition = mlTransactionLog.append(newTxnLogEntry(txnID, now)).get(2, TimeUnit.SECONDS); + Position abortingPosition = mlTransactionLog.append(updateTxnLogEntry(txnID, TxnStatus.OPEN, + TxnStatus.ABORTING, now + 1, true)).get(2, TimeUnit.SECONDS); + Position abortedPosition = 
mlTransactionLog.append(updateTxnLogEntry(txnID, TxnStatus.ABORTING, + TxnStatus.ABORTED, now + 2, true)).get(2, TimeUnit.SECONDS); + mlTransactionLog.closeAsync().get(2, TimeUnit.SECONDS); + + RecordingMLTransactionLogImpl recoveredTxnLog = new RecordingMLTransactionLogImpl(transactionCoordinatorID, + factory, managedLedgerConfig, txnLogBufferedWriterConfig, transactionTimer); + recoveredTxnLog.initialize().get(2, TimeUnit.SECONDS); + @Cleanup("closeAsync") + MLTransactionMetadataStore recoveredStore = + new MLTransactionMetadataStore(transactionCoordinatorID, recoveredTxnLog, + new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L); + recoveredStore.init(new TransactionRecoverTrackerImpl()).get(); + + Awaitility.await().until(recoveredStore::checkIfReady); + assertEquals(recoveredStore.getEndedTxnStatus(txnID), EndedTxnStatus.TIMEOUT); + Assert.assertTrue(recoveredTxnLog.deleted(newPosition)); + Assert.assertTrue(recoveredTxnLog.deleted(abortingPosition)); + Assert.assertFalse(recoveredTxnLog.deleted(abortedPosition)); + } + + private static TransactionMetadataEntry newTxnLogEntry(TxnID txnID, long now) { + return new TransactionMetadataEntry() + .setTxnidMostBits(txnID.getMostSigBits()) + .setTxnidLeastBits(txnID.getLeastSigBits()) + .setStartTime(now) + .setTimeoutMs(1000) + .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW) + .setLastModificationTime(now) + .setMaxLocalTxnId(txnID.getLeastSigBits()); + } + + private static TransactionMetadataEntry updateTxnLogEntry(TxnID txnID, TxnStatus expectedStatus, + TxnStatus newStatus, long now, boolean isTimeout) { + return new TransactionMetadataEntry() + .setTxnidMostBits(txnID.getMostSigBits()) + .setTxnidLeastBits(txnID.getLeastSigBits()) + .setExpectedStatus(expectedStatus) + .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.UPDATE) + .setLastModificationTime(now) + .setNewStatus(newStatus) + .setIsTimeout(isTimeout) + .setMaxLocalTxnId(txnID.getLeastSigBits()); + } 
+ /** * Verify transaction meta store recover correct. * TODO After the batch feature is dynamically switched,append tests that contain both batch and non-batch data. @@ -551,6 +666,29 @@ public void close() { } } + private static class RecordingMLTransactionLogImpl extends MLTransactionLogImpl { + private final List> deletedPositions = new ArrayList<>(); + + RecordingMLTransactionLogImpl(TransactionCoordinatorID tcID, + ManagedLedgerFactory managedLedgerFactory, + ManagedLedgerConfig managedLedgerConfig, + TxnLogBufferedWriterConfig txnLogBufferedWriterConfig, + HashedWheelTimer timer) { + super(tcID, managedLedgerFactory, managedLedgerConfig, txnLogBufferedWriterConfig, timer, + DISABLED_BUFFERED_WRITER_METRICS); + } + + @Override + public CompletableFuture deletePosition(List positions) { + deletedPositions.add(new ArrayList<>(positions)); + return super.deletePosition(positions); + } + + boolean deleted(Position position) { + return deletedPositions.stream().flatMap(List::stream).anyMatch(position::equals); + } + } + public static class TransactionRecoverTrackerImpl implements TransactionRecoverTracker { @Override @@ -574,4 +712,4 @@ public void handleCommittingAndAbortingTransaction() { } } -} \ No newline at end of file +} diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java index 0d1f92e2e196b..f4256480dcc05 100644 --- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java +++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java @@ -69,7 +69,8 @@ public TransactionMetadataStoreProviderTest(String providerClassName) throws Exc public void setup() throws Exception { this.tcId = new 
TransactionCoordinatorID(1L); this.store = this.provider.openStore(tcId, null, null, - null, new MLTransactionMetadataStoreTest.TransactionRecoverTrackerImpl(), 0L, + null, new MLTransactionMetadataStoreTest.TransactionRecoverTrackerImpl(), + TransactionMetadataStoreConfig.defaultConfig(0L), new TxnLogBufferedWriterConfig(), transactionTimer).get(); } @@ -155,6 +156,21 @@ public void testUpdateTxnStatusCannotTransition() throws Exception { assertEquals(newTxnStatus, TxnStatus.OPEN); } + /** Verifies that once a new transaction has been moved to COMMITTING and then COMMITTED, {@code getEndedTxnStatus} reports {@code EndedTxnStatus.COMMITTED} while {@code getTxnMeta} fails with a {@code TransactionNotFoundException} as the cause. */ @Test + public void testFinalTxnStatusRemovedFromActiveStore() throws Exception { + TxnID txnID = this.store.newTransaction(0L, null).get(); + /* Two status updates: first to COMMITTING (expecting OPEN), then to COMMITTED (expecting COMMITTING). */ this.store.updateTxnStatus(txnID, TxnStatus.COMMITTING, TxnStatus.OPEN, false).get(); + this.store.updateTxnStatus(txnID, TxnStatus.COMMITTED, TxnStatus.COMMITTING, false).get(); + assertEquals(this.store.getEndedTxnStatus(txnID), EndedTxnStatus.COMMITTED); + + /* The metadata lookup must now fail; reaching fail(...) means it unexpectedly succeeded. */ try { + this.store.getTxnMeta(txnID).get(); + fail("Should fail to get txn meta after the transaction reaches a final status"); + } catch (ExecutionException ee) { + assertTrue(ee.getCause() instanceof TransactionNotFoundException); + } + } + @Test public void testAddProducedPartition() throws Exception { TxnID txnID = this.store.newTransaction(0L, null).get(); diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/EndedTxnStatusCacheTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/EndedTxnStatusCacheTest.java new file mode 100644 index 0000000000000..3fec24e98b73d --- /dev/null +++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/EndedTxnStatusCacheTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.transaction.coordinator.impl; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.transaction.coordinator.EndedTxnStatus; +import org.testng.annotations.Test; + +/** Unit test for {@code EndedTxnStatusCache.create} across positive, negative and zero values of its first two arguments (presumably retention time in ms and max record count, in that order - confirm against the factory signature). */ public class EndedTxnStatusCacheTest { + + /** Creates a cache for each argument combination and checks whether {@code record} accepts a status and whether {@code get} returns it; the final combination (both arguments negative) must be rejected with {@code IllegalArgumentException}. */ @Test + public void testCreateEndedTxnStatusCacheWithUnboundedDimensions() { + TxnID txnID = new TxnID(1L, 1L); + long now = System.currentTimeMillis(); + + /* Both arguments positive: the status is recorded and readable. */ EndedTxnStatusCache cache = EndedTxnStatusCache.create(3600000L, 100000L, null); + assertTrue(cache.record(txnID, EndedTxnStatus.COMMITTED, null, now)); + assertEquals(cache.get(txnID), EndedTxnStatus.COMMITTED); + + /* First argument negative, second positive: still recorded. */ cache = EndedTxnStatusCache.create(-1L, 100000L, null); + assertTrue(cache.record(txnID, EndedTxnStatus.ABORTED, null, now)); + assertEquals(cache.get(txnID), EndedTxnStatus.ABORTED); + + /* First argument positive, second negative: still recorded. */ cache = EndedTxnStatusCache.create(3600000L, -1L, null); + assertTrue(cache.record(txnID, EndedTxnStatus.TIMEOUT, null, now)); + assertEquals(cache.get(txnID), EndedTxnStatus.TIMEOUT); + + /* First argument zero: record is refused and nothing is stored. */ cache = EndedTxnStatusCache.create(0L, 100000L, null); + assertFalse(cache.record(txnID, EndedTxnStatus.COMMITTED, null, now)); + 
assertNull(cache.get(txnID)); + + /* Second argument zero: record is refused and nothing is stored. */ cache = EndedTxnStatusCache.create(3600000L, 0L, null); + assertFalse(cache.record(txnID, EndedTxnStatus.COMMITTED, null, now)); + assertNull(cache.get(txnID)); + + /* First argument 1 with a timestamp 10000 earlier than now: record is refused (presumably because the entry is already past its retention window - confirm against the cache implementation). */ cache = EndedTxnStatusCache.create(1L, -1L, null); + assertFalse(cache.record(txnID, EndedTxnStatus.ABORTED, null, now - 10000L)); + assertNull(cache.get(txnID)); + + /* Both arguments negative is an invalid configuration. */ assertThrows(IllegalArgumentException.class, () -> EndedTxnStatusCache.create(-1L, -1L, null)); + } +}