in ingestor/cluster/client.go [135:203]
// Write streams body to the peer at endpoint by POSTing it to
// "<endpoint>/transfer", passing filename as the "filename" query parameter.
//
// Unless c.opts.DisableGzip is set, the payload is gzip-compressed on the fly
// by a background goroutine and sent with "Content-Encoding: gzip".
//
// A 202 (Accepted) response yields nil. Other statuses map as follows:
//   - 429 -> ErrPeerOverloaded
//   - 409 -> ErrSegmentExists
//   - 423 -> ErrSegmentLocked
//   - 400 -> *ErrBadRequest carrying the trimmed response body
//   - anything else -> a generic "write failed" error carrying the trimmed
//     response body
//
// Failures to build the request, perform it, or read an error response body
// are returned wrapped with context.
//
// Note: when gzip is enabled and Write returns early, the compression
// goroutine is unblocked but not waited for, so it may still be finishing a
// read from body for a short time after Write returns.
func (c *Client) Write(ctx context.Context, endpoint string, filename string, body io.Reader) error {
	var src io.Reader = body

	// Send the body with gzip compression unless the client has that option disabled.
	if !c.opts.DisableGzip {
		gzipReader, gzipWriter := io.Pipe()

		// The request body handed to net/http is a *bufio.Reader, which is not
		// an io.Closer, so the transport has no way to close the pipe. Close
		// the read side on every return path; otherwise the goroutine below
		// stays blocked forever in a pipe write whenever the request is never
		// sent, fails, or the peer answers without draining the body.
		defer gzipReader.Close()

		go func() {
			gzipCompressor := gzip.NewWriter(gzipWriter)
			if _, err := io.Copy(gzipCompressor, body); err != nil {
				// Surface the failure to the reading side so the request is
				// aborted instead of ending with a clean EOF.
				if err := gzipWriter.CloseWithError(err); err != nil {
					logger.Errorf("failed to close gzip writer: %v", err)
				}
				return
			}
			// Close flushes the remaining compressed data and the gzip
			// trailer. A nil error makes CloseWithError behave like Close
			// (the reader sees EOF); a flush failure is propagated instead of
			// being silently dropped.
			if err := gzipWriter.CloseWithError(gzipCompressor.Close()); err != nil {
				logger.Errorf("failed to close gzip writer: %v", err)
			}
		}()

		src = gzipReader
	}

	br := bufio.NewReaderSize(src, 4*1024)

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, fmt.Sprintf("%s/transfer", endpoint), br)
	if err != nil {
		return fmt.Errorf("new request: %w", err)
	}

	params := req.URL.Query()
	params.Add("filename", filename)
	req.URL.RawQuery = params.Encode()

	if !c.opts.DisableGzip {
		req.Header.Set("Content-Encoding", "gzip")
	}
	req.Header.Set("Content-Type", "text/csv")
	req.Header.Set("User-Agent", "adx-mon")

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("http post: %w", err)
	}
	defer func() {
		// Best-effort drain so the underlying connection can be reused; a
		// failure here does not change the outcome of the write.
		io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
	}()

	// Statuses whose meaning does not depend on the response body are handled
	// first so that a failed body read cannot mask them.
	switch resp.StatusCode {
	case http.StatusAccepted:
		return nil
	case http.StatusTooManyRequests:
		return ErrPeerOverloaded
	case http.StatusConflict:
		return ErrSegmentExists
	case http.StatusLocked:
		return ErrSegmentLocked
	}

	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("read resp: %w", err)
	}
	msg := strings.TrimSpace(string(respBody))

	if resp.StatusCode == http.StatusBadRequest {
		return &ErrBadRequest{Msg: fmt.Sprintf("write failed: %s", msg)}
	}
	return fmt.Errorf("write failed: %s", msg)
}