func()

in pkg/espoll/client.go [46:115]


// Do executes req and, if out is non-nil, unmarshals the JSON response body
// into out.
//
// By default the request is performed exactly once. If a return condition has
// been set through opts, the request is repeated at the configured interval
// (100ms by default) until the condition reports true for a response. While
// retrying, Do gives up with context.DeadlineExceeded once the configured
// timeout (one minute by default) has elapsed, and with ctx.Err() if ctx is
// done while waiting for the next attempt. The timeout only applies when a
// condition has been set.
//
// A response for which IsError reports true is returned as an *Error carrying
// the status code and the string form of the response; such responses are
// never retried. Errors from performing the request, reading the body, or
// decoding it into out are returned as-is.
//
// On success the returned response's Body has been replaced by an in-memory
// copy of the bytes that were read, so callers can still read it.
func (es *Client) Do(
	ctx context.Context,
	req Request,
	out any,
	opts ...RequestOption,
) (*esapi.Response, error) {
	requestOptions := requestOptions{
		// Set the timeout to something high to account for Elasticsearch
		// cluster and index/shard initialisation. Under normal conditions
		// this timeout should never be reached.
		timeout:  time.Minute,
		interval: 100 * time.Millisecond,
	}
	for _, opt := range opts {
		opt(&requestOptions)
	}
	var timeoutC, tickerC <-chan time.Time
	var transport esapi.Transport = es
	if requestOptions.cond != nil {
		// A return condition has been specified, which means we
		// might retry the request. Wrap the transport with a
		// bodyRepeater to ensure the body is copied as needed.
		transport = &bodyRepeater{transport, nil}
		if requestOptions.timeout > 0 {
			timeout := time.NewTimer(requestOptions.timeout)
			defer timeout.Stop()
			timeoutC = timeout.C
		}
	}

	var resp *esapi.Response
	for {
		if tickerC != nil {
			// Not the first attempt: wait for the next tick, but stop
			// early if the caller's context is done or the overall
			// timeout has elapsed. timeoutC is nil (blocks forever)
			// when no timeout is configured.
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-timeoutC:
				return nil, context.DeadlineExceeded
			case <-tickerC:
			}
		}
		var err error
		resp, err = req.Do(ctx, transport)
		if err != nil {
			return nil, err
		}
		if resp.IsError() {
			// Render the message before closing, since it is built
			// from the response.
			message := resp.String()
			_ = resp.Body.Close() // best effort; the *Error is what matters
			return nil, &Error{StatusCode: resp.StatusCode, Message: message}
		}
		body, err := io.ReadAll(resp.Body)
		// Close the body now rather than deferring: a defer inside this
		// loop would keep every retried response open until Do returns.
		_ = resp.Body.Close()
		if err != nil {
			return nil, err
		}
		if out != nil {
			if err := json.Unmarshal(body, out); err != nil {
				return nil, err
			}
		}
		// Hand the condition (and the caller) a fresh reader over the
		// bytes already consumed above.
		resp.Body = io.NopCloser(bytes.NewReader(body))
		if requestOptions.cond == nil || requestOptions.cond(resp) {
			break
		}
		if tickerC == nil {
			// First time around, start a ticker for retrying.
			ticker := time.NewTicker(requestOptions.interval)
			defer ticker.Stop()
			tickerC = ticker.C
		}
	}
	return resp, nil
}