runtime/grpc_client.go (101 lines of code) (raw):
// Copyright (c) 2023 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package zanzibar
import (
"context"
"strings"
"time"
"go.uber.org/yarpc/yarpcerrors"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// GRPCClientOpts used to configure various client options.
type GRPCClientOpts struct {
ContextLogger ContextLogger
Metrics ContextMetrics
ContextExtractor ContextExtractor
RoutingKey string
RequestUUIDHeaderKey string
CircuitBreakerDisabled bool
Timeout time.Duration
ScopeTags map[string]map[string]string
}
// NewGRPCClientOpts creates a new instance of GRPCClientOpts.
func NewGRPCClientOpts(
contextLogger ContextLogger,
metrics ContextMetrics,
contextExtractor ContextExtractor,
methodNames map[string]string,
clientID, routingKey, requestUUIDHeaderKey string,
circuitBreakerDisabled bool,
timeoutInMS int,
) *GRPCClientOpts {
scopeTags := make(map[string]map[string]string)
for serviceMethod, methodName := range methodNames {
scopeTags[serviceMethod] = map[string]string{
scopeTagClient: clientID,
scopeTagClientMethod: methodName,
scopeTagsTargetEndpoint: serviceMethod,
}
}
return &GRPCClientOpts{
ContextLogger: contextLogger,
Metrics: metrics,
ContextExtractor: contextExtractor,
RoutingKey: routingKey,
RequestUUIDHeaderKey: requestUUIDHeaderKey,
CircuitBreakerDisabled: circuitBreakerDisabled,
Timeout: time.Duration(timeoutInMS) * time.Millisecond,
ScopeTags: scopeTags,
}
}
// GRPCClientCallHelper is used to track internal state of logging and metrics.
type GRPCClientCallHelper interface {
// Start method should be used just before calling the actual gRPC client method call.
Start()
// Finish method should be used right after the actual call to gRPC client method.
Finish(ctx context.Context, err error) context.Context
}
type callHelper struct {
startTime time.Time
finishTime time.Time
contextLogger ContextLogger
metrics ContextMetrics
extractor ContextExtractor
}
// NewGRPCClientCallHelper used to initialize a helper that will
// be used to track logging and metric for a gRPC Client call.
func NewGRPCClientCallHelper(ctx context.Context, serviceMethod string, opts *GRPCClientOpts) (context.Context, GRPCClientCallHelper) {
ctx = WithScopeTagsDefault(ctx, opts.ScopeTags[serviceMethod], opts.Metrics.Scope())
return ctx, &callHelper{
contextLogger: opts.ContextLogger,
metrics: opts.Metrics,
extractor: opts.ContextExtractor,
}
}
// Start method should be used just before calling the actual gRPC client method call.
// This method starts a timer used for metric.
func (c *callHelper) Start() {
c.startTime = time.Now()
}
// Finish method should be used right after the actual call to gRPC client method.
// This method emits latency and error metric as well as logging in case of error.
func (c *callHelper) Finish(ctx context.Context, err error) context.Context {
c.finishTime = time.Now()
delta := c.finishTime.Sub(c.startTime)
c.metrics.RecordTimer(ctx, clientLatency, delta)
c.metrics.RecordHistogramDuration(ctx, clientLatency, delta)
var fields []zapcore.Field
ctx = WithEndpointRequestHeadersField(ctx, map[string]string{})
if c.extractor != nil {
fields = append(fields, c.extractor.ExtractLogFields(ctx)...)
}
fields = append(fields, GetLogFieldsFromCtx(ctx)...)
if err != nil {
if yarpcerrors.IsStatus(err) {
yarpcErr := yarpcerrors.FromError(err)
errCode := strings.Builder{}
errCode.WriteString("client.errors.")
errCode.WriteString(yarpcErr.Code().String())
c.metrics.IncCounter(ctx, errCode.String(), 1)
fields = append(fields, zap.Int("code", int(yarpcErr.Code())))
fields = append(fields, zap.String("message", yarpcErr.Message()))
fields = append(fields, zap.String("name", yarpcErr.Name()))
} else {
fields = append(fields, zap.Error(err))
}
c.metrics.IncCounter(ctx, "client.errors", 1)
c.contextLogger.WarnZ(ctx, "Failed to send outgoing client gRPC request", fields...)
return ctx
}
c.contextLogger.DebugZ(ctx, "Finished an outgoing client gRPC request", fields...)
c.metrics.IncCounter(ctx, "client.success", 1)
return ctx
}