pkg/monitoring/server.go (158 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE.txt file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package monitoring
import (
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"sync"
	"time"

	"github.com/go-logr/logr"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"sigs.k8s.io/custom-metrics-apiserver/pkg/provider"

	"github.com/elastic/elasticsearch-k8s-metrics-adapter/pkg/client"
	"github.com/elastic/elasticsearch-k8s-metrics-adapter/pkg/config"
	"github.com/elastic/elasticsearch-k8s-metrics-adapter/pkg/log"
)
// defaultFailureThreshold is the consecutive-failure threshold NewServer uses
// when it is given a failureThreshold of 0.
const defaultFailureThreshold = 3
// Prometheus collectors created (and registered on the default registry) by
// promauto at package initialisation. Each one is labelled by client name
// ("client") and metric type ("type").
var (
// clientErrors counts failed metric retrievals; incremented by OnError.
clientErrors = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "client_errors_total",
Help: "The total number of errors raised by a client",
}, []string{"client", "type"})
// clientSuccess counts successful metric retrievals; incremented by
// UpdateCustomMetrics and UpdateExternalMetrics.
clientSuccess = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "client_success_total",
Help: "The total number of successful call to a metrics server",
}, []string{"client", "type"})
// metrics holds the number of metrics returned by the most recent successful
// retrieval; set by UpdateCustomMetrics and UpdateExternalMetrics.
metrics = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "metrics_count",
Help: "The current number of metrics served by this metrics server",
}, []string{"client", "type"})
)
// Counters holds one integer per client name, kept separately for custom
// metrics and external metrics. The field order determines the key order of
// the JSON encoding.
type Counters struct {
	CustomMetrics   map[string]int `json:"customMetrics,omitempty"`
	ExternalMetrics map[string]int `json:"externalMetrics,omitempty"`
}

// NewCounters returns a Counters whose two maps are allocated and empty, so
// callers can read and increment entries without further initialisation.
func NewCounters() *Counters {
	counters := new(Counters)
	counters.CustomMetrics = map[string]int{}
	counters.ExternalMetrics = map[string]int{}
	return counters
}
// NewServer returns a monitoring Server for the given metric servers. The
// server listens on port once Start is called.
//
// Each client is seeded with a success count of 0 for every metric type it
// serves. The readiness check reports not ready while such a count is still 0,
// i.e. until the first successful retrieval increments it.
//
// failureThreshold is the number of consecutive failures at which a client is
// reported as not ready. Values <= 0 fall back to defaultFailureThreshold.
// Previously only 0 did; a negative threshold made every client permanently
// not ready, because a failure count (>= 0) always met it.
func NewServer(metricServers []config.MetricServer, port int, failureThreshold int) *Server {
	if failureThreshold <= 0 {
		failureThreshold = defaultFailureThreshold
	}
	clientSuccesses := NewCounters()
	for _, clientCfg := range metricServers {
		if clientCfg.MetricTypes.HasType(config.CustomMetricType) {
			clientSuccesses.CustomMetrics[clientCfg.Name] = 0
		}
		if clientCfg.MetricTypes.HasType(config.ExternalMetricType) {
			clientSuccesses.ExternalMetrics[clientCfg.Name] = 0
		}
	}
	// lock is left as its zero value, which is a ready-to-use unlocked RWMutex.
	return &Server{
		logger:           log.ForPackage("monitoring"),
		metricServers:    metricServers,
		monitoringPort:   port,
		clientFailures:   NewCounters(),
		clientSuccesses:  clientSuccesses,
		failureThreshold: failureThreshold,
	}
}
// Server records the outcome of each client's metric retrievals and serves
// them on monitoringPort: Prometheus metrics on /metrics and a readiness
// report on /readyz. Create it with NewServer; the zero value has nil
// Counters and is not usable.
type Server struct {
// logger is used for error logging.
logger logr.Logger
// lock guards clientFailures and clientSuccesses. They are mutated by
// OnError/UpdateCustomMetrics/UpdateExternalMetrics and read by the readiness
// check.
lock sync.RWMutex
// metricServers is the configured set of clients evaluated for readiness.
metricServers []config.MetricServer
// monitoringPort is the TCP port the HTTP listener binds to.
monitoringPort int
// failureThreshold is the number of consecutive failures at which a client is
// reported as not ready.
failureThreshold int
// clientFailures counts consecutive failures per client and metric type; an
// entry is reset to 0 on each successful retrieval.
clientFailures *Counters
// clientSuccesses counts successful retrievals per client and metric type.
clientSuccesses *Counters
}
// OnError records a failed metric retrieval by client c for metricType.
//
// It bumps the client's consecutive-failure count for that metric type, which
// feeds the readiness check. It also increments the client_errors_total
// Prometheus counter. err is currently not inspected.
func (m *Server) OnError(c client.Interface, metricType config.MetricType, err error) {
	name := c.GetConfiguration().Name

	m.lock.Lock()
	defer m.lock.Unlock()

	switch metricType {
	case config.CustomMetricType:
		m.clientFailures.CustomMetrics[name]++
	case config.ExternalMetricType:
		m.clientFailures.ExternalMetrics[name]++
	}

	clientErrors.WithLabelValues(name, string(metricType)).Inc()
}
// UpdateExternalMetrics records a successful retrieval of external metrics by
// client c. ems is the set of metrics that was returned.
//
// It clears the client's consecutive-failure count and bumps its success
// count. It also updates the client_success_total counter and sets the
// metrics_count gauge to len(ems).
func (m *Server) UpdateExternalMetrics(c client.Interface, ems map[provider.ExternalMetricInfo]struct{}) {
	name := c.GetConfiguration().Name
	label := string(config.ExternalMetricType)

	m.lock.Lock()
	defer m.lock.Unlock()

	// A successful fetch ends any run of consecutive failures.
	m.clientFailures.ExternalMetrics[name] = 0
	m.clientSuccesses.ExternalMetrics[name]++

	clientSuccess.WithLabelValues(name, label).Inc()
	metrics.WithLabelValues(name, label).Set(float64(len(ems)))
}
// UpdateCustomMetrics records a successful retrieval of custom metrics by
// client c. cms is the set of metrics that was returned.
//
// It clears the client's consecutive-failure count and bumps its success
// count. It also updates the client_success_total counter and sets the
// metrics_count gauge to len(cms).
func (m *Server) UpdateCustomMetrics(c client.Interface, cms map[provider.CustomMetricInfo]struct{}) {
	name := c.GetConfiguration().Name
	label := string(config.CustomMetricType)

	m.lock.Lock()
	defer m.lock.Unlock()

	// A successful fetch ends any run of consecutive failures.
	m.clientFailures.CustomMetrics[name] = 0
	m.clientSuccesses.CustomMetrics[name]++

	clientSuccess.WithLabelValues(name, label).Inc()
	metrics.WithLabelValues(name, label).Set(float64(len(cms)))
}
// Start registers /metrics (Prometheus) and /readyz (readiness) on
// http.DefaultServeMux and serves them on the configured monitoring port.
//
// It blocks until the listener fails. The resulting error used to be
// discarded; it is now logged so a port clash or bind failure is visible.
// Start registers handlers on the global mux, so calling it twice panics.
func (m *Server) Start() {
	// Bound how long a client may take to send request headers, so slow or
	// stalled connections cannot hold server goroutines indefinitely.
	const readHeaderTimeout = 10 * time.Second

	http.Handle("/metrics", promhttp.Handler())
	http.HandleFunc("/readyz", m.readyHandler)

	// Handler is left nil so the server dispatches via http.DefaultServeMux,
	// where the handlers above are registered.
	server := &http.Server{
		Addr:              fmt.Sprintf(":%d", m.monitoringPort),
		ReadHeaderTimeout: readHeaderTimeout,
	}
	if err := server.ListenAndServe(); err != nil {
		m.logger.Error(err, "Monitoring server stopped", "port", m.monitoringPort)
	}
}
// readyHandler serves the /readyz endpoint.
//
// It always writes the current health counters as JSON. The status code is
// 200 when isReadyAndHealthy reports no error and 503 otherwise. A failure to
// write the response is logged.
func (m *Server) readyHandler(writer http.ResponseWriter, _ *http.Request) {
	health, readinessErr := m.isReadyAndHealthy()

	code := http.StatusOK
	if readinessErr != nil {
		code = http.StatusServiceUnavailable
	}

	if writeErr := writeJSONResponse(writer, code, health); writeErr != nil {
		m.logger.Error(writeErr, "Failed to write readiness endpoint status to client")
	}
}
// isReadyAndHealthy reports an error for the first configured metric server
// that either:
//   - still has a success count of 0 (seeded by NewServer) for a metric type
//     it serves, or
//   - has at least failureThreshold consecutive failures for custom or
//     external metrics.
//
// The returned ClientsHealthResponse is populated even when the error is
// non-nil, so the caller can render it. It holds deep copies of the counters
// taken under the read lock. The caller serialises the response after the lock
// is released, and handing out the live maps raced with OnError and
// UpdateCustomMetrics/UpdateExternalMetrics, which mutate them under the write
// lock (a concurrent map read/write is a fatal runtime error in Go).
func (m *Server) isReadyAndHealthy() (ClientsHealthResponse, error) {
	m.lock.RLock()
	defer m.lock.RUnlock()
	healthResponse := ClientsHealthResponse{
		ClientFailures: snapshotCounters(m.clientFailures),
		ClientOk:       snapshotCounters(m.clientSuccesses),
	}
	for _, server := range m.metricServers {
		// Only metric types the client serves have an entry in clientSuccesses.
		if successes, ok := m.clientSuccesses.CustomMetrics[server.Name]; ok && successes == 0 {
			return healthResponse, errors.New("client has not retrieved an initial set of custom metrics yet")
		}
		if successes, ok := m.clientSuccesses.ExternalMetrics[server.Name]; ok && successes == 0 {
			return healthResponse, errors.New("client has not retrieved an initial set of external metrics yet")
		}
		// A missing failure entry reads as 0, so unserved metric types never trip these checks.
		if failures := m.clientFailures.CustomMetrics[server.Name]; failures >= m.failureThreshold {
			return healthResponse, fmt.Errorf("client got %d consecutive failures while retrieving custom metrics", failures)
		}
		if failures := m.clientFailures.ExternalMetrics[server.Name]; failures >= m.failureThreshold {
			return healthResponse, fmt.Errorf("client got %d consecutive failures while retrieving external metrics", failures)
		}
	}
	return healthResponse, nil
}

// snapshotCounters returns a deep copy of c (nil stays nil) that is safe to
// read after Server.lock has been released.
func snapshotCounters(c *Counters) *Counters {
	if c == nil {
		return nil
	}
	return &Counters{
		CustomMetrics:   copyCounterMap(c.CustomMetrics),
		ExternalMetrics: copyCounterMap(c.ExternalMetrics),
	}
}

// copyCounterMap returns a copy of src. A nil src yields nil, so JSON
// omitempty treats the copy exactly like the original.
func copyCounterMap(src map[string]int) map[string]int {
	if src == nil {
		return nil
	}
	dst := make(map[string]int, len(src))
	for name, count := range src {
		dst[name] = count
	}
	return dst
}
// ClientsHealthResponse is the JSON body written by the /readyz handler.
// Field order determines the key order of the encoded JSON.
type ClientsHealthResponse struct {
// ClientFailures holds consecutive-failure counts per client and metric
// type. An entry is reset to 0 on each successful retrieval.
ClientFailures *Counters `json:"consecutiveFailures,omitempty"`
// ClientOk holds the running count of successful retrievals per client and
// metric type.
ClientOk *Counters `json:"successTotal,omitempty"`
}
// writeJSONResponse writes resp to w as tab-indented JSON with the given HTTP
// status code and a Content-Type of application/json.
//
// If resp cannot be marshalled, it writes a bare 500 status (no headers set, no
// body) and returns the marshalling error. Otherwise it returns the error, if
// any, from writing the body.
func writeJSONResponse(w http.ResponseWriter, code int, resp interface{}) error {
	payload, marshalErr := json.MarshalIndent(resp, "", "\t")
	if marshalErr != nil {
		w.WriteHeader(http.StatusInternalServerError)
		return marshalErr
	}

	// Headers must be set before WriteHeader; later changes are ignored.
	headers := w.Header()
	headers.Set("Content-Type", "application/json")
	w.WriteHeader(code)

	_, writeErr := w.Write(payload)
	return writeErr
}