pkg/promremote/client.go (122 lines of code) (raw):

package promremote import ( "bytes" "context" "fmt" "io" "net/http" "sync" "time" adxhttp "github.com/Azure/adx-mon/pkg/http" "github.com/Azure/adx-mon/pkg/prompb" "github.com/golang/snappy" ) var ( bytesPool = sync.Pool{ New: func() interface{} { return make([]byte, 0, 32*1024) }, } ) // Client is a client for the prometheus remote write API. It is safe to be shared between goroutines. type Client struct { httpClient *http.Client endpoint string opts ClientOpts } type ClientOpts struct { // Close controls whether the client closes the connection after each request. Close bool // Timeout is the timeout for the http client and the http request. Timeout time.Duration // InsecureSkipVerify controls whether the client verifies the server's certificate chain and host name. InsecureSkipVerify bool // IdleConnTimeout is the maximum amount of time an idle (keep-alive) connection // will remain idle before closing itself. IdleConnTimeout time.Duration // ResponseHeaderTimeout is the amount of time to wait for a server's response headers // after fully writing the request (including its body, if any). ResponseHeaderTimeout time.Duration // MaxIdleConns controls the maximum number of idle (keep-alive) connections across all hosts. MaxIdleConns int // MaxIdleConnsPerHost, if non-zero, controls the maximum idle (keep-alive) per host. MaxIdleConnsPerHost int // MaxConnsPerHost, if non-zero, controls the maximum connections per host. MaxConnsPerHost int // TLSHandshakeTimeout specifies the maximum amount of time to // wait for a TLS handshake. Zero means no timeout. TLSHandshakeTimeout time.Duration // DisableHTTP2 controls whether the client disables HTTP/2 support. DisableHTTP2 bool // DisableKeepAlives controls whether the client disables HTTP keep-alives. DisableKeepAlives bool // Endpoint for writing to the prometheus remote write API. Endpoint string } func (c ClientOpts) WithDefaults() ClientOpts { if c.Timeout == 0 { c.Timeout = 10 * time.Second } if c.IdleConnTimeout == 0 { c.IdleConnTimeout = 1 * time.Minute } if c.ResponseHeaderTimeout == 0 { c.ResponseHeaderTimeout = 10 * time.Second } if c.MaxIdleConns == 0 { c.MaxIdleConns = 100 } if c.MaxIdleConnsPerHost == 0 { c.MaxIdleConnsPerHost = 5 } if c.MaxConnsPerHost == 0 { c.MaxConnsPerHost = 5 } if c.TLSHandshakeTimeout == 0 { c.TLSHandshakeTimeout = 10 * time.Second } return c } func NewClient(opts ClientOpts) (*Client, error) { httpClient := adxhttp.NewClient( adxhttp.ClientOpts{ Timeout: opts.Timeout, InsecureSkipVerify: opts.InsecureSkipVerify, Close: opts.Close, MaxIdleConnsPerHost: opts.MaxIdleConnsPerHost, MaxIdleConns: opts.MaxIdleConns, IdleConnTimeout: opts.IdleConnTimeout, ResponseHeaderTimeout: opts.ResponseHeaderTimeout, DisableHTTP2: opts.DisableHTTP2, DisableKeepAlives: opts.DisableKeepAlives, }) return &Client{ httpClient: httpClient, endpoint: opts.Endpoint, opts: opts, }, nil } func (c *Client) Write(ctx context.Context, endpoint string, wr *prompb.WriteRequest) error { b := bytesPool.Get().([]byte) defer bytesPool.Put(b) b, err := wr.MarshalTo(b[:0]) if err != nil { return fmt.Errorf("marshal proto: %w", err) } b1 := bytesPool.Get().([]byte) defer bytesPool.Put(b1) encoded := snappy.Encode(b1[:0], b) body := bytes.NewReader(encoded) req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("%s/receive", c.endpoint), body) if err != nil { return fmt.Errorf("new request: %w", err) } req.Header.Set("Content-Type", "application/x-protobuf") req.Header.Set("Content-Encoding", "snappy") req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") req.Header.Set("User-Agent", "adx-mon") req.Close = c.opts.Close resp, err := c.httpClient.Do(req) if err != nil { return fmt.Errorf("http post: %w", err) } defer func() { io.Copy(io.Discard, resp.Body) resp.Body.Close() }() if resp.StatusCode/100 != 2 { body, err := io.ReadAll(resp.Body) if err != nil { return fmt.Errorf("read resp: %w", err) } return fmt.Errorf("write failed: %s", string(body)) } return nil } func (c *Client) CloseIdleConnections() { c.httpClient.CloseIdleConnections() }