func newClient()

in pulsar/client_impl.go [57:190]


// newClient validates options, substitutes defaults for unset fields and
// builds a client wired to a connection pool, an RPC client and a lookup
// service.
//
// It returns an InvalidConfiguration error when the service URL is empty,
// cannot be parsed or uses an unsupported scheme, or when
// ConnectionMaxIdleTime is positive but shorter than minConnMaxIdleTime. It
// returns an AuthenticationError when options.Authentication does not
// implement auth.Provider. Errors from initializing the auth provider or from
// starting the transaction coordinator client are returned unchanged.
func newClient(options ClientOptions) (Client, error) {
	var logger log.Logger
	if options.Logger != nil {
		logger = options.Logger
	} else {
		logger = log.NewLoggerWithLogrus(logrus.StandardLogger())
	}

	// Zero selects the default, a negative value is handed to the pool as-is
	// (logged as disabling idle-connection release), and a positive value
	// must be at least minConnMaxIdleTime.
	connectionMaxIdleTime := options.ConnectionMaxIdleTime
	switch {
	case connectionMaxIdleTime == 0:
		connectionMaxIdleTime = defaultConnMaxIdleTime
	case connectionMaxIdleTime < 0:
		// Only report the feature as disabled when it really is; valid
		// positive values must not trigger this message.
		logger.Debugf("Disable auto release idle connections")
	case connectionMaxIdleTime < minConnMaxIdleTime:
		return nil, newError(InvalidConfiguration, fmt.Sprintf("Connection max idle time should be at least %f "+
			"seconds", minConnMaxIdleTime.Seconds()))
	}

	if options.URL == "" {
		return nil, newError(InvalidConfiguration, "URL is required for client")
	}

	// Named serviceURL so the net/url package is not shadowed below.
	serviceURL, err := url.Parse(options.URL)
	if err != nil {
		logger.WithError(err).Error("Failed to parse service URL")
		return nil, newError(InvalidConfiguration, "Invalid service URL")
	}

	// tlsConfig stays nil for plaintext schemes; a non-nil value is what
	// marks the client as TLS-enabled further down.
	var tlsConfig *internal.TLSOptions
	switch serviceURL.Scheme {
	case "pulsar", "http":
		tlsConfig = nil
	case "pulsar+ssl", "https":
		tlsConfig = &internal.TLSOptions{
			AllowInsecureConnection: options.TLSAllowInsecureConnection,
			KeyFile:                 options.TLSKeyFilePath,
			CertFile:                options.TLSCertificateFile,
			TrustCertsFilePath:      options.TLSTrustCertsFilePath,
			ValidateHostname:        options.TLSValidateHostname,
			ServerName:              serviceURL.Hostname(),
			CipherSuites:            options.TLSCipherSuites,
			MinVersion:              options.TLSMinVersion,
			MaxVersion:              options.TLSMaxVersion,
			TLSConfig:               options.TLSConfig,
		}
	default:
		return nil, newError(InvalidConfiguration, fmt.Sprintf("Invalid URL scheme '%s'", serviceURL.Scheme))
	}

	var authProvider auth.Provider
	if options.Authentication == nil {
		authProvider = auth.NewAuthDisabled()
	} else {
		var ok bool
		authProvider, ok = options.Authentication.(auth.Provider)
		if !ok {
			return nil, newError(AuthenticationError, "invalid auth provider interface")
		}
	}
	if err = authProvider.Init(); err != nil {
		return nil, err
	}

	// The default connection timeout respects Go's default, which is no
	// timeout: a missing user-specified timeout yields the zero value, which
	// matches net.Dialer's behavior for an uninitialized time.Duration.
	connectionTimeout := options.ConnectionTimeout

	operationTimeout := options.OperationTimeout
	if operationTimeout.Nanoseconds() == 0 {
		operationTimeout = defaultOperationTimeout
	}

	// Non-positive values fall back to a single connection per broker.
	maxConnectionsPerHost := options.MaxConnectionsPerBroker
	if maxConnectionsPerHost <= 0 {
		maxConnectionsPerHost = 1
	}

	if options.MetricsCardinality == 0 {
		options.MetricsCardinality = MetricsCardinalityNamespace
	}

	if options.MetricsRegisterer == nil {
		options.MetricsRegisterer = prometheus.DefaultRegisterer
	}

	// The metrics provider is always given a non-nil label map.
	customLabels := options.CustomMetricsLabels
	if customLabels == nil {
		customLabels = map[string]string{}
	}
	metrics := internal.NewMetricsProvider(
		int(options.MetricsCardinality), customLabels, options.MetricsRegisterer)

	keepAliveInterval := options.KeepAliveInterval
	if keepAliveInterval.Nanoseconds() == 0 {
		keepAliveInterval = defaultKeepAliveInterval
	}

	memLimitBytes := options.MemoryLimitBytes
	if memLimitBytes == 0 {
		memLimitBytes = defaultMemoryLimitBytes
	}

	c := &client{
		cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout, keepAliveInterval,
			maxConnectionsPerHost, logger, metrics, options.Description, connectionMaxIdleTime),
		log:              logger,
		metrics:          metrics,
		memLimit:         internal.NewMemoryLimitController(memLimitBytes, defaultMemoryLimitTriggerThreshold),
		operationTimeout: operationTimeout,
		tlsEnabled:       tlsConfig != nil,
	}

	c.rpcClient = internal.NewRPCClient(serviceURL, c.cnxPool, operationTimeout, logger, metrics,
		options.ListenerName, tlsConfig, authProvider, toKeyValues(options.LookupProperties))

	c.lookupService = c.rpcClient.LookupService("")

	c.handlers = internal.NewClientHandlers()

	if options.EnableTransaction {
		c.tcClient = newTransactionCoordinatorClientImpl(c)
		if err = c.tcClient.start(); err != nil {
			// NOTE(review): c and the connection pool it owns are dropped
			// here without being closed — confirm whether cleanup is needed
			// on this failure path.
			return nil, err
		}
	}

	return c, nil
}