hub/hub.go (352 lines of code) (raw):
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
package hub
import (
"bytes"
"encoding/binary"
"fmt"
"github.com/golang/glog"
"github.com/labstack/echo"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"log"
"net/http"
"os"
"sort"
"strconv"
"strings"
"sync"
"time"
)
const (
	// scrapeWorkerPoolSize bounds the number of goroutines used to render
	// metric families into exposition text during a scrape (see exposeMetrics).
	scrapeWorkerPoolSize = 100
)
// Self-instrumentation gauges for the hub itself. They are registered with
// the default prometheus registry in init and rendered by WriteInternalMetrics.
var (
	hubLimit           = prometheus.NewGauge(prometheus.GaugeOpts{Name: "hub_limit", Help: "Maximum number of datapoints in hub"})
	hubSize            = prometheus.NewGauge(prometheus.GaugeOpts{Name: "hub_size", Help: "Number of datapoints in hub"})
	httpReceiveSizeFam = prometheus.NewGauge(prometheus.GaugeOpts{Name: "http_receive_size_fam", Help: "Size of last HTTP receive (number of families)"})
	httpReceiveSizeDP  = prometheus.NewGauge(prometheus.GaugeOpts{Name: "http_receive_size_dp", Help: "Size of last HTTP receive (number of datapoints)"})
	httpReceiveTime    = prometheus.NewGauge(prometheus.GaugeOpts{Name: "http_receive_time", Help: "Time to ingest last HTTP receive"})
	parseTime          = prometheus.NewGauge(prometheus.GaugeOpts{Name: "parse_time", Help: "Time to parse last HTTP receive"})
	grpcReceiveSizeFam = prometheus.NewGauge(prometheus.GaugeOpts{Name: "grpc_receive_size_fam", Help: "Size of last GRPC receive (number of families)"})
	grpcReceiveSizeDP  = prometheus.NewGauge(prometheus.GaugeOpts{Name: "grpc_receive_size_dp", Help: "Size of last GRPC receive (number of datapoints)"})
	grpcReceiveTime    = prometheus.NewGauge(prometheus.GaugeOpts{Name: "grpc_receive_time", Help: "Time to ingest last GRPC receive"})
	scrapeLockWait     = prometheus.NewGauge(prometheus.GaugeOpts{Name: "scrape_lock_wait", Help: "Time spent waiting on lock by last scrape request"})
)
// init registers the hub's self-instrumentation gauges with the default
// prometheus registry so they appear in WriteInternalMetrics output.
func init() {
	prometheus.MustRegister(hubLimit, hubSize, httpReceiveSizeFam, httpReceiveSizeDP, httpReceiveTime, parseTime,
		grpcReceiveTime, grpcReceiveSizeDP, grpcReceiveSizeFam, scrapeLockWait)
}
// MetricHub serves as a replacement for the prometheus pushgateway. Accepts
// timestamps with metrics, and stores them in a queue to allow multiple
// datapoints per metric series to be scraped
type MetricHub struct {
	// metricFamiliesByName holds all queued datapoints, keyed by family name.
	metricFamiliesByName map[string]*familyAndMetrics
	// limit is the maximum number of datapoints held at once; a value <= 0
	// means unlimited (see NewMetricHub).
	limit int
	// stats holds bookkeeping counters surfaced by the Debug handler.
	stats hubStats
	// Embedded lock taken around mutation of metricFamiliesByName
	// (hubMetrics, ReceiveGRPC, Scrape). NOTE(review): some stats fields are
	// written outside this lock in Receive/Scrape — confirm intended.
	sync.Mutex
	// scrapeTimeout, in seconds, bounds how long exposeMetrics waits for the
	// exposition string to be assembled.
	scrapeTimeout int
}
// hubStats are for metrics that aren't worth exposing to prometheus, and also
// to provide a simpler way of exposing them in the `Debug` method since extracting
// values from the prometheus registry is not simple
type hubStats struct {
	// Last scrape: unix timestamp, exposition string length (bytes), and
	// number of families served.
	lastScrapeTime        int64
	lastScrapeSize        int64
	lastScrapeNumFamilies int
	// Last HTTP push: unix timestamp, request Content-Length, and number of
	// families parsed.
	lastHTTPReceiveTime        int64
	lastHTTPReceiveSize        int64
	lastHTTPReceiveNumFamilies int
	// Last GRPC push: unix timestamp, binary.Size of the family slice, and
	// number of families received.
	lastGRPCReceiveTime        int64
	lastGRPCReceiveSize        int
	lastGRPCReceiveNumFamilies int
	// Current hub contents. currentCountDatapoints is maintained
	// incrementally by the receive paths and used for the limit check;
	// the other two are recomputed on demand by updateCountStats.
	currentCountFamilies   int
	currentCountSeries     int
	currentCountDatapoints int
}
// NewMetricHub creates a hub that holds at most limit datapoints (a
// non-positive limit disables the cap) and records the configured limit in
// the hub_limit gauge. scrapeTimeout is the number of seconds a scrape will
// wait for the exposition string to be built.
func NewMetricHub(limit int, scrapeTimeout int) *MetricHub {
	if limit <= 0 {
		glog.Info("Prometheus-Edge-Hub created with no limit\n")
	} else {
		glog.Infof("Prometheus-Edge-Hub created with a limit of %d\n", limit)
	}
	hubLimit.Set(float64(limit))
	hub := &MetricHub{
		metricFamiliesByName: map[string]*familyAndMetrics{},
		limit:                limit,
		scrapeTimeout:        scrapeTimeout,
	}
	return hub
}
// Receive is a handler function to receive metric pushes in Prometheus text
// exposition format. On success the parsed datapoints are queued for the next
// scrape and http.StatusOK is returned; a parse failure returns
// http.StatusBadRequest and an over-limit push returns
// http.StatusNotAcceptable.
//
// The hub lock is held across the limit check, the insertion, and the stats
// update so that a concurrent push cannot slip in between the check and the
// write and overfill the hub (ReceiveGRPC already follows this pattern).
func (c *MetricHub) Receive(ctx echo.Context) error {
	t0 := time.Now()
	var parser expfmt.TextParser
	parsedFamilies, err := parser.TextToMetricFamilies(ctx.Request().Body)
	if err != nil {
		return ctx.String(http.StatusBadRequest, fmt.Sprintf("error parsing metrics: %v", err))
	}
	parseTime.Set(time.Since(t0).Seconds())

	newDatapoints := 0
	for _, fam := range parsedFamilies {
		newDatapoints += len(fam.Metric)
	}

	c.Lock()
	defer c.Unlock()

	// Check if new datapoints will exceed the specified limit
	if c.limit > 0 && c.stats.currentCountDatapoints+newDatapoints > c.limit {
		errString := fmt.Sprintf("Not accepting push of size %d. Would overfill hub limit of %d. Current hub size: %d\n", newDatapoints, c.limit, c.stats.currentCountDatapoints)
		glog.Error(errString)
		return ctx.String(http.StatusNotAcceptable, errString)
	}

	httpReceiveSizeDP.Set(float64(newDatapoints))
	httpReceiveSizeFam.Set(float64(len(parsedFamilies)))

	// Insertion is inlined (rather than calling hubMetrics) because the lock
	// is already held here and the hub mutex is not reentrant.
	t2 := time.Now()
	for _, fam := range parsedFamilies {
		name := fam.GetName()
		if existing, ok := c.metricFamiliesByName[name]; ok {
			existing.addMetrics(fam.Metric)
		} else {
			c.metricFamiliesByName[name] = newFamilyAndMetrics(fam)
		}
	}
	httpReceiveTime.Set(time.Since(t2).Seconds())

	c.stats.lastHTTPReceiveTime = time.Now().Unix()
	c.stats.lastHTTPReceiveSize = ctx.Request().ContentLength
	c.stats.lastHTTPReceiveNumFamilies = len(parsedFamilies)
	c.stats.currentCountDatapoints += newDatapoints
	hubSize.Set(float64(c.stats.currentCountDatapoints))
	return ctx.NoContent(http.StatusOK)
}
// hubMetrics folds freshly parsed families into the hub's store under the hub
// lock: datapoints are appended to a family that already exists, and a new
// entry is created the first time a family name is seen.
func (c *MetricHub) hubMetrics(families map[string]*dto.MetricFamily) {
	c.Lock()
	defer c.Unlock()
	for _, fam := range families {
		name := fam.GetName()
		existing, found := c.metricFamiliesByName[name]
		if !found {
			c.metricFamiliesByName[name] = newFamilyAndMetrics(fam)
			continue
		}
		existing.addMetrics(fam.Metric)
	}
}
// ReceiveGRPC ingests metric families pushed over GRPC. The hub lock is held
// for the entire operation so the limit check and the insertion are atomic.
// A push that would exceed the configured limit is logged and silently
// dropped (this function has no way to report an error to the caller).
func (c *MetricHub) ReceiveGRPC(families []*dto.MetricFamily) {
	t0 := time.Now()
	c.Lock()
	defer c.Unlock()

	newDatapoints := 0
	for _, fam := range families {
		newDatapoints += len(fam.Metric)
	}

	// Check if new datapoints will exceed the specified limit
	if c.limit > 0 && c.stats.currentCountDatapoints+newDatapoints > c.limit {
		errString := fmt.Sprintf("Not accepting push of size %d. Would overfill hub limit of %d. Current hub size: %d\n", newDatapoints, c.limit, c.stats.currentCountDatapoints)
		glog.Error(errString)
		return
	}

	for _, fam := range families {
		name := fam.GetName()
		if existing, ok := c.metricFamiliesByName[name]; ok {
			existing.addMetrics(fam.Metric)
		} else {
			c.metricFamiliesByName[name] = newFamilyAndMetrics(fam)
		}
	}

	grpcReceiveTime.Set(time.Since(t0).Seconds())
	grpcReceiveSizeFam.Set(float64(len(families)))
	grpcReceiveSizeDP.Set(float64(newDatapoints))

	c.stats.lastGRPCReceiveTime = time.Now().Unix()
	c.stats.lastGRPCReceiveNumFamilies = len(families)
	// NOTE(review): binary.Size returns -1 for values without a fixed size,
	// which likely includes these protobuf structs — confirm this stat is
	// ever meaningful; proto.Size may be what was intended.
	c.stats.lastGRPCReceiveSize = binary.Size(families)
	c.stats.currentCountDatapoints += newDatapoints
	// Keep the hub_size gauge in step with the datapoint count; the HTTP
	// receive path already does this.
	hubSize.Set(float64(c.stats.currentCountDatapoints))
}
// Scrape is a handler function for prometheus scrape requests. It atomically
// detaches the current contents of the hub (so each datapoint is served
// exactly once) and renders them in text exposition format.
func (c *MetricHub) Scrape(ctx echo.Context) error {
	lockStart := time.Now()
	c.Lock()
	// scrape_lock_wait reports how long this scrape was blocked behind
	// concurrent receives. (The gauge was previously registered but never set.)
	scrapeLockWait.Set(time.Since(lockStart).Seconds())

	scrapeMetrics := c.metricFamiliesByName
	c.clearMetrics()
	// Reset the datapoint count while still holding the lock so a concurrent
	// push's limit check cannot reject against the stale pre-scrape count.
	c.stats.currentCountDatapoints = 0
	hubSize.Set(0)
	c.Unlock()

	// Rendering happens outside the lock so receives are not blocked on it.
	expositionString := c.exposeMetrics(scrapeMetrics, scrapeWorkerPoolSize)

	c.stats.lastScrapeTime = time.Now().Unix()
	c.stats.lastScrapeSize = int64(len(expositionString))
	c.stats.lastScrapeNumFamilies = len(scrapeMetrics)
	return ctx.String(http.StatusOK, expositionString)
}
// clearMetrics drops every queued datapoint by replacing the family map with
// a fresh one. The caller is expected to hold the hub lock (Scrape does).
func (c *MetricHub) clearMetrics() {
	c.metricFamiliesByName = map[string]*familyAndMetrics{}
}
// exposeMetrics renders every queued family into Prometheus text exposition
// format. Per-family conversion is fanned out to a pool of `workers`
// goroutines, and a single collector goroutine concatenates their output into
// one string. Returns "" if the result is not ready within scrapeTimeout
// seconds.
//
// NOTE(review): waitGroup.Wait() blocks before the select is reached, so the
// timeout only bounds the final concatenation/handoff, not the conversion
// work itself — confirm this is the intended behavior.
func (c *MetricHub) exposeMetrics(metricFamiliesByName map[string]*familyAndMetrics, workers int) string {
	fams := make(chan *familyAndMetrics, workers)
	results := make(chan string, workers)
	// respCh is buffered so the collector can always deliver its result and
	// exit, even if the timeout branch wins the select below.
	respCh := make(chan string, 1)
	waitGroup := &sync.WaitGroup{}
	for i := 0; i < workers; i++ {
		waitGroup.Add(1)
		go processFamilyWorker(fams, results, waitGroup)
	}
	go processFamilyStringsWorker(results, respCh)
	for _, fam := range metricFamiliesByName {
		fams <- fam
	}
	// Closing fams lets the workers drain and exit; once they have all
	// finished, closing results lets the collector send the combined string.
	close(fams)
	waitGroup.Wait()
	close(results)
	select {
	case resp := <-respCh:
		return resp
	case <-time.After(time.Duration(c.scrapeTimeout) * time.Second):
		log.Print("Timeout reached for building metrics string")
		return ""
	}
}
// processFamilyWorker drains the fams channel, converting each family (with
// all of its queued datapoints) to exposition text and forwarding the result.
// A family that fails to serialize is logged and dropped.
func processFamilyWorker(fams <-chan *familyAndMetrics, results chan<- string, waitGroup *sync.WaitGroup) {
	defer waitGroup.Done()
	for fam := range fams {
		populated := fam.popDatapoints()
		text, err := familyToString(populated)
		if err != nil {
			log.Printf("metric %s dropped. error converting metric to string: %v", *populated.Name, err)
			continue
		}
		results <- text
	}
}
// processFamilyStringsWorker concatenates every exposition chunk received on
// results (in arrival order) and delivers the combined string on respCh once
// results is closed.
func processFamilyStringsWorker(results <-chan string, respCh chan<- string) {
	combined := strings.Builder{}
	for chunk := range results {
		combined.WriteString(chunk)
	}
	respCh <- combined.String()
}
// Debug is a handler function to show the current state of the hub without
// consuming any datapoints. Pass a non-empty ?verbose= query parameter to
// also include the full exposition text a scrape would currently return.
func (c *MetricHub) Debug(ctx echo.Context) error {
	verbose := ctx.QueryParam("verbose")

	// Hold the hub lock while counting so a concurrent push or scrape cannot
	// mutate the family map mid-iteration (updateCountStats does not lock).
	c.Lock()
	c.updateCountStats()
	c.Unlock()

	hostname, _ := os.Hostname()

	var limitValue, utilizationValue string
	if c.limit <= 0 {
		limitValue = "None"
		utilizationValue = "0"
	} else {
		limitValue = strconv.Itoa(c.limit)
		utilizationValue = strconv.FormatFloat(float64(c.stats.currentCountDatapoints)*100/float64(c.limit), 'f', 2, 64)
	}

	// "Number of Familes" typo fixed to "Number of Families".
	debugString := fmt.Sprintf(`Prometheus Edge Hub running on %s
Hub Limit: %s
Hub Utilization: %s%%
Last Scrape: %d
Scrape Size: %d
Number of Families: %d
Last HTTP Receive: %d
Receive Size: %d
Number of Families: %d
Last GRPC Receive: %d
Receive Size: %d
Number of families: %d
Current Count Families: %d
Current Count Series: %d
Current Count Datapoints: %d `, hostname, limitValue, utilizationValue,
		c.stats.lastScrapeTime, c.stats.lastScrapeSize, c.stats.lastScrapeNumFamilies,
		c.stats.lastHTTPReceiveTime, c.stats.lastHTTPReceiveSize, c.stats.lastHTTPReceiveNumFamilies,
		c.stats.lastGRPCReceiveTime, c.stats.lastGRPCReceiveSize, c.stats.lastGRPCReceiveNumFamilies,
		c.stats.currentCountFamilies, c.stats.currentCountSeries, c.stats.currentCountDatapoints)

	if verbose != "" {
		debugString += fmt.Sprintf("\n\nCurrent Exposition Text:\n%s\n", c.exposeMetrics(c.metricFamiliesByName, scrapeWorkerPoolSize))
	}
	return ctx.String(http.StatusOK, debugString)
}
// updateCountStats recomputes the family, series, and datapoint totals from
// the hub's current contents and stores them in c.stats.
func (c *MetricHub) updateCountStats() {
	families, series, datapoints := 0, 0, 0
	for _, fam := range c.metricFamiliesByName {
		families++
		for _, queue := range fam.metrics {
			series++
			datapoints += len(queue)
		}
	}
	c.stats.currentCountFamilies = families
	c.stats.currentCountSeries = series
	c.stats.currentCountDatapoints = datapoints
}
// familyAndMetrics pairs a MetricFamily header (its Metric slice is stripped
// on construction) with per-series queues of datapoints, keyed by the labeled
// series name from makeLabeledName and kept sorted by timestamp.
type familyAndMetrics struct {
	family  *dto.MetricFamily
	metrics map[string][]*dto.Metric
}
// newFamilyAndMetrics builds a familyAndMetrics from a parsed family, moving
// the family's datapoints into per-series, timestamp-sorted queues. The
// insertion logic is delegated to addMetrics instead of being duplicated.
func newFamilyAndMetrics(family *dto.MetricFamily) *familyAndMetrics {
	pushedMetrics := family.Metric
	// clear metrics in family because we are keeping them in the queues
	family.Metric = nil
	fm := &familyAndMetrics{
		family:  family,
		metrics: make(map[string][]*dto.Metric),
	}
	fm.addMetrics(pushedMetrics)
	return fm
}
// addMetrics appends each new datapoint to its series queue, creating the
// queue on first sight of a series. Queues are kept sorted by timestamp
// ([t0, t1, t2...]): the common in-order push is a cheap append, and only an
// out-of-order datapoint pays for a sorted insert.
//
// Timestamps are read via GetTimestampMs (nil-safe, returns 0) instead of
// dereferencing TimestampMs directly, so a datapoint pushed without a
// timestamp cannot panic the hub.
func (f *familyAndMetrics) addMetrics(newMetrics []*dto.Metric) {
	for _, metric := range newMetrics {
		metricName := makeLabeledName(metric, f.family.GetName())
		queue, ok := f.metrics[metricName]
		if !ok {
			f.metrics[metricName] = []*dto.Metric{metric}
			continue
		}
		if metric.GetTimestampMs() >= queue[len(queue)-1].GetTimestampMs() {
			f.metrics[metricName] = append(queue, metric)
		} else {
			f.metrics[metricName] = sortedInsert(queue, metric)
		}
	}
}
// popDatapoints returns a copy of the MetricFamily populated with every
// queued datapoint. Each per-series queue is timestamp-ordered, so within a
// series the earliest datapoint appears first; ordering across different
// series follows map iteration order. The stored queues are not modified.
func (f *familyAndMetrics) popDatapoints() *dto.MetricFamily {
	populated := f.copyFamily()
	for _, queue := range f.metrics {
		// Appending an empty queue is a no-op, so no length guard is needed.
		populated.Metric = append(populated.Metric, queue...)
	}
	return &populated
}
// copyFamily returns a shallow copy of the MetricFamily so the caller can
// overwrite top-level fields (e.g. the Metric slice) without touching the
// stored family. Pointer fields still alias the original, so deep mutation
// is not safe.
func (f *familyAndMetrics) copyFamily() dto.MetricFamily {
	return *f.family
}
// makeLabeledName builds a unique series name from a metric's label pairs by
// appending "_name_value" for each label, in label-name order, to the family
// name. Note: the metric's label slice is sorted in place as a side effect,
// so the same metric always yields the same key.
func makeLabeledName(metric *dto.Metric, metricName string) string {
	labels := metric.GetLabel()
	sort.Slice(labels, func(i, j int) bool {
		return labels[i].GetName() < labels[j].GetName()
	})
	parts := make([]string, 0, 2*len(labels)+1)
	parts = append(parts, metricName)
	for _, labelPair := range labels {
		parts = append(parts, labelPair.GetName(), labelPair.GetValue())
	}
	return strings.Join(parts, "_")
}
// familyToString renders a single MetricFamily in Prometheus text exposition
// format, returning an error if the family cannot be serialized.
func familyToString(family *dto.MetricFamily) (string, error) {
	buf := &bytes.Buffer{}
	if _, err := expfmt.MetricFamilyToText(buf, family); err != nil {
		return "", fmt.Errorf("error writing family string: %v", err)
	}
	return buf.String(), nil
}
// WriteInternalMetrics gathers the hub's own prometheus metrics (hub size,
// receive timings, etc.) from the default registry and renders them all in
// text exposition format. Returns "" and the error if gathering or rendering
// fails.
func WriteInternalMetrics() (string, error) {
	families, err := prometheus.DefaultGatherer.Gather()
	if err != nil {
		return "", err
	}
	var out strings.Builder
	for _, fam := range families {
		// strings.Builder is an io.Writer, so render straight into it.
		if _, err := expfmt.MetricFamilyToText(&out, fam); err != nil {
			return "", err
		}
	}
	return out.String(), nil
}
// sortedInsert inserts el into the timestamp-sorted slice data, preserving
// the ascending order. Equal timestamps keep arrival order (el lands after
// existing entries with the same timestamp, matching addMetrics' append fast
// path). GetTimestampMs is used so a nil timestamp cannot panic.
func sortedInsert(data []*dto.Metric, el *dto.Metric) []*dto.Metric {
	// First index whose timestamp is strictly greater than el's.
	index := sort.Search(len(data), func(i int) bool {
		return data[i].GetTimestampMs() > el.GetTimestampMs()
	})
	// Grow by one, shift the tail right, and drop el into the gap; the nil
	// placeholder is always overwritten, so no dummy allocation is needed.
	data = append(data, nil)
	copy(data[index+1:], data[index:])
	data[index] = el
	return data
}