in ingestor/service.go [359:461]
func (s *Service) HandleTransfer(w http.ResponseWriter, r *http.Request) {
adxhttp.MaybeCloseConnection(w, r)
start := time.Now()
m := metrics.RequestsReceived.MustCurryWith(prometheus.Labels{"path": "/transfer"})
filename := r.URL.Query().Get("filename")
if filename == "" {
m.WithLabelValues(strconv.Itoa(http.StatusBadRequest)).Inc()
http.Error(w, "missing filename", http.StatusBadRequest)
return
}
xff := r.Header.Get("X-Forwarded-For")
// If the header is present, split it by comma and take the first IP address
var originalIP string
if xff != "" {
ips := strings.Split(xff, ",")
originalIP = strings.TrimSpace(ips[0])
} else {
// If the header is not present, use the remote address
originalIP = r.RemoteAddr
}
cr := reader.NewCounterReader(r.Body)
var body io.ReadCloser = cr
defer func() {
io.Copy(io.Discard, body)
metrics.RequestsBytesReceived.Add(float64(cr.Count()))
dur := time.Since(start)
if s.opts.SlowRequestThreshold > 0 && dur.Seconds() > s.opts.SlowRequestThreshold {
logger.Warnf("Slow request: path=/transfer duration=%s from=%s size=%d file=%s", dur.String(), originalIP, cr.Count(), filename)
}
if err := body.Close(); err != nil {
logger.Errorf("Close http body: %s, path=/transfer duration=%s from=%s", err.Error(), dur.String(), originalIP)
}
}()
for _, prefix := range s.dropFilePrefixes {
if strings.HasPrefix(filename, prefix) {
io.Copy(io.Discard, body)
metrics.IngestorDroppedPrefixes.WithLabelValues(prefix).Inc()
m.WithLabelValues(strconv.Itoa(http.StatusAccepted)).Inc()
w.WriteHeader(http.StatusAccepted)
return
}
}
if !s.health.IsHealthy() {
m.WithLabelValues(strconv.Itoa(http.StatusTooManyRequests)).Inc()
http.Error(w, "Overloaded. Retry later", http.StatusTooManyRequests)
return
}
// https://pkg.go.dev/io/fs#ValidPath
// Check for possible traversal attacks.
f := s.validateFileName(filename)
if f == "" {
logger.Errorf("Transfer requested with an invalid filename %q", filename)
m.WithLabelValues(strconv.Itoa(http.StatusBadRequest)).Inc()
http.Error(w, "Filename is invalid", http.StatusBadRequest)
return
}
// If the request is gzipped, create a gzip reader to decompress the body.
if r.Header.Get("Content-Encoding") == "gzip" {
gzipReader, err := gzip.NewReader(body)
if err != nil {
logger.Errorf("Failed to create gzip reader: %s", err.Error())
m.WithLabelValues(strconv.Itoa(http.StatusBadRequest)).Inc()
http.Error(w, "Invalid gzip encoding", http.StatusBadRequest)
return
}
defer gzipReader.Close()
body = gzipReader
}
n, err := s.store.Import(f, body)
if errors.Is(err, wal.ErrMaxSegmentsExceeded) || errors.Is(err, wal.ErrMaxDiskUsageExceeded) {
m.WithLabelValues(strconv.Itoa(http.StatusTooManyRequests)).Inc()
http.Error(w, "Overloaded. Retry later", http.StatusTooManyRequests)
return
} else if errors.Is(err, wal.ErrSegmentLocked) {
http.Error(w, err.Error(), http.StatusLocked)
return
} else if err != nil && strings.Contains(err.Error(), "block checksum verification failed") {
logger.Errorf("Transfer requested with checksum error %q from=%s", filename, originalIP)
m.WithLabelValues(strconv.Itoa(http.StatusBadRequest)).Inc()
http.Error(w, "Block checksum verification failed", http.StatusBadRequest)
return
} else if err != nil {
logger.Errorf("Failed to import %s: %s from=%s", filename, err.Error(), originalIP)
m.WithLabelValues(strconv.Itoa(http.StatusInternalServerError)).Inc()
http.Error(w, err.Error(), http.StatusInternalServerError)
return
} else {
if logger.IsDebug() {
logger.Debugf("Imported %d bytes to %s", n, filename)
}
}
m.WithLabelValues(strconv.Itoa(http.StatusAccepted)).Inc()
w.WriteHeader(http.StatusAccepted)
}