ingestor/cluster/client.go

package cluster

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"net/http"
	"strings"
	"time"

	adxhttp "github.com/Azure/adx-mon/pkg/http"
	"github.com/Azure/adx-mon/pkg/logger"
	"github.com/klauspost/compress/gzip"
)

var (
	ErrPeerOverloaded = fmt.Errorf("peer overloaded")
	ErrSegmentExists  = fmt.Errorf("segment already exists")
	ErrSegmentLocked  = fmt.Errorf("segment is locked")
)

type ErrBadRequest struct {
	Msg string
}

func (e ErrBadRequest) Error() string {
	return fmt.Sprintf("bad request: %s", e.Msg)
}

func (e ErrBadRequest) Is(target error) bool {
	return target == ErrBadRequest{}
}

type Client struct {
	httpClient *http.Client
	opts       ClientOpts
}

type ClientOpts struct {
	// Close controls whether the client closes the connection after each request.
	Close bool

	// Timeout is the timeout for the http client and the http request.
	Timeout time.Duration

	// InsecureSkipVerify controls whether the client verifies the server's certificate chain and host name.
	InsecureSkipVerify bool

	// IdleConnTimeout is the maximum amount of time an idle (keep-alive) connection
	// will remain idle before closing itself.
	IdleConnTimeout time.Duration

	// ResponseHeaderTimeout is the amount of time to wait for a server's response headers
	// after fully writing the request (including its body, if any).
	ResponseHeaderTimeout time.Duration

	// MaxIdleConns controls the maximum number of idle (keep-alive) connections across all hosts.
	MaxIdleConns int

	// MaxIdleConnsPerHost, if non-zero, controls the maximum idle (keep-alive) connections per host.
	MaxIdleConnsPerHost int

	// MaxConnsPerHost, if non-zero, controls the maximum connections per host.
	MaxConnsPerHost int

	// TLSHandshakeTimeout specifies the maximum amount of time to
	// wait for a TLS handshake. Zero means no timeout.
	TLSHandshakeTimeout time.Duration

	// DisableHTTP2 controls whether the client disables HTTP/2 support.
	DisableHTTP2 bool

	// DisableKeepAlives controls whether the client disables HTTP keep-alives.
	DisableKeepAlives bool

	// DisableGzip controls whether the client disables gzip compression.
	DisableGzip bool
}

// WithDefaults returns a copy of the options with any zero-valued fields replaced by defaults.
func (c ClientOpts) WithDefaults() ClientOpts {
	if c.Timeout == 0 {
		c.Timeout = 10 * time.Second
	}
	if c.IdleConnTimeout == 0 {
		c.IdleConnTimeout = 1 * time.Minute
	}
	if c.ResponseHeaderTimeout == 0 {
		c.ResponseHeaderTimeout = 10 * time.Second
	}
	if c.MaxIdleConns == 0 {
		c.MaxIdleConns = 100
	}
	if c.MaxIdleConnsPerHost == 0 {
		c.MaxIdleConnsPerHost = 5
	}
	if c.MaxConnsPerHost == 0 {
		c.MaxConnsPerHost = 5
	}
	if c.TLSHandshakeTimeout == 0 {
		c.TLSHandshakeTimeout = 10 * time.Second
	}
	return c
}

func NewClient(opts ClientOpts) (*Client, error) {
	httpClient := adxhttp.NewClient(
		adxhttp.ClientOpts{
			Timeout:               opts.Timeout,
			IdleConnTimeout:       opts.IdleConnTimeout,
			ResponseHeaderTimeout: opts.ResponseHeaderTimeout,
			MaxIdleConns:          opts.MaxIdleConns,
			MaxIdleConnsPerHost:   opts.MaxIdleConnsPerHost,
			MaxConnsPerHost:       opts.MaxConnsPerHost,
			TLSHandshakeTimeout:   opts.TLSHandshakeTimeout,
			InsecureSkipVerify:    opts.InsecureSkipVerify,
			DisableHTTP2:          opts.DisableHTTP2,
			Close:                 opts.Close,
			DisableKeepAlives:     opts.DisableKeepAlives,
		})

	return &Client{
		httpClient: httpClient,
		opts:       opts,
	}, nil
}
// Write streams body to the transfer handler at endpoint, storing it under filename at the
// destination. The contents are sent in a single request so the full batch is transferred
// atomically, and they are gzip-compressed in transit unless DisableGzip is set.
func (c *Client) Write(ctx context.Context, endpoint string, filename string, body io.Reader) error {
	// Buffer reads from either the raw body or, when compression is enabled, the gzip pipe.
	br := bufio.NewReaderSize(body, 4*1024)

	// Send the body with gzip compression unless the client has that option disabled.
	if !c.opts.DisableGzip {
		gzipReader, gzipWriter := io.Pipe()
		go func() {
			defer gzipWriter.Close()

			gzipCompressor := gzip.NewWriter(gzipWriter)
			defer gzipCompressor.Close()

			if _, err := io.Copy(gzipCompressor, body); err != nil {
				if err := gzipWriter.CloseWithError(err); err != nil {
					logger.Errorf("failed to close gzip writer: %v", err)
				}
			}
		}()
		br.Reset(gzipReader)
	}

	req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("%s/transfer", endpoint), br)
	if err != nil {
		return fmt.Errorf("new request: %w", err)
	}

	params := req.URL.Query()
	params.Add("filename", filename)
	req.URL.RawQuery = params.Encode()

	if !c.opts.DisableGzip {
		req.Header.Set("Content-Encoding", "gzip")
	}
	req.Header.Set("Content-Type", "text/csv")
	req.Header.Set("User-Agent", "adx-mon")

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("http post: %w", err)
	}
	defer func() {
		io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
	}()

	// A 202 Accepted means the peer accepted the segment. Map the well-known failure
	// status codes to sentinel errors so callers can distinguish backpressure, duplicate
	// segments, locked segments, and malformed requests.
	if resp.StatusCode != 202 {
		body, err := io.ReadAll(resp.Body)
		if err != nil {
			return fmt.Errorf("read resp: %w", err)
		}

		if resp.StatusCode == http.StatusTooManyRequests {
			return ErrPeerOverloaded
		}

		if resp.StatusCode == http.StatusConflict {
			return ErrSegmentExists
		}

		if resp.StatusCode == http.StatusLocked {
			return ErrSegmentLocked
		}

		if resp.StatusCode == http.StatusBadRequest {
			return &ErrBadRequest{Msg: fmt.Sprintf("write failed: %s", strings.TrimSpace(string(body)))}
		}

		return fmt.Errorf("write failed: %s", strings.TrimSpace(string(body)))
	}

	return nil
}
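
A minimal usage sketch (not part of client.go), assuming the package imports as github.com/Azure/adx-mon/ingestor/cluster; the peer endpoint, segment path, and timeout values below are illustrative, not taken from the source:

package main

import (
	"context"
	"errors"
	"log"
	"os"
	"time"

	"github.com/Azure/adx-mon/ingestor/cluster"
)

func main() {
	// Zero-valued fields are filled in by WithDefaults (10s timeouts, small per-host connection limits).
	opts := cluster.ClientOpts{Timeout: 30 * time.Second}.WithDefaults()

	client, err := cluster.NewClient(opts)
	if err != nil {
		log.Fatalf("new client: %v", err)
	}

	// Hypothetical segment file to transfer.
	f, err := os.Open("/var/lib/adx-mon/segments/example.csv")
	if err != nil {
		log.Fatalf("open segment: %v", err)
	}
	defer f.Close()

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	err = client.Write(ctx, "https://ingestor-0.example.com:9090", "example.csv", f)
	switch {
	case err == nil:
		log.Println("segment accepted")
	case errors.Is(err, cluster.ErrPeerOverloaded):
		// 429: back off and retry, possibly against another peer.
		log.Println("peer overloaded, retry later")
	case errors.Is(err, cluster.ErrSegmentExists), errors.Is(err, cluster.ErrSegmentLocked):
		// 409/423: the peer already has (or is finalizing) this segment.
		log.Printf("segment already handled: %v", err)
	case errors.Is(err, cluster.ErrBadRequest{}):
		// 400: the request itself was rejected; retrying will not help.
		log.Fatalf("bad request: %v", err)
	default:
		log.Fatalf("write failed: %v", err)
	}
}

The sentinel errors are what make this caller-side handling possible: transient conditions such as backpressure can be retried, while duplicate or malformed segments can be skipped or dropped.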