diff --git a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java index 74e1751db993..deb7c284626a 100644 --- a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java +++ b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java @@ -69,6 +69,11 @@ *
Package-private for internal use. */ class ChannelPool extends ManagedChannel { + static final String CHANNEL_POOL_CONSECUTIVE_RESIZING_WARNING = + "Channel pool is repeatedly resizing. " + + "Consider adjusting `initialChannelCount` or `maxResizeDelta` to a more reasonable value. " + + "See https://docs.cloud.google.com/java/docs/troubleshooting to enable logging " + + "and set `com.google.api.gax.grpc.ChannelPool.level=FINEST` to log the channel pool resize behavior."; @VisibleForTesting static final Logger LOG = Logger.getLogger(ChannelPool.class.getName()); private static final java.time.Duration REFRESH_PERIOD = java.time.Duration.ofMinutes(50); @@ -84,6 +89,16 @@ class ChannelPool extends ManagedChannel { private final AtomicInteger indexTicker = new AtomicInteger(); private final String authority; + // The number of consecutive resize cycles to wait before logging a warning about repeated + // resizing. This value was chosen to detect repeated requests for changes (multiple continuous + // increase or decrease attempts) without being too sensitive. + private static final int CONSECUTIVE_RESIZE_THRESHOLD = 5; + + // Tracks the number of consecutive resize cycles where a resize actually occurred (either expand + // or shrink). Used to detect repeated resizing activity and log a warning. + // Note: This field is only accessed safely within resizeSafely() and does not need to be atomic. + private int consecutiveResizes = 0; + static ChannelPool create( ChannelPoolSettings settings, ChannelFactory channelFactory, @@ -275,7 +290,8 @@ private void resizeSafely() { *
Not threadsafe, must be called under the entryWriteLock monitor @@ -313,9 +329,25 @@ void resize() { int currentSize = localEntries.size(); int delta = tentativeTarget - currentSize; int dampenedTarget = tentativeTarget; - if (Math.abs(delta) > ChannelPoolSettings.MAX_RESIZE_DELTA) { - dampenedTarget = - currentSize + (int) Math.copySign(ChannelPoolSettings.MAX_RESIZE_DELTA, delta); + if (Math.abs(delta) > settings.getMaxResizeDelta()) { + dampenedTarget = currentSize + (int) Math.copySign(settings.getMaxResizeDelta(), delta); + } + + // Only count as "resized" if the thresholds are crossed and Gax attempts to scale. Checking + // that `dampenedTarget != currentSize` would cause false positives when the pool is within + // bounds but not at the target (target aims for the middle of the bounds) + boolean resized = (currentSize < minChannels || currentSize > maxChannels); + if (resized) { + consecutiveResizes++; + } else { + consecutiveResizes = 0; + } + + // Log warning only once when the consecutive threshold is reached to avoid spamming logs. Log + // message will repeat if the number of consecutive resizes resets (e.g. stabilizes for a bit). + // However, aim to log once to ensure that this does not incur log spam. + if (consecutiveResizes == CONSECUTIVE_RESIZE_THRESHOLD) { + LOG.warning(CHANNEL_POOL_CONSECUTIVE_RESIZING_WARNING); } // Only resize the pool when thresholds are crossed diff --git a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java index ebdc48cdd69e..31204ead57b6 100644 --- a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java +++ b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java @@ -59,7 +59,11 @@ public abstract class ChannelPoolSettings { static final Duration RESIZE_INTERVAL = Duration.ofMinutes(1); /** The maximum number of channels that can be added or removed at a time. */ - static final int MAX_RESIZE_DELTA = 2; + static final int DEFAULT_MAX_RESIZE_DELTA = 2; + + // Arbitrary limit to prevent unbounded growth and protect server/client resources. + // Capping at 25 ensures we don't scale too aggressively in a single cycle. + private static final int MAX_ALLOWED_RESIZE_DELTA = 25; /** * Threshold to start scaling down the channel pool. @@ -92,6 +96,22 @@ public abstract class ChannelPoolSettings { */ public abstract int getMaxChannelCount(); + /** + * The maximum number of channels that can be added or removed at a time. + * + *
This setting limits the rate at which the channel pool can grow or shrink in a single resize + * period. The default value is {@value #DEFAULT_MAX_RESIZE_DELTA}. Increasing this value can help + * the pool better handle sudden bursts or spikes in requests by allowing it to scale up faster. + * Regardless of this setting, the number of channels will never exceed {@link + * #getMaxChannelCount()}. + * + *
Note: This value cannot exceed {@value #MAX_ALLOWED_RESIZE_DELTA}. + * + *
Warning: Higher values for resize delta may still result in performance degradation + * during spikes due to rapid scaling. + */ + public abstract int getMaxResizeDelta(); + /** * The initial size of the channel pool. * @@ -116,11 +136,7 @@ boolean isStaticSize() { return true; } // When the scaling threshold are not set - if (getMinRpcsPerChannel() == 0 && getMaxRpcsPerChannel() == Integer.MAX_VALUE) { - return true; - } - - return false; + return getMinRpcsPerChannel() == 0 && getMaxRpcsPerChannel() == Integer.MAX_VALUE; } public abstract Builder toBuilder(); @@ -132,6 +148,9 @@ public static ChannelPoolSettings staticallySized(int size) { .setMaxRpcsPerChannel(Integer.MAX_VALUE) .setMinChannelCount(size) .setMaxChannelCount(size) + // Static pools don't resize so this value doesn't affect operation. However, + // validation still checks that resize delta doesn't exceed channel pool size. + .setMaxResizeDelta(Math.min(DEFAULT_MAX_RESIZE_DELTA, size)) .build(); } @@ -142,7 +161,8 @@ public static Builder builder() { .setMaxChannelCount(200) .setMinRpcsPerChannel(0) .setMaxRpcsPerChannel(Integer.MAX_VALUE) - .setPreemptiveRefreshEnabled(false); + .setPreemptiveRefreshEnabled(false) + .setMaxResizeDelta(DEFAULT_MAX_RESIZE_DELTA); } @AutoValue.Builder @@ -159,6 +179,15 @@ public abstract static class Builder { public abstract Builder setPreemptiveRefreshEnabled(boolean enabled); + /** + * Sets the maximum number of channels that can be added or removed in a single resize cycle. + * This acts as a rate limiter to prevent wild fluctuations. + * + *
Warning: Higher values for resize delta may still result in performance degradation
+ * during spikes due to rapid scaling.
+ */
+ public abstract Builder setMaxResizeDelta(int count);
+
abstract ChannelPoolSettings autoBuild();
public ChannelPoolSettings build() {
@@ -178,6 +207,14 @@ public ChannelPoolSettings build() {
"initial channel count must be less than maxChannelCount");
Preconditions.checkState(
s.getInitialChannelCount() > 0, "Initial channel count must be greater than 0");
+ Preconditions.checkState(
+ s.getMaxResizeDelta() > 0, "Max resize delta must be greater than 0");
+ Preconditions.checkState(
+ s.getMaxResizeDelta() <= MAX_ALLOWED_RESIZE_DELTA,
+ "Max resize delta cannot be greater than " + MAX_ALLOWED_RESIZE_DELTA);
+ Preconditions.checkState(
+ s.getMaxResizeDelta() <= s.getMaxChannelCount(),
+ "Max resize delta cannot be greater than max channel count");
return s;
}
}
diff --git a/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java b/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java
index 55a99c1481ec..b8c533ad3c82 100644
--- a/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java
+++ b/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java
@@ -523,6 +523,53 @@ void channelCountShouldNotChangeWhenOutstandingRpcsAreWithinLimits() throws Exce
assertThat(pool.entries.get()).hasSize(2);
}
+ @Test
+ void customResizeDeltaIsRespected() throws Exception {
+ ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class);
+ FixedExecutorProvider provider = FixedExecutorProvider.create(executor);
+
+ List