diff --git a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java index 74e1751db993..deb7c284626a 100644 --- a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java +++ b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java @@ -69,6 +69,11 @@ *

Package-private for internal use. */ class ChannelPool extends ManagedChannel { + static final String CHANNEL_POOL_CONSECUTIVE_RESIZING_WARNING = + "Channel pool is repeatedly resizing. " + + "Consider adjusting `initialChannelCount` or `maxResizeDelta` to a more reasonable value. " + + "See https://docs.cloud.google.com/java/docs/troubleshooting to enable logging " + + "and set `com.google.api.gax.grpc.ChannelPool.level=FINEST` to log the channel pool resize behavior."; @VisibleForTesting static final Logger LOG = Logger.getLogger(ChannelPool.class.getName()); private static final java.time.Duration REFRESH_PERIOD = java.time.Duration.ofMinutes(50); @@ -84,6 +89,16 @@ class ChannelPool extends ManagedChannel { private final AtomicInteger indexTicker = new AtomicInteger(); private final String authority; + // The number of consecutive resize cycles to wait before logging a warning about repeated + // resizing. This value was chosen to detect repeated requests for changes (multiple continuous + // increase or decrease attempts) without being too sensitive. + private static final int CONSECUTIVE_RESIZE_THRESHOLD = 5; + + // Tracks the number of consecutive resize cycles where a resize actually occurred (either expand + // or shrink). Used to detect repeated resizing activity and log a warning. + // Note: This field is only accessed safely within resizeSafely() and does not need to be atomic. + private int consecutiveResizes = 0; + static ChannelPool create( ChannelPoolSettings settings, ChannelFactory channelFactory, @@ -275,7 +290,8 @@ private void resizeSafely() { *

  • Get the maximum number of outstanding RPCs since last invocation *
  • Determine a valid range of number of channels to handle that many outstanding RPCs *
  • If the current number of channel falls outside of that range, add or remove at most - * {@link ChannelPoolSettings#MAX_RESIZE_DELTA} to get closer to middle of that range. + * {@link ChannelPoolSettings#getMaxResizeDelta()} channels to get closer to the middle of that + * range. * * *

    Not threadsafe, must be called under the entryWriteLock monitor @@ -313,9 +329,25 @@ void resize() { int currentSize = localEntries.size(); int delta = tentativeTarget - currentSize; int dampenedTarget = tentativeTarget; - if (Math.abs(delta) > ChannelPoolSettings.MAX_RESIZE_DELTA) { - dampenedTarget = - currentSize + (int) Math.copySign(ChannelPoolSettings.MAX_RESIZE_DELTA, delta); + if (Math.abs(delta) > settings.getMaxResizeDelta()) { + dampenedTarget = currentSize + (int) Math.copySign(settings.getMaxResizeDelta(), delta); + } + + // Only count as "resized" if the thresholds are crossed and Gax attempts to scale. Checking + // that `dampenedTarget != currentSize` would cause false positives when the pool is within + // bounds but not at the target (target aims for the middle of the bounds) + boolean resized = (currentSize < minChannels || currentSize > maxChannels); + if (resized) { + consecutiveResizes++; + } else { + consecutiveResizes = 0; + } + + // Log warning only once when the consecutive threshold is reached to avoid spamming logs. Log + // message will repeat if the number of consecutive resizes resets (e.g. stabilizes for a bit). + // However, aim to log once to ensure that this does not incur log spam. 
+ if (consecutiveResizes == CONSECUTIVE_RESIZE_THRESHOLD) { + LOG.warning(CHANNEL_POOL_CONSECUTIVE_RESIZING_WARNING); } // Only resize the pool when thresholds are crossed diff --git a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java index ebdc48cdd69e..31204ead57b6 100644 --- a/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java +++ b/sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPoolSettings.java @@ -59,7 +59,11 @@ public abstract class ChannelPoolSettings { static final Duration RESIZE_INTERVAL = Duration.ofMinutes(1); /** The maximum number of channels that can be added or removed at a time. */ - static final int MAX_RESIZE_DELTA = 2; + static final int DEFAULT_MAX_RESIZE_DELTA = 2; + + // Arbitrary limit to prevent unbounded growth and protect server/client resources. + // Capping at 25 ensures we don't scale too aggressively in a single cycle. + private static final int MAX_ALLOWED_RESIZE_DELTA = 25; /** * Threshold to start scaling down the channel pool. @@ -92,6 +96,22 @@ public abstract class ChannelPoolSettings { */ public abstract int getMaxChannelCount(); + /** + * The maximum number of channels that can be added or removed at a time. + * + *

    This setting limits the rate at which the channel pool can grow or shrink in a single resize + * period. The default value is {@value #DEFAULT_MAX_RESIZE_DELTA}. Increasing this value can help + * the pool better handle sudden bursts or spikes in requests by allowing it to scale up faster. + * Regardless of this setting, the number of channels will never exceed {@link + * #getMaxChannelCount()}. + * + *

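    Note: This value must be greater than 0 and cannot exceed either + * {@value #MAX_ALLOWED_RESIZE_DELTA} or {@link #getMaxChannelCount()}; otherwise {@link + * Builder#build()} throws an {@link IllegalStateException}. + * + *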

    Warning: Higher values for resize delta may still result in performance degradation + * during spikes due to rapid scaling. + */ + public abstract int getMaxResizeDelta(); + /** * The initial size of the channel pool. * @@ -116,11 +136,7 @@ boolean isStaticSize() { return true; } // When the scaling threshold are not set - if (getMinRpcsPerChannel() == 0 && getMaxRpcsPerChannel() == Integer.MAX_VALUE) { - return true; - } - - return false; + return getMinRpcsPerChannel() == 0 && getMaxRpcsPerChannel() == Integer.MAX_VALUE; } public abstract Builder toBuilder(); @@ -132,6 +148,9 @@ public static ChannelPoolSettings staticallySized(int size) { .setMaxRpcsPerChannel(Integer.MAX_VALUE) .setMinChannelCount(size) .setMaxChannelCount(size) + // Static pools don't resize so this value doesn't affect operation. However, + // validation still checks that resize delta doesn't exceed channel pool size. + .setMaxResizeDelta(Math.min(DEFAULT_MAX_RESIZE_DELTA, size)) .build(); } @@ -142,7 +161,8 @@ public static Builder builder() { .setMaxChannelCount(200) .setMinRpcsPerChannel(0) .setMaxRpcsPerChannel(Integer.MAX_VALUE) - .setPreemptiveRefreshEnabled(false); + .setPreemptiveRefreshEnabled(false) + .setMaxResizeDelta(DEFAULT_MAX_RESIZE_DELTA); } @AutoValue.Builder @@ -159,6 +179,15 @@ public abstract static class Builder { public abstract Builder setPreemptiveRefreshEnabled(boolean enabled); + /** + * Sets the maximum number of channels that can be added or removed in a single resize cycle. + * This acts as a rate limiter to prevent wild fluctuations. + * + *

    Warning: Higher values for resize delta may still result in performance degradation + * during spikes due to rapid scaling. + */ + public abstract Builder setMaxResizeDelta(int count); + abstract ChannelPoolSettings autoBuild(); public ChannelPoolSettings build() { @@ -178,6 +207,14 @@ public ChannelPoolSettings build() { "initial channel count must be less than maxChannelCount"); Preconditions.checkState( s.getInitialChannelCount() > 0, "Initial channel count must be greater than 0"); + Preconditions.checkState( + s.getMaxResizeDelta() > 0, "Max resize delta must be greater than 0"); + Preconditions.checkState( + s.getMaxResizeDelta() <= MAX_ALLOWED_RESIZE_DELTA, + "Max resize delta cannot be greater than " + MAX_ALLOWED_RESIZE_DELTA); + Preconditions.checkState( + s.getMaxResizeDelta() <= s.getMaxChannelCount(), + "Max resize delta cannot be greater than max channel count"); return s; } } diff --git a/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java b/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java index 55a99c1481ec..b8c533ad3c82 100644 --- a/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java +++ b/sdk-platform-java/gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java @@ -523,6 +523,53 @@ void channelCountShouldNotChangeWhenOutstandingRpcsAreWithinLimits() throws Exce assertThat(pool.entries.get()).hasSize(2); } + @Test + void customResizeDeltaIsRespected() throws Exception { + ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class); + FixedExecutorProvider provider = FixedExecutorProvider.create(executor); + + List channels = new ArrayList<>(); + List> startedCalls = new ArrayList<>(); + + ChannelFactory channelFactory = + () -> { + ManagedChannel channel = Mockito.mock(ManagedChannel.class); + Mockito.when(channel.newCall(Mockito.any(), Mockito.any())) + .thenAnswer( + 
invocation -> { + @SuppressWarnings("unchecked") + ClientCall clientCall = Mockito.mock(ClientCall.class); + startedCalls.add(clientCall); + return clientCall; + }); + + channels.add(channel); + return channel; + }; + + pool = + new ChannelPool( + ChannelPoolSettings.builder() + .setInitialChannelCount(2) + .setMinRpcsPerChannel(1) + .setMaxRpcsPerChannel(2) + .setMaxResizeDelta(5) + .build(), + channelFactory, + provider); + assertThat(pool.entries.get()).hasSize(2); + + // Add 20 RPCs to push expansion + for (int i = 0; i < 20; i++) { + ClientCalls.futureUnaryCall( + pool.newCall(METHOD_RECOGNIZE, CallOptions.DEFAULT), Color.getDefaultInstance()); + } + pool.resize(); + // delta is 15 - 2 = 13. Capped at maxResizeDelta = 5. + // Expected size = 2 + 5 = 7. + assertThat(pool.entries.get()).hasSize(7); + } + @Test void removedIdleChannelsAreShutdown() throws Exception { ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class); @@ -679,6 +726,121 @@ public void onComplete() {} assertThat(e.getMessage()).isEqualTo("Call is already cancelled"); } + @Test + void repeatedResizingLogsWarningOnExpand() throws Exception { + ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class); + FixedExecutorProvider provider = FixedExecutorProvider.create(executor); + + List channels = new ArrayList<>(); + List> startedCalls = new ArrayList<>(); + + ChannelFactory channelFactory = + () -> { + ManagedChannel channel = Mockito.mock(ManagedChannel.class); + Mockito.when(channel.newCall(Mockito.any(), Mockito.any())) + .thenAnswer( + invocation -> { + @SuppressWarnings("unchecked") + ClientCall clientCall = Mockito.mock(ClientCall.class); + startedCalls.add(clientCall); + return clientCall; + }); + + channels.add(channel); + return channel; + }; + + pool = + new ChannelPool( + ChannelPoolSettings.builder() + .setInitialChannelCount(1) + .setMinRpcsPerChannel(1) + .setMaxRpcsPerChannel(2) + .setMaxResizeDelta(1) + .setMinChannelCount(1) + 
.setMaxChannelCount(10) + .build(), + channelFactory, + provider); + assertThat(pool.entries.get()).hasSize(1); + + FakeLogHandler logHandler = new FakeLogHandler(); + ChannelPool.LOG.addHandler(logHandler); + + try { + // Add 20 RPCs to push expansion + for (int i = 0; i < 20; i++) { + ClientCalls.futureUnaryCall( + pool.newCall(METHOD_RECOGNIZE, CallOptions.DEFAULT), Color.getDefaultInstance()); + } + + // Resize 4 times, should not log warning yet + for (int i = 0; i < 4; i++) { + pool.resize(); + } + assertThat(logHandler.getAllMessages()).isEmpty(); + + // 5th resize, should log warning + pool.resize(); + assertThat(logHandler.getAllMessages()).hasSize(1); + assertThat(logHandler.getAllMessages()) + .contains(ChannelPool.CHANNEL_POOL_CONSECUTIVE_RESIZING_WARNING); + + // 6th resize, should not log again + pool.resize(); + assertThat(logHandler.getAllMessages()).hasSize(1); + } finally { + ChannelPool.LOG.removeHandler(logHandler); + } + } + + @Test + void repeatedResizingLogsWarningOnShrink() throws Exception { + ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class); + FixedExecutorProvider provider = FixedExecutorProvider.create(executor); + + List channels = new ArrayList<>(); + ChannelFactory channelFactory = + () -> { + ManagedChannel channel = Mockito.mock(ManagedChannel.class); + channels.add(channel); + return channel; + }; + + pool = + new ChannelPool( + ChannelPoolSettings.builder() + .setInitialChannelCount(10) + .setMinRpcsPerChannel(1) + .setMaxRpcsPerChannel(2) + .setMaxResizeDelta(1) + .setMinChannelCount(1) + .setMaxChannelCount(10) + .build(), + channelFactory, + provider); + assertThat(pool.entries.get()).hasSize(10); + + FakeLogHandler logHandler = new FakeLogHandler(); + ChannelPool.LOG.addHandler(logHandler); + + try { + // 0 RPCs, should shrink every cycle + // Resize 4 times, should not log warning yet + for (int i = 0; i < 4; i++) { + pool.resize(); + } + assertThat(logHandler.getAllMessages()).isEmpty(); + + 
// 5th resize, should log warning + pool.resize(); + assertThat(logHandler.getAllMessages()) + .contains(ChannelPool.CHANNEL_POOL_CONSECUTIVE_RESIZING_WARNING); + } finally { + ChannelPool.LOG.removeHandler(logHandler); + } + } + @Test void testDoubleRelease() throws Exception { FakeLogHandler logHandler = new FakeLogHandler(); @@ -737,4 +899,12 @@ void testDoubleRelease() throws Exception { ChannelPool.LOG.removeHandler(logHandler); } } + + @Test + void settingsValidationFailsWhenMaxResizeDeltaExceedsLimit() { + ChannelPoolSettings.Builder builder = + ChannelPoolSettings.builder().setMaxResizeDelta(26).setMaxChannelCount(30); + org.junit.jupiter.api.Assertions.assertThrows( + IllegalStateException.class, () -> builder.build()); + } }