spanner_prober/prober/proberlib.go (598 lines of code) (raw):

// Package prober defines a Cloud Spanner prober. package prober import ( "bytes" "context" "crypto/sha256" "errors" "fmt" "math/rand" "os" "sync" "time" "cloud.google.com/go/spanner" database "cloud.google.com/go/spanner/admin/database/apiv1" instance "cloud.google.com/go/spanner/admin/instance/apiv1" "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp" log "github.com/golang/glog" "go.opencensus.io/plugin/ocgrpc" "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" "google.golang.org/api/iterator" "google.golang.org/api/option" gtransport "google.golang.org/api/transport/grpc" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/oauth" "google.golang.org/grpc/status" "google.golang.org/protobuf/encoding/protojson" gpb "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp" dbadminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1" instancepb "google.golang.org/genproto/googleapis/spanner/admin/instance/v1" ) const ( aggregationChannelSize = 1000 // Long running operations retry parameters. 
baseLRORetryDelay = 200 * time.Millisecond maxLRORetryDelay = 5 * time.Second scope = "https://www.googleapis.com/auth/cloud-platform" ) var ( generatePayload = func(size int) ([]byte, []byte, error) { payload := make([]byte, size) rand.Read(payload) h := sha256.New() if _, err := h.Write(payload); err != nil { return nil, nil, err } return payload, h.Sum(nil), nil } MetricPrefix = "grpc_gcp_spanner_prober/" opNameTag = tag.MustNewKey("op_name") resultTag = tag.MustNewKey("result") withError = tag.Insert(resultTag, "error") withSuccess = tag.Insert(resultTag, "success") opLatency = stats.Int64( "op_latency", "gRPC-GCP Spanner prober operation latency", stats.UnitMilliseconds, ) opLatencyView = &view.View{ Name: MetricPrefix + opLatency.Name(), Measure: opLatency, Aggregation: view.Distribution(expDistribution...), TagKeys: []tag.Key{resultTag, opNameTag}, } opResults = stats.Int64( "op_count", "gRPC-GCP Spanner prober operation count", stats.UnitDimensionless, ) opResultsView = &view.View{ Name: MetricPrefix + opResults.Name(), Measure: opResults, Aggregation: view.Count(), TagKeys: []tag.Key{resultTag, opNameTag}, } ) func (o ProberOptions) grpcGcpConfig() *gpb.ApiConfig { return &gpb.ApiConfig{ ChannelPool: &gpb.ChannelPoolConfig{ // Creates a fixed-size gRPC-GCP channel pool. MinSize: uint32(o.ChannelPoolSize), MaxSize: uint32(o.ChannelPoolSize), // This option repeats(preserves) the strategy used by the Spanner // client to distribute BatchCreateSessions calls across channels. BindPickStrategy: gpb.ChannelPoolConfig_ROUND_ROBIN, // When issuing RPC call within Spanner session fallback to a ready // channel if the channel mapped to the session is not ready. FallbackToReady: true, // Establish a new connection for a channel where // no response/messages were received within last 1 second and // at least 3 RPC calls (started after the last response/message // received) timed out (deadline_exceeded). 
UnresponsiveDetectionMs: 1000, UnresponsiveCalls: 3, }, // Configuration for all Spanner RPCs that create, use or remove // Spanner sessions. gRPC-GCP channel pool uses this configuration // to provide session to channel affinity. If Spanner introduces any new // method that creates/uses/removes sessions, it must be added here. Method: []*gpb.MethodConfig{ { Name: []string{"/google.spanner.v1.Spanner/CreateSession"}, Affinity: &gpb.AffinityConfig{ Command: gpb.AffinityConfig_BIND, AffinityKey: "name", }, }, { Name: []string{"/google.spanner.v1.Spanner/BatchCreateSessions"}, Affinity: &gpb.AffinityConfig{ Command: gpb.AffinityConfig_BIND, AffinityKey: "session.name", }, }, { Name: []string{"/google.spanner.v1.Spanner/DeleteSession"}, Affinity: &gpb.AffinityConfig{ Command: gpb.AffinityConfig_UNBIND, AffinityKey: "name", }, }, { Name: []string{"/google.spanner.v1.Spanner/GetSession"}, Affinity: &gpb.AffinityConfig{ Command: gpb.AffinityConfig_BOUND, AffinityKey: "name", }, }, { Name: []string{ "/google.spanner.v1.Spanner/BeginTransaction", "/google.spanner.v1.Spanner/Commit", "/google.spanner.v1.Spanner/ExecuteBatchDml", "/google.spanner.v1.Spanner/ExecuteSql", "/google.spanner.v1.Spanner/ExecuteStreamingSql", "/google.spanner.v1.Spanner/PartitionQuery", "/google.spanner.v1.Spanner/PartitionRead", "/google.spanner.v1.Spanner/Read", "/google.spanner.v1.Spanner/Rollback", "/google.spanner.v1.Spanner/StreamingRead", }, Affinity: &gpb.AffinityConfig{ Command: gpb.AffinityConfig_BOUND, AffinityKey: "session", }, }, }, } } // ConnPool wrapper for gRPC-GCP channel pool. gtransport.ConnPool is the // interface Spanner client accepts as a replacement channel pool. type grpcGcpConnPool struct { gtransport.ConnPool cc *grpc.ClientConn size int } func (cp *grpcGcpConnPool) Conn() *grpc.ClientConn { return cp.cc } // Spanner client uses this function to get channel pool size. 
func (cp *grpcGcpConnPool) Num() int {
	return cp.size
}

// Close closes the wrapped gRPC client connection and returns the resulting error.
func (cp *grpcGcpConnPool) Close() error {
	return cp.cc.Close()
}

// ProberOptions holds the settings required for creating a prober.
type ProberOptions struct {
	// Cloud Spanner settings in name form, not URI (e.g. an_instance, not projects/a_project/instances/an_instance).
	Project        string
	Instance       string
	Database       string
	InstanceConfig string

	// QPS rate to probe at.
	QPS float64

	// NumRows is the number of rows among which the prober randomly chooses to probe.
	NumRows int

	// Prober is the type of probe which will be run.
	Prober Probe

	// MaxStaleness is the bound of stale reads.
	MaxStaleness time.Duration

	// PayloadSize is the number of bytes of random data used as a Payload.
	PayloadSize int

	// ProbeDeadline is the deadline for each probe request.
	ProbeDeadline time.Duration

	// Cloud Spanner Endpoint to send request.
	Endpoint string

	// Node count for Spanner instance. If specified, ProcessingUnits must be 0.
	NodeCount int

	// Processing units for the Spanner instance. If specified, NodeCount must be 0.
	ProcessingUnits int

	// Use gRPC-GCP library.
	UseGrpcGcp bool

	// Number of channels.
	ChannelPoolSize int
}

// Prober holds the internal prober state.
type Prober struct {
	instanceAdminClient *instance.InstanceAdminClient
	spannerClient       *spanner.Client
	// deadline is the timeout applied to every individual probe.
	deadline time.Duration
	// NOTE(review): mu is initialized by the constructor but not used in the
	// code visible here — confirm what it guards.
	mu *sync.Mutex
	// opsChannel carries finished probe results to the stats aggregator.
	opsChannel chan op
	// The fields below are copied from the ProberOptions used at construction.
	numRows      int
	qps          float64
	prober       Probe
	maxStaleness time.Duration
	payloadSize  int
	// generatePayload returns a payload of the requested size, its hash, and an error.
	generatePayload func(int) ([]byte, []byte, error)
	opt             ProberOptions
	// add clientOptions here as it contains credentials.
	clientOpts []option.ClientOption
}

// op is the outcome of a single probe: how long it took and the error it
// returned, if any.
type op struct {
	Latency time.Duration
	Error   error
}

// Probe is the interface for a single type of prober.
type Probe interface {
	// name returns the identifier used to tag this probe's metrics.
	name() string
	// probe runs one probe against the given Prober and reports its error.
	probe(context.Context, *Prober) error
}

// instanceURI returns an instance URI of the form: projects/{project}/instances/{instance name}.
// E.g. projects/test-project/instances/test-instance.
func (opt *ProberOptions) instanceURI() string { return fmt.Sprintf("projects/%s/instances/%s", opt.Project, opt.Instance) } // instanceName returns an instance name of the form: {instance name}. // E.g. test-instance. func (opt *ProberOptions) instanceName() string { return opt.Instance } // instanceConfigURI returns an instance config URI of the form: projects/{project}/instanceConfigs/{instance config name}. // E.g. projects/test-project/instanceConfigss/regional-test. func (opt *ProberOptions) instanceConfigURI() string { return fmt.Sprintf("projects/%s/instanceConfigs/%s", opt.Project, opt.InstanceConfig) } // projectURI returns a project URI of the form: projects/{project}. // E.g. projects/test-project. func (opt *ProberOptions) projectURI() string { return fmt.Sprintf("projects/%s", opt.Project) } // databaseURI returns a database URI of the form: projects/{project}/instances/{instance name}/databases/{database name}. // E.g. projects/test-project/instances/test-instance/databases/test-database. func (opt *ProberOptions) databaseURI() string { return fmt.Sprintf("projects/%s/instances/%s/databases/%s", opt.Project, opt.Instance, opt.Database) } // databaseName returns a database name of the form: {database name}. // E.g. test-database. func (opt *ProberOptions) databaseName() string { return opt.Database } func init() { view.Register(opLatencyView, opResultsView) } // New initializes Cloud Spanner clients, setup up the database, and return a new CSProber. func NewProber(ctx context.Context, opt ProberOptions, clientOpts ...option.ClientOption) (*Prober, error) { ctx, err := tag.New(ctx, tag.Insert(opNameTag, opt.Prober.name())) if err != nil { return nil, err } // Override Cloud Spanner endpoint if specified. if opt.Endpoint != "" { clientOpts = append(clientOpts, option.WithEndpoint(opt.Endpoint)) } p, err := newSpannerProber(ctx, opt, clientOpts...) 
if err != nil { return nil, err } go p.backgroundStatsAggregator(ctx) return p, nil } func newSpannerProber(ctx context.Context, opt ProberOptions, clientOpts ...option.ClientOption) (*Prober, error) { if opt.NumRows <= 0 { return nil, fmt.Errorf("NumRows must be at least 1, got %v", opt.NumRows) } if opt.NodeCount > 0 && opt.ProcessingUnits > 0 { return nil, fmt.Errorf("At most one of NodeCount or ProcessingUnits may be specified. NodeCount: %v, ProcessingUnits: %v", opt.NodeCount, opt.ProcessingUnits) } if opt.Prober == nil { return nil, errors.New("Prober must not be nil") } if opt.ChannelPoolSize < 1 { return nil, errors.New("Number of channels must be >= 1") } instanceClient, err := instance.NewInstanceAdminClient(ctx, clientOpts...) if err != nil { return nil, err } if err := createCloudSpannerInstanceIfMissing(ctx, instanceClient, opt); err != nil { return nil, err } databaseClient, err := database.NewDatabaseAdminClient(ctx, clientOpts...) if err != nil { return nil, err } defer databaseClient.Close() if err := createCloudSpannerDatabase(ctx, databaseClient, opt); err != nil { return nil, err } if !opt.UseGrpcGcp { log.Info("Using default channel pool.") clientOpts = append( clientOpts, option.WithGRPCDialOption(grpc.WithUnaryInterceptor(AddGFELatencyUnaryInterceptor)), option.WithGRPCDialOption(grpc.WithStreamInterceptor(AddGFELatencyStreamingInterceptor)), ) } else { log.Info("Using gRPC-GCP channel pool.") var perRPC credentials.PerRPCCredentials var err error keyFile := os.Getenv("GOOGLE_APPLICATION_CREDENTIALS") if keyFile == "" { perRPC, err = oauth.NewApplicationDefault(context.Background(), scope) } else { perRPC, err = oauth.NewServiceAccountFromFile(keyFile, scope) } if err != nil { return nil, err } // Converting gRPC-GCP config to JSON because grpc.Dial only accepts JSON // config for configuring load balancers. 
grpcGcpJsonConfig, err := protojson.Marshal(opt.grpcGcpConfig()) if err != nil { return nil, err } conn, err := grpc.Dial( // Spanner endpoint. Replace this with your custom endpoint if not using // the default endpoint. opt.Endpoint, // Application default or service account credentials set up above. grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")), grpc.WithPerRPCCredentials(perRPC), // Do not look up load balancer (gRPC-GCP) config via DNS. grpc.WithDisableServiceConfig(), // Instead use this static config. grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":%s}]}`, grpcgcp.Name, string(grpcGcpJsonConfig))), // gRPC-GCP interceptors required for proper operation of gRPC-GCP // channel pool. Add your interceptors as next arguments if needed. grpc.WithChainUnaryInterceptor(grpcgcp.GCPUnaryClientInterceptor, AddGFELatencyUnaryInterceptor), grpc.WithChainStreamInterceptor(grpcgcp.GCPStreamClientInterceptor, AddGFELatencyStreamingInterceptor), grpc.WithStatsHandler(new(ocgrpc.ClientHandler)), ) if err != nil { return nil, err } pool := &grpcGcpConnPool{ cc: conn, // Set the pool size on ConnPool to communicate it to Spanner client. 
size: opt.ChannelPoolSize, } clientOpts = append( clientOpts, gtransport.WithConnPool(pool), ) } dataClient, err := spanner.NewClientWithConfig( ctx, opt.databaseURI(), spanner.ClientConfig{ NumChannels: opt.ChannelPoolSize, }, clientOpts..., ) if err != nil { return nil, err } p := &Prober{ instanceAdminClient: instanceClient, spannerClient: dataClient, deadline: time.Duration(opt.ProbeDeadline), opsChannel: make(chan op, aggregationChannelSize), numRows: opt.NumRows, qps: opt.QPS, prober: opt.Prober, maxStaleness: opt.MaxStaleness, payloadSize: opt.PayloadSize, generatePayload: generatePayload, mu: &sync.Mutex{}, opt: opt, clientOpts: clientOpts, } return p, nil } func backoff(baseDelay, maxDelay time.Duration, retries int) time.Duration { backoff, max := float64(baseDelay), float64(maxDelay) for backoff < max && retries > 0 { backoff = backoff * 1.5 retries-- } if backoff > max { backoff = max } return time.Duration(backoff) } // createCloudSpannerInstanceIfMissing creates a one node "Instance" of Cloud Spanner in the specificed project if missing. // Instances are shared across multiple prober tasks running in differenct GCE VMs. // Instances do not get cleaned up in this prober, as we do not support turning down Cloud Spanner regions. func createCloudSpannerInstanceIfMissing(ctx context.Context, instanceClient *instance.InstanceAdminClient, opt ProberOptions) error { // Skip instance creation requests if the instance already exists. if checkInstancePresence(ctx, instanceClient, opt) { return nil } op, err := instanceClient.CreateInstance(ctx, &instancepb.CreateInstanceRequest{ Parent: opt.projectURI(), InstanceId: opt.instanceName(), Instance: &instancepb.Instance{ Name: opt.instanceURI(), Config: opt.instanceConfigURI(), DisplayName: opt.instanceName(), NodeCount: int32(opt.NodeCount), ProcessingUnits: int32(opt.ProcessingUnits), }, }) // If instance create operations fails, check if the instance was already created. If no, return error. 
if err != nil { if status.Code(err) != codes.AlreadyExists { return err } } else if _, err := op.Wait(ctx); err != nil { return err } // Wait for instance to be ready. var retries int for { var resp *instancepb.Instance resp, err = instanceClient.GetInstance(ctx, &instancepb.GetInstanceRequest{ Name: opt.instanceURI(), }) if err != nil || resp.State == instancepb.Instance_READY { return err } select { case <-time.After(backoff(baseLRORetryDelay, maxLRORetryDelay, retries)): case <-ctx.Done(): return ctx.Err() } retries++ } } // checkInstancePresence checks whether the instance is already created. func checkInstancePresence(ctx context.Context, instanceClient *instance.InstanceAdminClient, opt ProberOptions) bool { resp, err := instanceClient.GetInstance(ctx, &instancepb.GetInstanceRequest{ Name: opt.instanceURI(), }) // If instance is not present or instance is not ready. if err != nil || resp.State != instancepb.Instance_READY { return false } return true } // createCloudSpannerDatabase creates a prober "Database" on an "Instance" of Cloud Spanner in the specificed project. // Databases are shared across multiple prober tasks running in differenct GCE VMs. // Databases do not get cleaned up in this prober, as we do not support turning down Cloud Spanner regions. func createCloudSpannerDatabase(ctx context.Context, databaseClient *database.DatabaseAdminClient, opt ProberOptions) error { op, err := databaseClient.CreateDatabase(ctx, &dbadminpb.CreateDatabaseRequest{ Parent: opt.instanceURI(), CreateStatement: fmt.Sprintf("CREATE DATABASE `%v`", opt.databaseName()), ExtraStatements: []string{ `CREATE TABLE ProbeTarget ( Id INT64 NOT NULL, Payload BYTES(MAX), PayloadHash BYTES(MAX), ) PRIMARY KEY (Id)`, }, }) if err != nil { if code := status.Code(err); code == codes.AlreadyExists { return nil } return err } _, err = op.Wait(ctx) return err } // backgroundStatsAggregator pulls stats from the prober channel. 
// This avoids the case in which probes are blocked due to the channel being full. func (p *Prober) backgroundStatsAggregator(ctx context.Context) error { for op := range p.opsChannel { mutator := withSuccess if op.Error != nil { mutator = withError } stats.RecordWithTags(ctx, []tag.Mutator{mutator}, opResults.M(1)) stats.RecordWithTags(ctx, []tag.Mutator{mutator}, opLatency.M(op.Latency.Milliseconds())) } return nil } func (p *Prober) probeInterval() time.Duration { // qps must be > 0 as we validate this when constructing a CSProber. // qps is converted to a duration for type reasons, however it does not represent a value duration. // Use granularity of nanosecond to support float division. Useful for supporting probes with QPS < 1. return time.Duration(float64(time.Second) / p.qps) } // Start starts the prober. This will run a goroutinue until ctx is canceled. func (p *Prober) Start(ctx context.Context) { go func() { ticker := time.NewTicker(p.probeInterval()) for { select { case <-ctx.Done(): // Stop probing when the context is canceled. log.Info("Probing stopped as context is done.") return case <-ticker.C: go func() { probeCtx, cancel := context.WithTimeout(ctx, p.deadline) defer cancel() p.runProbe(probeCtx) }() } } }() } func (p *Prober) runProbe(ctx context.Context) { startTime := time.Now() probeErr := p.prober.probe(ctx, p) latency := time.Now().Sub(startTime) p.opsChannel <- op{ Latency: latency, Error: probeErr, } } // validateRows checks that the read did not return an error, and if the row is // present it validates the hash. If the row is not present it completes successfully. // Returns the number of validated rows, and the first error encounted. 
func validateRows(iter *spanner.RowIterator) (int, error) { rows := 0 for { row, err := iter.Next() if err == iterator.Done { return rows, nil } if err != nil { return rows, err } var id int64 var payload, payloadHash []byte if err := row.Columns(&id, &payload, &payloadHash); err != nil { return rows, err } h := sha256.New() _, err = h.Write(payload) if err != nil { return rows, err } if calculatedHash := h.Sum(nil); !bytes.Equal(calculatedHash, payloadHash) { return rows, fmt.Errorf("hash for row %v did not match, got %v, want %v", id, calculatedHash, payloadHash) } rows++ } } func ParseProbeType(t string) (Probe, error) { switch t { case "noop": return NoopProbe{}, nil case "stale_read": return StaleReadProbe{}, nil case "strong_query": return StrongQueryProbe{}, nil case "stale_query": return StaleQueryProbe{}, nil case "dml": return DMLProbe{}, nil case "read_write": return ReadWriteProbe{}, nil default: return NoopProbe{}, fmt.Errorf("probe_type %q is not a valid probe type", t) } } // NoopProbe is a fake prober which always succeeds. type NoopProbe struct{} func (NoopProbe) name() string { return "noop" } func (NoopProbe) probe(ctx context.Context, p *Prober) error { return nil } // StaleReadProbe performs a stale read of the database. type StaleReadProbe struct{} func (StaleReadProbe) name() string { return "stale_read" } func (StaleReadProbe) probe(ctx context.Context, p *Prober) error { // Random row within the range. k := rand.Intn(p.numRows) txn := p.spannerClient.Single().WithTimestampBound(spanner.MaxStaleness(p.maxStaleness)) iter := txn.Read(ctx, "ProbeTarget", spanner.Key{k}, []string{"Id", "Payload", "PayloadHash"}) defer iter.Stop() _, err := validateRows(iter) return err } // StrongQueryProbe performs a strong query of the database. type StrongQueryProbe struct{} func (StrongQueryProbe) name() string { return "strong_query" } func (StrongQueryProbe) probe(ctx context.Context, p *Prober) error { // Random row within the range. 
k := rand.Intn(p.numRows) stmt := spanner.Statement{ SQL: `select t.Id, t.Payload, t.PayloadHash from ProbeTarget t where t.Id = @Id`, Params: map[string]interface{}{ "Id": k, }, } iter := p.spannerClient.Single().Query(ctx, stmt) defer iter.Stop() _, err := validateRows(iter) return err } // StaleQueryProbe performs a stale query of the database. type StaleQueryProbe struct{} func (StaleQueryProbe) name() string { return "stale_query" } func (StaleQueryProbe) probe(ctx context.Context, p *Prober) error { // Random row within the range. k := rand.Intn(p.numRows) stmt := spanner.Statement{ SQL: `select t.Id, t.Payload, t.PayloadHash from ProbeTarget t where t.Id = @Id`, Params: map[string]interface{}{ "Id": k, }, } iter := p.spannerClient.Single().WithTimestampBound(spanner.MaxStaleness(p.maxStaleness)).Query(ctx, stmt) defer iter.Stop() _, err := validateRows(iter) return err } // DMLProbe performs a SQL based transaction in the database. type DMLProbe struct{} func (DMLProbe) name() string { return "dml" } func (DMLProbe) probe(ctx context.Context, p *Prober) error { // Random row within the range. k := rand.Intn(p.numRows) payload, payloadHash, err := p.generatePayload(p.payloadSize) if err != nil { return err } _, err = p.spannerClient.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error { readStmt := spanner.Statement{ SQL: `select t.Id, t.Payload, t.PayloadHash from ProbeTarget t where t.Id = @Id`, Params: map[string]interface{}{ "Id": k, }, } iter := txn.Query(ctx, readStmt) defer iter.Stop() rows, err := validateRows(iter) if err != nil { return err } // Update the row with a new random value if the row already exisis, otherwise insert a new row. 
dmlSQL := `update ProbeTarget t set t.Payload = @payload, t.PayloadHash = @payloadHash where t.Id = @Id` if rows == 0 { dmlSQL = `insert ProbeTarget (Id, Payload, PayloadHash) VALUES(@Id, @payload, @payloadHash)` } dmlStmt := spanner.Statement{ SQL: dmlSQL, Params: map[string]interface{}{ "Id": k, "payload": payload, "payloadHash": payloadHash, }, } _, err = txn.Update(ctx, dmlStmt) return err }) return err } // ReadWriteProbe performs a mutation based transaction in the database. type ReadWriteProbe struct{} func (ReadWriteProbe) name() string { return "read_write" } func (ReadWriteProbe) probe(ctx context.Context, p *Prober) error { // Random row within the range. k := rand.Intn(p.numRows) payload, payloadHash, err := p.generatePayload(p.payloadSize) if err != nil { return err } _, err = p.spannerClient.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error { iter := txn.Read(ctx, "ProbeTarget", spanner.Key{k}, []string{"Id", "Payload", "PayloadHash"}) defer iter.Stop() _, err := validateRows(iter) if err != nil { return err } return txn.BufferWrite([]*spanner.Mutation{ spanner.InsertOrUpdate("ProbeTarget", []string{"Id", "Payload", "PayloadHash"}, []interface{}{k, payload, payloadHash}), }) }) return err }