func()

in elastictransport/elastictransport.go [291:466]


func (c *Client) Perform(req *http.Request) (*http.Response, error) {
	var (
		res *http.Response
		err error
	)

	// Record metrics, when enabled
	if c.metrics != nil {
		c.metrics.Lock()
		c.metrics.requests++
		c.metrics.Unlock()
	}

	// Update request
	c.setReqUserAgent(req)
	c.setReqGlobalHeader(req)

	if req.Body != nil && req.Body != http.NoBody {
		if c.compressRequestBody {
			buf, err := c.gzipCompressor.compress(req.Body)
			if err != nil {
				return nil, err
			}
			defer c.gzipCompressor.collectBuffer(buf)

			req.GetBody = func() (io.ReadCloser, error) {
				// Copy value of buf so it's not destroyed on first read
				r := *buf
				return ioutil.NopCloser(&r), nil
			}
			req.Body, _ = req.GetBody()

			req.Header.Set("Content-Encoding", "gzip")
			req.ContentLength = int64(buf.Len())

		} else if req.GetBody == nil {
			if !c.disableRetry || (c.logger != nil && c.logger.RequestBodyEnabled()) {
				var buf bytes.Buffer
				buf.ReadFrom(req.Body)

				req.GetBody = func() (io.ReadCloser, error) {
					// Copy value of buf so it's not destroyed on first read
					r := buf
					return ioutil.NopCloser(&r), nil
				}
				req.Body, _ = req.GetBody()
			}
		}
	}

	originalPath := req.URL.Path
	for i := 0; i <= c.maxRetries; i++ {
		var (
			conn            *Connection
			shouldRetry     bool
			shouldCloseBody bool
		)

		// Get connection from the pool
		c.Lock()
		conn, err = c.pool.Next()
		c.Unlock()
		if err != nil {
			if c.logger != nil {
				c.logRoundTrip(req, nil, err, time.Time{}, time.Duration(0))
			}
			return nil, fmt.Errorf("cannot get connection: %s", err)
		}

		// Update request
		c.setReqURL(conn.URL, req)
		c.setReqAuth(conn.URL, req)

		if !c.disableRetry && i > 0 && req.Body != nil && req.Body != http.NoBody {
			body, err := req.GetBody()
			if err != nil {
				return nil, fmt.Errorf("cannot get request body: %s", err)
			}
			req.Body = body
		}

		// Set up time measures and execute the request
		start := time.Now().UTC()
		res, err = c.transport.RoundTrip(req)
		dur := time.Since(start)

		// Log request and response
		if c.logger != nil {
			if c.logger.RequestBodyEnabled() && req.Body != nil && req.Body != http.NoBody {
				req.Body, _ = req.GetBody()
			}
			c.logRoundTrip(req, res, err, start, dur)
		}

		if err != nil {
			// Record metrics, when enabled
			if c.metrics != nil {
				c.metrics.Lock()
				c.metrics.failures++
				c.metrics.Unlock()
			}

			// Report the connection as unsuccessful
			c.Lock()
			c.pool.OnFailure(conn)
			c.Unlock()

			// Retry upon decision by the user
			if !c.disableRetry && (c.retryOnError == nil || c.retryOnError(req, err)) {
				shouldRetry = true
			}
		} else {
			// Report the connection as succesfull
			c.Lock()
			c.pool.OnSuccess(conn)
			c.Unlock()
		}

		if res != nil && c.metrics != nil {
			c.metrics.Lock()
			c.metrics.responses[res.StatusCode]++
			c.metrics.Unlock()
		}

		if res != nil && c.instrumentation != nil {
			c.instrumentation.AfterResponse(req.Context(), res)
		}

		// Retry on configured response statuses
		if res != nil && !c.disableRetry {
			for _, code := range c.retryOnStatus {
				if res.StatusCode == code {
					shouldRetry = true
					shouldCloseBody = true
				}
			}
		}

		// Break if retry should not be performed
		if !shouldRetry {
			break
		}

		// Drain and close body when retrying after response
		if shouldCloseBody && i < c.maxRetries {
			if res.Body != nil {
				io.Copy(ioutil.Discard, res.Body)
				res.Body.Close()
			}
		}

		// Delay the retry if a backoff function is configured
		if c.retryBackoff != nil {
			var cancelled bool
			backoff := c.retryBackoff(i + 1)
			timer := time.NewTimer(backoff)
			select {
			case <-req.Context().Done():
				err = req.Context().Err()
				cancelled = true
				timer.Stop()
			case <-timer.C:
			}
			if cancelled {
				break
			}
		}

		// Re-init the path of the request to its original state
		// This will be re-enriched by the connection upon retry
		req.URL.Path = originalPath
	}

	// TODO(karmi): Wrap error
	return res, err
}