collector/metrics/handler.go (125 lines of code) (raw):
package metrics
import (
"context"
"errors"
"io"
"net/http"
"regexp"
"strconv"
"time"
"github.com/Azure/adx-mon/metrics"
"github.com/Azure/adx-mon/pkg/logger"
"github.com/Azure/adx-mon/pkg/pool"
"github.com/Azure/adx-mon/pkg/prompb"
"github.com/Azure/adx-mon/pkg/remote"
"github.com/Azure/adx-mon/pkg/wal"
"github.com/golang/snappy"
gbp "github.com/libp2p/go-buffer-pool"
"github.com/prometheus/client_golang/prometheus"
)
type SeriesCounter interface {
AddSeries(key string, id uint64)
}
type HealthChecker interface {
IsHealthy() bool
}
type HandlerOpts struct {
// Path is the path where the handler will be registered.
Path string
RequestTransformer interface {
TransformWriteRequest(req *prompb.WriteRequest) *prompb.WriteRequest
}
// RequestWriters is the interface that writes the time series to a destination.
RequestWriters []remote.RemoteWriteClient
// Health is the interface that determines if the service is healthy.
HealthChecker HealthChecker
// Database is the name of the Kusto database where time series will be written.
Database string
}
type Handler struct {
Path string
// DropLabels is a map of metric names regexes to label name regexes. When both match, the label will be dropped.
DropLabels map[*regexp.Regexp]*regexp.Regexp
// DropMetrics is a slice of regexes that drops metrics when the metric name matches. The metric name format
// should match the Prometheus naming style before the metric is translated to a Kusto table name.
DropMetrics []*regexp.Regexp
requestTransformer interface {
TransformWriteRequest(req *prompb.WriteRequest) *prompb.WriteRequest
}
requestWriters []remote.RemoteWriteClient
health HealthChecker
}
func (s *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
s.HandleReceive(writer, request)
}
func NewHandler(opts HandlerOpts) *Handler {
return &Handler{
Path: opts.Path,
health: opts.HealthChecker,
requestTransformer: opts.RequestTransformer,
requestWriters: opts.RequestWriters,
}
}
// HandleReceive handles the prometheus remote write requests and writes them to the store.
func (s *Handler) HandleReceive(w http.ResponseWriter, r *http.Request) {
start := time.Now()
m := metrics.RequestsReceived.MustCurryWith(prometheus.Labels{"path": s.Path})
defer func() {
dur := time.Since(start)
if dur.Seconds() > 10 {
logger.Warnf("slow request: path=%s duration=%s from=%s size=%d", s.Path, dur.String(), r.RemoteAddr, r.ContentLength)
}
if err := r.Body.Close(); err != nil {
logger.Errorf("close http body: %s, path=/transfer duration=%s", err.Error(), dur.String())
}
}()
defer func() {
if err := r.Body.Close(); err != nil {
logger.Errorf("close http body: %s, path=%s duration=%s", err.Error(), s.Path, time.Since(start).String())
}
}()
if !s.health.IsHealthy() {
m.WithLabelValues(strconv.Itoa(http.StatusTooManyRequests)).Inc()
http.Error(w, "Overloaded. Retry later", http.StatusTooManyRequests)
return
}
bodyBuf := pool.BytesBufferPool.Get(512 * 1024).(*gbp.Buffer)
defer pool.BytesBufferPool.Put(bodyBuf)
bodyBuf.Reset()
_, err := io.Copy(bodyBuf, r.Body)
if err != nil {
m.WithLabelValues(strconv.Itoa(http.StatusInternalServerError)).Inc()
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
compressed := bodyBuf.Bytes()
buf := gbp.Get(512 * 1024)
defer gbp.Put(buf)
buf = buf[:0]
reqBuf, err := snappy.Decode(buf, compressed)
if err != nil {
m.WithLabelValues(strconv.Itoa(http.StatusBadRequest)).Inc()
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// Note: this cause allocations, but holding onto them in a pool causes a lot of memory to be used over time.
req := prompb.WriteRequestPool.Get()
if err := req.Unmarshal(reqBuf); err != nil {
m.WithLabelValues(strconv.Itoa(http.StatusBadRequest)).Inc()
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// Apply any label or metrics drops or additions
req = s.requestTransformer.TransformWriteRequest(req)
if len(req.Timeseries) == 0 {
m.WithLabelValues(strconv.Itoa(http.StatusNoContent)).Inc()
w.WriteHeader(http.StatusAccepted)
return
}
err = s.Write(r.Context(), req)
if errors.Is(err, wal.ErrMaxSegmentsExceeded) || errors.Is(err, wal.ErrMaxDiskUsageExceeded) {
m.WithLabelValues(strconv.Itoa(http.StatusTooManyRequests)).Inc()
http.Error(w, "Overloaded. Retry later", http.StatusTooManyRequests)
return
} else if err != nil {
logger.Errorf("Failed to write ts: %s", err.Error())
m.WithLabelValues(strconv.Itoa(http.StatusInternalServerError)).Inc()
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
m.WithLabelValues(strconv.Itoa(http.StatusAccepted)).Inc()
w.WriteHeader(http.StatusAccepted)
}
func (s *Handler) Write(ctx context.Context, req *prompb.WriteRequest) error {
return remote.WriteRequest(ctx, s.requestWriters, req)
}