// cmd/rotatecerts/wait.go
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
package rotatecerts
import (
"context"
"time"
"github.com/Azure/aks-engine-azurestack/cmd/rotatecerts/internal"
"github.com/Azure/aks-engine-azurestack/pkg/helpers/ssh"
"github.com/Azure/aks-engine-azurestack/pkg/kubernetes"
"github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
)
const defaultSuccessesNeeded int = 5
type nodesCondition func(*v1.NodeList) bool
// waitForNodesCondition polls the cluster's node list until condition has held for
// successesNeeded checks or the timeout expires, returning the node list last fetched
func waitForNodesCondition(client internal.KubeClient, condition nodesCondition, successesNeeded int, interval, timeout time.Duration) (*v1.NodeList, error) {
var nl *v1.NodeList
var err error
var successesCount int
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
err = wait.PollUntilContextCancel(ctx, interval, true, func(ctx context.Context) (bool, error) {
nl, err = client.ListNodes()
if err != nil {
return false, err
}
if !condition(nl) {
return false, nil
}
successesCount++
if successesCount < successesNeeded {
return false, nil
}
return true, nil
})
return nl, err
}
// WaitForNodesReady waits until every node in requiredNodes reaches the Ready state
func WaitForNodesReady(client internal.KubeClient, requiredNodes []string, interval, timeout time.Duration) error {
_, err := waitForNodesCondition(client, allNodesReadyCondition(requiredNodes), defaultSuccessesNeeded, interval, timeout)
return err
}
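// Hypothetical usage (the node names and durations below are illustrative,
// not values taken from the rotation flow):
//
//	err := WaitForNodesReady(client, []string{"k8s-master-0", "k8s-agentpool-1"}, 5*time.Second, 10*time.Minute)
//	if err != nil {
//		// not every required node reached Ready before the timeout
//	}
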
// allNodesReadyCondition is met once every node in requiredNodes appears in the
// node list and reports Ready
func allNodesReadyCondition(requiredNodes []string) nodesCondition {
return func(nl *v1.NodeList) bool {
requiredReady := make(map[string]bool)
for _, name := range requiredNodes {
requiredReady[name] = false
}
for _, nli := range nl.Items {
_, ok := requiredReady[nli.ObjectMeta.Name]
if !ok {
continue
}
			if !kubernetes.IsNodeReady(&nli) {
				return false
			}
			requiredReady[nli.ObjectMeta.Name] = true
}
for _, ready := range requiredReady {
if !ready {
return false
}
}
return true
}
}
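// For illustration only: callers can plug any nodesCondition into
// waitForNodesCondition. This sketch (not part of the original rotation flow)
// is met once at least n nodes report Ready, regardless of their names:
func atLeastNNodesReadyCondition(n int) nodesCondition {
	return func(nl *v1.NodeList) bool {
		ready := 0
		for i := range nl.Items {
			if kubernetes.IsNodeReady(&nl.Items[i]) {
				ready++
			}
		}
		return ready >= n
	}
}
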
type podsCondition func(*v1.PodList) error
// waitForPodsCondition fetches the pods in a namespace and polls until condition
// holds for successesNeeded checks or the timeout expires
func waitForPodsCondition(client internal.KubeClient, namespace string, condition podsCondition, successesNeeded int, interval, timeout time.Duration) error {
var listErr, condErr error
var successesCount int
var pl *v1.PodList
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
err := wait.PollUntilContextCancel(ctx, interval, true, func(ctx context.Context) (bool, error) {
pl, listErr = client.ListPods(namespace, metav1.ListOptions{})
if listErr != nil {
return false, listErr
}
if condErr = condition(pl); condErr != nil {
return false, nil
}
successesCount++
if successesCount < successesNeeded {
return false, nil
}
return true, nil
})
if listErr != nil {
return errors.Wrapf(listErr, "condition successesCount: %d", successesCount)
}
if condErr != nil {
return errors.Wrapf(condErr, "condition successesCount: %d", successesCount)
}
return err
}
// WaitForAllInNamespaceReady waits until every daemonset and deployment in the given
// namespace has finished updating its replicas and every pod in it reaches the Ready state
func WaitForAllInNamespaceReady(client internal.KubeClient, namespace string, interval, timeout time.Duration, nodes map[string]*ssh.RemoteHost) error {
	if err := waitForDaemonSetCondition(client, namespace, allDaemonSetReplicasUpdatedCondition, defaultSuccessesNeeded, interval, timeout); err != nil {
return err
}
if err := waitForDeploymentCondition(client, namespace, allDeploymentReplicasUpdatedCondition, defaultSuccessesNeeded, interval, timeout); err != nil {
return err
}
return waitForPodsCondition(client, namespace, allListedPodsReadyCondition, defaultSuccessesNeeded, interval, timeout)
}
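// Hypothetical usage, e.g. waiting for kube-system to settle after a rotation
// (the namespace and durations are illustrative):
//
//	err := WaitForAllInNamespaceReady(client, "kube-system", 5*time.Second, 20*time.Minute, nodes)
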
// allListedPodsReadyCondition fails if any listed pod is not Running with all of
// its containers Ready
func allListedPodsReadyCondition(pl *v1.PodList) error {
podsNotReady := make([]string, 0)
for _, pli := range pl.Items {
ready := pli.Status.Phase == v1.PodRunning
for _, c := range pli.Status.ContainerStatuses {
ready = ready && c.State.Running != nil && c.Ready
}
if !ready {
podsNotReady = append(podsNotReady, pli.Name)
}
}
if len(podsNotReady) != 0 {
return errors.Errorf("at least one pod did not reach the Ready state: %s", podsNotReady)
}
return nil
}
// WaitForReady waits until every pod in pods is Running with all of its containers Ready
func WaitForReady(client internal.KubeClient, namespace string, pods []string, interval, timeout time.Duration, nodes map[string]*ssh.RemoteHost) error {
waitFor := allExpectedPodsReadyCondition(pods)
return waitForPodsCondition(client, namespace, waitFor, defaultSuccessesNeeded, interval, timeout)
}
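// Hypothetical usage (the pod name below is illustrative):
//
//	pods := []string{"kube-apiserver-k8s-master-0"}
//	err := WaitForReady(client, "kube-system", pods, 5*time.Second, 10*time.Minute, nodes)
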
// allExpectedPodsReadyCondition fails if any pod in expectedPods is missing from
// the pod list or is not Running with all of its containers Ready
func allExpectedPodsReadyCondition(expectedPods []string) podsCondition {
return func(pl *v1.PodList) error {
podReady := make(map[string]bool, len(expectedPods))
for _, n := range expectedPods {
podReady[n] = false
}
for _, pli := range pl.Items {
_, ok := podReady[pli.ObjectMeta.Name]
if !ok {
continue
}
ready := pli.Status.Phase == v1.PodRunning
for _, c := range pli.Status.ContainerStatuses {
ready = ready && c.State.Running != nil && c.Ready
}
podReady[pli.ObjectMeta.Name] = ready
}
podsNotReady := make([]string, 0)
for pod, ready := range podReady {
if !ready {
podsNotReady = append(podsNotReady, pod)
}
}
if len(podsNotReady) != 0 {
return errors.Errorf("at least one pod did not reach the Ready state: %s", podsNotReady)
}
return nil
}
}
type daemonsetCondition func(*appsv1.DaemonSetList) error
// waitForDaemonSetCondition fetches the daemonsets in a namespace and polls until
// condition holds for successesNeeded checks or the timeout expires
func waitForDaemonSetCondition(client internal.KubeClient, namespace string, condition daemonsetCondition, successesNeeded int, interval, timeout time.Duration) error {
var listErr, condErr error
var successesCount int
var dsl *appsv1.DaemonSetList
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
err := wait.PollUntilContextCancel(ctx, interval, true, func(ctx context.Context) (bool, error) {
dsl, listErr = client.ListDaemonSets(namespace, metav1.ListOptions{})
if listErr != nil {
return false, listErr
}
if condErr = condition(dsl); condErr != nil {
return false, nil
}
successesCount++
if successesCount < successesNeeded {
return false, nil
}
return true, nil
})
if listErr != nil {
return errors.Wrapf(listErr, "condition successesCount: %d", successesCount)
}
if condErr != nil {
return errors.Wrapf(condErr, "condition successesCount: %d", successesCount)
}
return err
}
// allDaemonSetReplicasUpdatedCondition fails if any daemonset has not yet scheduled
// and updated as many replicas as it desires
func allDaemonSetReplicasUpdatedCondition(dsl *appsv1.DaemonSetList) error {
dsNotReady := make([]string, 0)
for _, dsli := range dsl.Items {
desired := dsli.Status.DesiredNumberScheduled
current := dsli.Status.CurrentNumberScheduled
updated := dsli.Status.UpdatedNumberScheduled
if desired != current || desired != updated {
dsNotReady = append(dsNotReady, dsli.Name)
}
}
if len(dsNotReady) != 0 {
return errors.Errorf("at least one daemonset is still updating replicas: %s", dsNotReady)
}
return nil
}
type deploymentCondition func(*appsv1.DeploymentList) error
// waitForDeploymentCondition fetches the deployments in a namespace and polls until
// condition holds for successesNeeded checks or the timeout expires
func waitForDeploymentCondition(client internal.KubeClient, namespace string, condition deploymentCondition, successesNeeded int, interval, timeout time.Duration) error {
var listErr, condErr error
var successesCount int
var dl *appsv1.DeploymentList
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
err := wait.PollUntilContextCancel(ctx, interval, true, func(ctx context.Context) (bool, error) {
dl, listErr = client.ListDeployments(namespace, metav1.ListOptions{})
if listErr != nil {
return false, listErr
}
if condErr = condition(dl); condErr != nil {
return false, nil
}
successesCount++
if successesCount < successesNeeded {
return false, nil
}
return true, nil
})
if listErr != nil {
return errors.Wrapf(listErr, "condition successesCount: %d", successesCount)
}
if condErr != nil {
return errors.Wrapf(condErr, "condition successesCount: %d", successesCount)
}
return err
}
// allDeploymentReplicasUpdatedCondition fails if any deployment has not yet made all
// of its replicas available and up to date
func allDeploymentReplicasUpdatedCondition(dl *appsv1.DeploymentList) error {
	deployNotReady := make([]string, 0)
	for _, dli := range dl.Items {
desired := dli.Status.Replicas
current := dli.Status.AvailableReplicas
updated := dli.Status.UpdatedReplicas
if desired != current || desired != updated {
deployNotReady = append(deployNotReady, dli.Name)
}
}
if len(deployNotReady) != 0 {
return errors.Errorf("at least one deployment is still updating replicas: %s", deployNotReady)
}
return nil
}
// WaitForVMsRunning waits until every virtual machine in requiredVMs reports a running power state
func WaitForVMsRunning(client internal.ARMClient, resourceGroupName string, requiredVMs []string, interval, timeout time.Duration) error {
	var getErr error
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	err := wait.PollUntilContextCancel(ctx, interval, true, func(ctx context.Context) (bool, error) {
		getErr = nil
		for _, vm := range requiredVMs {
			state, err := client.GetVirtualMachinePowerState(resourceGroupName, vm)
			if err != nil {
				// Retry on ARM errors, remembering the last one for reporting
				getErr = err
				return false, nil
			}
			if !isVirtualMachineRunning(state) {
				return false, nil
			}
		}
		return true, nil
	})
	if getErr != nil {
		return errors.Wrap(getErr, "fetching virtual machine power state")
	}
	return err
}
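
// Hypothetical usage (the resource group and VM names are illustrative):
//
//	err := WaitForVMsRunning(armClient, "my-cluster-rg", []string{"k8s-master-0"}, 10*time.Second, 5*time.Minute)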