in elastictransport/elastictransport.go [291:466]
func (c *Client) Perform(req *http.Request) (*http.Response, error) {
var (
res *http.Response
err error
)
// Record metrics, when enabled
if c.metrics != nil {
c.metrics.Lock()
c.metrics.requests++
c.metrics.Unlock()
}
// Update request
c.setReqUserAgent(req)
c.setReqGlobalHeader(req)
if req.Body != nil && req.Body != http.NoBody {
if c.compressRequestBody {
buf, err := c.gzipCompressor.compress(req.Body)
if err != nil {
return nil, err
}
defer c.gzipCompressor.collectBuffer(buf)
req.GetBody = func() (io.ReadCloser, error) {
// Copy value of buf so it's not destroyed on first read
r := *buf
return ioutil.NopCloser(&r), nil
}
req.Body, _ = req.GetBody()
req.Header.Set("Content-Encoding", "gzip")
req.ContentLength = int64(buf.Len())
} else if req.GetBody == nil {
if !c.disableRetry || (c.logger != nil && c.logger.RequestBodyEnabled()) {
var buf bytes.Buffer
buf.ReadFrom(req.Body)
req.GetBody = func() (io.ReadCloser, error) {
// Copy value of buf so it's not destroyed on first read
r := buf
return ioutil.NopCloser(&r), nil
}
req.Body, _ = req.GetBody()
}
}
}
originalPath := req.URL.Path
for i := 0; i <= c.maxRetries; i++ {
var (
conn *Connection
shouldRetry bool
shouldCloseBody bool
)
// Get connection from the pool
c.Lock()
conn, err = c.pool.Next()
c.Unlock()
if err != nil {
if c.logger != nil {
c.logRoundTrip(req, nil, err, time.Time{}, time.Duration(0))
}
return nil, fmt.Errorf("cannot get connection: %s", err)
}
// Update request
c.setReqURL(conn.URL, req)
c.setReqAuth(conn.URL, req)
if !c.disableRetry && i > 0 && req.Body != nil && req.Body != http.NoBody {
body, err := req.GetBody()
if err != nil {
return nil, fmt.Errorf("cannot get request body: %s", err)
}
req.Body = body
}
// Set up time measures and execute the request
start := time.Now().UTC()
res, err = c.transport.RoundTrip(req)
dur := time.Since(start)
// Log request and response
if c.logger != nil {
if c.logger.RequestBodyEnabled() && req.Body != nil && req.Body != http.NoBody {
req.Body, _ = req.GetBody()
}
c.logRoundTrip(req, res, err, start, dur)
}
if err != nil {
// Record metrics, when enabled
if c.metrics != nil {
c.metrics.Lock()
c.metrics.failures++
c.metrics.Unlock()
}
// Report the connection as unsuccessful
c.Lock()
c.pool.OnFailure(conn)
c.Unlock()
// Retry upon decision by the user
if !c.disableRetry && (c.retryOnError == nil || c.retryOnError(req, err)) {
shouldRetry = true
}
} else {
// Report the connection as succesfull
c.Lock()
c.pool.OnSuccess(conn)
c.Unlock()
}
if res != nil && c.metrics != nil {
c.metrics.Lock()
c.metrics.responses[res.StatusCode]++
c.metrics.Unlock()
}
if res != nil && c.instrumentation != nil {
c.instrumentation.AfterResponse(req.Context(), res)
}
// Retry on configured response statuses
if res != nil && !c.disableRetry {
for _, code := range c.retryOnStatus {
if res.StatusCode == code {
shouldRetry = true
shouldCloseBody = true
}
}
}
// Break if retry should not be performed
if !shouldRetry {
break
}
// Drain and close body when retrying after response
if shouldCloseBody && i < c.maxRetries {
if res.Body != nil {
io.Copy(ioutil.Discard, res.Body)
res.Body.Close()
}
}
// Delay the retry if a backoff function is configured
if c.retryBackoff != nil {
var cancelled bool
backoff := c.retryBackoff(i + 1)
timer := time.NewTimer(backoff)
select {
case <-req.Context().Done():
err = req.Context().Err()
cancelled = true
timer.Stop()
case <-timer.C:
}
if cancelled {
break
}
}
// Re-init the path of the request to its original state
// This will be re-enriched by the connection upon retry
req.URL.Path = originalPath
}
// TODO(karmi): Wrap error
return res, err
}