Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions azure/src/main/java/org/apache/iceberg/azure/AzureProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,17 @@ public class AzureProperties implements Serializable {
/** Controls whether vended credentials should be refreshed or not. Defaults to true. */
public static final String ADLS_REFRESH_CREDENTIALS_ENABLED = "adls.refresh-credentials-enabled";

/**
* Determines whether the stack trace of the call site is captured when an ADLS input or output
* stream is created. The captured stack is only used to report the creation site if a stream is
* garbage collected without being closed. Capturing it is relatively expensive, so it can be
* disabled to avoid the cost on the per-file stream creation path. ResolvingFileIO sets this to
* {@code false} for the FileIO it delegates to, because it already tracks that creation stack.
* Defaults to {@code true}.
*/
public static final String CREATE_STACK_TRACE_ENABLED = "init-creation-stacktrace";

public static final boolean CREATE_STACK_TRACE_ENABLED_DEFAULT = true;

private Map<String, String> adlsSasTokens = Collections.emptyMap();
private Map<String, String> adlsConnectionStrings = Collections.emptyMap();
private Map.Entry<String, String> namedKeyCreds;
Expand All @@ -99,6 +110,7 @@ public class AzureProperties implements Serializable {
private Map<String, String> allProperties = Collections.emptyMap();
private String keyWrapAlgorithm;
private String keyVaultUrl;
private boolean createStackTraceEnabled = CREATE_STACK_TRACE_ENABLED_DEFAULT;

public AzureProperties() {}

Expand Down Expand Up @@ -130,6 +142,9 @@ public AzureProperties(Map<String, String> properties) {
properties.get(ADLS_REFRESH_CREDENTIALS_ENDPOINT));
this.adlsRefreshCredentialsEnabled =
PropertyUtil.propertyAsBoolean(properties, ADLS_REFRESH_CREDENTIALS_ENABLED, true);
this.createStackTraceEnabled =
PropertyUtil.propertyAsBoolean(
properties, CREATE_STACK_TRACE_ENABLED, CREATE_STACK_TRACE_ENABLED_DEFAULT);
this.token = properties.get(ADLS_TOKEN);
this.allProperties = SerializableMap.copyOf(properties);
if (properties.containsKey(AZURE_KEYVAULT_URL)) {
Expand All @@ -149,6 +164,10 @@ public Optional<Long> adlsWriteBlockSize() {
return Optional.ofNullable(adlsWriteBlockSize);
}

/**
 * Reports whether ADLS streams should capture the stack trace of their creation site.
 *
 * <p>The value comes from the {@link #CREATE_STACK_TRACE_ENABLED} property when this instance is
 * built from a property map, and otherwise stays at {@link #CREATE_STACK_TRACE_ENABLED_DEFAULT}.
 *
 * @return {@code true} if the creation stack trace should be captured, {@code false} otherwise
 */
public boolean isCreateStackTraceEnabled() {
  return this.createStackTraceEnabled;
}

public Optional<VendedAdlsCredentialProvider> vendedAdlsCredentialProvider() {
if (adlsRefreshCredentialsEnabled && !Strings.isNullOrEmpty(adlsRefreshCredentialsEndpoint)) {
Map<String, String> credentialProviderProperties = Maps.newHashMap(allProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ class ADLSInputStream extends SeekableInputStream implements RangeReadable {
this.readBytes = metrics.counter(FileIOMetricsContext.READ_BYTES, Unit.BYTES);
this.readOperations = metrics.counter(FileIOMetricsContext.READ_OPERATIONS);

this.createStack = Thread.currentThread().getStackTrace();
this.createStack =
azureProperties.isCreateStackTraceEnabled() ? Thread.currentThread().getStackTrace() : null;

openStream();
}
Expand Down Expand Up @@ -222,8 +223,15 @@ protected void finalize() throws Throwable {
super.finalize();
if (!closed) {
close(); // releasing resources is more important than printing the warning
String trace = Joiner.on("\n\t").join(Arrays.copyOfRange(createStack, 1, createStack.length));
LOG.warn("Unclosed input stream created by:\n\t{}", trace);
if (createStack != null) {
String trace =
Joiner.on("\n\t").join(Arrays.copyOfRange(createStack, 1, createStack.length));
LOG.warn("Unclosed input stream created by:\n\t{}", trace);
} else {
LOG.warn(
"Unclosed input stream; enable '{}' to capture its creation stack.",
AzureProperties.CREATE_STACK_TRACE_ENABLED);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ class ADLSOutputStream extends PositionOutputStream {
this.fileClient = fileClient;
this.azureProperties = azureProperties;

this.createStack = Thread.currentThread().getStackTrace();
this.createStack =
azureProperties.isCreateStackTraceEnabled() ? Thread.currentThread().getStackTrace() : null;

this.writeBytes = metrics.counter(FileIOMetricsContext.WRITE_BYTES, Unit.BYTES);
this.writeOperations = metrics.counter(FileIOMetricsContext.WRITE_OPERATIONS);
Expand Down Expand Up @@ -121,8 +122,15 @@ protected void finalize() throws Throwable {
super.finalize();
if (!closed) {
close(); // releasing resources is more important than printing the warning
String trace = Joiner.on("\n\t").join(Arrays.copyOfRange(createStack, 1, createStack.length));
LOG.warn("Unclosed output stream created by:\n\t{}", trace);
if (createStack != null) {
String trace =
Joiner.on("\n\t").join(Arrays.copyOfRange(createStack, 1, createStack.length));
LOG.warn("Unclosed output stream created by:\n\t{}", trace);
} else {
LOG.warn(
"Unclosed output stream; enable '{}' to capture its creation stack.",
AzureProperties.CREATE_STACK_TRACE_ENABLED);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.apache.iceberg.azure.AzureProperties.ADLS_WRITE_BLOCK_SIZE;
import static org.apache.iceberg.azure.AzureProperties.AZURE_KEYVAULT_KEY_WRAP_ALGORITHM;
import static org.apache.iceberg.azure.AzureProperties.AZURE_KEYVAULT_URL;
import static org.apache.iceberg.azure.AzureProperties.CREATE_STACK_TRACE_ENABLED;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -78,13 +79,28 @@ public void testSerializable(TestHelpers.RoundTripSerializer<AzureProperties> ro
.put(AzureProperties.ADLS_TOKEN_PROVIDER_PREFIX + "client-id", "clientId")
.put(AZURE_KEYVAULT_URL, "https://test-key-vault.vault.azure.net")
.put(AZURE_KEYVAULT_KEY_WRAP_ALGORITHM, KeyWrapAlgorithm.RSA1_5.getValue())
.put(CREATE_STACK_TRACE_ENABLED, "false")
.build());

AzureProperties serdedProps = roundTripSerializer.apply(props);
assertThat(serdedProps.adlsReadBlockSize()).isEqualTo(props.adlsReadBlockSize());
assertThat(serdedProps.adlsWriteBlockSize()).isEqualTo(props.adlsWriteBlockSize());
assertThat(serdedProps.keyVaultUrl()).isEqualTo(props.keyVaultUrl());
assertThat(serdedProps.keyWrapAlgorithm()).isEqualTo(props.keyWrapAlgorithm());
assertThat(serdedProps.isCreateStackTraceEnabled())
.isEqualTo(props.isCreateStackTraceEnabled());
}

@Test
public void testCreateStackTraceEnabled() {
  // Built from an empty property map: the flag falls back to the declared default, which is true.
  AzureProperties fromEmptyMap = new AzureProperties(ImmutableMap.of());
  assertThat(fromEmptyMap.isCreateStackTraceEnabled())
      .isEqualTo(AzureProperties.CREATE_STACK_TRACE_ENABLED_DEFAULT)
      .isTrue();

  // Built with the no-arg constructor: the field initializer supplies the same default.
  AzureProperties fromNoArgs = new AzureProperties();
  assertThat(fromNoArgs.isCreateStackTraceEnabled()).isTrue();

  // An explicit "false" in the property map switches stack capture off.
  AzureProperties explicitlyDisabled =
      new AzureProperties(ImmutableMap.of(CREATE_STACK_TRACE_ENABLED, "false"));
  assertThat(explicitlyDisabled.isCreateStackTraceEnabled()).isFalse();
}

@Test
Expand Down