Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 107 additions & 35 deletions api/src/main/java/io/minio/BaseS3Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
import java.util.logging.Logger;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -260,6 +261,40 @@ public void setAwsS3Prefix(@Nonnull String awsS3Prefix) {
baseUrl.setAwsS3Prefix(awsS3Prefix);
}

private Http.StatusRetryInterceptor getStatusRetryInterceptor() {
List<Interceptor> interceptors = this.httpClient.interceptors();
int i = getStatusRetryInterceptorIndex(interceptors);
return i < 0 ? null : (Http.StatusRetryInterceptor) interceptors.get(i);
}

private int maxRetryAttempts() {
Http.StatusRetryInterceptor interceptor = getStatusRetryInterceptor();
return interceptor != null ? interceptor.maxRetries() : 5;
}

private long jitteredRetryDelay(int attempt) {
Http.StatusRetryInterceptor interceptor = getStatusRetryInterceptor();
long delayMs = interceptor != null ? interceptor.delayMs() : 100;
if (delayMs <= 0) return 0;
long maxBackoffLimit = delayMs * (1L << (attempt + 1));
return ThreadLocalRandom.current().nextLong(0, maxBackoffLimit);
}

private CompletableFuture<?> retryDelayFuture(long delayMs) {
if (delayMs <= 0) return CompletableFuture.completedFuture(null);
return supplyAsync(
(Supplier<Object>)
() -> {
try {
Thread.sleep(delayMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new CompletionException(new MinioException("retry sleep interrupted", e));
}
return null;
});
}

/////////////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////////////// HTTP execution methods ////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -693,39 +728,55 @@ public CompletableFuture<ObjectWriteResponse> completeMultipartUpload(
} catch (MinioException e) {
return Utils.failedFuture(e);
}
return completeMultipartUploadAttempt(args, body, 0, maxRetryAttempts());
}

private CompletableFuture<ObjectWriteResponse> completeMultipartUploadAttempt(
CompleteMultipartUploadArgs args, Http.Body body, int attempt, int maxAttempts) {
return executePostAsync(
args,
args.ssec() == null ? null : args.ssec().headers(),
new Http.QueryParameters(Http.UPLOAD_ID, args.uploadId()),
body)
.thenApply(
.thenCompose(
response -> {
try {
String bodyContent = response.body().string();
bodyContent = bodyContent.trim();
String bodyContent = response.body().string().trim();
if (!bodyContent.isEmpty()) {
ErrorResponse errorResponse = null;
try {
if (Xml.validate(ErrorResponse.class, bodyContent)) {
ErrorResponse errorResponse = Xml.unmarshal(ErrorResponse.class, bodyContent);
throw new CompletionException(
new ErrorResponseException(errorResponse, response, null));
errorResponse = Xml.unmarshal(ErrorResponse.class, bodyContent);
}
} catch (XmlParserException e) {
// As it is not <Error> message, fallback to parse CompleteMultipartUploadOutput
// XML.
// Not <Error>; fall through to parse CompleteMultipartUploadOutput XML.
}

if (errorResponse != null) {
if (attempt + 1 < maxAttempts
&& Http.RETRIABLE_ERROR_CODES.contains(errorResponse.code())) {
return retryDelayFuture(jitteredRetryDelay(attempt))
.thenCompose(
v ->
completeMultipartUploadAttempt(
args, body, attempt + 1, maxAttempts));
}
return Utils.<ObjectWriteResponse>failedFuture(
new ErrorResponseException(errorResponse, response, null));
}

try {
CompleteMultipartUploadResult result =
Xml.unmarshal(CompleteMultipartUploadResult.class, bodyContent);
return new ObjectWriteResponse(
response.headers(),
result.bucket(),
result.location(),
result.object(),
result.etag(),
response.header("x-amz-version-id"),
result);
return CompletableFuture.completedFuture(
new ObjectWriteResponse(
response.headers(),
result.bucket(),
result.location(),
result.object(),
result.etag(),
response.header("x-amz-version-id"),
result));
} catch (XmlParserException e) {
// As this CompleteMultipartUpload REST call succeeded, just log it.
Logger.getLogger(BaseS3Client.class.getName())
Expand All @@ -735,15 +786,16 @@ public CompletableFuture<ObjectWriteResponse> completeMultipartUpload(
}
}

return new ObjectWriteResponse(
response.headers(),
args.bucket(),
args.location(),
args.object(),
null,
response.header("x-amz-version-id"));
return CompletableFuture.completedFuture(
new ObjectWriteResponse(
response.headers(),
args.bucket(),
args.location(),
args.object(),
null,
response.header("x-amz-version-id")));
} catch (IOException e) {
throw new CompletionException(new MinioException(e));
return Utils.<ObjectWriteResponse>failedFuture(new MinioException(e));
} finally {
response.close();
}
Expand Down Expand Up @@ -895,6 +947,11 @@ public CompletableFuture<CreateMultipartUploadResponse> createMultipartUpload(
*/
public CompletableFuture<DeleteObjectsResponse> deleteObjects(DeleteObjectsArgs args) {
checkArgs(args);
return deleteObjectsAttempt(args, 0, maxRetryAttempts());
}

private CompletableFuture<DeleteObjectsResponse> deleteObjectsAttempt(
DeleteObjectsArgs args, int attempt, int maxAttempts) {
Http.Body body = null;
try {
body = new Http.Body(new DeleteRequest(args.objects(), args.quiet()), null, null, null);
Expand All @@ -908,35 +965,50 @@ public CompletableFuture<DeleteObjectsResponse> deleteObjects(DeleteObjectsArgs
: null,
new Http.QueryParameters("delete", ""),
body)
.thenApply(
.thenCompose(
response -> {
try {
String bodyContent = response.body().string();
DeleteResult result = null;
try {
if (Xml.validate(DeleteResult.Error.class, bodyContent)) {
DeleteResult.Error error = Xml.unmarshal(DeleteResult.Error.class, bodyContent);
DeleteResult result = new DeleteResult(error);
return new DeleteObjectsResponse(
response.headers(), args.bucket(), args.region(), result);
result = new DeleteResult(error);
}
} catch (XmlParserException e) {
// Ignore this exception as it is not <Error> message,
// but parse it as <DeleteResult> message below.
// Not top-level <Error>; parse as <DeleteResult> below.
}

DeleteResult result = Xml.unmarshal(DeleteResult.class, bodyContent);
return new DeleteObjectsResponse(
response.headers(), args.bucket(), args.region(), result);
if (result == null) {
result = Xml.unmarshal(DeleteResult.class, bodyContent);
}

if (attempt + 1 < maxAttempts && hasRetriableErrors(result)) {
return retryDelayFuture(jitteredRetryDelay(attempt))
.thenCompose(v -> deleteObjectsAttempt(args, attempt + 1, maxAttempts));
}

return CompletableFuture.completedFuture(
new DeleteObjectsResponse(
response.headers(), args.bucket(), args.region(), result));
} catch (IOException e) {
throw new CompletionException(new MinioException(e));
return Utils.<DeleteObjectsResponse>failedFuture(new MinioException(e));
} catch (XmlParserException e) {
throw new CompletionException(e);
return Utils.<DeleteObjectsResponse>failedFuture(e);
} finally {
response.close();
}
});
}

private static boolean hasRetriableErrors(DeleteResult result) {
if (result == null) return false;
for (DeleteResult.Error err : result.errors()) {
if (Http.RETRIABLE_ERROR_CODES.contains(err.code())) return true;
}
return false;
}

/**
* Do <a
* href="https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketLocation.html">GetBucketLocation
Expand Down
19 changes: 19 additions & 0 deletions api/src/main/java/io/minio/Http.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,17 @@ public class Http {
504, // Gateway Timeout
520); // Cloudflare unknown error

// S3 error codes that may appear inside a 200-OK response body for APIs like
// CompleteMultipartUpload and DeleteObjects. These are considered transient and safe to retry.
public static final Set<String> RETRIABLE_ERROR_CODES =
ImmutableSet.of(
"InternalError",
"SlowDown",
"ServiceUnavailable",
"RequestTimeout",
"Throttling",
"ThrottlingException");

public static final String END_HTTP = "----------END-HTTP----------";
public static final String UPLOAD_ID = "uploadId";
public static final Set<String> TRACE_QUERY_PARAMS =
Expand Down Expand Up @@ -759,6 +770,14 @@ public StatusRetryInterceptor(
isBucketRequest);
}

public long delayMs() {
return delayMs;
}

public int maxRetries() {
return maxRetries;
}

@Override
public Response intercept(Chain chain) throws IOException {
okhttp3.Request request = chain.request();
Expand Down
Loading