func()

in collector/metrics/handler.go [81:161]


// HandleReceive reads a snappy-compressed prompb write request from the HTTP
// body, decodes and unmarshals it, runs it through s.requestTransformer and
// passes the result to s.Write.
//
// Every outcome increments metrics.RequestsReceived, curried with the handler
// path, using a status code label. Responses written to the client:
//   - 202 Accepted on success, or when no timeseries remain after transformation
//   - 400 Bad Request when the body cannot be snappy-decoded or unmarshalled
//   - 429 Too Many Requests when s.health reports unhealthy, or when s.Write
//     returns wal.ErrMaxSegmentsExceeded or wal.ErrMaxDiskUsageExceeded
//   - 500 Internal Server Error when reading the body or s.Write fails otherwise
func (s *Handler) HandleReceive(w http.ResponseWriter, r *http.Request) {
	start := time.Now()
	m := metrics.RequestsReceived.MustCurryWith(prometheus.Labels{"path": s.Path})

	// Single deferred cleanup: close the body exactly once and log against the
	// real handler path. Requests taking longer than 10s are reported as slow.
	defer func() {
		dur := time.Since(start)
		if err := r.Body.Close(); err != nil {
			logger.Errorf("close http body: %s, path=%s duration=%s", err.Error(), s.Path, dur.String())
		}
		if dur.Seconds() > 10 {
			logger.Warnf("slow request: path=%s duration=%s from=%s size=%d", s.Path, dur.String(), r.RemoteAddr, r.ContentLength)
		}
	}()

	// Shed load before reading anything from the request.
	if !s.health.IsHealthy() {
		m.WithLabelValues(strconv.Itoa(http.StatusTooManyRequests)).Inc()
		http.Error(w, "Overloaded. Retry later", http.StatusTooManyRequests)
		return
	}

	// Read the whole compressed payload into a pooled buffer.
	bodyBuf := pool.BytesBufferPool.Get(512 * 1024).(*gbp.Buffer)
	defer pool.BytesBufferPool.Put(bodyBuf)
	bodyBuf.Reset()

	_, err := io.Copy(bodyBuf, r.Body)
	if err != nil {
		m.WithLabelValues(strconv.Itoa(http.StatusInternalServerError)).Inc()
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	compressed := bodyBuf.Bytes()

	// Pooled scratch space for the decoded payload. The buffer is handed to
	// snappy.Decode at its full length on purpose: truncating it to length 0
	// first can make the decoder ignore it and allocate a fresh slice for
	// every request, which defeats the pool.
	buf := gbp.Get(512 * 1024)
	defer gbp.Put(buf)

	reqBuf, err := snappy.Decode(buf, compressed)
	if err != nil {
		m.WithLabelValues(strconv.Itoa(http.StatusBadRequest)).Inc()
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// NOTE(review): the request is taken from prompb.WriteRequestPool but is not
	// returned to the pool in this handler. The original comment here said that
	// holding these in a pool uses a lot of memory over time, so this looks
	// deliberate — confirm ownership after s.Write.
	req := prompb.WriteRequestPool.Get()
	if err := req.Unmarshal(reqBuf); err != nil {
		m.WithLabelValues(strconv.Itoa(http.StatusBadRequest)).Inc()
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// Apply any label or metrics drops or additions.
	req = s.requestTransformer.TransformWriteRequest(req)

	// Nothing left to write after transformation.
	// NOTE(review): the metric is labelled 204 while the client receives 202;
	// kept as-is because dashboards may rely on it — confirm this is intended.
	if len(req.Timeseries) == 0 {
		m.WithLabelValues(strconv.Itoa(http.StatusNoContent)).Inc()
		w.WriteHeader(http.StatusAccepted)
		return
	}

	err = s.Write(r.Context(), req)
	switch {
	case errors.Is(err, wal.ErrMaxSegmentsExceeded) || errors.Is(err, wal.ErrMaxDiskUsageExceeded):
		// Capacity limits are reported as back-pressure so the client retries later.
		m.WithLabelValues(strconv.Itoa(http.StatusTooManyRequests)).Inc()
		http.Error(w, "Overloaded. Retry later", http.StatusTooManyRequests)
		return
	case err != nil:
		logger.Errorf("Failed to write ts: %s", err.Error())
		m.WithLabelValues(strconv.Itoa(http.StatusInternalServerError)).Inc()
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	m.WithLabelValues(strconv.Itoa(http.StatusAccepted)).Inc()
	w.WriteHeader(http.StatusAccepted)
}