in collector/metrics/handler.go [81:161]
func (s *Handler) HandleReceive(w http.ResponseWriter, r *http.Request) {
start := time.Now()
m := metrics.RequestsReceived.MustCurryWith(prometheus.Labels{"path": s.Path})
defer func() {
dur := time.Since(start)
if dur.Seconds() > 10 {
logger.Warnf("slow request: path=%s duration=%s from=%s size=%d", s.Path, dur.String(), r.RemoteAddr, r.ContentLength)
}
if err := r.Body.Close(); err != nil {
logger.Errorf("close http body: %s, path=/transfer duration=%s", err.Error(), dur.String())
}
}()
defer func() {
if err := r.Body.Close(); err != nil {
logger.Errorf("close http body: %s, path=%s duration=%s", err.Error(), s.Path, time.Since(start).String())
}
}()
if !s.health.IsHealthy() {
m.WithLabelValues(strconv.Itoa(http.StatusTooManyRequests)).Inc()
http.Error(w, "Overloaded. Retry later", http.StatusTooManyRequests)
return
}
bodyBuf := pool.BytesBufferPool.Get(512 * 1024).(*gbp.Buffer)
defer pool.BytesBufferPool.Put(bodyBuf)
bodyBuf.Reset()
_, err := io.Copy(bodyBuf, r.Body)
if err != nil {
m.WithLabelValues(strconv.Itoa(http.StatusInternalServerError)).Inc()
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
compressed := bodyBuf.Bytes()
buf := gbp.Get(512 * 1024)
defer gbp.Put(buf)
buf = buf[:0]
reqBuf, err := snappy.Decode(buf, compressed)
if err != nil {
m.WithLabelValues(strconv.Itoa(http.StatusBadRequest)).Inc()
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// Note: this cause allocations, but holding onto them in a pool causes a lot of memory to be used over time.
req := prompb.WriteRequestPool.Get()
if err := req.Unmarshal(reqBuf); err != nil {
m.WithLabelValues(strconv.Itoa(http.StatusBadRequest)).Inc()
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// Apply any label or metrics drops or additions
req = s.requestTransformer.TransformWriteRequest(req)
if len(req.Timeseries) == 0 {
m.WithLabelValues(strconv.Itoa(http.StatusNoContent)).Inc()
w.WriteHeader(http.StatusAccepted)
return
}
err = s.Write(r.Context(), req)
if errors.Is(err, wal.ErrMaxSegmentsExceeded) || errors.Is(err, wal.ErrMaxDiskUsageExceeded) {
m.WithLabelValues(strconv.Itoa(http.StatusTooManyRequests)).Inc()
http.Error(w, "Overloaded. Retry later", http.StatusTooManyRequests)
return
} else if err != nil {
logger.Errorf("Failed to write ts: %s", err.Error())
m.WithLabelValues(strconv.Itoa(http.StatusInternalServerError)).Inc()
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
m.WithLabelValues(strconv.Itoa(http.StatusAccepted)).Inc()
w.WriteHeader(http.StatusAccepted)
}