spanner_prober/main.go (186 lines of code) (raw):
package main
import (
"context"
"flag"
"fmt"
"io/ioutil"
"os"
"os/signal"
"regexp"
"strings"
"syscall"
"time"
proberlib "spanner_prober/prober"
"cloud.google.com/go/spanner"
"contrib.go.opencensus.io/exporter/stackdriver"
"contrib.go.opencensus.io/exporter/stackdriver/monitoredresource"
log "github.com/golang/glog"
"go.opencensus.io/plugin/ocgrpc"
"go.opencensus.io/resource"
"go.opencensus.io/stats/view"
"google.golang.org/grpc/grpclog"
)
var (
enableCloudOps = flag.Bool("enable_cloud_ops", true, "Export metrics to Cloud Operations (former Stackdriver).")
project = flag.String("project", "", "GCP project for Cloud Spanner.")
opsProject = flag.String("ops_project", "", "Cloud Operations project if differs from Spanner project.")
instance_name = flag.String("instance", "test1", "Target instance.")
database_name = flag.String("database", "test1", "Target database.")
instanceConfig = flag.String("instance_config", "regional-us-central1", "Target instance config.")
nodeCount = flag.Int("node_count", 1, "Node count for the prober. If specified, processing_units must be 0.")
processingUnits = flag.Int("processing_units", 0, "Processing units for the prober. If specified, node_count must be 0.")
qps = flag.Float64("qps", 1, "QPS to probe per prober [1, 1000].")
numRows = flag.Int("num_rows", 1000, "Number of rows in database to be probed.")
probeType = flag.String("probe_type", "noop", "The probe type this prober will run.")
maxStaleness = flag.Duration("max_staleness", 15*time.Second, "Maximum staleness for stale queries.")
payloadSize = flag.Int("payload_size", 1024, "Size of payload to write to the probe database.")
probeDeadline = flag.Duration("probe_deadline", 10*time.Second, "Deadline for probe request.")
useGrpcGcp = flag.Bool("grpc_gcp", false, "Use gRPC-GCP library.")
channelPoolSize = flag.Int("channels", 2, "Number of channels.")
endpoint = flag.String("endpoint", "spanner.googleapis.com:443", "Cloud Spanner Endpoint to send request to.")
)
func main() {
flag.Parse()
ctx := context.Background()
errs := validateFlags()
if len(errs) > 0 {
log.Errorf("Flag validation failed with %v errors", len(errs))
for _, err := range errs {
log.Errorf("%v", err)
}
log.Exit("Flag validation failed... exiting.")
}
fmt.Printf("Prober started with options:\nenable_cloud_ops: %v\nproject: %q\n"+
"ops_project: %q\ninstance: %q\ndatabase: %q\ninstance_config: %q\n"+
"node_count: %d\nprocessing_units: %d\nqps: %0.3f\nnum_rows: %d\n"+
"probe_type: %q\nmax_staleness: %v\npayload_size: %d\nprobe_deadline: %v\n"+
"grpc_gcp: %v\nchannels: %v\nendpoint: %q\n", *enableCloudOps, *project, *opsProject, *instance_name,
*database_name, *instanceConfig, *nodeCount, *processingUnits, *qps, *numRows,
*probeType, *maxStaleness, *payloadSize, *probeDeadline, *useGrpcGcp, *channelPoolSize,
*endpoint)
grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, /* Discard logs at INFO level */
os.Stderr, os.Stderr))
if *enableCloudOps {
// Set up the stackdriver exporter for sending metrics.
// Register gRPC views.
if err := view.Register(ocgrpc.DefaultClientViews...); err != nil {
log.Fatalf("Failed to register ocgrpc client views: %v", err)
}
// Enable all default views for Cloud Spanner.
if err := spanner.EnableStatViews(); err != nil {
log.Errorf("Failed to export stats view: %v", err)
}
getPrefix := func(name string) string {
if strings.HasPrefix(name, proberlib.MetricPrefix) {
return ""
}
return proberlib.MetricPrefix
}
exporterOptions := stackdriver.Options{
ProjectID: *project,
BundleDelayThreshold: 60 * time.Second,
BundleCountThreshold: 3000,
GetMetricPrefix: getPrefix,
}
if *opsProject != "" {
exporterOptions.ProjectID = *opsProject
}
if os.Getenv(resource.EnvVarType) == "" {
exporterOptions.MonitoredResource = &MonitoredResource{delegate: monitoredresource.Autodetect()}
}
sd, err := stackdriver.NewExporter(exporterOptions)
if err != nil {
log.Fatalf("Failed to create the StackDriver exporter: %v", err)
}
defer sd.Flush()
sd.StartMetricsExporter()
defer sd.StopMetricsExporter()
}
prober, err := proberlib.ParseProbeType(*probeType)
if err != nil {
log.Exitf("Could not create prober due to %v.", err)
}
opts := proberlib.ProberOptions{
Project: *project,
Instance: *instance_name,
Database: *database_name,
InstanceConfig: *instanceConfig,
QPS: *qps,
NumRows: *numRows,
Prober: prober,
MaxStaleness: *maxStaleness,
PayloadSize: *payloadSize,
ProbeDeadline: *probeDeadline,
Endpoint: *endpoint,
NodeCount: *nodeCount,
ProcessingUnits: *processingUnits,
UseGrpcGcp: *useGrpcGcp,
ChannelPoolSize: *channelPoolSize,
}
p, err := proberlib.NewProber(ctx, opts)
if err != nil {
log.Exitf("Failed to initialize the cloud prober, %v", err)
}
p.Start(ctx)
cancelChan := make(chan os.Signal, 1)
signal.Notify(cancelChan, syscall.SIGTERM, syscall.SIGINT)
select {
case <-cancelChan:
}
}
type MonitoredResource struct {
monitoredresource.Interface
delegate monitoredresource.Interface
}
func (mr *MonitoredResource) MonitoredResource() (resType string, labels map[string]string) {
dType, dLabels := mr.delegate.MonitoredResource()
resType = dType
labels = make(map[string]string)
for k, v := range dLabels {
if k == "project_id" {
// Overwrite project id to satisfy Cloud Monitoring rule.
labels[k] = *project
if *opsProject != "" {
labels[k] = *opsProject
}
continue
}
labels[k] = v
}
return
}
func validateFlags() []error {
var errs []error
projectRegex, err := regexp.Compile(`^[-_:.a-zA-Z0-9]*$`)
if err != nil {
return []error{err}
}
instanceDBRegex, err := regexp.Compile(`^[-_.a-zA-Z0-9]*$`)
if err != nil {
return []error{err}
}
// We limit qps to < 1000 to ensure we don't overload Spanner accidentally.
if *qps <= 0 || *qps > 1000 {
errs = append(errs, fmt.Errorf("qps must be 1 <= qps <= 1000, was %v", *qps))
}
if *numRows <= 0 {
errs = append(errs, fmt.Errorf("num_rows must be > 0, was %v", *numRows))
}
if *payloadSize <= 0 {
errs = append(errs, fmt.Errorf("payload_size must be > 0, was %v", *payloadSize))
}
if matched := projectRegex.MatchString(*project); !matched {
errs = append(errs, fmt.Errorf("project did not match %v, was %v", projectRegex, *project))
}
if matched := projectRegex.MatchString(*opsProject); !matched {
errs = append(errs, fmt.Errorf("ops_project did not match %v, was %v", projectRegex, *opsProject))
}
if matched := instanceDBRegex.MatchString(*instance_name); !matched {
errs = append(errs, fmt.Errorf("instance did not match %v, was %v", instanceDBRegex, *instance_name))
}
if matched := instanceDBRegex.MatchString(*database_name); !matched {
errs = append(errs, fmt.Errorf("database did not match %v, was %v", instanceDBRegex, *database_name))
}
if matched := instanceDBRegex.MatchString(*instanceConfig); !matched {
errs = append(errs, fmt.Errorf("instance_config did not match %v, was %v", instanceDBRegex, *instanceConfig))
}
if _, err := proberlib.ParseProbeType(*probeType); err != nil {
errs = append(errs, err)
}
return errs
}