From 359b225e7117539eaa8d322a7634e89b9b5c90bf Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Tue, 28 Apr 2026 14:24:47 +0200 Subject: [PATCH] [FLINK-39561][security] Extend sensitive key redaction with missing access key patterns and user-configurable additional keys --- .../checkpointing_configuration.html | 12 +- .../common_checkpointing_section.html | 12 +- .../generated/security_configuration.html | 6 + .../flink/configuration/Configuration.java | 7 +- .../configuration/ConfigurationUtils.java | 10 +- .../configuration/GlobalConfiguration.java | 27 ++++- .../flink/configuration/SecurityOptions.java | 23 ++++ .../configuration/ConfigurationTest.java | 7 +- .../configuration/ConfigurationUtilsTest.java | 3 +- .../GlobalConfigurationTest.java | 106 ++++++++++++++---- .../flink/client/python/PythonEnvUtils.java | 16 ++- .../client/python/PythonEnvUtilsTest.java | 5 +- .../rpc/pekko/ActorSystemBootstrapTools.java | 4 +- .../rest/handler/job/JobConfigHandler.java | 13 ++- .../rest/messages/ConfigurationInfo.java | 4 +- .../runtime/rest/messages/JobConfigInfo.java | 8 +- .../runtime/util/EnvironmentInformation.java | 3 +- .../webmonitor/WebMonitorEndpoint.java | 4 +- .../handler/job/JobConfigHandlerTest.java | 5 +- .../flink/table/factories/FactoryUtil.java | 49 ++++++-- .../WorkflowSchedulerFactoryUtil.java | 6 +- .../flink/yarn/cli/FlinkYarnSessionCli.java | 3 +- 22 files changed, 261 insertions(+), 72 deletions(-) diff --git a/docs/layouts/shortcodes/generated/checkpointing_configuration.html b/docs/layouts/shortcodes/generated/checkpointing_configuration.html index b5e4025442105..704ac492aa8a2 100644 --- a/docs/layouts/shortcodes/generated/checkpointing_configuration.html +++ b/docs/layouts/shortcodes/generated/checkpointing_configuration.html @@ -44,6 +44,12 @@ String The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). If the 'execution.checkpointing.storage' is set to 'jobmanager', only the meta data of checkpoints will be stored in this directory. + +
execution.checkpointing.during-recovery.enabled
+ false + Boolean + Whether to enable checkpointing during recovery from an unaligned checkpoint. When enabled, the job can take checkpoints while still recovering channel state (inflight data) from a previous unaligned checkpoint. This avoids the need to wait for full recovery before the first checkpoint can be triggered, which reduces the window of vulnerability to failures during recovery.

This option requires execution.checkpointing.unaligned.recover-output-on-downstream.enabled to be enabled. It does not require unaligned checkpoints to be currently enabled, because a job may restore from an unaligned checkpoint while having unaligned checkpoints disabled for the new execution. +
execution.checkpointing.externalized-checkpoint-retention
NO_EXTERNALIZED_CHECKPOINTS @@ -158,12 +164,6 @@ Integer The tolerable checkpoint consecutive failure number. If set to 0, that means we do not tolerance any checkpoint failure. This only applies to the following failure reasons: IOException on the Job Manager, failures in the async phase on the Task Managers and checkpoint expiration due to a timeout. Failures originating from the sync phase on the Task Managers are always forcing failover of an affected task. Other types of checkpoint failures (such as checkpoint being subsumed) are being ignored. - -
execution.checkpointing.during-recovery.enabled
- false - Boolean - Whether to enable checkpointing during recovery from an unaligned checkpoint. When enabled, the job can take checkpoints while still recovering channel state (inflight data) from a previous unaligned checkpoint. This avoids the need to wait for full recovery before the first checkpoint can be triggered, which reduces the window of vulnerability to failures during recovery.

This option requires execution.checkpointing.unaligned.recover-output-on-downstream.enabled to be enabled. It does not require unaligned checkpoints to be currently enabled, because a job may restore from an unaligned checkpoint while having unaligned checkpoints disabled for the new execution. -
execution.checkpointing.unaligned.enabled
false diff --git a/docs/layouts/shortcodes/generated/common_checkpointing_section.html b/docs/layouts/shortcodes/generated/common_checkpointing_section.html index 938a86ddde5eb..1a5159022eb7e 100644 --- a/docs/layouts/shortcodes/generated/common_checkpointing_section.html +++ b/docs/layouts/shortcodes/generated/common_checkpointing_section.html @@ -32,6 +32,12 @@ Boolean Option whether to discard a checkpoint's states in parallel using the ExecutorService passed into the cleaner + +
execution.checkpointing.during-recovery.enabled
+ false + Boolean + Whether to enable checkpointing during recovery from an unaligned checkpoint. When enabled, the job can take checkpoints while still recovering channel state (inflight data) from a previous unaligned checkpoint. This avoids the need to wait for full recovery before the first checkpoint can be triggered, which reduces the window of vulnerability to failures during recovery.

This option requires execution.checkpointing.unaligned.recover-output-on-downstream.enabled to be enabled. It does not require unaligned checkpoints to be currently enabled, because a job may restore from an unaligned checkpoint while having unaligned checkpoints disabled for the new execution. +
execution.checkpointing.incremental
false @@ -56,12 +62,6 @@ Integer The maximum number of completed checkpoints to retain. - -
execution.checkpointing.during-recovery.enabled
- false - Boolean - Whether to enable checkpointing during recovery from an unaligned checkpoint. When enabled, the job can take checkpoints while still recovering channel state (inflight data) from a previous unaligned checkpoint. This avoids the need to wait for full recovery before the first checkpoint can be triggered, which reduces the window of vulnerability to failures during recovery.

This option requires execution.checkpointing.unaligned.recover-output-on-downstream.enabled to be enabled. It does not require unaligned checkpoints to be currently enabled, because a job may restore from an unaligned checkpoint while having unaligned checkpoints disabled for the new execution. -
execution.checkpointing.unaligned.recover-output-on-downstream.enabled
false diff --git a/docs/layouts/shortcodes/generated/security_configuration.html b/docs/layouts/shortcodes/generated/security_configuration.html index 9bd0e39823b71..fdf554c6670bd 100644 --- a/docs/layouts/shortcodes/generated/security_configuration.html +++ b/docs/layouts/shortcodes/generated/security_configuration.html @@ -86,6 +86,12 @@ List<String> List of factories that should be used to instantiate security modules. All listed modules will be installed. Keep in mind that the configured security context might rely on some modules being present. + +
security.redaction.additional-keys
+ + List<String> + Comma-separated list of additional configuration key substrings whose values should be redacted in logs and REST API responses. Matching is case-insensitive and based on substring containment. The built-in sensitive key patterns are immutable and cannot be overridden via this option. +
security.ssl.algorithms
"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java index d7294752788b7..6676d1a7dab27 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java @@ -376,7 +376,9 @@ public Optional getOptional(ConfigOption option) { } } catch (Exception e) { throw new IllegalArgumentException( - GlobalConfiguration.isSensitive(option.key()) + GlobalConfiguration.isSensitive( + option.key(), + this.get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS)) ? String.format("Could not parse value for key '%s'.", option.key()) : String.format( "Could not parse value '%s' for key '%s'.", @@ -680,7 +682,8 @@ public String toString() { .collect( Collectors.toMap( Map.Entry::getKey, - entry -> entry.getValue().toString()))) + entry -> entry.getValue().toString())), + this.get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS)) .toString(); } } diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java index 338f9b660dc34..af3b44da5baab 100755 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java @@ -170,19 +170,23 @@ public static Configuration createConfiguration(Properties properties) { /** * Replaces values whose keys are sensitive according to {@link - * GlobalConfiguration#isSensitive(String)} with {@link GlobalConfiguration#HIDDEN_CONTENT}. + * GlobalConfiguration#isSensitive(String, List)} with {@link + * GlobalConfiguration#HIDDEN_CONTENT}. * *

This can be useful when displaying configuration values. * * @param keyValuePairs for which to hide sensitive values + * @param additionalSensitiveKeys user-defined additional sensitive key substrings; use {@link + * SecurityOptions#ADDITIONAL_SENSITIVE_KEYS} to obtain these from a loaded configuration * @return A map where all sensitive value are hidden */ @Nonnull - public static Map hideSensitiveValues(Map keyValuePairs) { + public static Map hideSensitiveValues( + Map keyValuePairs, List additionalSensitiveKeys) { final HashMap result = new HashMap<>(); for (Map.Entry keyValuePair : keyValuePairs.entrySet()) { - if (GlobalConfiguration.isSensitive(keyValuePair.getKey())) { + if (GlobalConfiguration.isSensitive(keyValuePair.getKey(), additionalSensitiveKeys)) { result.put(keyValuePair.getKey(), GlobalConfiguration.HIDDEN_CONTENT); } else { result.put(keyValuePair.getKey(), keyValuePair.getValue()); diff --git a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java index 513ab677a990e..878f7304a3ca1 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java @@ -58,7 +58,10 @@ public final class GlobalConfiguration { "token", "basic-auth", "jaas.config", - "http-headers" + "http-headers", + "access-key", + "access.key", + "accesskey" }; // the hidden content to be displayed @@ -151,24 +154,27 @@ public static Configuration loadConfiguration( configuration = loadYAMLResource(yamlConfigFile); } - logConfiguration("Loading", configuration); + final List additionalKeys = + configuration.get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS); + logConfiguration("Loading", configuration, additionalKeys); if (dynamicProperties != null) { - logConfiguration("Loading dynamic", dynamicProperties); + logConfiguration("Loading dynamic", dynamicProperties, additionalKeys); configuration.addAll(dynamicProperties); } return configuration; } - private static void logConfiguration(String prefix, Configuration config) { + private static void logConfiguration( + String prefix, Configuration config, List additionalKeys) { config.confData.forEach( (key, value) -> LOG.info( "{} configuration property: {}, {}", prefix, key, - isSensitive(key) ? HIDDEN_CONTENT : value)); + isSensitive(key, additionalKeys) ? HIDDEN_CONTENT : value)); } /** @@ -263,8 +269,11 @@ private static Configuration loadYAMLResource(File file) { * Check whether the key is a hidden key. * * @param key the config key + * @param additionalKeys user-defined additional sensitive key substrings to check in addition + * to the built-in list; use {@link SecurityOptions#ADDITIONAL_SENSITIVE_KEYS} to obtain + * these from a loaded {@link Configuration} */ - public static boolean isSensitive(String key) { + public static boolean isSensitive(String key, List additionalKeys) { Preconditions.checkNotNull(key, "key is null"); final String keyInLower = key.toLowerCase(); for (String hideKey : SENSITIVE_KEYS) { @@ -272,6 +281,12 @@ public static boolean isSensitive(String key) { return true; } } + for (String hideKey : additionalKeys) { + final String hideKeyLower = hideKey.toLowerCase(); + if (keyInLower.length() >= hideKeyLower.length() && keyInLower.contains(hideKeyLower)) { + return true; + } + } return false; } diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java index 7ec79f0c80a5b..32d8ee7a1e65a 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java @@ -649,4 +649,27 @@ public static boolean isRestSSLAuthenticationEnabled(Configuration sslConfig) { checkNotNull(sslConfig, "sslConfig"); return isRestSSLEnabled(sslConfig) && sslConfig.get(SSL_REST_AUTHENTICATION_ENABLED); } + + // ------------------------------------------------------------------------ + // Sensitive key redaction + // ------------------------------------------------------------------------ + + /** + * Additional sensitive key substrings to redact beyond the built-in list. + * + *

Values are matched case-insensitively as substrings of configuration key names. The + * built-in sensitive key patterns are immutable and cannot be overridden or removed via this + * option. + */ + public static final ConfigOption> ADDITIONAL_SENSITIVE_KEYS = + key("security.redaction.additional-keys") + .stringType() + .asList() + .defaultValues() + .withDescription( + "Comma-separated list of additional configuration key substrings whose" + + " values should be redacted in logs and REST API responses." + + " Matching is case-insensitive and based on substring containment." + + " The built-in sensitive key patterns are immutable and cannot" + + " be overridden via this option."); } diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java index c61819fa08826..758be3f4019d9 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.time.Duration; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -481,7 +482,7 @@ void testListParserErrorDoesNotLeakSensitiveData() { ConfigOption> secret = ConfigOptions.key("secret").stringType().asList().noDefaultValue(); - assertThat(GlobalConfiguration.isSensitive(secret.key())).isTrue(); + assertThat(GlobalConfiguration.isSensitive(secret.key(), Collections.emptyList())).isTrue(); final Configuration cfg = new Configuration(); // missing closing quote @@ -500,7 +501,7 @@ void testMapParserErrorDoesNotLeakSensitiveData() { ConfigOption> secret = ConfigOptions.key("secret").mapType().noDefaultValue(); - assertThat(GlobalConfiguration.isSensitive(secret.key())).isTrue(); + assertThat(GlobalConfiguration.isSensitive(secret.key(), Collections.emptyList())).isTrue(); final Configuration cfg = new Configuration(); // malformed map representation @@ -519,7 +520,7 @@ void testToStringDoesNotLeakSensitiveData() { ConfigOption> secret = ConfigOptions.key("secret").mapType().noDefaultValue(); - assertThat(GlobalConfiguration.isSensitive(secret.key())).isTrue(); + assertThat(GlobalConfiguration.isSensitive(secret.key(), Collections.emptyList())).isTrue(); final Configuration cfg = new Configuration(); cfg.setString(secret.key(), "secret_value"); diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java index 12c1dab1def94..7b4b11496ed6f 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java @@ -26,6 +26,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -173,7 +174,7 @@ void testHideSensitiveValues() { } final Map hiddenSensitiveValues = - ConfigurationUtils.hideSensitiveValues(keyValuePairs); + ConfigurationUtils.hideSensitiveValues(keyValuePairs, Collections.emptyList()); assertThat(hiddenSensitiveValues).isEqualTo(expectedKeyValuePairs); } diff --git a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java index 5f57777d264cc..24340dc46ac2f 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.io.PrintWriter; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -135,29 +136,94 @@ void testInvalidStandardYamlFile() throws IOException { @Test void testHiddenKey() { - assertThat(GlobalConfiguration.isSensitive("password123")).isTrue(); - assertThat(GlobalConfiguration.isSensitive("123pasSword")).isTrue(); - assertThat(GlobalConfiguration.isSensitive("PasSword")).isTrue(); - assertThat(GlobalConfiguration.isSensitive("Secret")).isTrue(); - assertThat(GlobalConfiguration.isSensitive("polaris.client-secret")).isTrue(); - assertThat(GlobalConfiguration.isSensitive("client-secret")).isTrue(); - assertThat(GlobalConfiguration.isSensitive("service-key-json")).isTrue(); - assertThat(GlobalConfiguration.isSensitive("auth.basic.password")).isTrue(); - assertThat(GlobalConfiguration.isSensitive("auth.basic.token")).isTrue(); - assertThat(GlobalConfiguration.isSensitive("avro-confluent.basic-auth.user-info")).isTrue(); - assertThat(GlobalConfiguration.isSensitive("key.avro-confluent.basic-auth.user-info")) - .isTrue(); - assertThat(GlobalConfiguration.isSensitive("value.avro-confluent.basic-auth.user-info")) - .isTrue(); - assertThat(GlobalConfiguration.isSensitive("kafka.jaas.config")).isTrue(); - assertThat(GlobalConfiguration.isSensitive("properties.ssl.truststore.password")).isTrue(); - assertThat(GlobalConfiguration.isSensitive("properties.ssl.keystore.password")).isTrue(); + assertThat(GlobalConfiguration.isSensitive("password123", Collections.emptyList())) + .isTrue(); + assertThat(GlobalConfiguration.isSensitive("123pasSword", Collections.emptyList())) + .isTrue(); + assertThat(GlobalConfiguration.isSensitive("PasSword", Collections.emptyList())).isTrue(); + assertThat(GlobalConfiguration.isSensitive("Secret", Collections.emptyList())).isTrue(); + assertThat( + GlobalConfiguration.isSensitive( + "polaris.client-secret", Collections.emptyList())) + .isTrue(); + assertThat(GlobalConfiguration.isSensitive("client-secret", Collections.emptyList())) + .isTrue(); + assertThat(GlobalConfiguration.isSensitive("service-key-json", Collections.emptyList())) + .isTrue(); + assertThat(GlobalConfiguration.isSensitive("auth.basic.password", Collections.emptyList())) + .isTrue(); + assertThat(GlobalConfiguration.isSensitive("auth.basic.token", Collections.emptyList())) + .isTrue(); + assertThat( + GlobalConfiguration.isSensitive( + "avro-confluent.basic-auth.user-info", Collections.emptyList())) + .isTrue(); + assertThat( + GlobalConfiguration.isSensitive( + "key.avro-confluent.basic-auth.user-info", Collections.emptyList())) + .isTrue(); + assertThat( + GlobalConfiguration.isSensitive( + "value.avro-confluent.basic-auth.user-info", + Collections.emptyList())) + .isTrue(); + assertThat(GlobalConfiguration.isSensitive("kafka.jaas.config", Collections.emptyList())) + .isTrue(); + assertThat( + GlobalConfiguration.isSensitive( + "properties.ssl.truststore.password", Collections.emptyList())) + .isTrue(); + assertThat( + GlobalConfiguration.isSensitive( + "properties.ssl.keystore.password", Collections.emptyList())) + .isTrue(); + assertThat( + GlobalConfiguration.isSensitive( + "fs.azure.account.key.storageaccount123456.core.windows.net", + Collections.emptyList())) + .isTrue(); + assertThat(GlobalConfiguration.isSensitive("Hello", Collections.emptyList())).isFalse(); + assertThat( + GlobalConfiguration.isSensitive( + "metrics.reporter.dghttp.apikey", Collections.emptyList())) + .isTrue(); + // access-key / access.key / accesskey patterns + assertThat(GlobalConfiguration.isSensitive("s3.access-key", Collections.emptyList())) + .isTrue(); + assertThat(GlobalConfiguration.isSensitive("fs.s3a.access.key", Collections.emptyList())) + .isTrue(); + assertThat(GlobalConfiguration.isSensitive("s3.access.key", Collections.emptyList())) + .isTrue(); + assertThat(GlobalConfiguration.isSensitive("fs.oss.accessKeyId", Collections.emptyList())) + .isTrue(); + assertThat(GlobalConfiguration.isSensitive("fs.oss.accesskey", Collections.emptyList())) + .isTrue(); + } + + @Test + void testAdditionalSensitiveKeys() { + assertThat( + GlobalConfiguration.isSensitive( + "my.custom.credential", + Arrays.asList("my.custom.credential", "VENDOR_TOKEN_ID"))) + .isTrue(); assertThat( GlobalConfiguration.isSensitive( - "fs.azure.account.key.storageaccount123456.core.windows.net")) + "prefix.my.custom.credential.suffix", + Arrays.asList("my.custom.credential"))) .isTrue(); - assertThat(GlobalConfiguration.isSensitive("Hello")).isFalse(); - assertThat(GlobalConfiguration.isSensitive("metrics.reporter.dghttp.apikey")).isTrue(); + // case-insensitive matching + assertThat( + GlobalConfiguration.isSensitive( + "vendor_token_id", Arrays.asList("VENDOR_TOKEN_ID"))) + .isTrue(); + // built-in keys are unaffected when additional list is empty + assertThat(GlobalConfiguration.isSensitive("password", Collections.emptyList())).isTrue(); + // unrelated key not matched + assertThat( + GlobalConfiguration.isSensitive( + "unrelated.key", Arrays.asList("my.custom.credential"))) + .isFalse(); } } diff --git a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java index 94c6db2656c2b..bd816b6424abf 100644 --- a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java +++ b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java @@ -22,6 +22,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.core.fs.Path; import org.apache.flink.python.util.PythonDependencyUtils; import org.apache.flink.util.CompressionUtils; @@ -342,7 +343,10 @@ private static void addToPythonPath(PythonEnvironment env, List pythonFile * @throws IOException Thrown if an error occurred when python process start. */ static Process startPythonProcess( - PythonEnvironment pythonEnv, List commands, boolean redirectToPipe) + PythonEnvironment pythonEnv, + List commands, + boolean redirectToPipe, + List additionalSensitiveKeys) throws IOException { ProcessBuilder pythonProcessBuilder = new ProcessBuilder(); Map env = pythonProcessBuilder.environment(); @@ -372,7 +376,9 @@ static Process startPythonProcess( } LOG.info( "Starting Python process with environment variables: {{}}, command: {}", - ConfigurationUtils.hideSensitiveValues(env).entrySet().stream() + ConfigurationUtils.hideSensitiveValues(env, additionalSensitiveKeys) + .entrySet() + .stream() .map(e -> e.getKey() + "=" + e.getValue()) .collect(Collectors.joining(", ")), String.join(" ", commands)); @@ -495,7 +501,11 @@ static Process launchPy4jPythonClient( pythonEnv.systemEnv.put( "PYFLINK_GATEWAY_PORT", String.valueOf(gatewayServer.getListeningPort())); // start the python process. - return PythonEnvUtils.startPythonProcess(pythonEnv, commands, redirectToPipe); + return PythonEnvUtils.startPythonProcess( + pythonEnv, + commands, + redirectToPipe, + config.get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS)); } public static void setPythonException(Throwable pythonException) { diff --git a/flink-python/src/test/java/org/apache/flink/client/python/PythonEnvUtilsTest.java b/flink-python/src/test/java/org/apache/flink/client/python/PythonEnvUtilsTest.java index 7f45d4e3bc50f..cf79a9a377ddc 100644 --- a/flink-python/src/test/java/org/apache/flink/client/python/PythonEnvUtilsTest.java +++ b/flink-python/src/test/java/org/apache/flink/client/python/PythonEnvUtilsTest.java @@ -38,6 +38,7 @@ import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -160,7 +161,9 @@ void testStartPythonProcess() { String result = String.join(File.separator, tmpDirPath, "python_working_directory.txt"); commands.add(pyPath); commands.add(result); - Process pythonProcess = PythonEnvUtils.startPythonProcess(pythonEnv, commands, false); + Process pythonProcess = + PythonEnvUtils.startPythonProcess( + pythonEnv, commands, false, Collections.emptyList()); int exitCode = pythonProcess.waitFor(); if (exitCode != 0) { throw new RuntimeException("Python process exits with code: " + exitCode); diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java index a206e4ef08bef..9d6ff5e8b1507 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java @@ -32,6 +32,7 @@ import java.io.IOException; import java.net.BindException; +import java.util.Collections; import java.util.Iterator; import java.util.Map; import java.util.Optional; @@ -257,7 +258,8 @@ static Map toMaskedMap(Config config) { .collect( Collectors.toMap( Map.Entry::getKey, - entry -> String.valueOf(entry.getValue().unwrapped())))); + entry -> String.valueOf(entry.getValue().unwrapped()))), + Collections.emptyList()); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java index 649f3cee13e2b..e8650e1ca56eb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java @@ -37,6 +37,7 @@ import java.time.Duration; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.Executor; @@ -45,13 +46,16 @@ public class JobConfigHandler extends AbstractAccessExecutionGraphHandler implements OnlyExecutionGraphJsonArchivist { + private final List additionalSensitiveKeys; + public JobConfigHandler( GatewayRetriever leaderRetriever, Duration timeout, Map responseHeaders, MessageHeaders messageHeaders, ExecutionGraphCache executionGraphCache, - Executor executor) { + Executor executor, + List additionalSensitiveKeys) { super( leaderRetriever, @@ -60,6 +64,7 @@ public JobConfigHandler( messageHeaders, executionGraphCache, executor); + this.additionalSensitiveKeys = additionalSensitiveKeys; } @Override @@ -79,12 +84,14 @@ public Collection archiveJsonWithPath(AccessExecutionGraph graph) return Collections.singleton(new ArchivedJson(path, json)); } - private static JobConfigInfo createJobConfigInfo(AccessExecutionGraph executionGraph) { + private JobConfigInfo createJobConfigInfo(AccessExecutionGraph executionGraph) { final ArchivedExecutionConfig executionConfig = executionGraph.getArchivedExecutionConfig(); final JobConfigInfo.ExecutionConfigInfo executionConfigInfo; if (executionConfig != null) { - executionConfigInfo = JobConfigInfo.ExecutionConfigInfo.from(executionConfig); + executionConfigInfo = + JobConfigInfo.ExecutionConfigInfo.from( + executionConfig, additionalSensitiveKeys); } else { executionConfigInfo = null; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ConfigurationInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ConfigurationInfo.java index e0dcd23ae47bd..82abe13c6f527 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ConfigurationInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ConfigurationInfo.java @@ -20,6 +20,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.runtime.rest.handler.cluster.ClusterConfigHandler; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; @@ -51,7 +52,8 @@ public boolean isEmpty() { public static ConfigurationInfo from(Configuration config) { final ConfigurationInfo clusterConfig = new ConfigurationInfo(config.keySet().size()); final Map configurationWithHiddenSensitiveValues = - ConfigurationUtils.hideSensitiveValues(config.toMap()); + ConfigurationUtils.hideSensitiveValues( + config.toMap(), config.get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS)); for (Map.Entry keyValuePair : configurationWithHiddenSensitiveValues.entrySet()) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobConfigInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobConfigInfo.java index 77d007cf2a25d..80d62bab8d22d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobConfigInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobConfigInfo.java @@ -41,6 +41,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.Objects; @@ -235,13 +236,16 @@ public int hashCode() { return Objects.hash(restartStrategy, parallelism, isObjectReuse, globalJobParameters); } - public static ExecutionConfigInfo from(ArchivedExecutionConfig archivedExecutionConfig) { + public static ExecutionConfigInfo from( + ArchivedExecutionConfig archivedExecutionConfig, + List additionalSensitiveKeys) { return new ExecutionConfigInfo( archivedExecutionConfig.getRestartStrategyDescription(), archivedExecutionConfig.getParallelism(), archivedExecutionConfig.getObjectReuseEnabled(), ConfigurationUtils.hideSensitiveValues( - archivedExecutionConfig.getGlobalJobParameters())); + archivedExecutionConfig.getGlobalJobParameters(), + additionalSensitiveKeys)); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java index eb17d2235407b..96c9704e2e8eb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java @@ -34,6 +34,7 @@ import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeParseException; +import java.util.Collections; import java.util.List; import java.util.Properties; @@ -466,7 +467,7 @@ public static void logEnvironmentInfo( } else { log.info(" Program Arguments:"); for (String s : commandLineArgs) { - if (GlobalConfiguration.isSensitive(s)) { + if (GlobalConfiguration.isSensitive(s, Collections.emptyList())) { log.info( " " + GlobalConfiguration.HIDDEN_CONTENT diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index 9e3f295e6e718..16cf06d4dd966 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; import org.apache.flink.configuration.RpcOptions; +import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.runtime.application.ArchivedApplication; import org.apache.flink.runtime.blob.TransientBlobService; import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; @@ -402,7 +403,8 @@ protected List> initiali responseHeaders, JobConfigHeaders.getInstance(), executionGraphCache, - executor); + executor, + clusterConfiguration.get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS)); JobManagerJobConfigurationHandler jobManagerJobConfigurationHandler = new JobManagerJobConfigurationHandler( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandlerTest.java index c2d2968e5ed7d..b0a4e69f87faa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandlerTest.java @@ -56,7 +56,8 @@ void handleRequest_executionConfigWithSecretValues_excludesSecretValuesFromRespo Collections.emptyMap(), JobConfigHeaders.getInstance(), new DefaultExecutionGraphCache(TestingUtils.TIMEOUT, TestingUtils.TIMEOUT), - Executors.directExecutor()); + Executors.directExecutor(), + Collections.emptyList()); final Map globalJobParameters = new HashMap<>(); globalJobParameters.put("foobar", "barfoo"); @@ -85,7 +86,7 @@ void handleRequest_executionConfigWithSecretValues_excludesSecretValuesFromRespo } private Map filterSecretValues(Map globalJobParameters) { - return ConfigurationUtils.hideSensitiveValues(globalJobParameters); + return ConfigurationUtils.hideSensitiveValues(globalJobParameters, Collections.emptyList()); } private HandlerRequest createRequest(JobID jobId) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java index 594a7bf1d5d9b..a61da0902ae9d 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java @@ -27,6 +27,7 @@ import org.apache.flink.configuration.FallbackKey; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.Catalog; @@ -248,7 +249,14 @@ public static DynamicTableSource createDynamicTableSource( + "%s", objectIdentifier.asSummaryString(), catalogTable.getOptions().entrySet().stream() - .map(e -> stringifyOption(e.getKey(), e.getValue())) + .map( + e -> + stringifyOption( + e.getKey(), + e.getValue(), + configuration.get( + SecurityOptions + .ADDITIONAL_SENSITIVE_KEYS))) .sorted() .collect(Collectors.joining("\n"))), t); @@ -294,7 +302,14 @@ public static DynamicTableSink createDynamicTableSink( + "%s", objectIdentifier.asSummaryString(), catalogTable.getOptions().entrySet().stream() - .map(e -> stringifyOption(e.getKey(), e.getValue())) + .map( + e -> + stringifyOption( + e.getKey(), + e.getValue(), + configuration.get( + SecurityOptions + .ADDITIONAL_SENSITIVE_KEYS))) .sorted() .collect(Collectors.joining("\n"))), t); @@ -334,7 +349,14 @@ public static ModelProvider createModelProvider( + "%s", objectIdentifier.asSummaryString(), catalogModel.getOptions().entrySet().stream() - .map(e -> stringifyOption(e.getKey(), e.getValue())) + .map( + e -> + stringifyOption( + e.getKey(), + e.getValue(), + configuration.get( + SecurityOptions + .ADDITIONAL_SENSITIVE_KEYS))) .sorted() .collect(Collectors.joining("\n"))), t); @@ -460,7 +482,10 @@ public static Catalog createCatalog( optionEntry -> stringifyOption( optionEntry.getKey(), - optionEntry.getValue())) + optionEntry.getValue(), + configuration.get( + SecurityOptions + .ADDITIONAL_SENSITIVE_KEYS))) .sorted() .collect(Collectors.joining("\n"))), t); @@ -509,7 +534,10 @@ public static Module createModule( optionEntry -> stringifyOption( optionEntry.getKey(), - optionEntry.getValue())) + optionEntry.getValue(), + configuration.get( + SecurityOptions + .ADDITIONAL_SENSITIVE_KEYS))) .sorted() .collect(Collectors.joining("\n"))), t); @@ -756,7 +784,11 @@ private static ValidationException enrichNoMatchingConnectorError( return new ValidationException( String.format( "Cannot discover a connector using option: %s", - stringifyOption(CONNECTOR.key(), connectorOption)), + stringifyOption( + CONNECTOR.key(), + connectorOption, + context.getConfiguration() + .get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS))), e); } @@ -818,8 +850,9 @@ static List discoverFactories(ClassLoader classLoader) { return loadResults; } - public static String stringifyOption(String key, String value) { - if (GlobalConfiguration.isSensitive(key)) { + public static String stringifyOption( + String key, String value, List additionalSensitiveKeys) { + if (GlobalConfiguration.isSensitive(key, additionalSensitiveKeys)) { value = HIDDEN_CONTENT; } return String.format( diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/WorkflowSchedulerFactoryUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/WorkflowSchedulerFactoryUtil.java index e8d1a6333f6b5..91321a0c1e515 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/WorkflowSchedulerFactoryUtil.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/WorkflowSchedulerFactoryUtil.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DelegatingConfiguration; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.workflow.WorkflowScheduler; import org.apache.flink.util.StringUtils; @@ -84,7 +85,10 @@ private WorkflowSchedulerFactoryUtil() { optionEntry -> stringifyOption( optionEntry.getKey(), - optionEntry.getValue())) + optionEntry.getValue(), + configuration.get( + SecurityOptions + .ADDITIONAL_SENSITIVE_KEYS))) .sorted() .collect(Collectors.joining("\n"))), t); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index 66fe7b1b5ea19..84cad5d79eb47 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -784,7 +784,8 @@ private String encodeDynamicProperties(final CommandLine cmd) { LOG.info( "Dynamic Property set: {}={}", key, - GlobalConfiguration.isSensitive(key) + GlobalConfiguration.isSensitive( + key, Collections.emptyList()) ? GlobalConfiguration.HIDDEN_CONTENT : value);