pkg/scheduler/scheduler.go
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package scheduler

import (
	"context"
	"fmt"
	"sync"

	"github.com/GoogleCloudPlatform/gke-prober/pkg/common"
	"github.com/GoogleCloudPlatform/gke-prober/pkg/metrics"
	"github.com/GoogleCloudPlatform/gke-prober/pkg/probe"
	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/klog/v2"
)
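
// RecordClusterMetrics reports cluster-scoped metrics: node condition counts,
// node availability counts, and the expected pod count for each addon derived
// from the cluster's daemonsets and deployments. A nil node list means the
// informer caches are still syncing, so the cycle is skipped.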
func RecordClusterMetrics(ctx context.Context, m metrics.ClusterRecorder, nodes []*v1.Node, daemonsets []*appsv1.DaemonSet, deployments []*appsv1.Deployment) {
	// Report on node conditions
	if nodes == nil {
		klog.V(1).Infoln("Informer caches are still syncing; skipping this scraping cycle until the caches are fully synced")
		return
	}
	conditions := nodeConditions(nodes)
	conditionCounts := []metrics.LabelCount{}
	for _, v := range conditions.Dump() {
		conditionCounts = append(conditionCounts, metrics.LabelCount{
			Labels: v.Key,
			Count:  v.Value,
		})
	}
	m.RecordNodeConditions(ctx, conditionCounts)

	// Report on node availability (SLI)
	availabilities, availableNodes := nodeAvailabilities(nodes)
	availabilityCounts := []metrics.LabelCount{}
	for _, v := range availabilities.Dump() {
		availabilityCounts = append(availabilityCounts, metrics.LabelCount{
			Labels: v.Key,
			Count:  v.Value,
		})
	}
	m.RecordNodeAvailabilities(ctx, availabilityCounts)

	// Report on expected addons
	addonCounts := []metrics.LabelCount{}
	for addon, count := range daemonSetPodCountByAddon(daemonsets) {
		labels := addonLabels(addon)
		// Use the number of available nodes as the expected pod count for daemonset-managed addons
		addonCounts = append(addonCounts, metrics.LabelCount{
			Labels: labels,
			Count:  availableNodes * count,
		})
	}
	for addon, count := range deploymentPodCountByAddon(deployments) {
		labels := addonLabels(addon)
		addonCounts = append(addonCounts, metrics.LabelCount{
			Labels: labels,
			Count:  count,
		})
	}
	m.RecordAddonCounts(ctx, addonCounts)
}
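
// NodeScheduler carries the per-node configuration and metric recorder, plus an
// accumulator of addon container restarts observed between scraping cycles.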
type NodeScheduler struct {
	cfg common.Config
	mr  metrics.NodeRecorder
	*addonRestarts
}
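
// addonRestarts is a mutex-guarded map of container restart counts per addon.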
type addonRestarts struct {
	restarts map[common.Addon]int
	m        sync.Mutex
}
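
// NewNodeScheduler returns a NodeScheduler with an empty restart accumulator.
// A minimal usage sketch (recorder, cfg, and ctx are placeholders supplied by
// the caller; the actual wiring lives elsewhere in the repo):
//
//	s := NewNodeScheduler(recorder, cfg)
//	handler := s.ContainerRestartHandler(ctx)
//	// pass handler to the pod watcher's container-restart callback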
func NewNodeScheduler(mr metrics.NodeRecorder, cfg common.Config) *NodeScheduler {
	return &NodeScheduler{
		cfg:           cfg,
		mr:            mr,
		addonRestarts: &addonRestarts{restarts: make(map[common.Addon]int)},
	}
}
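
// RegisterAddonRestart increments the in-memory restart count for the given addon.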
func (ar *addonRestarts) RegisterAddonRestart(a common.Addon) {
	ar.m.Lock()
	defer ar.m.Unlock()
	_, ok := ar.restarts[a]
	if !ok {
		ar.restarts[a] = 1
		return
	}
	ar.restarts[a]++
}
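
// PopAddonRestarts returns the accumulated restart counts and clears the
// accumulator, so each restart is reported at most once.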
func (ar *addonRestarts) PopAddonRestarts() map[common.Addon]int {
	ar.m.Lock()
	defer ar.m.Unlock()
	results := make(map[common.Addon]int)
	for a, n := range ar.restarts {
		results[a] = n
		delete(ar.restarts, a)
	}
	return results
}
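
// RecordNodeMetrics reports metrics for the single node watched by this prober:
// node conditions, node availability, per-addon availability (combining pod
// phase with probe results), and the derived addon control plane availability.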
func RecordNodeMetrics(ctx context.Context, mr metrics.NodeRecorder, cfg common.Config, nodes []*v1.Node, pods []*v1.Pod, probes probe.ProbeMap) {
	if len(nodes) == 0 {
		klog.V(1).Infoln("No nodes in the informer cache yet; waiting until the next scraping cycle")
		return
	}
	node := nodes[0] // the node watcher only watches a single node

	// Record node conditions
	conditions := conditionStatuses(node.Status.Conditions)
	clabels := []map[string]string{}
	for t, st := range conditions {
		labels := map[string]string{
			"nodepool": cfg.Nodepool,
			"zone":     cfg.Location,
			"type":     t,
			"status":   st,
		}
		clabels = append(clabels, labels)
	}
	mr.RecordNodeConditions(ctx, clabels)

	// Record node availability
	ready, scheduleable, doneWarming := nodeAvailability(node)
	nodeAvailable := ready && scheduleable && doneWarming
	labels := map[string]string{
		"nodepool":     cfg.Nodepool,
		"zone":         cfg.Location,
		"available":    boolToStr(nodeAvailable),
		"ready":        boolToStr(ready),
		"scheduleable": boolToStr(scheduleable),
		"done_warming": boolToStr(doneWarming),
	}
	mr.RecordNodeAvailability(ctx, labels)

	// Record addon availability
	// Addon control plane availability depends on node availability
	cpAvailable := nodeAvailable
	// Use counts: deployments may sometimes have multiple pods on a single node
	// TODO: extend probing to handle the case of multiple pods on a node
	counts := podsByAddon(pods)
	labelCounts := []metrics.LabelCount{}
	for addon, addonPods := range counts {
		// For now, just probe the first pod
		pod := addonPods[0]
		running := podIsRunning(pod)
		result := probe.Run(ctx, probes, pod, addon)
		if result.Err != nil {
			klog.Warning(result.Err)
		}
		probeResult := result.Available
		// An unknown probe result does not count against availability
		available := running && (probeResult == "True" || probeResult == "Unknown")
		if !available {
			cpAvailable = false
		}
		labels := map[string]string{
			"nodepool":       cfg.Nodepool,
			"zone":           cfg.Location,
			"available":      boolToStr(available),
			"node_available": boolToStr(nodeAvailable),
			"running":        boolToStr(running),
			"healthy":        probeResult,
		}
		addAddonLabels(addon, labels)
		labelCounts = append(labelCounts, metrics.LabelCount{
			Labels: labels,
			Count:  len(addonPods),
		})
	}
	mr.RecordAddonAvailabilies(ctx, labelCounts)

	// Record addon control plane availability
	labels = map[string]string{
		"nodepool":  cfg.Nodepool,
		"zone":      cfg.Location,
		"available": boolToStr(cpAvailable),
	}
	mr.RecordAddonControlPlaneAvailability(ctx, labels)
}
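
// ContainerRestartHandler returns a callback for the pod watcher that, for addon
// pods, registers the restart with the in-memory accumulator and records a
// container restart metric labeled with the termination reason and exit code.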
func (s *NodeScheduler) ContainerRestartHandler(ctx context.Context) (handler func(pod *v1.Pod, status v1.ContainerStatus)) {
	handler = func(pod *v1.Pod, status v1.ContainerStatus) {
		addon, ok := common.AddonFromPod(pod)
		if !ok {
			return
		}
		s.RegisterAddonRestart(addon)
		state := status.LastTerminationState.Terminated
		var reason, exitCode string
		if state != nil {
			klog.V(1).Infof("container %q restarted because of %s (%d)\n", status.Name, state.Reason, state.ExitCode)
			reason = state.Reason
			exitCode = fmt.Sprint(state.ExitCode)
		} else {
			klog.V(1).Infof("container %q restarted but termination reason unknown\n", status.Name)
			reason = "unknown"
			exitCode = "unknown"
		}
		labels := map[string]string{
			"nodepool":       s.cfg.Nodepool,
			"zone":           s.cfg.Location,
			"container_name": status.Name,
			"reason":         reason,
			"exit_code":      exitCode,
		}
		addAddonLabels(addon, labels)
		s.mr.RecordContainerRestart(ctx, labels)
	}
	return
}
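
// RecordClusterProbeMetrics runs each cluster-scoped probe against the cluster
// and records the resulting addon health conditions.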
func RecordClusterProbeMetrics(ctx context.Context, clientset *kubernetes.Clientset,
	recorder metrics.ProbeRecorder, probes probe.ClusterProbeMap) {
	clabels := []map[string]string{}
	for addon, p := range probes {
		res := p.Run(ctx, clientset)
		// Only report a reason when the probe actually returned an error;
		// calling Error() on a nil error would panic.
		reason := ""
		if res.Err != nil {
			reason = res.Err.Error()
		}
		labels := map[string]string{
			"name":      addon,
			"condition": res.Available,
			"reason":    reason,
		}
		clabels = append(clabels, labels)
	}
	recorder.RecordAddonHealth(ctx, clabels)
}
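
// runConnectivityProbes runs each connectivity probe with the given recorder and
// logs any errors; a failing probe does not stop the remaining probes.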
func runConnectivityProbes(ctx context.Context, recorder metrics.ProbeRecorder, probes probe.ConnectivityProbeMap) {
	for name, pr := range probes {
		err := pr.Run(ctx, recorder)
		if err != nil {
			klog.Warningf("probe %q returned unexpected error %v\n", name, err)
		}
	}
}