From 019ca9693c7253a77f20233f0399a34515f7aa0f Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Mon, 20 Apr 2026 15:47:31 -0400 Subject: [PATCH 1/3] fix: Fix channel clamping logic in ChannelPools to respect channel bounds --- .../com/google/api/gax/grpc/ChannelPool.java | 9 +- .../api/gax/grpc/ChannelPoolSettings.java | 2 +- .../google/api/gax/grpc/ChannelPoolTest.java | 171 ++++++++++-------- 3 files changed, 104 insertions(+), 78 deletions(-) diff --git a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java index deb7c284626a..b365037732f0 100644 --- a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java +++ b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java @@ -310,6 +310,10 @@ void resize() { if (minChannels < settings.getMinChannelCount()) { minChannels = settings.getMinChannelCount(); } + // Limit in case the calculated min channel count exceeds the configured max channel count + if (minChannels > settings.getMaxChannelCount()) { + minChannels = settings.getMaxChannelCount(); + } // Number of channels if each channel operated at minimum capacity // Note: getMinRpcsPerChannel() can return 0, but division by 0 shouldn't cause a problem. @@ -319,8 +323,9 @@ void resize() { if (maxChannels > settings.getMaxChannelCount()) { maxChannels = settings.getMaxChannelCount(); } - if (maxChannels < minChannels) { - maxChannels = minChannels; + // Limit in case the calculated max channel count falls below the configured min channel count + if (maxChannels < settings.getMinChannelCount()) { + maxChannels = settings.getMinChannelCount(); } // If the pool were to be resized, try to aim for the middle of the bound, but limit rate of diff --git a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java index dc77f303541b..31aae0202ad4 100644 --- a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java +++ b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java @@ -182,7 +182,7 @@ public ChannelPoolSettings build() { Preconditions.checkState( s.getMinChannelCount() > 0, "Minimum channel count must be at least 1"); Preconditions.checkState( - s.getMinChannelCount() <= s.getMaxRpcsPerChannel(), "absolute channel range is invalid"); + s.getMinChannelCount() <= s.getMaxChannelCount(), "absolute channel range is invalid"); Preconditions.checkState( s.getMinChannelCount() <= s.getInitialChannelCount(), "initial channel count be at least minChannelCount"); diff --git a/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java b/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java index 950beb605406..a3b106a677e4 100644 --- a/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java +++ b/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java @@ -144,6 +144,26 @@ private void verifyTargetChannel( } } + private static ChannelFactory createMockChannelFactory( + List channels, List> startedCalls) { + return () -> { + ManagedChannel channel = Mockito.mock(ManagedChannel.class); + Mockito.when(channel.newCall(Mockito.any(), Mockito.any())) + .thenAnswer( + invocation -> { + @SuppressWarnings("unchecked") + ClientCall clientCall = Mockito.mock(ClientCall.class); + if (startedCalls != null) { + startedCalls.add(clientCall); + } + return clientCall; + }); + + channels.add(channel); + return channel; + }; + } + @Test void ensureEvenDistribution() throws InterruptedException, IOException { int numChannels = 10; @@ -451,21 +471,7 @@ void channelCountShouldNotChangeWhenOutstandingRpcsAreWithinLimits() throws Exce List channels = new ArrayList<>(); List> startedCalls = new ArrayList<>(); - ChannelFactory channelFactory = - () -> { - ManagedChannel channel = Mockito.mock(ManagedChannel.class); - Mockito.when(channel.newCall(Mockito.any(), Mockito.any())) - .thenAnswer( - invocation -> { - @SuppressWarnings("unchecked") - ClientCall clientCall = Mockito.mock(ClientCall.class); - startedCalls.add(clientCall); - return clientCall; - }); - - channels.add(channel); - return channel; - }; + ChannelFactory channelFactory = createMockChannelFactory(channels, startedCalls); pool = new ChannelPool( @@ -531,21 +537,7 @@ void customResizeDeltaIsRespected() throws Exception { List channels = new ArrayList<>(); List> startedCalls = new ArrayList<>(); - ChannelFactory channelFactory = - () -> { - ManagedChannel channel = Mockito.mock(ManagedChannel.class); - Mockito.when(channel.newCall(Mockito.any(), Mockito.any())) - .thenAnswer( - invocation -> { - @SuppressWarnings("unchecked") - ClientCall clientCall = Mockito.mock(ClientCall.class); - startedCalls.add(clientCall); - return clientCall; - }); - - channels.add(channel); - return channel; - }; + ChannelFactory channelFactory = createMockChannelFactory(channels, startedCalls); pool = new ChannelPool( @@ -578,21 +570,7 @@ void removedIdleChannelsAreShutdown() throws Exception { List channels = new ArrayList<>(); List> startedCalls = new ArrayList<>(); - ChannelFactory channelFactory = - () -> { - ManagedChannel channel = Mockito.mock(ManagedChannel.class); - Mockito.when(channel.newCall(Mockito.any(), Mockito.any())) - .thenAnswer( - invocation -> { - @SuppressWarnings("unchecked") - ClientCall clientCall = Mockito.mock(ClientCall.class); - startedCalls.add(clientCall); - return clientCall; - }); - - channels.add(channel); - return channel; - }; + ChannelFactory channelFactory = createMockChannelFactory(channels, startedCalls); pool = new ChannelPool( @@ -619,21 +597,7 @@ void removedActiveChannelsAreShutdown() throws Exception { List channels = new ArrayList<>(); List> startedCalls = new ArrayList<>(); - ChannelFactory channelFactory = - () -> { - ManagedChannel channel = Mockito.mock(ManagedChannel.class); - Mockito.when(channel.newCall(Mockito.any(), Mockito.any())) - .thenAnswer( - invocation -> { - @SuppressWarnings("unchecked") - ClientCall clientCall = Mockito.mock(ClientCall.class); - startedCalls.add(clientCall); - return clientCall; - }); - - channels.add(channel); - return channel; - }; + ChannelFactory channelFactory = createMockChannelFactory(channels, startedCalls); pool = new ChannelPool( @@ -734,21 +698,7 @@ void repeatedResizingLogsWarningOnExpand() throws Exception { List channels = new ArrayList<>(); List> startedCalls = new ArrayList<>(); - ChannelFactory channelFactory = - () -> { - ManagedChannel channel = Mockito.mock(ManagedChannel.class); - Mockito.when(channel.newCall(Mockito.any(), Mockito.any())) - .thenAnswer( - invocation -> { - @SuppressWarnings("unchecked") - ClientCall clientCall = Mockito.mock(ClientCall.class); - startedCalls.add(clientCall); - return clientCall; - }); - - channels.add(channel); - return channel; - }; + ChannelFactory channelFactory = createMockChannelFactory(channels, startedCalls); pool = new ChannelPool( @@ -899,4 +849,75 @@ void testDoubleRelease() throws Exception { ChannelPool.LOG.removeHandler(logHandler); } } + + @Test + void settingsValidationFailsWhenMinChannelsExceedsMaxChannels() { + Assertions.assertThrows( + IllegalStateException.class, + () -> ChannelPoolSettings.builder().setMinChannelCount(2).setMaxChannelCount(1).build()); + } + + @Test + void minChannelsClampedToMaxChannelCountUnderHighLoad() throws Exception { + ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class); + FixedExecutorProvider provider = FixedExecutorProvider.create(executor); + + List channels = new ArrayList<>(); + ChannelFactory channelFactory = createMockChannelFactory(channels, null); + + pool = + new ChannelPool( + ChannelPoolSettings.builder() + .setInitialChannelCount(1) + .setMinRpcsPerChannel(1) + .setMaxRpcsPerChannel(2) + .setMaxResizeDelta(10) // Allow large growth + .setMinChannelCount(1) + .setMaxChannelCount(5) + .build(), + channelFactory, + provider); + assertThat(pool.entries.get()).hasSize(1); + + // Add 20 RPCs, which would require 10 channels (20/2) + // But max is 5 + for (int i = 0; i < 20; i++) { + ClientCalls.futureUnaryCall( + pool.newCall(METHOD_RECOGNIZE, CallOptions.DEFAULT), Color.getDefaultInstance()); + } + + pool.resize(); + + // Should be clamped to maxChannelCount = 5 + assertThat(pool.entries.get()).hasSize(5); + } + + @Test + void maxChannelsClampedToMinChannelCountUnderLowLoad() throws Exception { + ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class); + FixedExecutorProvider provider = FixedExecutorProvider.create(executor); + + List channels = new ArrayList<>(); + ChannelFactory channelFactory = createMockChannelFactory(channels, null); + + pool = + new ChannelPool( + ChannelPoolSettings.builder() + .setInitialChannelCount(5) + .setMinRpcsPerChannel(1) + .setMaxRpcsPerChannel(2) + .setMinChannelCount(3) + .setMaxChannelCount(10) + .build(), + channelFactory, + provider); + assertThat(pool.entries.get()).hasSize(5); + + // With no outstanding RPCs, the pool should want to shrink to 0 + // But min is 3 + pool.resize(); + + // Should be clamped to minChannelCount = 3 + assertThat(pool.entries.get()).hasSize(3); + } } From 3a2046ffec51374040b86bb3b0082628a9ada1d9 Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Mon, 20 Apr 2026 18:18:32 -0400 Subject: [PATCH 2/3] Refine ChannelPool Javadoc and restore validation --- .../com/google/api/gax/grpc/ChannelPool.java | 17 ++++---- .../api/gax/grpc/ChannelPoolSettings.java | 42 +++++++++++++++++-- .../google/api/gax/grpc/ChannelPoolTest.java | 6 +-- 3 files changed, 52 insertions(+), 13 deletions(-) diff --git a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java index b365037732f0..3bf048c41983 100644 --- a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java +++ b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java @@ -96,8 +96,7 @@ class ChannelPool extends ManagedChannel { // Tracks the number of consecutive resize cycles where a resize actually occurred (either expand // or shrink). Used to detect repeated resizing activity and log a warning. - // Note: This field is only accessed safely within resizeSafely() and does not need to be atomic. - private int consecutiveResizes = 0; + private final AtomicInteger consecutiveResizes = new AtomicInteger(0); static ChannelPool create( ChannelPoolSettings settings, @@ -328,13 +327,17 @@ void resize() { maxChannels = settings.getMinChannelCount(); } - // If the pool were to be resized, try to aim for the middle of the bound, but limit rate of - // change. + // If the pool were to be resized, try to aim for the middle of the bound, + // but limit rate of change. int tentativeTarget = (maxChannels + minChannels) / 2; int currentSize = localEntries.size(); + // Calculate the desired change in pool size. int delta = tentativeTarget - currentSize; int dampenedTarget = tentativeTarget; + // Dampen the rate of change if the desired delta exceeds the maximum allowed step size. if (Math.abs(delta) > settings.getMaxResizeDelta()) { + // Limit the change to maxResizeDelta, maintaining the correct direction (positive or + // negative). dampenedTarget = currentSize + (int) Math.copySign(settings.getMaxResizeDelta(), delta); } @@ -343,15 +346,15 @@ void resize() { // bounds but not at the target (target aims for the middle of the bounds) boolean resized = (currentSize < minChannels || currentSize > maxChannels); if (resized) { - consecutiveResizes++; + consecutiveResizes.incrementAndGet(); } else { - consecutiveResizes = 0; + consecutiveResizes.set(0); } // Log warning only once when the consecutive threshold is reached to avoid spamming logs. Log // message will repeat if the number of consecutive resizes resets (e.g. stabilizes for a bit). // However, aim to log once to ensure that this does not incur log spam. - if (consecutiveResizes == CONSECUTIVE_RESIZE_THRESHOLD) { + if (consecutiveResizes.get() == CONSECUTIVE_RESIZE_THRESHOLD) { LOG.warning(CHANNEL_POOL_CONSECUTIVE_RESIZING_WARNING); } diff --git a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java index 31aae0202ad4..b3a8d3e890ff 100644 --- a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java +++ b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java @@ -158,18 +158,57 @@ public static Builder builder() { @AutoValue.Builder public abstract static class Builder { + /** + * Sets the minimum desired number of concurrent RPCs per channel. + * + *

This ensures channels are adequately utilized. If the average load per channel falls below + * this value, the pool attempts to shrink. The resulting target channel count is a dynamic + * value determined by load and is bounded by {@link #setMinChannelCount} and {@link + * #setMaxChannelCount}. + */ public abstract Builder setMinRpcsPerChannel(int count); + /** + * Sets the maximum desired number of concurrent RPCs per channel. + * + *

This ensures channels do not become overloaded. If the average load per channel exceeds + * this value, the pool attempts to expand. The resulting target channel count is a dynamic + * value determined by load and is bounded by {@link #setMinChannelCount} and {@link + * #setMaxChannelCount}. + */ public abstract Builder setMaxRpcsPerChannel(int count); + /** + * Sets the minimum number of channels the pool can shrink to. + * + *

When resizing, if the calculated resize bounds fall below this minimum configuration, the + * bounds will be clamped to this value. This ensures the pool never shrinks below this absolute + * minimum, even under very low load. + */ public abstract Builder setMinChannelCount(int count); + /** + * Sets the maximum number of channels the pool can expand to. + * + *

When resizing, if the calculated resize bounds exceed this maximum configuration, the + * bounds will be clamped to this value. This ensures the pool never expands above this absolute + * maximum, even under very high load. + */ public abstract Builder setMaxChannelCount(int count); + /** Sets the initial number of channels in the pool. */ public abstract Builder setInitialChannelCount(int count); + /** + * Sets whether preemptive channel refresh is enabled to prevent channels from becoming idle. + */ public abstract Builder setPreemptiveRefreshEnabled(boolean enabled); + /** + * Sets the maximum number of channels that can be added or removed in a single resize cycle. + * This acts as a rate limiter to prevent wild fluctuations. The pool resizes periodically + * according to {@link #RESIZE_INTERVAL} (default 1 minute). + */ public abstract Builder setMaxResizeDelta(int count); abstract ChannelPoolSettings autoBuild(); @@ -193,9 +232,6 @@ public ChannelPoolSettings build() { s.getInitialChannelCount() > 0, "Initial channel count must be greater than 0"); Preconditions.checkState( s.getMaxResizeDelta() > 0, "Max resize delta must be greater than 0"); - Preconditions.checkState( - s.getMaxResizeDelta() <= s.getMaxChannelCount(), - "Max resize delta cannot be greater than max channel count"); return s; } } diff --git a/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java b/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java index a3b106a677e4..1820c46de60c 100644 --- a/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java +++ b/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java @@ -852,9 +852,9 @@ void testDoubleRelease() throws Exception { @Test void settingsValidationFailsWhenMinChannelsExceedsMaxChannels() { - Assertions.assertThrows( - IllegalStateException.class, - () -> ChannelPoolSettings.builder().setMinChannelCount(2).setMaxChannelCount(1).build()); + ChannelPoolSettings.Builder builder = + ChannelPoolSettings.builder().setMinChannelCount(2).setMaxChannelCount(1); + Assertions.assertThrows(IllegalStateException.class, () -> builder.build()); } @Test From 8992272d98ef93a10bcc964587f4ef136408a69b Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Mon, 20 Apr 2026 18:38:47 -0400 Subject: [PATCH 3/3] docs(gax-grpc): clarify runtime capping of maxResizeDelta and simplify static pool config --- .../com/google/api/gax/grpc/ChannelPool.java | 16 +++++++-- .../api/gax/grpc/ChannelPoolSettings.java | 8 ++--- .../google/api/gax/grpc/ChannelPoolTest.java | 36 +++++++++++++++++++ 3 files changed, 53 insertions(+), 7 deletions(-) diff --git a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java index 3bf048c41983..1591aca9e742 100644 --- a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java +++ b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java @@ -335,12 +335,22 @@ void resize() { int delta = tentativeTarget - currentSize; int dampenedTarget = tentativeTarget; // Dampen the rate of change if the desired delta exceeds the maximum allowed step size. - if (Math.abs(delta) > settings.getMaxResizeDelta()) { - // Limit the change to maxResizeDelta, maintaining the correct direction (positive or + // Ensure that the step size is capped by the max channel count to handle small pool + // configurations. + int effectiveMaxResizeDelta = + Math.min(settings.getMaxResizeDelta(), settings.getMaxChannelCount()); + if (Math.abs(delta) > effectiveMaxResizeDelta) { + // Limit the change to effectiveMaxResizeDelta, maintaining the correct direction (positive or // negative). - dampenedTarget = currentSize + (int) Math.copySign(settings.getMaxResizeDelta(), delta); + dampenedTarget = currentSize + (int) Math.copySign(effectiveMaxResizeDelta, delta); } + // Ensure that the calculated dampedTarget value will never exceed the maxChannelCount or fall + // below minChannelCount + dampenedTarget = + Math.max( + settings.getMinChannelCount(), Math.min(settings.getMaxChannelCount(), dampenedTarget)); + // Only count as "resized" if the thresholds are crossed and Gax attempts to scale. Checking // that `dampenedTarget != currentSize` would cause false positives when the pool is within // bounds but not at the target (target aims for the middle of the bounds) diff --git a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java index b3a8d3e890ff..2fd52ce488df 100644 --- a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java +++ b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java @@ -139,9 +139,8 @@ public static ChannelPoolSettings staticallySized(int size) { .setMaxRpcsPerChannel(Integer.MAX_VALUE) .setMinChannelCount(size) .setMaxChannelCount(size) - // Static pools don't resize so this value doesn't affect operation. However, - // validation still checks that resize delta doesn't exceed channel pool size. - .setMaxResizeDelta(Math.min(DEFAULT_MAX_RESIZE_DELTA, size)) + // Static pools don't resize so this value doesn't affect operation. + .setMaxResizeDelta(DEFAULT_MAX_RESIZE_DELTA) .build(); } @@ -207,7 +206,8 @@ public abstract static class Builder { /** * Sets the maximum number of channels that can be added or removed in a single resize cycle. * This acts as a rate limiter to prevent wild fluctuations. The pool resizes periodically - * according to {@link #RESIZE_INTERVAL} (default 1 minute). + * according to {@link #RESIZE_INTERVAL} (default 1 minute). During resizing, this value is + * effectively capped by the bound configured via {@link #setMaxChannelCount}. */ public abstract Builder setMaxResizeDelta(int count); diff --git a/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java b/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java index 1820c46de60c..7fc45f08290e 100644 --- a/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java +++ b/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java @@ -892,6 +892,42 @@ void minChannelsClampedToMaxChannelCountUnderHighLoad() throws Exception { assertThat(pool.entries.get()).hasSize(5); } + @Test + void resizeDampenedByMaxResizeDelta() throws Exception { + ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class); + FixedExecutorProvider provider = FixedExecutorProvider.create(executor); + + List channels = new ArrayList<>(); + ChannelFactory channelFactory = createMockChannelFactory(channels, null); + + pool = + new ChannelPool( + ChannelPoolSettings.builder() + .setInitialChannelCount(1) + .setMinRpcsPerChannel(1) + .setMaxRpcsPerChannel(2) + .setMaxResizeDelta(2) // Limit growth to 2 channels per cycle + .setMinChannelCount(1) + .setMaxChannelCount(10) + .build(), + channelFactory, + provider); + assertThat(pool.entries.get()).hasSize(1); + + // Add 20 RPCs, which would require 10 channels (20/2) + // Desired delta is +9 (10 - 1) + // maxResizeDelta is 2, so it should be dampened to 2 + for (int i = 0; i < 20; i++) { + ClientCalls.futureUnaryCall( + pool.newCall(METHOD_RECOGNIZE, CallOptions.DEFAULT), Color.getDefaultInstance()); + } + + pool.resize(); + + // Should be dampened to 1 + 2 = 3 channels + assertThat(pool.entries.get()).hasSize(3); + } + @Test void maxChannelsClampedToMinChannelCountUnderLowLoad() throws Exception { ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class);