in pulsar/client_impl.go [54:199]
func newClient(options ClientOptions) (Client, error) {
var logger log.Logger
if options.Logger != nil {
logger = options.Logger
} else {
logger = log.NewLoggerWithLogrus(logrus.StandardLogger())
}
connectionMaxIdleTime := options.ConnectionMaxIdleTime
if connectionMaxIdleTime == 0 {
connectionMaxIdleTime = defaultConnMaxIdleTime
} else if connectionMaxIdleTime > 0 && connectionMaxIdleTime < minConnMaxIdleTime {
return nil, newError(InvalidConfiguration, fmt.Sprintf("Connection max idle time should be at least %f "+
"seconds", minConnMaxIdleTime.Seconds()))
} else {
logger.Debugf("Disable auto release idle connections")
}
if options.URL == "" {
return nil, newError(InvalidConfiguration, "URL is required for client")
}
url, err := url.Parse(options.URL)
if err != nil {
logger.WithError(err).Error("Failed to parse service URL")
return nil, newError(InvalidConfiguration, "Invalid service URL")
}
var tlsConfig *internal.TLSOptions
switch url.Scheme {
case "pulsar", "http":
tlsConfig = nil
case "pulsar+ssl", "https":
tlsConfig = &internal.TLSOptions{
AllowInsecureConnection: options.TLSAllowInsecureConnection,
KeyFile: options.TLSKeyFilePath,
CertFile: options.TLSCertificateFile,
TrustCertsFilePath: options.TLSTrustCertsFilePath,
ValidateHostname: options.TLSValidateHostname,
ServerName: url.Hostname(),
CipherSuites: options.TLSCipherSuites,
MinVersion: options.TLSMinVersion,
MaxVersion: options.TLSMaxVersion,
}
default:
return nil, newError(InvalidConfiguration, fmt.Sprintf("Invalid URL scheme '%s'", url.Scheme))
}
var authProvider auth.Provider
var ok bool
if options.Authentication == nil {
authProvider = auth.NewAuthDisabled()
} else {
authProvider, ok = options.Authentication.(auth.Provider)
if !ok {
return nil, newError(AuthenticationError, "invalid auth provider interface")
}
}
err = authProvider.Init()
if err != nil {
return nil, err
}
connectionTimeout := options.ConnectionTimeout
if connectionTimeout.Nanoseconds() == 0 {
connectionTimeout = defaultConnectionTimeout
}
operationTimeout := options.OperationTimeout
if operationTimeout.Nanoseconds() == 0 {
operationTimeout = defaultOperationTimeout
}
maxConnectionsPerHost := options.MaxConnectionsPerBroker
if maxConnectionsPerHost <= 0 {
maxConnectionsPerHost = 1
}
if options.MetricsCardinality == 0 {
options.MetricsCardinality = MetricsCardinalityNamespace
}
if options.MetricsRegisterer == nil {
options.MetricsRegisterer = prometheus.DefaultRegisterer
}
var metrics *internal.Metrics
if options.CustomMetricsLabels != nil {
metrics = internal.NewMetricsProvider(
int(options.MetricsCardinality), options.CustomMetricsLabels, options.MetricsRegisterer)
} else {
metrics = internal.NewMetricsProvider(
int(options.MetricsCardinality), map[string]string{}, options.MetricsRegisterer)
}
keepAliveInterval := options.KeepAliveInterval
if keepAliveInterval.Nanoseconds() == 0 {
keepAliveInterval = defaultKeepAliveInterval
}
memLimitBytes := options.MemoryLimitBytes
if memLimitBytes == 0 {
memLimitBytes = defaultMemoryLimitBytes
}
c := &client{
cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout, keepAliveInterval,
maxConnectionsPerHost, logger, metrics, connectionMaxIdleTime),
log: logger,
metrics: metrics,
memLimit: internal.NewMemoryLimitController(memLimitBytes, defaultMemoryLimitTriggerThreshold),
}
serviceNameResolver := internal.NewPulsarServiceNameResolver(url)
c.rpcClient = internal.NewRPCClient(url, serviceNameResolver, c.cnxPool, operationTimeout, logger, metrics)
switch url.Scheme {
case "pulsar", "pulsar+ssl":
c.lookupService = internal.NewLookupService(c.rpcClient, url, serviceNameResolver,
tlsConfig != nil, options.ListenerName, logger, metrics)
case "http", "https":
httpClient, err := internal.NewHTTPClient(url, serviceNameResolver, tlsConfig,
operationTimeout, logger, metrics, authProvider)
if err != nil {
return nil, newError(InvalidConfiguration, fmt.Sprintf("Failed to init http client with err: '%s'",
err.Error()))
}
c.lookupService = internal.NewHTTPLookupService(httpClient, url, serviceNameResolver,
tlsConfig != nil, logger, metrics)
default:
return nil, newError(InvalidConfiguration, fmt.Sprintf("Invalid URL scheme '%s'", url.Scheme))
}
c.handlers = internal.NewClientHandlers()
if options.EnableTransaction {
c.tcClient = newTransactionCoordinatorClientImpl(c)
err = c.tcClient.start()
if err != nil {
return nil, err
}
}
return c, nil
}