diff --git a/.kokoro/presubmit/graalvm-native-a.cfg b/.kokoro/presubmit/graalvm-native-a.cfg index af9f68ad4f..d1c7f7580d 100644 --- a/.kokoro/presubmit/graalvm-native-a.cfg +++ b/.kokoro/presubmit/graalvm-native-a.cfg @@ -3,7 +3,7 @@ # Configure the docker image for kokoro-trampoline. env_vars: { key: "TRAMPOLINE_IMAGE" - value: "gcr.io/cloud-devrel-public-resources/graalvm_sdk_platform_a:3.61.0" # {x-version-update:google-cloud-shared-dependencies:current} + value: "gcr.io/cloud-devrel-public-resources/graalvm_sdk_platform_a:3.62.0-SNAPSHOT" # {x-version-update:google-cloud-shared-dependencies:current} } env_vars: { diff --git a/.kokoro/presubmit/graalvm-native-b.cfg b/.kokoro/presubmit/graalvm-native-b.cfg index 576031a719..1041cf93b8 100644 --- a/.kokoro/presubmit/graalvm-native-b.cfg +++ b/.kokoro/presubmit/graalvm-native-b.cfg @@ -3,7 +3,7 @@ # Configure the docker image for kokoro-trampoline. env_vars: { key: "TRAMPOLINE_IMAGE" - value: "gcr.io/cloud-devrel-public-resources/graalvm_sdk_platform_b:3.61.0" # {x-version-update:google-cloud-shared-dependencies:current} + value: "gcr.io/cloud-devrel-public-resources/graalvm_sdk_platform_b:3.62.0-SNAPSHOT" # {x-version-update:google-cloud-shared-dependencies:current} } env_vars: { diff --git a/.kokoro/presubmit/graalvm-native-c.cfg b/.kokoro/presubmit/graalvm-native-c.cfg index 1d86c06d22..731eee483b 100644 --- a/.kokoro/presubmit/graalvm-native-c.cfg +++ b/.kokoro/presubmit/graalvm-native-c.cfg @@ -3,7 +3,7 @@ # Configure the docker image for kokoro-trampoline. env_vars: { key: "TRAMPOLINE_IMAGE" - value: "gcr.io/cloud-devrel-public-resources/graalvm_sdk_platform_c:3.61.0" # {x-version-update:google-cloud-shared-dependencies:current} + value: "gcr.io/cloud-devrel-public-resources/graalvm_sdk_platform_c:3.62.0-SNAPSHOT" # {x-version-update:google-cloud-shared-dependencies:current} } env_vars: { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImpl.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImpl.java index 9ef2a486f1..17c957d8cb 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImpl.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImpl.java @@ -65,6 +65,9 @@ public class ChannelPoolDpImpl implements ChannelPool { private static final String DEFAULT_LOG_NAME = "pool"; private static final AtomicInteger INDEX = new AtomicInteger(); + // TODO: Move to client configuration. + private static final int CONSECUTIVE_OPEN_SESSION_FAILURE_THRESHOLD = 5; + private final String poolLogId; @VisibleForTesting volatile int minGroups; @@ -95,6 +98,12 @@ public class ChannelPoolDpImpl implements ChannelPool { @GuardedBy("this") private boolean closed = false; + @GuardedBy("this") + private long lastRecycleNano = 0; + + @GuardedBy("this") + private Duration recycleBackoff = Duration.ofMillis(1); + public ChannelPoolDpImpl( Supplier channelSupplier, ChannelPoolConfiguration config, @@ -221,6 +230,8 @@ public void start(Listener responseListener, Metadata headers) { public void onBeforeSessionStart(PeerInfo peerInfo) { afeId = AfeId.extract(peerInfo); synchronized (ChannelPoolDpImpl.this) { + channelWrapper.consecutiveFailures = 0; + recycleBackoff = Duration.ofMillis(1); rehomeChannel(channelWrapper, afeId); sessionsPerAfeId.add(afeId); } @@ -232,6 +243,8 @@ public void onClose(Status status, Metadata trailers) { synchronized (ChannelPoolDpImpl.this) { if (afeId != null) { sessionsPerAfeId.remove(afeId); + } else if (!status.isOk() && status.getCode() != Code.CANCELLED) { + channelWrapper.consecutiveFailures++; } releaseChannel(channelWrapper, status); } @@ -306,12 +319,12 @@ private void releaseChannel(ChannelWrapper channelWrapper, Status status) { channelWrapper.group.numStreams--; channelWrapper.numOutstanding--; - if (shouldRecycleChannel(status)) { + if (shouldRecycleChannel(channelWrapper, status)) { recycleChannel(channelWrapper); } } - private static boolean shouldRecycleChannel(Status status) { + private static boolean shouldRecycleChannel(ChannelWrapper channelWrapper, Status status) { if (status.getCode() == Code.UNIMPLEMENTED) { return true; } @@ -322,6 +335,10 @@ private static boolean shouldRecycleChannel(Status status) { return true; } + if (channelWrapper.consecutiveFailures >= CONSECUTIVE_OPEN_SESSION_FAILURE_THRESHOLD) { + return true; + } + return false; } @@ -332,6 +349,13 @@ private void recycleChannel(ChannelWrapper channelWrapper) { return; } + if (lastRecycleNano > System.nanoTime() - recycleBackoff.toNanos()) { + return; + } + + lastRecycleNano = System.nanoTime(); + recycleBackoff = recycleBackoff.multipliedBy(2); + channelWrapper.group.channels.remove(channelWrapper); channelWrapper.channel.shutdown(); // Checking for starting group because we don't want to delete the stating group. @@ -480,6 +504,7 @@ static class ChannelWrapper { private final ManagedChannel channel; private final Instant createdAt; private int numOutstanding = 0; + private int consecutiveFailures = 0; public ChannelWrapper(AfeChannelGroup group, ManagedChannel channel, Clock clock) { this.group = group; diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImplTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImplTest.java index b5e2de38fa..9bb5ad1ece 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImplTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImplTest.java @@ -469,4 +469,98 @@ void testRecycledChannelDoesNotRejoinPool() throws InterruptedException { pool.close(); } + + @Test + void testRecycleChannelOnConsecutiveFailures() { + when(channelSupplier.get()).thenReturn(channel); + when(channel.newCall(any(), any())).thenReturn(clientCall); + doNothing().when(clientCall).start(listener.capture(), any()); + + ChannelPoolDpImpl pool = + new ChannelPoolDpImpl(channelSupplier, defaultConfig, debugTagTracer, bgExecutor); + + for (int i = 0; i < 4; i++) { + pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT) + .start(mock(Listener.class), new Metadata()); + listener.getValue().onClose(Status.UNAVAILABLE, new Metadata()); + + // Should not be recycled yet + verify(channel, times(0)).shutdown(); + verify(channelSupplier, times(1)).get(); + } + + // 5th failure + pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT) + .start(mock(Listener.class), new Metadata()); + listener.getValue().onClose(Status.UNAVAILABLE, new Metadata()); + + // Now it should be recycled + verify(channel, times(1)).shutdown(); + verify(channelSupplier, times(2)).get(); + + pool.close(); + } + + @Test + void testResetConsecutiveFailuresOnSuccess() { + when(channelSupplier.get()).thenReturn(channel); + when(channel.newCall(any(), any())).thenReturn(clientCall); + doNothing().when(clientCall).start(listener.capture(), any()); + doReturn(Attributes.EMPTY).when(clientCall).getAttributes(); + + ChannelPoolDpImpl pool = + new ChannelPoolDpImpl(channelSupplier, defaultConfig, debugTagTracer, bgExecutor); + + // 4 failures + for (int i = 0; i < 4; i++) { + pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT) + .start(mock(Listener.class), new Metadata()); + listener.getValue().onClose(Status.UNAVAILABLE, new Metadata()); + } + verify(channel, times(0)).shutdown(); + + // A success: onHeaders (which calls onBeforeSessionStart) + pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT) + .start(mock(Listener.class), new Metadata()); + + PeerInfo peerInfo = PeerInfo.newBuilder().setApplicationFrontendId(555).build(); + Metadata headers = new Metadata(); + headers.put( + SessionStreamImpl.PEER_INFO_KEY, + Base64.getEncoder().encodeToString(peerInfo.toByteArray())); + listener.getValue().onHeaders(headers); + listener.getValue().onClose(Status.OK, new Metadata()); + + // Another 4 failures - should still not recycle because counter was reset + for (int i = 0; i < 4; i++) { + pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT) + .start(mock(Listener.class), new Metadata()); + listener.getValue().onClose(Status.UNAVAILABLE, new Metadata()); + } + verify(channel, times(0)).shutdown(); + + pool.close(); + } + + @Test + void testCancelledDoesNotIncrementFailures() { + when(channelSupplier.get()).thenReturn(channel); + when(channel.newCall(any(), any())).thenReturn(clientCall); + doNothing().when(clientCall).start(listener.capture(), any()); + + ChannelPoolDpImpl pool = + new ChannelPoolDpImpl(channelSupplier, defaultConfig, debugTagTracer, bgExecutor); + + for (int i = 0; i < 10; i++) { + pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT) + .start(mock(Listener.class), new Metadata()); + listener.getValue().onClose(Status.CANCELLED, new Metadata()); + } + + // Should never be recycled + verify(channel, times(0)).shutdown(); + verify(channelSupplier, times(1)).get(); + + pool.close(); + } }