From 43d6396e560e30b87631d1cf81c66579d342a602 Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Mon, 27 Apr 2026 02:02:05 -0700 Subject: [PATCH 1/2] [FLINK-37893][k8s] Make REST service port name configurable The Kubernetes REST service exposed by the JobManager hardcodes its port name to "rest" (Constants.REST_PORT_NAME). Operators in environments that enforce port-naming policies (e.g., service meshes that route by port name, or organizations with strict port-name conventions) cannot align Flink with those policies. This change adds a new config option kubernetes.rest-service.port-name (string, default "rest") that controls the port name on the REST Service. The default preserves existing behavior, making this a purely additive change. ServiceType#getRestPortFromExternalService no longer filters ports by name. Flink always builds the rest Service with a single port, so the lookup can simply return that port; this lets the reader work for any configured port name without plumbing the name through the abstract getRestEndpoint signature on ServiceType. KubernetesResourceManagerDriver#updateKubernetesServiceTargetPortIfNecessary reads the configured port name from flinkConfig when updating the rest service's target port in host-network mode. --- .../KubernetesResourceManagerDriver.java | 5 +- .../KubernetesConfigOptions.java | 15 ++++++ .../KubernetesJobManagerParameters.java | 4 ++ .../kubeclient/services/ServiceType.java | 24 ++++----- .../ExternalServiceDecoratorTest.java | 30 +++++++++++ .../kubeclient/services/ServiceTypeTest.java | 50 +++++++++++++++++++ 6 files changed, 111 insertions(+), 17 deletions(-) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java index 1a50c4a564806..e99f2f2637eb0 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java @@ -304,9 +304,8 @@ private void updateKubernetesServiceTargetPortIfNecessary() throws Exception { Preconditions.checkArgument( restPort > 0, "Failed to parse rest port from " + webInterfaceUrl); final String restServiceName = ExternalServiceDecorator.getExternalServiceName(clusterId); - flinkKubeClient - .updateServiceTargetPort(restServiceName, Constants.REST_PORT_NAME, restPort) - .get(); + final String restPortName = flinkConfig.get(KubernetesConfigOptions.REST_SERVICE_PORT_NAME); + flinkKubeClient.updateServiceTargetPort(restServiceName, restPortName, restPort).get(); if (!HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig)) { final String internalServiceName = InternalServiceDecorator.getInternalServiceName(clusterId); diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java index bac5cced79901..45c745e3f9329 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java @@ -385,6 +385,21 @@ public class KubernetesConfigOptions { "The user-specified labels that are set to the rest Service. The value should be " + "in the form of a1:v1,a2:v2"); + public static final ConfigOption REST_SERVICE_PORT_NAME = + key("kubernetes.rest-service.port-name") + .stringType() + .defaultValue(Constants.REST_PORT_NAME) + .withDescription( + "The name assigned to the rest port on the JobManager rest Service. " + + "Some environments enforce port-naming policies (for example, " + + "service meshes that route by port name); this option lets operators " + + "align Flink's rest port name with such policies. " + + "Must be a valid IANA service name (lowercase alphanumeric and '-', " + + "starting and ending alphanumeric, max 15 characters). " + + "Defaults to '" + + Constants.REST_PORT_NAME + + "' to preserve backward compatibility."); + public static final ConfigOption> INTERNAL_SERVICE_ANNOTATIONS = key("kubernetes.internal-service.annotations") .mapType() diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java index 2b7b4575ebc00..e1fe4323b4623 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java @@ -200,6 +200,10 @@ public KubernetesConfigOptions.ServiceExposedType getRestServiceExposedType() { return flinkConfig.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE); } + public String getRestServicePortName() { + return flinkConfig.get(KubernetesConfigOptions.REST_SERVICE_PORT_NAME); + } + public boolean isInternalServiceEnabled() { return !HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig); } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/ServiceType.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/ServiceType.java index 0fcf585b16f16..f27b6115cdeb5 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/ServiceType.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/ServiceType.java @@ -31,7 +31,6 @@ import java.util.List; import java.util.Optional; -import java.util.stream.Collectors; /** An abstract class represents the service type that flink supported. */ public abstract class ServiceType { @@ -64,7 +63,7 @@ public Service buildUpExternalRestService( .getType()) .withSelector(kubernetesJobManagerParameters.getSelectors()) .addNewPort() - .withName(Constants.REST_PORT_NAME) + .withName(kubernetesJobManagerParameters.getRestServicePortName()) .withPort(kubernetesJobManagerParameters.getRestPort()) .withNewTargetPort(kubernetesJobManagerParameters.getRestBindPort()) .endPort() @@ -109,25 +108,22 @@ public abstract Optional getRestEndpoint( */ public abstract String getType(); - /** Get rest port from the external Service. */ + /** + * Get rest port from the external Service. The rest Service is built with a single port (whose + * name is configurable via {@link KubernetesConfigOptions#REST_SERVICE_PORT_NAME}), so the + * lookup does not depend on the port name. + */ public int getRestPortFromExternalService(Service externalService) { - final List servicePortCandidates = - externalService.getSpec().getPorts().stream() - .filter(x -> x.getName().equals(Constants.REST_PORT_NAME)) - .collect(Collectors.toList()); + final List ports = externalService.getSpec().getPorts(); - if (servicePortCandidates.isEmpty()) { + if (ports == null || ports.isEmpty()) { throw new RuntimeException( - "Failed to find port \"" - + Constants.REST_PORT_NAME - + "\" in Service \"" + "Failed to find any port in Service \"" + externalService.getMetadata().getName() + "\""); } - final ServicePort externalServicePort = servicePortCandidates.get(0); - - return getRestPort(externalServicePort); + return getRestPort(ports.get(0)); } // Helper method diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecoratorTest.java index 0b3535d0b4279..1b1cfb0c4d2b7 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecoratorTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecoratorTest.java @@ -131,4 +131,34 @@ void testSetServiceExposedTypeWithHeadless() throws IOException { assertThat(((Service) servicesWithHeadlessClusterIP.get(0)).getSpec().getClusterIP()) .isEqualTo(HeadlessClusterIPService.HEADLESS_CLUSTER_IP); } + + @Test + void testDefaultRestServicePortName() throws IOException { + final List resources = + this.externalServiceDecorator.buildAccompanyingKubernetesResources(); + final Service restService = (Service) resources.get(0); + + assertThat(restService.getSpec().getPorts()) + .singleElement() + .extracting(ServicePort::getName) + .isEqualTo(Constants.REST_PORT_NAME); + } + + @Test + void testCustomRestServicePortName() throws IOException { + final String customPortName = "flink-rest"; + this.flinkConfig.set(KubernetesConfigOptions.REST_SERVICE_PORT_NAME, customPortName); + // Rebuild the decorator so it picks up the updated configuration. + this.externalServiceDecorator = + new ExternalServiceDecorator(this.kubernetesJobManagerParameters); + + final List resources = + this.externalServiceDecorator.buildAccompanyingKubernetesResources(); + final Service restService = (Service) resources.get(0); + + assertThat(restService.getSpec().getPorts()) + .singleElement() + .extracting(ServicePort::getName) + .isEqualTo(customPortName); + } } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/services/ServiceTypeTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/services/ServiceTypeTest.java index 506bd530ddb6a..ebe4f9b1a8096 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/services/ServiceTypeTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/services/ServiceTypeTest.java @@ -21,9 +21,12 @@ import org.apache.flink.kubernetes.KubernetesClientTestBase; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import io.fabric8.kubernetes.api.model.Service; +import io.fabric8.kubernetes.api.model.ServiceBuilder; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link ServiceType}. */ class ServiceTypeTest extends KubernetesClientTestBase { @@ -40,4 +43,51 @@ void testServiceClassify() { assertThat(ServiceType.classify(buildExternalServiceWithLoadBalancer("", ""))) .isEqualByComparingTo(KubernetesConfigOptions.ServiceExposedType.LoadBalancer); } + + @Test + void testGetRestPortFromExternalServiceWithDefaultName() { + final int restPort = + ClusterIPService.INSTANCE.getRestPortFromExternalService( + buildExternalServiceWithClusterIP()); + assertThat(restPort).isEqualTo(REST_PORT); + } + + @Test + void testGetRestPortFromExternalServiceWithCustomName() { + final Service service = + new ServiceBuilder() + .withNewMetadata() + .withName("flink-cluster-rest") + .endMetadata() + .withNewSpec() + .withType(KubernetesConfigOptions.ServiceExposedType.ClusterIP.name()) + .addNewPort() + .withName("flink-rest") + .withPort(REST_PORT) + .withNewTargetPort(REST_PORT) + .endPort() + .endSpec() + .build(); + + final int restPort = ClusterIPService.INSTANCE.getRestPortFromExternalService(service); + assertThat(restPort).isEqualTo(REST_PORT); + } + + @Test + void testGetRestPortFromExternalServiceFailsWhenNoPorts() { + final Service service = + new ServiceBuilder() + .withNewMetadata() + .withName("flink-cluster-rest") + .endMetadata() + .withNewSpec() + .withType(KubernetesConfigOptions.ServiceExposedType.ClusterIP.name()) + .endSpec() + .build(); + + assertThatThrownBy(() -> ClusterIPService.INSTANCE.getRestPortFromExternalService(service)) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("Failed to find any port") + .hasMessageContaining("flink-cluster-rest"); + } } From 35300c84ee9913c1b38d8356635bc986fe0fb653 Mon Sep 17 00:00:00 2001 From: Stefan Wang <1fannnw@gmail.com> Date: Wed, 29 Apr 2026 11:46:01 -0700 Subject: [PATCH 2/2] [FLINK-37893][docs] Regenerate kubernetes config table for REST_SERVICE_PORT_NAME The new kubernetes.rest-service.port-name option was missing from the generated config docs, which made ConfigOptionsDocsCompletenessITCase fail in the flink-docs surefire integration-tests step. --- .../generated/kubernetes_config_configuration.html | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html index a3084585bf4cb..17382bd3e5637 100644 --- a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html +++ b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html @@ -266,6 +266,12 @@ Map The user-specified labels that are set to the rest Service. The value should be in the form of a1:v1,a2:v2 + +
kubernetes.rest-service.port-name
+ "rest" + String + The name assigned to the rest port on the JobManager rest Service. Some environments enforce port-naming policies (for example, service meshes that route by port name); this option lets operators align Flink's rest port name with such policies. Must be a valid IANA service name (lowercase alphanumeric and '-', starting and ending alphanumeric, max 15 characters). Defaults to 'rest' to preserve backward compatibility. +
kubernetes.secrets
(none)