Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@
<td>String</td>
<td>The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). If the 'execution.checkpointing.storage' is set to 'jobmanager', only the meta data of checkpoints will be stored in this directory.</td>
</tr>
<tr>
<td><h5>execution.checkpointing.during-recovery.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable checkpointing during recovery from an unaligned checkpoint. When enabled, the job can take checkpoints while still recovering channel state (inflight data) from a previous unaligned checkpoint. This avoids the need to wait for full recovery before the first checkpoint can be triggered, which reduces the window of vulnerability to failures during recovery.<br /><br />This option requires <code class="highlighter-rouge">execution.checkpointing.unaligned.recover-output-on-downstream.enabled</code> to be enabled. It does not require unaligned checkpoints to be currently enabled, because a job may restore from an unaligned checkpoint while having unaligned checkpoints disabled for the new execution.</td>
</tr>
<tr>
<td><h5>execution.checkpointing.externalized-checkpoint-retention</h5></td>
<td style="word-wrap: break-word;">NO_EXTERNALIZED_CHECKPOINTS</td>
Expand Down Expand Up @@ -158,12 +164,6 @@
<td>Integer</td>
<td>The tolerable checkpoint consecutive failure number. If set to 0, that means we do not tolerance any checkpoint failure. This only applies to the following failure reasons: IOException on the Job Manager, failures in the async phase on the Task Managers and checkpoint expiration due to a timeout. Failures originating from the sync phase on the Task Managers are always forcing failover of an affected task. Other types of checkpoint failures (such as checkpoint being subsumed) are being ignored.</td>
</tr>
<tr>
<td><h5>execution.checkpointing.during-recovery.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable checkpointing during recovery from an unaligned checkpoint. When enabled, the job can take checkpoints while still recovering channel state (inflight data) from a previous unaligned checkpoint. This avoids the need to wait for full recovery before the first checkpoint can be triggered, which reduces the window of vulnerability to failures during recovery.<br /><br />This option requires <code class="highlighter-rouge">execution.checkpointing.unaligned.recover-output-on-downstream.enabled</code> to be enabled. It does not require unaligned checkpoints to be currently enabled, because a job may restore from an unaligned checkpoint while having unaligned checkpoints disabled for the new execution.</td>
</tr>
<tr>
<td><h5>execution.checkpointing.unaligned.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@
<td>Boolean</td>
<td>Option whether to discard a checkpoint's states in parallel using the ExecutorService passed into the cleaner</td>
</tr>
<tr>
<td><h5>execution.checkpointing.during-recovery.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable checkpointing during recovery from an unaligned checkpoint. When enabled, the job can take checkpoints while still recovering channel state (inflight data) from a previous unaligned checkpoint. This avoids the need to wait for full recovery before the first checkpoint can be triggered, which reduces the window of vulnerability to failures during recovery.<br /><br />This option requires <code class="highlighter-rouge">execution.checkpointing.unaligned.recover-output-on-downstream.enabled</code> to be enabled. It does not require unaligned checkpoints to be currently enabled, because a job may restore from an unaligned checkpoint while having unaligned checkpoints disabled for the new execution.</td>
</tr>
<tr>
<td><h5>execution.checkpointing.incremental</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand All @@ -56,12 +62,6 @@
<td>Integer</td>
<td>The maximum number of completed checkpoints to retain.</td>
</tr>
<tr>
<td><h5>execution.checkpointing.during-recovery.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable checkpointing during recovery from an unaligned checkpoint. When enabled, the job can take checkpoints while still recovering channel state (inflight data) from a previous unaligned checkpoint. This avoids the need to wait for full recovery before the first checkpoint can be triggered, which reduces the window of vulnerability to failures during recovery.<br /><br />This option requires <code class="highlighter-rouge">execution.checkpointing.unaligned.recover-output-on-downstream.enabled</code> to be enabled. It does not require unaligned checkpoints to be currently enabled, because a job may restore from an unaligned checkpoint while having unaligned checkpoints disabled for the new execution.</td>
</tr>
<tr>
<td><h5>execution.checkpointing.unaligned.recover-output-on-downstream.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/security_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@
<td>List&lt;String&gt;</td>
<td>List of factories that should be used to instantiate security modules. All listed modules will be installed. Keep in mind that the configured security context might rely on some modules being present.</td>
</tr>
<tr>
<td><h5>security.redaction.additional-keys</h5></td>
<td style="word-wrap: break-word;"></td>
<td>List&lt;String&gt;</td>
<td>Comma-separated list of additional configuration key substrings whose values should be redacted in logs and REST API responses. Matching is case-insensitive and based on substring containment. The built-in sensitive key patterns are immutable and cannot be overridden via this option.</td>
</tr>
<tr>
<td><h5>security.ssl.algorithms</h5></td>
<td style="word-wrap: break-word;">"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,9 @@ public <T> Optional<T> getOptional(ConfigOption<T> option) {
}
} catch (Exception e) {
throw new IllegalArgumentException(
GlobalConfiguration.isSensitive(option.key())
GlobalConfiguration.isSensitive(
option.key(),
this.get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS))
? String.format("Could not parse value for key '%s'.", option.key())
: String.format(
"Could not parse value '%s' for key '%s'.",
Expand Down Expand Up @@ -680,7 +682,8 @@ public String toString() {
.collect(
Collectors.toMap(
Map.Entry::getKey,
entry -> entry.getValue().toString())))
entry -> entry.getValue().toString())),
this.get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS))
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,19 +170,23 @@ public static Configuration createConfiguration(Properties properties) {

/**
* Replaces values whose keys are sensitive according to {@link
* GlobalConfiguration#isSensitive(String)} with {@link GlobalConfiguration#HIDDEN_CONTENT}.
* GlobalConfiguration#isSensitive(String, List)} with {@link
* GlobalConfiguration#HIDDEN_CONTENT}.
*
* <p>This can be useful when displaying configuration values.
*
* @param keyValuePairs for which to hide sensitive values
* @param additionalSensitiveKeys user-defined additional sensitive key substrings; use {@link
* SecurityOptions#ADDITIONAL_SENSITIVE_KEYS} to obtain these from a loaded configuration
* @return A map where all sensitive value are hidden
*/
@Nonnull
public static Map<String, String> hideSensitiveValues(Map<String, String> keyValuePairs) {
public static Map<String, String> hideSensitiveValues(
Map<String, String> keyValuePairs, List<String> additionalSensitiveKeys) {
final HashMap<String, String> result = new HashMap<>();

for (Map.Entry<String, String> keyValuePair : keyValuePairs.entrySet()) {
if (GlobalConfiguration.isSensitive(keyValuePair.getKey())) {
if (GlobalConfiguration.isSensitive(keyValuePair.getKey(), additionalSensitiveKeys)) {
result.put(keyValuePair.getKey(), GlobalConfiguration.HIDDEN_CONTENT);
} else {
result.put(keyValuePair.getKey(), keyValuePair.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ public final class GlobalConfiguration {
"token",
"basic-auth",
"jaas.config",
"http-headers"
"http-headers",
"access-key",
"access.key",
"accesskey"
};

// the hidden content to be displayed
Expand Down Expand Up @@ -151,24 +154,27 @@ public static Configuration loadConfiguration(
configuration = loadYAMLResource(yamlConfigFile);
}

logConfiguration("Loading", configuration);
final List<String> additionalKeys =
configuration.get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS);
logConfiguration("Loading", configuration, additionalKeys);

if (dynamicProperties != null) {
logConfiguration("Loading dynamic", dynamicProperties);
logConfiguration("Loading dynamic", dynamicProperties, additionalKeys);
configuration.addAll(dynamicProperties);
}

return configuration;
}

private static void logConfiguration(String prefix, Configuration config) {
private static void logConfiguration(
String prefix, Configuration config, List<String> additionalKeys) {
config.confData.forEach(
(key, value) ->
LOG.info(
"{} configuration property: {}, {}",
prefix,
key,
isSensitive(key) ? HIDDEN_CONTENT : value));
isSensitive(key, additionalKeys) ? HIDDEN_CONTENT : value));
}

/**
Expand Down Expand Up @@ -263,15 +269,24 @@ private static Configuration loadYAMLResource(File file) {
* Check whether the key is a hidden key.
*
* @param key the config key
* @param additionalKeys user-defined additional sensitive key substrings to check in addition
* to the built-in list; use {@link SecurityOptions#ADDITIONAL_SENSITIVE_KEYS} to obtain
* these from a loaded {@link Configuration}
*/
public static boolean isSensitive(String key) {
public static boolean isSensitive(String key, List<String> additionalKeys) {
Preconditions.checkNotNull(key, "key is null");
final String keyInLower = key.toLowerCase();
for (String hideKey : SENSITIVE_KEYS) {
if (keyInLower.length() >= hideKey.length() && keyInLower.contains(hideKey)) {
return true;
}
}
for (String hideKey : additionalKeys) {
final String hideKeyLower = hideKey.toLowerCase();
if (keyInLower.length() >= hideKeyLower.length() && keyInLower.contains(hideKeyLower)) {
return true;
}
}
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -649,4 +649,27 @@ public static boolean isRestSSLAuthenticationEnabled(Configuration sslConfig) {
checkNotNull(sslConfig, "sslConfig");
return isRestSSLEnabled(sslConfig) && sslConfig.get(SSL_REST_AUTHENTICATION_ENABLED);
}

// ------------------------------------------------------------------------
// Sensitive key redaction
// ------------------------------------------------------------------------

/**
* Additional sensitive key substrings to redact beyond the built-in list.
*
* <p>Values are matched case-insensitively as substrings of configuration key names. The
* built-in sensitive key patterns are immutable and cannot be overridden or removed via this
* option.
*/
public static final ConfigOption<List<String>> ADDITIONAL_SENSITIVE_KEYS =
key("security.redaction.additional-keys")
.stringType()
.asList()
.defaultValues()
.withDescription(
"Comma-separated list of additional configuration key substrings whose"
+ " values should be redacted in logs and REST API responses."
+ " Matching is case-insensitive and based on substring containment."
+ " The built-in sensitive key patterns are immutable and cannot"
+ " be overridden via this option.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -481,7 +482,7 @@ void testListParserErrorDoesNotLeakSensitiveData() {
ConfigOption<List<String>> secret =
ConfigOptions.key("secret").stringType().asList().noDefaultValue();

assertThat(GlobalConfiguration.isSensitive(secret.key())).isTrue();
assertThat(GlobalConfiguration.isSensitive(secret.key(), Collections.emptyList())).isTrue();

final Configuration cfg = new Configuration();
// missing closing quote
Expand All @@ -500,7 +501,7 @@ void testMapParserErrorDoesNotLeakSensitiveData() {
ConfigOption<Map<String, String>> secret =
ConfigOptions.key("secret").mapType().noDefaultValue();

assertThat(GlobalConfiguration.isSensitive(secret.key())).isTrue();
assertThat(GlobalConfiguration.isSensitive(secret.key(), Collections.emptyList())).isTrue();

final Configuration cfg = new Configuration();
// malformed map representation
Expand All @@ -519,7 +520,7 @@ void testToStringDoesNotLeakSensitiveData() {
ConfigOption<Map<String, String>> secret =
ConfigOptions.key("secret").mapType().noDefaultValue();

assertThat(GlobalConfiguration.isSensitive(secret.key())).isTrue();
assertThat(GlobalConfiguration.isSensitive(secret.key(), Collections.emptyList())).isTrue();

final Configuration cfg = new Configuration();
cfg.setString(secret.key(), "secret_value");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -173,7 +174,7 @@ void testHideSensitiveValues() {
}

final Map<String, String> hiddenSensitiveValues =
ConfigurationUtils.hideSensitiveValues(keyValuePairs);
ConfigurationUtils.hideSensitiveValues(keyValuePairs, Collections.emptyList());

assertThat(hiddenSensitiveValues).isEqualTo(expectedKeyValuePairs);
}
Expand Down
Loading