Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,37 +41,21 @@
import org.apache.druid.data.input.impl.systemfield.SystemField;
import org.apache.druid.data.input.impl.systemfield.SystemFields;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.storage.s3.S3InputDataConfig;
import org.apache.druid.storage.s3.S3StorageDruidModule;
import org.apache.druid.storage.s3.S3Utils;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.apache.ProxyConfiguration;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.StsClientBuilder;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.net.URI;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;

public class S3InputSource extends CloudObjectInputSource
{
Expand Down Expand Up @@ -134,72 +118,20 @@ public S3InputSource(
this.awsClientConfig = awsClientConfig;
this.awsEndpointConfig = awsEndpointConfig;

this.s3ClientSupplier = Suppliers.memoize(
() -> {
if (s3ClientBuilder != null && s3InputSourceConfig != null) {
// Build a custom S3Client with the provided configuration
S3ClientBuilder customBuilder = S3Client.builder();

// Configure endpoint and region
if (awsEndpointConfig != null && awsEndpointConfig.getUrl() != null) {
String endpointUrl = awsEndpointConfig.getUrl();
// Ensure endpoint URL has a scheme
if (!endpointUrl.startsWith("http://") && !endpointUrl.startsWith("https://")) {
boolean useHttps = S3Utils.useHttps(awsClientConfig, awsEndpointConfig);
endpointUrl = S3Utils.ensureEndpointHasScheme(endpointUrl, useHttps);
}
customBuilder.endpointOverride(URI.create(endpointUrl));
if (awsEndpointConfig.getSigningRegion() != null) {
customBuilder.region(Region.of(awsEndpointConfig.getSigningRegion()));
}
}

// Configure S3-specific settings
if (awsClientConfig != null) {
S3Configuration.Builder s3ConfigBuilder = S3Configuration.builder()
.pathStyleAccessEnabled(awsClientConfig.isEnablePathStyleAccess())
.chunkedEncodingEnabled(!awsClientConfig.isDisableChunkedEncoding());
customBuilder.serviceConfiguration(s3ConfigBuilder.build());
customBuilder.crossRegionAccessEnabled(awsClientConfig.isCrossRegionAccessEnabled());
}

// Configure HTTP client with proxy if needed
ApacheHttpClient.Builder httpClientBuilder = ApacheHttpClient.builder();
if (awsProxyConfig != null && awsProxyConfig.getHost() != null) {
ProxyConfiguration proxyConfig = S3Utils.buildProxyConfiguration(awsProxyConfig);
if (proxyConfig != null) {
httpClientBuilder.proxyConfiguration(proxyConfig);
}
}
customBuilder.httpClientBuilder(httpClientBuilder);

// Configure credentials
AwsCredentialsProvider credentialsProvider;
if (s3InputSourceConfig.isCredentialsConfigured()) {
credentialsProvider = createStaticCredentialsProvider(s3InputSourceConfig);
} else {
credentialsProvider = awsCredentialsProvider;
}

// Apply assume role if configured
if (s3InputSourceConfig.getAssumeRoleArn() != null) {
credentialsProvider = createAssumeRoleCredentialsProvider(
s3InputSourceConfig,
credentialsProvider
);
}

customBuilder.credentialsProvider(credentialsProvider);

// Build and wrap in ServerSideEncryptingAmazonS3
return s3ClientBuilder
.setS3ClientSupplier(customBuilder::build)
.build();
} else {
return s3Client;
}
}
);
this.s3ClientSupplier = Suppliers.memoize(() -> {
if (s3ClientBuilder == null || s3InputSourceConfig == null) {
return s3Client;
}
return ServerSideEncryptingAmazonS3.builder(
awsCredentialsProvider,
s3ClientBuilder.getS3StorageConfig(),
awsProxyConfig,
awsEndpointConfig,
awsClientConfig,
s3InputSourceConfig,
null
).build();
});
this.maxRetries = RetryUtils.DEFAULT_MAX_TRIES;
}

Expand Down Expand Up @@ -278,61 +210,6 @@ public Set<String> getTypes()
return Collections.singleton(TYPE_KEY);
}

private AwsCredentialsProvider createAssumeRoleCredentialsProvider(
S3InputSourceConfig s3InputSourceConfig,
AwsCredentialsProvider baseCredentialsProvider
)
{
String assumeRoleArn = s3InputSourceConfig.getAssumeRoleArn();
String roleSessionName = StringUtils.format("druid-s3-input-source-%s", UUID.randomUUID().toString());

StsClientBuilder stsBuilder = StsClient.builder()
.credentialsProvider(baseCredentialsProvider);

// If we have endpoint config, use its region for STS too
if (awsEndpointConfig != null && awsEndpointConfig.getSigningRegion() != null) {
stsBuilder.region(Region.of(awsEndpointConfig.getSigningRegion()));
}

StsClient stsClient = stsBuilder.build();

AssumeRoleRequest.Builder assumeRoleRequestBuilder = AssumeRoleRequest.builder()
.roleArn(assumeRoleArn)
.roleSessionName(roleSessionName)
.durationSeconds(3600);

if (s3InputSourceConfig.getAssumeRoleExternalId() != null) {
assumeRoleRequestBuilder.externalId(s3InputSourceConfig.getAssumeRoleExternalId());
}

return StsAssumeRoleCredentialsProvider.builder()
.stsClient(stsClient)
.refreshRequest(assumeRoleRequestBuilder.build())
.asyncCredentialUpdateEnabled(true)
.staleTime(Duration.ofMinutes(3))
.build();
}

@Nonnull
private StaticCredentialsProvider createStaticCredentialsProvider(S3InputSourceConfig s3InputSourceConfig)
{
if (s3InputSourceConfig.getSessionToken() != null) {
AwsSessionCredentials sessionCredentials = AwsSessionCredentials.create(
s3InputSourceConfig.getAccessKeyId().getPassword(),
s3InputSourceConfig.getSecretAccessKey().getPassword(),
s3InputSourceConfig.getSessionToken().getPassword()
);
return StaticCredentialsProvider.create(sessionCredentials);
} else {
return StaticCredentialsProvider.create(
AwsBasicCredentials.create(
s3InputSourceConfig.getAccessKeyId().getPassword(),
s3InputSourceConfig.getSecretAccessKey().getPassword()
)
);
}
}

@Nullable
@JsonProperty("properties")
@JsonInclude(JsonInclude.Include.NON_NULL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,9 @@
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.apache.ProxyConfiguration;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.S3Configuration;

import javax.annotation.Nullable;
import java.net.URI;
Expand Down Expand Up @@ -150,75 +142,15 @@ public ServerSideEncryptingAmazonS3.Builder getServerSideEncryptingAmazonS3Build
: ""
);
}

final boolean useHttps = S3Utils.useHttps(clientConfig, endpointConfig);
final URI endpointOverride = buildEndpointOverride(endpointConfig, useHttps);
final Region region = StringUtils.isNotEmpty(endpointConfig.getSigningRegion())
? Region.of(endpointConfig.getSigningRegion())
: null;

final Supplier<S3Client> s3ClientSupplier = () -> {
// Build HTTP client with proxy configuration
ApacheHttpClient.Builder httpClientBuilder = ApacheHttpClient.builder()
.connectionTimeout(Duration.ofMillis(clientConfig.getConnectionTimeoutMillis()))
.socketTimeout(Duration.ofMillis(clientConfig.getSocketTimeoutMillis()))
.maxConnections(clientConfig.getMaxConnections());

ProxyConfiguration proxyConfiguration = S3Utils.buildProxyConfiguration(proxyConfig);
if (proxyConfiguration != null) {
httpClientBuilder.proxyConfiguration(proxyConfiguration);
}

// Build S3 configuration
// Note: forcePathStyle is configured on the S3ClientBuilder, not in S3Configuration
S3Configuration s3Configuration = S3Configuration.builder()
.chunkedEncodingEnabled(!clientConfig.isDisableChunkedEncoding())
.build();

S3ClientBuilder s3ClientBuilder = S3Client.builder()
.credentialsProvider(provider)
.httpClientBuilder(httpClientBuilder)
.serviceConfiguration(s3Configuration)
.forcePathStyle(clientConfig.isEnablePathStyleAccess())
.crossRegionAccessEnabled(clientConfig.isCrossRegionAccessEnabled());

if (endpointOverride != null) {
s3ClientBuilder.endpointOverride(endpointOverride);
}

if (region != null) {
s3ClientBuilder.region(region);
}

return s3ClientBuilder.build();
};

// Create async client supplier for S3TransferManager
final AsyncHttpClientType asyncHttpClientType =
AsyncHttpClientType.fromString(storageConfig.getS3TransferConfig().getAsyncHttpClientType());
final Supplier<S3AsyncClient> s3AsyncClientSupplier = () -> {
S3AsyncClientBuilder s3AsyncClientBuilder = S3AsyncClient.builder()
.credentialsProvider(provider)
.httpClientBuilder(asyncHttpClientType.buildBuilder(clientConfig))
.forcePathStyle(clientConfig.isEnablePathStyleAccess())
.crossRegionAccessEnabled(clientConfig.isCrossRegionAccessEnabled())
.multipartEnabled(true);

if (endpointOverride != null) {
s3AsyncClientBuilder.endpointOverride(endpointOverride);
}

if (region != null) {
s3AsyncClientBuilder.region(region);
}

return s3AsyncClientBuilder.build();
};

return ServerSideEncryptingAmazonS3.builder()
.setS3ClientSupplier(s3ClientSupplier)
.setS3AsyncClientSupplier(s3AsyncClientSupplier)
.setS3StorageConfig(storageConfig);
return ServerSideEncryptingAmazonS3.builder(
provider,
storageConfig,
proxyConfig,
endpointConfig,
clientConfig,
null,
null
);
}

public enum AsyncHttpClientType
Expand Down
Loading
Loading