in spanner_prober/prober/interceptors.go [97:117]
// AddGFELatencyStreamingInterceptor is a gRPC stream client interceptor that
// records the latency parsed from a streaming call's response metadata.
//
// It opens the stream via streamer. If that fails, the streamer's result is
// returned unchanged. Otherwise the stream is returned wrapped so that, once
// the caller has consumed it to its end, the headers and trailers are handed
// to parseT4T7Latency and the result is passed to recordLatency under a
// context tagged rpcTypeTag="streaming".
//
// The measurement is taken on the caller's goroutine when RecvMsg reports the
// end of the stream, because grpc only permits Trailer to be called at that
// point. A stream that is abandoned before RecvMsg returns a terminal result
// is not recorded. Failures to read the headers, parse the latency, or build
// the tagged context are deliberately ignored: this is best-effort
// instrumentation and must not affect the RPC itself.
func AddGFELatencyStreamingInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
	cs, err := streamer(ctx, desc, cc, method, opts...)
	if err != nil {
		return cs, err
	}
	return &gfeLatencyClientStream{
		ClientStream: cs,
		// A nil desc is treated as server-streaming, i.e. only a RecvMsg
		// error marks the end of the stream.
		serverStreams: desc == nil || desc.ServerStreams,
		record: func() {
			// The stream has ended when this runs, so Header does not wait
			// on the network and Trailer is safe to read.
			headers, err := cs.Header()
			if err != nil {
				return
			}
			gfeLatency, err := parseT4T7Latency(headers, cs.Trailer())
			if err != nil {
				return
			}
			taggedCtx, err := tag.New(ctx, tag.Insert(rpcTypeTag, "streaming"))
			if err != nil {
				return
			}
			recordLatency(taggedCtx, method, gfeLatency)
		},
	}, nil
}

// gfeLatencyClientStream wraps a grpc.ClientStream and invokes record exactly
// once, when the wrapped stream has terminated.
type gfeLatencyClientStream struct {
	grpc.ClientStream

	// serverStreams reports whether the server may send multiple messages;
	// when false, the stream is complete after the first RecvMsg returns.
	serverStreams bool
	// record is the pending end-of-stream callback; it is set to nil once
	// invoked. RecvMsg must not be called concurrently on a single stream,
	// so no extra locking is used.
	record func()
}

// RecvMsg delegates to the wrapped stream and triggers the end-of-stream
// callback the first time the stream is known to have terminated: on any
// non-nil error (including io.EOF), or after the single response of a
// non-server-streaming call. The wrapped stream's result is returned as is.
func (s *gfeLatencyClientStream) RecvMsg(m interface{}) error {
	err := s.ClientStream.RecvMsg(m)
	if (err != nil || !s.serverStreams) && s.record != nil {
		record := s.record
		s.record = nil
		record()
	}
	return err
}