Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@

/** Options for the Cloud Spanner service. */
public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {

private static final long serialVersionUID = 2789571558532701170L;
private static SpannerEnvironment environment = SpannerEnvironmentImpl.INSTANCE;

Expand All @@ -89,6 +90,8 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
"https://www.googleapis.com/auth/spanner.admin",
"https://www.googleapis.com/auth/spanner.data");
private static final int MAX_CHANNELS = 256;
private static final int DEFAULT_NUM_CHANNELS = 4;
private static final int GRPC_GCP_ENABLED_DEFAULT_NUM_CHANNELS = 8;
private final TransportChannelProvider channelProvider;

@SuppressWarnings("rawtypes")
Expand Down Expand Up @@ -133,6 +136,7 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
* {@link SpannerOptions}.
*/
public interface CallCredentialsProvider {

/** Return the {@link CallCredentials} to use for a gRPC call. */
CallCredentials getCallCredentials();
}
Expand Down Expand Up @@ -187,6 +191,7 @@ public interface CallCredentialsProvider {
* }</pre>
*/
public interface CallContextConfigurator {

/**
* Configure a {@link ApiCallContext} for a specific RPC call.
*
Expand Down Expand Up @@ -307,6 +312,7 @@ static <ReqT, RespT> SpannerMethod valueOf(ReqT request, MethodDescriptor<ReqT,
* }</pre>
*/
public static class SpannerCallContextTimeoutConfigurator implements CallContextConfigurator {

private Duration commitTimeout;
private Duration rollbackTimeout;

Expand Down Expand Up @@ -453,6 +459,7 @@ public SpannerCallContextTimeoutConfigurator withPartitionReadTimeout(

/** Default implementation of {@code SpannerFactory}. */
private static class DefaultSpannerFactory implements SpannerFactory {

private static final DefaultSpannerFactory INSTANCE = new DefaultSpannerFactory();

@Override
Expand All @@ -463,6 +470,7 @@ public Spanner create(SpannerOptions serviceOptions) {

/** Default implementation of {@code SpannerRpcFactory}. */
private static class DefaultSpannerRpcFactory implements SpannerRpcFactory {

private static final DefaultSpannerRpcFactory INSTANCE = new DefaultSpannerRpcFactory();

@Override
Expand All @@ -475,6 +483,7 @@ public ServiceRpc create(SpannerOptions options) {

/** {@link ExecutorProvider} that is used for {@link AsyncResultSet}. */
public interface CloseableExecutorProvider extends ExecutorProvider, AutoCloseable {

/** Overridden to suppress the throws declaration of the super interface. */
@Override
void close();
Expand All @@ -485,6 +494,7 @@ public interface CloseableExecutorProvider extends ExecutorProvider, AutoCloseab
* ScheduledExecutorService}.
*/
public static class FixedCloseableExecutorProvider implements CloseableExecutorProvider {

private final ScheduledExecutorService executor;

private FixedCloseableExecutorProvider(ScheduledExecutorService executor) {
Expand Down Expand Up @@ -602,6 +612,7 @@ private SpannerOptions(Builder builder) {
* variables.
*/
public interface SpannerEnvironment {

/**
* The optimizer version to use. Must return an empty string to indicate that no value has been
* set.
Expand All @@ -624,6 +635,7 @@ default String getOptimizerStatisticsPackage() {
* variables.
*/
private static class SpannerEnvironmentImpl implements SpannerEnvironment {

private static final SpannerEnvironmentImpl INSTANCE = new SpannerEnvironmentImpl();
private static final String SPANNER_OPTIMIZER_VERSION_ENV_VAR = "SPANNER_OPTIMIZER_VERSION";
private static final String SPANNER_OPTIMIZER_STATISTICS_PACKAGE_ENV_VAR =
Expand All @@ -646,6 +658,7 @@ public String getOptimizerStatisticsPackage() {
/** Builder for {@link SpannerOptions} instances. */
public static class Builder
extends ServiceOptions.Builder<Spanner, SpannerOptions, SpannerOptions.Builder> {

static final int DEFAULT_PREFETCH_CHUNKS = 4;
static final QueryOptions DEFAULT_QUERY_OPTIONS = QueryOptions.getDefaultInstance();
static final RetrySettings DEFAULT_ADMIN_REQUESTS_LIMIT_EXCEEDED_RETRY_SETTINGS =
Expand All @@ -670,7 +683,7 @@ public static class Builder
private GrpcInterceptorProvider interceptorProvider;

/** By default, we create 4 channels per {@link SpannerOptions} */
private int numChannels = 4;
private Integer numChannels;

private String transportChannelExecutorThreadNameFormat = "Cloud-Spanner-TransportChannel-%d";

Expand Down Expand Up @@ -1122,8 +1135,7 @@ public Builder setHost(String host) {

/** Enables gRPC-GCP extension with the default settings. */
public Builder enableGrpcGcpExtension() {
this.grpcGcpExtensionEnabled = true;
return this;
return this.enableGrpcGcpExtension(null);
}

/**
Expand All @@ -1133,6 +1145,10 @@ public Builder enableGrpcGcpExtension() {
public Builder enableGrpcGcpExtension(GcpManagedChannelOptions options) {
this.grpcGcpExtensionEnabled = true;
this.grpcGcpOptions = options;
// By default, set number of channels to 8 if grpc-gcp extension is enabled
if (this.numChannels == null) {
this.numChannels = GRPC_GCP_ENABLED_DEFAULT_NUM_CHANNELS;
}
return this;
}

Expand Down Expand Up @@ -1166,6 +1182,12 @@ public SpannerOptions build() {
// As we are using plain text, we should never send any credentials.
this.setCredentials(NoCredentials.getInstance());
}
// Set the number of channels if not set
if (this.numChannels == null) {
/** By default, we create 4 channels per {@link SpannerOptions} */
this.numChannels = DEFAULT_NUM_CHANNELS;
}

return new SpannerOptions(this);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -918,4 +918,37 @@ public void testCustomAsyncExecutorProvider() {
.build();
assertSame(service, options.getAsyncExecutorProvider().getExecutor());
}

@Test
public void testDefaultNumChannelsWithGrpcGcpExtensionEnabled() {
SpannerOptions.Builder builder =
SpannerOptions.newBuilder().setProjectId("test-project").enableGrpcGcpExtension();

SpannerOptions options = builder.build();
assertEquals(options.getNumChannels(), 8);
}

@Test
public void testNumChannelsWithGrpcGcpExtensionEnabled() {
// Set number of channels before enabling grpc-gcp channel pool
int numChannels = 5;
SpannerOptions.Builder builder1 =
SpannerOptions.newBuilder()
.setProjectId("test-project")
.setNumChannels(numChannels)
.enableGrpcGcpExtension();

SpannerOptions options1 = builder1.build();
assertEquals(options1.getNumChannels(), numChannels);

// Set number of channels after enabling grpc-gcp channel pool
SpannerOptions.Builder builder2 =
SpannerOptions.newBuilder()
.setProjectId("test-project")
.enableGrpcGcpExtension()
.setNumChannels(numChannels);

SpannerOptions options2 = builder2.build();
assertEquals(options2.getNumChannels(), numChannels);
}
}