From 150faaa7e316a4c6489ceaf6ff8d87ab1c910a5f Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 21 May 2026 22:37:02 -0700 Subject: [PATCH] Retry CompleteMultipartUpload and DeleteObjects on in-body errors S3 surfaces transient errors inside 200-OK response bodies for these two APIs, so the existing HTTP status-code retry path never sees them. Add a RETRIABLE_ERROR_CODES set (InternalError, SlowDown, ServiceUnavailable, RequestTimeout, Throttling, ThrottlingException) and retry these calls with the same exponential-backoff config as StatusRetryInterceptor. CompleteMultipartUpload: retry the whole request when the body parses as <Error> with a retriable code. DeleteObjects: retry the whole batch when any per-object error has a retriable code. Delete is idempotent so re-sending already-deleted keys is safe. --- api/src/main/java/io/minio/BaseS3Client.java | 142 ++++++++++++++----- api/src/main/java/io/minio/Http.java | 19 +++ 2 files changed, 126 insertions(+), 35 deletions(-) diff --git a/api/src/main/java/io/minio/BaseS3Client.java b/api/src/main/java/io/minio/BaseS3Client.java index 372a07e2c..efcf2eeaa 100644 --- a/api/src/main/java/io/minio/BaseS3Client.java +++ b/api/src/main/java/io/minio/BaseS3Client.java @@ -60,6 +60,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; import java.util.function.Supplier; import java.util.logging.Logger; import java.util.stream.IntStream; @@ -260,6 +261,40 @@ public void setAwsS3Prefix(@Nonnull String awsS3Prefix) { baseUrl.setAwsS3Prefix(awsS3Prefix); } + private Http.StatusRetryInterceptor getStatusRetryInterceptor() { + List interceptors = this.httpClient.interceptors(); + int i = getStatusRetryInterceptorIndex(interceptors); + return i < 0 ? 
null : (Http.StatusRetryInterceptor) interceptors.get(i); + } + + private int maxRetryAttempts() { + Http.StatusRetryInterceptor interceptor = getStatusRetryInterceptor(); + return interceptor != null ? interceptor.maxRetries() : 5; + } + + private long jitteredRetryDelay(int attempt) { + Http.StatusRetryInterceptor interceptor = getStatusRetryInterceptor(); + long delayMs = interceptor != null ? interceptor.delayMs() : 100; + if (delayMs <= 0) return 0; + long maxBackoffLimit = delayMs * (1L << (attempt + 1)); + return ThreadLocalRandom.current().nextLong(0, maxBackoffLimit); + } + + private CompletableFuture retryDelayFuture(long delayMs) { + if (delayMs <= 0) return CompletableFuture.completedFuture(null); + return supplyAsync( + (Supplier) + () -> { + try { + Thread.sleep(delayMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new CompletionException(new MinioException("retry sleep interrupted", e)); + } + return null; + }); + } + ///////////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////// HTTP execution methods //////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////////// @@ -693,39 +728,55 @@ public CompletableFuture completeMultipartUpload( } catch (MinioException e) { return Utils.failedFuture(e); } + return completeMultipartUploadAttempt(args, body, 0, maxRetryAttempts()); + } + + private CompletableFuture completeMultipartUploadAttempt( + CompleteMultipartUploadArgs args, Http.Body body, int attempt, int maxAttempts) { return executePostAsync( args, args.ssec() == null ? 
null : args.ssec().headers(), new Http.QueryParameters(Http.UPLOAD_ID, args.uploadId()), body) - .thenApply( + .thenCompose( response -> { try { - String bodyContent = response.body().string(); - bodyContent = bodyContent.trim(); + String bodyContent = response.body().string().trim(); if (!bodyContent.isEmpty()) { + ErrorResponse errorResponse = null; try { if (Xml.validate(ErrorResponse.class, bodyContent)) { - ErrorResponse errorResponse = Xml.unmarshal(ErrorResponse.class, bodyContent); - throw new CompletionException( - new ErrorResponseException(errorResponse, response, null)); + errorResponse = Xml.unmarshal(ErrorResponse.class, bodyContent); } } catch (XmlParserException e) { - // As it is not <Error> message, fallback to parse CompleteMultipartUploadOutput - // XML. + // Not <Error>; fall through to parse CompleteMultipartUploadOutput XML. + } + + if (errorResponse != null) { + if (attempt + 1 < maxAttempts + && Http.RETRIABLE_ERROR_CODES.contains(errorResponse.code())) { + return retryDelayFuture(jitteredRetryDelay(attempt)) + .thenCompose( + v -> + completeMultipartUploadAttempt( + args, body, attempt + 1, maxAttempts)); + } + return Utils.failedFuture( + new ErrorResponseException(errorResponse, response, null)); } try { CompleteMultipartUploadResult result = Xml.unmarshal(CompleteMultipartUploadResult.class, bodyContent); - return new ObjectWriteResponse( - response.headers(), - result.bucket(), - result.location(), - result.object(), - result.etag(), - response.header("x-amz-version-id"), - result); + return CompletableFuture.completedFuture( + new ObjectWriteResponse( + response.headers(), + result.bucket(), + result.location(), + result.object(), + result.etag(), + response.header("x-amz-version-id"), + result)); } catch (XmlParserException e) { // As this CompleteMultipartUpload REST call succeeded, just log it. 
Logger.getLogger(BaseS3Client.class.getName()) @@ -735,15 +786,16 @@ public CompletableFuture completeMultipartUpload( } } - return new ObjectWriteResponse( - response.headers(), - args.bucket(), - args.location(), - args.object(), - null, - response.header("x-amz-version-id")); + return CompletableFuture.completedFuture( + new ObjectWriteResponse( + response.headers(), + args.bucket(), + args.location(), + args.object(), + null, + response.header("x-amz-version-id"))); } catch (IOException e) { - throw new CompletionException(new MinioException(e)); + return Utils.failedFuture(new MinioException(e)); } finally { response.close(); } @@ -895,6 +947,11 @@ public CompletableFuture createMultipartUpload( */ public CompletableFuture deleteObjects(DeleteObjectsArgs args) { checkArgs(args); + return deleteObjectsAttempt(args, 0, maxRetryAttempts()); + } + + private CompletableFuture deleteObjectsAttempt( + DeleteObjectsArgs args, int attempt, int maxAttempts) { Http.Body body = null; try { body = new Http.Body(new DeleteRequest(args.objects(), args.quiet()), null, null, null); @@ -908,35 +965,50 @@ public CompletableFuture deleteObjects(DeleteObjectsArgs : null, new Http.QueryParameters("delete", ""), body) - .thenApply( + .thenCompose( response -> { try { String bodyContent = response.body().string(); + DeleteResult result = null; try { if (Xml.validate(DeleteResult.Error.class, bodyContent)) { DeleteResult.Error error = Xml.unmarshal(DeleteResult.Error.class, bodyContent); - DeleteResult result = new DeleteResult(error); - return new DeleteObjectsResponse( - response.headers(), args.bucket(), args.region(), result); + result = new DeleteResult(error); } } catch (XmlParserException e) { - // Ignore this exception as it is not <Error> message, - // but parse it as <DeleteResult> message below. + // Not top-level <Error>; parse as <DeleteResult> below. 
} - DeleteResult result = Xml.unmarshal(DeleteResult.class, bodyContent); - return new DeleteObjectsResponse( - response.headers(), args.bucket(), args.region(), result); + if (result == null) { + result = Xml.unmarshal(DeleteResult.class, bodyContent); + } + + if (attempt + 1 < maxAttempts && hasRetriableErrors(result)) { + return retryDelayFuture(jitteredRetryDelay(attempt)) + .thenCompose(v -> deleteObjectsAttempt(args, attempt + 1, maxAttempts)); + } + + return CompletableFuture.completedFuture( + new DeleteObjectsResponse( + response.headers(), args.bucket(), args.region(), result)); } catch (IOException e) { - throw new CompletionException(new MinioException(e)); + return Utils.failedFuture(new MinioException(e)); } catch (XmlParserException e) { - throw new CompletionException(e); + return Utils.failedFuture(e); } finally { response.close(); } }); } + private static boolean hasRetriableErrors(DeleteResult result) { + if (result == null) return false; + for (DeleteResult.Error err : result.errors()) { + if (Http.RETRIABLE_ERROR_CODES.contains(err.code())) return true; + } + return false; + } + /** * Do GetBucketLocation diff --git a/api/src/main/java/io/minio/Http.java b/api/src/main/java/io/minio/Http.java index a853621e2..a7ba63190 100644 --- a/api/src/main/java/io/minio/Http.java +++ b/api/src/main/java/io/minio/Http.java @@ -103,6 +103,17 @@ public class Http { 504, // Gateway Timeout 520); // Cloudflare unknown error + // S3 error codes that may appear inside a 200-OK response body for APIs like + // CompleteMultipartUpload and DeleteObjects. These are considered transient and safe to retry. 
+ public static final Set RETRIABLE_ERROR_CODES = + ImmutableSet.of( + "InternalError", + "SlowDown", + "ServiceUnavailable", + "RequestTimeout", + "Throttling", + "ThrottlingException"); + public static final String END_HTTP = "----------END-HTTP----------"; public static final String UPLOAD_ID = "uploadId"; public static final Set TRACE_QUERY_PARAMS = @@ -759,6 +770,14 @@ public StatusRetryInterceptor( isBucketRequest); } + public long delayMs() { + return delayMs; + } + + public int maxRetries() { + return maxRetries; + } + @Override public Response intercept(Chain chain) throws IOException { okhttp3.Request request = chain.request();