correlation/grpc/client_interceptors.go (35 lines of code) (raw):
package grpccorrelation
import (
"context"
"gitlab.com/gitlab-org/labkit/correlation"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
// appendToOutgoingContext returns a context whose outgoing gRPC metadata has
// been extended with the correlation ID extracted from ctx and with the given
// client name. Each value is only appended when it is non-empty; when both are
// empty, ctx is returned unchanged.
func appendToOutgoingContext(ctx context.Context, clientName string) context.Context {
	if id := correlation.ExtractFromContext(ctx); id != "" {
		ctx = metadata.AppendToOutgoingContext(ctx, metadataCorrelatorKey, id)
	}

	if clientName == "" {
		return ctx
	}

	return metadata.AppendToOutgoingContext(ctx, metadataClientNameKey, clientName)
}
// UnaryClientCorrelationInterceptor returns a unary client interceptor that
// propagates Correlation-IDs downstream.
//
// The supplied options are applied once, when the interceptor is built. On
// every call the interceptor adds the correlation ID carried by the call's
// context, plus the configured client name, to the outgoing metadata (each
// only when non-empty) and then hands the call over to the invoker.
func UnaryClientCorrelationInterceptor(opts ...ClientCorrelationInterceptorOption) grpc.UnaryClientInterceptor {
	cfg := applyClientCorrelationInterceptorOptions(opts)

	return func(
		ctx context.Context,
		method string,
		req, reply interface{},
		cc *grpc.ClientConn,
		invoker grpc.UnaryInvoker,
		callOpts ...grpc.CallOption,
	) error {
		outgoing := appendToOutgoingContext(ctx, cfg.clientName)

		return invoker(outgoing, method, req, reply, cc, callOpts...)
	}
}
// StreamClientCorrelationInterceptor returns a streaming client interceptor
// that propagates Correlation-IDs downstream.
//
// The supplied options are applied once, when the interceptor is built. When
// a stream is opened the interceptor adds the correlation ID carried by the
// context, plus the configured client name, to the outgoing metadata (each
// only when non-empty) and then delegates to the streamer.
func StreamClientCorrelationInterceptor(opts ...ClientCorrelationInterceptorOption) grpc.StreamClientInterceptor {
	cfg := applyClientCorrelationInterceptorOptions(opts)

	return func(
		ctx context.Context,
		desc *grpc.StreamDesc,
		cc *grpc.ClientConn,
		method string,
		streamer grpc.Streamer,
		callOpts ...grpc.CallOption,
	) (grpc.ClientStream, error) {
		outgoing := appendToOutgoingContext(ctx, cfg.clientName)

		return streamer(outgoing, desc, cc, method, callOpts...)
	}
}
// InjectToOutgoingContext will inject the correlation ID into the
// outgoing context metadata. Repeat calls will overwrite any existing
// correlation IDs.
//
// Other outgoing metadata already attached to ctx is carried over to the
// returned context.
func InjectToOutgoingContext(ctx context.Context, correlationID string) context.Context {
	// metadata.AppendToOutgoingContext only ever adds values, so repeated
	// calls would accumulate several correlation IDs under the same key
	// instead of overwriting. Rebuild the outgoing metadata and replace the
	// key's values so the documented overwrite behavior actually holds.
	md, ok := metadata.FromOutgoingContext(ctx)
	if !ok {
		md = metadata.MD{}
	}
	md.Set(metadataCorrelatorKey, correlationID)

	return metadata.NewOutgoingContext(ctx, md)
}