diff --git a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java index deb7c284626a..1591aca9e742 100644 --- a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java +++ b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java @@ -96,8 +96,7 @@ class ChannelPool extends ManagedChannel { // Tracks the number of consecutive resize cycles where a resize actually occurred (either expand // or shrink). Used to detect repeated resizing activity and log a warning. - // Note: This field is only accessed safely within resizeSafely() and does not need to be atomic. - private int consecutiveResizes = 0; + private final AtomicInteger consecutiveResizes = new AtomicInteger(0); static ChannelPool create( ChannelPoolSettings settings, @@ -310,6 +309,10 @@ void resize() { if (minChannels < settings.getMinChannelCount()) { minChannels = settings.getMinChannelCount(); } + // Limit in case the calculated min channel count exceeds the configured max channel count + if (minChannels > settings.getMaxChannelCount()) { + minChannels = settings.getMaxChannelCount(); + } // Number of channels if each channel operated at minimum capacity // Note: getMinRpcsPerChannel() can return 0, but division by 0 shouldn't cause a problem. @@ -319,34 +322,49 @@ void resize() { if (maxChannels > settings.getMaxChannelCount()) { maxChannels = settings.getMaxChannelCount(); } - if (maxChannels < minChannels) { - maxChannels = minChannels; + // Limit in case the calculated max channel count falls below the configured min channel count + if (maxChannels < settings.getMinChannelCount()) { + maxChannels = settings.getMinChannelCount(); } - // If the pool were to be resized, try to aim for the middle of the bound, but limit rate of - // change. + // If the pool were to be resized, try to aim for the middle of the bound, + // but limit rate of change. int tentativeTarget = (maxChannels + minChannels) / 2; int currentSize = localEntries.size(); + // Calculate the desired change in pool size. int delta = tentativeTarget - currentSize; int dampenedTarget = tentativeTarget; - if (Math.abs(delta) > settings.getMaxResizeDelta()) { - dampenedTarget = currentSize + (int) Math.copySign(settings.getMaxResizeDelta(), delta); - } + // Dampen the rate of change if the desired delta exceeds the maximum allowed step size. + // Ensure that the step size is capped by the max channel count to handle small pool + // configurations. + int effectiveMaxResizeDelta = + Math.min(settings.getMaxResizeDelta(), settings.getMaxChannelCount()); + if (Math.abs(delta) > effectiveMaxResizeDelta) { + // Limit the change to effectiveMaxResizeDelta, maintaining the correct direction (positive or + // negative). + dampenedTarget = currentSize + (int) Math.copySign(effectiveMaxResizeDelta, delta); + } + + // Ensure that the calculated dampedTarget value will never exceed the maxChannelCount or fall + // below minChannelCount + dampenedTarget = + Math.max( + settings.getMinChannelCount(), Math.min(settings.getMaxChannelCount(), dampenedTarget)); // Only count as "resized" if the thresholds are crossed and Gax attempts to scale. Checking // that `dampenedTarget != currentSize` would cause false positives when the pool is within // bounds but not at the target (target aims for the middle of the bounds) boolean resized = (currentSize < minChannels || currentSize > maxChannels); if (resized) { - consecutiveResizes++; + consecutiveResizes.incrementAndGet(); } else { - consecutiveResizes = 0; + consecutiveResizes.set(0); } // Log warning only once when the consecutive threshold is reached to avoid spamming logs. Log // message will repeat if the number of consecutive resizes resets (e.g. stabilizes for a bit). // However, aim to log once to ensure that this does not incur log spam. - if (consecutiveResizes == CONSECUTIVE_RESIZE_THRESHOLD) { + if (consecutiveResizes.get() == CONSECUTIVE_RESIZE_THRESHOLD) { LOG.warning(CHANNEL_POOL_CONSECUTIVE_RESIZING_WARNING); } diff --git a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java index 31204ead57b6..a17b28bd061c 100644 --- a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java +++ b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java @@ -148,9 +148,8 @@ public static ChannelPoolSettings staticallySized(int size) { .setMaxRpcsPerChannel(Integer.MAX_VALUE) .setMinChannelCount(size) .setMaxChannelCount(size) - // Static pools don't resize so this value doesn't affect operation. However, - // validation still checks that resize delta doesn't exceed channel pool size. - .setMaxResizeDelta(Math.min(DEFAULT_MAX_RESIZE_DELTA, size)) + // Static pools don't resize so this value doesn't affect operation. + .setMaxResizeDelta(DEFAULT_MAX_RESIZE_DELTA) .build(); } @@ -167,21 +166,57 @@ public static Builder builder() { @AutoValue.Builder public abstract static class Builder { + /** + * Sets the minimum desired number of concurrent RPCs per channel. + * + *
This ensures channels are adequately utilized. If the average load per channel falls below + * this value, the pool attempts to shrink. The resulting target channel count is a dynamic + * value determined by load and is bounded by {@link #setMinChannelCount} and {@link + * #setMaxChannelCount}. + */ public abstract Builder setMinRpcsPerChannel(int count); + /** + * Sets the maximum desired number of concurrent RPCs per channel. + * + *
This ensures channels do not become overloaded. If the average load per channel exceeds + * this value, the pool attempts to expand. The resulting target channel count is a dynamic + * value determined by load and is bounded by {@link #setMinChannelCount} and {@link + * #setMaxChannelCount}. + */ public abstract Builder setMaxRpcsPerChannel(int count); + /** + * Sets the minimum number of channels the pool can shrink to. + * + *
When resizing, if the calculated resize bounds fall below this minimum configuration, the + * bounds will be clamped to this value. This ensures the pool never shrinks below this absolute + * minimum, even under very low load. + */ public abstract Builder setMinChannelCount(int count); + /** + * Sets the maximum number of channels the pool can expand to. + * + *
When resizing, if the calculated resize bounds exceed this maximum configuration, the + * bounds will be clamped to this value. This ensures the pool never expands above this absolute + * maximum, even under very high load. + */ public abstract Builder setMaxChannelCount(int count); + /** Sets the initial number of channels in the pool. */ public abstract Builder setInitialChannelCount(int count); + /** + * Sets whether preemptive channel refresh is enabled to prevent channels from becoming idle. + */ public abstract Builder setPreemptiveRefreshEnabled(boolean enabled); /** * Sets the maximum number of channels that can be added or removed in a single resize cycle. - * This acts as a rate limiter to prevent wild fluctuations. + * This acts as a rate limiter to prevent wild fluctuations. The pool resizes periodically + * according to {@link #RESIZE_INTERVAL} (default 1 minute). During resizing, this value is + * effectively capped by the bound configured via {@link #setMaxChannelCount}. * *
Warning: Higher values for resize delta may still result in performance degradation
* during spikes due to rapid scaling.
@@ -198,7 +233,7 @@ public ChannelPoolSettings build() {
Preconditions.checkState(
s.getMinChannelCount() > 0, "Minimum channel count must be at least 1");
Preconditions.checkState(
- s.getMinChannelCount() <= s.getMaxRpcsPerChannel(), "absolute channel range is invalid");
+ s.getMinChannelCount() <= s.getMaxChannelCount(), "absolute channel range is invalid");
Preconditions.checkState(
s.getMinChannelCount() <= s.getInitialChannelCount(),
"initial channel count be at least minChannelCount");
diff --git a/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java b/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java
index b8c533ad3c82..c7cad57be186 100644
--- a/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java
+++ b/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java
@@ -144,6 +144,26 @@ private void verifyTargetChannel(
}
}
+ private static ChannelFactory createMockChannelFactory(
+ List