util/metrics/logmetrics/log.go (228 lines of code) (raw):
/*
Copyright 2021 The TestGrid Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package logmetrics
import (
"context"
"fmt"
"strconv"
"sync"
"time"
"bitbucket.org/creachadair/stringset"
"github.com/GoogleCloudPlatform/testgrid/util/metrics"
"github.com/sirupsen/logrus"
)
// Valuer extends a metric to include a report on its values.
type Valuer interface {
metrics.Metric
// Values returns the measurements to report as a two-level map. Reporter
// logs one line per outer key (as the "field") and attaches every inner
// key/measurement pair to that line as a log field.
Values() map[string]map[string]interface{}
}
// Reporter is a collection of metric values to report.
//
// The Int64 and Counter methods append newly configured metrics to the
// collection; ReportNow and Report log the values of the collected metrics.
type Reporter []Valuer
// ReportNow reports all metrics once, immediately.
//
// Metrics are visited in slice order. For every field a metric returns from
// Values, one "Current status" info line is logged that carries the metric
// name, the field name and one log field per value/measurement pair. A nil log
// is replaced by a new logrus logger.
func (r *Reporter) ReportNow(log logrus.FieldLogger) {
	if log == nil {
		log = logrus.New()
	}
	for _, v := range *r {
		metricLog := log.WithField("metric", v.Name())
		for fieldName, measurements := range v.Values() {
			entry := metricLog.WithField("field", fieldName)
			for key, measured := range measurements {
				entry = entry.WithField(key, measured)
			}
			entry.Info("Current status")
		}
	}
}
// Report the status of its metrics every freq until the context expires.
//
// The metrics present in the Reporter when Report is called are indexed by
// name and logged on every tick in the order given by the name set's
// Elements method. When several metrics share a name, only the one registered
// last is logged. Each field of a metric produces one "Current status" info
// line carrying the metric name, the field name and one log field per
// value/measurement pair.
//
// A nil log is replaced by a new logrus logger. Report returns an error
// immediately when freq is not positive, and otherwise blocks until ctx is
// done and returns ctx.Err().
func (r *Reporter) Report(ctx context.Context, log logrus.FieldLogger, freq time.Duration) error {
	if freq <= 0 {
		// time.NewTicker panics on a non-positive interval; surface the
		// misconfiguration through the error return instead.
		return fmt.Errorf("report frequency must be positive, got %v", freq)
	}
	if log == nil {
		log = logrus.New()
	}
	ticker := time.NewTicker(freq)
	defer ticker.Stop()

	names := stringset.NewSize(len(*r))
	metricMap := make(map[string]int, len(*r))
	for i, m := range *r {
		name := m.Name()
		names.Add(name)
		metricMap[name] = i
	}
	// The name set is never modified below, so its element list is computed
	// once rather than on every tick.
	ordered := names.Elements()

	for {
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return ctx.Err()
		}
		for _, name := range ordered {
			metric := (*r)[metricMap[name]]
			metricLog := log.WithField("metric", metric.Name())
			for field, values := range metric.Values() {
				entry := metricLog.WithField("field", field)
				for value, measurement := range values {
					entry = entry.WithField(value, measurement)
				}
				entry.Info("Current status")
			}
		}
	}
}
// Int64 configures a new Int64 metric to report.
//
// name and desc are stored on the metric. fields lists the field names of the
// metric; every Set call must supply exactly one value per field, in the same
// order. log receives the fatal message emitted when Set is called with the
// wrong number of fields; a nil log is replaced by a new logrus logger.
//
// The new metric is appended to the Reporter and returned.
func (r *Reporter) Int64(name, desc string, log logrus.FieldLogger, fields ...string) metrics.Int64 {
	if log == nil {
		// ReportNow and Report tolerate a nil logger; without this default the
		// metric would hit a nil dereference the first time it tried to log.
		log = logrus.New()
	}
	current := make([]map[string][]int64, len(fields))
	for i := range fields {
		current[i] = make(map[string][]int64, 1)
	}
	out := &logInt64{
		name:    name,
		desc:    desc,
		log:     log,
		fields:  fields,
		current: current,
	}
	*r = append(*r, out)
	return out
}
// Counter configures a new Counter metric to report.
//
// name and desc are stored on the metric. fields lists the field names of the
// metric; every Add call must supply exactly one value per field, in the same
// order. log receives the fatal messages emitted when Add is misused; a nil
// log is replaced by a new logrus logger.
//
// The rate interval of the counter starts now. The new metric is appended to
// the Reporter and returned.
func (r *Reporter) Counter(name, desc string, log logrus.FieldLogger, fields ...string) metrics.Counter {
	if log == nil {
		// ReportNow and Report tolerate a nil logger; without this default the
		// metric would hit a nil dereference the first time it tried to log.
		log = logrus.New()
	}
	current := make([]map[string]int64, len(fields))
	previous := make([]map[string]int64, len(fields))
	for i := range fields {
		current[i] = make(map[string]int64, 1)
		previous[i] = make(map[string]int64, 1)
	}
	out := &logCounter{
		name:     name,
		desc:     desc,
		log:      log,
		fields:   fields,
		current:  current,
		previous: previous,
		last:     time.Now(),
	}
	*r = append(*r, out)
	return out
}
// logInt64 is an int64 metric that collects the numbers passed to Set and
// hands them over, per field value, on the next call to Values.
type logInt64 struct {
	name, desc string
	// fields holds the field names; Set expects one value per entry.
	fields []string
	// log receives the fatal message when Set is called incorrectly.
	log logrus.FieldLogger
	// current[i] maps a value of fields[i] to the numbers set for it since
	// the last call to Values.
	current []map[string][]int64
	// lock is held by Set and Values while they access current.
	lock sync.Mutex
}
// Name returns the metric's name, i.e. the value stored in the name field.
func (m *logInt64) Name() string {
return m.name
}
// Set records n as a measurement for each of the given field values.
//
// fields must contain exactly one value per configured field name, in the
// same order; otherwise a fatal message is logged and nothing is recorded.
// Recorded numbers accumulate until the next call to Values.
func (m *logInt64) Set(n int64, fields ...string) {
	m.lock.Lock()
	defer m.lock.Unlock()
	if len(fields) != len(m.fields) {
		m.log.WithFields(logrus.Fields{
			"want": m.fields,
			"got":  fields,
		}).Fatal("Wrong number of fields")
		// Fatal normally ends the process, but a logger whose Fatal returns
		// (custom FieldLogger, overridden exit hook) must not fall through:
		// the loop below would index past m.current or record a partial update.
		return
	}
	for i, fieldValue := range fields {
		m.current[i][fieldValue] = append(m.current[i][fieldValue], n)
	}
}
// Values returns the numbers recorded since the previous call and forgets them.
//
// The result maps each field name to a map from field value to a mean built
// from the numbers set for that value. Fields with nothing recorded are
// omitted. Every reported entry is deleted from the metric's state.
func (m *logInt64) Values() map[string]map[string]interface{} {
	m.lock.Lock()
	defer m.lock.Unlock()

	out := make(map[string]map[string]interface{}, len(m.current))
	for idx, name := range m.fields {
		pending := m.current[idx]
		if len(pending) == 0 {
			continue
		}
		byValue := make(map[string]interface{}, len(pending))
		for fv, nums := range pending {
			byValue[fv] = mean{nums}
			// Deleting while ranging is permitted in Go and drains the map.
			delete(pending, fv)
		}
		out[name] = byValue
	}
	return out
}
// mean wraps a batch of int64 measurements and prints them as their average.
type mean struct {
	values []int64
}

// String renders the batch: "0 values" when it is empty, the number itself
// when there is exactly one, and otherwise the average formatted with three
// significant digits followed by the number of values.
func (m mean) String() string {
	count := len(m.values)
	if count == 0 {
		return "0 values"
	}
	if count == 1 {
		return strconv.FormatInt(m.values[0], 10)
	}
	// Each term is divided before it is added, so the running total never
	// exceeds the magnitude of the largest measurement.
	divisor := float64(count)
	avg := 0.0
	for i := range m.values {
		avg += float64(m.values[i]) / divisor
	}
	formatted := strconv.FormatFloat(avg, 'g', 3, 64)
	return fmt.Sprintf("%s average (%d values)", formatted, count)
}
// logCounter is a counter metric that keeps running totals per field value
// and reports how much they grew between consecutive calls to Values.
type logCounter struct {
	name, desc string
	// fields holds the field names; Add expects one value per entry.
	fields []string
	// log receives the fatal messages when Add is called incorrectly.
	log logrus.FieldLogger
	// current[i] maps a value of fields[i] to its running total;
	// previous[i] holds the totals as of the last call to Values.
	current, previous []map[string]int64
	// last is when the current rate interval started.
	last time.Time
	// lock is held by Add and Values while they access the state above.
	lock sync.Mutex
}
// Name returns the metric's name, i.e. the value stored in the name field.
func (m *logCounter) Name() string {
return m.name
}
// Add the given number to the counter.
//
// fields must contain exactly one value per configured field name, in the
// same order, and n must not be negative; otherwise a fatal message is logged
// and the counter is left unchanged. On success n is added to the running
// total of every given field value.
func (m *logCounter) Add(n int64, fields ...string) {
	m.lock.Lock()
	defer m.lock.Unlock()
	if len(fields) != len(m.fields) {
		m.log.WithFields(logrus.Fields{
			"want": m.fields,
			"got":  fields,
		}).Fatal("Wrong number of fields")
		// Fatal normally ends the process, but a logger whose Fatal returns
		// (custom FieldLogger, overridden exit hook) must not fall through:
		// the loop below would index past m.current or apply a partial update.
		return
	}
	if n < 0 {
		m.log.WithField("value", n).Fatal("Negative value are invalid")
		// Same reasoning: never let a rejected value decrease the counter.
		return
	}
	for i, fieldValue := range fields {
		m.current[i][fieldValue] += n
	}
}
// Values reports, for every field value seen so far, the running total and
// how much it grew since the previous call.
//
// The result maps each field name to a map from field value to a gauge that
// holds the total, the increase since the last call and the time elapsed
// since the last call (or since the counter was created). Calling Values
// starts a new interval: the current totals become the baseline for the next
// call.
func (m *logCounter) Values() map[string]map[string]interface{} {
	m.lock.Lock()
	defer m.lock.Unlock()

	// Read the clock once so that the instant ending this interval is exactly
	// the instant starting the next one; two separate reads would leave a
	// small gap that no interval accounts for.
	now := time.Now()
	dur := now.Sub(m.last)
	m.last = now

	rate := make(map[string]map[string]interface{}, len(m.current))
	for i, field := range m.fields {
		current := m.current[i]
		previous := m.previous[i]
		rate[field] = make(map[string]interface{}, len(current))
		for fieldValue, total := range current {
			delta := total - previous[fieldValue]
			rate[field][fieldValue] = gauge{total, delta, dur}
			previous[fieldValue] = total
		}
	}
	return rate
}
// gauge describes a counter over one reporting interval: its running total,
// how much it grew during the interval, and how long the interval lasted.
type gauge struct {
	total int64
	delta int64
	dur   time.Duration
}

// qps renders the growth rate of the interval. It yields "0 per second" when
// the interval is empty or nothing was added, "<rate> per second" with two
// decimals when the rate exceeds 0.5 per second, and otherwise
// "once per <period>" with the period rounded to the millisecond.
func (g gauge) qps() string {
	const idle = "0 per second"

	elapsed := g.dur.Seconds()
	if elapsed == 0 {
		return idle
	}
	rate := float64(g.delta) / elapsed
	switch {
	case rate == 0:
		return idle
	case rate > 0.5:
		return fmt.Sprintf("%.2f per second", rate)
	}
	// Slow rates read better as the time between events.
	period := time.Duration(1 / rate * float64(time.Second)).Round(time.Millisecond)
	return fmt.Sprintf("once per %s", period)
}

// String renders the rate followed by the running total.
func (g gauge) String() string {
	rate := g.qps()
	return fmt.Sprintf("%s (%d total)", rate, g.total)
}