// pkg/metrics/scheduler.go
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"fmt"
"time"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"go.uber.org/zap"
"github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
)
const (
	// SchedulingError is the container-allocation counter state label recorded for scheduling errors.
	SchedulingError = "error"
	// SortingApp and SortingQueue are the `level` label values for the sorting-latency histogram.
	SortingApp = "app"
	SortingQueue = "queue"
	// NodeActive, NodeDraining and NodeDecommissioned are state label values for the node gauge.
	NodeActive = "active"
	NodeDraining = "draining"
	NodeDecommissioned = "decommissioned"
)
// resourceUsageRangeBuckets holds the `range` label values used by the
// per-resource node usage gauges (see SetNodeResourceUsage); callers address a
// bucket by index, so order and length must stay stable.
var resourceUsageRangeBuckets = []string{
	"[0,10%]",
	"(10%,20%]",
	"(20%,30%]",
	"(30%,40%]",
	"(40%,50%]",
	"(50%,60%]",
	"(60%,70%]",
	"(70%,80%]",
	"(80%,90%]",
	"(90%,100%]",
}
// SchedulerMetrics to declare scheduler metrics
type SchedulerMetrics struct {
	containerAllocation *prometheus.CounterVec // allocation attempts, by `state` label
	applicationSubmission *prometheus.CounterVec // app submissions, by `result` label
	application *prometheus.GaugeVec // current applications, by `state` label
	node *prometheus.GaugeVec // current nodes, by `state` label
	nodeResourceUsage map[string]*prometheus.GaugeVec // lazily created per-resource usage gauges, keyed by resource name
	schedulingLatency prometheus.Histogram // main scheduling routine latency (seconds)
	sortingLatency *prometheus.HistogramVec // sorting latency, by `level` (app/queue)
	tryNodeLatency prometheus.Histogram // node condition check latency (seconds)
	tryPreemptionLatency prometheus.Histogram // preemption condition check latency (seconds)
	lock locking.RWMutex // guards nodeResourceUsage map access
}
// InitSchedulerMetrics to initialize scheduler metrics.
// All collectors are registered with the default Prometheus registry;
// registration failures are logged and the metric is kept unregistered.
func InitSchedulerMetrics() *SchedulerMetrics {
	// Shared exponential bucket layout for every latency histogram,
	// starting at 0.1ms. NOTE(review): the metric names end in
	// "_milliseconds" but observed values are seconds (see SinceInSeconds);
	// the names are kept unchanged to avoid breaking existing dashboards.
	latencyBuckets := prometheus.ExponentialBuckets(0.0001, 10, 8)

	s := &SchedulerMetrics{
		lock: locking.RWMutex{},
		// nodeResourceUsage is populated lazily at runtime as new resource
		// types are observed (see SetNodeResourceUsage).
		nodeResourceUsage: make(map[string]*prometheus.GaugeVec),
	}

	s.containerAllocation = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: Namespace,
			Subsystem: SchedulerSubsystem,
			Name:      "container_allocation_attempt_total",
			Help:      "Total number of attempts to allocate containers. State of the attempt includes `allocated`, `rejected`, `error`, `released`",
		}, []string{"state"})

	s.applicationSubmission = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: Namespace,
			Subsystem: SchedulerSubsystem,
			Name:      "application_submission_total",
			Help:      "Total number of application submissions. State of the attempt includes `new`, `accepted` and `rejected`.",
		}, []string{"result"})

	s.application = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: Namespace,
			Subsystem: SchedulerSubsystem,
			Name:      "application_total",
			Help:      "Total number of applications. State of the application includes `running`, `resuming`, `failing`, `completing`, `completed` and `failed`.",
		}, []string{"state"})

	s.node = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: Namespace,
			Subsystem: SchedulerSubsystem,
			Name:      "node",
			Help:      "Total number of nodes. State of the node includes `active` and `failed`.",
		}, []string{"state"})

	// helper for the plain (non-vector) latency histograms, which differ
	// only in name and help text
	newLatencyHistogram := func(name, help string) prometheus.Histogram {
		return prometheus.NewHistogram(
			prometheus.HistogramOpts{
				Namespace: Namespace,
				Subsystem: SchedulerSubsystem,
				Name:      name,
				Help:      help,
				Buckets:   latencyBuckets,
			})
	}
	s.schedulingLatency = newLatencyHistogram("scheduling_latency_milliseconds",
		"Latency of the main scheduling routine, in seconds.")
	s.tryNodeLatency = newLatencyHistogram("trynode_latency_milliseconds",
		"Latency of node condition checks for container allocations, such as placement constraints, in seconds.")
	s.tryPreemptionLatency = newLatencyHistogram("trypreemption_latency_milliseconds",
		"Latency of preemption condition checks for container allocations, in seconds.")

	s.sortingLatency = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: Namespace,
			Subsystem: SchedulerSubsystem,
			Name:      "node_sorting_latency_milliseconds",
			Help:      "Latency of all nodes sorting, in seconds.",
			Buckets:   latencyBuckets,
		}, []string{"level"})

	// Register every collector; a failure is logged but does not abort init.
	for _, collector := range []prometheus.Collector{
		s.containerAllocation,
		s.applicationSubmission,
		s.application,
		s.node,
		s.schedulingLatency,
		s.sortingLatency,
		s.tryNodeLatency,
		s.tryPreemptionLatency,
	} {
		if err := prometheus.Register(collector); err != nil {
			log.Log(log.Metrics).Warn("failed to register metrics collector", zap.Error(err))
		}
	}
	return s
}
// Reset all metrics that implement the Reset functionality.
// should only be used in tests
func (m *SchedulerMetrics) Reset() {
	m.node.Reset()
	m.application.Reset()
	m.applicationSubmission.Reset()
	m.containerAllocation.Reset()
	// Also clear the lazily registered per-resource usage gauges, which
	// previously survived a reset; the map itself is guarded by the lock
	// (read access only, resetting a GaugeVec is internally synchronized).
	m.lock.RLock()
	defer m.lock.RUnlock()
	for _, gauge := range m.nodeResourceUsage {
		gauge.Reset()
	}
}
func SinceInSeconds(start time.Time) float64 {
return time.Since(start).Seconds()
}
// ObserveSchedulingLatency records the time elapsed since start, in seconds,
// on the main scheduling routine histogram.
func (m *SchedulerMetrics) ObserveSchedulingLatency(start time.Time) {
	m.schedulingLatency.Observe(SinceInSeconds(start))
}

// ObserveAppSortingLatency records the time elapsed since start, in seconds,
// on the sorting-latency histogram under the app level label.
func (m *SchedulerMetrics) ObserveAppSortingLatency(start time.Time) {
	m.sortingLatency.WithLabelValues(SortingApp).Observe(SinceInSeconds(start))
}

// ObserveQueueSortingLatency records the time elapsed since start, in seconds,
// on the sorting-latency histogram under the queue level label.
func (m *SchedulerMetrics) ObserveQueueSortingLatency(start time.Time) {
	m.sortingLatency.WithLabelValues(SortingQueue).Observe(SinceInSeconds(start))
}

// ObserveTryNodeLatency records the time elapsed since start, in seconds,
// on the node condition check histogram.
func (m *SchedulerMetrics) ObserveTryNodeLatency(start time.Time) {
	m.tryNodeLatency.Observe(SinceInSeconds(start))
}

// ObserveTryPreemptionLatency records the time elapsed since start, in seconds,
// on the preemption condition check histogram.
func (m *SchedulerMetrics) ObserveTryPreemptionLatency(start time.Time) {
	m.tryPreemptionLatency.Observe(SinceInSeconds(start))
}
// AddAllocatedContainers adds value to the counter of allocated containers.
func (m *SchedulerMetrics) AddAllocatedContainers(value int) {
	m.containerAllocation.WithLabelValues(ContainerAllocated).Add(float64(value))
}

// getAllocatedContainers reads the current allocated-container count back from
// the counter; returns -1 and the error when the metric cannot be serialized.
func (m *SchedulerMetrics) getAllocatedContainers() (int, error) {
	var snapshot dto.Metric
	if err := m.containerAllocation.WithLabelValues(ContainerAllocated).Write(&snapshot); err != nil {
		return -1, err
	}
	return int(*snapshot.Counter.Value), nil
}

// AddReleasedContainers adds value to the counter of released containers.
func (m *SchedulerMetrics) AddReleasedContainers(value int) {
	m.containerAllocation.WithLabelValues(ContainerReleased).Add(float64(value))
}

// getReleasedContainers reads the current released-container count back from
// the counter; returns -1 and the error when the metric cannot be serialized.
func (m *SchedulerMetrics) getReleasedContainers() (int, error) {
	var snapshot dto.Metric
	if err := m.containerAllocation.WithLabelValues(ContainerReleased).Write(&snapshot); err != nil {
		return -1, err
	}
	return int(*snapshot.Counter.Value), nil
}

// AddRejectedContainers adds value to the counter of rejected containers.
func (m *SchedulerMetrics) AddRejectedContainers(value int) {
	m.containerAllocation.WithLabelValues(ContainerRejected).Add(float64(value))
}

// IncSchedulingError increments the scheduling-error allocation counter.
func (m *SchedulerMetrics) IncSchedulingError() {
	m.containerAllocation.WithLabelValues(SchedulingError).Inc()
}

// GetSchedulingErrors reads the current scheduling-error count back from the
// counter; returns -1 and the error when the metric cannot be serialized.
func (m *SchedulerMetrics) GetSchedulingErrors() (int, error) {
	var snapshot dto.Metric
	if err := m.containerAllocation.WithLabelValues(SchedulingError).Write(&snapshot); err != nil {
		return -1, err
	}
	return int(*snapshot.Counter.Value), nil
}
// IncTotalApplicationsNew increments the submission counter for new applications.
func (m *SchedulerMetrics) IncTotalApplicationsNew() {
	m.applicationSubmission.WithLabelValues(AppNew).Inc()
}

// GetTotalApplicationsNew reads the new-application submission count; returns
// -1 and the error when the metric cannot be serialized.
func (m *SchedulerMetrics) GetTotalApplicationsNew() (int, error) {
	var snapshot dto.Metric
	if err := m.applicationSubmission.WithLabelValues(AppNew).Write(&snapshot); err != nil {
		return -1, err
	}
	return int(*snapshot.Counter.Value), nil
}

// IncTotalApplicationsAccepted increments the submission counter for accepted applications.
func (m *SchedulerMetrics) IncTotalApplicationsAccepted() {
	m.applicationSubmission.WithLabelValues(AppAccepted).Inc()
}

// GetTotalApplicationsAccepted reads the accepted-application submission
// count; returns -1 and the error when the metric cannot be serialized.
func (m *SchedulerMetrics) GetTotalApplicationsAccepted() (int, error) {
	var snapshot dto.Metric
	if err := m.applicationSubmission.WithLabelValues(AppAccepted).Write(&snapshot); err != nil {
		return -1, err
	}
	return int(*snapshot.Counter.Value), nil
}

// IncTotalApplicationsRejected increments the submission counter for rejected applications.
func (m *SchedulerMetrics) IncTotalApplicationsRejected() {
	m.applicationSubmission.WithLabelValues(AppRejected).Inc()
}

// GetTotalApplicationsRejected reads the rejected-application submission
// count; returns -1 and the error when the metric cannot be serialized.
func (m *SchedulerMetrics) GetTotalApplicationsRejected() (int, error) {
	var snapshot dto.Metric
	if err := m.applicationSubmission.WithLabelValues(AppRejected).Write(&snapshot); err != nil {
		return -1, err
	}
	return int(*snapshot.Counter.Value), nil
}
// IncTotalApplicationsRunning bumps the gauge of running applications.
func (m *SchedulerMetrics) IncTotalApplicationsRunning() {
	m.application.WithLabelValues(AppRunning).Inc()
}

// DecTotalApplicationsRunning drops the gauge of running applications.
func (m *SchedulerMetrics) DecTotalApplicationsRunning() {
	m.application.WithLabelValues(AppRunning).Dec()
}

// GetTotalApplicationsRunning reads the running-application gauge; returns -1
// and the error when the metric cannot be serialized.
func (m *SchedulerMetrics) GetTotalApplicationsRunning() (int, error) {
	var snapshot dto.Metric
	if err := m.application.WithLabelValues(AppRunning).Write(&snapshot); err != nil {
		return -1, err
	}
	return int(*snapshot.Gauge.Value), nil
}

// IncTotalApplicationsFailing bumps the gauge of failing applications.
func (m *SchedulerMetrics) IncTotalApplicationsFailing() {
	m.application.WithLabelValues(AppFailing).Inc()
}

// DecTotalApplicationsFailing drops the gauge of failing applications.
func (m *SchedulerMetrics) DecTotalApplicationsFailing() {
	m.application.WithLabelValues(AppFailing).Dec()
}

// IncTotalApplicationsFailed bumps the gauge of failed applications.
// Note: failed is a terminal state, so there is no matching Dec.
func (m *SchedulerMetrics) IncTotalApplicationsFailed() {
	m.application.WithLabelValues(AppFailed).Inc()
}

// IncTotalApplicationsCompleting bumps the gauge of completing applications.
func (m *SchedulerMetrics) IncTotalApplicationsCompleting() {
	m.application.WithLabelValues(AppCompleting).Inc()
}

// DecTotalApplicationsCompleting drops the gauge of completing applications.
func (m *SchedulerMetrics) DecTotalApplicationsCompleting() {
	m.application.WithLabelValues(AppCompleting).Dec()
}

// IncTotalApplicationsResuming bumps the gauge of resuming applications.
func (m *SchedulerMetrics) IncTotalApplicationsResuming() {
	m.application.WithLabelValues(AppResuming).Inc()
}

// DecTotalApplicationsResuming drops the gauge of resuming applications.
func (m *SchedulerMetrics) DecTotalApplicationsResuming() {
	m.application.WithLabelValues(AppResuming).Dec()
}

// GetTotalApplicationsResuming reads the resuming-application gauge; returns
// -1 and the error when the metric cannot be serialized.
func (m *SchedulerMetrics) GetTotalApplicationsResuming() (int, error) {
	var snapshot dto.Metric
	if err := m.application.WithLabelValues(AppResuming).Write(&snapshot); err != nil {
		return -1, err
	}
	return int(*snapshot.Gauge.Value), nil
}

// IncTotalApplicationsCompleted bumps the gauge of completed applications.
// Note: completed is a terminal state, so there is no matching Dec.
func (m *SchedulerMetrics) IncTotalApplicationsCompleted() {
	m.application.WithLabelValues(AppCompleted).Inc()
}

// GetTotalApplicationsCompleted reads the completed-application gauge; returns
// -1 and the error when the metric cannot be serialized.
func (m *SchedulerMetrics) GetTotalApplicationsCompleted() (int, error) {
	var snapshot dto.Metric
	if err := m.application.WithLabelValues(AppCompleted).Write(&snapshot); err != nil {
		return -1, err
	}
	return int(*snapshot.Gauge.Value), nil
}
// IncActiveNodes bumps the gauge of nodes in the active state.
func (m *SchedulerMetrics) IncActiveNodes() {
	m.node.WithLabelValues(NodeActive).Inc()
}

// DecActiveNodes drops the gauge of nodes in the active state.
func (m *SchedulerMetrics) DecActiveNodes() {
	m.node.WithLabelValues(NodeActive).Dec()
}

// IncFailedNodes bumps the gauge of failed nodes.
// NOTE(review): the failed-node methods reuse the application state constant
// AppFailed as the node state label; presumably its value is "failed" —
// consider introducing a dedicated node-state constant for clarity.
func (m *SchedulerMetrics) IncFailedNodes() {
	m.node.WithLabelValues(AppFailed).Inc()
}

// DecFailedNodes drops the gauge of failed nodes.
func (m *SchedulerMetrics) DecFailedNodes() {
	m.node.WithLabelValues(AppFailed).Dec()
}

// GetFailedNodes reads the failed-node gauge; returns -1 and the error when
// the metric cannot be serialized.
func (m *SchedulerMetrics) GetFailedNodes() (int, error) {
	var snapshot dto.Metric
	if err := m.node.WithLabelValues(AppFailed).Write(&snapshot); err != nil {
		return -1, err
	}
	return int(*snapshot.Gauge.Value), nil
}
// SetNodeResourceUsage sets the usage gauge for the given resource name in the
// usage-range bucket selected by rangeIdx (an index into
// resourceUsageRangeBuckets). The gauge vector for a resource is created and
// registered lazily on first use; a registration failure is logged and the
// update dropped. An out-of-range rangeIdx is logged and ignored instead of
// panicking with an index-out-of-range error as before.
func (m *SchedulerMetrics) SetNodeResourceUsage(resourceName string, rangeIdx int, value float64) {
	// guard the bucket lookup at the end of this function
	if rangeIdx < 0 || rangeIdx >= len(resourceUsageRangeBuckets) {
		log.Log(log.Metrics).Warn("node resource usage range index out of bounds",
			zap.String("resourceName", resourceName),
			zap.Int("rangeIdx", rangeIdx))
		return
	}
	// full lock: we may insert into the nodeResourceUsage map below
	m.lock.Lock()
	defer m.lock.Unlock()
	resourceMetrics, ok := m.nodeResourceUsage[resourceName]
	if !ok {
		metricsName := fmt.Sprintf("%s_node_usage_total", formatMetricName(resourceName))
		resourceMetrics = prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Namespace: Namespace,
				Subsystem: SchedulerSubsystem,
				Name:      metricsName,
				Help:      "Total resource usage of node, by resource name.",
			}, []string{"range"})
		if err := prometheus.Register(resourceMetrics); err != nil {
			log.Log(log.Metrics).Warn("failed to register metrics collector", zap.Error(err))
			// do not cache an unregistered collector; drop this update
			return
		}
		m.nodeResourceUsage[resourceName] = resourceMetrics
	}
	resourceMetrics.WithLabelValues(resourceUsageRangeBuckets[rangeIdx]).Set(value)
}
// IncDrainingNodes bumps the gauge of nodes in the draining state.
func (m *SchedulerMetrics) IncDrainingNodes() {
	m.node.WithLabelValues(NodeDraining).Inc()
}

// DecDrainingNodes drops the gauge of nodes in the draining state.
func (m *SchedulerMetrics) DecDrainingNodes() {
	m.node.WithLabelValues(NodeDraining).Dec()
}

// GetDrainingNodes reads the draining-node gauge; returns -1 and the error
// when the metric cannot be serialized.
func (m *SchedulerMetrics) GetDrainingNodes() (int, error) {
	var snapshot dto.Metric
	if err := m.node.WithLabelValues(NodeDraining).Write(&snapshot); err != nil {
		return -1, err
	}
	return int(*snapshot.Gauge.Value), nil
}

// IncTotalDecommissionedNodes bumps the gauge of decommissioned nodes.
// Note: decommissioning is terminal, so there is no matching Dec.
func (m *SchedulerMetrics) IncTotalDecommissionedNodes() {
	m.node.WithLabelValues(NodeDecommissioned).Inc()
}