Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

import static com.google.cloud.datastore.Validator.validateNamespace;

import com.google.api.core.ApiFunction;
import com.google.api.core.BetaApi;
import com.google.api.core.ObsoleteApi;
import com.google.api.gax.grpc.ChannelPoolSettings;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
Expand All @@ -37,7 +37,6 @@
import com.google.cloud.http.HttpTransportOptions;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableSet;
import io.grpc.ManagedChannelBuilder;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Objects;
Expand All @@ -54,9 +53,18 @@ public class DatastoreOptions extends ServiceOptions<Datastore, DatastoreOptions
private static final String DEFAULT_DATABASE_ID = "";
public static final String PROJECT_ID_ENV_VAR = "DATASTORE_PROJECT_ID";
public static final String LOCAL_HOST_ENV_VAR = "DATASTORE_EMULATOR_HOST";
public static final int INIT_CHANNEL_COUNT = 1;
public static final int INIT_CHANNEL_COUNT = 5;
Comment thread
lqiu96 marked this conversation as resolved.
static final int CHANNEL_POOL_DEFAULT_RESIZE_DELTA = 5;
// Configure this default to be 100 to match the typical default `MAX_CONCURRENT_STREAMS`.
// Larger values *may* experience possible client-side queueing as excess streams cannot be
// multiplexed onto a full Http2 connection.
static final int CHANNEL_POOL_MAX_RPCS_PER_CHANNEL = 100;
public static final int MIN_CHANNEL_COUNT = 1;
public static final int MAX_CHANNEL_COUNT = 4;

// This is a default max channel constant value set to handle the default initial channel
// count and resize delta.
@ObsoleteApi("This constant is obsolete and will be removed in a future version")
public static final int MAX_CHANNEL_COUNT = 10;

private transient TransportChannelProvider channelProvider = null;

Expand Down Expand Up @@ -233,33 +241,29 @@ private DatastoreOptions(Builder builder) {
"Only gRPC transport allows setting of channel provider or credentials provider");
} else if (getTransportOptions() instanceof GrpcTransportOptions) {
if (builder.channelProvider == null) {
/*
The default gRPC connection pool is configured with a minimum of 1 channel.
The maximum channel count automatically defaults to 200 (Defined in gax-grpc).
*/
// Set the default gRPC connection pool to be configured with a minimum of 1 channel.
// The maximum channel count automatically defaults to 200 (as defined in gax-grpc).
// Datastore sets the initial channel pool count to be 5 channels to allow better handle
// large loads of requests and the resize delta to be 5 to scale quicker. In cases of low
// load, the channel count will scale down as needed and memory will be freed. The default
// configuration is set to try and handle ~500 QPS and will scale up and down as needed.
ChannelPoolSettings datastoreChannelPoolSettings =
ChannelPoolSettings.builder()
.setInitialChannelCount(INIT_CHANNEL_COUNT)
.setMinChannelCount(MIN_CHANNEL_COUNT)
.setMaxRpcsPerChannel(CHANNEL_POOL_MAX_RPCS_PER_CHANNEL)
.setMaxResizeDelta(CHANNEL_POOL_DEFAULT_RESIZE_DELTA)
.build();

ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> channelConfigurator =
this.traceUtil.getChannelConfigurator();
if (channelConfigurator == null) {
this.channelProvider =
GrpcTransportOptions.setUpChannelProvider(
DatastoreSettings.defaultGrpcTransportProviderBuilder()
.setChannelPoolSettings(datastoreChannelPoolSettings),
this);
} else {
InstantiatingGrpcChannelProvider.Builder channelProviderBuilder =
DatastoreSettings.defaultGrpcTransportProviderBuilder()
.setChannelPoolSettings(datastoreChannelPoolSettings)
.setEndpoint(getHost());
if (traceUtil.getChannelConfigurator() != null) {
// Intercept the grpc channel calls to add telemetry info.
this.channelProvider =
GrpcTransportOptions.setUpChannelProvider(
DatastoreSettings.defaultGrpcTransportProviderBuilder()
.setChannelPoolSettings(datastoreChannelPoolSettings)
.setChannelConfigurator(channelConfigurator),
this);
channelProviderBuilder.setChannelConfigurator(traceUtil.getChannelConfigurator());
}
this.channelProvider = channelProviderBuilder.build();
} else {
this.channelProvider = builder.channelProvider;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.google.api.core.InternalApi;
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.core.GaxProperties;
import com.google.api.gax.grpc.ChannelPoolSettings;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.ClientContext;
Expand Down Expand Up @@ -80,14 +79,11 @@ public GrpcDatastoreRpc(DatastoreOptions datastoreOptions) throws IOException {
DatastoreStubSettings.newBuilder(clientContext)
.applyToAllUnaryMethods(retrySettingSetter(datastoreOptions));
if (!isEmulator(datastoreOptions)) {
// Use the TransportChannelProvider configured in DatastoreOptions. For gRPC transport, this
// will
// be configured with a default ChannelPool configuration
datastoreStubSettingsBuilder.setTransportChannelProvider(
DatastoreSettings.defaultGrpcTransportProviderBuilder()
.setChannelPoolSettings(
ChannelPoolSettings.builder()
.setInitialChannelCount(DatastoreOptions.INIT_CHANNEL_COUNT)
.setMinChannelCount(DatastoreOptions.MIN_CHANNEL_COUNT)
.build())
.build());
datastoreOptions.getTransportChannelProvider());
}

datastoreStub = GrpcDatastoreStub.create(datastoreStubSettingsBuilder.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ public void testGrpcDefaultChannelConfigurations() {
assertEquals(channelPoolSettings.getInitialChannelCount(), DatastoreOptions.INIT_CHANNEL_COUNT);
assertEquals(channelPoolSettings.getMinChannelCount(), DatastoreOptions.MIN_CHANNEL_COUNT);
assertEquals(channelPoolSettings.getMaxChannelCount(), DEFAULT_MAX_CHANNEL_COUNT);
assertEquals(
channelPoolSettings.getMaxRpcsPerChannel(),
DatastoreOptions.CHANNEL_POOL_MAX_RPCS_PER_CHANNEL);
}

@Test
Expand Down
Loading