in pkg/espoll/client.go [46:115]
func (es *Client) Do(
ctx context.Context,
req Request,
out any,
opts ...RequestOption,
) (*esapi.Response, error) {
requestOptions := requestOptions{
// Set the timeout to something high to account for Elasticsearch
// cluster and index/shard initialisation. Under normal conditions
// this timeout should never be reached.
timeout: time.Minute,
interval: 100 * time.Millisecond,
}
for _, opt := range opts {
opt(&requestOptions)
}
var timeoutC, tickerC <-chan time.Time
var transport esapi.Transport = es
if requestOptions.cond != nil {
// A return condition has been specified, which means we
// might retry the request. Wrap the transport with a
// bodyRepeater to ensure the body is copied as needed.
transport = &bodyRepeater{transport, nil}
if requestOptions.timeout > 0 {
timeout := time.NewTimer(requestOptions.timeout)
defer timeout.Stop()
timeoutC = timeout.C
}
}
var resp *esapi.Response
for {
if tickerC != nil {
select {
case <-timeoutC:
return nil, context.DeadlineExceeded
case <-tickerC:
}
}
var err error
resp, err = req.Do(ctx, transport)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.IsError() {
return nil, &Error{StatusCode: resp.StatusCode, Message: resp.String()}
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if out != nil {
if err := json.Unmarshal(body, out); err != nil {
return nil, err
}
}
resp.Body = io.NopCloser(bytes.NewReader(body))
if requestOptions.cond == nil || requestOptions.cond(resp) {
break
}
if tickerC == nil {
// First time around, start a ticker for retrying.
ticker := time.NewTicker(requestOptions.interval)
defer ticker.Stop()
tickerC = ticker.C
}
}
return resp, nil
}