func newSpannerProber()

in spanner_prober/prober/proberlib.go [319:439]


func newSpannerProber(ctx context.Context, opt ProberOptions, clientOpts ...option.ClientOption) (*Prober, error) {
	if opt.NumRows <= 0 {
		return nil, fmt.Errorf("NumRows must be at least 1, got %v", opt.NumRows)
	}

	if opt.NodeCount > 0 && opt.ProcessingUnits > 0 {
		return nil, fmt.Errorf("At most one of NodeCount or ProcessingUnits may be specified. NodeCount: %v, ProcessingUnits: %v", opt.NodeCount, opt.ProcessingUnits)
	}

	if opt.Prober == nil {
		return nil, errors.New("Prober must not be nil")
	}

	if opt.ChannelPoolSize < 1 {
		return nil, errors.New("Number of channels must be >= 1")
	}

	instanceClient, err := instance.NewInstanceAdminClient(ctx, clientOpts...)
	if err != nil {
		return nil, err
	}

	if err := createCloudSpannerInstanceIfMissing(ctx, instanceClient, opt); err != nil {
		return nil, err
	}

	databaseClient, err := database.NewDatabaseAdminClient(ctx, clientOpts...)
	if err != nil {
		return nil, err
	}
	defer databaseClient.Close()
	if err := createCloudSpannerDatabase(ctx, databaseClient, opt); err != nil {
		return nil, err
	}

	if !opt.UseGrpcGcp {
		log.Info("Using default channel pool.")
		clientOpts = append(
			clientOpts,
			option.WithGRPCDialOption(grpc.WithUnaryInterceptor(AddGFELatencyUnaryInterceptor)),
			option.WithGRPCDialOption(grpc.WithStreamInterceptor(AddGFELatencyStreamingInterceptor)),
		)
	} else {
		log.Info("Using gRPC-GCP channel pool.")
		var perRPC credentials.PerRPCCredentials
		var err error
		keyFile := os.Getenv("GOOGLE_APPLICATION_CREDENTIALS")
		if keyFile == "" {
			perRPC, err = oauth.NewApplicationDefault(context.Background(), scope)
		} else {
			perRPC, err = oauth.NewServiceAccountFromFile(keyFile, scope)
		}
		if err != nil {
			return nil, err
		}
		// Converting gRPC-GCP config to JSON because grpc.Dial only accepts JSON
		// config for configuring load balancers.
		grpcGcpJsonConfig, err := protojson.Marshal(opt.grpcGcpConfig())
		if err != nil {
			return nil, err
		}
		conn, err := grpc.Dial(
			// Spanner endpoint. Replace this with your custom endpoint if not using
			// the default endpoint.
			opt.Endpoint,
			// Application default or service account credentials set up above.
			grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
			grpc.WithPerRPCCredentials(perRPC),
			// Do not look up load balancer (gRPC-GCP) config via DNS.
			grpc.WithDisableServiceConfig(),
			// Instead use this static config.
			grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":%s}]}`, grpcgcp.Name, string(grpcGcpJsonConfig))),
			// gRPC-GCP interceptors required for proper operation of gRPC-GCP
			// channel pool. Add your interceptors as next arguments if needed.
			grpc.WithChainUnaryInterceptor(grpcgcp.GCPUnaryClientInterceptor, AddGFELatencyUnaryInterceptor),
			grpc.WithChainStreamInterceptor(grpcgcp.GCPStreamClientInterceptor, AddGFELatencyStreamingInterceptor),
			grpc.WithStatsHandler(new(ocgrpc.ClientHandler)),
		)
		if err != nil {
			return nil, err
		}
		pool := &grpcGcpConnPool{
			cc: conn,
			// Set the pool size on ConnPool to communicate it to Spanner client.
			size: opt.ChannelPoolSize,
		}
		clientOpts = append(
			clientOpts,
			gtransport.WithConnPool(pool),
		)
	}
	dataClient, err := spanner.NewClientWithConfig(
		ctx,
		opt.databaseURI(),
		spanner.ClientConfig{
			NumChannels: opt.ChannelPoolSize,
		},
		clientOpts...,
	)
	if err != nil {
		return nil, err
	}

	p := &Prober{
		instanceAdminClient: instanceClient,
		spannerClient:       dataClient,
		deadline:            time.Duration(opt.ProbeDeadline),
		opsChannel:          make(chan op, aggregationChannelSize),
		numRows:             opt.NumRows,
		qps:                 opt.QPS,
		prober:              opt.Prober,
		maxStaleness:        opt.MaxStaleness,
		payloadSize:         opt.PayloadSize,
		generatePayload:     generatePayload,
		mu:                  &sync.Mutex{},
		opt:                 opt,
		clientOpts:          clientOpts,
	}

	return p, nil
}