From a1f862e4145e81998c868ffb1d1c226596390ead Mon Sep 17 00:00:00 2001 From: jhrotko Date: Thu, 11 Jun 2026 18:42:30 +0100 Subject: [PATCH 1/6] First Attempt --- .../apache/iceberg/BaseReplaceSortOrder.java | 1 + .../org/apache/iceberg/BaseTransaction.java | 2 + .../java/org/apache/iceberg/CommitRetry.java | 122 +++++++++++++++++ .../org/apache/iceberg/PropertiesUpdate.java | 1 + .../org/apache/iceberg/RemoveSnapshots.java | 1 + .../java/org/apache/iceberg/SetLocation.java | 1 + .../apache/iceberg/SetSnapshotOperation.java | 1 + .../org/apache/iceberg/SetStatistics.java | 1 + .../org/apache/iceberg/SnapshotProducer.java | 1 + .../apache/iceberg/rest/CatalogHandlers.java | 3 + .../java/org/apache/iceberg/util/Tasks.java | 78 +++++++---- .../apache/iceberg/view/PropertiesUpdate.java | 2 + .../apache/iceberg/view/SetViewLocation.java | 2 + .../iceberg/view/ViewVersionReplace.java | 2 + .../org/apache/iceberg/util/TestTasks.java | 128 ++++++++++++++++++ 15 files changed, 320 insertions(+), 26 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/CommitRetry.java diff --git a/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java b/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java index 2311c1b017d9..ce3425ef8869 100644 --- a/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java +++ b/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java @@ -57,6 +57,7 @@ public void commit() { base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) + .onRetryExhausted(CommitRetry::retryExhaustedException) .run( taskOps -> { this.base = ops.refresh(); diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index 9884ac297079..81f9a2f01fc9 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -310,6 +310,7 @@ private void commitReplaceTransaction(boolean orCreate) { props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) + .onRetryExhausted(CommitRetry::retryExhaustedException) .run( underlyingOps -> { try { @@ -365,6 +366,7 @@ private void commitSimpleTransaction() { base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) + .onRetryExhausted(CommitRetry::retryExhaustedException) .run( underlyingOps -> { applyUpdates(underlyingOps); diff --git a/core/src/main/java/org/apache/iceberg/CommitRetry.java b/core/src/main/java/org/apache/iceberg/CommitRetry.java new file mode 100644 index 000000000000..4d7551020fc4 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/CommitRetry.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES; +import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS; + +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.util.Tasks; + +public class CommitRetry { + public enum RetryExhaustionReason { + ATTEMPT_LIMIT, + TIMEOUT, + ATTEMPT_LIMIT_AND_TIMEOUT + } + + private CommitRetry() {} + + public static CommitFailedException retryExhaustedException(RetryExhaustedException exhausted) { + return new CommitFailedException( + exhausted, + "Commit failed after exhausting retry budget (%s). %s", + exhausted.getMessage(), + recommendation(exhausted.reason())); + } + + @SuppressWarnings("StatementSwitchToExpressionSwitch") + private static String recommendation(RetryExhaustionReason reason) { + switch (reason) { + case ATTEMPT_LIMIT: + return String.format("To allow more retry attempts, adjust %s.", COMMIT_NUM_RETRIES); + + case TIMEOUT: + return String.format( + "To allow more total retry time, adjust %s.", COMMIT_TOTAL_RETRY_TIME_MS); + + case ATTEMPT_LIMIT_AND_TIMEOUT: + return String.format( + "To allow more retry attempts and total retry time, adjust %s and %s.", + COMMIT_NUM_RETRIES, COMMIT_TOTAL_RETRY_TIME_MS); + } + + throw new IllegalArgumentException("Unknown retry exhaustion reason: " + reason); + } + + + public static RetryExhaustionReason retryExhaustionReason( + boolean attemptsExhausted, boolean timeoutExhausted) { + if (attemptsExhausted && timeoutExhausted) { + return RetryExhaustionReason.ATTEMPT_LIMIT_AND_TIMEOUT; + } else if (attemptsExhausted) { + return RetryExhaustionReason.ATTEMPT_LIMIT; + } else { + return RetryExhaustionReason.TIMEOUT; + } + } + + public static class RetryExhaustedException extends RuntimeException { + private final RetryExhaustionReason reason; + private final int attempts; + private final int maxAttempts; + private final long durationMs; + private final long maxDurationMs; + + public RetryExhaustedException( + RetryExhaustionReason reason, + int attempts, + int maxAttempts, + long durationMs, + long maxDurationMs, + Exception cause) { + super( + String.format( + "Retry exhausted: reason=%s, attempts=%s, max-attempts=%s, duration-ms=%s, " + + "max-duration-ms=%s", + reason, attempts, maxAttempts, durationMs, maxDurationMs), + cause); + this.reason = reason; + this.attempts = attempts; + this.maxAttempts = maxAttempts; + this.durationMs = durationMs; + this.maxDurationMs = maxDurationMs; + } + + public RetryExhaustionReason reason() { + return reason; + } + + public int attempts() { + return attempts; + } + + public int maxAttempts() { + return maxAttempts; + } + + public long durationMs() { + return durationMs; + } + + public long maxDurationMs() { + return maxDurationMs; + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java b/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java index 9389aec50c0a..32e82d31d718 100644 --- a/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java +++ b/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java @@ -107,6 +107,7 @@ public void commit() { base.propertyTryAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) + .onRetryExhausted(CommitRetry::retryExhaustedException) .run( taskOps -> { Map newProperties = apply(); diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java index dbccdb86ed4e..57807582e120 100644 --- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java +++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java @@ -366,6 +366,7 @@ public void commit() { base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) + .onRetryExhausted(CommitRetry::retryExhaustedException) .run( item -> { TableMetadata updated = internalApply(); diff --git a/core/src/main/java/org/apache/iceberg/SetLocation.java b/core/src/main/java/org/apache/iceberg/SetLocation.java index 148e4b8bc8be..04ff2eb89670 100644 --- a/core/src/main/java/org/apache/iceberg/SetLocation.java +++ b/core/src/main/java/org/apache/iceberg/SetLocation.java @@ -61,6 +61,7 @@ public void commit() { base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) + .onRetryExhausted(CommitRetry::retryExhaustedException) .run(taskOps -> taskOps.commit(base, base.updateLocation(newLocation))); } } diff --git a/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java b/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java index 0f80b4e1f233..c2be45734add 100644 --- a/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java +++ b/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java @@ -115,6 +115,7 @@ public void commit() { base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) + .onRetryExhausted(CommitRetry::retryExhaustedException) .run( taskOps -> { Snapshot snapshot = apply(); diff --git a/core/src/main/java/org/apache/iceberg/SetStatistics.java b/core/src/main/java/org/apache/iceberg/SetStatistics.java index 01e06fa16bca..3c9934d6b322 100644 --- a/core/src/main/java/org/apache/iceberg/SetStatistics.java +++ b/core/src/main/java/org/apache/iceberg/SetStatistics.java @@ -71,6 +71,7 @@ public void commit() { .propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) + .onRetryExhausted(CommitRetry::retryExhaustedException) .run( taskOps -> { TableMetadata base = taskOps.refresh(); diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index f77222d6a2a6..3129ab6a5dc6 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -494,6 +494,7 @@ public void commit() { base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) + .onRetryExhausted(CommitRetry::retryExhaustedException) .countAttempts(commitMetrics().attempts()) .run( taskOps -> { diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java index 3a1e62260aae..4f7ca7e32e53 100644 --- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java +++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java @@ -49,6 +49,7 @@ import org.apache.iceberg.BaseMetadataTable; import org.apache.iceberg.BaseTable; import org.apache.iceberg.BaseTransaction; +import org.apache.iceberg.CommitRetry; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.IncrementalAppendScan; import org.apache.iceberg.MetadataUpdate.UpgradeFormatVersion; @@ -621,6 +622,7 @@ static TableMetadata commit(TableOperations ops, UpdateTableRequest request) { COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT, 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) + .onRetryExhausted(CommitRetry::retryExhaustedException) .run( taskOps -> { TableMetadata base = isRetry.get() ? taskOps.refresh() : taskOps.current(); @@ -783,6 +785,7 @@ static ViewMetadata commit(ViewOperations ops, UpdateTableRequest request) { COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT, 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) + .onRetryExhausted(CommitRetry::retryExhaustedException) .run( taskOps -> { ViewMetadata base = isRetry.get() ? taskOps.refresh() : taskOps.current(); diff --git a/core/src/main/java/org/apache/iceberg/util/Tasks.java b/core/src/main/java/org/apache/iceberg/util/Tasks.java index 29100c6cffb2..f6ce6e134041 100644 --- a/core/src/main/java/org/apache/iceberg/util/Tasks.java +++ b/core/src/main/java/org/apache/iceberg/util/Tasks.java @@ -32,6 +32,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Stream; import org.apache.iceberg.metrics.Counter; @@ -39,6 +40,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.iceberg.CommitRetry.retryExhaustionReason; +import static org.apache.iceberg.CommitRetry.RetryExhaustedException; + public class Tasks { private static final Logger LOG = LoggerFactory.getLogger(Tasks.class); @@ -88,6 +92,7 @@ public static class Builder { private long maxDurationMs = 600000; // 10 minutes private double scaleFactor = 2.0; // exponential private Counter attemptsCounter; + private Function retryExhaustedHandler = null; public Builder(Iterable items) { this.items = items; @@ -180,6 +185,12 @@ public Builder countAttempts(Counter counter) { return this; } + public Builder onRetryExhausted( + Function handler) { + this.retryExhaustedHandler = handler; + return this; + } + public Builder exponentialBackoff( long backoffMinSleepTimeMs, long backoffMaxSleepTimeMs, @@ -414,39 +425,30 @@ private void runTaskWithRetry(Task task, I item) thr break; } catch (Exception e) { - long durationMs = System.currentTimeMillis() - start; - if (attempt >= maxAttempts || (durationMs > maxDurationMs && attempt > 1)) { - if (durationMs > maxDurationMs) { - LOG.info("Stopping retries after {} ms", durationMs); - } + if (!shouldRetry(e)) { throw e; } - if (shouldRetryPredicate != null) { - if (!shouldRetryPredicate.test(e)) { - throw e; + long durationMs = System.currentTimeMillis() - start; + boolean attemptsExhausted = attempt >= maxAttempts; + boolean timeoutExhausted = durationMs > maxDurationMs && attempt > 1; + if (attemptsExhausted || timeoutExhausted) { + if (timeoutExhausted) { + LOG.info("Stopping retries after {} ms", durationMs); } - } else if (onlyRetryExceptions != null) { - // if onlyRetryExceptions are present, then this retries if one is found - boolean matchedRetryException = false; - for (Class exClass : onlyRetryExceptions) { - if (exClass.isInstance(e)) { - matchedRetryException = true; - break; - } - } - if (!matchedRetryException) { - throw e; + if (retryExhaustedHandler != null) { + throw retryExhaustedHandler.apply( + new RetryExhaustedException( + retryExhaustionReason(attemptsExhausted, timeoutExhausted), + attempt, + maxAttempts, + durationMs, + maxDurationMs, + e)); } - } else { - // otherwise, always retry unless one of the stop exceptions is found - for (Class exClass : stopRetryExceptions) { - if (exClass.isInstance(e)) { - throw e; - } - } + throw e; } int delayMs = @@ -468,6 +470,30 @@ private void runTaskWithRetry(Task task, I item) thr } } } + + private boolean shouldRetry(Exception exception) { + if (shouldRetryPredicate != null) { + return shouldRetryPredicate.test(exception); + } else if (onlyRetryExceptions != null) { + // if onlyRetryExceptions are present, then this retries if one is found + for (Class exClass : onlyRetryExceptions) { + if (exClass.isInstance(exception)) { + return true; + } + } + + return false; + } else { + // otherwise, always retry unless one of the stop exceptions is found + for (Class exClass : stopRetryExceptions) { + if (exClass.isInstance(exception)) { + return false; + } + } + + return true; + } + } } @SuppressWarnings("checkstyle:CyclomaticComplexity") diff --git a/core/src/main/java/org/apache/iceberg/view/PropertiesUpdate.java b/core/src/main/java/org/apache/iceberg/view/PropertiesUpdate.java index 48bcfc3a6805..84327489e541 100644 --- a/core/src/main/java/org/apache/iceberg/view/PropertiesUpdate.java +++ b/core/src/main/java/org/apache/iceberg/view/PropertiesUpdate.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Set; +import org.apache.iceberg.CommitRetry; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -73,6 +74,7 @@ public void commit() { base.properties(), COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) + .onRetryExhausted(CommitRetry::retryExhaustedException) .run(taskOps -> taskOps.commit(base, internalApply())); } diff --git a/core/src/main/java/org/apache/iceberg/view/SetViewLocation.java b/core/src/main/java/org/apache/iceberg/view/SetViewLocation.java index 481118c85991..d18be2402ba9 100644 --- a/core/src/main/java/org/apache/iceberg/view/SetViewLocation.java +++ b/core/src/main/java/org/apache/iceberg/view/SetViewLocation.java @@ -27,6 +27,7 @@ import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS; import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; +import org.apache.iceberg.CommitRetry; import org.apache.iceberg.UpdateLocation; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -63,6 +64,7 @@ public void commit() { base.properties(), COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) + .onRetryExhausted(CommitRetry::retryExhaustedException) .run( taskOps -> taskOps.commit(base, ViewMetadata.buildFrom(base).setLocation(apply()).build())); diff --git a/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java b/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java index 8b3d087940a5..a12b8faa98c6 100644 --- a/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java +++ b/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java @@ -28,6 +28,7 @@ import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; import java.util.List; +import org.apache.iceberg.CommitRetry; import org.apache.iceberg.EnvironmentContext; import org.apache.iceberg.Schema; import org.apache.iceberg.catalog.Namespace; @@ -100,6 +101,7 @@ public void commit() { base.properties(), COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) + .onRetryExhausted(CommitRetry::retryExhaustedException) .run(taskOps -> taskOps.commit(base, internalApply())); } diff --git a/core/src/test/java/org/apache/iceberg/util/TestTasks.java b/core/src/test/java/org/apache/iceberg/util/TestTasks.java index 2ca69c66bb6c..a456e9ca2406 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestTasks.java +++ b/core/src/test/java/org/apache/iceberg/util/TestTasks.java @@ -19,8 +19,12 @@ package org.apache.iceberg.util; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchThrowable; +import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; +import org.apache.iceberg.CommitRetry; +import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.metrics.Counter; import org.apache.iceberg.metrics.DefaultMetricsContext; import org.junit.jupiter.api.Test; @@ -57,4 +61,128 @@ public void attemptCounterIsIncreasedWithoutRetries() { assertThat(counter.value()).isOne(); } + + @Test + public void retryExhaustedReportsAttemptLimit() { + RuntimeException failure = new RuntimeException("failed"); + + Throwable thrown = + catchThrowable( + () -> + Tasks.foreach(1) + .retry(1) + .exponentialBackoff(0, 0, 5000, 0) + .onlyRetryOn(RuntimeException.class) + .onRetryExhausted(exhausted -> exhausted) + .run( + x -> { + throw failure; + })); + + assertThat(thrown).isInstanceOf(CommitRetry.RetryExhaustedException.class); + CommitRetry.RetryExhaustedException exhausted = (CommitRetry.RetryExhaustedException) thrown; + assertThat(exhausted.reason()).isEqualTo(CommitRetry.RetryExhaustionReason.ATTEMPT_LIMIT); + assertThat(exhausted.attempts()).isEqualTo(2); + assertThat(exhausted.maxAttempts()).isEqualTo(2); + assertThat(exhausted.getCause()).isSameAs(failure); + } + + @Test + public void retryExhaustedReportsTimeout() { + Throwable thrown = + catchThrowable( + () -> + Tasks.foreach(1) + .retry(2) + .exponentialBackoff(0, 0, 1, 0) + .onlyRetryOn(RuntimeException.class) + .onRetryExhausted(exhausted -> exhausted) + .run( + x -> { + sleep(5); + throw new RuntimeException("failed"); + })); + + assertThat(thrown).isInstanceOf(CommitRetry.RetryExhaustedException.class); + CommitRetry.RetryExhaustedException exhausted = (CommitRetry.RetryExhaustedException) thrown; + assertThat(exhausted.reason()).isEqualTo(CommitRetry.RetryExhaustionReason.TIMEOUT); + assertThat(exhausted.attempts()).isEqualTo(2); + assertThat(exhausted.maxAttempts()).isEqualTo(3); + assertThat(exhausted.durationMs()).isGreaterThan(exhausted.maxDurationMs()); + } + + @Test + public void retryExhaustedReportsAttemptLimitAndTimeout() { + Throwable thrown = + catchThrowable( + () -> + Tasks.foreach(1) + .retry(1) + .exponentialBackoff(0, 0, 1, 0) + .onlyRetryOn(RuntimeException.class) + .onRetryExhausted(exhausted -> exhausted) + .run( + x -> { + sleep(5); + throw new RuntimeException("failed"); + })); + + assertThat(thrown).isInstanceOf(CommitRetry.RetryExhaustedException.class); + CommitRetry.RetryExhaustedException exhausted = (CommitRetry.RetryExhaustedException) thrown; + assertThat(exhausted.reason()).isEqualTo(CommitRetry.RetryExhaustionReason.ATTEMPT_LIMIT_AND_TIMEOUT); + assertThat(exhausted.attempts()).isEqualTo(2); + assertThat(exhausted.maxAttempts()).isEqualTo(2); + assertThat(exhausted.durationMs()).isGreaterThan(exhausted.maxDurationMs()); + } + + @Test + public void retryExhaustedHandlerIsOptIn() { + RuntimeException failure = new RuntimeException("failed"); + + Throwable thrown = + catchThrowable( + () -> + Tasks.foreach(1) + .retry(0) + .exponentialBackoff(0, 0, 5000, 0) + .onlyRetryOn(RuntimeException.class) + .run( + x -> { + throw failure; + })); + + assertThat(thrown).isSameAs(failure); + } + + @Test + public void retryExhaustedCanWrapAsCommitFailedException() { + CommitFailedException failure = new CommitFailedException("failed"); + + Throwable thrown = + catchThrowable( + () -> + Tasks.foreach(1) + .retry(0) + .exponentialBackoff(0, 0, 5000, 0) + .onlyRetryOn(CommitFailedException.class) + .onRetryExhausted(exhausted -> new CommitFailedException(exhausted, "wrapped")) + .run( + x -> { + throw failure; + })); + + assertThat(thrown).isInstanceOf(CommitFailedException.class); + assertThat(thrown).hasMessage("wrapped"); + assertThat(thrown.getCause()).isInstanceOf(CommitRetry.RetryExhaustedException.class); + assertThat(thrown.getCause().getCause()).isSameAs(failure); + } + + private static void sleep(long millis) { + try { + TimeUnit.MILLISECONDS.sleep(millis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } } From 1bff00ac297fc08db31c8fab259997bc3c8fd2d0 Mon Sep 17 00:00:00 2001 From: jhrotko Date: Thu, 11 Jun 2026 18:43:31 +0100 Subject: [PATCH 2/6] Refactor code --- .../apache/iceberg/BaseReplaceSortOrder.java | 1 + .../org/apache/iceberg/BaseTransaction.java | 1 + .../java/org/apache/iceberg/CommitRetry.java | 122 ------------------ .../org/apache/iceberg/PropertiesUpdate.java | 1 + .../org/apache/iceberg/RemoveSnapshots.java | 1 + .../java/org/apache/iceberg/SetLocation.java | 1 + .../apache/iceberg/SetSnapshotOperation.java | 1 + .../org/apache/iceberg/SetStatistics.java | 1 + .../org/apache/iceberg/SnapshotProducer.java | 1 + .../apache/iceberg/rest/CatalogHandlers.java | 2 +- .../org/apache/iceberg/util/CommitRetry.java | 53 ++++++++ .../java/org/apache/iceberg/util/Tasks.java | 35 +++-- .../apache/iceberg/view/PropertiesUpdate.java | 2 +- .../apache/iceberg/view/SetViewLocation.java | 2 +- .../iceberg/view/ViewVersionReplace.java | 2 +- .../org/apache/iceberg/TestCommitRetry.java | 41 ++++++ .../org/apache/iceberg/util/TestTasks.java | 83 ++++++++---- 17 files changed, 186 insertions(+), 164 deletions(-) delete mode 100644 core/src/main/java/org/apache/iceberg/CommitRetry.java create mode 100644 core/src/main/java/org/apache/iceberg/util/CommitRetry.java create mode 100644 core/src/test/java/org/apache/iceberg/TestCommitRetry.java diff --git a/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java b/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java index ce3425ef8869..f46a2a58935e 100644 --- a/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java +++ b/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java @@ -29,6 +29,7 @@ import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.expressions.Term; +import org.apache.iceberg.util.CommitRetry; import org.apache.iceberg.util.Tasks; public class BaseReplaceSortOrder implements ReplaceSortOrder { diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index 81f9a2f01fc9..af3a5bb6c38d 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -48,6 +48,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.CommitRetry; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; import org.slf4j.Logger; diff --git a/core/src/main/java/org/apache/iceberg/CommitRetry.java b/core/src/main/java/org/apache/iceberg/CommitRetry.java deleted file mode 100644 index 4d7551020fc4..000000000000 --- a/core/src/main/java/org/apache/iceberg/CommitRetry.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg; - -import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES; -import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS; - -import org.apache.iceberg.exceptions.CommitFailedException; -import org.apache.iceberg.util.Tasks; - -public class CommitRetry { - public enum RetryExhaustionReason { - ATTEMPT_LIMIT, - TIMEOUT, - ATTEMPT_LIMIT_AND_TIMEOUT - } - - private CommitRetry() {} - - public static CommitFailedException retryExhaustedException(RetryExhaustedException exhausted) { - return new CommitFailedException( - exhausted, - "Commit failed after exhausting retry budget (%s). %s", - exhausted.getMessage(), - recommendation(exhausted.reason())); - } - - @SuppressWarnings("StatementSwitchToExpressionSwitch") - private static String recommendation(RetryExhaustionReason reason) { - switch (reason) { - case ATTEMPT_LIMIT: - return String.format("To allow more retry attempts, adjust %s.", COMMIT_NUM_RETRIES); - - case TIMEOUT: - return String.format( - "To allow more total retry time, adjust %s.", COMMIT_TOTAL_RETRY_TIME_MS); - - case ATTEMPT_LIMIT_AND_TIMEOUT: - return String.format( - "To allow more retry attempts and total retry time, adjust %s and %s.", - COMMIT_NUM_RETRIES, COMMIT_TOTAL_RETRY_TIME_MS); - } - - throw new IllegalArgumentException("Unknown retry exhaustion reason: " + reason); - } - - - public static RetryExhaustionReason retryExhaustionReason( - boolean attemptsExhausted, boolean timeoutExhausted) { - if (attemptsExhausted && timeoutExhausted) { - return RetryExhaustionReason.ATTEMPT_LIMIT_AND_TIMEOUT; - } else if (attemptsExhausted) { - return RetryExhaustionReason.ATTEMPT_LIMIT; - } else { - return RetryExhaustionReason.TIMEOUT; - } - } - - public static class RetryExhaustedException extends RuntimeException { - private final RetryExhaustionReason reason; - private final int attempts; - private final int maxAttempts; - private final long durationMs; - private final long maxDurationMs; - - public RetryExhaustedException( - RetryExhaustionReason reason, - int attempts, - int maxAttempts, - long durationMs, - long maxDurationMs, - Exception cause) { - super( - String.format( - "Retry exhausted: reason=%s, attempts=%s, max-attempts=%s, duration-ms=%s, " - + "max-duration-ms=%s", - reason, attempts, maxAttempts, durationMs, maxDurationMs), - cause); - this.reason = reason; - this.attempts = attempts; - this.maxAttempts = maxAttempts; - this.durationMs = durationMs; - this.maxDurationMs = maxDurationMs; - } - - public RetryExhaustionReason reason() { - return reason; - } - - public int attempts() { - return attempts; - } - - public int maxAttempts() { - return maxAttempts; - } - - public long durationMs() { - return durationMs; - } - - public long maxDurationMs() { - return maxDurationMs; - } - } -} diff --git a/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java b/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java index 32e82d31d718..51fd50bd7ce5 100644 --- a/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java +++ b/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java @@ -33,6 +33,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.CommitRetry; import org.apache.iceberg.util.Tasks; class PropertiesUpdate implements UpdateProperties { diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java index 57807582e120..2071371b6e55 100644 --- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java +++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java @@ -49,6 +49,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors; +import org.apache.iceberg.util.CommitRetry; import org.apache.iceberg.util.DateTimeUtil; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.SnapshotUtil; diff --git a/core/src/main/java/org/apache/iceberg/SetLocation.java b/core/src/main/java/org/apache/iceberg/SetLocation.java index 04ff2eb89670..dd434cf85aca 100644 --- a/core/src/main/java/org/apache/iceberg/SetLocation.java +++ b/core/src/main/java/org/apache/iceberg/SetLocation.java @@ -28,6 +28,7 @@ import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.util.CommitRetry; import org.apache.iceberg.util.Tasks; public class SetLocation implements UpdateLocation { diff --git a/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java b/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java index c2be45734add..309d986ff65f 100644 --- a/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java +++ b/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java @@ -31,6 +31,7 @@ import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.CommitRetry; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.Tasks; diff --git a/core/src/main/java/org/apache/iceberg/SetStatistics.java b/core/src/main/java/org/apache/iceberg/SetStatistics.java index 3c9934d6b322..baa28a10264e 100644 --- a/core/src/main/java/org/apache/iceberg/SetStatistics.java +++ b/core/src/main/java/org/apache/iceberg/SetStatistics.java @@ -32,6 +32,7 @@ import java.util.Optional; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.CommitRetry; import org.apache.iceberg.util.Tasks; public class SetStatistics implements UpdateStatistics { diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 3129ab6a5dc6..22784e8b2bee 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -72,6 +72,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.math.IntMath; +import org.apache.iceberg.util.CommitRetry; import org.apache.iceberg.util.Exceptions; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.PropertyUtil; diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java index 4f7ca7e32e53..3bc958a9da72 100644 --- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java +++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java @@ -49,7 +49,6 @@ import org.apache.iceberg.BaseMetadataTable; import org.apache.iceberg.BaseTable; import org.apache.iceberg.BaseTransaction; -import org.apache.iceberg.CommitRetry; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.IncrementalAppendScan; import org.apache.iceberg.MetadataUpdate.UpgradeFormatVersion; @@ -102,6 +101,7 @@ import org.apache.iceberg.rest.responses.LoadViewResponse; import org.apache.iceberg.rest.responses.PlanTableScanResponse; import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; +import org.apache.iceberg.util.CommitRetry; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.Tasks; import org.apache.iceberg.view.BaseView; diff --git a/core/src/main/java/org/apache/iceberg/util/CommitRetry.java b/core/src/main/java/org/apache/iceberg/util/CommitRetry.java new file mode 100644 index 000000000000..540ee39386cc --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/util/CommitRetry.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.util; + +import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES; +import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS; + +import org.apache.iceberg.exceptions.CommitFailedException; + +public class CommitRetry { + private CommitRetry() {} + + public static CommitFailedException retryExhaustedException( + Exception cause, Tasks.RetryExhaustionReason reason) { + return new CommitFailedException( + cause, "Commit failed after exhausting retries. %s", retryExhaustionMessage(reason)); + } + + @SuppressWarnings("StatementSwitchToExpressionSwitch") + private static String retryExhaustionMessage(Tasks.RetryExhaustionReason reason) { + switch (reason) { + case ATTEMPT_LIMIT: + return String.format("To allow more retry attempts, adjust %s.", COMMIT_NUM_RETRIES); + + case TIMEOUT: + return String.format( + "To allow more total retry time, adjust %s.", COMMIT_TOTAL_RETRY_TIME_MS); + + case ATTEMPT_LIMIT_AND_TIMEOUT: + return String.format( + "To allow more retry attempts and total retry time, adjust %s and %s.", + COMMIT_NUM_RETRIES, COMMIT_TOTAL_RETRY_TIME_MS); + } + + throw new IllegalArgumentException("Unknown retry exhaustion reason: " + reason); + } +} diff --git a/core/src/main/java/org/apache/iceberg/util/Tasks.java b/core/src/main/java/org/apache/iceberg/util/Tasks.java index f6ce6e134041..cb6b54d2ef97 100644 --- a/core/src/main/java/org/apache/iceberg/util/Tasks.java +++ b/core/src/main/java/org/apache/iceberg/util/Tasks.java @@ -32,7 +32,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Function; +import java.util.function.BiFunction; import java.util.function.Predicate; import java.util.stream.Stream; import org.apache.iceberg.metrics.Counter; @@ -40,14 +40,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.iceberg.CommitRetry.retryExhaustionReason; -import static org.apache.iceberg.CommitRetry.RetryExhaustedException; - public class Tasks { private static final Logger LOG = LoggerFactory.getLogger(Tasks.class); private Tasks() {} + public enum RetryExhaustionReason { + ATTEMPT_LIMIT, + TIMEOUT, + ATTEMPT_LIMIT_AND_TIMEOUT + } + public static class UnrecoverableException extends RuntimeException { public UnrecoverableException(String message) { super(message); @@ -92,7 +95,8 @@ public static class Builder { private long maxDurationMs = 600000; // 10 minutes private double scaleFactor = 2.0; // exponential private Counter attemptsCounter; - private Function retryExhaustedHandler = null; + private BiFunction retryExhaustedHandler = + null; public Builder(Iterable items) { this.items = items; @@ -186,7 +190,7 @@ public Builder countAttempts(Counter counter) { } public Builder onRetryExhausted( - Function handler) { + BiFunction handler) { this.retryExhaustedHandler = handler; return this; } @@ -439,13 +443,7 @@ private void runTaskWithRetry(Task task, I item) thr if (retryExhaustedHandler != null) { throw retryExhaustedHandler.apply( - new RetryExhaustedException( - retryExhaustionReason(attemptsExhausted, timeoutExhausted), - attempt, - maxAttempts, - durationMs, - maxDurationMs, - e)); + e, retryExhaustionReason(attemptsExhausted, timeoutExhausted)); } throw e; @@ -494,6 +492,17 @@ private boolean shouldRetry(Exception exception) { return true; } } + + private RetryExhaustionReason retryExhaustionReason( + boolean attemptsExhausted, boolean timeoutExhausted) { + if (attemptsExhausted && timeoutExhausted) { + return RetryExhaustionReason.ATTEMPT_LIMIT_AND_TIMEOUT; + } else if (attemptsExhausted) { + return RetryExhaustionReason.ATTEMPT_LIMIT; + } else { + return RetryExhaustionReason.TIMEOUT; + } + } } @SuppressWarnings("checkstyle:CyclomaticComplexity") diff --git a/core/src/main/java/org/apache/iceberg/view/PropertiesUpdate.java b/core/src/main/java/org/apache/iceberg/view/PropertiesUpdate.java index 84327489e541..7a1f0f4a375a 100644 --- a/core/src/main/java/org/apache/iceberg/view/PropertiesUpdate.java +++ b/core/src/main/java/org/apache/iceberg/view/PropertiesUpdate.java @@ -29,11 +29,11 @@ import java.util.Map; import java.util.Set; -import org.apache.iceberg.CommitRetry; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.CommitRetry; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; diff --git a/core/src/main/java/org/apache/iceberg/view/SetViewLocation.java b/core/src/main/java/org/apache/iceberg/view/SetViewLocation.java index d18be2402ba9..4c92c8e4fc0a 100644 --- a/core/src/main/java/org/apache/iceberg/view/SetViewLocation.java +++ b/core/src/main/java/org/apache/iceberg/view/SetViewLocation.java @@ -27,10 +27,10 @@ import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS; import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; -import org.apache.iceberg.CommitRetry; import org.apache.iceberg.UpdateLocation; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.CommitRetry; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; diff --git a/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java b/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java index a12b8faa98c6..5dddf8ae6c1c 100644 --- a/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java +++ b/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java @@ -28,13 +28,13 @@ import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; import java.util.List; -import org.apache.iceberg.CommitRetry; import org.apache.iceberg.EnvironmentContext; import org.apache.iceberg.Schema; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.CommitRetry; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; diff --git a/core/src/test/java/org/apache/iceberg/TestCommitRetry.java b/core/src/test/java/org/apache/iceberg/TestCommitRetry.java new file mode 100644 index 000000000000..50333d458595 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestCommitRetry.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.util.CommitRetry; +import org.apache.iceberg.util.Tasks; +import org.junit.jupiter.api.Test; + +class TestCommitRetry { + + @Test + void retryExhaustedExceptionKeepsCommitFailedExceptionContract() { + CommitFailedException original = new CommitFailedException("failed"); + + CommitFailedException wrapped = + CommitRetry.retryExhaustedException(original, Tasks.RetryExhaustionReason.ATTEMPT_LIMIT); + + assertThat(wrapped).isInstanceOf(CommitFailedException.class); + assertThat(wrapped).hasMessageContaining(TableProperties.COMMIT_NUM_RETRIES); + assertThat(wrapped.getCause()).isSameAs(original); + } +} diff --git a/core/src/test/java/org/apache/iceberg/util/TestTasks.java b/core/src/test/java/org/apache/iceberg/util/TestTasks.java index a456e9ca2406..c4df28b03c62 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestTasks.java +++ b/core/src/test/java/org/apache/iceberg/util/TestTasks.java @@ -22,8 +22,8 @@ import static org.assertj.core.api.Assertions.catchThrowable; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.IntStream; -import org.apache.iceberg.CommitRetry; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.metrics.Counter; import org.apache.iceberg.metrics.DefaultMetricsContext; @@ -65,6 +65,9 @@ public void attemptCounterIsIncreasedWithoutRetries() { @Test public void retryExhaustedReportsAttemptLimit() { RuntimeException failure = new RuntimeException("failed"); + RuntimeException wrapped = new RuntimeException("wrapped"); + AtomicReference capturedFailure = new AtomicReference<>(); + AtomicReference capturedReason = new AtomicReference<>(); Throwable thrown = catchThrowable( @@ -73,22 +76,27 @@ public void retryExhaustedReportsAttemptLimit() { .retry(1) .exponentialBackoff(0, 0, 5000, 0) .onlyRetryOn(RuntimeException.class) - .onRetryExhausted(exhausted -> exhausted) + .onRetryExhausted( + (exception, reason) -> { + capturedFailure.set(exception); + capturedReason.set(reason); + return wrapped; + }) .run( x -> { throw failure; })); - assertThat(thrown).isInstanceOf(CommitRetry.RetryExhaustedException.class); - CommitRetry.RetryExhaustedException exhausted = (CommitRetry.RetryExhaustedException) thrown; - assertThat(exhausted.reason()).isEqualTo(CommitRetry.RetryExhaustionReason.ATTEMPT_LIMIT); - assertThat(exhausted.attempts()).isEqualTo(2); - assertThat(exhausted.maxAttempts()).isEqualTo(2); - assertThat(exhausted.getCause()).isSameAs(failure); + assertThat(thrown).isSameAs(wrapped); + assertThat(capturedFailure.get()).isSameAs(failure); + assertThat(capturedReason.get()).isEqualTo(Tasks.RetryExhaustionReason.ATTEMPT_LIMIT); } @Test public void retryExhaustedReportsTimeout() { + RuntimeException wrapped = new RuntimeException("wrapped"); + AtomicReference capturedReason = new AtomicReference<>(); + Throwable thrown = catchThrowable( () -> @@ -96,23 +104,26 @@ public void retryExhaustedReportsTimeout() { .retry(2) .exponentialBackoff(0, 0, 1, 0) .onlyRetryOn(RuntimeException.class) - .onRetryExhausted(exhausted -> exhausted) + .onRetryExhausted( + (exception, reason) -> { + capturedReason.set(reason); + return wrapped; + }) .run( x -> { sleep(5); throw new RuntimeException("failed"); })); - assertThat(thrown).isInstanceOf(CommitRetry.RetryExhaustedException.class); - CommitRetry.RetryExhaustedException exhausted = (CommitRetry.RetryExhaustedException) thrown; - assertThat(exhausted.reason()).isEqualTo(CommitRetry.RetryExhaustionReason.TIMEOUT); - assertThat(exhausted.attempts()).isEqualTo(2); - assertThat(exhausted.maxAttempts()).isEqualTo(3); - assertThat(exhausted.durationMs()).isGreaterThan(exhausted.maxDurationMs()); + assertThat(thrown).isSameAs(wrapped); + assertThat(capturedReason.get()).isEqualTo(Tasks.RetryExhaustionReason.TIMEOUT); } @Test public void retryExhaustedReportsAttemptLimitAndTimeout() { + RuntimeException wrapped = new RuntimeException("wrapped"); + AtomicReference capturedReason = new AtomicReference<>(); + Throwable thrown = catchThrowable( () -> @@ -120,19 +131,20 @@ public void retryExhaustedReportsAttemptLimitAndTimeout() { .retry(1) .exponentialBackoff(0, 0, 1, 0) .onlyRetryOn(RuntimeException.class) - .onRetryExhausted(exhausted -> exhausted) + .onRetryExhausted( + (exception, reason) -> { + capturedReason.set(reason); + return wrapped; + }) .run( x -> { sleep(5); throw new RuntimeException("failed"); })); - assertThat(thrown).isInstanceOf(CommitRetry.RetryExhaustedException.class); - CommitRetry.RetryExhaustedException exhausted = (CommitRetry.RetryExhaustedException) thrown; - assertThat(exhausted.reason()).isEqualTo(CommitRetry.RetryExhaustionReason.ATTEMPT_LIMIT_AND_TIMEOUT); - assertThat(exhausted.attempts()).isEqualTo(2); - assertThat(exhausted.maxAttempts()).isEqualTo(2); - assertThat(exhausted.durationMs()).isGreaterThan(exhausted.maxDurationMs()); + assertThat(thrown).isSameAs(wrapped); + assertThat(capturedReason.get()) + .isEqualTo(Tasks.RetryExhaustionReason.ATTEMPT_LIMIT_AND_TIMEOUT); } @Test @@ -154,6 +166,27 @@ public void retryExhaustedHandlerIsOptIn() { assertThat(thrown).isSameAs(failure); } + @Test + public void retryExhaustedDoesNotWrapNonRetryableException() { + RuntimeException failure = new RuntimeException("failed"); + RuntimeException wrapped = new RuntimeException("wrapped"); + + Throwable thrown = + catchThrowable( + () -> + Tasks.foreach(1) + .retry(0) + .exponentialBackoff(0, 0, 5000, 0) + .onlyRetryOn(IllegalArgumentException.class) + .onRetryExhausted((exception, reason) -> wrapped) + .run( + x -> { + throw failure; + })); + + assertThat(thrown).isSameAs(failure); + } + @Test public void retryExhaustedCanWrapAsCommitFailedException() { CommitFailedException failure = new CommitFailedException("failed"); @@ -165,7 +198,8 @@ public void retryExhaustedCanWrapAsCommitFailedException() { .retry(0) .exponentialBackoff(0, 0, 5000, 0) .onlyRetryOn(CommitFailedException.class) - .onRetryExhausted(exhausted -> new CommitFailedException(exhausted, "wrapped")) + .onRetryExhausted( + (exception, reason) -> new CommitFailedException(exception, "wrapped")) .run( x -> { throw failure; @@ -173,8 +207,7 @@ public void retryExhaustedCanWrapAsCommitFailedException() { assertThat(thrown).isInstanceOf(CommitFailedException.class); assertThat(thrown).hasMessage("wrapped"); - assertThat(thrown.getCause()).isInstanceOf(CommitRetry.RetryExhaustedException.class); - assertThat(thrown.getCause().getCause()).isSameAs(failure); + assertThat(thrown.getCause()).isSameAs(failure); } private static void sleep(long millis) { From 4b53634f65fe0026c5ed92904a12414119767fbd Mon Sep 17 00:00:00 2001 From: jhrotko Date: Thu, 11 Jun 2026 18:54:18 +0100 Subject: [PATCH 3/6] Improve tests --- .../org/apache/iceberg/TestCommitRetry.java | 41 ----------- .../apache/iceberg/util/TestCommitRetry.java | 69 +++++++++++++++++++ .../org/apache/iceberg/util/TestTasks.java | 11 ++- 3 files changed, 78 insertions(+), 43 deletions(-) delete mode 100644 core/src/test/java/org/apache/iceberg/TestCommitRetry.java create mode 100644 core/src/test/java/org/apache/iceberg/util/TestCommitRetry.java diff --git a/core/src/test/java/org/apache/iceberg/TestCommitRetry.java b/core/src/test/java/org/apache/iceberg/TestCommitRetry.java deleted file mode 100644 index 50333d458595..000000000000 --- a/core/src/test/java/org/apache/iceberg/TestCommitRetry.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg; - -import static org.assertj.core.api.Assertions.assertThat; - -import org.apache.iceberg.exceptions.CommitFailedException; -import org.apache.iceberg.util.CommitRetry; -import org.apache.iceberg.util.Tasks; -import org.junit.jupiter.api.Test; - -class TestCommitRetry { - - @Test - void retryExhaustedExceptionKeepsCommitFailedExceptionContract() { - CommitFailedException original = new CommitFailedException("failed"); - - CommitFailedException wrapped = - CommitRetry.retryExhaustedException(original, Tasks.RetryExhaustionReason.ATTEMPT_LIMIT); - - assertThat(wrapped).isInstanceOf(CommitFailedException.class); - assertThat(wrapped).hasMessageContaining(TableProperties.COMMIT_NUM_RETRIES); - assertThat(wrapped.getCause()).isSameAs(original); - } -} diff --git a/core/src/test/java/org/apache/iceberg/util/TestCommitRetry.java b/core/src/test/java/org/apache/iceberg/util/TestCommitRetry.java new file mode 100644 index 000000000000..8cf309a7ba73 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/util/TestCommitRetry.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.util; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.junit.jupiter.api.Test; + +class TestCommitRetry { + + @Test + void retryExhaustedExceptionRecommendsNumRetriesForAttemptLimit() { + CommitFailedException original = new CommitFailedException("failed"); + + CommitFailedException wrapped = + CommitRetry.retryExhaustedException(original, Tasks.RetryExhaustionReason.ATTEMPT_LIMIT); + + assertThat(wrapped).isInstanceOf(CommitFailedException.class); + assertThat(wrapped).hasMessageContaining(TableProperties.COMMIT_NUM_RETRIES); + assertThat(wrapped).hasMessageNotContaining(TableProperties.COMMIT_TOTAL_RETRY_TIME_MS); + assertThat(wrapped.getCause()).isSameAs(original); + } + + @Test + void retryExhaustedExceptionRecommendsTotalTimeoutForTimeout() { + CommitFailedException original = new CommitFailedException("failed"); + + CommitFailedException wrapped = + CommitRetry.retryExhaustedException(original, Tasks.RetryExhaustionReason.TIMEOUT); + + assertThat(wrapped).isInstanceOf(CommitFailedException.class); + assertThat(wrapped).hasMessageContaining(TableProperties.COMMIT_TOTAL_RETRY_TIME_MS); + assertThat(wrapped).hasMessageNotContaining(TableProperties.COMMIT_NUM_RETRIES); + assertThat(wrapped.getCause()).isSameAs(original); + } + + @Test + void retryExhaustedExceptionRecommendsBothPropertiesForAttemptLimitAndTimeout() { + CommitFailedException original = new CommitFailedException("failed"); + + CommitFailedException wrapped = + CommitRetry.retryExhaustedException( + original, Tasks.RetryExhaustionReason.ATTEMPT_LIMIT_AND_TIMEOUT); + + assertThat(wrapped).isInstanceOf(CommitFailedException.class); + assertThat(wrapped) + .hasMessageContaining(TableProperties.COMMIT_NUM_RETRIES) + .hasMessageContaining(TableProperties.COMMIT_TOTAL_RETRY_TIME_MS); + assertThat(wrapped.getCause()).isSameAs(original); + } +} diff --git a/core/src/test/java/org/apache/iceberg/util/TestTasks.java b/core/src/test/java/org/apache/iceberg/util/TestTasks.java index c4df28b03c62..b23c7f9e70d4 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestTasks.java +++ b/core/src/test/java/org/apache/iceberg/util/TestTasks.java @@ -22,6 +22,7 @@ import static org.assertj.core.api.Assertions.catchThrowable; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.IntStream; import org.apache.iceberg.exceptions.CommitFailedException; @@ -150,41 +151,47 @@ public void retryExhaustedReportsAttemptLimitAndTimeout() { @Test public void retryExhaustedHandlerIsOptIn() { RuntimeException failure = new RuntimeException("failed"); + AtomicInteger attempts = new AtomicInteger(0); Throwable thrown = catchThrowable( () -> Tasks.foreach(1) - .retry(0) + .retry(1) .exponentialBackoff(0, 0, 5000, 0) .onlyRetryOn(RuntimeException.class) .run( x -> { + attempts.incrementAndGet(); throw failure; })); assertThat(thrown).isSameAs(failure); + assertThat(attempts.get()).isEqualTo(2); } @Test public void retryExhaustedDoesNotWrapNonRetryableException() { RuntimeException failure = new RuntimeException("failed"); RuntimeException wrapped = new RuntimeException("wrapped"); + AtomicInteger attempts = new AtomicInteger(0); Throwable thrown = catchThrowable( () -> Tasks.foreach(1) - .retry(0) + .retry(1) .exponentialBackoff(0, 0, 5000, 0) .onlyRetryOn(IllegalArgumentException.class) .onRetryExhausted((exception, reason) -> wrapped) .run( x -> { + attempts.incrementAndGet(); throw failure; })); assertThat(thrown).isSameAs(failure); + assertThat(attempts.get()).isOne(); } @Test From 1236a0accc3a4cdd15625b75f07e8f689f449e68 Mon Sep 17 00:00:00 2001 From: jhrotko Date: Thu, 11 Jun 2026 19:42:39 +0100 Subject: [PATCH 4/6] Fix assert error message --- .../org/apache/iceberg/InternalTestHelpers.java | 12 ++++++++++++ .../java/org/apache/iceberg/TestFastAppend.java | 17 ++++------------- .../org/apache/iceberg/TestMergeAppend.java | 13 +++---------- .../apache/iceberg/TestReplaceTransaction.java | 5 +---- .../org/apache/iceberg/TestRewriteFiles.java | 9 ++------- .../apache/iceberg/TestRewriteManifests.java | 13 +++---------- .../iceberg/TestSequenceNumberForV2Table.java | 12 +++--------- .../org/apache/iceberg/TestTransaction.java | 8 ++------ 8 files changed, 30 insertions(+), 59 deletions(-) diff --git a/core/src/test/java/org/apache/iceberg/InternalTestHelpers.java b/core/src/test/java/org/apache/iceberg/InternalTestHelpers.java index 781051f11d7b..8bd02c57bd49 100644 --- a/core/src/test/java/org/apache/iceberg/InternalTestHelpers.java +++ b/core/src/test/java/org/apache/iceberg/InternalTestHelpers.java @@ -19,19 +19,31 @@ package org.apache.iceberg; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.util.List; import java.util.Map; import org.apache.iceberg.data.Record; +import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.iceberg.variants.Variant; import org.apache.iceberg.variants.VariantTestUtil; +import org.assertj.core.api.ThrowableAssert.ThrowingCallable; public class InternalTestHelpers { private InternalTestHelpers() {} + public static void assertCommitRetryExhausted(ThrowingCallable callable) { + assertThatThrownBy(callable) + .isInstanceOf(CommitFailedException.class) + .hasMessageContaining(TableProperties.COMMIT_NUM_RETRIES) + .cause() + .isInstanceOf(CommitFailedException.class) + .hasMessage("Injected failure"); + } + public static void assertEquals(Types.StructType struct, Record expected, Record actual) { Types.StructType expectedType = expected.struct(); for (Types.NestedField field : struct.fields()) { diff --git a/core/src/test/java/org/apache/iceberg/TestFastAppend.java b/core/src/test/java/org/apache/iceberg/TestFastAppend.java index bc28ecd88022..ae535dd73b43 100644 --- a/core/src/test/java/org/apache/iceberg/TestFastAppend.java +++ b/core/src/test/java/org/apache/iceberg/TestFastAppend.java @@ -28,7 +28,6 @@ import java.util.Set; import java.util.stream.Collectors; import org.apache.iceberg.ManifestEntry.Status; -import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -332,9 +331,7 @@ public void testFailure() { ManifestFile newManifest = pending.allManifests(FILE_IO).get(0); assertThat(new File(newManifest.path())).exists(); - assertThatThrownBy(append::commit) - .isInstanceOf(CommitFailedException.class) - .hasMessage("Injected failure"); + InternalTestHelpers.assertCommitRetryExhausted(append::commit); assertThat(new File(newManifest.path())).doesNotExist(); } @@ -347,9 +344,7 @@ public void testIncreaseNumRetries() { AppendFiles append = table.newFastAppend().appendFile(FILE_B); // Default number of retries results in a failed commit - assertThatThrownBy(append::commit) - .isInstanceOf(CommitFailedException.class) - .hasMessage("Injected failure"); + InternalTestHelpers.assertCommitRetryExhausted(append::commit); // After increasing the number of retries the commit succeeds table @@ -381,9 +376,7 @@ public void testAppendManifestCleanup() throws IOException { assertThat(newManifest.path()).isEqualTo(manifest.path()); } - assertThatThrownBy(append::commit) - .isInstanceOf(CommitFailedException.class) - .hasMessage("Injected failure"); + InternalTestHelpers.assertCommitRetryExhausted(append::commit); if (formatVersion == 1) { assertThat(new File(newManifest.path())).doesNotExist(); @@ -494,9 +487,7 @@ public void testAppendManifestFailureWithSnapshotIdInheritance() throws IOExcept AppendFiles append = table.newAppend(); append.appendManifest(manifest); - assertThatThrownBy(append::commit) - .isInstanceOf(CommitFailedException.class) - .hasMessage("Injected failure"); + InternalTestHelpers.assertCommitRetryExhausted(append::commit); assertThat(new File(manifest.path())).exists(); } diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java index 2654f244f3d1..8ea9ac88fff5 100644 --- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java +++ b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java @@ -35,7 +35,6 @@ import java.util.stream.Stream; import org.apache.iceberg.ManifestEntry.Status; import org.apache.iceberg.TestHelpers.Row; -import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -1175,9 +1174,7 @@ public void testFailure() { ids(pending.snapshotId(), baseId), concat(files(FILE_B), files(initialManifest))); - assertThatThrownBy(() -> commit(table, append, branch)) - .isInstanceOf(CommitFailedException.class) - .hasMessage("Injected failure"); + InternalTestHelpers.assertCommitRetryExhausted(() -> commit(table, append, branch)); V2Assert.assertEquals( "Last sequence number should be 1", 1, readMetadata().lastSequenceNumber()); @@ -1213,9 +1210,7 @@ public void testAppendManifestCleanup() throws IOException { assertThat(newManifest.path()).isEqualTo(manifest.path()); } - assertThatThrownBy(() -> commit(table, append, branch)) - .isInstanceOf(CommitFailedException.class) - .hasMessage("Injected failure"); + InternalTestHelpers.assertCommitRetryExhausted(() -> commit(table, append, branch)); V2Assert.assertEquals( "Last sequence number should be 0", 0, readMetadata().lastSequenceNumber()); V1Assert.assertEquals( @@ -1399,9 +1394,7 @@ public void testAppendManifestFailureWithSnapshotIdInheritance() throws IOExcept AppendFiles append = table.newAppend(); append.appendManifest(manifest); - assertThatThrownBy(() -> commit(table, append, branch)) - .isInstanceOf(CommitFailedException.class) - .hasMessage("Injected failure"); + InternalTestHelpers.assertCommitRetryExhausted(() -> commit(table, append, branch)); assertThat(readMetadata().lastSequenceNumber()).isEqualTo(0); assertThat(new File(manifest.path())).exists(); diff --git a/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java b/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java index 79196c0a7517..8d6a530bc78a 100644 --- a/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java +++ b/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java @@ -31,7 +31,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -302,9 +301,7 @@ public void testReplaceTransactionConflict() { // keep failing to trigger eventual transaction failure ((TestTables.TestTableOperations) ((BaseTransaction) replace).ops()).failCommits(100); - assertThatThrownBy(replace::commitTransaction) - .isInstanceOf(CommitFailedException.class) - .hasMessage("Injected failure"); + InternalTestHelpers.assertCommitRetryExhausted(replace::commitTransaction); assertThat(version()).isEqualTo(1); diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java b/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java index 72a3c89b74d5..38a06adcad3d 100644 --- a/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java +++ b/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java @@ -31,7 +31,6 @@ import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; -import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -404,9 +403,7 @@ public void testFailure() { validateManifestEntries(manifest1, ids(pending.snapshotId()), files(FILE_B), statuses(ADDED)); validateManifestEntries(manifest2, ids(pending.snapshotId()), files(FILE_A), statuses(DELETED)); - assertThatThrownBy(() -> commit(table, rewrite, branch)) - .isInstanceOf(CommitFailedException.class) - .hasMessage("Injected failure"); + InternalTestHelpers.assertCommitRetryExhausted(() -> commit(table, rewrite, branch)); assertThat(new File(manifest1.path())).doesNotExist(); assertThat(new File(manifest2.path())).doesNotExist(); @@ -471,9 +468,7 @@ public void testFailureWhenRewriteBothDataAndDeleteFiles() { files(fileADeletes(), fileBDeletes()), statuses(DELETED, DELETED)); - assertThatThrownBy(rewrite::commit) - .isInstanceOf(CommitFailedException.class) - .hasMessage("Injected failure"); + InternalTestHelpers.assertCommitRetryExhausted(rewrite::commit); assertThat(new File(manifest1.path())).doesNotExist(); assertThat(new File(manifest2.path())).doesNotExist(); diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java index dab323743bb1..a2050a175f98 100644 --- a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java +++ b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java @@ -34,7 +34,6 @@ import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Expressions; @@ -1028,9 +1027,7 @@ public void testManifestReplacementFailure() throws IOException { rewriteManifests.deleteManifest(secondSnapshotManifest); rewriteManifests.addManifest(newManifest); - assertThatThrownBy(rewriteManifests::commit) - .isInstanceOf(CommitFailedException.class) - .hasMessage("Injected failure"); + InternalTestHelpers.assertCommitRetryExhausted(rewriteManifests::commit); assertThat(new File(newManifest.path())).exists(); } @@ -1070,9 +1067,7 @@ public void testManifestReplacementFailureWithSnapshotIdInheritance() throws IOE rewriteManifests.deleteManifest(secondSnapshotManifest); rewriteManifests.addManifest(newManifest); - assertThatThrownBy(rewriteManifests::commit) - .isInstanceOf(CommitFailedException.class) - .hasMessage("Injected failure"); + InternalTestHelpers.assertCommitRetryExhausted(rewriteManifests::commit); assertThat(new File(newManifest.path())).exists(); } @@ -1678,9 +1673,7 @@ public void testDeleteManifestReplacementFailure() throws IOException { rewriteManifests.addManifest(newDeleteManifest); // the rewrite must fail - assertThatThrownBy(rewriteManifests::commit) - .isInstanceOf(CommitFailedException.class) - .hasMessage("Injected failure"); + InternalTestHelpers.assertCommitRetryExhausted(rewriteManifests::commit); // the new manifest must not be deleted as the commit hasn't succeeded assertThat(new File(newDeleteManifest.path())).exists(); diff --git a/core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java b/core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java index 164ee54c898f..288e1f21c340 100644 --- a/core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java +++ b/core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java @@ -18,8 +18,6 @@ */ package org.apache.iceberg; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - import java.io.File; import java.net.URI; import java.util.Arrays; @@ -27,7 +25,6 @@ import java.util.Set; import java.util.stream.Collectors; import org.apache.iceberg.ManifestEntry.Status; -import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.junit.jupiter.api.TestTemplate; @@ -112,9 +109,8 @@ public void testCommitConflict() { table.ops().failCommits(1); - assertThatThrownBy(() -> table.newFastAppend().appendFile(FILE_B).commit()) - .isInstanceOf(CommitFailedException.class) - .hasMessage("Injected failure"); + InternalTestHelpers.assertCommitRetryExhausted( + () -> table.newFastAppend().appendFile(FILE_B).commit()); table.updateProperties().set(TableProperties.COMMIT_NUM_RETRIES, "5").commit(); @@ -370,9 +366,7 @@ public void testTransactionFailure() { Transaction txn = table.newTransaction(); txn.newAppend().appendFile(FILE_C).commit(); - assertThatThrownBy(txn::commitTransaction) - .isInstanceOf(CommitFailedException.class) - .hasMessage("Injected failure"); + InternalTestHelpers.assertCommitRetryExhausted(txn::commitTransaction); V2Assert.assertEquals( "Last sequence number should be 1", 1, readMetadata().lastSequenceNumber()); diff --git a/core/src/test/java/org/apache/iceberg/TestTransaction.java b/core/src/test/java/org/apache/iceberg/TestTransaction.java index fe47ac62561d..9024c2b1d892 100644 --- a/core/src/test/java/org/apache/iceberg/TestTransaction.java +++ b/core/src/test/java/org/apache/iceberg/TestTransaction.java @@ -248,9 +248,7 @@ public void testTransactionConflict() { // cause the transaction commit to fail table.ops().failCommits(1); - assertThatThrownBy(txn::commitTransaction) - .isInstanceOf(CommitFailedException.class) - .hasMessage("Injected failure"); + InternalTestHelpers.assertCommitRetryExhausted(txn::commitTransaction); } @TestTemplate @@ -282,9 +280,7 @@ public void testTransactionFailureBulkDeletionCleanup() throws IOException { // cause the transaction commit to fail tableWithBulkIO.ops().failCommits(1); - assertThatThrownBy(txn::commitTransaction) - .isInstanceOf(CommitFailedException.class) - .hasMessage("Injected failure"); + InternalTestHelpers.assertCommitRetryExhausted(txn::commitTransaction); // ensure both files are deleted on transaction failure Mockito.verify(spyFileIO).deleteFiles(Set.of(appendManifest.path(), txnManifestList)); From 9f88d1b33471e810508f5805cb70f75100ac16d6 Mon Sep 17 00:00:00 2001 From: jhrotko Date: Fri, 12 Jun 2026 11:34:36 +0100 Subject: [PATCH 5/6] Limit retry exhaustion guidance to direct commits --- .../src/main/java/org/apache/iceberg/BaseTransaction.java | 3 --- .../java/org/apache/iceberg/rest/CatalogHandlers.java | 3 --- .../java/org/apache/iceberg/TestReplaceTransaction.java | 5 ++++- .../org/apache/iceberg/TestSequenceNumberForV2Table.java | 7 ++++++- .../src/test/java/org/apache/iceberg/TestTransaction.java | 8 ++++++-- 5 files changed, 16 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index af3a5bb6c38d..9884ac297079 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -48,7 +48,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.util.CommitRetry; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; import org.slf4j.Logger; @@ -311,7 +310,6 @@ private void commitReplaceTransaction(boolean orCreate) { props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) - .onRetryExhausted(CommitRetry::retryExhaustedException) .run( underlyingOps -> { try { @@ -367,7 +365,6 @@ private void commitSimpleTransaction() { base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) - .onRetryExhausted(CommitRetry::retryExhaustedException) .run( underlyingOps -> { applyUpdates(underlyingOps); diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java index 3bc958a9da72..3a1e62260aae 100644 --- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java +++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java @@ -101,7 +101,6 @@ import org.apache.iceberg.rest.responses.LoadViewResponse; import org.apache.iceberg.rest.responses.PlanTableScanResponse; import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; -import org.apache.iceberg.util.CommitRetry; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.Tasks; import org.apache.iceberg.view.BaseView; @@ -622,7 +621,6 @@ static TableMetadata commit(TableOperations ops, UpdateTableRequest request) { COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT, 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) - .onRetryExhausted(CommitRetry::retryExhaustedException) .run( taskOps -> { TableMetadata base = isRetry.get() ? taskOps.refresh() : taskOps.current(); @@ -785,7 +783,6 @@ static ViewMetadata commit(ViewOperations ops, UpdateTableRequest request) { COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT, 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) - .onRetryExhausted(CommitRetry::retryExhaustedException) .run( taskOps -> { ViewMetadata base = isRetry.get() ? taskOps.refresh() : taskOps.current(); diff --git a/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java b/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java index 8d6a530bc78a..79196c0a7517 100644 --- a/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java +++ b/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -301,7 +302,9 @@ public void testReplaceTransactionConflict() { // keep failing to trigger eventual transaction failure ((TestTables.TestTableOperations) ((BaseTransaction) replace).ops()).failCommits(100); - InternalTestHelpers.assertCommitRetryExhausted(replace::commitTransaction); + assertThatThrownBy(replace::commitTransaction) + .isInstanceOf(CommitFailedException.class) + .hasMessage("Injected failure"); assertThat(version()).isEqualTo(1); diff --git a/core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java b/core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java index 288e1f21c340..d2e7efdef614 100644 --- a/core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java +++ b/core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + import java.io.File; import java.net.URI; import java.util.Arrays; @@ -25,6 +27,7 @@ import java.util.Set; import java.util.stream.Collectors; import org.apache.iceberg.ManifestEntry.Status; +import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.junit.jupiter.api.TestTemplate; @@ -366,7 +369,9 @@ public void testTransactionFailure() { Transaction txn = table.newTransaction(); txn.newAppend().appendFile(FILE_C).commit(); - InternalTestHelpers.assertCommitRetryExhausted(txn::commitTransaction); + assertThatThrownBy(txn::commitTransaction) + .isInstanceOf(CommitFailedException.class) + .hasMessage("Injected failure"); V2Assert.assertEquals( "Last sequence number should be 1", 1, readMetadata().lastSequenceNumber()); diff --git a/core/src/test/java/org/apache/iceberg/TestTransaction.java b/core/src/test/java/org/apache/iceberg/TestTransaction.java index 9024c2b1d892..fe47ac62561d 100644 --- a/core/src/test/java/org/apache/iceberg/TestTransaction.java +++ b/core/src/test/java/org/apache/iceberg/TestTransaction.java @@ -248,7 +248,9 @@ public void testTransactionConflict() { // cause the transaction commit to fail table.ops().failCommits(1); - InternalTestHelpers.assertCommitRetryExhausted(txn::commitTransaction); + assertThatThrownBy(txn::commitTransaction) + .isInstanceOf(CommitFailedException.class) + .hasMessage("Injected failure"); } @TestTemplate @@ -280,7 +282,9 @@ public void testTransactionFailureBulkDeletionCleanup() throws IOException { // cause the transaction commit to fail tableWithBulkIO.ops().failCommits(1); - InternalTestHelpers.assertCommitRetryExhausted(txn::commitTransaction); + assertThatThrownBy(txn::commitTransaction) + .isInstanceOf(CommitFailedException.class) + .hasMessage("Injected failure"); // ensure both files are deleted on transaction failure Mockito.verify(spyFileIO).deleteFiles(Set.of(appendManifest.path(), txnManifestList)); From dba55fb8dfa29323a1650dbad142b78c69efd2e0 Mon Sep 17 00:00:00 2001 From: jhrotko Date: Fri, 12 Jun 2026 12:05:43 +0100 Subject: [PATCH 6/6] Preserve retryable validation commit failures --- .../org/apache/iceberg/rest/ErrorHandlers.java | 8 ++++++++ .../java/org/apache/iceberg/util/CommitRetry.java | 10 ++++++++++ .../apache/iceberg/rest/RESTCatalogAdapter.java | 9 ++++++++- .../org/apache/iceberg/util/TestCommitRetry.java | 14 ++++++++++++++ 4 files changed, 40 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java b/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java index 334bfde8abfc..be03b48aabcf 100644 --- a/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java +++ b/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java @@ -19,6 +19,7 @@ package org.apache.iceberg.rest; import java.util.function.Consumer; +import org.apache.iceberg.RetryableValidationException; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.BadRequestException; import org.apache.iceberg.exceptions.CommitFailedException; @@ -125,6 +126,13 @@ public void accept(ErrorResponse error) { case 404: throw new NoSuchTableException("%s", error.message()); case 409: + if (RetryableValidationException.class.getSimpleName().equals(error.type())) { + throw new CommitFailedException( + new RetryableValidationException("%s", error.message()), + "Commit failed: %s", + error.message()); + } + throw new CommitFailedException("Commit failed: %s", error.message()); case 500: case 502: diff --git a/core/src/main/java/org/apache/iceberg/util/CommitRetry.java b/core/src/main/java/org/apache/iceberg/util/CommitRetry.java index 540ee39386cc..d06e22dcedda 100644 --- a/core/src/main/java/org/apache/iceberg/util/CommitRetry.java +++ b/core/src/main/java/org/apache/iceberg/util/CommitRetry.java @@ -21,6 +21,7 @@ import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES; import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS; +import org.apache.iceberg.RetryableValidationException; import org.apache.iceberg.exceptions.CommitFailedException; public class CommitRetry { @@ -28,10 +29,19 @@ private CommitRetry() {} public static CommitFailedException retryExhaustedException( Exception cause, Tasks.RetryExhaustionReason reason) { + if (isRetryableValidationCommitFailure(cause)) { + return (CommitFailedException) cause; + } + return new CommitFailedException( cause, "Commit failed after exhausting retries. %s", retryExhaustionMessage(reason)); } + public static boolean isRetryableValidationCommitFailure(Exception cause) { + return cause instanceof CommitFailedException + && cause.getCause() instanceof RetryableValidationException; + } + @SuppressWarnings("StatementSwitchToExpressionSwitch") private static String retryExhaustionMessage(Tasks.RetryExhaustionReason reason) { switch (reason) { diff --git a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java index 8c6dc52b1575..e720959e4cdb 100644 --- a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java +++ b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java @@ -32,6 +32,7 @@ import org.apache.iceberg.BaseTable; import org.apache.iceberg.BaseTransaction; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.RetryableValidationException; import org.apache.iceberg.Scan; import org.apache.iceberg.Table; import org.apache.iceberg.Transaction; @@ -79,6 +80,7 @@ import org.apache.iceberg.rest.responses.ErrorResponse; import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.rest.responses.OAuthTokenResponse; +import org.apache.iceberg.util.CommitRetry; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.PropertyUtil; @@ -715,9 +717,14 @@ public static T castResponse(Class responseType, Obj public static void configureResponseFromException( Exception exc, ErrorResponse.Builder errorBuilder) { + String errorType = exc.getClass().getSimpleName(); + if (CommitRetry.isRetryableValidationCommitFailure(exc)) { + errorType = RetryableValidationException.class.getSimpleName(); + } + errorBuilder .responseCode(EXCEPTION_ERROR_CODES.getOrDefault(exc.getClass(), 500)) - .withType(exc.getClass().getSimpleName()) + .withType(errorType) .withMessage(exc.getMessage()) .withStackTrace(exc); } diff --git a/core/src/test/java/org/apache/iceberg/util/TestCommitRetry.java b/core/src/test/java/org/apache/iceberg/util/TestCommitRetry.java index 8cf309a7ba73..96fb3e8da27a 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestCommitRetry.java +++ b/core/src/test/java/org/apache/iceberg/util/TestCommitRetry.java @@ -20,6 +20,7 @@ import static org.assertj.core.api.Assertions.assertThat; +import org.apache.iceberg.RetryableValidationException; import org.apache.iceberg.TableProperties; import org.apache.iceberg.exceptions.CommitFailedException; import org.junit.jupiter.api.Test; @@ -66,4 +67,17 @@ void retryExhaustedExceptionRecommendsBothPropertiesForAttemptLimitAndTimeout() .hasMessageContaining(TableProperties.COMMIT_TOTAL_RETRY_TIME_MS); assertThat(wrapped.getCause()).isSameAs(original); } + + @Test + void retryExhaustedExceptionPreservesRetryableValidationFailures() { + CommitFailedException original = + new CommitFailedException( + new RetryableValidationException("stale values"), + "Commit failed: Validation failed, please retry: stale values"); + + CommitFailedException wrapped = + CommitRetry.retryExhaustedException(original, Tasks.RetryExhaustionReason.ATTEMPT_LIMIT); + + assertThat(wrapped).isSameAs(original); + } }