benchmarks/benchmark/tools/locust-load-inference/locust-custom-exporter/main.go (168 lines of code) (raw):
package main
import (
"crypto/tls"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/log"
"github.com/prometheus/common/version"
"gopkg.in/alecthomas/kingpin.v2"
)
var (
namespace string
NameSpace *string
)
// Exporter structure
type Exporter struct {
uri string
mutex sync.RWMutex
fetch func(endpoint string) (io.ReadCloser, error)
locustAvgTokensSent,
locustAvgTokensReceived,
locustAvgTestTime,
locustAvgOutputTokenLatency,
locustTimeToFirstToken prometheus.Gauge
}
// NewExporter function
func NewExporter(uri string, timeout time.Duration) (*Exporter, error) {
u, err := url.Parse(uri)
if err != nil {
return nil, err
}
var fetch func(endpoint string) (io.ReadCloser, error)
switch u.Scheme {
case "http", "https", "file":
fetch = fetchHTTP(uri, timeout)
default:
return nil, fmt.Errorf("unsupported scheme: %q", u.Scheme)
}
return &Exporter{
uri: uri,
fetch: fetch,
locustAvgTokensSent: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: "custom_metrics",
Name: "avg_tokens_sent",
},
),
locustAvgTokensReceived: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: "custom_metrics",
Name: "avg_tokens_received",
},
),
locustAvgTestTime: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: "custom_metrics",
Name: "avg_test_time",
},
),
locustAvgOutputTokenLatency: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: "custom_metrics",
Name: "avg_output_token_latency",
},
),
locustTimeToFirstToken: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: "custom_metrics",
Name: "avg_time_to_first_token",
},
),
}, nil
}
// Describe function of Exporter
func (e *Exporter) Describe(ch chan<- *prometheus.Desc) {
e.mutex.Lock()
defer e.mutex.Unlock()
ch <- e.locustAvgTokensSent.Desc()
ch <- e.locustAvgTokensReceived.Desc()
ch <- e.locustAvgTestTime.Desc()
ch <- e.locustAvgOutputTokenLatency.Desc()
ch <- e.locustTimeToFirstToken.Desc()
}
// Collect function of Exporter
func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
e.mutex.Lock()
defer e.mutex.Unlock()
var locustStats locustStats
body, err := e.fetch("/stats/custom_metrics")
if err != nil {
log.Errorf("Can't scrape Pack: %v", err)
return
}
defer body.Close()
bodyAll, err := ioutil.ReadAll(body)
if err != nil {
return
}
_ = json.Unmarshal([]byte(bodyAll), &locustStats)
ch <- prometheus.MustNewConstMetric(e.locustAvgTokensSent.Desc(), prometheus.GaugeValue, float64(locustStats.AvgTokensSent))
ch <- prometheus.MustNewConstMetric(e.locustAvgTokensReceived.Desc(), prometheus.GaugeValue, float64(locustStats.AvgTokensReceived))
ch <- prometheus.MustNewConstMetric(e.locustAvgTestTime.Desc(), prometheus.GaugeValue, float64(locustStats.AvgTestTime))
ch <- prometheus.MustNewConstMetric(e.locustAvgOutputTokenLatency.Desc(), prometheus.GaugeValue, float64(locustStats.AvgOutputTokenLatency))
ch <- prometheus.MustNewConstMetric(e.locustTimeToFirstToken.Desc(), prometheus.GaugeValue, float64(locustStats.AvgTimeToFirstToken))
}
type locustStats struct {
AvgTokensSent float64 `json:"average-tokens-sent"`
AvgTokensReceived float64 `json:"average-tokens-received"`
AvgTestTime float64 `json:"average-test-time"`
AvgOutputTokenLatency float64 `json:"average-output-token-latency"`
AvgTimeToFirstToken float64 `json:"average-time-to-first-token"`
}
func fetchHTTP(uri string, timeout time.Duration) func(endpoint string) (io.ReadCloser, error) {
tr := &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}
client := http.Client{
Timeout: timeout,
Transport: tr,
}
return func(endpoint string) (io.ReadCloser, error) {
resp, err := client.Get(uri + endpoint)
if err != nil {
return nil, err
}
if !(resp.StatusCode >= 200 && resp.StatusCode < 300) {
resp.Body.Close()
return nil, fmt.Errorf("HTTP status %d", resp.StatusCode)
}
return resp.Body, nil
}
}
func main() {
var (
listenAddress = kingpin.Flag("web.listen-address", "Address to listen on for web interface and telemetry.").Default(":8080").Envar("locust_custom_exporter_WEB_LISTEN_ADDRESS").String()
metricsPath = kingpin.Flag("web.telemetry-path", "Path under which to expose metrics.").Default("/metrics").Envar("locust_custom_exporter_WEB_TELEMETRY_PATH").String()
uri = kingpin.Flag("locust.uri", "URI of Locust.").Default("http://localhost:8089").Envar("locust_custom_exporter_URI").String()
NameSpace = kingpin.Flag("locust.namespace", "Namespace for prometheus metrics.").Default("locust").Envar("LOCUST_METRIC_NAMESPACE").String()
timeout = kingpin.Flag("locust.timeout", "Scrape timeout").Default("5s").Envar("locust_custom_exporter_TIMEOUT").Duration()
)
log.AddFlags(kingpin.CommandLine)
kingpin.Version(version.Print("locust_exporter"))
kingpin.HelpFlag.Short('h')
kingpin.Parse()
namespace = *NameSpace
log.Infoln("Starting locust_custom_exporter", version.Info())
log.Infoln("Build context", version.BuildContext())
exporter, err := NewExporter(*uri, *timeout)
if err != nil {
log.Fatal(err)
}
prometheus.MustRegister(exporter)
prometheus.MustRegister(version.NewCollector("locustexporter"))
http.Handle(*metricsPath, promhttp.Handler())
http.HandleFunc("/quitquitquit", func(http.ResponseWriter, *http.Request) { os.Exit(0) })
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`<html><head><title>Locust Exporter</title></head><body><h1>Locust Exporter</h1><p><a href='` + *metricsPath + `'>Metrics</a></p></body></html>`))
})
log.Infoln("Listening on", *listenAddress)
log.Fatal(http.ListenAndServe(*listenAddress, nil))
}