Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ class ChannelPool extends ManagedChannel {

// Tracks the number of consecutive resize cycles where a resize actually occurred (either expand
// or shrink). Used to detect repeated resizing activity and log a warning.
// Note: This field is only accessed safely within resizeSafely() and does not need to be atomic.
private int consecutiveResizes = 0;
private final AtomicInteger consecutiveResizes = new AtomicInteger(0);

static ChannelPool create(
ChannelPoolSettings settings,
Expand Down Expand Up @@ -310,6 +309,10 @@ void resize() {
if (minChannels < settings.getMinChannelCount()) {
minChannels = settings.getMinChannelCount();
}
// Limit in case the calculated min channel count exceeds the configured max channel count
if (minChannels > settings.getMaxChannelCount()) {
minChannels = settings.getMaxChannelCount();
}

// Number of channels if each channel operated at minimum capacity
// Note: getMinRpcsPerChannel() can return 0, but division by 0 shouldn't cause a problem.
Expand All @@ -319,34 +322,49 @@ void resize() {
if (maxChannels > settings.getMaxChannelCount()) {
maxChannels = settings.getMaxChannelCount();
}
if (maxChannels < minChannels) {
maxChannels = minChannels;
// Limit in case the calculated max channel count falls below the configured min channel count
if (maxChannels < settings.getMinChannelCount()) {
maxChannels = settings.getMinChannelCount();
}

// If the pool were to be resized, try to aim for the middle of the bound, but limit rate of
// change.
// If the pool were to be resized, try to aim for the middle of the bound,
// but limit rate of change.
int tentativeTarget = (maxChannels + minChannels) / 2;
int currentSize = localEntries.size();
// Calculate the desired change in pool size.
int delta = tentativeTarget - currentSize;
int dampenedTarget = tentativeTarget;
if (Math.abs(delta) > settings.getMaxResizeDelta()) {
dampenedTarget = currentSize + (int) Math.copySign(settings.getMaxResizeDelta(), delta);
}
// Dampen the rate of change if the desired delta exceeds the maximum allowed step size.
// Ensure that the step size is capped by the max channel count to handle small pool
// configurations.
int effectiveMaxResizeDelta =
Math.min(settings.getMaxResizeDelta(), settings.getMaxChannelCount());
if (Math.abs(delta) > effectiveMaxResizeDelta) {
// Limit the change to effectiveMaxResizeDelta, maintaining the correct direction (positive or
// negative).
dampenedTarget = currentSize + (int) Math.copySign(effectiveMaxResizeDelta, delta);
}

// Ensure that the calculated dampedTarget value will never exceed the maxChannelCount or fall
// below minChannelCount
dampenedTarget =
Math.max(
settings.getMinChannelCount(), Math.min(settings.getMaxChannelCount(), dampenedTarget));

// Only count as "resized" if the thresholds are crossed and Gax attempts to scale. Checking
// that `dampenedTarget != currentSize` would cause false positives when the pool is within
// bounds but not at the target (target aims for the middle of the bounds)
boolean resized = (currentSize < minChannels || currentSize > maxChannels);
if (resized) {
consecutiveResizes++;
consecutiveResizes.incrementAndGet();
} else {
consecutiveResizes = 0;
consecutiveResizes.set(0);
}

// Log warning only once when the consecutive threshold is reached to avoid spamming logs. Log
// message will repeat if the number of consecutive resizes resets (e.g. stabilizes for a bit).
// However, aim to log once to ensure that this does not incur log spam.
if (consecutiveResizes == CONSECUTIVE_RESIZE_THRESHOLD) {
if (consecutiveResizes.get() == CONSECUTIVE_RESIZE_THRESHOLD) {
LOG.warning(CHANNEL_POOL_CONSECUTIVE_RESIZING_WARNING);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,8 @@ public static ChannelPoolSettings staticallySized(int size) {
.setMaxRpcsPerChannel(Integer.MAX_VALUE)
.setMinChannelCount(size)
.setMaxChannelCount(size)
// Static pools don't resize so this value doesn't affect operation. However,
// validation still checks that resize delta doesn't exceed channel pool size.
.setMaxResizeDelta(Math.min(DEFAULT_MAX_RESIZE_DELTA, size))
// Static pools don't resize so this value doesn't affect operation.
.setMaxResizeDelta(DEFAULT_MAX_RESIZE_DELTA)
.build();
}

Expand All @@ -167,21 +166,57 @@ public static Builder builder() {

@AutoValue.Builder
public abstract static class Builder {
/**
* Sets the minimum desired number of concurrent RPCs per channel.
*
* <p>This ensures channels are adequately utilized. If the average load per channel falls below
* this value, the pool attempts to shrink. The resulting target channel count is a dynamic
* value determined by load and is bounded by {@link #setMinChannelCount} and {@link
* #setMaxChannelCount}.
*/
public abstract Builder setMinRpcsPerChannel(int count);

/**
* Sets the maximum desired number of concurrent RPCs per channel.
*
* <p>This ensures channels do not become overloaded. If the average load per channel exceeds
* this value, the pool attempts to expand. The resulting target channel count is a dynamic
* value determined by load and is bounded by {@link #setMinChannelCount} and {@link
* #setMaxChannelCount}.
*/
public abstract Builder setMaxRpcsPerChannel(int count);

/**
* Sets the minimum number of channels the pool can shrink to.
*
* <p>When resizing, if the calculated resize bounds fall below this minimum configuration, the
* bounds will be clamped to this value. This ensures the pool never shrinks below this absolute
* minimum, even under very low load.
*/
public abstract Builder setMinChannelCount(int count);

/**
* Sets the maximum number of channels the pool can expand to.
*
* <p>When resizing, if the calculated resize bounds exceed this maximum configuration, the
* bounds will be clamped to this value. This ensures the pool never expands above this absolute
* maximum, even under very high load.
*/
public abstract Builder setMaxChannelCount(int count);

/** Sets the initial number of channels in the pool. */
public abstract Builder setInitialChannelCount(int count);

/**
* Sets whether preemptive channel refresh is enabled to prevent channels from becoming idle.
*/
public abstract Builder setPreemptiveRefreshEnabled(boolean enabled);

/**
* Sets the maximum number of channels that can be added or removed in a single resize cycle.
* This acts as a rate limiter to prevent wild fluctuations.
* This acts as a rate limiter to prevent wild fluctuations. The pool resizes periodically
* according to {@link #RESIZE_INTERVAL} (default 1 minute). During resizing, this value is
* effectively capped by the bound configured via {@link #setMaxChannelCount}.
*
* <p><b>Warning:</b> Higher values for resize delta may still result in performance degradation
* during spikes due to rapid scaling.
Expand All @@ -198,7 +233,7 @@ public ChannelPoolSettings build() {
Preconditions.checkState(
s.getMinChannelCount() > 0, "Minimum channel count must be at least 1");
Preconditions.checkState(
s.getMinChannelCount() <= s.getMaxRpcsPerChannel(), "absolute channel range is invalid");
s.getMinChannelCount() <= s.getMaxChannelCount(), "absolute channel range is invalid");
Preconditions.checkState(
s.getMinChannelCount() <= s.getInitialChannelCount(),
"initial channel count be at least minChannelCount");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,26 @@ private void verifyTargetChannel(
}
}

private static ChannelFactory createMockChannelFactory(
List<ManagedChannel> channels, List<ClientCall<Object, Object>> startedCalls) {
return () -> {
ManagedChannel channel = Mockito.mock(ManagedChannel.class);
Mockito.when(channel.newCall(Mockito.any(), Mockito.any()))
.thenAnswer(
invocation -> {
@SuppressWarnings("unchecked")
ClientCall<Object, Object> clientCall = Mockito.mock(ClientCall.class);
if (startedCalls != null) {
startedCalls.add(clientCall);
}
return clientCall;
});

channels.add(channel);
return channel;
};
}

@Test
void ensureEvenDistribution() throws InterruptedException, IOException {
int numChannels = 10;
Expand Down Expand Up @@ -451,21 +471,7 @@ void channelCountShouldNotChangeWhenOutstandingRpcsAreWithinLimits() throws Exce
List<ManagedChannel> channels = new ArrayList<>();
List<ClientCall<Object, Object>> startedCalls = new ArrayList<>();

ChannelFactory channelFactory =
() -> {
ManagedChannel channel = Mockito.mock(ManagedChannel.class);
Mockito.when(channel.newCall(Mockito.any(), Mockito.any()))
.thenAnswer(
invocation -> {
@SuppressWarnings("unchecked")
ClientCall<Object, Object> clientCall = Mockito.mock(ClientCall.class);
startedCalls.add(clientCall);
return clientCall;
});

channels.add(channel);
return channel;
};
ChannelFactory channelFactory = createMockChannelFactory(channels, startedCalls);

pool =
new ChannelPool(
Expand Down Expand Up @@ -531,21 +537,7 @@ void customResizeDeltaIsRespected() throws Exception {
List<ManagedChannel> channels = new ArrayList<>();
List<ClientCall<Object, Object>> startedCalls = new ArrayList<>();

ChannelFactory channelFactory =
() -> {
ManagedChannel channel = Mockito.mock(ManagedChannel.class);
Mockito.when(channel.newCall(Mockito.any(), Mockito.any()))
.thenAnswer(
invocation -> {
@SuppressWarnings("unchecked")
ClientCall<Object, Object> clientCall = Mockito.mock(ClientCall.class);
startedCalls.add(clientCall);
return clientCall;
});

channels.add(channel);
return channel;
};
ChannelFactory channelFactory = createMockChannelFactory(channels, startedCalls);

pool =
new ChannelPool(
Expand Down Expand Up @@ -578,21 +570,7 @@ void removedIdleChannelsAreShutdown() throws Exception {
List<ManagedChannel> channels = new ArrayList<>();
List<ClientCall<Object, Object>> startedCalls = new ArrayList<>();

ChannelFactory channelFactory =
() -> {
ManagedChannel channel = Mockito.mock(ManagedChannel.class);
Mockito.when(channel.newCall(Mockito.any(), Mockito.any()))
.thenAnswer(
invocation -> {
@SuppressWarnings("unchecked")
ClientCall<Object, Object> clientCall = Mockito.mock(ClientCall.class);
startedCalls.add(clientCall);
return clientCall;
});

channels.add(channel);
return channel;
};
ChannelFactory channelFactory = createMockChannelFactory(channels, startedCalls);

pool =
new ChannelPool(
Expand All @@ -619,21 +597,7 @@ void removedActiveChannelsAreShutdown() throws Exception {
List<ManagedChannel> channels = new ArrayList<>();
List<ClientCall<Object, Object>> startedCalls = new ArrayList<>();

ChannelFactory channelFactory =
() -> {
ManagedChannel channel = Mockito.mock(ManagedChannel.class);
Mockito.when(channel.newCall(Mockito.any(), Mockito.any()))
.thenAnswer(
invocation -> {
@SuppressWarnings("unchecked")
ClientCall<Object, Object> clientCall = Mockito.mock(ClientCall.class);
startedCalls.add(clientCall);
return clientCall;
});

channels.add(channel);
return channel;
};
ChannelFactory channelFactory = createMockChannelFactory(channels, startedCalls);

pool =
new ChannelPool(
Expand Down Expand Up @@ -734,21 +698,7 @@ void repeatedResizingLogsWarningOnExpand() throws Exception {
List<ManagedChannel> channels = new ArrayList<>();
List<ClientCall<Object, Object>> startedCalls = new ArrayList<>();

ChannelFactory channelFactory =
() -> {
ManagedChannel channel = Mockito.mock(ManagedChannel.class);
Mockito.when(channel.newCall(Mockito.any(), Mockito.any()))
.thenAnswer(
invocation -> {
@SuppressWarnings("unchecked")
ClientCall<Object, Object> clientCall = Mockito.mock(ClientCall.class);
startedCalls.add(clientCall);
return clientCall;
});

channels.add(channel);
return channel;
};
ChannelFactory channelFactory = createMockChannelFactory(channels, startedCalls);

pool =
new ChannelPool(
Expand Down
Loading