// common/pkg/monitoring/monitoring.go
package monitoring
import (
"database/sql"
"fmt"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/go-logr/logr"
"github.com/opencontainers/runc/libcontainer/cgroups"
"github.com/opencontainers/runc/libcontainer/cgroups/fs"
"github.com/opencontainers/runc/libcontainer/cgroups/fs2"
"github.com/prometheus/client_golang/prometheus"
)
// Usage describes how a column value should be used: either as a label value
// applied to the entire MetricSet, or as a metric value.
type Usage string
const (
	Label     Usage = "label"
	Gauge     Usage = "gauge"
	Counter   Usage = "counter"
	Histogram Usage = "histogram"

	volumeNameLabel = "volume_name"
)
// WORKER_COUNT is the number of concurrent database connections used to
// collect metric sets during each Collect cycle.
const WORKER_COUNT = 3
var (
	// cgroup v1 subsystems we collect stats from.
	neededSystem = map[string]bool{
"cpu": true,
"cpuacct": true,
"memory": true,
}
	// getDuration is a variable so tests can stub it.
	getDuration = func(start time.Time) int64 {
		return time.Since(start).Milliseconds()
	}
)
// diskStats represents used and total disk space in bytes.
type diskStats struct {
Used int64
Total int64
}
// MetricSet is a set of metrics that will be reported to prometheus.
// Metrics/labels are derived from the columns of the query. The set of
// metrics reported will be Namespace_Name_Metric.Name for every non-label
// Metric in the MetricSet.
//
// You must use ReadConfig or StartMonitoring to fill out this struct correctly.
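//
// An illustrative configuration entry (the names and query here are
// hypothetical):
//
//	name: sessions
//	namespace: db
//	query: SELECT status, COUNT(*) AS total FROM user_sessions GROUP BY status
//	metrics:
//	  - name: status
//	    usage: label
//	  - name: total
//	    desc: Number of sessions by status.
//	    usage: gauge
//
// This would report a gauge named db_sessions_total with a status label.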
type MetricSet struct {
Name string `yaml:"name"`
Namespace string `yaml:"namespace"`
Query string `yaml:"query"`
Metrics []Metric `yaml:"metrics"`
}
// Metric specifies a metric within the MetricSet. Its Name is also the column
// name that will provide its value, and its Usage determines what kind of
// metric this portion of the query represents.
//
// When specifying a histogram metric, the defined column name is used as the
// base name of len(Buckets)+2 columns that must be in the MetricSet's query:
// `Column_key` for each bucket key, plus `Column_count` and `Column_sum` for
// the total event count and total sum of events.
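//
// For example (illustrative values), a histogram Metric named "latency" with
// Buckets {le_1: 1, le_5: 5} expects the query to return the columns
// latency_le_1, latency_le_5, latency_sum and latency_count.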
type Metric struct {
Name string `yaml:"name"`
Desc string `yaml:"desc"`
Usage Usage `yaml:"usage"`
// internal
column string
// Only for Histograms, defines buckets of the histogram
Buckets map[string]float64 `yaml:"buckets,omitempty"`
}
// DBFactory allows users to pass in the driver-specific DB connector without
// this code needing a direct dependency on potentially C-backed driver code.
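//
// A minimal implementation sketch (the driver name and DSN are illustrative):
//
//	type sqliteFactory struct{ dsn string }
//
//	func (f sqliteFactory) Open() (*sql.DB, error) {
//		return sql.Open("sqlite3", f.dsn)
//	}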
type DBFactory interface {
Open() (*sql.DB, error)
}
// Monitor queries the configured MetricSets against a database and implements
// prometheus.Collector.
type Monitor struct {
DBFactory DBFactory
MetricSets []MetricSet
collectMS prometheus.Gauge
metricCount prometheus.Gauge
errCount prometheus.Gauge
log logr.Logger
}
func validPromName(n string) error {
// https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
validChars := "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ_:"
invalid := strings.Trim(n, validChars)
if len(invalid) != 0 {
return fmt.Errorf("invalid prometheus characters in %s: %q", n, invalid)
}
return nil
}
// parseString converts a scanned value to a string. Numbers can pass through
// from Oracle in exponential form, which we need to avoid.
func parseString(log logr.Logger, i interface{}) string {
switch v := i.(type) {
case string:
return v
case []byte:
return string(v)
case int, uint, int32, uint32, int64, uint64:
return fmt.Sprintf("%d", v)
case float32, float64:
return fmt.Sprintf("%f", v)
case time.Time:
return fmt.Sprintf("%d", v.Unix())
case nil: // Null values
return ""
}
log.Info("Failed to parse string", "type", fmt.Sprintf("%T", i), "input", i)
return ""
}
func parseUint64(log logr.Logger, i interface{}) uint64 {
	// Each numeric type gets its own case so v has a concrete type and can be
	// converted directly; a grouped case would leave v as interface{} and a
	// type assertion like v.(uint64) would panic for the other types.
	switch v := i.(type) {
	case string:
		val, _ := strconv.ParseUint(v, 10, 64)
		return val
	case []byte:
		val, _ := strconv.ParseUint(string(v), 10, 64)
		return val
	case int:
		return uint64(v)
	case uint:
		return uint64(v)
	case int32:
		return uint64(v)
	case uint32:
		return uint64(v)
	case int64:
		return uint64(v)
	case uint64:
		return v
	case float32:
		return uint64(v)
	case float64:
		return uint64(v)
	case time.Time:
		return uint64(v.Unix())
	case nil: // Null values
		return 0
	}
	log.Info("Failed to parse uint", "type", fmt.Sprintf("%T", i), "input", i)
	return 0
}
func parseFloat64(log logr.Logger, i interface{}) float64 {
	// As in parseUint64, each numeric type gets its own case so the value can
	// be converted without a panic-prone type assertion. ParseFloat's second
	// argument is the bit size (32 or 64), not a base.
	switch v := i.(type) {
	case string:
		val, _ := strconv.ParseFloat(v, 64)
		return val
	case []byte:
		val, _ := strconv.ParseFloat(string(v), 64)
		return val
	case int:
		return float64(v)
	case uint:
		return float64(v)
	case int32:
		return float64(v)
	case uint32:
		return float64(v)
	case int64:
		return float64(v)
	case uint64:
		return float64(v)
	case float32:
		return float64(v)
	case float64:
		return v
	case time.Time:
		return float64(v.Unix())
	case nil: // Null values
		return 0
	}
	log.Info("Failed to parse float", "type", fmt.Sprintf("%T", i), "input", i)
	return 0
}
func sortedKeys(m map[string]float64) []string {
keys := []string{}
for k := range m {
keys = append(keys, k)
}
sort.Strings(keys)
return keys
}
// queryMetrics runs the MetricSet's query against the database connection and
// builds the metrics. All metric values are float64s and all labels must be
// strings. errCount is atomically incremented for each error that occurs and
// is reported by prometheus.
func queryMetrics(log logr.Logger, db *sql.DB, ms MetricSet, errCount *uint64) []prometheus.Metric {
rows, err := db.Query(ms.Query)
if err != nil {
atomic.AddUint64(errCount, 1)
log.Error(err, "Failed to query metric set", "query", ms.Query)
return nil
}
defer rows.Close()
columns, err := rows.Columns()
if err != nil {
atomic.AddUint64(errCount, 1)
log.Error(err, "Failed to read query columns", "query", ms.Query)
return nil
}
// Force columns to match config (lower case).
for i := 0; i < len(columns); i++ {
columns[i] = strings.ToLower(columns[i])
}
labels := map[string]string{}
var metrics []prometheus.Metric
values := make([]interface{}, len(columns))
// Setup pointers to interfaces for splatting into rows.Scan
valuePtrs := make([]interface{}, len(columns))
for i := range values {
valuePtrs[i] = &values[i]
}
// Find the columns for our metrics based on Name.
// For histograms provide the bucket, sum, count column indexes in that
// order.
forMetric := make([]int, len(ms.Metrics))
forHistMetric := make([][]int, len(ms.Metrics))
columnToIdx := map[string]int{}
for i, c := range columns {
columnToIdx[c] = i
}
// Build mapping arrays.
for i, m := range ms.Metrics {
if m.Usage == Histogram {
			// [b1,b2,...,bn,sum,count]
forHistMetric[i] = make([]int, len(m.Buckets)+2)
j := 0
for _, k := range sortedKeys(m.Buckets) {
if idx, found := columnToIdx[fmt.Sprintf("%s_%s", m.column, k)]; found {
forHistMetric[i][j] = idx
}
j += 1
}
if idx, found := columnToIdx[m.column+"_sum"]; found {
forHistMetric[i][j] = idx
}
if idx, found := columnToIdx[m.column+"_count"]; found {
forHistMetric[i][j+1] = idx
}
} else if idx, found := columnToIdx[m.column]; found {
forMetric[i] = idx
}
}
rowCount := 0
for rows.Next() {
		if err := rows.Scan(valuePtrs...); err != nil {
			atomic.AddUint64(errCount, 1)
			log.Error(err, "Failed to scan row", "query", ms.Query)
			continue
		}
		rowCount += 1
// Build labels from the query first
for i := range ms.Metrics {
if ms.Metrics[i].Usage == Label {
labels[ms.Metrics[i].Name] = parseString(log, values[forMetric[i]])
}
}
// Build metrics
for i := range ms.Metrics {
m := ms.Metrics[i]
mDesc := prometheus.NewDesc(fmt.Sprintf("%s_%s_%s", ms.Namespace, ms.Name, m.Name), m.Desc, nil, labels)
var metric prometheus.Metric
var err error
switch m.Usage {
case Counter:
metric, err = prometheus.NewConstMetric(mDesc, prometheus.CounterValue, parseFloat64(log, values[forMetric[i]]))
case Gauge:
metric, err = prometheus.NewConstMetric(mDesc, prometheus.GaugeValue, parseFloat64(log, values[forMetric[i]]))
case Histogram:
bucketVals := map[float64]uint64{}
j := 0
for _, k := range sortedKeys(m.Buckets) {
bucketVals[m.Buckets[k]] = parseUint64(log, values[forHistMetric[i][j]])
j += 1
}
sum := parseFloat64(log, values[forHistMetric[i][j]])
count := parseUint64(log, values[forHistMetric[i][j+1]])
metric, err = prometheus.NewConstHistogram(mDesc, count, sum, bucketVals)
case Label:
continue
}
if err != nil || metric == nil {
atomic.AddUint64(errCount, 1)
log.Error(err, "Failed to create prometheus metric", "desc", mDesc, "err", err, "metric", metric)
continue
}
metrics = append(metrics, metric)
}
}
if rowCount == 0 {
		// This is likely due to bad row-level security or a poor query.
		// If you don't see metrics you expected, this might be the cause, so let's log it.
log.Info("Query returned no rows.", "query", ms.Query)
}
return metrics
}
// NewMonitor prepares a monitor that can be passed to prometheus as a
// Collector. Alternately you can use StartExporting to handle creation of the
// monitor and setting up prometheus.
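//
// Illustrative wiring (a sketch; `factory` stands in for any DBFactory
// implementation and `metricSets` would typically come from ReadConfig):
//
//	mon := NewMonitor(log, factory, metricSets)
//	prometheus.MustRegister(mon)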
func NewMonitor(log logr.Logger, db DBFactory, ms []MetricSet) *Monitor {
mon := &Monitor{
DBFactory: db,
MetricSets: ms,
collectMS: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "db_monitor",
Name: "collect_ms",
Help: "Number of milliseconds spent to collect metrics",
}),
metricCount: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "db_monitor",
Name: "metric_count",
Help: "Number of metrics collected successfully from config this cycle",
}),
errCount: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "db_monitor",
Name: "error_count",
Help: "Number of errors encountered while trying to collect metrics this cycle",
}),
log: log,
}
return mon
}
// Describe is intentionally left blank as we generate metrics dynamically.
func (m *Monitor) Describe(ch chan<- *prometheus.Desc) {}
// Collect implements the prometheus.Collector interface; it is called when we
// should report metrics.
// TODO thread contexts.
func (m *Monitor) Collect(ch chan<- prometheus.Metric) {
start := time.Now()
errCount := uint64(0)
metricCount := uint64(0)
msQueue := make(chan MetricSet)
started := 0
var wg sync.WaitGroup
for i := 0; i < WORKER_COUNT; i++ {
		// Ensure workers can do work before starting them.
		db, err := m.DBFactory.Open()
		if err != nil {
			atomic.AddUint64(&errCount, 1)
			m.log.Error(err, "failed to open database", "collector", i)
			continue
		}
		if err := db.Ping(); err != nil {
			atomic.AddUint64(&errCount, 1)
			m.log.Error(err, "failed to connect to database", "collector", i)
			db.Close()
			continue
		}
		wg.Add(1)
		started += 1
		go func(i int, db *sql.DB) {
			defer wg.Done()
			defer db.Close() // release the connection once the queue drains
			for ms := range msQueue { // exits when msQueue is closed
				for _, metric := range queryMetrics(m.log, db, ms, &errCount) {
					ch <- metric
					atomic.AddUint64(&metricCount, 1)
				}
			}
			m.log.V(2).Info("done", "collector", i)
		}(i, db)
}
	// Can't queue work if we didn't start any connections successfully.
if started > 0 {
m.log.V(2).Info("queueing work")
for _, ms := range m.MetricSets {
msQueue <- ms
}
}
// Wait for metrics to be collected and reported.
close(msQueue)
wg.Wait()
duration := getDuration(start)
m.collectMS.Set(float64(duration))
m.metricCount.Set(float64(metricCount))
m.errCount.Set(float64(errCount))
ch <- m.collectMS
ch <- m.metricCount
ch <- m.errCount
m.log.Info("reported metrics", "count", metricCount, "errors", errCount, "time(ms)", duration)
}
// VolumeMetrics holds the metrics for a single pod volume.
type VolumeMetrics struct {
	// Usage of the volume in bytes.
	Usage prometheus.Gauge
	// Total available space of the volume in bytes.
	Total prometheus.Gauge
}
// VolumeInfo provides required information to expose pod volume metrics
type VolumeInfo struct {
Mount string
Name string
}
// DBContainerMonitor metrics follow http://google3/configs/monitoring/cloud_pulse_monarch/kubernetes/metrics_def_core.
// This is a workaround for platforms that do not provide system metrics
// (container CPU/memory, volumes) to DB users.
type DBContainerMonitor struct {
// usage of memory in bytes
MemoryUsage prometheus.Gauge
// memory used for cache
MemoryCacheUsage prometheus.Gauge
mountToMetrics map[string]VolumeMetrics
collectMS prometheus.Gauge
metricCount prometheus.Gauge
errCount prometheus.Gauge
log logr.Logger
}
// NewDBContainerMonitor prepares a monitor that can be passed to prometheus as a
// Collector to collect container system (memory/CPU) and pod volume metrics.
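//
// Illustrative wiring (the volume values here are hypothetical):
//
//	mon := NewDBContainerMonitor(log, []VolumeInfo{{Mount: "/data", Name: "data"}})
//	prometheus.MustRegister(mon)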
func NewDBContainerMonitor(log logr.Logger, volumes []VolumeInfo) *DBContainerMonitor {
mon := &DBContainerMonitor{
MemoryUsage: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "ods_container",
Name: "memory_used_bytes",
Help: "Memory usage in bytes.",
}),
MemoryCacheUsage: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "ods_container",
Name: "memory_cache_used_bytes",
Help: "Cache memory usage in bytes.",
}),
collectMS: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "ods_system_monitor",
Name: "collect_ms",
Help: "Number of milliseconds spent to collect metrics",
}),
metricCount: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "ods_system_monitor",
Name: "metric_count",
Help: "Number of metrics collected successfully from config this cycle",
}),
errCount: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "ods_system_monitor",
Name: "error_count",
Help: "Number of errors encountered while trying to collect metrics this cycle",
}),
log: log,
}
if len(volumes) > 0 {
mon.mountToMetrics = make(map[string]VolumeMetrics)
}
for _, v := range volumes {
log.Info("Adding volume metrics", "volume", v.Name, "mount", v.Mount)
mon.mountToMetrics[v.Mount] = VolumeMetrics{
Usage: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "ods_pod_volume",
Name: "used_bytes",
Help: "Number of disk bytes used by the pod volume.",
ConstLabels: map[string]string{volumeNameLabel: v.Name},
}),
Total: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "ods_pod_volume",
Name: "total_bytes",
Help: "Total number of disk bytes available to the pod volume.",
ConstLabels: map[string]string{volumeNameLabel: v.Name},
}),
}
}
return mon
}
// Collect implements the prometheus.Collector interface, reporting container
// memory/CPU and pod volume metrics.
func (m *DBContainerMonitor) Collect(ch chan<- prometheus.Metric) {
start := time.Now()
errCount := uint64(0)
metricCount := uint64(0)
if cstats, err := cgroupStats(m.log); err == nil {
m.MemoryUsage.Set(float64(cstats.MemoryStats.Usage.Usage))
m.MemoryCacheUsage.Set(float64(cstats.MemoryStats.Cache))
		for _, gauge := range []prometheus.Metric{m.MemoryUsage, m.MemoryCacheUsage} {
			ch <- gauge
metricCount++
}
		// Total CPU time consumed in seconds. Unit follows
		// http://google3/configs/monitoring/cloud_pulse_monarch/kubernetes/metrics_def_core;l=51;rcl=448039408
		if CPUTotalUsage, err := prometheus.NewConstMetric(
			prometheus.NewDesc("ods_container_cpu_usage_time_seconds", "Cumulative CPU usage on all cores used by the container in seconds.", nil, nil),
			prometheus.CounterValue,
			float64(cstats.CpuStats.CpuUsage.TotalUsage)/1e9, // convert nanoseconds to seconds without truncating
		); err == nil {
); err == nil {
ch <- CPUTotalUsage
metricCount++
} else {
m.log.Error(err, "error while reporting CPU metrics")
errCount++
}
} else {
m.log.Error(err, "error while parsing cgroup for metrics")
errCount++
}
for mount, metrics := range m.mountToMetrics {
if stats, err := dStats(mount); err == nil {
metrics.Total.Set(float64(stats.Total))
metrics.Usage.Set(float64(stats.Used))
ch <- metrics.Total
ch <- metrics.Usage
metricCount += 2
} else {
m.log.Error(err, "error while reading disk stats", "mount", mount)
errCount++
}
}
duration := getDuration(start)
m.collectMS.Set(float64(duration))
m.metricCount.Set(float64(metricCount))
m.errCount.Set(float64(errCount))
ch <- m.collectMS
ch <- m.metricCount
ch <- m.errCount
m.log.Info("reported metrics", "count", metricCount, "errors", errCount, "time(ms)", duration)
}
// Describe is intentionally left blank as we generate metrics dynamically.
func (m *DBContainerMonitor) Describe(ch chan<- *prometheus.Desc) {}
// cgroupStats follows https://github.com/google/cadvisor/blob/3beb265804ea4b00dc8ed9125f1f71d3328a7a94/container/libcontainer/helpers.go#L95
var cgroupStats = func(log logr.Logger) (*cgroups.Stats, error) {
if cgroups.IsCgroup2UnifiedMode() {
m, err := fs2.NewManager(nil, fs2.UnifiedMountpoint)
if err != nil {
return nil, err
}
return m.GetStats()
}
mounts, err := cgroups.GetCgroupMounts(true)
if err != nil {
return nil, err
}
paths := make(map[string]string, len(mounts))
for _, m := range mounts {
for _, subsystem := range m.Subsystems {
if !neededSystem[subsystem] {
continue
}
if existing, ok := paths[subsystem]; ok {
log.Info("skipping current mount point for a sub system", "existing mount", existing, "current mount", m.Mountpoint)
continue
}
paths[subsystem] = m.Mountpoint
}
}
mgr, err := fs.NewManager(nil, paths)
if err != nil {
return nil, err
}
return mgr.GetStats()
}
// dStats returns the diskStats for the provided mount point, or an error.
// Used is Total minus the free bytes derived from Bfree (which includes
// blocks reserved for root), i.e. the bytes actually allocated on the filesystem.
var dStats = func(path string) (diskStats, error) {
f := syscall.Statfs_t{}
err := syscall.Statfs(path, &f)
if err != nil {
return diskStats{}, err
}
var stats diskStats
stats.Total = int64(f.Blocks) * f.Bsize
free := int64(f.Bfree) * f.Bsize
stats.Used = stats.Total - free
return stats, nil
}