pkg/espoll/client.go (118 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Package espoll wraps an elasticsearch.Client to provide utilitarian methods
// to assert for conditions until a certain threshold is met.
package espoll
import (
"bytes"
"context"
"encoding/json"
"io"
"time"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
)
// Client wraps an Elasticsearch client
type Client struct {
*elasticsearch.Client
}
// WrapClient wraps an Elasticsearch client and returns an espoll.Client
func WrapClient(c *elasticsearch.Client) *Client { return &Client{Client: c} }
type Request interface {
Do(ctx context.Context, transport esapi.Transport) (*esapi.Response, error)
}
// Do performs the specified request.
func (es *Client) Do(
ctx context.Context,
req Request,
out any,
opts ...RequestOption,
) (*esapi.Response, error) {
requestOptions := requestOptions{
// Set the timeout to something high to account for Elasticsearch
// cluster and index/shard initialisation. Under normal conditions
// this timeout should never be reached.
timeout: time.Minute,
interval: 100 * time.Millisecond,
}
for _, opt := range opts {
opt(&requestOptions)
}
var timeoutC, tickerC <-chan time.Time
var transport esapi.Transport = es
if requestOptions.cond != nil {
// A return condition has been specified, which means we
// might retry the request. Wrap the transport with a
// bodyRepeater to ensure the body is copied as needed.
transport = &bodyRepeater{transport, nil}
if requestOptions.timeout > 0 {
timeout := time.NewTimer(requestOptions.timeout)
defer timeout.Stop()
timeoutC = timeout.C
}
}
var resp *esapi.Response
for {
if tickerC != nil {
select {
case <-timeoutC:
return nil, context.DeadlineExceeded
case <-tickerC:
}
}
var err error
resp, err = req.Do(ctx, transport)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.IsError() {
return nil, &Error{StatusCode: resp.StatusCode, Message: resp.String()}
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if out != nil {
if err := json.Unmarshal(body, out); err != nil {
return nil, err
}
}
resp.Body = io.NopCloser(bytes.NewReader(body))
if requestOptions.cond == nil || requestOptions.cond(resp) {
break
}
if tickerC == nil {
// First time around, start a ticker for retrying.
ticker := time.NewTicker(requestOptions.interval)
defer ticker.Stop()
tickerC = ticker.C
}
}
return resp, nil
}
// RequestOption modifies certain parameters for an esapi.Request.
type RequestOption func(*requestOptions)
type Error struct {
StatusCode int
Message string
}
func (e *Error) Error() string {
return e.Message
}
type requestOptions struct {
timeout time.Duration
interval time.Duration
cond ConditionFunc
}
// WithTimeout sets the timeout in an Elasticsearch request.
func WithTimeout(d time.Duration) RequestOption {
return func(opts *requestOptions) {
opts.timeout = d
}
}
// WithInterval sets the poll interval in an Elasticsearch request.
func WithInterval(d time.Duration) RequestOption {
return func(opts *requestOptions) {
opts.interval = d
}
}
// ConditionFunc evaluates the esapi.Response.
type ConditionFunc func(*esapi.Response) bool
// AllCondition returns a ConditionFunc that returns true as
// long as none of the supplied conditions returns false.
func AllCondition(conds ...ConditionFunc) ConditionFunc {
return func(resp *esapi.Response) bool {
for _, cond := range conds {
if !cond(resp) {
return false
}
}
return true
}
}
// WithCondition runs the specified condition func.
func WithCondition(cond ConditionFunc) RequestOption {
return func(opts *requestOptions) {
opts.cond = cond
}
}