diff --git a/artemis-bom/pom.xml b/artemis-bom/pom.xml
index 9902e2fdee0..7e3a8080fee 100644
--- a/artemis-bom/pom.xml
+++ b/artemis-bom/pom.xml
@@ -393,6 +393,11 @@
artemis-lockmanager-ri
${project.version}
+
+ org.apache.activemq
+ artemis-kube-lock
+ ${project.version}
+
org.apache.activemq
artemis-ra
diff --git a/artemis-commons/pom.xml b/artemis-commons/pom.xml
index d8b57f77aa9..2a2b5a2a050 100644
--- a/artemis-commons/pom.xml
+++ b/artemis-commons/pom.xml
@@ -83,6 +83,10 @@
io.netty
netty-transport
+
+ de.dentrassi.crypto
+ pem-keystore
+
commons-beanutils
commons-beanutils
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/kubernetes/KubernetesClient.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/kubernetes/KubernetesClient.java
new file mode 100644
index 00000000000..5e51a58c421
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/kubernetes/KubernetesClient.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.kubernetes;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.io.StringReader;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.nio.file.Path;
+import java.security.KeyStore;
+import java.security.SecureRandom;
+import java.util.Map;
+import java.util.Scanner;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.activemq.artemis.json.JsonObject;
+import org.apache.activemq.artemis.utils.JsonLoader;
+import org.apache.activemq.artemis.utils.ssl.KeyStoreSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KubernetesClient {
+
+ private static final Logger logger = LoggerFactory.getLogger(KubernetesClient.class);
+
+ private static final String KUBERNETES_HOST = "KUBERNETES_SERVICE_HOST";
+ private static final String KUBERNETES_PORT = "KUBERNETES_SERVICE_PORT";
+ private static final String KUBERNETES_TOKEN_PATH = "KUBERNETES_TOKEN_PATH";
+ private static final String KUBERNETES_CA_PATH = "KUBERNETES_CA_PATH";
+
+ private static final String KUBERNETES_TOKENREVIEW_URI_PATTERN = "https://%s:%s";
+
+ private static final String DEFAULT_KUBERNETES_TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token";
+ private static final String DEFAULT_KUBERNETES_CA_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt";
+
+ private static String authToken;
+
+ private static volatile HttpClient httpClient;
+ private static volatile Map params;
+ private static URI apiUri;
+
+ private KubernetesClient() {
+ }
+
+ public static HttpClient getHttpClient() {
+ HttpClient result = httpClient;
+ if (result != null) {
+ return result;
+ }
+ synchronized (KubernetesClient.class) {
+ if (httpClient == null) {
+ try {
+ httpClient = HttpClient.newBuilder().sslContext(buildSSLContext()).build();
+ } catch (Exception e) {
+ logger.error("Unable to build a valid SSLContext or HttpClient", e);
+ }
+ }
+ if (authToken == null) {
+ String tokenPath = getParam(KUBERNETES_TOKEN_PATH, DEFAULT_KUBERNETES_TOKEN_PATH);
+ try {
+ logger.debug("Loading client authentication token from {}", tokenPath);
+ authToken = readFile(tokenPath);
+ logger.debug("Loaded client authentication token from {}", tokenPath);
+ } catch (IOException e) {
+ logger.error("Cannot retrieve Service Account Authentication Token from " + tokenPath, e);
+ }
+ }
+ String host = getParam(KUBERNETES_HOST);
+ String port = getParam(KUBERNETES_PORT);
+ apiUri = URI.create(String.format(KUBERNETES_TOKENREVIEW_URI_PATTERN, host, port));
+ }
+ return httpClient;
+ }
+
+ // for tests
+ public static void clearHttpClient() {
+ httpClient = null;
+ authToken = null;
+ apiUri = null;
+ }
+
+ // for tests
+ public static void setParam(String name, String value) {
+ if (params == null) {
+ synchronized (KubernetesClient.class) {
+ if (params == null) {
+ params = new ConcurrentHashMap<>();
+ }
+ }
+ }
+ params.put(name, value);
+ }
+
+ // for tests
+ public static void clearParams() {
+ if (params != null) {
+ params = null;
+ }
+ }
+
+ public static String getParam(String name, String defaultValue) {
+ String value = null;
+ if (params != null) {
+ value = params.get(name);
+ }
+ if (value == null) {
+ value = System.getProperty(name);
+ }
+ if (value == null) {
+ value = System.getenv(name);
+ }
+ if (value == null) {
+ return defaultValue;
+ }
+ return value;
+ }
+
+ public static String getParam(String name) {
+ return getParam(name, null);
+ }
+
+ public static JsonObject get(String path) throws IOException, InterruptedException {
+ HttpRequest request = HttpRequest.newBuilder()
+ .uri(apiUri.resolve(path))
+ .header("Authorization", "Bearer " + authToken)
+ .header("Accept", "application/json; charset=utf-8")
+ .GET()
+ .build();
+
+ HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
+
+ if (response.statusCode() == 404) {
+ return null;
+ }
+
+ if (response.statusCode() < 200 || response.statusCode() >= 300) {
+ throw new IOException("HTTP " + response.statusCode() + ": " + response.body());
+ }
+
+ return JsonLoader.readObject(new StringReader(response.body()));
+ }
+
+
+ public static JsonObject put(String path, String jsonBody) throws IOException, InterruptedException {
+ HttpRequest request = HttpRequest.newBuilder()
+ .uri(apiUri.resolve(path))
+ .header("Authorization", "Bearer " + authToken)
+ .header("Content-Type", "application/json")
+ .header("Accept", "application/json; charset=utf-8")
+ .PUT(HttpRequest.BodyPublishers.ofString(jsonBody))
+ .build();
+
+ HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
+
+ if (response.statusCode() < 200 || response.statusCode() >= 300) {
+ throw new IOException("HTTP " + response.statusCode() + ": " + response.body());
+ }
+
+ return JsonLoader.readObject(new StringReader(response.body()));
+ }
+
+ public static void delete(String path) throws IOException, InterruptedException {
+ HttpRequest request = HttpRequest.newBuilder()
+ .uri(apiUri.resolve(path))
+ .header("Authorization", "Bearer " + authToken)
+ .header("Accept", "application/json")
+ .DELETE()
+ .build();
+
+ HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
+
+ if (response.statusCode() != 404 && (response.statusCode() < 200 || response.statusCode() >= 300)) {
+ throw new IOException("HTTP " + response.statusCode() + ": " + response.body());
+ }
+ }
+
+ public static JsonObject post(String path, String jsonBody) throws IOException, InterruptedException {
+ HttpClient httpClient = getHttpClient();
+ HttpRequest request = HttpRequest.newBuilder()
+ .uri(apiUri.resolve(path))
+ .header("Authorization", "Bearer " + authToken)
+ .header("Accept", "application/json; charset=utf-8")
+ .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
+ .build();
+
+ HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
+
+ if (response.statusCode() < 200 || response.statusCode() >= 300) {
+ throw new IOException("HTTP " + response.statusCode() + ": " + response.body());
+ }
+
+ return JsonLoader.readObject(new StringReader(response.body()));
+ }
+
+ private static String readFile(String path) throws IOException {
+ try (Scanner scanner = new Scanner(Path.of(path))) {
+ StringBuilder buffer = new StringBuilder();
+ while (scanner.hasNextLine()) {
+ String line = scanner.nextLine();
+ if (!line.isBlank() && !line.startsWith("#")) {
+ buffer.append(line);
+ }
+ }
+ return buffer.toString();
+ }
+ }
+
+ private static SSLContext buildSSLContext() throws Exception {
+ SSLContext ctx = SSLContext.getInstance("TLS");
+ String caPath = getParam(KUBERNETES_CA_PATH, DEFAULT_KUBERNETES_CA_PATH);
+ File certFile = new File(caPath);
+ if (!certFile.exists()) {
+ // TODO-IMPORTANT: I think this should throw an exception. Having no authority is okay or is this a security issue?
+ logger.debug("Kubernetes CA certificate not found at: {}. Truststore not configured", caPath);
+ return ctx;
+ }
+ KeyStore trustStore = KeyStoreSupport.loadKeystore(null, KeyStoreSupport.PEMCA, caPath, null);
+ TrustManagerFactory tmFactory = TrustManagerFactory
+ .getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ tmFactory.init(trustStore);
+
+ ctx.init(null, tmFactory.getTrustManagers(), new SecureRandom());
+ return ctx;
+ }
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/oidc/HttpClientAccess.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/rest/HttpClientAccess.java
similarity index 96%
rename from artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/oidc/HttpClientAccess.java
rename to artemis-commons/src/main/java/org/apache/activemq/artemis/utils/rest/HttpClientAccess.java
index 61f039c6d18..b3fc4a1777a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/oidc/HttpClientAccess.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/rest/HttpClientAccess.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.artemis.spi.core.security.jaas.oidc;
+package org.apache.activemq.artemis.utils.rest;
import javax.security.auth.login.LoginContext;
import java.net.URI;
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/rest/SharedHttpClientAccess.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/rest/SharedHttpClientAccess.java
new file mode 100644
index 00000000000..f69d5a16f96
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/rest/SharedHttpClientAccess.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.rest;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.activemq.artemis.utils.ssl.KeyStoreSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Access class for {@link HttpClient} instances which keeps already created clients in a static
+ * cache, but creates new clients using instance options passed from just-initialized
+ * {@link javax.security.auth.spi.LoginModule}.
+ */
+public class SharedHttpClientAccess implements HttpClientAccess {
+
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final Map cache = new ConcurrentHashMap<>();
+
+ private final int httpTimeout;
+ private final String tlsVersion;
+ private final String caCertificate;
+
+ public SharedHttpClientAccess(final int httpTimeout, String tlsVersion, String caCertificate) {
+ this.httpTimeout = httpTimeout;
+ this.tlsVersion = tlsVersion;
+ this.caCertificate = caCertificate;
+ }
+
+ @Override
+ public HttpClient getClient(URI baseURI) {
+ HttpClient client = cache.get(baseURI);
+ if (client == null) {
+ synchronized (SharedHttpClientAccess.class) {
+ client = cache.get(baseURI);
+ if (client == null) {
+ client = createDefaultHttpClient();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Created new HTTP Client at {}", baseURI);
+ }
+ cache.put(baseURI, client);
+ }
+ }
+ }
+
+ return client;
+ }
+
+ /**
+ * Create a slightly customized {@link HttpClient}
+ *
+ * @return newly created {@link HttpClient}
+ */
+ public HttpClient createDefaultHttpClient() {
+ HttpClient.Builder builder = HttpClient.newBuilder()
+ .version(HttpClient.Version.HTTP_1_1)
+ .connectTimeout(Duration.ofMillis(httpTimeout));
+
+ boolean sslContextSet = false;
+ if (tlsVersion != null && caCertificate != null) {
+ try {
+ File caCertificateFile = new File(caCertificate);
+ if (!caCertificateFile.isFile()) {
+ logger.warn("The certificate file {} does not exist", caCertificate);
+ } else {
+ SSLContext sslContext = SSLContext.getInstance(tlsVersion);
+ KeyStore trustStore = KeyStoreSupport.loadKeystore(null, "PEMCA", caCertificate, null);
+ TrustManagerFactory tmFactory = TrustManagerFactory
+ .getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ tmFactory.init(trustStore);
+ sslContext.init(null, tmFactory.getTrustManagers(), new SecureRandom());
+ builder.sslContext(sslContext);
+ sslContextSet = true;
+ }
+ } catch (NoSuchAlgorithmException e) {
+ throw new IllegalArgumentException(e.getMessage(), e);
+ } catch (Exception e) {
+ throw new RuntimeException("Can't configure SSL Context for HTTP Client", e);
+ }
+ }
+
+ if (!sslContextSet) {
+ try {
+ SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
+ sslContext.init(null, null, new SecureRandom());
+ builder.sslContext(sslContext);
+ } catch (NoSuchAlgorithmException | KeyManagementException e) {
+ throw new RuntimeException("Can't configure default SSL Context for HTTP Client", e);
+ }
+ }
+
+ return builder.build();
+ }
+
+}
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ssl/KeyStoreSupport.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ssl/KeyStoreSupport.java
new file mode 100644
index 00000000000..58bd627e58d
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ssl/KeyStoreSupport.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.ssl;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.security.KeyStore;
+import java.security.PrivilegedAction;
+import java.security.Security;
+
+import org.apache.activemq.artemis.utils.ClassloadingUtil;
+import org.apache.activemq.artemis.utils.sm.SecurityManagerShim;
+
+public class KeyStoreSupport {
+ public static final String NONE = "NONE";
+ public static final String PKCS_11 = "PKCS11";
+ public static final String PEMCA = "PEMCA";
+
+ public static KeyStore loadKeystore(final String keystoreProvider,
+ final String keystoreType,
+ final String keystorePath,
+ final String keystorePassword) throws Exception {
+ checkPemProviderLoaded(keystoreType);
+ KeyStore ks = keystoreProvider == null ? KeyStore.getInstance(keystoreType) : KeyStore.getInstance(keystoreType, keystoreProvider);
+ InputStream in = null;
+ try {
+ if (keystorePath != null && !keystorePath.isEmpty() && !keystorePath.equalsIgnoreCase(NONE)) {
+ URL keystoreURL = KeyStoreSupport.validateStoreURL(keystorePath);
+ in = keystoreURL.openStream();
+ }
+ ks.load(in, keystorePassword == null ? null : keystorePassword.toCharArray());
+ } finally {
+ if (in != null) {
+ try {
+ in.close();
+ } catch (IOException ignored) {
+ }
+ }
+ }
+ return ks;
+ }
+
+ /**
+ * This method calls out to a separate class in order to avoid a hard dependency on the provider's implementation.
+ * This allows folks who don't use PEM to avoid using the corresponding dependency.
+ */
+ public static void checkPemProviderLoaded(String keystoreType) {
+ if (keystoreType != null && keystoreType.startsWith("PEM")) {
+ if (Security.getProvider("PEM") == null) {
+ PemSupport.loadProvider();
+ }
+ }
+ }
+
+
+ public static URL validateStoreURL(final String storePath) throws Exception {
+ assert storePath != null;
+
+ // First see if this is a URL
+ try {
+ return new URL(storePath);
+ } catch (MalformedURLException e) {
+ File file = new File(storePath);
+ if (file.exists() && file.isFile()) {
+ return file.toURI().toURL();
+ } else {
+ URL url = findResource(storePath);
+ if (url != null) {
+ return url;
+ }
+ }
+ }
+
+ throw new Exception("Failed to find a store at " + storePath);
+ }
+
+
+ /**
+ * This seems duplicate code all over the place, but for security reasons we can't let something like this to be open
+ * in a utility class, as it would be a door to load anything you like in a safe VM. For that reason any class trying
+ * to do a privileged block should do with the AccessController directly.
+ */
+ private static URL findResource(final String resourceName) {
+ return SecurityManagerShim.doPrivileged((PrivilegedAction) () -> ClassloadingUtil.findResource(resourceName));
+ }
+
+}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/ssl/PemSupport.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ssl/PemSupport.java
similarity index 94%
rename from artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/ssl/PemSupport.java
rename to artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ssl/PemSupport.java
index 83db8845944..3e027eca4ce 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/ssl/PemSupport.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ssl/PemSupport.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.artemis.core.remoting.impl.ssl;
+package org.apache.activemq.artemis.utils.ssl;
import java.security.Security;
diff --git a/artemis-core-client/pom.xml b/artemis-core-client/pom.xml
index 9e38c6d2632..6da3da32925 100644
--- a/artemis-core-client/pom.xml
+++ b/artemis-core-client/pom.xml
@@ -125,10 +125,6 @@
io.netty
netty-resolver
-
- de.dentrassi.crypto
- pem-keystore
-
+
+ 4.0.0
+
+
+ org.apache.artemis
+ artemis-lockmanager
+ 2.55.0-SNAPSHOT
+
+
+ artemis-kube-lock-all
+ jar
+ Kube Lock Manager (all dependencies)
+
+
+ ${project.basedir}/../..
+
+
+
+
+ org.apache.artemis
+ artemis-kube-lock
+ ${project.version}
+ true
+
+
+
+
+ org.bouncycastle
+ bcpkix-jdk18on
+ ${bc-java-version}
+ compile
+
+
+ org.bouncycastle
+ bcprov-jdk18on
+ ${bc-java-version}
+ compile
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+ package
+
+ shade
+
+
+ true
+ false
+ false
+
+
+ *:*
+
+
+
+ org.apache.artemis:artemis-lockmanager-api
+
+
+
+
+
+ io.kubernetes
+ artemis.kube.shade.io.kubernetes
+
+
+ io.swagger
+ artemis.kube.shade.io.swagger
+
+
+ io.gapi
+ artemis.kube.shade.io.gapi
+
+
+ io.sundr
+ artemis.kube.shade.io.sundr
+
+
+ io.gsonfire
+ artemis.kube.shade.io.gsonfire
+
+
+ com.google
+ artemis.kube.shade.com.google
+
+
+ com.squareup
+ artemis.kube.shade.com.squareup
+
+
+ com.fasterxml
+ artemis.kube.shade.com.fasterxml
+
+
+ okhttp3
+ artemis.kube.shade.okhttp3
+
+
+ okio
+ artemis.kube.shade.okio
+
+
+ org.yaml
+ artemis.kube.shade.org.yaml
+
+
+ org.joda
+ artemis.kube.shade.org.joda
+
+
+ org.threeten
+ artemis.kube.shade.org.threeten
+
+
+ org.checkerframework
+ artemis.kube.shade.org.checkerframework
+
+
+ javax.annotation
+ artemis.kube.shade.javax.annotation
+
+
+ kotlin
+ artemis.kube.shade.kotlin
+
+
+ org.jetbrains
+ artemis.kube.shade.org.jetbrains
+
+
+ org.intellij
+ artemis.kube.shade.org.intellij
+
+
+ org.apache.commons
+ artemis.kube.shade.org.apache.commons
+
+
+ org.slf4j
+ artemis.kube.shade.org.slf4j
+
+
+ jakarta
+ artemis.kube.shade.jakarta
+
+
+ org.jose4j
+ artemis.kube.shade.org.jose4j
+
+
+ com.github
+ artemis.kube.shade.com.github
+
+
+ io.github
+ artemis.kube.shade.io.github
+
+
+ org.jspecify
+ artemis.kube.shade.org.jspecify
+
+
+ org.bouncycastle
+ artemis.kube.shade.org.bouncycastle
+
+
+
+
+
+
+
+
+ *:*
+
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+
+
+
+
+
+
+
+
+
+
+
diff --git a/artemis-lockmanager/artemis-kube-lock/pom.xml b/artemis-lockmanager/artemis-kube-lock/pom.xml
new file mode 100644
index 00000000000..5cdc69ee093
--- /dev/null
+++ b/artemis-lockmanager/artemis-kube-lock/pom.xml
@@ -0,0 +1,46 @@
+
+
+ 4.0.0
+
+
+ org.apache.artemis
+ artemis-lockmanager
+ 2.55.0-SNAPSHOT
+
+
+ artemis-kube-lock
+ bundle
+ Kubernetes Lock Manager
+
+
+ ${project.basedir}/../..
+
+
+
+
+ org.apache.artemis
+ artemis-lockmanager-api
+
+
+ org.apache.artemis
+ artemis-commons
+
+
+
+
+
diff --git a/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeLock.java b/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeLock.java
new file mode 100644
index 00000000000..d89a2430bba
--- /dev/null
+++ b/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeLock.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.artemis.lock.kube;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.time.Duration;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+
+import org.apache.activemq.artemis.json.JsonObject;
+import org.apache.activemq.artemis.lockmanager.DistributedLock;
+import org.apache.activemq.artemis.lockmanager.UnavailableStateException;
+import org.apache.artemis.lock.kube.client.LockKubeClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KubeLock implements DistributedLock {
+
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ final String hostname;
+ final String namespace;
+ final String id;
+ private final int leasePeriodSeconds;
+
+
+ public KubeLock(String hostname, String namespace, String id, int leasePeriodSeconds) {
+ this.hostname = hostname;
+ this.namespace = namespace;
+ this.id = id;
+ this.leasePeriodSeconds = leasePeriodSeconds;
+ }
+
+ @Override
+ public String getLockId() {
+ return id;
+ }
+
+ @Override
+ public boolean isHeldByCaller() throws UnavailableStateException {
+ try {
+ return renewLock();
+ } catch (Exception e) {
+ throw new UnavailableStateException(e.getMessage(), e);
+ }
+ }
+
+ private boolean renewLock() throws Exception {
+ try {
+ // Try to read the existing lease
+ JsonObject existingLease = LockKubeClient.getLease(namespace, id);
+
+ if (existingLease == null) {
+ logger.debug("Create lock");
+ String nowTime = OffsetDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
+ LockKubeClient.createLease(namespace, id, hostname, nowTime, nowTime, leasePeriodSeconds);
+ return true;
+ }
+
+ JsonObject spec = existingLease.getJsonObject("spec");
+ if (spec == null) {
+ logger.warn("Lease spec is null");
+ return false;
+ }
+
+ String holderIdentity = spec.getString("holderIdentity", null);
+ String renewTimeStr = spec.getString("renewTime", null);
+ int leaseDuration = spec.getInt("leaseDurationSeconds", (int) leasePeriodSeconds);
+
+ logger.debug("renewLock, Read lease: renewTime={}, holderIdentity={}, leaseDuration={}",
+ renewTimeStr, holderIdentity, leaseDuration);
+
+ // Check if we already hold this lease
+ if (hostname.equals(holderIdentity)) {
+ // Renew the lease
+ JsonObject metadata = existingLease.getJsonObject("metadata");
+ String resourceVersion = metadata != null ? metadata.getString("resourceVersion", "") : "";
+
+ String renewTime = OffsetDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
+ LockKubeClient.renewLease(namespace, id, resourceVersion, holderIdentity, spec.getString("acquireTime", renewTime), renewTime, leaseDuration);
+ return true;
+ }
+
+ // Check if the lease has expired by using the leaseDurationSeconds from the lease spec
+ if (renewTimeStr != null) {
+ OffsetDateTime renewTime = OffsetDateTime.parse(renewTimeStr);
+ OffsetDateTime now = OffsetDateTime.now(ZoneOffset.UTC);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("renew period:: {}, now = {}, between={}, between seconds={}", renewTime, now, Duration.between(renewTime, now), Duration.between(renewTime, now).toSeconds());
+ }
+ long ageSeconds = Duration.between(renewTime, now).toSeconds();
+
+ if (ageSeconds > leaseDuration) {
+ // Lease has expired, try to acquire it
+ OffsetDateTime newTime = OffsetDateTime.now(ZoneOffset.UTC);
+ String newRenewTime = newTime.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
+ String acquireTime = newTime.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
+
+ JsonObject metadata = existingLease.getJsonObject("metadata");
+ String resourceVersion = metadata != null ? metadata.getString("resourceVersion", "") : "";
+
+ LockKubeClient.renewLease(namespace, id, resourceVersion, holderIdentity, acquireTime, newRenewTime, leaseDuration);
+ return true;
+ }
+ }
+
+ // Lease is held by someone else and not expired
+ return false;
+
+ } catch (IOException | InterruptedException e) {
+ logger.warn(e.getMessage(), e);
+ throw new UnavailableStateException("Failed to renew lock", e);
+ }
+ }
+
+ @Override
+ public boolean tryLock() throws UnavailableStateException {
+ try {
+ return renewLock();
+ } catch (Exception e) {
+ throw new UnavailableStateException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void unlock() throws UnavailableStateException {
+ try {
+ // Try to read the existing lease
+ String path = "/apis/coordination.k8s.io/v1/namespaces/" + namespace + "/leases/" + id;
+ JsonObject existingLease = kubeClient.get(path);
+
+ if (existingLease == null) {
+ // Lease doesn't exist - already unlocked or expired
+ logger.debug("Lock {} not found, already released", id);
+ return;
+ }
+
+ JsonObject spec = existingLease.getJsonObject("spec");
+ if (spec == null) {
+ logger.warn("Lease spec is null during unlock");
+ return;
+ }
+
+ String holderIdentity = spec.getString("holderIdentity", null);
+
+ // Only unlock if we hold the lease
+ if (hostname.equals(holderIdentity)) {
+ // Delete the lease to release the lock
+ kubeClient.delete(path);
+ logger.debug("Released lock: {}", id);
+ } else {
+ logger.warn("Attempted to unlock {} but it's held by {}", id, holderIdentity);
+ }
+ } catch (IOException | InterruptedException e) {
+ throw new UnavailableStateException("Failed to unlock: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void addListener(UnavailableLockListener listener) {
+
+ }
+
+ @Override
+ public void removeListener(UnavailableLockListener listener) {
+
+ }
+
+ @Override
+ public void close() {
+
+ }
+}
diff --git a/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeLockManager.java b/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeLockManager.java
new file mode 100644
index 00000000000..bb25139dae1
--- /dev/null
+++ b/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeLockManager.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.artemis.lock.kube;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.activemq.artemis.lockmanager.AbstractDistributedLockManager;
+import org.apache.activemq.artemis.lockmanager.DistributedLock;
+import org.apache.activemq.artemis.lockmanager.MutableLong;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Kubernetes-based distributed lock manager implementation.
+ *
+ * Configuration parameters:
+ *
+ * - hostname: The hostname identifier for this instance.
+ * Default: value of the HOSTNAME environment variable
+ * - namespace: The Kubernetes namespace where locks are managed.
+ * Default: reads from /var/run/secrets/kubernetes.io/serviceaccount/namespace,
+ * falls back to "default" if unavailable
+ * - lease-timeout: The lease timeout in seconds.
+ * Default: 30 seconds
+ *
+ */
+public class KubeLockManager extends AbstractDistributedLockManager {
+
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final String HOSTNAME = "hostname";
+ private static final String NAMESPACE = "namespace";
+ private static final String LEASE_TIMEOUT = "lease-timeout";
+ private static final String API_SERVER = "api-server";
+ private static final String TOKEN = "token";
+ private static final Set VALID_PARAMS = Stream.of(
+ HOSTNAME,
+ NAMESPACE,
+ LEASE_TIMEOUT,
+ API_SERVER,
+ TOKEN).collect(Collectors.toSet());
+ private static final String VALID_PARAMS_ON_ERROR = VALID_PARAMS.stream().collect(joining(","));
+
+ private String hostname;
+ private String namespace;
+ int leaseTimeout;
+
+ public KubeLockManager(Map config) {
+ this(config.get(HOSTNAME),
+ config.get(NAMESPACE),
+ config.get(LEASE_TIMEOUT) != null ? Integer.parseInt(config.get(LEASE_TIMEOUT)) : 30,
+ config.get(API_SERVER),
+ config.get(TOKEN));
+ validateParameters(config);
+ }
+
+ public KubeLockManager(String hostname, String namespace, int leaseTimeout, String apiServer, String token) {
+ this.hostname = hostname != null ? hostname : System.getenv("HOSTNAME");
+ this.leaseTimeout = leaseTimeout > 0 ? leaseTimeout : 30;
+
+ // Set namespace - use provided value, fall back to service account file, then "default"
+ if (namespace != null) {
+ this.namespace = namespace;
+ } else {
+ try {
+ this.namespace = Files.readString(Path.of("/var/run/secrets/kubernetes.io/serviceaccount/namespace")).trim();
+ logger.debug("Read namespace from Kubernetes service account: {}", this.namespace);
+ } catch (IOException e) {
+ logger.debug("Could not read namespace from service account, using default", e);
+ this.namespace = "default";
+ }
+ }
+ logger.debug("KubeLockManager configured: hostname={}, namespace={}, leaseTimeout={}",
+ this.hostname, this.namespace, this.leaseTimeout);
+ }
+
+
+ @Override
+ protected Set getValidParams() {
+ return VALID_PARAMS;
+ }
+
+ @Override
+ public void addUnavailableManagerListener(UnavailableManagerListener listener) {
+
+ }
+
+ @Override
+ public void removeUnavailableManagerListener(UnavailableManagerListener listener) {
+
+ }
+
+ @Override
+ public boolean start(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException {
+ return true;
+ }
+
+ @Override
+ public void start() throws InterruptedException, ExecutionException {
+ }
+
+ @Override
+ public boolean isStarted() {
+ return true;
+ }
+
+ @Override
+ public void stop() {
+
+ }
+
+ @Override
+ public DistributedLock getDistributedLock(String lockId) throws InterruptedException, ExecutionException, TimeoutException {
+ return new KubeLock(hostname, namespace, lockId, leaseTimeout);
+ }
+
+ @Override
+ public MutableLong getMutableLong(String mutableLongId) throws InterruptedException, ExecutionException, TimeoutException {
+ return new KubeMutableLong(namespace, mutableLongId, apiServer, token);
+ }
+
+ private String getKubeconfigServer() {
+ try {
+ String kubeconfigPath = System.getenv("KUBECONFIG");
+ if (kubeconfigPath == null) {
+ kubeconfigPath = System.getProperty("user.home") + "/.kube/config";
+ }
+ Path path = Path.of(kubeconfigPath);
+ if (!Files.exists(path)) {
+ return null;
+ }
+
+ String content = Files.readString(path);
+ // Simple parsing - look for "server:" line
+ for (String line : content.split("\n")) {
+ String trimmed = line.trim();
+ if (trimmed.startsWith("server:")) {
+ String server = trimmed.substring("server:".length()).trim();
+ logger.debug("Found server in kubeconfig: {}", server);
+ return server;
+ }
+ }
+ } catch (IOException e) {
+ logger.debug("Could not read kubeconfig", e);
+ }
+ return null;
+ }
+
+ private String generateKubectlToken() {
+ try {
+ ProcessBuilder pb = new ProcessBuilder("kubectl", "create", "token", "default");
+ pb.redirectErrorStream(true);
+ Process process = pb.start();
+
+ String output;
+ try (java.io.BufferedReader reader = new java.io.BufferedReader(
+ new java.io.InputStreamReader(process.getInputStream()))) {
+ output = reader.lines().collect(java.util.stream.Collectors.joining("\n")).trim();
+ }
+
+ int exitCode = process.waitFor();
+ if (exitCode == 0 && !output.isEmpty()) {
+ logger.debug("Successfully generated token using kubectl");
+ return output;
+ } else {
+ logger.debug("kubectl create token failed with exit code {}: {}", exitCode, output);
+ }
+ } catch (Exception e) {
+ logger.debug("Could not generate token using kubectl", e);
+ }
+ return null;
+ }
+}
diff --git a/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeMutableLong.java b/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeMutableLong.java
new file mode 100644
index 00000000000..f4dc45dded4
--- /dev/null
+++ b/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeMutableLong.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.artemis.lock.kube;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+
+import org.apache.activemq.artemis.json.JsonObject;
+import org.apache.activemq.artemis.lockmanager.MutableLong;
+import org.apache.activemq.artemis.lockmanager.UnavailableStateException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KubeMutableLong implements MutableLong {
+
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final String VALUE_KEY = "value";
+
+ private final String namespace;
+ private final String id;
+ private final String configMapName;
+ private final KubeRestClient kubeClient;
+
+ public KubeMutableLong(String namespace, String id, String apiServer, String token) {
+ this.namespace = namespace;
+ this.id = id;
+ this.configMapName = sanitizeConfigMapName(id);
+ this.kubeClient = new KubeRestClient(apiServer, token);
+ }
+
+ private static String sanitizeConfigMapName(String name) {
+ return name.toLowerCase().replaceAll("[^a-z0-9.-]", "-");
+ }
+
+ @Override
+ public String getMutableLongId() {
+ return id;
+ }
+
+ @Override
+ public long get() throws UnavailableStateException {
+ try {
+ String path = "/api/v1/namespaces/" + namespace + "/configmaps/" + configMapName;
+ JsonObject configMap = kubeClient.get(path);
+
+ if (configMap == null) {
+ return 0L;
+ }
+
+ JsonObject data = configMap.getJsonObject("data");
+ if (data == null || !data.containsKey(VALUE_KEY)) {
+ return 0L;
+ }
+
+ String valueStr = data.getString(VALUE_KEY, "0");
+ return Long.parseLong(valueStr);
+ } catch (IOException | InterruptedException e) {
+ throw new UnavailableStateException("Failed to get mutable long value: " + e.getMessage(), e);
+ } catch (NumberFormatException e) {
+ throw new UnavailableStateException("Invalid long value stored in ConfigMap", e);
+ }
+ }
+
+ @Override
+ public void set(long value) throws UnavailableStateException {
+ try {
+ String path = "/api/v1/namespaces/" + namespace + "/configmaps/" + configMapName;
+ JsonObject existingConfigMap = kubeClient.get(path);
+
+ if (existingConfigMap != null) {
+ // Update existing ConfigMap
+ String updatedJson = buildConfigMapJson(existingConfigMap, value);
+ kubeClient.put(path, updatedJson);
+ } else {
+ // Create new ConfigMap
+ String newConfigMapJson = "{\n" +
+ " \"apiVersion\": \"v1\",\n" +
+ " \"kind\": \"ConfigMap\",\n" +
+ " \"metadata\": {\n" +
+ " \"name\": \"" + configMapName + "\",\n" +
+ " \"namespace\": \"" + namespace + "\"\n" +
+ " },\n" +
+ " \"data\": {\n" +
+ " \"" + VALUE_KEY + "\": \"" + value + "\"\n" +
+ " }\n" +
+ "}";
+
+ String createPath = "/api/v1/namespaces/" + namespace + "/configmaps";
+ kubeClient.post(createPath, newConfigMapJson);
+ }
+ } catch (IOException | InterruptedException e) {
+ throw new UnavailableStateException("Failed to set mutable long value: " + e.getMessage(), e);
+ }
+ }
+
+ private String buildConfigMapJson(JsonObject existingConfigMap, long value) {
+ JsonObject metadata = existingConfigMap.getJsonObject("metadata");
+ String resourceVersion = metadata != null ? metadata.getString("resourceVersion", "") : "";
+
+ return "{\n" +
+ " \"apiVersion\": \"v1\",\n" +
+ " \"kind\": \"ConfigMap\",\n" +
+ " \"metadata\": {\n" +
+ " \"name\": \"" + configMapName + "\",\n" +
+ " \"namespace\": \"" + namespace + "\",\n" +
+ " \"resourceVersion\": \"" + resourceVersion + "\"\n" +
+ " },\n" +
+ " \"data\": {\n" +
+ " \"" + VALUE_KEY + "\": \"" + value + "\"\n" +
+ " }\n" +
+ "}";
+ }
+
+ @Override
+ public void close() {
+ // No cleanup needed for now
+ }
+}
diff --git a/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeRestClient.java b/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeRestClient.java
new file mode 100644
index 00000000000..48a719ec72c
--- /dev/null
+++ b/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeRestClient.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.artemis.lock.kube;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+
+import org.apache.activemq.artemis.json.JsonObject;
+import org.apache.activemq.artemis.utils.JsonLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KubeRestClient {
+
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ static final String SERVICE_ACCOUNT_TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token";
+ static final String SERVICE_ACCOUNT_CA_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt";
+
+ private final String apiServer;
+ private final String token;
+ private final HttpClient httpClient;
+
+ public KubeRestClient(String apiServer, String token) {
+ this.apiServer = apiServer;
+ this.token = token;
+ this.httpClient = HttpClient.newBuilder()
+ .connectTimeout(Duration.ofSeconds(10))
+ .build();
+ }
+
+ public JsonObject get(String path) throws IOException, InterruptedException {
+ HttpRequest request = HttpRequest.newBuilder()
+ .uri(URI.create(apiServer + path))
+ .header("Authorization", "Bearer " + token)
+ .header("Accept", "application/json")
+ .GET()
+ .build();
+
+ HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
+
+ if (response.statusCode() == 404) {
+ return null;
+ }
+
+ if (response.statusCode() < 200 || response.statusCode() >= 300) {
+ throw new IOException("HTTP " + response.statusCode() + ": " + response.body());
+ }
+
+ return JsonLoader.readObject(new StringReader(response.body()));
+ }
+
+ public JsonObject post(String path, String jsonBody) throws IOException, InterruptedException {
+ HttpRequest request = HttpRequest.newBuilder()
+ .uri(URI.create(apiServer + path))
+ .header("Authorization", "Bearer " + token)
+ .header("Content-Type", "application/json")
+ .header("Accept", "application/json")
+ .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
+ .build();
+
+ HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
+
+ if (response.statusCode() < 200 || response.statusCode() >= 300) {
+ throw new IOException("HTTP " + response.statusCode() + ": " + response.body());
+ }
+
+ return JsonLoader.readObject(new StringReader(response.body()));
+ }
+
+ public JsonObject put(String path, String jsonBody) throws IOException, InterruptedException {
+ HttpRequest request = HttpRequest.newBuilder()
+ .uri(URI.create(apiServer + path))
+ .header("Authorization", "Bearer " + token)
+ .header("Content-Type", "application/json")
+ .header("Accept", "application/json")
+ .PUT(HttpRequest.BodyPublishers.ofString(jsonBody))
+ .build();
+
+ HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
+
+ if (response.statusCode() < 200 || response.statusCode() >= 300) {
+ throw new IOException("HTTP " + response.statusCode() + ": " + response.body());
+ }
+
+ return JsonLoader.readObject(new StringReader(response.body()));
+ }
+
+ public void delete(String path) throws IOException, InterruptedException {
+ HttpRequest request = HttpRequest.newBuilder()
+ .uri(URI.create(apiServer + path))
+ .header("Authorization", "Bearer " + token)
+ .header("Accept", "application/json")
+ .DELETE()
+ .build();
+
+ HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
+
+ if (response.statusCode() != 404 && (response.statusCode() < 200 || response.statusCode() >= 300)) {
+ throw new IOException("HTTP " + response.statusCode() + ": " + response.body());
+ }
+ }
+
+ public int getStatusCode(String path) throws IOException, InterruptedException {
+ HttpRequest request = HttpRequest.newBuilder()
+ .uri(URI.create(apiServer + path))
+ .header("Authorization", "Bearer " + token)
+ .header("Accept", "application/json")
+ .GET()
+ .build();
+
+ HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
+ return response.statusCode();
+ }
+}
diff --git a/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/client/LockKubeClient.java b/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/client/LockKubeClient.java
new file mode 100644
index 00000000000..ee010c623c8
--- /dev/null
+++ b/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/client/LockKubeClient.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.artemis.lock.kube.client;
+
+import java.io.IOException;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+
+import org.apache.activemq.artemis.json.JsonObject;
+import org.apache.activemq.artemis.json.JsonObjectBuilder;
+import org.apache.activemq.artemis.lockmanager.UnavailableStateException;
+import org.apache.activemq.artemis.utils.JsonLoader;
+import org.apache.activemq.artemis.utils.kubernetes.KubernetesClient;
+
+public class LockKubeClient {
+
+
+ private static String getLockPath(String namespace, String id) {
+
+ return "/apis/coordination.k8s.io/v1/namespaces/" + namespace + "/leases/" + id;
+ }
+
+ public static JsonObject getLease(String namespace, String id) throws Exception {
+ return KubernetesClient.get(getLockPath(namespace, id));
+ }
+
+ public static JsonObject renewLease(String namespace, String id, String resourceVersion, String holderIdentity, String acquireTime, String renewTime, int leaseDurationSeconds) throws Exception {
+ String renewLeaseJson = buildLease(id, namespace, resourceVersion, holderIdentity, acquireTime, renewTime, leaseDurationSeconds);
+ return KubernetesClient.post(getLockPath(namespace, id), renewLeaseJson);
+ }
+
+ public static JsonObject createLease(String namespace, String id, String holderIdentity, String acquireTime, String renewTime, int leaseDurationSeconds) throws Exception {
+ String renewLeaseJson = buildLease(id, namespace, null, holderIdentity, acquireTime, renewTime, leaseDurationSeconds);
+ return KubernetesClient.post(getLockPath(namespace, id), renewLeaseJson);
+ }
+
+
+ public static String buildLease(String id, String namespace, String resourceVersion, String holderIdentity, String acquireTime, String renewTime, int leaseDurationSeconds) {
+ JsonObjectBuilder metadataBuilder = JsonLoader.createObjectBuilder()
+ .add("name", id)
+ .add("namespace", namespace);
+
+ if (resourceVersion != null) {
+ metadataBuilder.add("resourceVersion", resourceVersion);
+ }
+
+ JsonObjectBuilder specBuilder = JsonLoader.createObjectBuilder()
+ .add("holderIdentity", holderIdentity)
+ .add("acquireTime", acquireTime)
+ .add("renewTime", renewTime)
+ .add("leaseDurationSeconds", leaseDurationSeconds);
+
+ JsonObjectBuilder leaseBuilder = JsonLoader.createObjectBuilder()
+ .add("apiVersion", "coordination.k8s.io/v1")
+ .add("kind", "Lease")
+ .add("metadata", metadataBuilder)
+ .add("spec", specBuilder);
+
+ return leaseBuilder.build().toString();
+ }
+
+
+
+}
diff --git a/artemis-lockmanager/artemis-lockmanager-api/pom.xml b/artemis-lockmanager/artemis-lockmanager-api/pom.xml
index f5bd3223ada..4191819fa57 100644
--- a/artemis-lockmanager/artemis-lockmanager-api/pom.xml
+++ b/artemis-lockmanager/artemis-lockmanager-api/pom.xml
@@ -31,11 +31,4 @@
${project.basedir}/../..
-
-
- org.apache.artemis
- artemis-commons
-
-
-
diff --git a/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/AbstractDistributedLockManager.java b/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/AbstractDistributedLockManager.java
new file mode 100644
index 00000000000..2204b10e500
--- /dev/null
+++ b/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/AbstractDistributedLockManager.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.lockmanager;
+
+import java.util.Map;
+import java.util.Set;
+
+import static java.util.stream.Collectors.joining;
+
+public abstract class AbstractDistributedLockManager implements DistributedLockManager {
+
+ public AbstractDistributedLockManager() {
+ }
+
+ public AbstractDistributedLockManager(Map properties) {
+ validateParameters(properties);
+ }
+
+ protected String commaOnParameters() {
+ return getValidParams().stream().collect(joining(","));
+ }
+
+
+ protected void validateParameters(Map config) {
+ config.forEach((parameterName, ignore) -> validateParameter(parameterName));
+ }
+
+ protected abstract Set getValidParams();
+
+ protected void validateParameter(String parameterName) {
+ if (!getValidParams().contains(parameterName)) {
+ throw new IllegalArgumentException("non existent parameter " + parameterName + ": accepted list is " + commaOnParameters());
+ }
+ }
+
+
+
+}
diff --git a/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/DistributedLockManager.java b/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/DistributedLockManager.java
index d42f8e985fa..71b7132099d 100644
--- a/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/DistributedLockManager.java
+++ b/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/DistributedLockManager.java
@@ -16,21 +16,20 @@
*/
package org.apache.activemq.artemis.lockmanager;
+import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.activemq.artemis.utils.ClassloadingUtil;
-
public interface DistributedLockManager extends AutoCloseable {
static DistributedLockManager newInstanceOf(String className, Map properties) throws Exception {
- return (DistributedLockManager) ClassloadingUtil.getInstanceForParamsWithTypeCheck(className,
- DistributedLockManager.class,
- DistributedLockManager.class.getClassLoader(),
- new Class[]{Map.class},
- properties);
+ return (DistributedLockManager) getInstanceForParamsWithTypeCheck(className,
+ DistributedLockManager.class,
+ DistributedLockManager.class.getClassLoader(),
+ new Class[]{Map.class},
+ properties);
}
@FunctionalInterface
@@ -59,4 +58,24 @@ interface UnavailableManagerListener {
default void close() {
stop();
}
+
+ // This is copied from ClassLoadingUtils..
+ // I need to cut the dependency here to make it easier for external modules to implement the API here.
+ private static Object getInstanceForParamsWithTypeCheck(String className,
+ Class> expectedType,
+ ClassLoader loader, Class>[] parameterTypes, Object... params) throws ClassNotFoundException, InstantiationException, IllegalAccessException, InvocationTargetException, NoSuchMethodException {
+ final Class> clazz = loadWithCheck(className, expectedType, loader);
+ return clazz.getDeclaredConstructor(parameterTypes).newInstance(params);
+ }
+
+ private static Class> loadWithCheck(String className,
+ Class> expectedType,
+ ClassLoader loader) throws ClassNotFoundException {
+ Class> clazz = loader.loadClass(className);
+ if (!expectedType.isAssignableFrom(clazz)) {
+ throw new IllegalStateException("clazz [" + className + "] is not assignable from expected type: " + expectedType);
+ }
+ return clazz;
+ }
+
}
\ No newline at end of file
diff --git a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileBasedLockManager.java b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileBasedLockManager.java
index 3329b88bf79..a47e9e7cd8b 100644
--- a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileBasedLockManager.java
+++ b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileBasedLockManager.java
@@ -23,11 +23,14 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.activemq.artemis.lockmanager.AbstractDistributedLockManager;
import org.apache.activemq.artemis.lockmanager.DistributedLock;
-import org.apache.activemq.artemis.lockmanager.DistributedLockManager;
import org.apache.activemq.artemis.lockmanager.MutableLong;
import org.apache.activemq.artemis.lockmanager.UnavailableStateException;
@@ -42,14 +45,25 @@
* The directory must be created in advance before using this lock manager.
*
*/
-public class FileBasedLockManager implements DistributedLockManager {
+public class FileBasedLockManager extends AbstractDistributedLockManager {
private final File locksFolder;
private final Map locks;
private boolean started;
+ private static final String LOCKS_FOLDER = "locks-folder";
+ private static final Set VALID_PARAMS = Stream.of(
+ LOCKS_FOLDER).collect(Collectors.toSet());
+
+ @Override
+ protected Set getValidParams() {
+ return VALID_PARAMS;
+ }
+
public FileBasedLockManager(Map args) {
this(new File(args.get("locks-folder")));
+
+ validateParameters(args);
}
public FileBasedLockManager(File locksFolder) {
diff --git a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/zookeeper/CuratorDistributedLockManager.java b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/zookeeper/CuratorDistributedLockManager.java
index e6aa6689ca2..cb8ba223390 100644
--- a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/zookeeper/CuratorDistributedLockManager.java
+++ b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/zookeeper/CuratorDistributedLockManager.java
@@ -28,8 +28,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.activemq.artemis.lockmanager.AbstractDistributedLockManager;
import org.apache.activemq.artemis.lockmanager.DistributedLock;
-import org.apache.activemq.artemis.lockmanager.DistributedLockManager;
import org.apache.activemq.artemis.lockmanager.MutableLong;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -42,7 +42,6 @@
import org.apache.curator.utils.DebugUtils;
import static java.util.Objects.requireNonNull;
-import static java.util.stream.Collectors.joining;
/**
* ZooKeeper-based distributed lock manager using Apache Curator.
@@ -58,7 +57,7 @@
* retries-ms (optional, default: 1000): Delay in milliseconds between retry attempts
*
*/
-public class CuratorDistributedLockManager implements DistributedLockManager, ConnectionStateListener {
+public class CuratorDistributedLockManager extends AbstractDistributedLockManager implements ConnectionStateListener {
enum PrimitiveType {
lock, mutableLong;
@@ -111,6 +110,11 @@ public int hashCode() {
}
}
+ @Override
+ protected Set getValidParams() {
+ return VALID_PARAMS;
+ }
+
private static final String CONNECT_STRING_PARAM = "connect-string";
private static final String NAMESPACE_PARAM = "namespace";
private static final String SESSION_MS_PARAM = "session-ms";
@@ -126,7 +130,6 @@ public int hashCode() {
CONNECTION_MS_PARAM,
RETRIES_PARAM,
RETRIES_MS_PARAM).collect(Collectors.toSet());
- private static final String VALID_PARAMS_ON_ERROR = VALID_PARAMS.stream().collect(joining(","));
// It's 9 times the default ZK tick time ie 2000 ms
private static final String DEFAULT_SESSION_TIMEOUT_MS = Integer.toString(18_000);
private static final String DEFAULT_CONNECTION_TIMEOUT_MS = Integer.toString(8_000);
@@ -135,17 +138,6 @@ public int hashCode() {
// why 1/3 of the session? https://cwiki.apache.org/confluence/display/CURATOR/TN14
private static final String DEFAULT_SESSION_PERCENT = Integer.toString(33);
- private static Map validateParameters(Map config) {
- config.forEach((parameterName, ignore) -> validateParameter(parameterName));
- return config;
- }
-
- private static void validateParameter(String parameterName) {
- if (!VALID_PARAMS.contains(parameterName)) {
- throw new IllegalArgumentException("non existent parameter " + parameterName + ": accepted list is " + VALID_PARAMS_ON_ERROR);
- }
- }
-
private CuratorFramework client;
private final Map primitives;
private List listeners;
@@ -161,10 +153,6 @@ private static void validateParameter(String parameterName) {
}
public CuratorDistributedLockManager(Map config) {
- this(validateParameters(config), true);
- }
-
- private CuratorDistributedLockManager(Map config, boolean ignore) {
this(config.get(CONNECT_STRING_PARAM),
config.get(NAMESPACE_PARAM),
Integer.parseInt(config.getOrDefault(SESSION_MS_PARAM, DEFAULT_SESSION_TIMEOUT_MS)),
@@ -172,6 +160,7 @@ private CuratorDistributedLockManager(Map config, boolean ignore
Integer.parseInt(config.getOrDefault(CONNECTION_MS_PARAM, DEFAULT_CONNECTION_TIMEOUT_MS)),
Integer.parseInt(config.getOrDefault(RETRIES_PARAM, DEFAULT_RETRIES)),
Integer.parseInt(config.getOrDefault(RETRIES_MS_PARAM, DEFAULT_RETRIES_MS)));
+ validateParameters(config);
}
private CuratorDistributedLockManager(String connectString,
diff --git a/artemis-lockmanager/pom.xml b/artemis-lockmanager/pom.xml
index cbb24a3aa8f..9c5c3f05de3 100644
--- a/artemis-lockmanager/pom.xml
+++ b/artemis-lockmanager/pom.xml
@@ -32,6 +32,7 @@
artemis-lockmanager-api
artemis-lockmanager-ri
+ artemis-kube-lock
diff --git a/artemis-pom/pom.xml b/artemis-pom/pom.xml
index f8edce0c30e..05cf276baa5 100644
--- a/artemis-pom/pom.xml
+++ b/artemis-pom/pom.xml
@@ -951,6 +951,20 @@
+
+
+ io.kubernetes
+ client-java
+ ${kubernetes.client.version}
+
+
+
+
+ io.kubernetes
+ client-java-extended
+ ${kubernetes.client.version}
+
+
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/lock/LockCoordinator.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/lock/LockCoordinator.java
index f722c5354f5..5573f3c246a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/lock/LockCoordinator.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/lock/LockCoordinator.java
@@ -220,7 +220,7 @@ public boolean isLocked() {
* @param name a descriptive name for this lock coordinator
*/
public LockCoordinator(ScheduledExecutorService scheduledExecutor, Executor executor, long checkPeriod, DistributedLockManager lockManager, String lockID, String name) {
- super(scheduledExecutor, executor, checkPeriod, checkPeriod, TimeUnit.MILLISECONDS, false);
+ super(scheduledExecutor, executor, 0, checkPeriod, TimeUnit.MILLISECONDS, false);
assert executor != null;
this.lockManager = lockManager;
this.lockID = lockID;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/KubernetesLoginModule.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/KubernetesLoginModule.java
index 504324c0bf6..597fce406e0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/KubernetesLoginModule.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/KubernetesLoginModule.java
@@ -31,8 +31,8 @@
import javax.security.auth.login.LoginException;
import org.apache.activemq.artemis.logs.AuditLogger;
-import org.apache.activemq.artemis.spi.core.security.jaas.kubernetes.client.KubernetesClient;
-import org.apache.activemq.artemis.spi.core.security.jaas.kubernetes.client.KubernetesClientImpl;
+import org.apache.activemq.artemis.spi.core.security.jaas.kubernetes.client.TokenReviewKubeClient;
+import org.apache.activemq.artemis.spi.core.security.jaas.kubernetes.client.TokenReviewKubeClientImpl;
import org.apache.activemq.artemis.spi.core.security.jaas.kubernetes.model.TokenReview;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,14 +49,14 @@ public class KubernetesLoginModule extends PropertiesLoader implements AuditLogi
private boolean ignoreTokenReviewRoles = false;
private Map> roles;
private final Set principals = new LinkedHashSet<>();
- private final KubernetesClient client;
+ private final TokenReviewKubeClient client;
- public KubernetesLoginModule(KubernetesClient client) {
+ public KubernetesLoginModule(TokenReviewKubeClient client) {
this.client = client;
}
public KubernetesLoginModule() {
- this(new KubernetesClientImpl());
+ this(new TokenReviewKubeClientImpl());
}
@Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/kubernetes/client/KubernetesClientImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/kubernetes/client/KubernetesClientImpl.java
deleted file mode 100644
index e51ed960d66..00000000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/kubernetes/client/KubernetesClientImpl.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.spi.core.security.jaas.kubernetes.client;
-
-import static java.net.HttpURLConnection.HTTP_CREATED;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.net.http.HttpClient;
-import java.net.http.HttpRequest;
-import java.net.http.HttpResponse;
-import java.net.http.HttpResponse.BodyHandlers;
-import java.nio.file.Path;
-import java.security.KeyStore;
-import java.security.SecureRandom;
-import java.util.Scanner;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManagerFactory;
-
-import org.apache.activemq.artemis.core.remoting.impl.ssl.SSLSupport;
-import org.apache.activemq.artemis.spi.core.security.jaas.kubernetes.model.TokenReview;
-import org.apache.activemq.artemis.utils.JsonLoader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KubernetesClientImpl implements KubernetesClient {
-
- private static final Logger logger = LoggerFactory.getLogger(KubernetesClientImpl.class);
-
- private static final String KUBERNETES_HOST = "KUBERNETES_SERVICE_HOST";
- private static final String KUBERNETES_PORT = "KUBERNETES_SERVICE_PORT";
- private static final String KUBERNETES_TOKEN_PATH = "KUBERNETES_TOKEN_PATH";
- private static final String KUBERNETES_CA_PATH = "KUBERNETES_CA_PATH";
-
- private static final String KUBERNETES_TOKENREVIEW_URI_PATTERN = "https://%s:%s/apis/authentication.k8s.io/v1/tokenreviews";
-
- private static final String DEFAULT_KUBERNETES_TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token";
- private static final String DEFAULT_KUBERNETES_CA_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt";
-
- private URI apiUri;
- private String tokenPath;
-
- private static volatile HttpClient httpClient;
-
- private static HttpClient getHttpClientSingleton() {
- HttpClient result = httpClient;
- if (result != null) {
- return result;
- }
- synchronized (KubernetesClientImpl.class) {
- if (httpClient == null) {
- try {
- httpClient = HttpClient.newBuilder().sslContext(buildSSLContext()).build();
- } catch (Exception e) {
- logger.error("Unable to build a valid SSLContext or HttpClient", e);
- }
- }
- }
- return httpClient;
- }
-
- // for tests
- public static void clearHttpClient() {
- httpClient = null;
- }
-
- public KubernetesClientImpl() {
- this.tokenPath = getParam(KUBERNETES_TOKEN_PATH, DEFAULT_KUBERNETES_TOKEN_PATH);
- String host = getParam(KUBERNETES_HOST);
- String port = getParam(KUBERNETES_PORT);
- this.apiUri = URI.create(String.format(KUBERNETES_TOKENREVIEW_URI_PATTERN, host, port));
- logger.debug("using apiUri {}", apiUri);
- }
-
- public static String getParam(String name, String defaultValue) {
- String value = System.getProperty(name);
- if (value == null) {
- value = System.getenv(name);
- }
- if (value == null) {
- return defaultValue;
- }
- return value;
- }
-
- private String getParam(String name) {
- return getParam(name, null);
- }
-
- @Override
- public TokenReview getTokenReview(String token) {
- TokenReview tokenReview = new TokenReview();
- String authToken = null;
- try {
- logger.debug("Loading client authentication token from {}", tokenPath);
- authToken = readFile(tokenPath);
- logger.debug("Loaded client authentication token from {}", tokenPath);
- } catch (IOException e) {
- logger.error("Cannot retrieve Service Account Authentication Token from " + tokenPath, e);
- return tokenReview;
- }
- String jsonRequest = buildJsonRequest(token);
-
- HttpClient client = getHttpClient();
- if (client == null) {
- return tokenReview;
- }
-
- HttpRequest request = HttpRequest.newBuilder(apiUri)
- .header("Authorization", "Bearer " + authToken)
- .header("Accept", "application/json; charset=utf-8")
- .POST(HttpRequest.BodyPublishers.ofString(jsonRequest)).build();
- logger.debug("Submit TokenReview request to Kubernetes API");
-
- try {
- HttpResponse response = client.send(request, BodyHandlers.ofString());
- if (response.statusCode() == HTTP_CREATED) {
- logger.debug("Received valid TokenReview response");
- return TokenReview.fromJsonString(response.body());
- }
- logger.error("Unable to retrieve a valid TokenReview. Received StatusCode: {}. Body: {}",
- response.statusCode(), response.body());
- } catch (IOException | InterruptedException e) {
- logger.error("Unable to request ReviewToken", e);
- }
- return tokenReview;
- }
-
- protected HttpClient getHttpClient() {
- return KubernetesClientImpl.getHttpClientSingleton();
- }
-
- private String readFile(String path) throws IOException {
- try (Scanner scanner = new Scanner(Path.of(path))) {
- StringBuilder buffer = new StringBuilder();
- while (scanner.hasNextLine()) {
- String line = scanner.nextLine();
- if (!line.isBlank() && !line.startsWith("#")) {
- buffer.append(line);
- }
- }
- return buffer.toString();
- }
- }
-
- private String buildJsonRequest(String clientToken) {
- return JsonLoader.createObjectBuilder()
- .add("apiVersion", "authentication.k8s.io/v1")
- .add("kind", "TokenReview")
- .add("spec", JsonLoader.createObjectBuilder()
- .add("token", clientToken)
- .build())
- .build().toString();
- }
-
- private static SSLContext buildSSLContext() throws Exception {
- SSLContext ctx = SSLContext.getInstance("TLS");
- String caPath = getParam(KUBERNETES_CA_PATH, DEFAULT_KUBERNETES_CA_PATH);
- File certFile = new File(caPath);
- if (!certFile.exists()) {
- logger.debug("Kubernetes CA certificate not found at: {}. Truststore not configured", caPath);
- return ctx;
- }
- KeyStore trustStore = SSLSupport.loadKeystore(null, "PEMCA", caPath, null);
- TrustManagerFactory tmFactory = TrustManagerFactory
- .getInstance(TrustManagerFactory.getDefaultAlgorithm());
- tmFactory.init(trustStore);
-
- ctx.init(null, tmFactory.getTrustManagers(), new SecureRandom());
- return ctx;
- }
-}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/kubernetes/client/KubernetesClient.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/kubernetes/client/TokenReviewKubeClient.java
similarity index 96%
rename from artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/kubernetes/client/KubernetesClient.java
rename to artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/kubernetes/client/TokenReviewKubeClient.java
index fd0b8877df3..ed32a88866f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/kubernetes/client/KubernetesClient.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/kubernetes/client/TokenReviewKubeClient.java
@@ -18,7 +18,7 @@
import org.apache.activemq.artemis.spi.core.security.jaas.kubernetes.model.TokenReview;
-public interface KubernetesClient {
+public interface TokenReviewKubeClient {
TokenReview getTokenReview(String token);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/kubernetes/client/TokenReviewKubeClientImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/kubernetes/client/TokenReviewKubeClientImpl.java
new file mode 100644
index 00000000000..23667490742
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/kubernetes/client/TokenReviewKubeClientImpl.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.spi.core.security.jaas.kubernetes.client;
+
+import java.lang.invoke.MethodHandles;
+
+import org.apache.activemq.artemis.json.JsonObject;
+import org.apache.activemq.artemis.spi.core.security.jaas.kubernetes.model.TokenReview;
+import org.apache.activemq.artemis.utils.JsonLoader;
+import org.apache.activemq.artemis.utils.kubernetes.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TokenReviewKubeClientImpl implements TokenReviewKubeClient {
+
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ String tokenReviewPath = "/apis/authentication.k8s.io/v1/tokenreviews";
+
+ @Override
+ public TokenReview getTokenReview(String token) {
+ String jsonRequest = buildTokenReviewRequest(token);
+
+ try {
+ JsonObject response = KubernetesClient.post(tokenReviewPath, jsonRequest);
+ return TokenReview.fromJson(response);
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ // TODO: shouldn't we rather throw an exception here?
+ return new TokenReview();
+ }
+ }
+
+ public String buildTokenReviewRequest(String clientToken) {
+ return JsonLoader.createObjectBuilder()
+ .add("apiVersion", "authentication.k8s.io/v1")
+ .add("kind", "TokenReview")
+ .add("spec", JsonLoader.createObjectBuilder()
+ .add("token", clientToken)
+ .build())
+ .build().toString();
+ }
+
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/kubernetes/model/TokenReview.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/kubernetes/model/TokenReview.java
index 2b3d6bed7af..003e8bb54be 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/kubernetes/model/TokenReview.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/kubernetes/model/TokenReview.java
@@ -52,15 +52,15 @@ public List getAudiences() {
public static TokenReview fromJsonString(String obj) {
JsonObject json = JsonLoader.readObject(new StringReader(obj));
- JsonObject status = json.getJsonObject("status");
- return TokenReview.fromJson(status);
+ return TokenReview.fromJson(json);
}
- private static TokenReview fromJson(JsonObject obj) {
+ public static TokenReview fromJson(JsonObject obj) {
TokenReview t = new TokenReview();
if (obj == null) {
return t;
}
+ obj = obj.getJsonObject("status");
t.authenticated = obj.getBoolean("authenticated", false);
t.user = User.fromJson(obj.getJsonObject("user"));
t.audiences = listFromJson(obj.getJsonArray("audiences"));
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/oidc/OIDCSupport.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/oidc/OIDCSupport.java
index 1fa44d73ea4..5e5e8e042c6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/oidc/OIDCSupport.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/oidc/OIDCSupport.java
@@ -38,6 +38,8 @@
import org.apache.activemq.artemis.spi.core.security.jaas.OIDCLoginModule;
import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal;
import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal;
+import org.apache.activemq.artemis.utils.rest.HttpClientAccess;
+import org.apache.activemq.artemis.utils.rest.SharedHttpClientAccess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -185,7 +187,12 @@ public void initialize() {
if (this.oidcMetadataAccess == null) {
if (this.httpClientAccess == null) {
- this.httpClientAccess = new SharedHttpClientAccess(options, debug);
+
+ int httpTimeout = OIDCSupport.intOption(OIDCSupport.ConfigKey.HTTP_TIMEOUT_MILLISECONDS, options);
+ String tlsVersion = OIDCSupport.stringOption(OIDCSupport.ConfigKey.TLS_VERSION, options);
+ String caCertificate = OIDCSupport.stringOption(OIDCSupport.ConfigKey.CA_CERTIFICATE, options);
+
+ this.httpClientAccess = new SharedHttpClientAccess(httpTimeout, tlsVersion, caCertificate);
}
this.oidcMetadataAccess = new SharedOIDCMetadataAccess(this.httpClientAccess, options, debug);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/oidc/SharedOIDCMetadataAccess.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/oidc/SharedOIDCMetadataAccess.java
index 7b4d3b36772..68fb85b977d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/oidc/SharedOIDCMetadataAccess.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/oidc/SharedOIDCMetadataAccess.java
@@ -33,6 +33,7 @@
import org.apache.activemq.artemis.json.JsonObject;
import org.apache.activemq.artemis.utils.JsonLoader;
+import org.apache.activemq.artemis.utils.rest.HttpClientAccess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/utils/rest/HttpClientAccess.java b/artemis-server/src/main/java/org/apache/activemq/artemis/utils/rest/HttpClientAccess.java
new file mode 100644
index 00000000000..b3fc4a1777a
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/utils/rest/HttpClientAccess.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.rest;
+
+import javax.security.auth.login.LoginContext;
+import java.net.URI;
+import java.net.http.HttpClient;
+
+/**
+ * Accessor for {@link HttpClient} used by login modules. The default implementation should return
+ * cached instance, because {@link javax.security.auth.spi.LoginModule login modules} are instantiated on each
+ * {@link LoginContext#login()}, but for test purposes we could return mocked instance.
+ */
+public interface HttpClientAccess {
+
+ /**
+ * Get an instance of {@link HttpClient JDK HTTP client} to be used for OIDC operations like getting keys or
+ * OIDC metadata. Could be used by {@link org.apache.activemq.artemis.spi.core.security.jaas.KubernetesLoginModule}
+ * too if needed. When {@code baseURI} is passed, we may get a cached/shared client instance configured for this
+ * specific URI.
+ *
+ * @param baseURI URI for caching purpose
+ * @return {@link HttpClient}
+ */
+ HttpClient getClient(URI baseURI);
+
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/oidc/SharedHttpClientAccess.java b/artemis-server/src/main/java/org/apache/activemq/artemis/utils/rest/SharedHttpClientAccess.java
similarity index 80%
rename from artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/oidc/SharedHttpClientAccess.java
rename to artemis-server/src/main/java/org/apache/activemq/artemis/utils/rest/SharedHttpClientAccess.java
index db48dca5ea2..3ac8501f8f5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/security/jaas/oidc/SharedHttpClientAccess.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/utils/rest/SharedHttpClientAccess.java
@@ -14,9 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.artemis.spi.core.security.jaas.oidc;
+package org.apache.activemq.artemis.utils.rest;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
import java.io.File;
+import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.net.http.HttpClient;
import java.security.KeyManagementException;
@@ -26,10 +29,9 @@
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManagerFactory;
import org.apache.activemq.artemis.core.remoting.impl.ssl.SSLSupport;
+import org.apache.activemq.artemis.utils.ssl.KeyStoreSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,22 +42,18 @@
*/
public class SharedHttpClientAccess implements HttpClientAccess {
- public static final Logger LOG = LoggerFactory.getLogger(SharedHttpClientAccess.class);
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final Map cache = new ConcurrentHashMap<>();
- // ---- Config options from just-initialized JAAS LoginModule
-
private final int httpTimeout;
private final String tlsVersion;
private final String caCertificate;
- private final boolean debug;
- public SharedHttpClientAccess(Map options, boolean debug) {
- this.httpTimeout = OIDCSupport.intOption(OIDCSupport.ConfigKey.HTTP_TIMEOUT_MILLISECONDS, options);
- this.tlsVersion = OIDCSupport.stringOption(OIDCSupport.ConfigKey.TLS_VERSION, options);
- this.caCertificate = OIDCSupport.stringOption(OIDCSupport.ConfigKey.CA_CERTIFICATE, options);
- this.debug = debug;
+ public SharedHttpClientAccess(final int httpTimeout, String tlsVersion, String caCertificate) {
+ this.httpTimeout = httpTimeout;
+ this.tlsVersion = tlsVersion;
+ this.caCertificate = caCertificate;
}
@Override
@@ -66,8 +64,8 @@ public HttpClient getClient(URI baseURI) {
client = cache.get(baseURI);
if (client == null) {
client = createDefaultHttpClient();
- if (debug) {
- LOG.debug("Created new HTTP Client for accessing OIDC provider at {}", baseURI);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Created new HTTP Client at {}", baseURI);
}
cache.put(baseURI, client);
}
@@ -92,10 +90,10 @@ public HttpClient createDefaultHttpClient() {
try {
File caCertificateFile = new File(caCertificate);
if (!caCertificateFile.isFile()) {
- LOG.warn("The certificate file {} does not exist", caCertificate);
+ logger.warn("The certificate file {} does not exist", caCertificate);
} else {
SSLContext sslContext = SSLContext.getInstance(tlsVersion);
- KeyStore trustStore = SSLSupport.loadKeystore(null, "PEMCA", caCertificate, null);
+ KeyStore trustStore = KeyStoreSupport.loadKeystore(null, KeyStoreSupport.PEMCA, caCertificate, null);
TrustManagerFactory tmFactory = TrustManagerFactory
.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmFactory.init(trustStore);
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 2af3e1b1449..7163d662200 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -3266,7 +3266,7 @@
- A period used to verify if the lock still valid and renew it if needed.
+ A period in milliseconds used to verify if the lock still valid and renew it if needed.
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/spi/core/security/jaas/KubernetesLoginModuleTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/spi/core/security/jaas/KubernetesLoginModuleTest.java
index d2c20565a36..d81cd018d4d 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/spi/core/security/jaas/KubernetesLoginModuleTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/spi/core/security/jaas/KubernetesLoginModuleTest.java
@@ -38,13 +38,13 @@
import javax.security.auth.login.LoginException;
import org.apache.activemq.artemis.spi.core.security.jaas.kubernetes.TokenCallbackHandler;
-import org.apache.activemq.artemis.spi.core.security.jaas.kubernetes.client.KubernetesClient;
+import org.apache.activemq.artemis.spi.core.security.jaas.kubernetes.client.TokenReviewKubeClient;
import org.apache.activemq.artemis.spi.core.security.jaas.kubernetes.model.TokenReview;
import org.junit.jupiter.api.Test;
public class KubernetesLoginModuleTest {
- private final KubernetesClient client = mock(KubernetesClient.class);
+ private final TokenReviewKubeClient client = mock(TokenReviewKubeClient.class);
private final KubernetesLoginModule loginModule = new KubernetesLoginModule(client);
private static final String TOKEN = "the_token";
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/spi/core/security/jaas/OIDCLoginModuleLoginContextTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/spi/core/security/jaas/OIDCLoginModuleLoginContextTest.java
index 54dc863dfe6..43a09a1e1c9 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/spi/core/security/jaas/OIDCLoginModuleLoginContextTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/spi/core/security/jaas/OIDCLoginModuleLoginContextTest.java
@@ -26,11 +26,11 @@
import com.nimbusds.jose.jwk.KeyUse;
import com.nimbusds.jwt.JWTClaimsSet;
import com.nimbusds.jwt.SignedJWT;
-import org.apache.activemq.artemis.spi.core.security.jaas.oidc.HttpClientAccess;
import org.apache.activemq.artemis.spi.core.security.jaas.oidc.OIDCMetadataAccess;
import org.apache.activemq.artemis.spi.core.security.jaas.oidc.OIDCSupport;
-import org.apache.activemq.artemis.spi.core.security.jaas.oidc.SharedHttpClientAccess;
import org.apache.activemq.artemis.spi.core.security.jaas.oidc.SharedOIDCMetadataAccess;
+import org.apache.activemq.artemis.utils.rest.HttpClientAccess;
+import org.apache.activemq.artemis.utils.rest.SharedHttpClientAccess;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LoggerContext;
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/spi/core/security/jaas/OIDCLoginModuleTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/spi/core/security/jaas/OIDCLoginModuleTest.java
index 8f3178bd1d8..ce0a107073c 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/spi/core/security/jaas/OIDCLoginModuleTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/spi/core/security/jaas/OIDCLoginModuleTest.java
@@ -41,8 +41,8 @@
import org.apache.activemq.artemis.core.security.jaas.StubX509Certificate;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.security.jaas.oidc.OIDCSupport;
-import org.apache.activemq.artemis.spi.core.security.jaas.oidc.SharedHttpClientAccess;
import org.apache.activemq.artemis.spi.core.security.jaas.oidc.SharedOIDCMetadataAccess;
+import org.apache.activemq.artemis.utils.rest.SharedHttpClientAccess;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LoggerContext;
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/spi/core/security/jaas/kubernetes/client/KubernetesClientImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/spi/core/security/jaas/kubernetes/client/TokenReviewKubeClientTest.java
similarity index 79%
rename from artemis-server/src/test/java/org/apache/activemq/artemis/spi/core/security/jaas/kubernetes/client/KubernetesClientImplTest.java
rename to artemis-server/src/test/java/org/apache/activemq/artemis/spi/core/security/jaas/kubernetes/client/TokenReviewKubeClientTest.java
index 456460a2d5b..6f28dcefd28 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/spi/core/security/jaas/kubernetes/client/KubernetesClientImplTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/spi/core/security/jaas/kubernetes/client/TokenReviewKubeClientTest.java
@@ -36,6 +36,7 @@
import java.util.Set;
import org.apache.activemq.artemis.spi.core.security.jaas.kubernetes.model.TokenReview;
+import org.apache.activemq.artemis.utils.kubernetes.KubernetesClient;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -49,7 +50,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class KubernetesClientImplTest {
+public class TokenReviewKubeClientTest {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -68,8 +69,9 @@ public class KubernetesClientImplTest {
@BeforeAll
public static void startServer() {
- ConfigurationProperties.certificateAuthorityPrivateKey(KubernetesClientImplTest.class.getClassLoader().getResource("server-ca.pem").getPath());
- ConfigurationProperties.certificateAuthorityCertificate(KubernetesClientImplTest.class.getClassLoader().getResource("server-ca-cert.pem").getPath());
+ ConfigurationProperties.directoryToSaveDynamicSSLCertificate("target");
+ ConfigurationProperties.certificateAuthorityPrivateKey(TokenReviewKubeClientTest.class.getClassLoader().getResource("server-ca.pem").getPath());
+ ConfigurationProperties.certificateAuthorityCertificate(TokenReviewKubeClientTest.class.getClassLoader().getResource("server-ca-cert.pem").getPath());
ConfigurationProperties.preventCertificateDynamicUpdate(false);
ConfigurationProperties.proactivelyInitialiseTLS(true);
@@ -80,12 +82,12 @@ public static void startServer() {
assertNotNull(mockServer);
assertTrue(mockServer.hasStarted());
- System.setProperty("KUBERNETES_SERVICE_HOST", host);
- System.setProperty("KUBERNETES_SERVICE_PORT", port);
- System.setProperty("KUBERNETES_TOKEN_PATH",
- KubernetesClientImplTest.class.getClassLoader().getResource("client_token").getPath());
+ KubernetesClient.setParam("KUBERNETES_SERVICE_HOST", host);
+ KubernetesClient.setParam("KUBERNETES_SERVICE_PORT", port);
+ KubernetesClient.setParam("KUBERNETES_TOKEN_PATH",
+ TokenReviewKubeClientTest.class.getClassLoader().getResource("client_token").getPath());
- URL caPath = KubernetesClientImplTest.class.getClassLoader()
+ URL caPath = TokenReviewKubeClientTest.class.getClassLoader()
.getResource("client-and-server-ca-certs.pem");
assertNotNull(caPath);
@@ -95,16 +97,14 @@ public static void startServer() {
@AfterAll
public static void stopServer() {
- System.clearProperty("KUBERNETES_SERVICE_HOST");
- System.clearProperty("KUBERNETES_SERVICE_PORT");
- System.clearProperty("KUBERNETES_TOKEN_PATH");
- System.clearProperty("KUBERNETES_CA_PATH");
mockServer.stop();
+ KubernetesClient.clearHttpClient();
+ KubernetesClient.clearParams();
}
@BeforeEach
public void reset() {
- KubernetesClientImpl.clearHttpClient();
+ KubernetesClient.clearHttpClient();
mockServer.reset();
}
@@ -139,7 +139,7 @@ public void testGetTokenReview() {
response()
.withStatusCode(HTTP_INTERNAL_ERROR));
- KubernetesClient client = new KubernetesClientImpl();
+ TokenReviewKubeClient client = new TokenReviewKubeClientImpl();
TokenReview tr = client.getTokenReview("bob_token");
assertNotNull(tr);
@@ -169,13 +169,12 @@ public void testGetParam() throws Exception {
if (System.getProperty(envKv.getKey()) == null) {
- KubernetesClientImpl clientImpl = new KubernetesClientImpl();
- assertEquals(envKv.getValue(), KubernetesClientImpl.getParam(envKv.getKey(), null));
+ assertEquals(envKv.getValue(), KubernetesClient.getParam(envKv.getKey(), null));
final String valFromProp = "bla";
try {
System.setProperty(envKv.getKey(), valFromProp);
- assertEquals(valFromProp, KubernetesClientImpl.getParam(envKv.getKey(), null));
+ assertEquals(valFromProp, KubernetesClient.getParam(envKv.getKey(), null));
} finally {
System.clearProperty(envKv.getKey());
}
@@ -184,7 +183,7 @@ public void testGetParam() throws Exception {
String candidate = valFromProp;
for (int i = 0; i < 10; i++) {
if (System.getenv(candidate) == null && System.getProperty(candidate) == null) {
- assertEquals(candidate, KubernetesClientImpl.getParam(candidate, candidate));
+ assertEquals(candidate, KubernetesClient.getParam(candidate, candidate));
break;
}
candidate += i;
@@ -195,11 +194,4 @@ public void testGetParam() throws Exception {
}
}
}
-
- @Test
- public void testSingeltonHttpClient() throws Exception {
- KubernetesClientImpl clientImplFirst = new KubernetesClientImpl();
- KubernetesClientImpl clientImplSecond = new KubernetesClientImpl();
- assertEquals(clientImplFirst.getHttpClient(), clientImplSecond.getHttpClient());
- }
}
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/spi/core/security/jaas/oidc/OIDCSupportTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/spi/core/security/jaas/oidc/OIDCSupportTest.java
index 5edb7698ddb..1f1c3f76142 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/spi/core/security/jaas/oidc/OIDCSupportTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/spi/core/security/jaas/oidc/OIDCSupportTest.java
@@ -23,6 +23,7 @@
import com.nimbusds.jwt.JWTClaimsSet;
import org.apache.activemq.artemis.core.security.jaas.StubX509Certificate;
import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal;
+import org.apache.activemq.artemis.utils.rest.HttpClientAccess;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LoggerContext;
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/spi/core/security/jaas/oidc/HttpClientAccessTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/rest/HttpClientAccessTest.java
similarity index 90%
rename from artemis-server/src/test/java/org/apache/activemq/artemis/spi/core/security/jaas/oidc/HttpClientAccessTest.java
rename to artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/rest/HttpClientAccessTest.java
index 401b6010141..2df81d2acb7 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/spi/core/security/jaas/oidc/HttpClientAccessTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/rest/HttpClientAccessTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.artemis.spi.core.security.jaas.oidc;
+package org.apache.activemq.artemis.tests.util.rest;
import java.io.IOException;
import java.net.URI;
@@ -22,6 +22,8 @@
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
+import org.apache.activemq.artemis.spi.core.security.jaas.oidc.OIDCMetadataAccessTest;
+import org.apache.activemq.artemis.utils.rest.SharedHttpClientAccess;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -32,7 +34,7 @@
import static org.mockito.Mockito.when;
/**
- * Tests for {@link HttpClientAccess} - we don't have to test much, because we'll be mocking {@link HttpClient}
+ * Tests for {@link org.apache.activemq.artemis.utils.rest.HttpClientAccess} - we don't have to test much, because we'll be mocking {@link HttpClient}
* anyway. So HTTP issues should actually be covered in {@link OIDCMetadataAccessTest}.
*/
public class HttpClientAccessTest {
@@ -74,8 +76,8 @@ public void justMockingHttpClient() throws IOException, InterruptedException {
@Test
public void sharedHttpClientAccess() {
- SharedHttpClientAccess access1 = new SharedHttpClientAccess(null, true);
- SharedHttpClientAccess access2 = new SharedHttpClientAccess(null, true);
+ SharedHttpClientAccess access1 = new SharedHttpClientAccess(1000, null, null);
+ SharedHttpClientAccess access2 = new SharedHttpClientAccess(1000, null, null);
HttpClient client1 = access1.getClient(URI.create("http://localhost:8080"));
HttpClient client2 = access1.getClient(URI.create("https://localhost:8443"));
@@ -90,7 +92,7 @@ public void sharedHttpClientAccess() {
@Test
public void defaultClient() {
- SharedHttpClientAccess access = new SharedHttpClientAccess(null, true);
+ SharedHttpClientAccess access = new SharedHttpClientAccess(1000, null, null);
HttpClient client = access.getClient(URI.create("http://localhost:8081"));
diff --git a/artemis-web/src/main/java/org/apache/activemq/artemis/component/WebServerComponent.java b/artemis-web/src/main/java/org/apache/activemq/artemis/component/WebServerComponent.java
index 5b7f122f445..4d02a46fd3b 100644
--- a/artemis-web/src/main/java/org/apache/activemq/artemis/component/WebServerComponent.java
+++ b/artemis-web/src/main/java/org/apache/activemq/artemis/component/WebServerComponent.java
@@ -89,7 +89,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.activemq.artemis.core.remoting.impl.ssl.SSLSupport.checkPemProviderLoaded;
+import static org.apache.activemq.artemis.utils.ssl.KeyStoreSupport.checkPemProviderLoaded;
public class WebServerComponent implements ExternalComponent, WebServerComponentMarker {
diff --git a/docs/user-manual/lock-coordination.adoc b/docs/user-manual/lock-coordination.adoc
index 92c8679876d..e877ab1e118 100644
--- a/docs/user-manual/lock-coordination.adoc
+++ b/docs/user-manual/lock-coordination.adoc
@@ -202,3 +202,85 @@ This is the recommended approach for production deployments as it provides bette
|1000
|Delay in milliseconds between retry attempts
|===
+
+=== Kubernetes-Based Lock Manager
+
+The Kubernetes-based lock manager uses Kubernetes Lease resources to manage distributed locks.
+This implementation is designed for deployments running in Kubernetes environments and leverages the Kubernetes coordination API for leader election and distributed locking.
+
+The lock manager uses the Kubernetes Lease coordination mechanism, which automatically handles lease renewal and expiration.
+When running inside a Kubernetes cluster, the implementation can automatically detect the namespace from the service account configuration.
+
+**Class name:** `org.apache.artemis.lock.kube.KubeLockManager`
+
+**Properties:**
+
+[cols="1,1,1,3"]
+|===
+|Property |Required |Default |Description
+
+|hostname
+|No
+|HOSTNAME environment variable
+|The hostname identifier for this broker instance. Used to identify which pod currently holds the lock. When running in Kubernetes, this is typically set automatically by the cluster.
+
+|namespace
+|No
+|Auto-detected from `/var/run/secrets/kubernetes.io/serviceaccount/namespace`, falls back to "default"
+|The Kubernetes namespace where Lease resources will be created and managed. When running inside a Kubernetes cluster, this is automatically read from the service account configuration.
+
+|lease-timeout
+|No
+|30
+|The lease timeout in seconds. This determines how long a lease remains valid without renewal. If a broker crashes or loses connectivity, the lease will expire after this period, allowing another broker to acquire it.
+|===
+
+**Example Configuration:**
+
+[,xml]
+----
+
+
+ org.apache.artemis.lock.kube.KubeLockManager
+ artemis-ha-lock
+ 5000
+
+
+
+
+
+
+
+----
+
+**Kubernetes RBAC Requirements:**
+
+The broker's service account requires permissions to create, read, update, and delete Lease resources in the coordination.k8s.io API group.
+A minimal RBAC configuration would include:
+
+[,yaml]
+----
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+ name: artemis-lease-manager
+ namespace: artemis-namespace
+rules:
+- apiGroups: ["coordination.k8s.io"]
+ resources: ["leases"]
+ verbs: ["get", "create", "update", "delete"]
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: RoleBinding
+metadata:
+ name: artemis-lease-manager-binding
+ namespace: artemis-namespace
+subjects:
+- kind: ServiceAccount
+ name: artemis
+ namespace: artemis-namespace
+roleRef:
+ kind: Role
+ name: artemis-lease-manager
+ apiGroup: rbac.authorization.k8s.io
+----
diff --git a/pom.xml b/pom.xml
index 2fcae7bbf64..67046393579 100644
--- a/pom.xml
+++ b/pom.xml
@@ -130,6 +130,9 @@
3.0.0
10.9.1
+
+ 26.0.0
+
3.8.1
diff --git a/run-kube-test.sh b/run-kube-test.sh
new file mode 100644
index 00000000000..1fd85882380
--- /dev/null
+++ b/run-kube-test.sh
@@ -0,0 +1,28 @@
+#!/bin/bash
+# Script to run Kubernetes lock manager tests with minikube
+
+# Get the minikube API server URL
+KUBE_API_SERVER=$(kubectl config view --minify -o jsonpath='{.clusters[0].cluster.server}')
+
+# Get/create a service account token
+KUBE_TOKEN=$(kubectl create token artemis-test -n default --duration=8760h 2>/dev/null || kubectl get secret -n default -o jsonpath='{.items[?(@.metadata.annotations.kubernetes\.io/service-account\.name=="artemis-test")].data.token}' | base64 -d)
+
+if [ -z "$KUBE_TOKEN" ]; then
+ echo "Error: Could not get token for artemis-test service account"
+ echo "Make sure the service account exists and has proper permissions"
+ exit 1
+fi
+
+echo "Using API server: $KUBE_API_SERVER"
+echo "Token obtained successfully"
+echo ""
+echo "Running test..."
+
+# Export the environment variables and run the test
+export KUBE_API_SERVER
+export KUBE_TOKEN
+
+mvn test \
+ -pl tests/smoke-tests \
+ -Dtest=LockCoordinatorTest#testWithKube \
+ -DenableKubernetes=true
diff --git a/tests/smoke-tests/pom.xml b/tests/smoke-tests/pom.xml
index 772b420437f..5b9814ea4c3 100644
--- a/tests/smoke-tests/pom.xml
+++ b/tests/smoke-tests/pom.xml
@@ -31,6 +31,7 @@
-Ddistribution.lib="${activemq.basedir}/artemis-distribution/target/apache-artemis-${project.version}-bin/apache-artemis-${project.version}/lib"
localhost
+ false
@@ -45,6 +46,12 @@
compile
pom
+
+ org.apache.artemis
+ artemis-kube-lock
+ ${project.version}
+ test
+
org.apache.artemis
artemis-server
@@ -412,7 +419,7 @@
1
false
${skipSmokeTests}
- ${sts-surefire-extra-args} ${activemq-surefire-argline} ${artemis-distribution-lib-dir}
+ ${sts-surefire-extra-args} ${activemq-surefire-argline} ${artemis-distribution-lib-dir} -DenableKubernetes=${enableKubernetes}
@@ -451,6 +458,15 @@
+
+
+ kubernetes-tests
+
+ true
+
+
+
diff --git a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/A/broker.xml b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/A/broker.xml
index 4d3edf8ce85..1c9e8e0c5b5 100644
--- a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/A/broker.xml
+++ b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/A/broker.xml
@@ -105,8 +105,8 @@ under the License.
- tcp://0.0.0.0:61000?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300
- tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300
+ tcp://0.0.0.0:61000?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300
+ tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300
diff --git a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/B/broker.xml b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/B/broker.xml
index 918c417197f..0a709f8b446 100644
--- a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/B/broker.xml
+++ b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/B/broker.xml
@@ -105,8 +105,8 @@ under the License.
- tcp://0.0.0.0:61001?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300
- tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300
+ tcp://0.0.0.0:61001?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300
+ tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300
diff --git a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/A/broker.xml b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/A/broker.xml
index f5f50f78360..b973b41b8fe 100644
--- a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/A/broker.xml
+++ b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/A/broker.xml
@@ -95,7 +95,7 @@ under the License.
-->
- tcp://0.0.0.0:61000?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300
+ tcp://0.0.0.0:61000?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300
diff --git a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/B/broker.xml b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/B/broker.xml
index 8b9b054733a..efa954f38bb 100644
--- a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/B/broker.xml
+++ b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/B/broker.xml
@@ -98,7 +98,7 @@ under the License.
-->
- tcp://0.0.0.0:61001?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300
+ tcp://0.0.0.0:61001?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300
diff --git a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/kube/A/broker.xml b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/kube/A/broker.xml
new file mode 100644
index 00000000000..f7a3f8fa6b7
--- /dev/null
+++ b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/kube/A/broker.xml
@@ -0,0 +1,208 @@
+
+
+
+
+
+
+
+ 0.0.0.0
+
+ true
+
+
+ NIO
+
+ ./data/paging
+
+ ./data/bindings
+
+ ./data/journal
+
+ ./data/large-messages
+
+ true
+
+ 2
+
+ -1
+
+ 1000
+
+ false
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ 5000
+
+
+ 90
+
+
+
+
+
+
+
+
+
+
+
+ tcp://0.0.0.0:61000?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300
+ tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300
+
+
+
+
+ org.apache.artemis.lock.kube.KubeLockManager
+ fail
+ 5000
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ DLQ
+ ExpiryQueue
+ 0
+
+ -1
+ 10
+ PAGE
+ true
+ true
+
+
+
+ DLQ
+ ExpiryQueue
+ 0
+
+ -1
+ 10
+ PAGE
+ true
+ true
+
+
+ DLQ
+ ExpiryQueue
+ 0
+
+ -1
+ 1
+ 10
+ PAGE
+ true
+ true
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/kube/B/broker.xml b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/kube/B/broker.xml
new file mode 100644
index 00000000000..3f0a8bd19fb
--- /dev/null
+++ b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/kube/B/broker.xml
@@ -0,0 +1,208 @@
+
+
+
+
+
+
+
+ 0.0.0.0
+
+ true
+
+
+ NIO
+
+ ./data/paging
+
+ ./data/bindings
+
+ ./data/journal
+
+ ./data/large-messages
+
+ true
+
+ 2
+
+ -1
+
+ 1000
+
+ false
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ 5000
+
+
+ 90
+
+
+
+
+
+
+
+
+
+
+
+ tcp://0.0.0.0:61001?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300
+ tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300
+
+
+
+
+ org.apache.artemis.lock.kube.KubeLockManager
+ fail
+ 5000
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ DLQ
+ ExpiryQueue
+ 0
+
+ -1
+ 10
+ PAGE
+ true
+ true
+
+
+
+ DLQ
+ ExpiryQueue
+ 0
+
+ -1
+ 10
+ PAGE
+ true
+ true
+
+
+ DLQ
+ ExpiryQueue
+ 0
+
+ -1
+ 1
+ 10
+ PAGE
+ true
+ true
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/DualMirrorSingleAcceptorRunningTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/DualMirrorSingleAcceptorRunningTest.java
index 02c163a78dd..2fb169777b1 100644
--- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/DualMirrorSingleAcceptorRunningTest.java
+++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/DualMirrorSingleAcceptorRunningTest.java
@@ -34,15 +34,19 @@
import org.apache.activemq.artemis.api.core.management.SimpleManagement;
import org.apache.activemq.artemis.cli.commands.helper.HelperCreate;
+import org.apache.activemq.artemis.json.JsonArray;
+import org.apache.activemq.artemis.json.JsonObject;
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.FileUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;
@@ -50,6 +54,9 @@ public class DualMirrorSingleAcceptorRunningTest extends SmokeTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ public static final String SERVER_NAME_WITH_KUBE_A = "lockmanager/dualMirrorSingleAcceptor/kube/A";
+ public static final String SERVER_NAME_WITH_KUBE_B = "lockmanager/dualMirrorSingleAcceptor/kube/B";
+
public static final String SERVER_NAME_WITH_ZK_A = "lockmanager/dualMirrorSingleAcceptor/ZK/A";
public static final String SERVER_NAME_WITH_ZK_B = "lockmanager/dualMirrorSingleAcceptor/ZK/B";
@@ -109,6 +116,22 @@ public void prepareServers() throws Exception {
}
+ @Test
+ @EnabledIfSystemProperty(named = "enableKubernetes", matches = "true")
+ public void testAlternatingKube() throws Throwable {
+ {
+ createServerPair(SERVER_NAME_WITH_KUBE_A, SERVER_NAME_WITH_KUBE_B,
+ "./src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/kube/A",
+ "./src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/kube/B",
+ null);
+
+ cleanupData(SERVER_NAME_WITH_KUBE_A);
+ cleanupData(SERVER_NAME_WITH_KUBE_B);
+ }
+
+ testAlternating(SERVER_NAME_WITH_KUBE_A, SERVER_NAME_WITH_KUBE_B, null, null);
+ }
+
@Test
public void testAlternatingZK() throws Throwable {
{
@@ -146,18 +169,18 @@ public void testAlternatingFile() throws Throwable {
Properties properties = new Properties();
- properties.put("acceptorConfigurations.artemis.extraParams.amqpCredits", "1000");
- properties.put("acceptorConfigurations.artemis.extraParams.amqpLowCredits", "300");
- properties.put("acceptorConfigurations.artemis.factoryClassName", "org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory");
- properties.put("acceptorConfigurations.artemis.lockCoordinator", "failover");
- properties.put("acceptorConfigurations.artemis.name", "artemis");
- properties.put("acceptorConfigurations.artemis.params.scheme", "tcp");
- properties.put("acceptorConfigurations.artemis.params.tcpReceiveBufferSize", "1048576");
- properties.put("acceptorConfigurations.artemis.params.port", "61616");
- properties.put("acceptorConfigurations.artemis.params.host", "localhost");
- properties.put("acceptorConfigurations.artemis.params.protocols", "CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE");
- properties.put("acceptorConfigurations.artemis.params.useEpoll", "true");
- properties.put("acceptorConfigurations.artemis.params.tcpSendBufferSize", "1048576");
+ properties.put("acceptorConfigurations.forClients.extraParams.amqpCredits", "1000");
+ properties.put("acceptorConfigurations.forClients.extraParams.amqpLowCredits", "300");
+ properties.put("acceptorConfigurations.forClients.factoryClassName", "org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory");
+ properties.put("acceptorConfigurations.forClients.lockCoordinator", "failover");
+ properties.put("acceptorConfigurations.forClients.name", "forClients");
+ properties.put("acceptorConfigurations.forClients.params.scheme", "tcp");
+ properties.put("acceptorConfigurations.forClients.params.tcpReceiveBufferSize", "1048576");
+ properties.put("acceptorConfigurations.forClients.params.port", "61616");
+ properties.put("acceptorConfigurations.forClients.params.host", "localhost");
+ properties.put("acceptorConfigurations.forClients.params.protocols", "CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE");
+ properties.put("acceptorConfigurations.forClients.params.useEpoll", "true");
+ properties.put("acceptorConfigurations.forClients.params.tcpSendBufferSize", "1048576");
properties.put("lockCoordinatorConfigurations.failover.checkPeriod", "5000");
properties.put("lockCoordinatorConfigurations.failover.className", "org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager");
@@ -185,15 +208,20 @@ public void testAlternating(String nameServerA, String nameServerB, File brokerP
processB = startServer(nameServerB, 0, -1, brokerPropertiesB);
ConnectionFactory cfX = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61616");
+ String uriManagementA = "tcp://localhost:61000";
+ String uriManagementB = "tcp://localhost:61001";
+
for (int i = 0; i < ALTERNATING_TEST_ITERATIONS; i++) {
logger.info("Iteration {}: Server {} active", i, (i % 2 == 0) ? "A" : "B");
if (i % 2 == 0) {
// Even iteration: Server A active, kill Server B
killServer(processB);
+ waitForLockStatus(uriManagementA, true);
} else {
// Odd iteration: Server B active, kill Server A
killServer(processA);
+ waitForLockStatus(uriManagementB, true);
}
// Send messages through the shared acceptor
@@ -205,14 +233,36 @@ public void testAlternating(String nameServerA, String nameServerB, File brokerP
// Restart the killed server
if (i % 2 == 0) {
processB = startServer(nameServerB, 0, -1, brokerPropertiesB);
+ waitForLockStatus(uriManagementA, true);
+ waitForLockStatus(uriManagementB, false);
} else {
processA = startServer(nameServerA, 0, -1, brokerPropertiesA);
+ waitForLockStatus(uriManagementA, false);
+ waitForLockStatus(uriManagementB, true);
}
}
// Verify they both have the expected message count (iterations × (sent - consumed))
- assertMessageCount("tcp://localhost:61000", "myQueue", EXPECTED_FINAL_MESSAGE_COUNT);
- assertMessageCount("tcp://localhost:61001", "myQueue", EXPECTED_FINAL_MESSAGE_COUNT);
+ assertMessageCount(uriManagementA, "myQueue", EXPECTED_FINAL_MESSAGE_COUNT);
+ assertMessageCount(uriManagementB, "myQueue", EXPECTED_FINAL_MESSAGE_COUNT);
+
+ int countActive = 0;
+
+ if (getLockedStatus(uriManagementA).getBoolean("locked")) {
+ logger.info("server 0 is locked");
+ countActive++;
+ } else {
+ logger.debug("server 0 is not locked");
+ }
+
+ if (getLockedStatus(uriManagementB).getBoolean("locked")) {
+ logger.info("server 1 is locked");
+ countActive++;
+ } else {
+ logger.info("server 1 is not locked");
+ }
+
+ assertEquals(1, countActive);
}
private static void sendMessages(ConnectionFactory cfX, int nmessages) throws JMSException {
@@ -258,15 +308,46 @@ private static void receiveMessages(ConnectionFactory cfX, int nmessages) throws
}
}
+ protected JsonObject getLockedStatus(String uri) throws Exception {
+ try (SimpleManagement simpleManagement = new SimpleManagement(uri, null, null)) {
+ return simpleManagement.listLockCoordinators().getJsonObject(0);
+ }
+ }
+
+ protected void waitForLockStatus(String uri, boolean expectedStatus) throws Exception {
+ try (SimpleManagement simpleManagement = new SimpleManagement(uri, null, null)) {
+ Wait.assertEquals(expectedStatus, () -> {
+ int retry = 0;
+
+ do {
+ try {
+ JsonArray lockList = simpleManagement.listLockCoordinators();
+ return lockList.getJsonObject(0).getBoolean("locked");
+ } catch (Exception e) {
+ logger.info(e.getMessage(), e);
+ }
+ Thread.sleep(500);
+ retry++;
+ }
+ while (retry < 10);
+
+ throw new RuntimeException("could not execute lockStatus check");
+
+ });
+ }
+ }
+
+
protected void assertMessageCount(String uri, String queueName, int count) throws Exception {
- SimpleManagement simpleManagement = new SimpleManagement(uri, null, null);
- Wait.assertEquals(count, () -> {
- try {
- return simpleManagement.getMessageCountOnQueue(queueName);
- } catch (Throwable e) {
- return -1;
- }
- });
+ try (SimpleManagement simpleManagement = new SimpleManagement(uri, null, null)) {
+ Wait.assertEquals(count, () -> {
+ try {
+ return simpleManagement.getMessageCountOnQueue(queueName);
+ } catch (Throwable e) {
+ return -1;
+ }
+ });
+ }
}
}
\ No newline at end of file
diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/LockCoordinatorTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/LockCoordinatorTest.java
index 426f4e503d1..669e8956a26 100644
--- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/LockCoordinatorTest.java
+++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/LockCoordinatorTest.java
@@ -31,6 +31,7 @@
import java.util.function.Supplier;
import org.apache.activemq.artemis.core.server.lock.LockCoordinator;
+import org.apache.activemq.artemis.lockmanager.DistributedLock;
import org.apache.activemq.artemis.lockmanager.DistributedLockManager;
import org.apache.activemq.artemis.lockmanager.MutableLong;
import org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager;
@@ -39,9 +40,11 @@
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
+import org.apache.artemis.lock.kube.KubeLockManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,13 +95,107 @@ public void testWithFile() throws Exception {
internalTest(i -> getFileCoordinators(i));
}
+
+ @Test
+ public void testWithKube() throws Exception {
+ internalTest(i -> getKubeCoordinators(i));
+ }
+
+ public void testPassiveLockOnCoordinator(List list) throws Exception {
+ assertEquals(2, list.size());
+ DistributedLockManager lockManager0 = list.get(0).getLockManager();
+ DistributedLockManager lockManager1 = list.get(1).getLockManager();
+ lockManager0.start();
+ lockManager1.start();
+
+ runAfter(lockManager0::stop);
+ runAfter(lockManager1::stop);
+
+ LockCoordinator lockCoordinator0 = new LockCoordinator(scheduledExecutor, executorFactory.getExecutor(), 10_000, lockManager0, "lock-it", "lock-it");
+ lockCoordinator0.start();
+ runAfter(lockCoordinator0::stop);
+
+ Wait.assertTrue(lockCoordinator0::isLocked, 3600000, 100);
+
+ LockCoordinator lockCoordinator1 = new LockCoordinator(scheduledExecutor, executorFactory.getExecutor(), 10_000, lockManager1, "lock-it", "lock-it");
+ lockCoordinator1.start();
+ runAfter(lockCoordinator1::stop);
+
+ assertFalse(lockCoordinator1.isLocked());
+
+
+ lockCoordinator0.stop();
+
+ Wait.assertTrue(lockCoordinator1::isLocked, 5000, 100);
+ }
+
+
+ public void testSimplePair(List list) throws Exception {
+ assertEquals(2, list.size());
+ DistributedLockManager lockManager0 = list.get(0).getLockManager();
+ DistributedLockManager lockManager1 = list.get(1).getLockManager();
+
+ lockManager0.start();
+ lockManager1.start();
+
+ try {
+ for (int i = 0; i < 100; i++) {
+ DistributedLock lock0 = lockManager0.getDistributedLock("lock");
+ assertTrue(lock0.tryLock());
+ assertTrue(lock0.isHeldByCaller());
+ DistributedLock lock1 = lockManager1.getDistributedLock("lock");
+ assertFalse(lock1.tryLock());
+ assertFalse(lock1.isHeldByCaller());
+
+ lock0.unlock();
+ assertTrue(lock1.tryLock());
+ assertTrue(lock1.isHeldByCaller());
+ lock1.unlock();
+ }
+ } finally {
+ lockManager0.stop();
+ lockManager1.stop();
+ }
+ }
+
+ private List getKubeCoordinators(int numberOfCoordinators) {
+ try {
+ String hostPortion = "host_" + RandomUtil.randomAlphaNumericString(10);
+ ArrayList locks = new ArrayList<>();
+ String lockName = "lock-test-" + RandomUtil.randomUUIDString();
+ for (int i = 0; i < numberOfCoordinators; i++) {
+ HashMap parameters = new HashMap<>();
+ parameters.put("hostname", hostPortion + "_host_" + i);
+ parameters.put("namespace", "default");
+ parameters.put("lease-timeout", "10");
+ DistributedLockManager lockManager = DistributedLockManager.newInstanceOf(KubeLockManager.class.getName(), parameters);
+
+ LockCoordinator lockCoordinator = new LockCoordinator(scheduledExecutor, executorFactory.getExecutor(), KEEP_ALIVE_INTERVAL_MS, lockManager, lockName, lockName);
+ lockCoordinator.onLockAcquired(() -> lock(lockCoordinator));
+ lockCoordinator.onLockReleased(() -> unlock(lockCoordinator));
+ lockCoordinator.onLockReleased(() -> lockChanged.incrementAndGet());
+ lockCoordinator.onLockAcquired(() -> lockChanged.incrementAndGet());
+ lockCoordinator.setDebugInfo("ID" + i);
+ locks.add(lockCoordinator);
+ }
+ return locks;
+ } catch (Exception e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
@Test
public void testWithZK() throws Exception {
+ ZookeeperCluster zkCluster = startZK();
+ internalTest(i -> getZKCoordinators(i, zkCluster.getConnectString()));
+ }
+
+ private ZookeeperCluster startZK() throws Exception {
ZookeeperCluster zkCluster = new ZookeeperCluster(temporaryFolder, 1, ZK_BASE_PORT, 100);
zkCluster.start();
runAfter(zkCluster::stop);
assertEquals(ZK_ENDPOINTS, zkCluster.getConnectString());
- internalTest(i -> getZKCoordinators(i, zkCluster.getConnectString()));
+ return zkCluster;
}
private void internalTest(Function> lockCoordinatorSupplier) throws Exception {
@@ -107,6 +204,8 @@ private void internalTest(Function> lockCoordinat
testRetryAfterError(lockCoordinatorSupplier.apply(1).get(0));
testRetryAfterErrorWithDelayAdd(lockCoordinatorSupplier.apply(1).get(0));
testPriorityOrdering(lockCoordinatorSupplier.apply(1).get(0));
+ testSimplePair(lockCoordinatorSupplier.apply(2));
+ testPassiveLockOnCoordinator(getKubeCoordinators(2));
{
List list = lockCoordinatorSupplier.apply(2);