From 7e97bf60444abae997a58987ce02300ada9d4660 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Wed, 22 Apr 2026 15:59:53 +0900 Subject: [PATCH] Core: Add support for HashiCorp Vault KMS client Co-Authored-By: Endi Caushi <42871239+mrendi29@users.noreply.github.com> --- build.gradle | 16 ++ .../org/apache/iceberg/CatalogProperties.java | 3 + .../iceberg/encryption/EncryptionUtil.java | 2 + docs/docs/encryption.md | 2 +- gradle/libs.versions.toml | 3 + .../hashicorp/VaultKeyManagementClient.java | 256 ++++++++++++++++++ .../TestVaultKeyManagementClient.java | 73 +++++ settings.gradle | 2 + spark/v4.1/spark-runtime/runtime-deps.txt | 1 + 9 files changed, 357 insertions(+), 1 deletion(-) create mode 100644 hashicorp/src/main/java/org/apache/iceberg/hashicorp/VaultKeyManagementClient.java create mode 100644 hashicorp/src/test/java/org/apache/iceberg/hashicorp/TestVaultKeyManagementClient.java diff --git a/build.gradle b/build.gradle index 261dfabf0412..80211da432df 100644 --- a/build.gradle +++ b/build.gradle @@ -384,6 +384,7 @@ project(':iceberg-core') { } implementation libs.aircompressor + implementation libs.bettercloud.vault implementation libs.httpcomponents.httpclient5 implementation platform(libs.jackson.bom) implementation libs.jackson.core @@ -802,6 +803,21 @@ project(':iceberg-gcp') { } } +project(':iceberg-hashicorp') { + test { + useJUnitPlatform() + } + + dependencies { + implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') + implementation project(':iceberg-core') + implementation libs.bettercloud.vault + + testImplementation libs.testcontainers + testImplementation libs.testcontainers.vault + } +} + project(':iceberg-hive-metastore') { test { useJUnitPlatform() diff --git a/core/src/main/java/org/apache/iceberg/CatalogProperties.java b/core/src/main/java/org/apache/iceberg/CatalogProperties.java index 59744e50924f..a38e977bc35e 100644 --- a/core/src/main/java/org/apache/iceberg/CatalogProperties.java +++ 
b/core/src/main/java/org/apache/iceberg/CatalogProperties.java @@ -165,6 +165,7 @@ private CatalogProperties() {} public static final String ENCRYPTION_KMS_TYPE_AWS = "aws"; public static final String ENCRYPTION_KMS_TYPE_AZURE = "azure"; public static final String ENCRYPTION_KMS_TYPE_GCP = "gcp"; + public static final String ENCRYPTION_KMS_TYPE_HASHICORP = "hashicorp"; public static final String ENCRYPTION_KMS_IMPL = "encryption.kms-impl"; public static final String ENCRYPTION_KMS_IMPL_AWS = @@ -173,4 +174,6 @@ private CatalogProperties() {} "org.apache.iceberg.azure.keymanagement.AzureKeyManagementClient"; public static final String ENCRYPTION_KMS_IMPL_GCP = "org.apache.iceberg.gcp.GcpKeyManagementClient"; + public static final String ENCRYPTION_KMS_IMPL_HASHICORP = + "org.apache.iceberg.hashicorp.VaultKeyManagementClient"; } diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java index 382d244883d6..4be58f78e315 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java +++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java @@ -63,6 +63,8 @@ public static KeyManagementClient createKmsClient(Map catalogPro CatalogProperties.ENCRYPTION_KMS_IMPL_AZURE; case CatalogProperties.ENCRYPTION_KMS_TYPE_GCP -> CatalogProperties.ENCRYPTION_KMS_IMPL_GCP; + case CatalogProperties.ENCRYPTION_KMS_TYPE_HASHICORP -> + CatalogProperties.ENCRYPTION_KMS_IMPL_HASHICORP; default -> throw new IllegalStateException("Unsupported KMS type: " + kmsType); }; } diff --git a/docs/docs/encryption.md b/docs/docs/encryption.md index cbdce85e760e..64a17a9124ab 100644 --- a/docs/docs/encryption.md +++ b/docs/docs/encryption.md @@ -28,7 +28,7 @@ Currently, encryption is supported in the Hive and REST catalogs for tables with Two parameters are required to activate encryption of a table: -1. Catalog property that specifies the KMS ("key management service"). 
It can be either `encryption.kms-type` for pre-defined KMS clients (`aws`, `azure` or `gcp`) or `encryption.kms-impl` with the client class path for custom KMS clients. +1. Catalog property that specifies the KMS ("key management service"). It can be either `encryption.kms-type` for pre-defined KMS clients (`aws`, `azure`, `gcp` or `hashicorp`) or `encryption.kms-impl` with the client class path for custom KMS clients. 2. Table property `encryption.key-id`, that specifies the ID of a master key used to encrypt and decrypt the table. Master keys are stored and managed in the KMS. For more details on table encryption, see the "Appendix: Internals Overview" [subsection](#appendix-internals-overview). diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 87ceb9012bd6..859b63b9d5ea 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -38,6 +38,7 @@ azuresdk-bom = "1.3.6" awssdk-s3accessgrants = "2.4.1" bouncycastle = "1.82" bson-ver = "4.11.5" +bettercloud-vault = "5.1.0" caffeine = "2.9.3" calcite = "1.41.0" datasketches = "6.2.0" @@ -111,6 +112,7 @@ awssdk-bom = { module = "software.amazon.awssdk:bom", version.ref = "awssdk-bom" awssdk-s3accessgrants = { module = "software.amazon.s3.accessgrants:aws-s3-accessgrants-java-plugin", version.ref = "awssdk-s3accessgrants" } azuresdk-bom = { module = "com.azure:azure-sdk-bom", version.ref = "azuresdk-bom" } bson = { module = "org.mongodb:bson", version.ref = "bson-ver"} +bettercloud-vault = { module = "com.bettercloud:vault-java-driver", version.ref = "bettercloud-vault" } bouncycastle-bcpkix = { module = "org.bouncycastle:bcpkix-jdk18on", version.ref = "bouncycastle" } bouncycastle-bcprov = { module = "org.bouncycastle:bcprov-jdk18on", version.ref = "bouncycastle" } bouncycastle-bcutil = { module = "org.bouncycastle:bcutil-jdk18on", version.ref = "bouncycastle" } @@ -229,5 +231,6 @@ sqlite-jdbc = { module = "org.xerial:sqlite-jdbc", version.ref = "sqlite-jdbc" } testcontainers = { 
module = "org.testcontainers:testcontainers", version.ref = "testcontainers" }
 testcontainers-junit-jupiter = { module = "org.testcontainers:testcontainers-junit-jupiter", version.ref = "testcontainers" }
 testcontainers-minio = { module = "org.testcontainers:testcontainers-minio", version.ref = "testcontainers" }
+testcontainers-vault = { module = "org.testcontainers:testcontainers-vault", version.ref = "testcontainers" }
 tez08-dag = { module = "org.apache.tez:tez-dag", version.ref = "tez08" }
 tez08-mapreduce = { module = "org.apache.tez:tez-mapreduce", version.ref = "tez08" }
diff --git a/hashicorp/src/main/java/org/apache/iceberg/hashicorp/VaultKeyManagementClient.java b/hashicorp/src/main/java/org/apache/iceberg/hashicorp/VaultKeyManagementClient.java
new file mode 100644
index 000000000000..34a3dba088b9
--- /dev/null
+++ b/hashicorp/src/main/java/org/apache/iceberg/hashicorp/VaultKeyManagementClient.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.hashicorp;
+
+import com.bettercloud.vault.SslConfig;
+import com.bettercloud.vault.Vault;
+import com.bettercloud.vault.VaultConfig;
+import com.bettercloud.vault.VaultException;
+import com.bettercloud.vault.response.AuthResponse;
+import com.bettercloud.vault.response.LogicalResponse;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.encryption.KeyManagementClient;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+
+/**
+ * KMS client implementation using HashiCorp Vault Transit secrets engine with AppRole
+ * authentication.
+ *
+ * <p>Required properties:
+ *
+ * <ul>
+ *   <li>{@code vault.address}: address of the Vault server
+ * </ul>
+ *
+ * <p>Optional properties:
+ *
+ * <ul>
+ *   <li>{@code vault.token}: token to use as is; takes precedence over AppRole login
+ *   <li>{@code vault.transit.mount}: Transit secrets engine mount path (default: transit)
+ *   <li>{@code vault.approle.path}: AppRole authentication path (default: approle)
+ * </ul>
+ *
+ * <p>Required environment variables, unless {@code vault.token} is set:
+ *
+ * <ul>
+ *   <li>{@code VAULT_ROLE_ID}: AppRole role ID
+ *   <li>{@code VAULT_SECRET_ID}: AppRole secret ID
+ * </ul>
+ *
+ * <p>Optional environment variables:
+ *
+ * <ul>
+ *   <li>{@code VAULT_ENABLE_TOKEN_ROTATION}: when {@code true}, log in again shortly before the
+ *       AppRole token expires (default: false)
+ * </ul>
+ */
+public class VaultKeyManagementClient implements KeyManagementClient {
+
+  // Vault server address
+  private static final String VAULT_ADDRESS_PROP = "vault.address";
+
+  // Direct token authentication (optional; takes precedence over AppRole)
+  private static final String VAULT_TOKEN_PROP = "vault.token";
+
+  // Transit secrets engine mount path (default: "transit")
+  private static final String VAULT_TRANSIT_MOUNT_PROP = "vault.transit.mount";
+  private static final String DEFAULT_TRANSIT_MOUNT = "transit";
+
+  // AppRole authentication path (default: "approle")
+  private static final String VAULT_APPROLE_PATH_PROP = "vault.approle.path";
+  private static final String DEFAULT_APPROLE_PATH = "approle";
+
+  // Environment variables for AppRole credentials
+  private static final String VAULT_ROLE_ID_ENV_VAR = "VAULT_ROLE_ID";
+  private static final String VAULT_SECRET_ID_ENV_VAR = "VAULT_SECRET_ID";
+  private static final String VAULT_ENABLE_TOKEN_ROTATION_ENV_VAR = "VAULT_ENABLE_TOKEN_ROTATION";
+
+  // Log in again once the current token is within this many milliseconds of expiring
+  private static final long TOKEN_REFRESH_MARGIN_MS = TimeUnit.MINUTES.toMillis(5);
+
+  // Read on every key operation without locking, hence volatile like the token fields
+  private transient volatile Vault vault;
+  private String vaultAddress;
+  private String transitMount;
+  private String appRolePath;
+  private boolean enableTokenRotation;
+  private volatile String token;
+  private volatile long tokenExpiry;
+
+  /**
+   * Reads the connection settings and authenticates against Vault.
+   *
+   * @param properties catalog properties; {@code vault.address} is mandatory
+   * @throws IllegalArgumentException if {@code vault.address} is unset, or if an AppRole login is
+   *     needed and {@code VAULT_ROLE_ID} is unset
+   * @throws RuntimeException if the Vault client cannot be configured or the login fails
+   */
+  @Override
+  public void initialize(Map<String, String> properties) {
+    vaultAddress = properties.get(VAULT_ADDRESS_PROP);
+    Preconditions.checkArgument(
+        !Strings.isNullOrEmpty(vaultAddress), "%s must be set in properties", VAULT_ADDRESS_PROP);
+
+    transitMount = properties.getOrDefault(VAULT_TRANSIT_MOUNT_PROP, DEFAULT_TRANSIT_MOUNT);
+    appRolePath = properties.getOrDefault(VAULT_APPROLE_PATH_PROP, DEFAULT_APPROLE_PATH);
+
+    String vaultToken = properties.get(VAULT_TOKEN_PROP);
+    if (!Strings.isNullOrEmpty(vaultToken)) {
+      // Use direct token authentication (e.g., for tests or root tokens). No lease is tracked
+      // for such a token and there is no AppRole login to repeat, so rotation stays disabled
+      enableTokenRotation = false;
+      authenticateWithToken(vaultToken);
+    } else {
+      // Check if token rotation is enabled (default: false)
+      String enableRotation = System.getenv(VAULT_ENABLE_TOKEN_ROTATION_ENV_VAR);
+      enableTokenRotation = "true".equalsIgnoreCase(enableRotation);
+      authenticateWithAppRole();
+    }
+  }
+
+  /** Builds a Vault client for the configured address; {@code authToken} may be null. */
+  private Vault newVault(String authToken) throws VaultException {
+    VaultConfig config = new VaultConfig().address(vaultAddress).engineVersion(1);
+    if (authToken != null) {
+      config = config.token(authToken);
+    }
+
+    return new Vault(config.sslConfig(new SslConfig().build()).build());
+  }
+
+  private void authenticateWithToken(String vaultToken) {
+    try {
+      vault = newVault(vaultToken);
+      token = vaultToken;
+    } catch (VaultException e) {
+      throw new RuntimeException("Failed to configure Vault client with token", e);
+    }
+  }
+
+  /** Logs in with the AppRole credentials taken from the environment. */
+  private void authenticateWithAppRole() {
+    String roleId = System.getenv(VAULT_ROLE_ID_ENV_VAR);
+    String secretId = System.getenv(VAULT_SECRET_ID_ENV_VAR);
+    authenticate(roleId, secretId, appRolePath);
+  }
+
+  private void authenticate(String roleId, String secretId, String path) {
+    Preconditions.checkArgument(
+        !Strings.isNullOrEmpty(roleId),
+        "%s environment variable must be set when %s is not set in properties",
+        VAULT_ROLE_ID_ENV_VAR,
+        VAULT_TOKEN_PROP);
+
+    try {
+      AuthResponse response = newVault(null).auth().loginByAppRole(path, roleId, secretId);
+      String clientToken = response.getAuthClientToken();
+      Vault authenticated = newVault(clientToken);
+
+      token = clientToken;
+      vault = authenticated;
+      if (enableTokenRotation) {
+        // Published last, so a reader that sees the new expiry also sees the new client
+        long leaseMillis = TimeUnit.SECONDS.toMillis(response.getAuthLeaseDuration());
+        tokenExpiry = System.currentTimeMillis() + leaseMillis;
+      }
+    } catch (VaultException e) {
+      throw new RuntimeException("Failed to authenticate with Vault using AppRole", e);
+    }
+  }
+
+  /** Logs in again when rotation is enabled and the token is about to expire. */
+  private void ensureValidToken() {
+    if (!enableTokenRotation || !isTokenExpiring()) {
+      return;
+    }
+
+    synchronized (this) {
+      // Check again under the lock: another thread may have logged in while this one waited
+      if (isTokenExpiring()) {
+        authenticateWithAppRole();
+      }
+    }
+  }
+
+  private boolean isTokenExpiring() {
+    // Refresh token if it expires in less than 5 minutes
+    return System.currentTimeMillis() > tokenExpiry - TOKEN_REFRESH_MARGIN_MS;
+  }
+
+  /**
+   * Returns the authenticated client. The field is transient, so it is rebuilt from the current
+   * token whenever it is unset, as it would be on a copy produced by Java deserialization.
+   */
+  private Vault vault() throws VaultException {
+    Vault client = vault;
+    if (client == null) {
+      synchronized (this) {
+        client = vault;
+        if (client == null) {
+          Preconditions.checkState(token != null, "Vault client is not initialized");
+          client = newVault(token);
+          vault = client;
+        }
+      }
+    }
+
+    return client;
+  }
+
+  /** Writes {@code body} to the Transit path {@code operation/keyId} and returns its data. */
+  private Map<String, String> transitWrite(
+      String operation, String keyId, Map<String, Object> body) throws VaultException {
+    LogicalResponse response =
+        vault().logical().write(transitMount + "/" + operation + "/" + keyId, body);
+    return response.getData();
+  }
+
+  /** Copies the remaining bytes of {@code buffer} without moving its position. */
+  private static byte[] toBytes(ByteBuffer buffer) {
+    byte[] bytes = new byte[buffer.remaining()];
+    buffer.duplicate().get(bytes);
+    return bytes;
+  }
+
+  /**
+   * Encrypts {@code key} with the Transit key {@code wrappingKeyId}.
+   *
+   * @return the Vault ciphertext string, UTF-8 encoded
+   * @throws RuntimeException if Vault rejects the request or returns no ciphertext
+   */
+  @Override
+  public ByteBuffer wrapKey(ByteBuffer key, String wrappingKeyId) {
+    ensureValidToken();
+
+    try {
+      String plaintext = Base64.getEncoder().encodeToString(toBytes(key));
+      Map<String, String> data =
+          transitWrite("encrypt", wrappingKeyId, Map.of("plaintext", plaintext));
+      String ciphertext = data.get("ciphertext");
+      if (ciphertext == null) {
+        throw new RuntimeException("Failed to wrap key: no ciphertext returned");
+      }
+
+      return ByteBuffer.wrap(ciphertext.getBytes(StandardCharsets.UTF_8));
+    } catch (VaultException e) {
+      throw new RuntimeException("Failed to wrap key with wrapping key " + wrappingKeyId, e);
+    }
+  }
+
+  /**
+   * Decrypts a key that was wrapped with the Transit key {@code wrappingKeyId}.
+   *
+   * @param wrappedKey UTF-8 bytes of a Vault ciphertext string
+   * @return the decrypted key bytes
+   * @throws RuntimeException if Vault rejects the request or returns no plaintext
+   */
+  @Override
+  public ByteBuffer unwrapKey(ByteBuffer wrappedKey, String wrappingKeyId) {
+    ensureValidToken();
+
+    try {
+      String ciphertext = new String(toBytes(wrappedKey), StandardCharsets.UTF_8);
+      Map<String, String> data =
+          transitWrite("decrypt", wrappingKeyId, Map.of("ciphertext", ciphertext));
+      String plaintext = data.get("plaintext");
+      if (plaintext == null) {
+        throw new RuntimeException("Failed to unwrap key: no plaintext returned");
+      }
+
+      return ByteBuffer.wrap(Base64.getDecoder().decode(plaintext));
+    } catch (VaultException e) {
+      throw new RuntimeException("Failed to unwrap key with wrapping key " + wrappingKeyId, e);
+    }
+  }
+
+  @Override
+  public boolean supportsKeyGeneration() {
+    return true;
+  }
+
+  /**
+   * Asks Vault for a new data key protected by the Transit key {@code wrappingKeyId}.
+   *
+   * @return the key in plaintext together with its wrapped form
+   * @throws RuntimeException if Vault rejects the request or omits either form of the key
+   */
+  @Override
+  public KeyGenerationResult generateKey(String wrappingKeyId) {
+    ensureValidToken();
+
+    try {
+      Map<String, String> data = transitWrite("datakey/plaintext", wrappingKeyId, Map.of());
+      String plaintext = data.get("plaintext");
+      String ciphertext = data.get("ciphertext");
+      if (plaintext == null || ciphertext == null) {
+        throw new RuntimeException("Failed to generate key: missing plaintext or ciphertext");
+      }
+
+      ByteBuffer key = ByteBuffer.wrap(Base64.getDecoder().decode(plaintext));
+      ByteBuffer wrappedKey = ByteBuffer.wrap(ciphertext.getBytes(StandardCharsets.UTF_8));
+      return new KeyGenerationResult(key, wrappedKey);
+    } catch (VaultException e) {
+      throw new RuntimeException("Failed to generate key with wrapping key " + wrappingKeyId, e);
+    }
+  }
+}
diff --git a/hashicorp/src/test/java/org/apache/iceberg/hashicorp/TestVaultKeyManagementClient.java b/hashicorp/src/test/java/org/apache/iceberg/hashicorp/TestVaultKeyManagementClient.java
new file mode 100644
index 000000000000..83fb5e747bf6
--- /dev/null
+++ b/hashicorp/src/test/java/org/apache/iceberg/hashicorp/TestVaultKeyManagementClient.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.hashicorp;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import org.apache.iceberg.encryption.KeyManagementClient;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.vault.VaultContainer;
+
+/** Exercises {@link VaultKeyManagementClient} against a Vault container started once. */
+class TestVaultKeyManagementClient {
+  private static final String ICEBERG_TEST_KEY_NAME = "iceberg-test-key";
+  private static final String VAULT_TOKEN = "token";
+
+  @SuppressWarnings("resource")
+  private static final VaultContainer<?> VAULT_CONTAINER =
+      new VaultContainer<>("hashicorp/vault:1.21")
+          .withVaultToken(VAULT_TOKEN)
+          .withInitCommand("secrets enable transit");
+
+  private static KeyManagementClient vaultKeyManagementClient;
+
+  @BeforeAll
+  static void startContainer() {
+    VAULT_CONTAINER.start();
+    vaultKeyManagementClient = new VaultKeyManagementClient();
+    vaultKeyManagementClient.initialize(
+        Map.of(
+            "vault.address",
+            VAULT_CONTAINER.getHttpHostAddress(),
+            "vault.token",
+            VAULT_TOKEN,
+            "vault.transit.mount",
+            "transit",
+            "vault.approle.path",
+            "approle"));
+  }
+
+  @Test
+  public void keyWrapping() {
+    ByteBuffer key = ByteBuffer.wrap("table-master-key".getBytes(StandardCharsets.UTF_8));
+
+    ByteBuffer wrapped = vaultKeyManagementClient.wrapKey(key, ICEBERG_TEST_KEY_NAME);
+
+    assertThat(vaultKeyManagementClient.unwrapKey(wrapped, ICEBERG_TEST_KEY_NAME)).isEqualTo(key);
+  }
+
+  @Test
+  public void keyGenerationSupported() {
+    assertThat(vaultKeyManagementClient.supportsKeyGeneration()).isTrue();
+  }
+}
diff --git a/settings.gradle b/settings.gradle
index 70f9343a252b..12cbe54c0c2b 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -33,6 +33,7 @@ include 'arrow'
 include 'parquet'
 include 'bundled-guava'
 include 'spark'
+include 'hashicorp'
 include 'hive-metastore'
 include 'nessie'
 include 'gcp'
@@ -59,6 +60,7 @@ project(':arrow').name = 'iceberg-arrow'
project(':parquet').name = 'iceberg-parquet' project(':bundled-guava').name = 'iceberg-bundled-guava' project(':spark').name = 'iceberg-spark' +project(':hashicorp').name = 'iceberg-hashicorp' project(':hive-metastore').name = 'iceberg-hive-metastore' project(':nessie').name = 'iceberg-nessie' project(':gcp').name = 'iceberg-gcp' diff --git a/spark/v4.1/spark-runtime/runtime-deps.txt b/spark/v4.1/spark-runtime/runtime-deps.txt index e275e24372af..833c6df06b33 100644 --- a/spark/v4.1/spark-runtime/runtime-deps.txt +++ b/spark/v4.1/spark-runtime/runtime-deps.txt @@ -1,3 +1,4 @@ +com.bettercloud:vault-java-driver:5.1.0 com.fasterxml.jackson.core:jackson-annotations:2.21 com.fasterxml.jackson.core:jackson-core:2.15.2 com.fasterxml.jackson.core:jackson-databind:2.15.2