diff --git a/azure/src/main/java/org/apache/iceberg/azure/AzureProperties.java b/azure/src/main/java/org/apache/iceberg/azure/AzureProperties.java index 383bec30111b..80e5c485124f 100644 --- a/azure/src/main/java/org/apache/iceberg/azure/AzureProperties.java +++ b/azure/src/main/java/org/apache/iceberg/azure/AzureProperties.java @@ -88,6 +88,17 @@ public class AzureProperties implements Serializable { /** Controls whether vended credentials should be refreshed or not. Defaults to true. */ public static final String ADLS_REFRESH_CREDENTIALS_ENABLED = "adls.refresh-credentials-enabled"; + /** + * Determines whether the stack trace of the call site is captured when an ADLS input or output + * stream is created. The captured stack is only used to report the creation site if a stream is + * garbage collected without being closed. Capturing it is relatively expensive, so it can be + * disabled to avoid the cost on the per-file stream creation path. ResolvingFileIO sets this to + * {@code false} for the FileIO it delegates to, because it already tracks that creation stack. + */ + public static final String CREATE_STACK_TRACE_ENABLED = "init-creation-stacktrace"; + + public static final boolean CREATE_STACK_TRACE_ENABLED_DEFAULT = true; + private Map adlsSasTokens = Collections.emptyMap(); private Map adlsConnectionStrings = Collections.emptyMap(); private Map.Entry namedKeyCreds; @@ -99,6 +110,7 @@ public class AzureProperties implements Serializable { private Map allProperties = Collections.emptyMap(); private String keyWrapAlgorithm; private String keyVaultUrl; + private boolean createStackTraceEnabled = CREATE_STACK_TRACE_ENABLED_DEFAULT; public AzureProperties() {} @@ -130,6 +142,9 @@ public AzureProperties(Map properties) { properties.get(ADLS_REFRESH_CREDENTIALS_ENDPOINT)); this.adlsRefreshCredentialsEnabled = PropertyUtil.propertyAsBoolean(properties, ADLS_REFRESH_CREDENTIALS_ENABLED, true); + this.createStackTraceEnabled = + PropertyUtil.propertyAsBoolean( + properties, CREATE_STACK_TRACE_ENABLED, CREATE_STACK_TRACE_ENABLED_DEFAULT); this.token = properties.get(ADLS_TOKEN); this.allProperties = SerializableMap.copyOf(properties); if (properties.containsKey(AZURE_KEYVAULT_URL)) { @@ -149,6 +164,10 @@ public Optional adlsWriteBlockSize() { return Optional.ofNullable(adlsWriteBlockSize); } + public boolean isCreateStackTraceEnabled() { + return createStackTraceEnabled; + } + public Optional vendedAdlsCredentialProvider() { if (adlsRefreshCredentialsEnabled && !Strings.isNullOrEmpty(adlsRefreshCredentialsEndpoint)) { Map credentialProviderProperties = Maps.newHashMap(allProperties); diff --git a/azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSInputStream.java b/azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSInputStream.java index bb96fc890010..41a666ff8b09 100644 --- a/azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSInputStream.java +++ b/azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSInputStream.java @@ -77,7 +77,8 @@ class ADLSInputStream extends SeekableInputStream implements RangeReadable { this.readBytes = metrics.counter(FileIOMetricsContext.READ_BYTES, Unit.BYTES); this.readOperations = metrics.counter(FileIOMetricsContext.READ_OPERATIONS); - this.createStack = Thread.currentThread().getStackTrace(); + this.createStack = + azureProperties.isCreateStackTraceEnabled() ? Thread.currentThread().getStackTrace() : null; openStream(); } @@ -222,8 +223,15 @@ protected void finalize() throws Throwable { super.finalize(); if (!closed) { close(); // releasing resources is more important than printing the warning - String trace = Joiner.on("\n\t").join(Arrays.copyOfRange(createStack, 1, createStack.length)); - LOG.warn("Unclosed input stream created by:\n\t{}", trace); + if (createStack != null) { + String trace = + Joiner.on("\n\t").join(Arrays.copyOfRange(createStack, 1, createStack.length)); + LOG.warn("Unclosed input stream created by:\n\t{}", trace); + } else { + LOG.warn( + "Unclosed input stream; enable '{}' to capture its creation stack.", + AzureProperties.CREATE_STACK_TRACE_ENABLED); + } } } diff --git a/azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSOutputStream.java b/azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSOutputStream.java index dfe1130e0dc3..f89bf1fa6a09 100644 --- a/azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSOutputStream.java +++ b/azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSOutputStream.java @@ -56,7 +56,8 @@ class ADLSOutputStream extends PositionOutputStream { this.fileClient = fileClient; this.azureProperties = azureProperties; - this.createStack = Thread.currentThread().getStackTrace(); + this.createStack = + azureProperties.isCreateStackTraceEnabled() ? Thread.currentThread().getStackTrace() : null; this.writeBytes = metrics.counter(FileIOMetricsContext.WRITE_BYTES, Unit.BYTES); this.writeOperations = metrics.counter(FileIOMetricsContext.WRITE_OPERATIONS); @@ -121,8 +122,15 @@ protected void finalize() throws Throwable { super.finalize(); if (!closed) { close(); // releasing resources is more important than printing the warning - String trace = Joiner.on("\n\t").join(Arrays.copyOfRange(createStack, 1, createStack.length)); - LOG.warn("Unclosed output stream created by:\n\t{}", trace); + if (createStack != null) { + String trace = + Joiner.on("\n\t").join(Arrays.copyOfRange(createStack, 1, createStack.length)); + LOG.warn("Unclosed output stream created by:\n\t{}", trace); + } else { + LOG.warn( + "Unclosed output stream; enable '{}' to capture its creation stack.", + AzureProperties.CREATE_STACK_TRACE_ENABLED); + } } } } diff --git a/azure/src/test/java/org/apache/iceberg/azure/TestAzureProperties.java b/azure/src/test/java/org/apache/iceberg/azure/TestAzureProperties.java index c301d4de4741..726820f347aa 100644 --- a/azure/src/test/java/org/apache/iceberg/azure/TestAzureProperties.java +++ b/azure/src/test/java/org/apache/iceberg/azure/TestAzureProperties.java @@ -28,6 +28,7 @@ import static org.apache.iceberg.azure.AzureProperties.ADLS_WRITE_BLOCK_SIZE; import static org.apache.iceberg.azure.AzureProperties.AZURE_KEYVAULT_KEY_WRAP_ALGORITHM; import static org.apache.iceberg.azure.AzureProperties.AZURE_KEYVAULT_URL; +import static org.apache.iceberg.azure.AzureProperties.CREATE_STACK_TRACE_ENABLED; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import static org.mockito.ArgumentMatchers.any; @@ -78,6 +79,7 @@ public void testSerializable(TestHelpers.RoundTripSerializer ro .put(AzureProperties.ADLS_TOKEN_PROVIDER_PREFIX + "client-id", "clientId") .put(AZURE_KEYVAULT_URL, "https://test-key-vault.vault.azure.net") .put(AZURE_KEYVAULT_KEY_WRAP_ALGORITHM, KeyWrapAlgorithm.RSA1_5.getValue()) + .put(CREATE_STACK_TRACE_ENABLED, "false") .build()); AzureProperties serdedProps = roundTripSerializer.apply(props); @@ -85,6 +87,20 @@ public void testSerializable(TestHelpers.RoundTripSerializer ro assertThat(serdedProps.adlsWriteBlockSize()).isEqualTo(props.adlsWriteBlockSize()); assertThat(serdedProps.keyVaultUrl()).isEqualTo(props.keyVaultUrl()); assertThat(serdedProps.keyWrapAlgorithm()).isEqualTo(props.keyWrapAlgorithm()); + assertThat(serdedProps.isCreateStackTraceEnabled()) + .isEqualTo(props.isCreateStackTraceEnabled()); + } + + @Test + public void testCreateStackTraceEnabled() { + assertThat(new AzureProperties(ImmutableMap.of()).isCreateStackTraceEnabled()) + .isEqualTo(AzureProperties.CREATE_STACK_TRACE_ENABLED_DEFAULT) + .isTrue(); + assertThat(new AzureProperties().isCreateStackTraceEnabled()).isTrue(); + assertThat( + new AzureProperties(ImmutableMap.of(CREATE_STACK_TRACE_ENABLED, "false")) + .isCreateStackTraceEnabled()) + .isFalse(); } @Test