opentelemetry_collector/receiver/nginxreceiver/nginx_stats_collector.go
package nginxreceiver
import (
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"time"
"go.opentelemetry.io/collector/consumer"
"go.uber.org/zap"
"github.com/googlecloudplatform/appengine-sidecars-docker/opentelemetry_collector/receiver/metricgenerator"
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus"
)
// NginxStatsCollector generates metrics by polling the nginx status page at statsURL.
type NginxStatsCollector struct {
consumer consumer.Metrics
now func() time.Time
startTime time.Time
done chan struct{}
logger *zap.Logger
getStatus func(string) (resp *http.Response, err error)
exportInterval time.Duration
statsURL string
}
// LatencyStats holds a set of latency stats parsed from the stats JSON.
type LatencyStats struct {
RequestCount int64 `json:"request_count"`
LatencySum int64 `json:"latency_sum"`
SumSquares int64 `json:"sum_squares"`
Distribution []int64 `json:"distribution"`
}
// NginxStats holds the nginx stats parsed from the stats JSON.
type NginxStats struct {
RequestLatency LatencyStats `json:"request_latency"`
UpstreamLatency LatencyStats `json:"upstream_latency"`
WebsocketLatency LatencyStats `json:"websocket_latency"`
LatencyBucketBounds []float64 `json:"latency_bucket_bounds"`
}
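// The JSON below is an illustrative sketch of what the latency stats endpoint
// is assumed to return; the field values are invented. Each distribution is
// expected to carry one count per interval delimited by the bounds, i.e.
// len(distribution) == len(latency_bucket_bounds) + 1:
//
//	{
//	  "request_latency":   {"request_count": 12, "latency_sum": 340, "sum_squares": 11200, "distribution": [3, 5, 3, 1]},
//	  "upstream_latency":  {"request_count": 4, "latency_sum": 90, "sum_squares": 2500, "distribution": [1, 2, 1, 0]},
//	  "websocket_latency": {"request_count": 0, "latency_sum": 0, "sum_squares": 0, "distribution": [0, 0, 0, 0]},
//	  "latency_bucket_bounds": [10, 50, 100]
//	}
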
// NewNginxStatsCollector creates a new NginxStatsCollector that generates metrics
// based on the nginx stats found by polling statsURL.
func NewNginxStatsCollector(interval time.Duration, statsURL string, logger *zap.Logger, consumer consumer.Metrics) (*NginxStatsCollector, error) {
if interval <= 0 {
return nil, errors.New("ExportInterval must be greater than 0")
}
if _, err := url.ParseRequestURI(statsURL); err != nil {
return nil, fmt.Errorf("StatsURL %s is not valid: %v", statsURL, err)
}
collector := &NginxStatsCollector{
consumer: consumer,
now: time.Now,
done: make(chan struct{}),
logger: logger,
exportInterval: interval,
statsURL: statsURL,
getStatus: http.Get,
}
return collector, nil
}
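// A minimal usage sketch (the interval, URL, logger, and consumer below are
// placeholders, not values this package requires):
//
//	collector, err := NewNginxStatsCollector(
//		time.Minute, "http://localhost:8090/nginx_status", logger, nextConsumer)
//	if err != nil {
//		// handle the configuration error
//	}
//	collector.StartCollection()
//	defer collector.StopCollection()
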
// StartCollection starts a goroutine that periodically polls nginx for stats and exports metrics based on them.
func (collector *NginxStatsCollector) StartCollection() {
collector.startTime = collector.now()
go func() {
ticker := time.NewTicker(collector.exportInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
collector.scrapeAndExport()
case <-collector.done:
return
}
}
}()
}
// StopCollection stops the polling for nginx stats and the export of the metrics.
func (collector *NginxStatsCollector) StopCollection() {
close(collector.done)
}
// scrapeNginxStats gets the stats from the nginx latency status module and parses them into a NginxStats struct.
func (collector *NginxStatsCollector) scrapeNginxStats() (*NginxStats, error) {
resp, err := collector.getStatus(collector.statsURL)
if err != nil {
return nil, err
}
defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("error getting nginx stats: status code %d, content: %s", resp.StatusCode, body)
	}
return readStatsJSON(body)
}
// readStatsJSON parses the stats JSON into a NginxStats, using -1 defaults so that missing fields can be detected.
func readStatsJSON(statsJSON []byte) (*NginxStats, error) {
	// Setting the default int values to -1 makes it possible to tell when a value is missing
	// from the JSON, since Go's zero default of 0 is also a valid value for the stats.
stats := NginxStats{
RequestLatency: LatencyStats{
RequestCount: -1,
LatencySum: -1,
SumSquares: -1,
},
UpstreamLatency: LatencyStats{
RequestCount: -1,
LatencySum: -1,
SumSquares: -1,
},
WebsocketLatency: LatencyStats{
RequestCount: -1,
LatencySum: -1,
SumSquares: -1,
},
}
if err := json.Unmarshal(statsJSON, &stats); err != nil {
return nil, err
}
return &stats, nil
}
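// appendDistributionMetric converts the given LatencyStats into a single
// distribution time series (count, sum, sum of squared deviations, and
// per-bucket counts) and appends it to metrics under the given descriptor.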
func (collector *NginxStatsCollector) appendDistributionMetric(
stats *LatencyStats,
bucketOptions *metricspb.DistributionValue_BucketOptions,
metrics []*metricspb.Metric,
descriptor *metricspb.MetricDescriptor) []*metricspb.Metric {
sumSquaredDeviation := metricgenerator.GetSumOfSquaredDeviationsFromIntDist(
stats.LatencySum, stats.SumSquares, stats.RequestCount)
timeseries := metricgenerator.MakeDistributionTimeSeries(
stats.Distribution,
float64(stats.LatencySum),
sumSquaredDeviation,
stats.RequestCount,
collector.startTime,
collector.now(),
bucketOptions,
[]*metricspb.LabelValue{},
)
return append(metrics, &metricspb.Metric{
MetricDescriptor: descriptor,
Timeseries: []*metricspb.TimeSeries{timeseries},
},
)
}
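// checkConsistency validates the parsed latency stats against the reported
// bucket bounds: the distribution must contain exactly one more count than
// there are bounds (the bounds delimit len(bounds)+1 intervals), and no value
// may be negative, which also catches fields left at the -1 sentinel because
// they were missing from the JSON.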
func (stats *LatencyStats) checkConsistency(bounds []float64) error {
	if len(bounds) == 0 || len(stats.Distribution) == 0 {
		return errors.New("one of the distribution values from the stats JSON is unset")
	}
	if len(bounds)+1 != len(stats.Distribution) {
		return errors.New("the latency distribution must contain exactly one more bucket than there are bucket boundaries")
	}
	if stats.RequestCount < 0 {
		return errors.New("the request count is less than 0")
	}
	if stats.SumSquares < 0 {
		return errors.New("the sum of squared latencies is less than 0")
	}
	if stats.LatencySum < 0 {
		return errors.New("the sum of latencies is less than 0")
	}
	for _, count := range stats.Distribution {
		if count < 0 {
			return errors.New("one of the latency distribution counts is less than 0")
		}
	}
return nil
}
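// scrapeAndExport polls the nginx stats endpoint once, converts each latency
// distribution that passes its consistency check into an OpenCensus metric,
// and forwards the result to the configured consumer. Distributions that fail
// validation are logged and skipped rather than aborting the whole export.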
func (collector *NginxStatsCollector) scrapeAndExport() {
metrics := make([]*metricspb.Metric, 0, 3)
stats, err := collector.scrapeNginxStats()
if err != nil {
collector.logger.Error("Could not read nginx stats", zap.Error(err))
} else {
bucketOptions := metricgenerator.FormatBucketOptions(stats.LatencyBucketBounds)
if err = stats.RequestLatency.checkConsistency(stats.LatencyBucketBounds); err != nil {
collector.logger.Error("Invalid value received for RequestLatency", zap.Error(err))
} else {
metrics = collector.appendDistributionMetric(&stats.RequestLatency, bucketOptions, metrics, requestLatencyMetric)
}
if err = stats.WebsocketLatency.checkConsistency(stats.LatencyBucketBounds); err != nil {
collector.logger.Error("Invalid value received for WebsocketLatency", zap.Error(err))
} else {
metrics = collector.appendDistributionMetric(&stats.WebsocketLatency, bucketOptions, metrics, websocketLatencyMetric)
}
if err = stats.UpstreamLatency.checkConsistency(stats.LatencyBucketBounds); err != nil {
collector.logger.Error("Invalid value received for UpstreamLatency", zap.Error(err))
} else {
metrics = collector.appendDistributionMetric(&stats.UpstreamLatency, bucketOptions, metrics, upstreamLatencyMetric)
}
}
ctx := context.Background()
err = collector.consumer.ConsumeMetrics(ctx, opencensus.OCToMetrics(nil, nil, metrics))
if err != nil {
collector.logger.Error("Error sending nginx metrics", zap.Error(err))
}
}