in spanner_prober/prober/proberlib.go [319:439]
// newSpannerProber validates opt, makes sure the Cloud Spanner instance and
// database described by opt exist, and returns a Prober backed by a Spanner
// data client.
//
// The data client uses the default channel pool, or, when opt.UseGrpcGcp is
// set, a gRPC-GCP channel pool dialed against opt.Endpoint. clientOpts are
// forwarded to the instance admin, database admin and data clients; the
// returned Prober keeps the final option list, including the options appended
// here.
//
// On error a nil Prober is returned and the instance admin client and gRPC-GCP
// connection created by this call (if any) are closed.
func newSpannerProber(ctx context.Context, opt ProberOptions, clientOpts ...option.ClientOption) (*Prober, error) {
	if opt.NumRows <= 0 {
		return nil, fmt.Errorf("NumRows must be at least 1, got %v", opt.NumRows)
	}
	if opt.NodeCount > 0 && opt.ProcessingUnits > 0 {
		return nil, fmt.Errorf("At most one of NodeCount or ProcessingUnits may be specified. NodeCount: %v, ProcessingUnits: %v", opt.NodeCount, opt.ProcessingUnits)
	}
	if opt.Prober == nil {
		return nil, errors.New("Prober must not be nil")
	}
	if opt.ChannelPoolSize < 1 {
		return nil, errors.New("Number of channels must be >= 1")
	}

	instanceClient, err := instance.NewInstanceAdminClient(ctx, clientOpts...)
	if err != nil {
		return nil, err
	}

	// The instance admin client (and the gRPC-GCP connection, when used) are
	// owned by the returned Prober on success. On any failure below they must
	// be released here, otherwise they leak.
	var conn *grpc.ClientConn
	built := false
	defer func() {
		if built {
			return
		}
		// Close errors are ignored: the original failure is what the caller
		// needs to see, and closing an already-closed gRPC connection only
		// reports that it was closed.
		if conn != nil {
			_ = conn.Close()
		}
		_ = instanceClient.Close()
	}()

	if err := createCloudSpannerInstanceIfMissing(ctx, instanceClient, opt); err != nil {
		return nil, err
	}

	// The database admin client is only needed during setup.
	databaseClient, err := database.NewDatabaseAdminClient(ctx, clientOpts...)
	if err != nil {
		return nil, err
	}
	defer databaseClient.Close()
	if err := createCloudSpannerDatabase(ctx, databaseClient, opt); err != nil {
		return nil, err
	}

	// Cap the capacity so the appends below always allocate a new backing
	// array instead of writing into spare capacity of a slice the caller
	// passed as clientOpts... and may still be using.
	clientOpts = clientOpts[:len(clientOpts):len(clientOpts)]

	if !opt.UseGrpcGcp {
		log.Info("Using default channel pool.")
		clientOpts = append(
			clientOpts,
			option.WithGRPCDialOption(grpc.WithUnaryInterceptor(AddGFELatencyUnaryInterceptor)),
			option.WithGRPCDialOption(grpc.WithStreamInterceptor(AddGFELatencyStreamingInterceptor)),
		)
	} else {
		log.Info("Using gRPC-GCP channel pool.")
		var perRPC credentials.PerRPCCredentials
		keyFile := os.Getenv("GOOGLE_APPLICATION_CREDENTIALS")
		if keyFile == "" {
			// NOTE(review): this uses context.Background() rather than ctx;
			// presumably so credential refresh is not tied to ctx's
			// lifetime. Confirm before changing.
			perRPC, err = oauth.NewApplicationDefault(context.Background(), scope)
		} else {
			perRPC, err = oauth.NewServiceAccountFromFile(keyFile, scope)
		}
		if err != nil {
			return nil, err
		}
		// Converting gRPC-GCP config to JSON because grpc.Dial only accepts JSON
		// config for configuring load balancers.
		var grpcGcpJSONConfig []byte
		grpcGcpJSONConfig, err = protojson.Marshal(opt.grpcGcpConfig())
		if err != nil {
			return nil, err
		}
		conn, err = grpc.Dial(
			// Spanner endpoint. Replace this with your custom endpoint if not using
			// the default endpoint.
			opt.Endpoint,
			// Application default or service account credentials set up above.
			grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
			grpc.WithPerRPCCredentials(perRPC),
			// Do not look up load balancer (gRPC-GCP) config via DNS.
			grpc.WithDisableServiceConfig(),
			// Instead use this static config.
			grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":%s}]}`, grpcgcp.Name, string(grpcGcpJSONConfig))),
			// gRPC-GCP interceptors required for proper operation of gRPC-GCP
			// channel pool. Add your interceptors as next arguments if needed.
			grpc.WithChainUnaryInterceptor(grpcgcp.GCPUnaryClientInterceptor, AddGFELatencyUnaryInterceptor),
			grpc.WithChainStreamInterceptor(grpcgcp.GCPStreamClientInterceptor, AddGFELatencyStreamingInterceptor),
			grpc.WithStatsHandler(new(ocgrpc.ClientHandler)),
		)
		if err != nil {
			return nil, err
		}
		pool := &grpcGcpConnPool{
			cc: conn,
			// Set the pool size on ConnPool to communicate it to Spanner client.
			size: opt.ChannelPoolSize,
		}
		clientOpts = append(
			clientOpts,
			gtransport.WithConnPool(pool),
		)
	}

	dataClient, err := spanner.NewClientWithConfig(
		ctx,
		opt.databaseURI(),
		spanner.ClientConfig{
			NumChannels: opt.ChannelPoolSize,
		},
		clientOpts...,
	)
	if err != nil {
		return nil, err
	}

	p := &Prober{
		instanceAdminClient: instanceClient,
		spannerClient:       dataClient,
		deadline:            time.Duration(opt.ProbeDeadline),
		opsChannel:          make(chan op, aggregationChannelSize),
		numRows:             opt.NumRows,
		qps:                 opt.QPS,
		prober:              opt.Prober,
		maxStaleness:        opt.MaxStaleness,
		payloadSize:         opt.PayloadSize,
		generatePayload:     generatePayload,
		mu:                  &sync.Mutex{},
		opt:                 opt,
		clientOpts:          clientOpts,
	}
	// From here on the Prober owns the clients; disable the cleanup above.
	built = true
	return p, nil
}