option) {
}
} catch (Exception e) {
throw new IllegalArgumentException(
- GlobalConfiguration.isSensitive(option.key())
+ GlobalConfiguration.isSensitive(
+ option.key(),
+ this.get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS))
? String.format("Could not parse value for key '%s'.", option.key())
: String.format(
"Could not parse value '%s' for key '%s'.",
@@ -680,7 +682,8 @@ public String toString() {
.collect(
Collectors.toMap(
Map.Entry::getKey,
- entry -> entry.getValue().toString())))
+ entry -> entry.getValue().toString())),
+ this.get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS))
.toString();
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
index 338f9b660dc34..af3b44da5baab 100755
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
@@ -170,19 +170,23 @@ public static Configuration createConfiguration(Properties properties) {
/**
* Replaces values whose keys are sensitive according to {@link
- * GlobalConfiguration#isSensitive(String)} with {@link GlobalConfiguration#HIDDEN_CONTENT}.
+ * GlobalConfiguration#isSensitive(String, List)} with {@link
+ * GlobalConfiguration#HIDDEN_CONTENT}.
*
* This can be useful when displaying configuration values.
*
* @param keyValuePairs for which to hide sensitive values
+ * @param additionalSensitiveKeys user-defined additional sensitive key substrings; use {@link
+ * SecurityOptions#ADDITIONAL_SENSITIVE_KEYS} to obtain these from a loaded configuration
* @return A map where all sensitive value are hidden
*/
@Nonnull
- public static Map hideSensitiveValues(Map keyValuePairs) {
+ public static Map hideSensitiveValues(
+ Map keyValuePairs, List additionalSensitiveKeys) {
final HashMap result = new HashMap<>();
for (Map.Entry keyValuePair : keyValuePairs.entrySet()) {
- if (GlobalConfiguration.isSensitive(keyValuePair.getKey())) {
+ if (GlobalConfiguration.isSensitive(keyValuePair.getKey(), additionalSensitiveKeys)) {
result.put(keyValuePair.getKey(), GlobalConfiguration.HIDDEN_CONTENT);
} else {
result.put(keyValuePair.getKey(), keyValuePair.getValue());
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
index 513ab677a990e..878f7304a3ca1 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
@@ -58,7 +58,10 @@ public final class GlobalConfiguration {
"token",
"basic-auth",
"jaas.config",
- "http-headers"
+ "http-headers",
+ "access-key",
+ "access.key",
+ "accesskey"
};
// the hidden content to be displayed
@@ -151,24 +154,27 @@ public static Configuration loadConfiguration(
configuration = loadYAMLResource(yamlConfigFile);
}
- logConfiguration("Loading", configuration);
+ final List additionalKeys =
+ configuration.get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS);
+ logConfiguration("Loading", configuration, additionalKeys);
if (dynamicProperties != null) {
- logConfiguration("Loading dynamic", dynamicProperties);
+ logConfiguration("Loading dynamic", dynamicProperties, additionalKeys);
configuration.addAll(dynamicProperties);
}
return configuration;
}
- private static void logConfiguration(String prefix, Configuration config) {
+ private static void logConfiguration(
+ String prefix, Configuration config, List additionalKeys) {
config.confData.forEach(
(key, value) ->
LOG.info(
"{} configuration property: {}, {}",
prefix,
key,
- isSensitive(key) ? HIDDEN_CONTENT : value));
+ isSensitive(key, additionalKeys) ? HIDDEN_CONTENT : value));
}
/**
@@ -263,8 +269,11 @@ private static Configuration loadYAMLResource(File file) {
* Check whether the key is a hidden key.
*
* @param key the config key
+ * @param additionalKeys user-defined additional sensitive key substrings to check in addition
+ * to the built-in list; use {@link SecurityOptions#ADDITIONAL_SENSITIVE_KEYS} to obtain
+ * these from a loaded {@link Configuration}
*/
- public static boolean isSensitive(String key) {
+ public static boolean isSensitive(String key, List additionalKeys) {
Preconditions.checkNotNull(key, "key is null");
final String keyInLower = key.toLowerCase();
for (String hideKey : SENSITIVE_KEYS) {
@@ -272,6 +281,12 @@ public static boolean isSensitive(String key) {
return true;
}
}
+ for (String hideKey : additionalKeys) {
+ final String hideKeyLower = hideKey.toLowerCase();
+ if (keyInLower.length() >= hideKeyLower.length() && keyInLower.contains(hideKeyLower)) {
+ return true;
+ }
+ }
return false;
}
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
index 7ec79f0c80a5b..32d8ee7a1e65a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -649,4 +649,27 @@ public static boolean isRestSSLAuthenticationEnabled(Configuration sslConfig) {
checkNotNull(sslConfig, "sslConfig");
return isRestSSLEnabled(sslConfig) && sslConfig.get(SSL_REST_AUTHENTICATION_ENABLED);
}
+
+ // ------------------------------------------------------------------------
+ // Sensitive key redaction
+ // ------------------------------------------------------------------------
+
+ /**
+ * Additional sensitive key substrings to redact beyond the built-in list.
+ *
+ * Values are matched case-insensitively as substrings of configuration key names. The
+ * built-in sensitive key patterns are immutable and cannot be overridden or removed via this
+ * option.
+ */
+ public static final ConfigOption> ADDITIONAL_SENSITIVE_KEYS =
+ key("security.redaction.additional-keys")
+ .stringType()
+ .asList()
+ .defaultValues()
+ .withDescription(
+ "Comma-separated list of additional configuration key substrings whose"
+ + " values should be redacted in logs and REST API responses."
+ + " Matching is case-insensitive and based on substring containment."
+ + " The built-in sensitive key patterns are immutable and cannot"
+ + " be overridden via this option.");
}
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
index c61819fa08826..758be3f4019d9 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
@@ -27,6 +27,7 @@
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -481,7 +482,7 @@ void testListParserErrorDoesNotLeakSensitiveData() {
ConfigOption> secret =
ConfigOptions.key("secret").stringType().asList().noDefaultValue();
- assertThat(GlobalConfiguration.isSensitive(secret.key())).isTrue();
+ assertThat(GlobalConfiguration.isSensitive(secret.key(), Collections.emptyList())).isTrue();
final Configuration cfg = new Configuration();
// missing closing quote
@@ -500,7 +501,7 @@ void testMapParserErrorDoesNotLeakSensitiveData() {
ConfigOption