Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 48 additions & 32 deletions xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.grpc.util.ForwardingSubchannel;
import io.grpc.util.MultiChildLoadBalancer;
import io.grpc.xds.internal.MetricReportUtils;
import io.grpc.xds.internal.MetricReportUtils.ParsedMetricName;
import io.grpc.xds.orca.OrcaOobUtil;
import io.grpc.xds.orca.OrcaOobUtil.OrcaOobReportListener;
import io.grpc.xds.orca.OrcaPerRequestUtil;
Expand All @@ -50,7 +51,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.OptionalDouble;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -239,7 +239,7 @@ protected void updateOverallBalancingState() {
private SubchannelPicker createReadyPicker(Collection<ChildLbState> activeList) {
WeightedRoundRobinPicker picker = new WeightedRoundRobinPicker(ImmutableList.copyOf(activeList),
config.enableOobLoadReport, config.errorUtilizationPenalty, sequence,
config.metricNamesForComputingUtilization);
config.parsedMetricNamesForComputingUtilization);
updateWeight(picker);
return picker;
}
Expand Down Expand Up @@ -329,15 +329,15 @@ public void addSubchannel(WrrSubchannel wrrSubchannel) {
}

public OrcaReportListener getOrCreateOrcaListener(float errorUtilizationPenalty,
ImmutableList<String> metricNamesForComputingUtilization) {
ImmutableList<ParsedMetricName> parsedMetricNamesForComputingUtilization) {
if (orcaReportListener != null
&& orcaReportListener.errorUtilizationPenalty == errorUtilizationPenalty
&& orcaReportListener.metricNamesForComputingUtilization
.equals(metricNamesForComputingUtilization)) {
&& orcaReportListener.parsedMetricNamesForComputingUtilization
.equals(parsedMetricNamesForComputingUtilization)) {
return orcaReportListener;
}
orcaReportListener =
new OrcaReportListener(errorUtilizationPenalty, metricNamesForComputingUtilization);
new OrcaReportListener(errorUtilizationPenalty, parsedMetricNamesForComputingUtilization);
return orcaReportListener;
}

Expand All @@ -362,17 +362,17 @@ public void updateBalancingState(ConnectivityState newState, SubchannelPicker ne

final class OrcaReportListener implements OrcaPerRequestReportListener, OrcaOobReportListener {
private final float errorUtilizationPenalty;
private final ImmutableList<String> metricNamesForComputingUtilization;
private final ImmutableList<ParsedMetricName> parsedMetricNamesForComputingUtilization;

OrcaReportListener(float errorUtilizationPenalty,
ImmutableList<String> metricNamesForComputingUtilization) {
ImmutableList<ParsedMetricName> parsedMetricNamesForComputingUtilization) {
this.errorUtilizationPenalty = errorUtilizationPenalty;
this.metricNamesForComputingUtilization = metricNamesForComputingUtilization;
this.parsedMetricNamesForComputingUtilization = parsedMetricNamesForComputingUtilization;
}

@Override
public void onLoadReport(MetricReport report) {
double utilization = getUtilization(report, metricNamesForComputingUtilization);
double utilization = getUtilization(report);

double newWeight = 0;
if (utilization > 0 && report.getQps() > 0) {
Expand All @@ -398,10 +398,10 @@ public void onLoadReport(MetricReport report) {
* if application utilization is > 0, it is returned. If neither are present, the CPU
* utilization is returned.
*/
private double getUtilization(MetricReport report, ImmutableList<String> metricNames) {
OptionalDouble customUtil = getCustomMetricUtilization(report, metricNames);
if (customUtil.isPresent()) {
return customUtil.getAsDouble();
private double getUtilization(MetricReport report) {
double customUtil = getCustomMetricUtilization(report);
if (customUtil >= 0) {
return customUtil;
}
double appUtil = report.getApplicationUtilization();
if (appUtil > 0) {
Expand All @@ -412,20 +412,23 @@ private double getUtilization(MetricReport report, ImmutableList<String> metricN

/**
* Returns the maximum utilization value among the specified metric names.
* Returns OptionalDouble.empty() if NONE of the specified metrics are present in the report,
* Returns -1 if NONE of the specified metrics are present in the report,
* or if all present metrics are NaN.
* Returns OptionalDouble.of(maxUtil) if at least one non-NaN metric is present.
*/
private OptionalDouble getCustomMetricUtilization(MetricReport report,
ImmutableList<String> metricNames) {
return metricNames.stream()
.map(name -> MetricReportUtils.getMetric(report, name))
.filter(OptionalDouble::isPresent)
.mapToDouble(OptionalDouble::getAsDouble)
.filter(d -> !Double.isNaN(d) && d > 0)
.max();
private double getCustomMetricUtilization(MetricReport report) {
double max = -1.0;
for (int i = 0; i < parsedMetricNamesForComputingUtilization.size(); i++) {
double d = MetricReportUtils.getMetricValue(report,
parsedMetricNamesForComputingUtilization.get(i));
if (!Double.isNaN(d) && d > 0 && d > max) {
max = d;
}
}
return max;
}

}

}

private final class UpdateWeightTask implements Runnable {
Expand All @@ -446,7 +449,7 @@ private void createAndApplyOrcaListeners() {
if (config.enableOobLoadReport) {
OrcaOobUtil.setListener(weightedSubchannel,
wChild.getOrCreateOrcaListener(config.errorUtilizationPenalty,
config.metricNamesForComputingUtilization),
config.parsedMetricNamesForComputingUtilization),
OrcaOobUtil.OrcaReportingConfig.newBuilder()
.setReportInterval(config.oobReportingPeriodNanos, TimeUnit.NANOSECONDS).build());
} else {
Expand Down Expand Up @@ -516,7 +519,7 @@ static final class WeightedRoundRobinPicker extends SubchannelPicker {

WeightedRoundRobinPicker(List<ChildLbState> children, boolean enableOobLoadReport,
float errorUtilizationPenalty, AtomicInteger sequence,
ImmutableList<String> metricNamesForComputingUtilization) {
ImmutableList<ParsedMetricName> parsedMetricNamesForComputingUtilization) {
checkNotNull(children, "children");
Preconditions.checkArgument(!children.isEmpty(), "empty child list");
this.children = children;
Expand All @@ -526,7 +529,7 @@ static final class WeightedRoundRobinPicker extends SubchannelPicker {
WeightedChildLbState wChild = (WeightedChildLbState) child;
pickers.add(wChild.getCurrentPicker());
reportListeners.add(wChild.getOrCreateOrcaListener(errorUtilizationPenalty,
metricNamesForComputingUtilization));
parsedMetricNamesForComputingUtilization));
}
this.pickers = pickers;
this.reportListeners = reportListeners;
Expand Down Expand Up @@ -767,7 +770,7 @@ static final class WeightedRoundRobinLoadBalancerConfig {
final long oobReportingPeriodNanos;
final long weightUpdatePeriodNanos;
final float errorUtilizationPenalty;
final ImmutableList<String> metricNamesForComputingUtilization;
final ImmutableList<ParsedMetricName> parsedMetricNamesForComputingUtilization;

public static Builder newBuilder() {
return new Builder();
Expand All @@ -783,7 +786,20 @@ private WeightedRoundRobinLoadBalancerConfig(long blackoutPeriodNanos,
this.oobReportingPeriodNanos = oobReportingPeriodNanos;
this.weightUpdatePeriodNanos = weightUpdatePeriodNanos;
this.errorUtilizationPenalty = errorUtilizationPenalty;
this.metricNamesForComputingUtilization = metricNamesForComputingUtilization;

ImmutableList.Builder<ParsedMetricName> builder = ImmutableList.builder();
if (metricNamesForComputingUtilization != null) {
for (int i = 0; i < metricNamesForComputingUtilization.size(); i++) {
String metricName = metricNamesForComputingUtilization.get(i);
ParsedMetricName parsed = MetricReportUtils.ParsedMetricName.parse(metricName);
if (parsed.getMetricType() != MetricReportUtils.MetricType.INVALID) {
builder.add(parsed);
} else {
log.log(Level.FINE, "Invalid custom metric name configured and ignored: " + metricName);
}
}
}
this.parsedMetricNamesForComputingUtilization = builder.build();
}

@Override
Expand All @@ -799,15 +815,15 @@ public boolean equals(Object o) {
&& this.weightUpdatePeriodNanos == that.weightUpdatePeriodNanos
// Float.compare considers NaNs equal
&& Float.compare(this.errorUtilizationPenalty, that.errorUtilizationPenalty) == 0
&& Objects.equals(this.metricNamesForComputingUtilization,
that.metricNamesForComputingUtilization);
&& Objects.equals(this.parsedMetricNamesForComputingUtilization,
that.parsedMetricNamesForComputingUtilization);
}

@Override
public int hashCode() {
return Objects.hash(blackoutPeriodNanos, weightExpirationPeriodNanos, enableOobLoadReport,
oobReportingPeriodNanos, weightUpdatePeriodNanos, errorUtilizationPenalty,
metricNamesForComputingUtilization);
parsedMetricNamesForComputingUtilization);
}

static final class Builder {
Expand Down
118 changes: 85 additions & 33 deletions xds/src/main/java/io/grpc/xds/internal/MetricReportUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

package io.grpc.xds.internal;

import com.google.auto.value.AutoValue;
import io.grpc.services.MetricReport;
import java.util.Map;
import java.util.OptionalDouble;
import java.util.Optional;


/**
* Utilities for parsing and resolving metrics from {@link MetricReport}.
Expand All @@ -27,41 +28,92 @@ public final class MetricReportUtils {

private MetricReportUtils() {}

public enum MetricType {
CPU_UTILIZATION,
APPLICATION_UTILIZATION,
MEMORY_UTILIZATION,
UTILIZATION,
NAMED_METRICS,
INVALID
}

@AutoValue
public abstract static class ParsedMetricName {
public abstract MetricType getMetricType();

public abstract Optional<String> getKey();

public static ParsedMetricName create(MetricType metricType, Optional<String> key) {
return new AutoValue_MetricReportUtils_ParsedMetricName(metricType, key);
}

/**
* Pre-parses a custom metric name into a {@link ParsedMetricName}.
*
* @param name The custom metric name to parse.
* @return The parsed metric name.
*/
public static ParsedMetricName parse(String name) {
Comment thread
sauravzg marked this conversation as resolved.
if (name.equals("cpu_utilization")) {
return create(MetricType.CPU_UTILIZATION, Optional.empty());
}
if (name.equals("application_utilization")) {
return create(MetricType.APPLICATION_UTILIZATION, Optional.empty());
}
if (name.equals("mem_utilization")) {
return create(MetricType.MEMORY_UTILIZATION, Optional.empty());
}
if (name.startsWith("utilization.")) {
return create(MetricType.UTILIZATION, Optional.of(name.substring("utilization.".length())));
}
if (name.startsWith("named_metrics.")) {
return create(MetricType.NAMED_METRICS,
Optional.of(name.substring("named_metrics.".length())));
}
return create(MetricType.INVALID, Optional.empty());
}

}

/**
* Resolves a metric value from the report based on the given metric name.
* The logic checks for specific prefixes to determine where to look up the metric:
* <ul>
* <li>"cpu_utilization" -> getCpuUtilization()</li>
* <li>"application_utilization" -> getApplicationUtilization()</li>
* <li>"mem_utilization" -> getMemoryUtilization()</li>
* <li>"utilization." -> lookup in utilizationMetrics</li>
* <li>"named_metrics." -> lookup in namedMetrics</li>
* </ul>
* Resolves a custom metric value for `parsedMetric`
* Returns -1.0 if the metric is absent or invalid.
*
* @param report The metric report to query.
* @param metricName The name of the custom metric to look up.
* @return The value of the metric if found, or empty if not found.
* @param parsedMetric The parsed metric to lookup.
* @return The metric value or -1.0 if absent.
*/
public static OptionalDouble getMetric(MetricReport report, String metricName) {
if (metricName.equals("cpu_utilization")) {
return OptionalDouble.of(report.getCpuUtilization());
} else if (metricName.equals("application_utilization")) {
return OptionalDouble.of(report.getApplicationUtilization());
} else if (metricName.equals("mem_utilization")) {
return OptionalDouble.of(report.getMemoryUtilization());
} else if (metricName.startsWith("utilization.")) {
Map<String, Double> map = report.getUtilizationMetrics();
Double val = map.get(metricName.substring("utilization.".length()));
if (val != null) {
return OptionalDouble.of(val);
}
} else if (metricName.startsWith("named_metrics.")) {
Map<String, Double> map = report.getNamedMetrics();
Double val = map.get(metricName.substring("named_metrics.".length()));
if (val != null) {
return OptionalDouble.of(val);
}

public static double getMetricValue(MetricReport report, ParsedMetricName parsedMetric) {
switch (parsedMetric.getMetricType()) {
case CPU_UTILIZATION:
return report.getCpuUtilization();
case APPLICATION_UTILIZATION:
return report.getApplicationUtilization();
case MEMORY_UTILIZATION:
return report.getMemoryUtilization();
case UTILIZATION:
if (parsedMetric.getKey().isPresent()) {
String key = parsedMetric.getKey().get();
Double val = report.getUtilizationMetrics().get(key);
if (val != null) {
return val;
}
}
return -1.0;
case NAMED_METRICS:
if (parsedMetric.getKey().isPresent()) {
String key = parsedMetric.getKey().get();
Double val = report.getNamedMetrics().get(key);
if (val != null) {
return val;
}
}
return -1.0;
case INVALID:
return -1.0;
default:
return -1.0;
}
return OptionalDouble.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.grpc.internal.FakeClock;
import io.grpc.internal.JsonParser;
import io.grpc.xds.WeightedRoundRobinLoadBalancer.WeightedRoundRobinLoadBalancerConfig;
import io.grpc.xds.internal.MetricReportUtils.ParsedMetricName;
import java.io.IOException;
import java.util.Map;
import org.junit.Test;
Expand Down Expand Up @@ -112,16 +113,19 @@ public void parseLoadBalancingConfigDefaultValues() throws IOException {
}

@Test
public void parseLoadBalancingConfigCustomMetrics() throws IOException {
public void parseLoadBalancingConfigCustomMetricsIgnoresInvalid() throws IOException {
System.setProperty("GRPC_EXPERIMENTAL_WRR_CUSTOM_METRICS", "true");
try {
String lbConfig = "{\"metricNamesForComputingUtilization\" : [\"foo\", \"bar\"]}";
String lbConfig =
"{\"metricNamesForComputingUtilization\" : "
+ "[\"utilization.foo\", \"invalid_name\", \"named_metrics.bar\"]}";
ConfigOrError configOrError = provider.parseLoadBalancingPolicyConfig(
parseJsonObject(lbConfig));
assertThat(configOrError.getConfig()).isNotNull();
WeightedRoundRobinLoadBalancerConfig config =
(WeightedRoundRobinLoadBalancerConfig) configOrError.getConfig();
assertThat(config.metricNamesForComputingUtilization).containsExactly("foo", "bar");
assertThat(config.parsedMetricNamesForComputingUtilization).containsExactly(
ParsedMetricName.parse("utilization.foo"), ParsedMetricName.parse("named_metrics.bar"));
} finally {
System.clearProperty("GRPC_EXPERIMENTAL_WRR_CUSTOM_METRICS");
}
Expand Down
Loading
Loading