in opensearchtransport/opensearchtransport.go [239:413]
// Perform executes the request and returns the response or an error.
//
// The request is mutated in place: compatibility, user-agent and global
// headers are applied, the body is optionally gzip-compressed, and, when
// retries or request-body logging are enabled, the body is buffered in
// memory so it can be replayed through req.GetBody.
//
// Each attempt takes a connection from the pool, points the request at that
// connection, signs it and hands it to the underlying transport. An attempt
// is repeated, up to c.maxRetries additional times, when the transport
// returns io.EOF, when it returns a net.Error that qualifies for retry, or
// when the response status is listed in c.retryOnStatus.
//
// The returned response and error are those of the last attempt. Errors
// produced by Perform itself (body handling, pool, signing) wrap their cause
// so that it can be inspected with errors.Is and errors.As.
func (c *Client) Perform(req *http.Request) (*http.Response, error) {
	var (
		res *http.Response
		err error
	)

	// Compatibility Header
	if compatibilityHeader {
		if req.Body != nil {
			req.Header.Set("Content-Type", "application/vnd.elasticsearch+json;compatible-with=7")
		}
		req.Header.Set("Accept", "application/vnd.elasticsearch+json;compatible-with=7")
	}

	// Record metrics, when enabled
	if c.metrics != nil {
		c.metrics.Lock()
		c.metrics.requests++
		c.metrics.Unlock()
	}

	// Update request
	c.setReqUserAgent(req)
	c.setReqGlobalHeader(req)

	if req.Body != nil && req.Body != http.NoBody {
		if c.compressRequestBody {
			var buf bytes.Buffer
			zw := gzip.NewWriter(&buf)
			if _, err := io.Copy(zw, req.Body); err != nil {
				return nil, fmt.Errorf("failed to compress request body: %w", err)
			}
			if err := zw.Close(); err != nil {
				return nil, fmt.Errorf("failed to compress request body (during close): %w", err)
			}

			// Every call hands out a fresh reader over the same compressed
			// bytes: copying the bytes.Buffer value copies its read offset
			// but shares the underlying data.
			req.GetBody = func() (io.ReadCloser, error) {
				r := buf
				return ioutil.NopCloser(&r), nil
			}
			// The closure above never returns an error.
			req.Body, _ = req.GetBody()

			req.Header.Set("Content-Encoding", "gzip")
			req.ContentLength = int64(buf.Len())
		} else if req.GetBody == nil {
			// Buffer the body only when it will be needed more than once:
			// for a retry or for logging the request body.
			if !c.disableRetry || (c.logger != nil && c.logger.RequestBodyEnabled()) {
				var buf bytes.Buffer
				// A partial read must not be sent as if it were the whole
				// body, so a read failure aborts the request.
				if _, err := buf.ReadFrom(req.Body); err != nil {
					return nil, fmt.Errorf("failed to read request body: %w", err)
				}

				req.GetBody = func() (io.ReadCloser, error) {
					r := buf
					return ioutil.NopCloser(&r), nil
				}
				// The closure above never returns an error.
				req.Body, _ = req.GetBody()
			}
		}
	}

	for i := 0; i <= c.maxRetries; i++ {
		var (
			conn            *Connection
			shouldRetry     bool
			shouldCloseBody bool
		)

		// Get connection from the pool
		c.Lock()
		conn, err = c.pool.Next()
		c.Unlock()
		if err != nil {
			if c.logger != nil {
				c.logRoundTrip(req, nil, err, time.Time{}, time.Duration(0))
			}
			return nil, fmt.Errorf("cannot get connection: %w", err)
		}

		// Update request
		c.setReqURL(conn.URL, req)
		c.setReqAuth(conn.URL, req)

		if err = c.signRequest(req); err != nil {
			return nil, fmt.Errorf("failed to sign request: %w", err)
		}

		// The previous attempt consumed the body; obtain a fresh copy.
		if !c.disableRetry && i > 0 && req.Body != nil && req.Body != http.NoBody {
			body, bodyErr := req.GetBody()
			if bodyErr != nil {
				return nil, fmt.Errorf("cannot get request body: %w", bodyErr)
			}
			req.Body = body
		}

		// Set up time measures and execute the request
		start := time.Now().UTC()
		res, err = c.transport.RoundTrip(req)
		dur := time.Since(start)

		// Log request and response
		if c.logger != nil {
			// Rewind the body so that the logger can read it.
			if c.logger.RequestBodyEnabled() && req.Body != nil && req.Body != http.NoBody {
				req.Body, _ = req.GetBody()
			}
			c.logRoundTrip(req, res, err, start, dur)
		}

		if err != nil {
			// Record metrics, when enabled
			if c.metrics != nil {
				c.metrics.Lock()
				c.metrics.failures++
				c.metrics.Unlock()
			}

			// Report the connection as unsuccessful
			c.Lock()
			c.pool.OnFailure(conn)
			c.Unlock()

			// Retry on EOF errors
			if err == io.EOF {
				shouldRetry = true
			}

			// Retry on network errors, but not on timeout errors, unless configured
			if netErr, ok := err.(net.Error); ok {
				if (!netErr.Timeout() || c.enableRetryOnTimeout) && !c.disableRetry {
					shouldRetry = true
				}
			}
		} else {
			// Report the connection as successful
			c.Lock()
			c.pool.OnSuccess(conn)
			c.Unlock()
		}

		if res != nil && c.metrics != nil {
			c.metrics.Lock()
			c.metrics.responses[res.StatusCode]++
			c.metrics.Unlock()
		}

		// Retry on configured response statuses
		if res != nil && !c.disableRetry {
			for _, code := range c.retryOnStatus {
				if res.StatusCode == code {
					shouldRetry = true
					shouldCloseBody = true
				}
			}
		}

		// Break if retry should not be performed
		if !shouldRetry {
			break
		}

		// Drain and close body when retrying after response. On the last
		// attempt the body is left open because the response is returned
		// to the caller.
		if shouldCloseBody && i < c.maxRetries {
			if res.Body != nil {
				io.Copy(ioutil.Discard, res.Body)
				res.Body.Close()
			}
		}

		// Delay the retry if a backoff function is configured
		if c.retryBackoff != nil {
			time.Sleep(c.retryBackoff(i + 1))
		}
	}

	// TODO(karmi): Wrap error
	return res, err
}