diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java index 9b85c5872972..85cd832d98f9 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java @@ -41,37 +41,21 @@ import org.apache.druid.data.input.impl.systemfield.SystemField; import org.apache.druid.data.input.impl.systemfield.SystemFields; import org.apache.druid.java.util.common.RetryUtils; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.storage.s3.S3InputDataConfig; import org.apache.druid.storage.s3.S3StorageDruidModule; import org.apache.druid.storage.s3.S3Utils; import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.http.apache.ApacheHttpClient; -import software.amazon.awssdk.http.apache.ProxyConfiguration; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.S3ClientBuilder; -import software.amazon.awssdk.services.s3.S3Configuration; import software.amazon.awssdk.services.s3.model.HeadObjectResponse; -import software.amazon.awssdk.services.sts.StsClient; -import software.amazon.awssdk.services.sts.StsClientBuilder; -import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; -import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.net.URI; -import java.time.Duration; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.Set; -import java.util.UUID; public class S3InputSource extends CloudObjectInputSource { @@ -134,72 +118,20 @@ public S3InputSource( this.awsClientConfig = awsClientConfig; this.awsEndpointConfig = awsEndpointConfig; - this.s3ClientSupplier = Suppliers.memoize( - () -> { - if (s3ClientBuilder != null && s3InputSourceConfig != null) { - // Build a custom S3Client with the provided configuration - S3ClientBuilder customBuilder = S3Client.builder(); - - // Configure endpoint and region - if (awsEndpointConfig != null && awsEndpointConfig.getUrl() != null) { - String endpointUrl = awsEndpointConfig.getUrl(); - // Ensure endpoint URL has a scheme - if (!endpointUrl.startsWith("http://") && !endpointUrl.startsWith("https://")) { - boolean useHttps = S3Utils.useHttps(awsClientConfig, awsEndpointConfig); - endpointUrl = S3Utils.ensureEndpointHasScheme(endpointUrl, useHttps); - } - customBuilder.endpointOverride(URI.create(endpointUrl)); - if (awsEndpointConfig.getSigningRegion() != null) { - customBuilder.region(Region.of(awsEndpointConfig.getSigningRegion())); - } - } - - // Configure S3-specific settings - if (awsClientConfig != null) { - S3Configuration.Builder s3ConfigBuilder = S3Configuration.builder() - .pathStyleAccessEnabled(awsClientConfig.isEnablePathStyleAccess()) - .chunkedEncodingEnabled(!awsClientConfig.isDisableChunkedEncoding()); - customBuilder.serviceConfiguration(s3ConfigBuilder.build()); - customBuilder.crossRegionAccessEnabled(awsClientConfig.isCrossRegionAccessEnabled()); - } - - // Configure HTTP client with proxy if needed - ApacheHttpClient.Builder httpClientBuilder = ApacheHttpClient.builder(); - if (awsProxyConfig != null && awsProxyConfig.getHost() != null) { - ProxyConfiguration proxyConfig = S3Utils.buildProxyConfiguration(awsProxyConfig); - if (proxyConfig != null) { - httpClientBuilder.proxyConfiguration(proxyConfig); - } - } - customBuilder.httpClientBuilder(httpClientBuilder); - - // Configure credentials - AwsCredentialsProvider credentialsProvider; - if (s3InputSourceConfig.isCredentialsConfigured()) { - credentialsProvider = createStaticCredentialsProvider(s3InputSourceConfig); - } else { - credentialsProvider = awsCredentialsProvider; - } - - // Apply assume role if configured - if (s3InputSourceConfig.getAssumeRoleArn() != null) { - credentialsProvider = createAssumeRoleCredentialsProvider( - s3InputSourceConfig, - credentialsProvider - ); - } - - customBuilder.credentialsProvider(credentialsProvider); - - // Build and wrap in ServerSideEncryptingAmazonS3 - return s3ClientBuilder - .setS3ClientSupplier(customBuilder::build) - .build(); - } else { - return s3Client; - } - } - ); + this.s3ClientSupplier = Suppliers.memoize(() -> { + if (s3ClientBuilder == null || s3InputSourceConfig == null) { + return s3Client; + } + return ServerSideEncryptingAmazonS3.builder( + awsCredentialsProvider, + s3ClientBuilder.getS3StorageConfig(), + awsProxyConfig, + awsEndpointConfig, + awsClientConfig, + s3InputSourceConfig, + null + ).build(); + }); this.maxRetries = RetryUtils.DEFAULT_MAX_TRIES; } @@ -278,61 +210,6 @@ public Set getTypes() return Collections.singleton(TYPE_KEY); } - private AwsCredentialsProvider createAssumeRoleCredentialsProvider( - S3InputSourceConfig s3InputSourceConfig, - AwsCredentialsProvider baseCredentialsProvider - ) - { - String assumeRoleArn = s3InputSourceConfig.getAssumeRoleArn(); - String roleSessionName = StringUtils.format("druid-s3-input-source-%s", UUID.randomUUID().toString()); - - StsClientBuilder stsBuilder = StsClient.builder() - .credentialsProvider(baseCredentialsProvider); - - // If we have endpoint config, use its region for STS too - if (awsEndpointConfig != null && awsEndpointConfig.getSigningRegion() != null) { - stsBuilder.region(Region.of(awsEndpointConfig.getSigningRegion())); - } - - StsClient stsClient = stsBuilder.build(); - - AssumeRoleRequest.Builder assumeRoleRequestBuilder = AssumeRoleRequest.builder() - .roleArn(assumeRoleArn) - .roleSessionName(roleSessionName) - .durationSeconds(3600); - - if (s3InputSourceConfig.getAssumeRoleExternalId() != null) { - assumeRoleRequestBuilder.externalId(s3InputSourceConfig.getAssumeRoleExternalId()); - } - - return StsAssumeRoleCredentialsProvider.builder() - .stsClient(stsClient) - .refreshRequest(assumeRoleRequestBuilder.build()) - .asyncCredentialUpdateEnabled(true) - .staleTime(Duration.ofMinutes(3)) - .build(); - } - - @Nonnull - private StaticCredentialsProvider createStaticCredentialsProvider(S3InputSourceConfig s3InputSourceConfig) - { - if (s3InputSourceConfig.getSessionToken() != null) { - AwsSessionCredentials sessionCredentials = AwsSessionCredentials.create( - s3InputSourceConfig.getAccessKeyId().getPassword(), - s3InputSourceConfig.getSecretAccessKey().getPassword(), - s3InputSourceConfig.getSessionToken().getPassword() - ); - return StaticCredentialsProvider.create(sessionCredentials); - } else { - return StaticCredentialsProvider.create( - AwsBasicCredentials.create( - s3InputSourceConfig.getAccessKeyId().getPassword(), - s3InputSourceConfig.getSecretAccessKey().getPassword() - ) - ); - } - } - @Nullable @JsonProperty("properties") @JsonInclude(JsonInclude.Include.NON_NULL) diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java index 27fc537e34b3..a0c7f9866558 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java @@ -39,17 +39,9 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.logger.Logger; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.http.apache.ApacheHttpClient; -import software.amazon.awssdk.http.apache.ProxyConfiguration; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.s3.S3AsyncClient; -import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; -import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.S3ClientBuilder; -import software.amazon.awssdk.services.s3.S3Configuration; import javax.annotation.Nullable; import java.net.URI; @@ -150,75 +142,15 @@ public ServerSideEncryptingAmazonS3.Builder getServerSideEncryptingAmazonS3Build : "" ); } - - final boolean useHttps = S3Utils.useHttps(clientConfig, endpointConfig); - final URI endpointOverride = buildEndpointOverride(endpointConfig, useHttps); - final Region region = StringUtils.isNotEmpty(endpointConfig.getSigningRegion()) - ? Region.of(endpointConfig.getSigningRegion()) - : null; - - final Supplier s3ClientSupplier = () -> { - // Build HTTP client with proxy configuration - ApacheHttpClient.Builder httpClientBuilder = ApacheHttpClient.builder() - .connectionTimeout(Duration.ofMillis(clientConfig.getConnectionTimeoutMillis())) - .socketTimeout(Duration.ofMillis(clientConfig.getSocketTimeoutMillis())) - .maxConnections(clientConfig.getMaxConnections()); - - ProxyConfiguration proxyConfiguration = S3Utils.buildProxyConfiguration(proxyConfig); - if (proxyConfiguration != null) { - httpClientBuilder.proxyConfiguration(proxyConfiguration); - } - - // Build S3 configuration - // Note: forcePathStyle is configured on the S3ClientBuilder, not in S3Configuration - S3Configuration s3Configuration = S3Configuration.builder() - .chunkedEncodingEnabled(!clientConfig.isDisableChunkedEncoding()) - .build(); - - S3ClientBuilder s3ClientBuilder = S3Client.builder() - .credentialsProvider(provider) - .httpClientBuilder(httpClientBuilder) - .serviceConfiguration(s3Configuration) - .forcePathStyle(clientConfig.isEnablePathStyleAccess()) - .crossRegionAccessEnabled(clientConfig.isCrossRegionAccessEnabled()); - - if (endpointOverride != null) { - s3ClientBuilder.endpointOverride(endpointOverride); - } - - if (region != null) { - s3ClientBuilder.region(region); - } - - return s3ClientBuilder.build(); - }; - - // Create async client supplier for S3TransferManager - final AsyncHttpClientType asyncHttpClientType = - AsyncHttpClientType.fromString(storageConfig.getS3TransferConfig().getAsyncHttpClientType()); - final Supplier s3AsyncClientSupplier = () -> { - S3AsyncClientBuilder s3AsyncClientBuilder = S3AsyncClient.builder() - .credentialsProvider(provider) - .httpClientBuilder(asyncHttpClientType.buildBuilder(clientConfig)) - .forcePathStyle(clientConfig.isEnablePathStyleAccess()) - .crossRegionAccessEnabled(clientConfig.isCrossRegionAccessEnabled()) - .multipartEnabled(true); - - if (endpointOverride != null) { - s3AsyncClientBuilder.endpointOverride(endpointOverride); - } - - if (region != null) { - s3AsyncClientBuilder.region(region); - } - - return s3AsyncClientBuilder.build(); - }; - - return ServerSideEncryptingAmazonS3.builder() - .setS3ClientSupplier(s3ClientSupplier) - .setS3AsyncClientSupplier(s3AsyncClientSupplier) - .setS3StorageConfig(storageConfig); + return ServerSideEncryptingAmazonS3.builder( + provider, + storageConfig, + proxyConfig, + endpointConfig, + clientConfig, + null, + null + ); } public enum AsyncHttpClientType diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java index d59124ecba28..e5f42bb49999 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java @@ -19,13 +19,32 @@ package org.apache.druid.storage.s3; +import com.google.common.base.Strings; +import org.apache.druid.common.aws.AWSClientConfig; +import org.apache.druid.common.aws.AWSEndpointConfig; +import org.apache.druid.common.aws.AWSProxyConfig; +import org.apache.druid.data.input.s3.S3InputSourceConfig; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.storage.s3.S3StorageDruidModule.AsyncHttpClientType; +import org.apache.druid.storage.s3.output.S3ExportStorageProvider; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.http.apache.ProxyConfiguration; +import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3ClientBuilder; +import software.amazon.awssdk.services.s3.S3Configuration; import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse; @@ -49,13 +68,21 @@ import software.amazon.awssdk.services.s3.model.S3Exception; import software.amazon.awssdk.services.s3.model.UploadPartRequest; import software.amazon.awssdk.services.s3.model.UploadPartResponse; +import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.StsClientBuilder; +import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; +import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; import software.amazon.awssdk.transfer.s3.S3TransferManager; import software.amazon.awssdk.transfer.s3.model.FileUpload; import software.amazon.awssdk.transfer.s3.model.UploadFileRequest; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.File; import java.io.InputStream; +import java.net.URI; +import java.time.Duration; +import java.util.UUID; import java.util.function.Supplier; /** @@ -274,6 +301,161 @@ public void upload(String bucket, String key, File file, @Nullable Grant aclGran } } + public static ServerSideEncryptingAmazonS3.Builder builder( + AwsCredentialsProvider awsCredentialsProvider, + S3StorageConfig s3StorageConfig, + @Nullable AWSProxyConfig awsProxyConfig, + @Nullable AWSEndpointConfig awsEndpointConfig, + @Nullable AWSClientConfig awsClientConfig, + @Nullable S3InputSourceConfig s3InputSourceConfig, + @Nullable S3ExportStorageProvider s3ExportStorageProvider + ) + { + if (s3InputSourceConfig != null && s3ExportStorageProvider != null) { + throw DruidException.defensive("Cannot set both s3InputSourceConfig and s3ExportStorageProvider!"); + } + final String assumeRoleArn; + final String assumeRoleExternalId; + if (s3InputSourceConfig != null) { + assumeRoleArn = s3InputSourceConfig.getAssumeRoleArn(); + assumeRoleExternalId = s3InputSourceConfig.getAssumeRoleExternalId(); + } else if (s3ExportStorageProvider != null) { + assumeRoleArn = s3ExportStorageProvider.getAssumeRoleArn(); + assumeRoleExternalId = s3ExportStorageProvider.getAssumeRoleExternalId(); + } else { + assumeRoleArn = null; + assumeRoleExternalId = null; + } + + // Build a custom S3Client with the provided configuration + S3ClientBuilder clientBuilder = S3Client.builder(); + S3AsyncClientBuilder asyncClientBuilder = S3AsyncClient.builder(); + + // Configure endpoint and region + if (awsEndpointConfig != null) { + if (!Strings.isNullOrEmpty(awsEndpointConfig.getUrl())) { + String endpointUrl = awsEndpointConfig.getUrl(); + // Ensure endpoint URL has a scheme + if (!endpointUrl.startsWith("http://") && !endpointUrl.startsWith("https://")) { + boolean useHttps = S3Utils.useHttps(awsClientConfig, awsEndpointConfig); + endpointUrl = S3Utils.ensureEndpointHasScheme(endpointUrl, useHttps); + } + URI endpointOverride = URI.create(endpointUrl); + clientBuilder.endpointOverride(endpointOverride); + asyncClientBuilder.endpointOverride(endpointOverride); + } + if (!Strings.isNullOrEmpty(awsEndpointConfig.getSigningRegion())) { + Region region = Region.of(awsEndpointConfig.getSigningRegion()); + clientBuilder.region(region); + asyncClientBuilder.region(region); + } + } + + ApacheHttpClient.Builder httpClientBuilder = ApacheHttpClient.builder(); + + // Configure S3-specific settings + if (awsClientConfig != null) { + httpClientBuilder.connectionTimeout(Duration.ofMillis(awsClientConfig.getConnectionTimeoutMillis())) + .socketTimeout(Duration.ofMillis(awsClientConfig.getSocketTimeoutMillis())) + .maxConnections(awsClientConfig.getMaxConnections()); + S3Configuration s3Config = S3Configuration.builder() + .chunkedEncodingEnabled(!awsClientConfig.isDisableChunkedEncoding()) + .build(); + clientBuilder.serviceConfiguration(s3Config) + .forcePathStyle(awsClientConfig.isEnablePathStyleAccess()) + .crossRegionAccessEnabled(awsClientConfig.isCrossRegionAccessEnabled()); + asyncClientBuilder.forcePathStyle(awsClientConfig.isEnablePathStyleAccess()) + .crossRegionAccessEnabled(awsClientConfig.isCrossRegionAccessEnabled()) + .httpClientBuilder(AsyncHttpClientType.fromString(s3StorageConfig.getS3TransferConfig().getAsyncHttpClientType()).buildBuilder(awsClientConfig)) + .multipartEnabled(true); + } + + // Configure HTTP client with proxy if needed + if (awsProxyConfig != null) { + ProxyConfiguration proxyConfig = S3Utils.buildProxyConfiguration(awsProxyConfig); + if (proxyConfig != null) { + httpClientBuilder.proxyConfiguration(proxyConfig); + } + } + clientBuilder.httpClientBuilder(httpClientBuilder); + + // Configure credentials + AwsCredentialsProvider credentialsProvider; + if (s3InputSourceConfig != null && s3InputSourceConfig.isCredentialsConfigured()) { + credentialsProvider = createStaticCredentialsProvider(s3InputSourceConfig); + } else { + credentialsProvider = awsCredentialsProvider; + } + + // Apply assume role if configured + if (!Strings.isNullOrEmpty(assumeRoleArn)) { + credentialsProvider = createAssumeRoleCredentialsProvider( + assumeRoleArn, + assumeRoleExternalId, + awsEndpointConfig, + credentialsProvider + ); + } + + clientBuilder.credentialsProvider(credentialsProvider); + asyncClientBuilder.credentialsProvider(credentialsProvider); + + // Build and wrap in ServerSideEncryptingAmazonS3 + return ServerSideEncryptingAmazonS3.builder() + .setS3ClientSupplier(clientBuilder::build) + .setS3AsyncClientSupplier(asyncClientBuilder::build) + .setS3StorageConfig(s3StorageConfig); + } + + @Nonnull + private static StaticCredentialsProvider createStaticCredentialsProvider(S3InputSourceConfig s3InputSourceConfig) + { + if (s3InputSourceConfig.getSessionToken() != null) { + AwsSessionCredentials sessionCredentials = AwsSessionCredentials.create( + s3InputSourceConfig.getAccessKeyId().getPassword(), + s3InputSourceConfig.getSecretAccessKey().getPassword(), + s3InputSourceConfig.getSessionToken().getPassword() + ); + return StaticCredentialsProvider.create(sessionCredentials); + } else { + return StaticCredentialsProvider.create( + AwsBasicCredentials.create( + s3InputSourceConfig.getAccessKeyId().getPassword(), + s3InputSourceConfig.getSecretAccessKey().getPassword() + ) + ); + } + } + + public static AwsCredentialsProvider createAssumeRoleCredentialsProvider( + String assumeRoleArn, + @Nullable String assumeRoleExternalId, + @Nullable AWSEndpointConfig awsEndpointConfig, + AwsCredentialsProvider baseCredentialsProvider + ) + { + String roleSessionName = StringUtils.format("druid-s3-%s", UUID.randomUUID().toString()); + + StsClientBuilder stsBuilder = StsClient.builder().credentialsProvider(baseCredentialsProvider); + // If we have endpoint config, use its region for STS too + if (awsEndpointConfig != null && awsEndpointConfig.getSigningRegion() != null) { + stsBuilder.region(Region.of(awsEndpointConfig.getSigningRegion())); + } + + AssumeRoleRequest.Builder assumeRoleRequestBuilder = + AssumeRoleRequest.builder().roleArn(assumeRoleArn).roleSessionName(roleSessionName).durationSeconds(3600); + if (assumeRoleExternalId != null) { + assumeRoleRequestBuilder.externalId(assumeRoleExternalId); + } + + return StsAssumeRoleCredentialsProvider.builder() + .stsClient(stsBuilder.build()) + .refreshRequest(assumeRoleRequestBuilder.build()) + .asyncCredentialUpdateEnabled(true) + .staleTime(Duration.ofMinutes(3)) + .build(); + } + public static class Builder { private Supplier s3ClientSupplier; diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageProvider.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageProvider.java index 129622dfbd77..6f51bf4c29d0 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageProvider.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageProvider.java @@ -22,19 +22,27 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; +import com.google.common.base.Strings; +import org.apache.druid.common.aws.AWSClientConfig; +import org.apache.druid.common.aws.AWSEndpointConfig; +import org.apache.druid.common.aws.AWSProxyConfig; import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.data.input.s3.S3InputSource; import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.storage.ExportStorageProvider; import org.apache.druid.storage.StorageConnector; +import org.apache.druid.storage.s3.S3StorageConfig; import org.apache.druid.storage.s3.S3StorageDruidModule; import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import javax.annotation.Nullable; import javax.validation.constraints.NotNull; import java.io.File; import java.net.URI; @@ -51,10 +59,28 @@ public class S3ExportStorageProvider implements ExportStorageProvider @JsonProperty private final String prefix; + @Nullable + @JsonProperty + private final String assumeRoleArn; + @Nullable + @JsonProperty + private final String assumeRoleExternalId; + @JacksonInject S3ExportConfig s3ExportConfig; @JacksonInject - ServerSideEncryptingAmazonS3 s3; + ServerSideEncryptingAmazonS3 s3Client; + + @JacksonInject + S3StorageConfig s3StorageConfig; + @JacksonInject + AWSProxyConfig awsProxyConfig; + @JacksonInject + AWSEndpointConfig awsEndpointConfig; + @JacksonInject + AWSClientConfig awsClientConfig; + @JacksonInject + AwsCredentialsProvider baseCredentialsProvider; @JacksonInject S3UploadManager s3UploadManager; @@ -62,11 +88,16 @@ public class S3ExportStorageProvider implements ExportStorageProvider @JsonCreator public S3ExportStorageProvider( @JsonProperty(value = "bucket", required = true) String bucket, - @JsonProperty(value = "prefix", required = true) String prefix + @JsonProperty(value = "prefix", required = true) String prefix, + @Nullable @JsonProperty(value = "assumeRoleArn") String assumeRoleArn, + @Nullable @JsonProperty(value = "assumeRoleExternalId") String assumeRoleExternalId + ) { this.bucket = bucket; this.prefix = prefix; + this.assumeRoleArn = assumeRoleArn; + this.assumeRoleExternalId = assumeRoleExternalId; } @@ -93,7 +124,22 @@ public StorageConnector createStorageConnector(File taskTempDir) s3ExportConfig.getChunkSize(), s3ExportConfig.getMaxRetry() ); - return new S3StorageConnector(s3OutputConfig, s3, s3UploadManager); + if (Strings.isNullOrEmpty(assumeRoleArn)) { + return new S3StorageConnector(s3OutputConfig, s3Client, s3UploadManager); + } + return new S3StorageConnector( + s3OutputConfig, + ServerSideEncryptingAmazonS3.builder( + baseCredentialsProvider, + s3StorageConfig, + awsProxyConfig, + awsEndpointConfig, + awsClientConfig, + null, + this + ).build(), + s3UploadManager + ); } @VisibleForTesting @@ -136,6 +182,22 @@ public String getPrefix() return prefix; } + @Nullable + @JsonProperty("assumeRoleArn") + @JsonInclude(JsonInclude.Include.NON_NULL) + public String getAssumeRoleArn() + { + return assumeRoleArn; + } + + @Nullable + @JsonProperty("assumeRoleExternalId") + @JsonInclude(JsonInclude.Include.NON_NULL) + public String getAssumeRoleExternalId() + { + return assumeRoleExternalId; + } + @Override @JsonIgnore public String getResourceType() diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java index 22c2c767355a..168216affdca 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java @@ -59,6 +59,7 @@ import org.apache.druid.metadata.DefaultPasswordProvider; import org.apache.druid.storage.s3.NoopServerSideEncryption; import org.apache.druid.storage.s3.S3InputDataConfig; +import org.apache.druid.storage.s3.S3StorageConfig; import org.apache.druid.storage.s3.S3TransferConfig; import org.apache.druid.storage.s3.S3Utils; import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; @@ -110,7 +111,10 @@ public class S3InputSourceTest extends InitializedNullHandlingTest public static final S3Client S3_CLIENT = EasyMock.createMock(S3Client.class); public static final ServerSideEncryptingAmazonS3.Builder SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER = EasyMock.createMock(ServerSideEncryptingAmazonS3.Builder.class); - public static final S3ClientBuilder S3_CLIENT_BUILDER = S3Client.builder(); + public static final S3StorageConfig S3_STORAGE_CONFIG = new S3StorageConfig( + new NoopServerSideEncryption(), + new S3TransferConfig() + ); public static final ServerSideEncryptingAmazonS3 SERVICE = new ServerSideEncryptingAmazonS3( S3_CLIENT, null, @@ -346,10 +350,7 @@ public void testSerdeWithObjects() throws Exception public void testSerdeWithCloudConfigPropertiesWithKeyAndSecret() throws Exception { EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER); - EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.setS3ClientSupplier(EasyMock.anyObject())) - .andReturn(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER); - EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.build()) - .andReturn(SERVICE); + EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.getS3StorageConfig()).andStubReturn(S3_STORAGE_CONFIG); EasyMock.replay(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER); final S3InputSource withPrefixes = new S3InputSource( SERVICE, @@ -376,10 +377,7 @@ public void testSerdeWithCloudConfigPropertiesWithKeyAndSecret() throws Exceptio public void testSerdeWithCloudConfigPropertiesWithSessionToken() throws Exception { EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER); - EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.setS3ClientSupplier(EasyMock.anyObject())) - .andReturn(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER); - EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.build()) - .andReturn(SERVICE); + EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.getS3StorageConfig()).andStubReturn(S3_STORAGE_CONFIG); EasyMock.replay(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER); final S3InputSource withSessionToken = new S3InputSource( SERVICE, @@ -450,73 +448,6 @@ public void testGetSetSessionToken() Assert.assertNull(inputSourceWithoutSessionToken.getS3InputSourceConfig().getSessionToken()); } - @Test - public void testSessionCredentialsUsedWhenSessionTokenProvided() throws IOException - { - // This test verifies that when session token is provided, the S3InputSource - // correctly uses BasicSessionCredentials instead of BasicAWSCredentials - EasyMock.reset(S3_CLIENT); - expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)), CONTENT); - expectGetObject(EXPECTED_URIS.get(0)); - EasyMock.replay(S3_CLIENT); - - EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER); - EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.setS3ClientSupplier(EasyMock.anyObject())) - .andReturn(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER); - EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.build()) - .andReturn(SERVICE); - EasyMock.replay(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER); - - // Create S3InputSource with session token - S3InputSource inputSource = new S3InputSource( - SERVICE, - SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER, - INPUT_DATA_CONFIG, - null, - ImmutableList.of(PREFIXES.get(0)), - null, - null, - CLOUD_CONFIG_PROPERTIES_WITH_SESSION_TOKEN, - null, - null, - null - ); - - // Verify session token is set - Assert.assertNotNull(inputSource.getS3InputSourceConfig()); - Assert.assertNotNull(inputSource.getS3InputSourceConfig().getSessionToken()); - Assert.assertEquals( - "mySessionToken", - inputSource.getS3InputSourceConfig().getSessionToken().getPassword() - ); - - // Create a reader which will trigger the s3ClientSupplier and use the session credentials - InputRowSchema someSchema = new InputRowSchema( - new TimestampSpec("time", "auto", null), - new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))), - ColumnsFilter.all() - ); - - InputSourceReader reader = inputSource.reader( - someSchema, - new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0, null), - temporaryFolder.newFolder() - ); - - // Read data - this exercises the session credentials path - CloseableIterator iterator = reader.read(); - - while (iterator.hasNext()) { - InputRow nextRow = iterator.next(); - Assert.assertEquals(NOW, nextRow.getTimestamp()); - Assert.assertEquals("hello", nextRow.getDimension("dim1").get(0)); - Assert.assertEquals("world", nextRow.getDimension("dim2").get(0)); - } - - EasyMock.verify(S3_CLIENT); - EasyMock.verify(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER); - } - @Test public void testGetTypes() { @@ -556,6 +487,9 @@ public void testS3InputSourceUseEndPointClientProxy() EasyMock.expect(mockAwsClientConfig.isEnablePathStyleAccess()).andStubReturn(false); EasyMock.expect(mockAwsClientConfig.isCrossRegionAccessEnabled()).andStubReturn(true); EasyMock.expect(mockAwsClientConfig.getProtocol()).andStubReturn("http"); + EasyMock.expect(mockAwsClientConfig.getConnectionTimeoutMillis()).andStubReturn(10_000); + EasyMock.expect(mockAwsClientConfig.getSocketTimeoutMillis()).andStubReturn(50_000); + EasyMock.expect(mockAwsClientConfig.getMaxConnections()).andStubReturn(50); EasyMock.expect(mockAwsProxyConfig.getHost()).andStubReturn(""); EasyMock.expect(mockAwsProxyConfig.getPort()).andStubReturn(-1); @@ -563,6 +497,7 @@ public void testS3InputSourceUseEndPointClientProxy() EasyMock.expect(mockAwsProxyConfig.getPassword()).andStubReturn(""); EasyMock.expect(mockConfigPropertiesWithoutKeyAndSecret.getAssumeRoleArn()).andStubReturn(null); + EasyMock.expect(mockConfigPropertiesWithoutKeyAndSecret.getAssumeRoleExternalId()).andStubReturn(null); EasyMock.expect(mockConfigPropertiesWithoutKeyAndSecret.isCredentialsConfigured()) .andStubReturn(false); EasyMock.replay(mockConfigPropertiesWithoutKeyAndSecret); @@ -571,10 +506,7 @@ public void testS3InputSourceUseEndPointClientProxy() EasyMock.replay(mockAwsProxyConfig); EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER); - EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.setS3ClientSupplier(EasyMock.anyObject())) - .andReturn(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER); - EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.build()) - .andReturn(SERVICE); + EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.getS3StorageConfig()).andStubReturn(S3_STORAGE_CONFIG); EasyMock.replay(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER); final S3InputSource withPrefixes = new S3InputSource( SERVICE, @@ -604,14 +536,18 @@ public void testS3InputSourceUseDefaultPasswordWhenCloudConfigPropertiesWithoutC S3InputSourceConfig mockConfigPropertiesWithoutKeyAndSecret = EasyMock.createMock(S3InputSourceConfig.class); EasyMock.reset(mockConfigPropertiesWithoutKeyAndSecret); EasyMock.expect(mockConfigPropertiesWithoutKeyAndSecret.getAssumeRoleArn()).andStubReturn(null); + EasyMock.expect(mockConfigPropertiesWithoutKeyAndSecret.getAssumeRoleExternalId()).andStubReturn(null); EasyMock.expect(mockConfigPropertiesWithoutKeyAndSecret.isCredentialsConfigured()) .andStubReturn(false); EasyMock.replay(mockConfigPropertiesWithoutKeyAndSecret); + + S3StorageConfig mockS3StorageConfig = EasyMock.createMock(S3StorageConfig.class); + EasyMock.reset(mockS3StorageConfig); + EasyMock.expect(mockS3StorageConfig.getS3TransferConfig()).andStubReturn(new S3TransferConfig()); + EasyMock.expect(mockS3StorageConfig.getServerSideEncryption()).andStubReturn(new NoopServerSideEncryption()); + EasyMock.replay(mockS3StorageConfig); EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER); - EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.setS3ClientSupplier(EasyMock.anyObject())) - .andReturn(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER); - EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.build()) - .andReturn(SERVICE); + EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.getS3StorageConfig()).andStubReturn(mockS3StorageConfig); EasyMock.replay(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER); final S3InputSource withPrefixes = new S3InputSource( SERVICE, diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3ExportStorageProviderTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3ExportStorageProviderTest.java index 0a87c3a4c016..9cb311cec8ad 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3ExportStorageProviderTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3ExportStorageProviderTest.java @@ -63,7 +63,7 @@ public void testExportManifestFilePath() { Assert.assertEquals( "s3://export-bucket/export/table/file1", - new S3ExportStorageProvider("export-bucket", "export/table").getFilePathForManifest("file1") + new S3ExportStorageProvider("export-bucket", "export/table", null, null).getFilePathForManifest("file1") ); } }