spanner_prober/prober/interceptors.go (97 lines of code) (raw):
// Package prober defines a Cloud Spanner prober with interceptors.
package prober
import (
"context"
"fmt"
"strconv"
"strings"
"time"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
const (
	// gfeT4T7prefix is the leading text of a server-timing entry that carries
	// the GFE latency; the remainder of the entry is the duration value.
	gfeT4T7prefix = "gfet4t7; dur="

	// serverTimingKey is the metadata key under which server-timing entries
	// are looked up in response headers and trailers.
	serverTimingKey = "server-timing"
)
var (
	// expDistribution holds the histogram bucket boundaries used by
	// t4t7LatencyView: the powers of two from 1 up to 524288.
	expDistribution = []float64{
		1, 2, 4, 8, 16,
		32, 64, 128, 256, 512,
		1024, 2048, 4096, 8192, 16384,
		32768, 65536, 131072, 262144, 524288,
	}

	// methodTag labels a measurement with the gRPC method that produced it.
	methodTag = tag.MustNewKey("grpc_client_method")

	// rpcTypeTag labels a measurement with the kind of RPC; the interceptors
	// in this file set it to "unary" or "streaming".
	rpcTypeTag = tag.MustNewKey("rpc_type")

	// t4t7Latency is the measure for the GFE latency, in milliseconds.
	t4t7Latency = stats.Int64(
		"t4t7_latency",
		"gRPC-GCP Spanner prober GFE latency",
		stats.UnitMilliseconds,
	)

	// t4t7LatencyView aggregates t4t7Latency into a distribution over
	// expDistribution, broken down by operation name, method and RPC type.
	t4t7LatencyView = &view.View{
		Name:        MetricPrefix + t4t7Latency.Name(),
		Measure:     t4t7Latency,
		Aggregation: view.Distribution(expDistribution...),
		TagKeys:     []tag.Key{opNameTag, methodTag, rpcTypeTag},
	}
)
// init registers t4t7LatencyView when the package is loaded.
func init() {
	// init has no way to return an error to a caller, so the result of the
	// registration is explicitly discarded.
	_ = view.Register(t4t7LatencyView)
}
func recordLatency(ctx context.Context, method string, latency time.Duration) {
stats.RecordWithTags(
ctx,
[]tag.Mutator{tag.Insert(methodTag, method)},
t4t7Latency.M(latency.Milliseconds()),
)
}
// parseT4T7Latency extracts the GFE (gfet4t7) latency from the server-timing
// metadata of a response.
//
// Header metadata is searched first and trailer metadata second, so a latency
// that is only present in the trailers is still found when the headers carry
// unrelated server-timing entries. The first entry starting with gfeT4T7prefix
// wins; its remainder must be a base-10 integer number of milliseconds.
//
// An error is returned when neither metadata set has a server-timing key, when
// no entry carries the gfet4t7 prefix, or when the duration cannot be parsed.
func parseT4T7Latency(headers, trailers metadata.MD) (time.Duration, error) {
	if len(headers[serverTimingKey]) == 0 && len(trailers[serverTimingKey]) == 0 {
		return 0, fmt.Errorf("server-timing headers not found")
	}
	// Indexing a nil metadata.MD yields an empty slice, so either argument may be nil.
	for _, md := range []metadata.MD{headers, trailers} {
		for _, entry := range md[serverTimingKey] {
			if !strings.HasPrefix(entry, gfeT4T7prefix) {
				continue
			}
			durationText := strings.TrimPrefix(entry, gfeT4T7prefix)
			durationMillis, err := strconv.ParseInt(durationText, 10, 64)
			if err != nil {
				return 0, fmt.Errorf("failed to parse gfe latency: %w", err)
			}
			return time.Duration(durationMillis) * time.Millisecond, nil
		}
	}
	return 0, fmt.Errorf("no gfe latency response available")
}
// AddGFELatencyUnaryInterceptor intercepts unary client requests
// (spanner.Commit, spanner.ExecuteSQL) and annotates GFE latency.
//
// It asks the call to capture response headers and trailers, invokes the RPC,
// and on success records the latency parsed from that metadata with rpcTypeTag
// set to "unary". An error from the RPC is returned unchanged and nothing is
// recorded. A missing or unparsable latency, or a failure to build the tagged
// context, is ignored so the RPC result is never affected by metrics.
func AddGFELatencyUnaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
	invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	var respHeaders, respTrailers metadata.MD
	opts = append(opts, grpc.Header(&respHeaders))
	opts = append(opts, grpc.Trailer(&respTrailers))

	rpcErr := invoker(ctx, method, req, reply, cc, opts...)
	if rpcErr != nil {
		return rpcErr
	}

	latency, parseErr := parseT4T7Latency(respHeaders, respTrailers)
	if parseErr != nil {
		// Best effort: the call succeeded, there is just nothing to record.
		return nil
	}
	taggedCtx, tagErr := tag.New(ctx, tag.Insert(rpcTypeTag, "unary"))
	if tagErr != nil {
		return nil
	}
	recordLatency(taggedCtx, method, latency)
	return nil
}
// gfeLatencyClientStream wraps a grpc.ClientStream so that the GFE latency is
// recorded from the goroutine that receives on the stream, at points where
// reading header and trailer metadata is permitted.
//
// The bookkeeping fields are unsynchronized: they are only touched from
// RecvMsg, which grpc-go does not allow to be called concurrently on one stream.
type gfeLatencyClientStream struct {
	grpc.ClientStream

	ctx    context.Context // context of the intercepted call; parent of the tagged context
	method string          // full gRPC method name, recorded as methodTag

	headersChecked bool // header metadata has already been inspected
	finished       bool // latency was recorded, or the stream ended; nothing left to do
}

// RecvMsg delegates to the wrapped stream and then, until a latency has been
// recorded or the stream has ended, looks for the GFE latency: in the headers
// after the first call returns, and in the trailers once the wrapped RecvMsg
// returns a non-nil error (including io.EOF). The wrapped stream's result is
// always returned unchanged.
func (s *gfeLatencyClientStream) RecvMsg(m interface{}) error {
	recvErr := s.ClientStream.RecvMsg(m)
	if s.finished {
		return recvErr
	}

	var headers, trailers metadata.MD
	if !s.headersChecked {
		s.headersChecked = true
		// RecvMsg has returned, so the headers have either arrived or the
		// stream is over. A Header error just means there are no headers to
		// inspect; the trailers may still carry the latency.
		if md, err := s.ClientStream.Header(); err == nil {
			headers = md
		}
	}
	if recvErr != nil {
		// Trailer may only be called after RecvMsg has returned a non-nil error.
		s.finished = true
		trailers = s.ClientStream.Trailer()
	}
	if len(headers) == 0 && len(trailers) == 0 {
		return recvErr
	}

	if gfeLatency, err := parseT4T7Latency(headers, trailers); err == nil {
		s.finished = true
		if ctx, err := tag.New(s.ctx, tag.Insert(rpcTypeTag, "streaming")); err == nil {
			recordLatency(ctx, s.method, gfeLatency)
		}
	}
	return recvErr
}

// AddGFELatencyStreamingInterceptor intercepts streaming requests StreamingSQL and annotates GFE latency.
//
// The stream is created by streamer; a creation error is returned unchanged.
// Otherwise the returned stream is a wrapper that records the GFE latency,
// with rpcTypeTag set to "streaming", while the caller receives from it. No
// background goroutine is started, and trailer metadata is only read after the
// stream has reported an error or io.EOF. Consequently nothing is recorded for
// a stream the caller never receives from.
func AddGFELatencyStreamingInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
	cs, err := streamer(ctx, desc, cc, method, opts...)
	if err != nil {
		return cs, err
	}
	return &gfeLatencyClientStream{
		ClientStream: cs,
		ctx:          ctx,
		method:       method,
	}, nil
}