func()

in plugins/client/grpc/client_config.go [42:107]


// loadConfig builds the gRPC dial options for this client from its configuration.
//
// The returned options contain, in order:
//   - transport credentials: TLS built from c.configTLS when c.EnableTLS is set,
//     insecure credentials otherwise;
//   - a stream interceptor and a unary interceptor, each of which attaches the
//     "Authentication" metadata header when c.Authentication is non-empty,
//     derives a per-call context with the configured timeout, and passes any
//     call error to c.reportError;
//   - a default service config selecting the load balancing policy lb.Name.
//
// An error is returned when c.configTLS fails or when c.Timeout.Unary or
// c.Timeout.Stream cannot be parsed by time.ParseDuration.
func (c *Client) loadConfig() (*[]grpc.DialOption, error) {
	// Exactly four options are appended below: credentials, the two
	// interceptors and the service config.
	options := make([]grpc.DialOption, 0, 4)

	if c.EnableTLS {
		configTLS, err := c.configTLS()
		if err != nil {
			return nil, err
		}
		options = append(options, grpc.WithTransportCredentials(credentials.NewTLS(configTLS)))
	} else {
		options = append(options, grpc.WithTransportCredentials(insecure.NewCredentials()))
	}

	// authHeader stays nil when no authentication value is configured; the
	// interceptors then leave the outgoing context untouched.
	var authHeader metadata.MD
	if c.Authentication != "" {
		authHeader = metadata.New(map[string]string{"Authentication": c.Authentication})
	}

	unaryRequestTimeout, err := time.ParseDuration(c.Timeout.Unary)
	if err != nil {
		return nil, fmt.Errorf("cannot parse the unary request timeout: %w", err)
	}
	streamRequestTimeout, err := time.ParseDuration(c.Timeout.Stream)
	if err != nil {
		return nil, fmt.Errorf("cannot parse the stream request timeout: %w", err)
	}

	// Stream calls: append auth, bound the call with the stream timeout and
	// report errors raised while opening the stream.
	streamInterceptor := func(ctx context.Context, desc *grpc.StreamDesc,
		cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
		if authHeader != nil {
			ctx = metadata.NewOutgoingContext(ctx, authHeader)
		}
		// Comma-ok assertion: a missing key or a non-bool value yields false
		// instead of panicking inside the interceptor.
		supportBidirectionalStream, _ := ctx.Value(CtxBidirectionalStreamKey).(bool)

		callCtx, cancel := context.WithTimeout(ctx, streamRequestTimeout)
		clientStream, err := streamer(callCtx, desc, cc, method, opts...)
		if err != nil {
			// No stream was created, so release the timeout context right away.
			cancel()
			c.reportError(err)
			return nil, err
		}
		// The cancel func is handed to the wrapper; it is not invoked here on
		// the success path.
		return &timeoutClientStream{clientStream, supportBidirectionalStream, cancel}, nil
	}

	// Unary calls: append auth, bound the call with the unary timeout and
	// report any error returned by the invocation.
	unaryInterceptor := func(ctx context.Context, method string, req, reply interface{},
		cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		if authHeader != nil {
			ctx = metadata.NewOutgoingContext(ctx, authHeader)
		}
		callCtx, cancel := context.WithTimeout(ctx, unaryRequestTimeout)
		defer cancel()
		err := invoker(callCtx, method, req, reply, cc, opts...)
		if err != nil {
			c.reportError(err)
		}
		return err
	}

	// Both interceptors must be appended: a DialOption that is constructed but
	// not added to the slice has no effect on the connection.
	options = append(options,
		grpc.WithStreamInterceptor(streamInterceptor),
		grpc.WithUnaryInterceptor(unaryInterceptor),
	)

	// using self build load balancer
	options = append(options, grpc.WithDefaultServiceConfig(fmt.Sprintf("{\"loadBalancingPolicy\":%q}", lb.Name)))

	return &options, nil
}