func()

in ingestor/cluster/client.go [135:203]


func (c *Client) Write(ctx context.Context, endpoint string, filename string, body io.Reader) error {

	br := bufio.NewReaderSize(body, 4*1024)
	// Send the body with gzip compression unless the client has that option disabled.
	if !c.opts.DisableGzip {
		gzipReader, gzipWriter := io.Pipe()
		go func() {
			defer gzipWriter.Close()
			gzipCompressor := gzip.NewWriter(gzipWriter)
			defer gzipCompressor.Close()
			if _, err := io.Copy(gzipCompressor, body); err != nil {
				if err := gzipWriter.CloseWithError(err); err != nil {
					logger.Errorf("failed to close gzip writer: %v", err)
				}
			}
		}()
		br.Reset(gzipReader)
	}

	req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("%s/transfer", endpoint), br)
	if err != nil {
		return fmt.Errorf("new request: %w", err)
	}
	params := req.URL.Query()
	params.Add("filename", filename)
	req.URL.RawQuery = params.Encode()

	if !c.opts.DisableGzip {
		req.Header.Set("Content-Encoding", "gzip")
	}

	req.Header.Set("Content-Type", "text/csv")
	req.Header.Set("User-Agent", "adx-mon")

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("http post: %w", err)
	}
	defer func() {
		io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
	}()

	if resp.StatusCode != 202 {
		body, err := io.ReadAll(resp.Body)
		if err != nil {
			return fmt.Errorf("read resp: %w", err)
		}

		if resp.StatusCode == http.StatusTooManyRequests {
			return ErrPeerOverloaded
		}

		if resp.StatusCode == http.StatusConflict {
			return ErrSegmentExists
		}

		if resp.StatusCode == http.StatusLocked {
			return ErrSegmentLocked
		}

		if resp.StatusCode == http.StatusBadRequest {
			return &ErrBadRequest{Msg: fmt.Sprintf("write failed: %s", strings.TrimSpace(string(body)))}
		}

		return fmt.Errorf("write failed: %s", strings.TrimSpace(string(body)))
	}
	return nil
}