diff --git a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html index a3084585bf4cb..17382bd3e5637 100644 --- a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html +++ b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html @@ -266,6 +266,12 @@ Map The user-specified labels that are set to the rest Service. The value should be in the form of a1:v1,a2:v2 + +
kubernetes.rest-service.port-name
+ "rest" + String + The name assigned to the rest port on the JobManager rest Service. Some environments enforce port-naming policies (for example, service meshes that route by port name); this option lets operators align Flink's rest port name with such policies. Must be a valid IANA service name (lowercase alphanumeric and '-', starting and ending alphanumeric, max 15 characters). Defaults to 'rest' to preserve backward compatibility. +
kubernetes.secrets
(none) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java index 1a50c4a564806..e99f2f2637eb0 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java @@ -304,9 +304,8 @@ private void updateKubernetesServiceTargetPortIfNecessary() throws Exception { Preconditions.checkArgument( restPort > 0, "Failed to parse rest port from " + webInterfaceUrl); final String restServiceName = ExternalServiceDecorator.getExternalServiceName(clusterId); - flinkKubeClient - .updateServiceTargetPort(restServiceName, Constants.REST_PORT_NAME, restPort) - .get(); + final String restPortName = flinkConfig.get(KubernetesConfigOptions.REST_SERVICE_PORT_NAME); + flinkKubeClient.updateServiceTargetPort(restServiceName, restPortName, restPort).get(); if (!HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig)) { final String internalServiceName = InternalServiceDecorator.getInternalServiceName(clusterId); diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java index bac5cced79901..45c745e3f9329 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java @@ -385,6 +385,21 @@ public class KubernetesConfigOptions { "The user-specified labels that are set to the rest Service. The value should be " + "in the form of a1:v1,a2:v2"); + public static final ConfigOption REST_SERVICE_PORT_NAME = + key("kubernetes.rest-service.port-name") + .stringType() + .defaultValue(Constants.REST_PORT_NAME) + .withDescription( + "The name assigned to the rest port on the JobManager rest Service. " + + "Some environments enforce port-naming policies (for example, " + + "service meshes that route by port name); this option lets operators " + + "align Flink's rest port name with such policies. " + + "Must be a valid IANA service name (lowercase alphanumeric and '-', " + + "starting and ending alphanumeric, max 15 characters). " + + "Defaults to '" + + Constants.REST_PORT_NAME + + "' to preserve backward compatibility."); + public static final ConfigOption> INTERNAL_SERVICE_ANNOTATIONS = key("kubernetes.internal-service.annotations") .mapType() diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java index 2b7b4575ebc00..e1fe4323b4623 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java @@ -200,6 +200,10 @@ public KubernetesConfigOptions.ServiceExposedType getRestServiceExposedType() { return flinkConfig.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE); } + public String getRestServicePortName() { + return flinkConfig.get(KubernetesConfigOptions.REST_SERVICE_PORT_NAME); + } + public boolean isInternalServiceEnabled() { return !HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig); } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/ServiceType.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/ServiceType.java index 0fcf585b16f16..f27b6115cdeb5 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/ServiceType.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/ServiceType.java @@ -31,7 +31,6 @@ import java.util.List; import java.util.Optional; -import java.util.stream.Collectors; /** An abstract class represents the service type that flink supported. */ public abstract class ServiceType { @@ -64,7 +63,7 @@ public Service buildUpExternalRestService( .getType()) .withSelector(kubernetesJobManagerParameters.getSelectors()) .addNewPort() - .withName(Constants.REST_PORT_NAME) + .withName(kubernetesJobManagerParameters.getRestServicePortName()) .withPort(kubernetesJobManagerParameters.getRestPort()) .withNewTargetPort(kubernetesJobManagerParameters.getRestBindPort()) .endPort() @@ -109,25 +108,22 @@ public abstract Optional getRestEndpoint( */ public abstract String getType(); - /** Get rest port from the external Service. */ + /** + * Get rest port from the external Service. The rest Service is built with a single port (whose + * name is configurable via {@link KubernetesConfigOptions#REST_SERVICE_PORT_NAME}), so the + * lookup does not depend on the port name. + */ public int getRestPortFromExternalService(Service externalService) { - final List servicePortCandidates = - externalService.getSpec().getPorts().stream() - .filter(x -> x.getName().equals(Constants.REST_PORT_NAME)) - .collect(Collectors.toList()); + final List ports = externalService.getSpec().getPorts(); - if (servicePortCandidates.isEmpty()) { + if (ports == null || ports.isEmpty()) { throw new RuntimeException( - "Failed to find port \"" - + Constants.REST_PORT_NAME - + "\" in Service \"" + "Failed to find any port in Service \"" + externalService.getMetadata().getName() + "\""); } - final ServicePort externalServicePort = servicePortCandidates.get(0); - - return getRestPort(externalServicePort); + return getRestPort(ports.get(0)); } // Helper method diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecoratorTest.java index 0b3535d0b4279..1b1cfb0c4d2b7 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecoratorTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecoratorTest.java @@ -131,4 +131,34 @@ void testSetServiceExposedTypeWithHeadless() throws IOException { assertThat(((Service) servicesWithHeadlessClusterIP.get(0)).getSpec().getClusterIP()) .isEqualTo(HeadlessClusterIPService.HEADLESS_CLUSTER_IP); } + + @Test + void testDefaultRestServicePortName() throws IOException { + final List resources = + this.externalServiceDecorator.buildAccompanyingKubernetesResources(); + final Service restService = (Service) resources.get(0); + + assertThat(restService.getSpec().getPorts()) + .singleElement() + .extracting(ServicePort::getName) + .isEqualTo(Constants.REST_PORT_NAME); + } + + @Test + void testCustomRestServicePortName() throws IOException { + final String customPortName = "flink-rest"; + this.flinkConfig.set(KubernetesConfigOptions.REST_SERVICE_PORT_NAME, customPortName); + // Rebuild the decorator so it picks up the updated configuration. + this.externalServiceDecorator = + new ExternalServiceDecorator(this.kubernetesJobManagerParameters); + + final List resources = + this.externalServiceDecorator.buildAccompanyingKubernetesResources(); + final Service restService = (Service) resources.get(0); + + assertThat(restService.getSpec().getPorts()) + .singleElement() + .extracting(ServicePort::getName) + .isEqualTo(customPortName); + } } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/services/ServiceTypeTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/services/ServiceTypeTest.java index 506bd530ddb6a..ebe4f9b1a8096 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/services/ServiceTypeTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/services/ServiceTypeTest.java @@ -21,9 +21,12 @@ import org.apache.flink.kubernetes.KubernetesClientTestBase; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import io.fabric8.kubernetes.api.model.Service; +import io.fabric8.kubernetes.api.model.ServiceBuilder; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link ServiceType}. */ class ServiceTypeTest extends KubernetesClientTestBase { @@ -40,4 +43,51 @@ void testServiceClassify() { assertThat(ServiceType.classify(buildExternalServiceWithLoadBalancer("", ""))) .isEqualByComparingTo(KubernetesConfigOptions.ServiceExposedType.LoadBalancer); } + + @Test + void testGetRestPortFromExternalServiceWithDefaultName() { + final int restPort = + ClusterIPService.INSTANCE.getRestPortFromExternalService( + buildExternalServiceWithClusterIP()); + assertThat(restPort).isEqualTo(REST_PORT); + } + + @Test + void testGetRestPortFromExternalServiceWithCustomName() { + final Service service = + new ServiceBuilder() + .withNewMetadata() + .withName("flink-cluster-rest") + .endMetadata() + .withNewSpec() + .withType(KubernetesConfigOptions.ServiceExposedType.ClusterIP.name()) + .addNewPort() + .withName("flink-rest") + .withPort(REST_PORT) + .withNewTargetPort(REST_PORT) + .endPort() + .endSpec() + .build(); + + final int restPort = ClusterIPService.INSTANCE.getRestPortFromExternalService(service); + assertThat(restPort).isEqualTo(REST_PORT); + } + + @Test + void testGetRestPortFromExternalServiceFailsWhenNoPorts() { + final Service service = + new ServiceBuilder() + .withNewMetadata() + .withName("flink-cluster-rest") + .endMetadata() + .withNewSpec() + .withType(KubernetesConfigOptions.ServiceExposedType.ClusterIP.name()) + .endSpec() + .build(); + + assertThatThrownBy(() -> ClusterIPService.INSTANCE.getRestPortFromExternalService(service)) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("Failed to find any port") + .hasMessageContaining("flink-cluster-rest"); + } }