func New()

in elastictransport/elastictransport.go [154:288]


// New creates a transport Client from cfg.
//
// When cfg.Transport is nil, a clone of http.DefaultTransport is used.
//
// If the transport is an *http.Transport and cfg.CertificateFingerprint is
// set, the transport's DialTLS hook is replaced with one that accepts a
// connection only when the SHA-256 digest of at least one certificate
// presented by the peer equals the hex-encoded fingerprint. The hook is
// installed on the transport held in cfg.Transport itself, so a transport
// supplied by the caller is modified.
//
// If cfg.CACert is set, the transport must be an *http.Transport; it is cloned
// and the clone's root CA pool is replaced with a pool holding only the
// certificates parsed from the supplied PEM data.
//
// An error is returned when http.DefaultTransport is not an *http.Transport,
// when cfg.CertificateFingerprint is not valid hexadecimal, when cfg.CACert is
// set for a transport that is not an *http.Transport, or when no certificate
// could be added from cfg.CACert.
func New(cfg Config) (*Client, error) {
	if cfg.Transport == nil {
		defaultTransport, ok := http.DefaultTransport.(*http.Transport)
		if !ok {
			return nil, errors.New("cannot clone http.DefaultTransport")
		}
		cfg.Transport = defaultTransport.Clone()
	}

	if transport, ok := cfg.Transport.(*http.Transport); ok && cfg.CertificateFingerprint != "" {
		// Decode once, up front: a malformed fingerprint is a configuration
		// error and should not surface later as a "mismatch" on every dial.
		fingerprint, err := hex.DecodeString(cfg.CertificateFingerprint)
		if err != nil {
			return nil, fmt.Errorf("invalid certificate fingerprint %q: %w", cfg.CertificateFingerprint, err)
		}

		transport.DialTLS = func(network, addr string) (net.Conn, error) {
			// Chain verification is skipped on purpose: the connection is
			// accepted or rejected solely on the fingerprint comparison below.
			c, err := tls.Dial(network, addr, &tls.Config{InsecureSkipVerify: true})
			if err != nil {
				return nil, err
			}

			// Inspect the certificates presented by the remote server.
			for _, cert := range c.ConnectionState().PeerCertificates {
				// Compute the digest of the raw certificate.
				digest := sha256.Sum256(cert.Raw)

				// The provided fingerprint must match at least one certificate
				// from the remote before the connection is handed out.
				if bytes.Equal(digest[:], fingerprint) {
					return c, nil
				}
			}

			// The connection is being rejected; release it instead of leaking
			// the socket. The close error is irrelevant next to the mismatch.
			_ = c.Close()
			return nil, fmt.Errorf("fingerprint mismatch, provided: %s", cfg.CertificateFingerprint)
		}
	}

	if cfg.CACert != nil {
		httpTransport, ok := cfg.Transport.(*http.Transport)
		if !ok {
			return nil, fmt.Errorf("unable to set CA certificate for transport of type %T", cfg.Transport)
		}

		httpTransport = httpTransport.Clone()
		// Clone only copies TLSClientConfig when the source transport has one,
		// so make sure there is a config to attach the CA pool to.
		if httpTransport.TLSClientConfig == nil {
			httpTransport.TLSClientConfig = &tls.Config{}
		}
		httpTransport.TLSClientConfig.RootCAs = x509.NewCertPool()

		if ok := httpTransport.TLSClientConfig.RootCAs.AppendCertsFromPEM(cfg.CACert); !ok {
			return nil, errors.New("unable to add CA certificate")
		}

		cfg.Transport = httpTransport
	}

	// Fall back to the package defaults for unset retry options.
	if len(cfg.RetryOnStatus) == 0 {
		cfg.RetryOnStatus = defaultRetryOnStatus[:]
	}

	if cfg.MaxRetries == 0 {
		cfg.MaxRetries = defaultMaxRetries
	}

	// Wrap every configured URL in a Connection for the pool.
	var conns []*Connection
	for _, u := range cfg.URLs {
		conns = append(conns, &Connection{URL: u})
	}

	client := Client{
		userAgent: cfg.UserAgent,

		urls:         cfg.URLs,
		username:     cfg.Username,
		password:     cfg.Password,
		apikey:       cfg.APIKey,
		servicetoken: cfg.ServiceToken,
		header:       cfg.Header,

		retryOnStatus:         cfg.RetryOnStatus,
		disableRetry:          cfg.DisableRetry,
		maxRetries:            cfg.MaxRetries,
		retryOnError:          cfg.RetryOnError,
		retryBackoff:          cfg.RetryBackoff,
		discoverNodesInterval: cfg.DiscoverNodesInterval,

		compressRequestBody:      cfg.CompressRequestBody,
		compressRequestBodyLevel: cfg.CompressRequestBodyLevel,

		transport: cfg.Transport,
		logger:    cfg.Logger,
		selector:  cfg.Selector,
		poolFunc:  cfg.ConnectionPoolFunc,

		instrumentation: cfg.Instrumentation,
	}

	if cfg.DiscoverNodeTimeout != nil {
		client.discoverNodeTimeout = cfg.DiscoverNodeTimeout
	}

	// A caller-supplied pool constructor takes precedence over the default.
	if client.poolFunc != nil {
		client.pool = client.poolFunc(conns, client.selector)
	} else {
		// NOTE(review): the error from NewConnectionPool is discarded here;
		// confirm it cannot fail for these inputs.
		client.pool, _ = NewConnectionPool(conns, client.selector)
	}

	// Enabling the debug logger assigns the package-level debugLogger.
	if cfg.EnableDebugLogger {
		debugLogger = &debuggingLogger{Output: os.Stdout}
	}

	if cfg.EnableMetrics {
		client.metrics = &metrics{responses: make(map[int]int)}
		// TODO(karmi): Type assertion to interface
		if pool, ok := client.pool.(*singleConnectionPool); ok {
			pool.metrics = client.metrics
		}
		if pool, ok := client.pool.(*statusConnectionPool); ok {
			pool.metrics = client.metrics
		}
	}

	// Schedule node discovery to start after the first interval has elapsed.
	if client.discoverNodesInterval > 0 {
		time.AfterFunc(client.discoverNodesInterval, func() {
			client.scheduleDiscoverNodes(client.discoverNodesInterval)
		})
	}

	// A zero level is treated as "unset" and mapped to gzip's default.
	if client.compressRequestBodyLevel == 0 {
		client.compressRequestBodyLevel = gzip.DefaultCompression
	}

	if cfg.PoolCompressor {
		client.gzipCompressor = newPooledGzipCompressor(client.compressRequestBodyLevel)
	} else {
		client.gzipCompressor = newSimpleGzipCompressor(client.compressRequestBodyLevel)
	}

	return &client, nil
}