diff --git a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java index deb7c284626a..1591aca9e742 100644 --- a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java +++ b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java @@ -96,8 +96,7 @@ class ChannelPool extends ManagedChannel { // Tracks the number of consecutive resize cycles where a resize actually occurred (either expand // or shrink). Used to detect repeated resizing activity and log a warning. - // Note: This field is only accessed safely within resizeSafely() and does not need to be atomic. - private int consecutiveResizes = 0; + private final AtomicInteger consecutiveResizes = new AtomicInteger(0); static ChannelPool create( ChannelPoolSettings settings, @@ -310,6 +309,10 @@ void resize() { if (minChannels < settings.getMinChannelCount()) { minChannels = settings.getMinChannelCount(); } + // Limit in case the calculated min channel count exceeds the configured max channel count + if (minChannels > settings.getMaxChannelCount()) { + minChannels = settings.getMaxChannelCount(); + } // Number of channels if each channel operated at minimum capacity // Note: getMinRpcsPerChannel() can return 0, but division by 0 shouldn't cause a problem. @@ -319,34 +322,49 @@ void resize() { if (maxChannels > settings.getMaxChannelCount()) { maxChannels = settings.getMaxChannelCount(); } - if (maxChannels < minChannels) { - maxChannels = minChannels; + // Limit in case the calculated max channel count falls below the configured min channel count + if (maxChannels < settings.getMinChannelCount()) { + maxChannels = settings.getMinChannelCount(); } - // If the pool were to be resized, try to aim for the middle of the bound, but limit rate of - // change. + // If the pool were to be resized, try to aim for the middle of the bound, + // but limit rate of change. int tentativeTarget = (maxChannels + minChannels) / 2; int currentSize = localEntries.size(); + // Calculate the desired change in pool size. int delta = tentativeTarget - currentSize; int dampenedTarget = tentativeTarget; - if (Math.abs(delta) > settings.getMaxResizeDelta()) { - dampenedTarget = currentSize + (int) Math.copySign(settings.getMaxResizeDelta(), delta); - } + // Dampen the rate of change if the desired delta exceeds the maximum allowed step size. + // Ensure that the step size is capped by the max channel count to handle small pool + // configurations. + int effectiveMaxResizeDelta = + Math.min(settings.getMaxResizeDelta(), settings.getMaxChannelCount()); + if (Math.abs(delta) > effectiveMaxResizeDelta) { + // Limit the change to effectiveMaxResizeDelta, maintaining the correct direction (positive or + // negative). + dampenedTarget = currentSize + (int) Math.copySign(effectiveMaxResizeDelta, delta); + } + + // Ensure that the calculated dampedTarget value will never exceed the maxChannelCount or fall + // below minChannelCount + dampenedTarget = + Math.max( + settings.getMinChannelCount(), Math.min(settings.getMaxChannelCount(), dampenedTarget)); // Only count as "resized" if the thresholds are crossed and Gax attempts to scale. Checking // that `dampenedTarget != currentSize` would cause false positives when the pool is within // bounds but not at the target (target aims for the middle of the bounds) boolean resized = (currentSize < minChannels || currentSize > maxChannels); if (resized) { - consecutiveResizes++; + consecutiveResizes.incrementAndGet(); } else { - consecutiveResizes = 0; + consecutiveResizes.set(0); } // Log warning only once when the consecutive threshold is reached to avoid spamming logs. Log // message will repeat if the number of consecutive resizes resets (e.g. stabilizes for a bit). // However, aim to log once to ensure that this does not incur log spam. - if (consecutiveResizes == CONSECUTIVE_RESIZE_THRESHOLD) { + if (consecutiveResizes.get() == CONSECUTIVE_RESIZE_THRESHOLD) { LOG.warning(CHANNEL_POOL_CONSECUTIVE_RESIZING_WARNING); } diff --git a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java index 31204ead57b6..a17b28bd061c 100644 --- a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java +++ b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java @@ -148,9 +148,8 @@ public static ChannelPoolSettings staticallySized(int size) { .setMaxRpcsPerChannel(Integer.MAX_VALUE) .setMinChannelCount(size) .setMaxChannelCount(size) - // Static pools don't resize so this value doesn't affect operation. However, - // validation still checks that resize delta doesn't exceed channel pool size. - .setMaxResizeDelta(Math.min(DEFAULT_MAX_RESIZE_DELTA, size)) + // Static pools don't resize so this value doesn't affect operation. + .setMaxResizeDelta(DEFAULT_MAX_RESIZE_DELTA) .build(); } @@ -167,21 +166,57 @@ public static Builder builder() { @AutoValue.Builder public abstract static class Builder { + /** + * Sets the minimum desired number of concurrent RPCs per channel. + * + *

This ensures channels are adequately utilized. If the average load per channel falls below + * this value, the pool attempts to shrink. The resulting target channel count is a dynamic + * value determined by load and is bounded by {@link #setMinChannelCount} and {@link + * #setMaxChannelCount}. + */ public abstract Builder setMinRpcsPerChannel(int count); + /** + * Sets the maximum desired number of concurrent RPCs per channel. + * + *

This ensures channels do not become overloaded. If the average load per channel exceeds + * this value, the pool attempts to expand. The resulting target channel count is a dynamic + * value determined by load and is bounded by {@link #setMinChannelCount} and {@link + * #setMaxChannelCount}. + */ public abstract Builder setMaxRpcsPerChannel(int count); + /** + * Sets the minimum number of channels the pool can shrink to. + * + *

When resizing, if the calculated resize bounds fall below this minimum configuration, the + * bounds will be clamped to this value. This ensures the pool never shrinks below this absolute + * minimum, even under very low load. + */ public abstract Builder setMinChannelCount(int count); + /** + * Sets the maximum number of channels the pool can expand to. + * + *

When resizing, if the calculated resize bounds exceed this maximum configuration, the + * bounds will be clamped to this value. This ensures the pool never expands above this absolute + * maximum, even under very high load. + */ public abstract Builder setMaxChannelCount(int count); + /** Sets the initial number of channels in the pool. */ public abstract Builder setInitialChannelCount(int count); + /** + * Sets whether preemptive channel refresh is enabled to prevent channels from becoming idle. + */ public abstract Builder setPreemptiveRefreshEnabled(boolean enabled); /** * Sets the maximum number of channels that can be added or removed in a single resize cycle. - * This acts as a rate limiter to prevent wild fluctuations. + * This acts as a rate limiter to prevent wild fluctuations. The pool resizes periodically + * according to {@link #RESIZE_INTERVAL} (default 1 minute). During resizing, this value is + * effectively capped by the bound configured via {@link #setMaxChannelCount}. * *

Warning: Higher values for resize delta may still result in performance degradation * during spikes due to rapid scaling. @@ -198,7 +233,7 @@ public ChannelPoolSettings build() { Preconditions.checkState( s.getMinChannelCount() > 0, "Minimum channel count must be at least 1"); Preconditions.checkState( - s.getMinChannelCount() <= s.getMaxRpcsPerChannel(), "absolute channel range is invalid"); + s.getMinChannelCount() <= s.getMaxChannelCount(), "absolute channel range is invalid"); Preconditions.checkState( s.getMinChannelCount() <= s.getInitialChannelCount(), "initial channel count be at least minChannelCount"); diff --git a/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java b/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java index b8c533ad3c82..c7cad57be186 100644 --- a/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java +++ b/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java @@ -144,6 +144,26 @@ private void verifyTargetChannel( } } + private static ChannelFactory createMockChannelFactory( + List channels, List> startedCalls) { + return () -> { + ManagedChannel channel = Mockito.mock(ManagedChannel.class); + Mockito.when(channel.newCall(Mockito.any(), Mockito.any())) + .thenAnswer( + invocation -> { + @SuppressWarnings("unchecked") + ClientCall clientCall = Mockito.mock(ClientCall.class); + if (startedCalls != null) { + startedCalls.add(clientCall); + } + return clientCall; + }); + + channels.add(channel); + return channel; + }; + } + @Test void ensureEvenDistribution() throws InterruptedException, IOException { int numChannels = 10; @@ -451,21 +471,7 @@ void channelCountShouldNotChangeWhenOutstandingRpcsAreWithinLimits() throws Exce List channels = new ArrayList<>(); List> startedCalls = new ArrayList<>(); - ChannelFactory channelFactory = - () -> { - ManagedChannel channel = Mockito.mock(ManagedChannel.class); - Mockito.when(channel.newCall(Mockito.any(), Mockito.any())) - .thenAnswer( - invocation -> { - @SuppressWarnings("unchecked") - ClientCall clientCall = Mockito.mock(ClientCall.class); - startedCalls.add(clientCall); - return clientCall; - }); - - channels.add(channel); - return channel; - }; + ChannelFactory channelFactory = createMockChannelFactory(channels, startedCalls); pool = new ChannelPool( @@ -531,21 +537,7 @@ void customResizeDeltaIsRespected() throws Exception { List channels = new ArrayList<>(); List> startedCalls = new ArrayList<>(); - ChannelFactory channelFactory = - () -> { - ManagedChannel channel = Mockito.mock(ManagedChannel.class); - Mockito.when(channel.newCall(Mockito.any(), Mockito.any())) - .thenAnswer( - invocation -> { - @SuppressWarnings("unchecked") - ClientCall clientCall = Mockito.mock(ClientCall.class); - startedCalls.add(clientCall); - return clientCall; - }); - - channels.add(channel); - return channel; - }; + ChannelFactory channelFactory = createMockChannelFactory(channels, startedCalls); pool = new ChannelPool( @@ -578,21 +570,7 @@ void removedIdleChannelsAreShutdown() throws Exception { List channels = new ArrayList<>(); List> startedCalls = new ArrayList<>(); - ChannelFactory channelFactory = - () -> { - ManagedChannel channel = Mockito.mock(ManagedChannel.class); - Mockito.when(channel.newCall(Mockito.any(), Mockito.any())) - .thenAnswer( - invocation -> { - @SuppressWarnings("unchecked") - ClientCall clientCall = Mockito.mock(ClientCall.class); - startedCalls.add(clientCall); - return clientCall; - }); - - channels.add(channel); - return channel; - }; + ChannelFactory channelFactory = createMockChannelFactory(channels, startedCalls); pool = new ChannelPool( @@ -619,21 +597,7 @@ void removedActiveChannelsAreShutdown() throws Exception { List channels = new ArrayList<>(); List> startedCalls = new ArrayList<>(); - ChannelFactory channelFactory = - () -> { - ManagedChannel channel = Mockito.mock(ManagedChannel.class); - Mockito.when(channel.newCall(Mockito.any(), Mockito.any())) - .thenAnswer( - invocation -> { - @SuppressWarnings("unchecked") - ClientCall clientCall = Mockito.mock(ClientCall.class); - startedCalls.add(clientCall); - return clientCall; - }); - - channels.add(channel); - return channel; - }; + ChannelFactory channelFactory = createMockChannelFactory(channels, startedCalls); pool = new ChannelPool( @@ -734,21 +698,7 @@ void repeatedResizingLogsWarningOnExpand() throws Exception { List channels = new ArrayList<>(); List> startedCalls = new ArrayList<>(); - ChannelFactory channelFactory = - () -> { - ManagedChannel channel = Mockito.mock(ManagedChannel.class); - Mockito.when(channel.newCall(Mockito.any(), Mockito.any())) - .thenAnswer( - invocation -> { - @SuppressWarnings("unchecked") - ClientCall clientCall = Mockito.mock(ClientCall.class); - startedCalls.add(clientCall); - return clientCall; - }); - - channels.add(channel); - return channel; - }; + ChannelFactory channelFactory = createMockChannelFactory(channels, startedCalls); pool = new ChannelPool(