func()

in ingestor/service.go [359:461]


// HandleTransfer handles requests to the /transfer endpoint. It imports the
// request body into the store under the name given by the "filename" query
// parameter.
//
// Responses:
//   - 202 Accepted: the body was imported, or the filename matched one of
//     s.dropFilePrefixes and the body was intentionally discarded.
//   - 400 Bad Request: the filename is missing or rejected by
//     s.validateFileName, the gzip header is invalid, or the import reported
//     a block checksum failure.
//   - 423 Locked: the store reported wal.ErrSegmentLocked.
//   - 429 Too Many Requests: the service is unhealthy, or the store reported
//     wal.ErrMaxSegmentsExceeded / wal.ErrMaxDiskUsageExceeded.
//   - 500 Internal Server Error: any other import error.
//
// Every response increments metrics.RequestsReceived with path="/transfer"
// and the status code. The number of raw (pre-decompression) body bytes read
// is added to metrics.RequestsBytesReceived when the handler returns.
func (s *Service) HandleTransfer(w http.ResponseWriter, r *http.Request) {
	adxhttp.MaybeCloseConnection(w, r)

	start := time.Now()
	m := metrics.RequestsReceived.MustCurryWith(prometheus.Labels{"path": "/transfer"})

	// reject records the status code in the request metric and writes an error
	// response, so no failure path can forget to count itself.
	reject := func(code int, msg string) {
		m.WithLabelValues(strconv.Itoa(code)).Inc()
		http.Error(w, msg, code)
	}

	filename := r.URL.Query().Get("filename")
	if filename == "" {
		reject(http.StatusBadRequest, "missing filename")
		return
	}

	// Prefer the first address in X-Forwarded-For when the header is present;
	// otherwise fall back to the connection's remote address. The value is
	// only used in log messages.
	originalIP := r.RemoteAddr
	if xff := r.Header.Get("X-Forwarded-For"); xff != "" {
		ips := strings.Split(xff, ",")
		originalIP = strings.TrimSpace(ips[0])
	}

	cr := reader.NewCounterReader(r.Body)
	var body io.ReadCloser = cr
	defer func() {
		// Drain and close the raw counting reader rather than `body`: `body`
		// may have been swapped for a gzip reader below, and draining through
		// the decompressor would stop at the first decode error and leave the
		// underlying request body unread and unclosed. Draining is best
		// effort, so the copy error is deliberately ignored.
		_, _ = io.Copy(io.Discard, cr)

		metrics.RequestsBytesReceived.Add(float64(cr.Count()))
		dur := time.Since(start)
		if s.opts.SlowRequestThreshold > 0 && dur.Seconds() > s.opts.SlowRequestThreshold {
			logger.Warnf("Slow request: path=/transfer duration=%s from=%s size=%d file=%s", dur.String(), originalIP, cr.Count(), filename)
		}
		if err := cr.Close(); err != nil {
			logger.Errorf("Close http body: %s, path=/transfer duration=%s from=%s", err.Error(), dur.String(), originalIP)
		}
	}()

	for _, prefix := range s.dropFilePrefixes {
		if strings.HasPrefix(filename, prefix) {
			// Consume the payload before acknowledging so the sender sees a
			// normal, fully-read upload even though the data is dropped.
			_, _ = io.Copy(io.Discard, body)
			metrics.IngestorDroppedPrefixes.WithLabelValues(prefix).Inc()
			m.WithLabelValues(strconv.Itoa(http.StatusAccepted)).Inc()
			w.WriteHeader(http.StatusAccepted)
			return
		}
	}

	if !s.health.IsHealthy() {
		reject(http.StatusTooManyRequests, "Overloaded. Retry later")
		return
	}

	// https://pkg.go.dev/io/fs#ValidPath
	// Check for possible traversal attacks.
	f := s.validateFileName(filename)
	if f == "" {
		logger.Errorf("Transfer requested with an invalid filename %q", filename)
		reject(http.StatusBadRequest, "Filename is invalid")
		return
	}

	// If the request is gzipped, create a gzip reader to decompress the body.
	if r.Header.Get("Content-Encoding") == "gzip" {
		gzipReader, err := gzip.NewReader(body)
		if err != nil {
			logger.Errorf("Failed to create gzip reader: %s", err.Error())
			reject(http.StatusBadRequest, "Invalid gzip encoding")
			return
		}
		// gzip.Reader.Close does not close the underlying reader; the raw
		// body is closed by the deferred cleanup above.
		defer gzipReader.Close()
		body = gzipReader
	}

	n, err := s.store.Import(f, body)
	switch {
	case err == nil:
		if logger.IsDebug() {
			logger.Debugf("Imported %d bytes to %s", n, filename)
		}
		m.WithLabelValues(strconv.Itoa(http.StatusAccepted)).Inc()
		w.WriteHeader(http.StatusAccepted)
	case errors.Is(err, wal.ErrMaxSegmentsExceeded), errors.Is(err, wal.ErrMaxDiskUsageExceeded):
		reject(http.StatusTooManyRequests, "Overloaded. Retry later")
	case errors.Is(err, wal.ErrSegmentLocked):
		// Previously this path skipped the request metric; it is now counted
		// like every other response.
		reject(http.StatusLocked, err.Error())
	case strings.Contains(err.Error(), "block checksum verification failed"):
		logger.Errorf("Transfer requested with checksum error %q from=%s", filename, originalIP)
		reject(http.StatusBadRequest, "Block checksum verification failed")
	default:
		logger.Errorf("Failed to import %s: %s from=%s", filename, err.Error(), originalIP)
		reject(http.StatusInternalServerError, err.Error())
	}
}