Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,12 @@
<td>Map</td>
<td>The user-specified labels that are set to the rest Service. The value should be in the form of a1:v1,a2:v2</td>
</tr>
<tr>
<td><h5>kubernetes.rest-service.port-name</h5></td>
<td style="word-wrap: break-word;">"rest"</td>
<td>String</td>
<td>The name assigned to the rest port on the JobManager rest Service. Some environments enforce port-naming policies (for example, service meshes that route by port name); this option lets operators align Flink's rest port name with such policies. Must be a valid IANA service name (lowercase alphanumeric and '-', starting and ending alphanumeric, max 15 characters). Defaults to 'rest' to preserve backward compatibility.</td>
</tr>
<tr>
<td><h5>kubernetes.secrets</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,9 +304,8 @@ private void updateKubernetesServiceTargetPortIfNecessary() throws Exception {
Preconditions.checkArgument(
restPort > 0, "Failed to parse rest port from " + webInterfaceUrl);
final String restServiceName = ExternalServiceDecorator.getExternalServiceName(clusterId);
flinkKubeClient
.updateServiceTargetPort(restServiceName, Constants.REST_PORT_NAME, restPort)
.get();
final String restPortName = flinkConfig.get(KubernetesConfigOptions.REST_SERVICE_PORT_NAME);
flinkKubeClient.updateServiceTargetPort(restServiceName, restPortName, restPort).get();
if (!HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig)) {
final String internalServiceName =
InternalServiceDecorator.getInternalServiceName(clusterId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,21 @@ public class KubernetesConfigOptions {
"The user-specified labels that are set to the rest Service. The value should be "
+ "in the form of a1:v1,a2:v2");

public static final ConfigOption<String> REST_SERVICE_PORT_NAME =
key("kubernetes.rest-service.port-name")
.stringType()
.defaultValue(Constants.REST_PORT_NAME)
.withDescription(
"The name assigned to the rest port on the JobManager rest Service. "
+ "Some environments enforce port-naming policies (for example, "
+ "service meshes that route by port name); this option lets operators "
+ "align Flink's rest port name with such policies. "
+ "Must be a valid IANA service name (lowercase alphanumeric and '-', "
+ "starting and ending alphanumeric, max 15 characters). "
+ "Defaults to '"
+ Constants.REST_PORT_NAME
+ "' to preserve backward compatibility.");

public static final ConfigOption<Map<String, String>> INTERNAL_SERVICE_ANNOTATIONS =
key("kubernetes.internal-service.annotations")
.mapType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ public KubernetesConfigOptions.ServiceExposedType getRestServiceExposedType() {
return flinkConfig.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE);
}

public String getRestServicePortName() {
return flinkConfig.get(KubernetesConfigOptions.REST_SERVICE_PORT_NAME);
}

public boolean isInternalServiceEnabled() {
return !HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/** An abstract class represents the service type that flink supported. */
public abstract class ServiceType {
Expand Down Expand Up @@ -64,7 +63,7 @@ public Service buildUpExternalRestService(
.getType())
.withSelector(kubernetesJobManagerParameters.getSelectors())
.addNewPort()
.withName(Constants.REST_PORT_NAME)
.withName(kubernetesJobManagerParameters.getRestServicePortName())
.withPort(kubernetesJobManagerParameters.getRestPort())
.withNewTargetPort(kubernetesJobManagerParameters.getRestBindPort())
.endPort()
Expand Down Expand Up @@ -109,25 +108,22 @@ public abstract Optional<Endpoint> getRestEndpoint(
*/
public abstract String getType();

/** Get rest port from the external Service. */
/**
* Get rest port from the external Service. The rest Service is built with a single port (whose
* name is configurable via {@link KubernetesConfigOptions#REST_SERVICE_PORT_NAME}), so the
* lookup does not depend on the port name.
*/
public int getRestPortFromExternalService(Service externalService) {
final List<ServicePort> servicePortCandidates =
externalService.getSpec().getPorts().stream()
.filter(x -> x.getName().equals(Constants.REST_PORT_NAME))
.collect(Collectors.toList());
final List<ServicePort> ports = externalService.getSpec().getPorts();

if (servicePortCandidates.isEmpty()) {
if (ports == null || ports.isEmpty()) {
throw new RuntimeException(
"Failed to find port \""
+ Constants.REST_PORT_NAME
+ "\" in Service \""
"Failed to find any port in Service \""
+ externalService.getMetadata().getName()
+ "\"");
}

final ServicePort externalServicePort = servicePortCandidates.get(0);

return getRestPort(externalServicePort);
return getRestPort(ports.get(0));
}

// Helper method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,34 @@ void testSetServiceExposedTypeWithHeadless() throws IOException {
assertThat(((Service) servicesWithHeadlessClusterIP.get(0)).getSpec().getClusterIP())
.isEqualTo(HeadlessClusterIPService.HEADLESS_CLUSTER_IP);
}

@Test
void testDefaultRestServicePortName() throws IOException {
final List<HasMetadata> resources =
this.externalServiceDecorator.buildAccompanyingKubernetesResources();
final Service restService = (Service) resources.get(0);

assertThat(restService.getSpec().getPorts())
.singleElement()
.extracting(ServicePort::getName)
.isEqualTo(Constants.REST_PORT_NAME);
}

@Test
void testCustomRestServicePortName() throws IOException {
final String customPortName = "flink-rest";
this.flinkConfig.set(KubernetesConfigOptions.REST_SERVICE_PORT_NAME, customPortName);
// Rebuild the decorator so it picks up the updated configuration.
this.externalServiceDecorator =
new ExternalServiceDecorator(this.kubernetesJobManagerParameters);

final List<HasMetadata> resources =
this.externalServiceDecorator.buildAccompanyingKubernetesResources();
final Service restService = (Service) resources.get(0);

assertThat(restService.getSpec().getPorts())
.singleElement()
.extracting(ServicePort::getName)
.isEqualTo(customPortName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import org.apache.flink.kubernetes.KubernetesClientTestBase;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;

import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServiceBuilder;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link ServiceType}. */
class ServiceTypeTest extends KubernetesClientTestBase {
Expand All @@ -40,4 +43,51 @@ void testServiceClassify() {
assertThat(ServiceType.classify(buildExternalServiceWithLoadBalancer("", "")))
.isEqualByComparingTo(KubernetesConfigOptions.ServiceExposedType.LoadBalancer);
}

@Test
void testGetRestPortFromExternalServiceWithDefaultName() {
final int restPort =
ClusterIPService.INSTANCE.getRestPortFromExternalService(
buildExternalServiceWithClusterIP());
assertThat(restPort).isEqualTo(REST_PORT);
}

@Test
void testGetRestPortFromExternalServiceWithCustomName() {
final Service service =
new ServiceBuilder()
.withNewMetadata()
.withName("flink-cluster-rest")
.endMetadata()
.withNewSpec()
.withType(KubernetesConfigOptions.ServiceExposedType.ClusterIP.name())
.addNewPort()
.withName("flink-rest")
.withPort(REST_PORT)
.withNewTargetPort(REST_PORT)
.endPort()
.endSpec()
.build();

final int restPort = ClusterIPService.INSTANCE.getRestPortFromExternalService(service);
assertThat(restPort).isEqualTo(REST_PORT);
}

@Test
void testGetRestPortFromExternalServiceFailsWhenNoPorts() {
final Service service =
new ServiceBuilder()
.withNewMetadata()
.withName("flink-cluster-rest")
.endMetadata()
.withNewSpec()
.withType(KubernetesConfigOptions.ServiceExposedType.ClusterIP.name())
.endSpec()
.build();

assertThatThrownBy(() -> ClusterIPService.INSTANCE.getRestPortFromExternalService(service))
.isInstanceOf(RuntimeException.class)
.hasMessageContaining("Failed to find any port")
.hasMessageContaining("flink-cluster-rest");
}
}