diff --git a/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java b/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java index 2311c1b017d9..f46a2a58935e 100644 --- a/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java +++ b/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java @@ -29,6 +29,7 @@ import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.expressions.Term; +import org.apache.iceberg.util.CommitRetry; import org.apache.iceberg.util.Tasks; public class BaseReplaceSortOrder implements ReplaceSortOrder { @@ -57,6 +58,7 @@ public void commit() { base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) + .onRetryExhausted(CommitRetry::retryExhaustedException) .run( taskOps -> { this.base = ops.refresh(); diff --git a/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java b/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java index 9389aec50c0a..51fd50bd7ce5 100644 --- a/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java +++ b/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java @@ -33,6 +33,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.CommitRetry; import org.apache.iceberg.util.Tasks; class PropertiesUpdate implements UpdateProperties { @@ -107,6 +108,7 @@ public void commit() { base.propertyTryAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) + .onRetryExhausted(CommitRetry::retryExhaustedException) .run( taskOps -> { Map newProperties = apply(); diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java index 
dbccdb86ed4e..2071371b6e55 100644 --- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java +++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java @@ -49,6 +49,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors; +import org.apache.iceberg.util.CommitRetry; import org.apache.iceberg.util.DateTimeUtil; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.SnapshotUtil; @@ -366,6 +367,7 @@ public void commit() { base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) + .onRetryExhausted(CommitRetry::retryExhaustedException) .run( item -> { TableMetadata updated = internalApply(); diff --git a/core/src/main/java/org/apache/iceberg/SetLocation.java b/core/src/main/java/org/apache/iceberg/SetLocation.java index 148e4b8bc8be..dd434cf85aca 100644 --- a/core/src/main/java/org/apache/iceberg/SetLocation.java +++ b/core/src/main/java/org/apache/iceberg/SetLocation.java @@ -28,6 +28,7 @@ import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.util.CommitRetry; import org.apache.iceberg.util.Tasks; public class SetLocation implements UpdateLocation { @@ -61,6 +62,7 @@ public void commit() { base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) + .onRetryExhausted(CommitRetry::retryExhaustedException) .run(taskOps -> taskOps.commit(base, base.updateLocation(newLocation))); } } diff --git a/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java b/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java index 0f80b4e1f233..309d986ff65f 100644 --- 
a/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java +++ b/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java @@ -31,6 +31,7 @@ import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.CommitRetry; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.Tasks; @@ -115,6 +116,7 @@ public void commit() { base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) + .onRetryExhausted(CommitRetry::retryExhaustedException) .run( taskOps -> { Snapshot snapshot = apply(); diff --git a/core/src/main/java/org/apache/iceberg/SetStatistics.java b/core/src/main/java/org/apache/iceberg/SetStatistics.java index 01e06fa16bca..baa28a10264e 100644 --- a/core/src/main/java/org/apache/iceberg/SetStatistics.java +++ b/core/src/main/java/org/apache/iceberg/SetStatistics.java @@ -32,6 +32,7 @@ import java.util.Optional; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.CommitRetry; import org.apache.iceberg.util.Tasks; public class SetStatistics implements UpdateStatistics { @@ -71,6 +72,7 @@ public void commit() { .propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) + .onRetryExhausted(CommitRetry::retryExhaustedException) .run( taskOps -> { TableMetadata base = taskOps.refresh(); diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index f77222d6a2a6..22784e8b2bee 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -72,6 
+72,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.math.IntMath; +import org.apache.iceberg.util.CommitRetry; import org.apache.iceberg.util.Exceptions; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.PropertyUtil; @@ -494,6 +495,7 @@ public void commit() { base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) + .onRetryExhausted(CommitRetry::retryExhaustedException) .countAttempts(commitMetrics().attempts()) .run( taskOps -> { diff --git a/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java b/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java index 334bfde8abfc..be03b48aabcf 100644 --- a/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java +++ b/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java @@ -19,6 +19,7 @@ package org.apache.iceberg.rest; import java.util.function.Consumer; +import org.apache.iceberg.RetryableValidationException; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.BadRequestException; import org.apache.iceberg.exceptions.CommitFailedException; @@ -125,6 +126,13 @@ public void accept(ErrorResponse error) { case 404: throw new NoSuchTableException("%s", error.message()); case 409: + if (RetryableValidationException.class.getSimpleName().equals(error.type())) { + throw new CommitFailedException( + new RetryableValidationException("%s", error.message()), + "Commit failed: %s", + error.message()); + } + throw new CommitFailedException("Commit failed: %s", error.message()); case 500: case 502: diff --git a/core/src/main/java/org/apache/iceberg/util/CommitRetry.java b/core/src/main/java/org/apache/iceberg/util/CommitRetry.java new file mode 100644 index 000000000000..d06e22dcedda --- /dev/null +++ 
b/core/src/main/java/org/apache/iceberg/util/CommitRetry.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.util; + +import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES; +import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS; + +import org.apache.iceberg.RetryableValidationException; +import org.apache.iceberg.exceptions.CommitFailedException; + +public class CommitRetry { + private CommitRetry() {} + + public static CommitFailedException retryExhaustedException( + Exception cause, Tasks.RetryExhaustionReason reason) { + if (isRetryableValidationCommitFailure(cause)) { + return (CommitFailedException) cause; + } + + return new CommitFailedException( + cause, "Commit failed after exhausting retries. 
%s", retryExhaustionMessage(reason)); + } + + public static boolean isRetryableValidationCommitFailure(Exception cause) { + return cause instanceof CommitFailedException + && cause.getCause() instanceof RetryableValidationException; + } + + @SuppressWarnings("StatementSwitchToExpressionSwitch") + private static String retryExhaustionMessage(Tasks.RetryExhaustionReason reason) { + switch (reason) { + case ATTEMPT_LIMIT: + return String.format("To allow more retry attempts, adjust %s.", COMMIT_NUM_RETRIES); + + case TIMEOUT: + return String.format( + "To allow more total retry time, adjust %s.", COMMIT_TOTAL_RETRY_TIME_MS); + + case ATTEMPT_LIMIT_AND_TIMEOUT: + return String.format( + "To allow more retry attempts and total retry time, adjust %s and %s.", + COMMIT_NUM_RETRIES, COMMIT_TOTAL_RETRY_TIME_MS); + } + + throw new IllegalArgumentException("Unknown retry exhaustion reason: " + reason); + } +} diff --git a/core/src/main/java/org/apache/iceberg/util/Tasks.java b/core/src/main/java/org/apache/iceberg/util/Tasks.java index 29100c6cffb2..cb6b54d2ef97 100644 --- a/core/src/main/java/org/apache/iceberg/util/Tasks.java +++ b/core/src/main/java/org/apache/iceberg/util/Tasks.java @@ -32,6 +32,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiFunction; import java.util.function.Predicate; import java.util.stream.Stream; import org.apache.iceberg.metrics.Counter; @@ -44,6 +45,12 @@ public class Tasks { private Tasks() {} + public enum RetryExhaustionReason { + ATTEMPT_LIMIT, + TIMEOUT, + ATTEMPT_LIMIT_AND_TIMEOUT + } + public static class UnrecoverableException extends RuntimeException { public UnrecoverableException(String message) { super(message); @@ -88,6 +95,8 @@ public static class Builder { private long maxDurationMs = 600000; // 10 minutes private double scaleFactor = 2.0; // exponential private Counter attemptsCounter; + private BiFunction 
retryExhaustedHandler = + null; public Builder(Iterable items) { this.items = items; @@ -180,6 +189,12 @@ public Builder countAttempts(Counter counter) { return this; } + public Builder onRetryExhausted( + BiFunction handler) { + this.retryExhaustedHandler = handler; + return this; + } + public Builder exponentialBackoff( long backoffMinSleepTimeMs, long backoffMaxSleepTimeMs, @@ -414,39 +429,24 @@ private void runTaskWithRetry(Task task, I item) thr break; } catch (Exception e) { - long durationMs = System.currentTimeMillis() - start; - if (attempt >= maxAttempts || (durationMs > maxDurationMs && attempt > 1)) { - if (durationMs > maxDurationMs) { - LOG.info("Stopping retries after {} ms", durationMs); - } + if (!shouldRetry(e)) { throw e; } - if (shouldRetryPredicate != null) { - if (!shouldRetryPredicate.test(e)) { - throw e; + long durationMs = System.currentTimeMillis() - start; + boolean attemptsExhausted = attempt >= maxAttempts; + boolean timeoutExhausted = durationMs > maxDurationMs && attempt > 1; + if (attemptsExhausted || timeoutExhausted) { + if (timeoutExhausted) { + LOG.info("Stopping retries after {} ms", durationMs); } - } else if (onlyRetryExceptions != null) { - // if onlyRetryExceptions are present, then this retries if one is found - boolean matchedRetryException = false; - for (Class exClass : onlyRetryExceptions) { - if (exClass.isInstance(e)) { - matchedRetryException = true; - break; - } - } - if (!matchedRetryException) { - throw e; + if (retryExhaustedHandler != null) { + throw retryExhaustedHandler.apply( + e, retryExhaustionReason(attemptsExhausted, timeoutExhausted)); } - } else { - // otherwise, always retry unless one of the stop exceptions is found - for (Class exClass : stopRetryExceptions) { - if (exClass.isInstance(e)) { - throw e; - } - } + throw e; } int delayMs = @@ -468,6 +468,41 @@ private void runTaskWithRetry(Task task, I item) thr } } } + + private boolean shouldRetry(Exception exception) { + if (shouldRetryPredicate != 
null) { + return shouldRetryPredicate.test(exception); + } else if (onlyRetryExceptions != null) { + // if onlyRetryExceptions are present, then this retries if one is found + for (Class exClass : onlyRetryExceptions) { + if (exClass.isInstance(exception)) { + return true; + } + } + + return false; + } else { + // otherwise, always retry unless one of the stop exceptions is found + for (Class exClass : stopRetryExceptions) { + if (exClass.isInstance(exception)) { + return false; + } + } + + return true; + } + } + + private RetryExhaustionReason retryExhaustionReason( + boolean attemptsExhausted, boolean timeoutExhausted) { + if (attemptsExhausted && timeoutExhausted) { + return RetryExhaustionReason.ATTEMPT_LIMIT_AND_TIMEOUT; + } else if (attemptsExhausted) { + return RetryExhaustionReason.ATTEMPT_LIMIT; + } else { + return RetryExhaustionReason.TIMEOUT; + } + } } @SuppressWarnings("checkstyle:CyclomaticComplexity") diff --git a/core/src/main/java/org/apache/iceberg/view/PropertiesUpdate.java b/core/src/main/java/org/apache/iceberg/view/PropertiesUpdate.java index 48bcfc3a6805..7a1f0f4a375a 100644 --- a/core/src/main/java/org/apache/iceberg/view/PropertiesUpdate.java +++ b/core/src/main/java/org/apache/iceberg/view/PropertiesUpdate.java @@ -33,6 +33,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.CommitRetry; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; @@ -73,6 +74,7 @@ public void commit() { base.properties(), COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) + .onRetryExhausted(CommitRetry::retryExhaustedException) .run(taskOps -> taskOps.commit(base, internalApply())); } diff --git a/core/src/main/java/org/apache/iceberg/view/SetViewLocation.java 
b/core/src/main/java/org/apache/iceberg/view/SetViewLocation.java index 481118c85991..4c92c8e4fc0a 100644 --- a/core/src/main/java/org/apache/iceberg/view/SetViewLocation.java +++ b/core/src/main/java/org/apache/iceberg/view/SetViewLocation.java @@ -30,6 +30,7 @@ import org.apache.iceberg.UpdateLocation; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.CommitRetry; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; @@ -63,6 +64,7 @@ public void commit() { base.properties(), COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) + .onRetryExhausted(CommitRetry::retryExhaustedException) .run( taskOps -> taskOps.commit(base, ViewMetadata.buildFrom(base).setLocation(apply()).build())); diff --git a/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java b/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java index 8b3d087940a5..5dddf8ae6c1c 100644 --- a/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java +++ b/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java @@ -34,6 +34,7 @@ import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.CommitRetry; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; @@ -100,6 +101,7 @@ public void commit() { base.properties(), COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) + .onRetryExhausted(CommitRetry::retryExhaustedException) .run(taskOps -> taskOps.commit(base, internalApply())); } diff --git a/core/src/test/java/org/apache/iceberg/InternalTestHelpers.java 
b/core/src/test/java/org/apache/iceberg/InternalTestHelpers.java index 781051f11d7b..8bd02c57bd49 100644 --- a/core/src/test/java/org/apache/iceberg/InternalTestHelpers.java +++ b/core/src/test/java/org/apache/iceberg/InternalTestHelpers.java @@ -19,19 +19,31 @@ package org.apache.iceberg; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.util.List; import java.util.Map; import org.apache.iceberg.data.Record; +import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.iceberg.variants.Variant; import org.apache.iceberg.variants.VariantTestUtil; +import org.assertj.core.api.ThrowableAssert.ThrowingCallable; public class InternalTestHelpers { private InternalTestHelpers() {} + public static void assertCommitRetryExhausted(ThrowingCallable callable) { + assertThatThrownBy(callable) + .isInstanceOf(CommitFailedException.class) + .hasMessageContaining(TableProperties.COMMIT_NUM_RETRIES) + .cause() + .isInstanceOf(CommitFailedException.class) + .hasMessage("Injected failure"); + } + public static void assertEquals(Types.StructType struct, Record expected, Record actual) { Types.StructType expectedType = expected.struct(); for (Types.NestedField field : struct.fields()) { diff --git a/core/src/test/java/org/apache/iceberg/TestFastAppend.java b/core/src/test/java/org/apache/iceberg/TestFastAppend.java index bc28ecd88022..ae535dd73b43 100644 --- a/core/src/test/java/org/apache/iceberg/TestFastAppend.java +++ b/core/src/test/java/org/apache/iceberg/TestFastAppend.java @@ -28,7 +28,6 @@ import java.util.Set; import java.util.stream.Collectors; import org.apache.iceberg.ManifestEntry.Status; -import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; 
import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -332,9 +331,7 @@ public void testFailure() { ManifestFile newManifest = pending.allManifests(FILE_IO).get(0); assertThat(new File(newManifest.path())).exists(); - assertThatThrownBy(append::commit) - .isInstanceOf(CommitFailedException.class) - .hasMessage("Injected failure"); + InternalTestHelpers.assertCommitRetryExhausted(append::commit); assertThat(new File(newManifest.path())).doesNotExist(); } @@ -347,9 +344,7 @@ public void testIncreaseNumRetries() { AppendFiles append = table.newFastAppend().appendFile(FILE_B); // Default number of retries results in a failed commit - assertThatThrownBy(append::commit) - .isInstanceOf(CommitFailedException.class) - .hasMessage("Injected failure"); + InternalTestHelpers.assertCommitRetryExhausted(append::commit); // After increasing the number of retries the commit succeeds table @@ -381,9 +376,7 @@ public void testAppendManifestCleanup() throws IOException { assertThat(newManifest.path()).isEqualTo(manifest.path()); } - assertThatThrownBy(append::commit) - .isInstanceOf(CommitFailedException.class) - .hasMessage("Injected failure"); + InternalTestHelpers.assertCommitRetryExhausted(append::commit); if (formatVersion == 1) { assertThat(new File(newManifest.path())).doesNotExist(); @@ -494,9 +487,7 @@ public void testAppendManifestFailureWithSnapshotIdInheritance() throws IOExcept AppendFiles append = table.newAppend(); append.appendManifest(manifest); - assertThatThrownBy(append::commit) - .isInstanceOf(CommitFailedException.class) - .hasMessage("Injected failure"); + InternalTestHelpers.assertCommitRetryExhausted(append::commit); assertThat(new File(manifest.path())).exists(); } diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java index 2654f244f3d1..8ea9ac88fff5 100644 --- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java +++ 
b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java @@ -35,7 +35,6 @@ import java.util.stream.Stream; import org.apache.iceberg.ManifestEntry.Status; import org.apache.iceberg.TestHelpers.Row; -import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -1175,9 +1174,7 @@ public void testFailure() { ids(pending.snapshotId(), baseId), concat(files(FILE_B), files(initialManifest))); - assertThatThrownBy(() -> commit(table, append, branch)) - .isInstanceOf(CommitFailedException.class) - .hasMessage("Injected failure"); + InternalTestHelpers.assertCommitRetryExhausted(() -> commit(table, append, branch)); V2Assert.assertEquals( "Last sequence number should be 1", 1, readMetadata().lastSequenceNumber()); @@ -1213,9 +1210,7 @@ public void testAppendManifestCleanup() throws IOException { assertThat(newManifest.path()).isEqualTo(manifest.path()); } - assertThatThrownBy(() -> commit(table, append, branch)) - .isInstanceOf(CommitFailedException.class) - .hasMessage("Injected failure"); + InternalTestHelpers.assertCommitRetryExhausted(() -> commit(table, append, branch)); V2Assert.assertEquals( "Last sequence number should be 0", 0, readMetadata().lastSequenceNumber()); V1Assert.assertEquals( @@ -1399,9 +1394,7 @@ public void testAppendManifestFailureWithSnapshotIdInheritance() throws IOExcept AppendFiles append = table.newAppend(); append.appendManifest(manifest); - assertThatThrownBy(() -> commit(table, append, branch)) - .isInstanceOf(CommitFailedException.class) - .hasMessage("Injected failure"); + InternalTestHelpers.assertCommitRetryExhausted(() -> commit(table, append, branch)); assertThat(readMetadata().lastSequenceNumber()).isEqualTo(0); assertThat(new File(manifest.path())).exists(); diff --git 
a/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java b/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java index 72a3c89b74d5..38a06adcad3d 100644 --- a/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java +++ b/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java @@ -31,7 +31,6 @@ import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; -import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -404,9 +403,7 @@ public void testFailure() { validateManifestEntries(manifest1, ids(pending.snapshotId()), files(FILE_B), statuses(ADDED)); validateManifestEntries(manifest2, ids(pending.snapshotId()), files(FILE_A), statuses(DELETED)); - assertThatThrownBy(() -> commit(table, rewrite, branch)) - .isInstanceOf(CommitFailedException.class) - .hasMessage("Injected failure"); + InternalTestHelpers.assertCommitRetryExhausted(() -> commit(table, rewrite, branch)); assertThat(new File(manifest1.path())).doesNotExist(); assertThat(new File(manifest2.path())).doesNotExist(); @@ -471,9 +468,7 @@ public void testFailureWhenRewriteBothDataAndDeleteFiles() { files(fileADeletes(), fileBDeletes()), statuses(DELETED, DELETED)); - assertThatThrownBy(rewrite::commit) - .isInstanceOf(CommitFailedException.class) - .hasMessage("Injected failure"); + InternalTestHelpers.assertCommitRetryExhausted(rewrite::commit); assertThat(new File(manifest1.path())).doesNotExist(); assertThat(new File(manifest2.path())).doesNotExist(); diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java index dab323743bb1..a2050a175f98 100644 --- a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java +++ 
b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java @@ -34,7 +34,6 @@ import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Expressions; @@ -1028,9 +1027,7 @@ public void testManifestReplacementFailure() throws IOException { rewriteManifests.deleteManifest(secondSnapshotManifest); rewriteManifests.addManifest(newManifest); - assertThatThrownBy(rewriteManifests::commit) - .isInstanceOf(CommitFailedException.class) - .hasMessage("Injected failure"); + InternalTestHelpers.assertCommitRetryExhausted(rewriteManifests::commit); assertThat(new File(newManifest.path())).exists(); } @@ -1070,9 +1067,7 @@ public void testManifestReplacementFailureWithSnapshotIdInheritance() throws IOE rewriteManifests.deleteManifest(secondSnapshotManifest); rewriteManifests.addManifest(newManifest); - assertThatThrownBy(rewriteManifests::commit) - .isInstanceOf(CommitFailedException.class) - .hasMessage("Injected failure"); + InternalTestHelpers.assertCommitRetryExhausted(rewriteManifests::commit); assertThat(new File(newManifest.path())).exists(); } @@ -1678,9 +1673,7 @@ public void testDeleteManifestReplacementFailure() throws IOException { rewriteManifests.addManifest(newDeleteManifest); // the rewrite must fail - assertThatThrownBy(rewriteManifests::commit) - .isInstanceOf(CommitFailedException.class) - .hasMessage("Injected failure"); + InternalTestHelpers.assertCommitRetryExhausted(rewriteManifests::commit); // the new manifest must not be deleted as the commit hasn't succeeded assertThat(new File(newDeleteManifest.path())).exists(); diff --git a/core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java b/core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java index 
164ee54c898f..d2e7efdef614 100644 --- a/core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java +++ b/core/src/test/java/org/apache/iceberg/TestSequenceNumberForV2Table.java @@ -112,9 +112,8 @@ public void testCommitConflict() { table.ops().failCommits(1); - assertThatThrownBy(() -> table.newFastAppend().appendFile(FILE_B).commit()) - .isInstanceOf(CommitFailedException.class) - .hasMessage("Injected failure"); + InternalTestHelpers.assertCommitRetryExhausted( + () -> table.newFastAppend().appendFile(FILE_B).commit()); table.updateProperties().set(TableProperties.COMMIT_NUM_RETRIES, "5").commit(); diff --git a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java index 8c6dc52b1575..e720959e4cdb 100644 --- a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java +++ b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java @@ -32,6 +32,7 @@ import org.apache.iceberg.BaseTable; import org.apache.iceberg.BaseTransaction; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.RetryableValidationException; import org.apache.iceberg.Scan; import org.apache.iceberg.Table; import org.apache.iceberg.Transaction; @@ -79,6 +80,7 @@ import org.apache.iceberg.rest.responses.ErrorResponse; import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.rest.responses.OAuthTokenResponse; +import org.apache.iceberg.util.CommitRetry; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.PropertyUtil; @@ -715,9 +717,14 @@ public static T castResponse(Class responseType, Obj public static void configureResponseFromException( Exception exc, ErrorResponse.Builder errorBuilder) { + String errorType = exc.getClass().getSimpleName(); + if (CommitRetry.isRetryableValidationCommitFailure(exc)) { + errorType = RetryableValidationException.class.getSimpleName(); + } + errorBuilder 
.responseCode(EXCEPTION_ERROR_CODES.getOrDefault(exc.getClass(), 500)) - .withType(exc.getClass().getSimpleName()) + .withType(errorType) .withMessage(exc.getMessage()) .withStackTrace(exc); } diff --git a/core/src/test/java/org/apache/iceberg/util/TestCommitRetry.java b/core/src/test/java/org/apache/iceberg/util/TestCommitRetry.java new file mode 100644 index 000000000000..96fb3e8da27a --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/util/TestCommitRetry.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.iceberg.util;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.iceberg.RetryableValidationException;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.junit.jupiter.api.Test;
+
+class TestCommitRetry {
+
+  @Test
+  void retryExhaustedExceptionRecommendsNumRetriesForAttemptLimit() {
+    CommitFailedException original = new CommitFailedException("failed");
+
+    CommitFailedException wrapped =
+        CommitRetry.retryExhaustedException(original, Tasks.RetryExhaustionReason.ATTEMPT_LIMIT);
+
+    assertThat(wrapped).isInstanceOf(CommitFailedException.class);
+    assertThat(wrapped).hasMessageContaining(TableProperties.COMMIT_NUM_RETRIES);
+    assertThat(wrapped).hasMessageNotContaining(TableProperties.COMMIT_TOTAL_RETRY_TIME_MS);
+    assertThat(wrapped.getCause()).isSameAs(original); // original failure is kept as the cause
+  }
+
+  @Test
+  void retryExhaustedExceptionRecommendsTotalTimeoutForTimeout() {
+    CommitFailedException original = new CommitFailedException("failed");
+
+    CommitFailedException wrapped =
+        CommitRetry.retryExhaustedException(original, Tasks.RetryExhaustionReason.TIMEOUT);
+
+    assertThat(wrapped).isInstanceOf(CommitFailedException.class);
+    assertThat(wrapped).hasMessageContaining(TableProperties.COMMIT_TOTAL_RETRY_TIME_MS);
+    assertThat(wrapped).hasMessageNotContaining(TableProperties.COMMIT_NUM_RETRIES);
+    assertThat(wrapped.getCause()).isSameAs(original);
+  }
+
+  @Test
+  void retryExhaustedExceptionRecommendsBothPropertiesForAttemptLimitAndTimeout() {
+    CommitFailedException original = new CommitFailedException("failed");
+
+    CommitFailedException wrapped =
+        CommitRetry.retryExhaustedException(
+            original, Tasks.RetryExhaustionReason.ATTEMPT_LIMIT_AND_TIMEOUT);
+
+    assertThat(wrapped).isInstanceOf(CommitFailedException.class);
+    assertThat(wrapped)
+        .hasMessageContaining(TableProperties.COMMIT_NUM_RETRIES)
+        .hasMessageContaining(TableProperties.COMMIT_TOTAL_RETRY_TIME_MS);
+    assertThat(wrapped.getCause()).isSameAs(original);
+  }
+
+  @Test
+  void retryExhaustedExceptionPreservesRetryableValidationFailures() {
+    CommitFailedException original =
+        new CommitFailedException(
+            new RetryableValidationException("stale values"), // cause is a retryable validation
+            "Commit failed: Validation failed, please retry: stale values");
+
+    CommitFailedException wrapped =
+        CommitRetry.retryExhaustedException(original, Tasks.RetryExhaustionReason.ATTEMPT_LIMIT);
+
+    assertThat(wrapped).isSameAs(original); // passed through unchanged, not re-wrapped
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/util/TestTasks.java b/core/src/test/java/org/apache/iceberg/util/TestTasks.java
index 2ca69c66bb6c..b23c7f9e70d4 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestTasks.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestTasks.java
@@ -19,8 +19,13 @@
 package org.apache.iceberg.util;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
 
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.IntStream;
+import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.metrics.Counter;
 import org.apache.iceberg.metrics.DefaultMetricsContext;
 import org.junit.jupiter.api.Test;
@@ -57,4 +62,167 @@ public void attemptCounterIsIncreasedWithoutRetries() {
 
     assertThat(counter.value()).isOne();
   }
+
+  @Test
+  public void retryExhaustedReportsAttemptLimit() {
+    RuntimeException failure = new RuntimeException("failed");
+    RuntimeException wrapped = new RuntimeException("wrapped");
+    AtomicReference<Throwable> capturedFailure = new AtomicReference<>();
+    AtomicReference<Tasks.RetryExhaustionReason> capturedReason = new AtomicReference<>();
+
+    Throwable thrown =
+        catchThrowable(
+            () ->
+                Tasks.foreach(1)
+                    .retry(1) // one retry allowed, i.e. two attempts in total
+                    .exponentialBackoff(0, 0, 5000, 0) // third argument: total retry time in ms
+                    .onlyRetryOn(RuntimeException.class)
+                    .onRetryExhausted(
+                        (exception, reason) -> {
+                          capturedFailure.set(exception);
+                          capturedReason.set(reason);
+                          return wrapped;
+                        })
+                    .run(
+                        x -> {
+                          throw failure;
+                        }));
+
+    assertThat(thrown).isSameAs(wrapped); // the handler's return value is what gets thrown
+    assertThat(capturedFailure.get()).isSameAs(failure);
+    assertThat(capturedReason.get()).isEqualTo(Tasks.RetryExhaustionReason.ATTEMPT_LIMIT);
+  }
+
+  @Test
+  public void retryExhaustedReportsTimeout() {
+    RuntimeException wrapped = new RuntimeException("wrapped");
+    AtomicReference<Tasks.RetryExhaustionReason> capturedReason = new AtomicReference<>();
+
+    Throwable thrown =
+        catchThrowable(
+            () ->
+                Tasks.foreach(1)
+                    .retry(2) // attempt limit is not expected to be reached before the timeout
+                    .exponentialBackoff(0, 0, 1, 0) // 1 ms total retry time
+                    .onlyRetryOn(RuntimeException.class)
+                    .onRetryExhausted(
+                        (exception, reason) -> {
+                          capturedReason.set(reason);
+                          return wrapped;
+                        })
+                    .run(
+                        x -> {
+                          sleep(5); // exceed the 1 ms total retry time
+                          throw new RuntimeException("failed");
+                        }));
+
+    assertThat(thrown).isSameAs(wrapped);
+    assertThat(capturedReason.get()).isEqualTo(Tasks.RetryExhaustionReason.TIMEOUT);
+  }
+
+  @Test
+  public void retryExhaustedReportsAttemptLimitAndTimeout() {
+    RuntimeException wrapped = new RuntimeException("wrapped");
+    AtomicReference<Tasks.RetryExhaustionReason> capturedReason = new AtomicReference<>();
+
+    Throwable thrown =
+        catchThrowable(
+            () ->
+                Tasks.foreach(1)
+                    .retry(1)
+                    .exponentialBackoff(0, 0, 1, 0) // 1 ms total retry time
+                    .onlyRetryOn(RuntimeException.class)
+                    .onRetryExhausted(
+                        (exception, reason) -> {
+                          capturedReason.set(reason);
+                          return wrapped;
+                        })
+                    .run(
+                        x -> {
+                          sleep(5); // exceed the 1 ms total retry time
+                          throw new RuntimeException("failed");
+                        }));
+
+    assertThat(thrown).isSameAs(wrapped);
+    assertThat(capturedReason.get())
+        .isEqualTo(Tasks.RetryExhaustionReason.ATTEMPT_LIMIT_AND_TIMEOUT);
+  }
+
+  @Test
+  public void retryExhaustedHandlerIsOptIn() {
+    RuntimeException failure = new RuntimeException("failed");
+    AtomicInteger attempts = new AtomicInteger(0);
+
+    Throwable thrown =
+        catchThrowable(
+            () ->
+                Tasks.foreach(1)
+                    .retry(1)
+                    .exponentialBackoff(0, 0, 5000, 0)
+                    .onlyRetryOn(RuntimeException.class)
+                    .run(
+                        x -> {
+                          attempts.incrementAndGet();
+                          throw failure;
+                        }));
+
+    assertThat(thrown).isSameAs(failure); // without a handler the last failure propagates as is
+    assertThat(attempts.get()).isEqualTo(2); // initial attempt plus one retry
+  }
+
+  @Test
+  public void retryExhaustedDoesNotWrapNonRetryableException() {
+    RuntimeException failure = new RuntimeException("failed");
+    RuntimeException wrapped = new RuntimeException("wrapped");
+    AtomicInteger attempts = new AtomicInteger(0);
+
+    Throwable thrown =
+        catchThrowable(
+            () ->
+                Tasks.foreach(1)
+                    .retry(1)
+                    .exponentialBackoff(0, 0, 5000, 0)
+                    .onlyRetryOn(IllegalArgumentException.class) // thrown type is not retryable
+                    .onRetryExhausted((exception, reason) -> wrapped)
+                    .run(
+                        x -> {
+                          attempts.incrementAndGet();
+                          throw failure;
+                        }));
+
+    assertThat(thrown).isSameAs(failure);
+    assertThat(attempts.get()).isOne(); // failed on the first attempt, no retry
+  }
+
+  @Test
+  public void retryExhaustedCanWrapAsCommitFailedException() {
+    CommitFailedException failure = new CommitFailedException("failed");
+
+    Throwable thrown =
+        catchThrowable(
+            () ->
+                Tasks.foreach(1)
+                    .retry(0) // no retries configured
+                    .exponentialBackoff(0, 0, 5000, 0)
+                    .onlyRetryOn(CommitFailedException.class)
+                    .onRetryExhausted(
+                        (exception, reason) -> new CommitFailedException(exception, "wrapped"))
+                    .run(
+                        x -> {
+                          throw failure;
+                        }));
+
+    assertThat(thrown).isInstanceOf(CommitFailedException.class);
+    assertThat(thrown).hasMessage("wrapped");
+    assertThat(thrown.getCause()).isSameAs(failure);
+  }
+
+  private static void sleep(long millis) { // sleeps; an interrupt is rethrown unchecked
+    try {
+      TimeUnit.MILLISECONDS.sleep(millis);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // restore the interrupt flag before rethrowing
+      throw new RuntimeException(e);
+    }
+  }
 }