in elastictransport/elastictransport.go [154:288]
func New(cfg Config) (*Client, error) {
if cfg.Transport == nil {
defaultTransport, ok := http.DefaultTransport.(*http.Transport)
if !ok {
return nil, errors.New("cannot clone http.DefaultTransport")
}
cfg.Transport = defaultTransport.Clone()
}
if transport, ok := cfg.Transport.(*http.Transport); ok {
if cfg.CertificateFingerprint != "" {
transport.DialTLS = func(network, addr string) (net.Conn, error) {
fingerprint, _ := hex.DecodeString(cfg.CertificateFingerprint)
c, err := tls.Dial(network, addr, &tls.Config{InsecureSkipVerify: true})
if err != nil {
return nil, err
}
// Retrieve the connection state from the remote server.
cState := c.ConnectionState()
for _, cert := range cState.PeerCertificates {
// Compute digest for each certificate.
digest := sha256.Sum256(cert.Raw)
// Provided fingerprint should match at least one certificate from remote before we continue.
if bytes.Compare(digest[0:], fingerprint) == 0 {
return c, nil
}
}
return nil, fmt.Errorf("fingerprint mismatch, provided: %s", cfg.CertificateFingerprint)
}
}
}
if cfg.CACert != nil {
httpTransport, ok := cfg.Transport.(*http.Transport)
if !ok {
return nil, fmt.Errorf("unable to set CA certificate for transport of type %T", cfg.Transport)
}
httpTransport = httpTransport.Clone()
httpTransport.TLSClientConfig.RootCAs = x509.NewCertPool()
if ok := httpTransport.TLSClientConfig.RootCAs.AppendCertsFromPEM(cfg.CACert); !ok {
return nil, errors.New("unable to add CA certificate")
}
cfg.Transport = httpTransport
}
if len(cfg.RetryOnStatus) == 0 {
cfg.RetryOnStatus = defaultRetryOnStatus[:]
}
if cfg.MaxRetries == 0 {
cfg.MaxRetries = defaultMaxRetries
}
var conns []*Connection
for _, u := range cfg.URLs {
conns = append(conns, &Connection{URL: u})
}
client := Client{
userAgent: cfg.UserAgent,
urls: cfg.URLs,
username: cfg.Username,
password: cfg.Password,
apikey: cfg.APIKey,
servicetoken: cfg.ServiceToken,
header: cfg.Header,
retryOnStatus: cfg.RetryOnStatus,
disableRetry: cfg.DisableRetry,
maxRetries: cfg.MaxRetries,
retryOnError: cfg.RetryOnError,
retryBackoff: cfg.RetryBackoff,
discoverNodesInterval: cfg.DiscoverNodesInterval,
compressRequestBody: cfg.CompressRequestBody,
compressRequestBodyLevel: cfg.CompressRequestBodyLevel,
transport: cfg.Transport,
logger: cfg.Logger,
selector: cfg.Selector,
poolFunc: cfg.ConnectionPoolFunc,
instrumentation: cfg.Instrumentation,
}
if cfg.DiscoverNodeTimeout != nil {
client.discoverNodeTimeout = cfg.DiscoverNodeTimeout
}
if client.poolFunc != nil {
client.pool = client.poolFunc(conns, client.selector)
} else {
client.pool, _ = NewConnectionPool(conns, client.selector)
}
if cfg.EnableDebugLogger {
debugLogger = &debuggingLogger{Output: os.Stdout}
}
if cfg.EnableMetrics {
client.metrics = &metrics{responses: make(map[int]int)}
// TODO(karmi): Type assertion to interface
if pool, ok := client.pool.(*singleConnectionPool); ok {
pool.metrics = client.metrics
}
if pool, ok := client.pool.(*statusConnectionPool); ok {
pool.metrics = client.metrics
}
}
if client.discoverNodesInterval > 0 {
time.AfterFunc(client.discoverNodesInterval, func() {
client.scheduleDiscoverNodes(client.discoverNodesInterval)
})
}
if client.compressRequestBodyLevel == 0 {
client.compressRequestBodyLevel = gzip.DefaultCompression
}
if cfg.PoolCompressor {
client.gzipCompressor = newPooledGzipCompressor(client.compressRequestBodyLevel)
} else {
client.gzipCompressor = newSimpleGzipCompressor(client.compressRequestBodyLevel)
}
return &client, nil
}