Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
318 changes: 278 additions & 40 deletions packaging/src/kubernetes/README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ spec:
configOverrides:
hive.server2.enable.doAs: "false"

llap:
llapClusters:
- name: llap0
enabled: true
replicas: 2
executors: 1
memoryMb: 1024
serviceHosts: "@llap0"
resources:
requestsMemory: "2Gi"
limitsMemory: "3Gi"
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -121,40 +121,55 @@ spec:
cpuScaleDownThreshold: {{ .Values.cluster.hiveServer2.autoscaling.cpuScaleDownThreshold | default 30 }}
{{- end }}

llap:
enabled: {{ .Values.cluster.llap.enabled }}
{{- if .Values.cluster.llap.enabled }}
replicas: {{ .Values.cluster.llap.replicas }}
executors: {{ .Values.cluster.llap.executors }}
memoryMb: {{ .Values.cluster.llap.memoryMb }}
serviceHosts: {{ .Values.cluster.llap.serviceHosts | quote }}
{{- if .Values.cluster.llap.resources }}
{{- if .Values.cluster.llapClusters }}
llapClusters:
{{- range .Values.cluster.llapClusters }}
- name: {{ .name }}
enabled: {{ .enabled | default true }}
replicas: {{ .replicas | default 2 }}
executors: {{ .executors | default 1 }}
memoryMb: {{ .memoryMb | default 1024 }}
{{- if .resources }}
resources:
{{- toYaml .Values.cluster.llap.resources | nindent 6 }}
{{- toYaml .resources | nindent 6 }}
{{- end }}
{{- if .Values.cluster.llap.configOverrides }}
{{- if .configOverrides }}
configOverrides:
{{- toYaml .Values.cluster.llap.configOverrides | nindent 6 }}
{{- toYaml .configOverrides | nindent 6 }}
{{- end }}
{{- if .Values.cluster.llap.extraVolumes }}
{{- if .extraVolumes }}
extraVolumes:
{{- toYaml .Values.cluster.llap.extraVolumes | nindent 6 }}
{{- toYaml .extraVolumes | nindent 6 }}
{{- end }}
{{- if .Values.cluster.llap.extraVolumeMounts }}
{{- if .extraVolumeMounts }}
extraVolumeMounts:
{{- toYaml .Values.cluster.llap.extraVolumeMounts | nindent 6 }}
{{- toYaml .extraVolumeMounts | nindent 6 }}
{{- end }}
{{- if and .Values.cluster.llap.autoscaling .Values.cluster.llap.autoscaling.enabled }}
{{- if and .autoscaling .autoscaling.enabled }}
autoscaling:
enabled: true
minReplicas: {{ .Values.cluster.llap.autoscaling.minReplicas }}
scaleUpThreshold: {{ .Values.cluster.llap.autoscaling.scaleUpThreshold }}
scaleUpStabilizationSeconds: {{ .Values.cluster.llap.autoscaling.scaleUpStabilizationSeconds }}
scaleDownStabilizationSeconds: {{ .Values.cluster.llap.autoscaling.scaleDownStabilizationSeconds }}
gracePeriodSeconds: {{ .Values.cluster.llap.autoscaling.gracePeriodSeconds }}
metricsScrapeIntervalSeconds: {{ .Values.cluster.llap.autoscaling.metricsScrapeIntervalSeconds | default 10 }}
{{- end }}
minReplicas: {{ .autoscaling.minReplicas }}
scaleUpThreshold: {{ .autoscaling.scaleUpThreshold }}
scaleUpStabilizationSeconds: {{ .autoscaling.scaleUpStabilizationSeconds }}
scaleDownStabilizationSeconds: {{ .autoscaling.scaleDownStabilizationSeconds }}
gracePeriodSeconds: {{ .autoscaling.gracePeriodSeconds }}
metricsScrapeIntervalSeconds: {{ .autoscaling.metricsScrapeIntervalSeconds | default 10 }}
{{- end }}
{{- if .tezAm }}
tezAm:
replicas: {{ .tezAm.replicas | default 1 }}
{{- if and .tezAm.autoscaling .tezAm.autoscaling.enabled }}
autoscaling:
enabled: true
minReplicas: {{ .tezAm.autoscaling.minReplicas }}
scaleUpStabilizationSeconds: {{ .tezAm.autoscaling.scaleUpStabilizationSeconds }}
scaleDownStabilizationSeconds: {{ .tezAm.autoscaling.scaleDownStabilizationSeconds }}
gracePeriodSeconds: {{ .tezAm.autoscaling.gracePeriodSeconds }}
metricsScrapeIntervalSeconds: {{ .tezAm.autoscaling.metricsScrapeIntervalSeconds | default 10 }}
{{- end }}
{{- end }}
{{- end }}
{{- end }}

tezAm:
enabled: {{ .Values.cluster.tezAm.enabled }}
Expand Down
8 changes: 5 additions & 3 deletions packaging/src/kubernetes/helm/hive-operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,16 @@ cluster:
cpuScaleDownThreshold: 30

# ---------------------------------------------------------------------------
# LLAP — enabled by default for full-HA
# LLAP CLUSTERS — each entry is an independent LLAP cluster with its own
# StatefulSet, autoscaling, and ZooKeeper registration.
# Users select a cluster via: SET hive.llap.daemon.service.hosts=@llap0;
# ---------------------------------------------------------------------------
llap:
llapClusters:
- name: llap0
enabled: true
replicas: 2
executors: 1
memoryMb: 1024
serviceHosts: "@llap0"
resources: {}
configOverrides: {}
extraVolumes: []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hive.kubernetes.operator.model.HiveCluster;
import org.apache.hive.kubernetes.operator.model.HiveClusterSpec;
import org.apache.hive.kubernetes.operator.model.spec.AutoscalingSpec;
import org.apache.hive.kubernetes.operator.model.spec.LlapSpec;
import org.apache.hive.kubernetes.operator.model.status.AutoscalingStatus;
import org.apache.hive.kubernetes.operator.util.ConfigUtils;
import org.apache.hive.kubernetes.operator.util.Labels;
Expand Down Expand Up @@ -203,32 +204,46 @@ public AutoscalingEvaluation evaluate(HiveCluster cluster, KubernetesClient clie
spec.metastore().replicas(), patches, statuses, msMetrics);
}

// LLAP
if (spec.llap().isEnabled() && spec.llap().autoscaling().isEnabled()) {
AutoscalingSpec llapAuto = spec.llap().autoscaling();
Map<String, String> llapSelector = Labels.selectorForComponent(cluster, ConfigUtils.COMPONENT_LLAP);
// LLAP clusters (each evaluated independently)
for (LlapSpec llapSpec : spec.llapClusters()) {
if (!llapSpec.isEnabled() || !llapSpec.autoscaling().isEnabled()) {
continue;
}
String llapComponentKey = ConfigUtils.llapComponentKey(llapSpec.name());
AutoscalingSpec llapAuto = llapSpec.autoscaling();
Map<String, String> llapSelector = Labels.selectorForLlapCluster(cluster, llapSpec.name());
bgScraper.registerOrUpdate(namespace, clusterName,
ConfigUtils.COMPONENT_LLAP, llapSelector,
llapComponentKey, llapSelector,
llapAuto.metricsPort(), llapAuto.metricsScrapeIntervalSeconds());
String llapKey = namespace + "/" + clusterName + "/" + ConfigUtils.COMPONENT_LLAP;
String llapKey = cacheKey(namespace, clusterName, llapComponentKey);
List<PodMetrics> llapMetrics = metricsCache.getOrEmpty(llapKey, llapAuto.metricsScrapeIntervalSeconds() * 3);
evaluateComponent(cluster, client, namespace, clusterName,
ConfigUtils.COMPONENT_LLAP, llapAuto,
spec.llap().replicas(), patches, statuses, llapMetrics);
llapComponentKey, llapAuto,
llapSpec.replicas(), patches, statuses, llapMetrics);
}

// TezAM
if (spec.tezAm().isEnabled() && spec.tezAm().autoscaling().isEnabled()) {
AutoscalingSpec tezAuto = spec.tezAm().autoscaling();
Map<String, String> tezSelector = Labels.selectorForComponent(cluster, ConfigUtils.COMPONENT_TEZAM);
bgScraper.registerOrUpdate(namespace, clusterName,
ConfigUtils.COMPONENT_TEZAM, tezSelector,
tezAuto.metricsPort(), tezAuto.metricsScrapeIntervalSeconds());
String tezKey = namespace + "/" + clusterName + "/" + ConfigUtils.COMPONENT_TEZAM;
List<PodMetrics> tezMetrics = metricsCache.getOrEmpty(tezKey, tezAuto.metricsScrapeIntervalSeconds() * 3);
evaluateComponent(cluster, client, namespace, clusterName,
ConfigUtils.COMPONENT_TEZAM, tezAuto,
spec.tezAm().replicas(), patches, statuses, tezMetrics);
// Per-LLAP TezAM (one TezAM per LLAP cluster, each with its own autoscaling config)
if (spec.tezAm().isEnabled()) {
for (LlapSpec llapSpec : spec.llapClusters()) {
if (!llapSpec.isEnabled()) {
continue;
}
LlapSpec.LlapTezAmSpec perLlapTezAm = llapSpec.tezAm();
if (!perLlapTezAm.autoscaling().isEnabled()) {
continue;
}
AutoscalingSpec tezAuto = perLlapTezAm.autoscaling();
String tezAmComponentKey = ConfigUtils.tezAmComponentKey(llapSpec.name());
Map<String, String> tezSelector = Labels.selectorForTezAmCluster(cluster, llapSpec.name());
bgScraper.registerOrUpdate(namespace, clusterName,
tezAmComponentKey, tezSelector,
tezAuto.metricsPort(), tezAuto.metricsScrapeIntervalSeconds());
String tezKey = cacheKey(namespace, clusterName, tezAmComponentKey);
List<PodMetrics> tezMetrics = metricsCache.getOrEmpty(tezKey, tezAuto.metricsScrapeIntervalSeconds() * 3);
evaluateComponent(cluster, client, namespace, clusterName,
tezAmComponentKey, tezAuto,
perLlapTezAm.replicas(), patches, statuses, tezMetrics);
}
}

return new AutoscalingEvaluation(patches, statuses);
Expand Down Expand Up @@ -258,8 +273,8 @@ private void evaluateComponent(HiveCluster cluster, KubernetesClient client,

// For LLAP and TezAM, scaling decisions are based on HS2 metrics (activation gate),
// not their own pod metrics. Allow evaluation even with 0 own pods.
boolean usesHs2Activation = ConfigUtils.COMPONENT_LLAP.equals(component)
|| ConfigUtils.COMPONENT_TEZAM.equals(component);
boolean usesHs2Activation = component.startsWith(ConfigUtils.COMPONENT_LLAP + "-")
|| component.startsWith(ConfigUtils.COMPONENT_TEZAM + "-");

if (metrics.isEmpty() && !usesHs2Activation) {
LOG.debug("[{}] No ready pods to scrape, skipping", component);
Expand Down Expand Up @@ -305,19 +320,39 @@ private void evaluateComponent(HiveCluster cluster, KubernetesClient client,
}

private ScalingStrategy createStrategy(String component, HiveCluster cluster) {
if (component.startsWith(ConfigUtils.COMPONENT_LLAP + "-")) {
String llapName = component.substring(ConfigUtils.COMPONENT_LLAP.length() + 1);
return new LlapScalingStrategy(this, cluster, llapName);
}
if (component.startsWith(ConfigUtils.COMPONENT_TEZAM + "-")) {
String llapName = component.substring(ConfigUtils.COMPONENT_TEZAM.length() + 1);
return new TezAmScalingStrategy(this, cluster, llapName);
}
return switch (component) {
case ConfigUtils.COMPONENT_HIVESERVER2 -> new HiveServer2ScalingStrategy();
case ConfigUtils.COMPONENT_METASTORE -> new MetastoreScalingStrategy();
case ConfigUtils.COMPONENT_LLAP -> new LlapScalingStrategy(this, cluster);
case ConfigUtils.COMPONENT_TEZAM -> new TezAmScalingStrategy(this, cluster);
default -> throw new IllegalArgumentException("Unknown component: " + component);
};
}

private int getCurrentReplicas(KubernetesClient client, String namespace,
String clusterName, String component) {
String workloadName = clusterName + "-" + component;
if (ConfigUtils.COMPONENT_LLAP.equals(component) || ConfigUtils.COMPONENT_TEZAM.equals(component)) {
// Component key → workload name mapping:
// "llap-{name}" → "{cluster}-{name}"
// "tezam-{name}" → "{cluster}-tezam-{name}"
// other → "{cluster}-{component}"
String workloadName;
if (component.startsWith(ConfigUtils.COMPONENT_LLAP + "-")) {
String llapName = component.substring(ConfigUtils.COMPONENT_LLAP.length() + 1);
workloadName = clusterName + "-" + llapName;
} else if (component.startsWith(ConfigUtils.COMPONENT_TEZAM + "-")) {
String llapName = component.substring(ConfigUtils.COMPONENT_TEZAM.length() + 1);
workloadName = clusterName + "-tezam-" + llapName;
} else {
workloadName = clusterName + "-" + component;
}
if (component.startsWith(ConfigUtils.COMPONENT_LLAP + "-")
|| component.startsWith(ConfigUtils.COMPONENT_TEZAM + "-")) {
var ss = client.apps().statefulSets()
.inNamespace(namespace).withName(workloadName).get();
return ss != null && ss.getSpec().getReplicas() != null ? ss.getSpec().getReplicas() : 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,17 @@ public class LlapScalingStrategy implements ScalingStrategy {
static final String METRIC_QUEUED = "hadoop_llapdaemon_executornumqueuedrequests";
static final String METRIC_CONFIGURED = "hadoop_llapdaemon_executornumexecutorsconfigured";
static final String METRIC_AVAILABLE = "hadoop_llapdaemon_executornumexecutorsavailable";
static final String METRIC_LLAP_TARGET_PREFIX = "hs2_llap_target_sessions_";

private final HiveClusterAutoscaler orchestrator;
private final HiveCluster cluster;
private final String llapName;
private int lastMetric;

public LlapScalingStrategy(HiveClusterAutoscaler orchestrator, HiveCluster cluster) {
public LlapScalingStrategy(HiveClusterAutoscaler orchestrator, HiveCluster cluster, String llapName) {
this.orchestrator = orchestrator;
this.cluster = cluster;
this.llapName = llapName;
}

@Override
Expand Down Expand Up @@ -117,14 +120,49 @@ public int lastMetricValue() {
}

/**
* Detect HS2 open sessions.
* Detect HS2 sessions targeting this specific LLAP cluster.
* First tries the per-target metric (hs2_llap_target_sessions_{llapName}) which is
* available when HS2 has the per-LLAP-cluster session tracking patch.
* Falls back to the generic hs2_open_sessions for backward compatibility only if
* HS2 does NOT expose any per-target metrics at all (i.e. older HS2 image).
*
* @return true if sessions > 0, false if scraped and all 0, null if scrape returned no pods
* (ambiguous — could be transient failure or HS2 genuinely absent)
*/
private Boolean detectHs2Sessions(List<PodMetrics> hs2Metrics) {
if (hs2Metrics.isEmpty()) {
return null;
}

// Check if HS2 supports per-target metrics (any metric with the prefix exists).
// If it does, use only the per-target metric for this cluster — a missing metric
// means 0 sessions targeting this cluster (the gauge is registered lazily on first connect).
String targetMetric = METRIC_LLAP_TARGET_PREFIX + llapName;
boolean hs2SupportsTargetMetrics = false;
for (PodMetrics pm : hs2Metrics) {
for (String key : pm.metrics().keySet()) {
if (key.startsWith(METRIC_LLAP_TARGET_PREFIX)) {
hs2SupportsTargetMetrics = true;
break;
}
}
if (hs2SupportsTargetMetrics) {
break;
}
}

if (hs2SupportsTargetMetrics) {
// HS2 has per-target tracking: check only our specific metric
for (PodMetrics pm : hs2Metrics) {
Double val = pm.metrics().get(targetMetric);
if (val != null && val > 0) {
return true;
}
}
return false;
}

// Fallback: generic hs2_open_sessions (older HS2 without per-target metrics)
for (PodMetrics pm : hs2Metrics) {
double sessions = pm.metrics().getOrDefault(
HiveServer2ScalingStrategy.METRIC_OPEN_SESSIONS, 0.0);
Expand Down
Loading
Loading