pkg/kubernetes/composite_client.go (413 lines of code) (raw):

// Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT license. package kubernetes import ( "crypto/x509" "errors" "net/url" "time" "github.com/Azure/aks-engine-azurestack/pkg/kubernetes/internal" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/retry" ) // CompositeClientSet wraps a pair of Kubernetes clients hooked up to a live api server. // // Prefer this client when the cluster CA is expected to change (ex.: secret rotation operations). type CompositeClientSet struct { oldCAClient internal.Client newCAClient internal.Client timeout time.Duration backoff wait.Backoff retryFunc func(err error) bool } // NewCompositeClient returns a KubernetesClient hooked up to the api server at the apiserverURL. func NewCompositeClient(oldCAClient, newCAClient internal.Client, interval, timeout time.Duration) *CompositeClientSet { return &CompositeClientSet{ oldCAClient: oldCAClient, newCAClient: newCAClient, timeout: timeout, backoff: wait.Backoff{ Steps: int(int64(timeout/time.Millisecond) / int64(interval/time.Millisecond)), Duration: interval, Factor: 1.0, Jitter: 0.0, }, retryFunc: retriable, // Inject if ever needed } } // retriable returns true unless err is an x509.UnknownAuthorityError instance func retriable(err error) bool { switch err := err.(type) { case x509.UnknownAuthorityError: return false case *url.Error: return retriable(err.Unwrap()) default: return true } } type listPodsResult struct { x *v1.PodList err error } // ListPods returns Pods based on the passed in list options. func (c *CompositeClientSet) ListPods(namespace string, opts metav1.ListOptions) (*v1.PodList, error) { lastError := wait.ErrorInterrupted(errors.New("timed out waiting for the condition")) result := func(oldCAClient, newCAClient internal.Client) <-chan listPodsResult { stream := make(chan listPodsResult) exec := func(client internal.Client) { _ = retry.OnError(c.backoff, c.retryFunc, func() error { x, err := client.ListPodsByOptions(namespace, opts) if err != nil { lastError = err return err } stream <- listPodsResult{x, err} return nil }) } go exec(oldCAClient) go exec(newCAClient) return stream }(c.oldCAClient, c.newCAClient) for { select { case res := <-result: return res.x, res.err case <-time.After(c.timeout): return nil, lastError } } } type listNodesResult struct { x *v1.NodeList err error } // ListNodes returns a list of Nodes registered in the api server. func (c *CompositeClientSet) ListNodes() (x *v1.NodeList, err error) { lastError := wait.ErrorInterrupted(errors.New("timed out waiting for the condition")) result := func(oldCAClient, newCAClient internal.Client) <-chan listNodesResult { stream := make(chan listNodesResult) exec := func(client internal.Client) { _ = retry.OnError(c.backoff, c.retryFunc, func() error { x, err := client.ListNodes() if err != nil { lastError = err return err } stream <- listNodesResult{x, err} return nil }) } go exec(oldCAClient) go exec(newCAClient) return stream }(c.oldCAClient, c.newCAClient) for { select { case res := <-result: return res.x, res.err case <-time.After(c.timeout): return nil, lastError } } } type listServiceAccountsResult struct { x *v1.ServiceAccountList err error } // ListServiceAccounts returns a list of Service Accounts in the provided namespace. func (c *CompositeClientSet) ListServiceAccounts(namespace string, opts metav1.ListOptions) (*v1.ServiceAccountList, error) { lastError := wait.ErrorInterrupted(errors.New("timed out waiting for the condition")) result := func(oldCAClient, newCAClient internal.Client) <-chan listServiceAccountsResult { stream := make(chan listServiceAccountsResult) exec := func(client internal.Client) { _ = retry.OnError(c.backoff, c.retryFunc, func() error { x, err := client.ListServiceAccountsByOptions(namespace, opts) if err != nil { lastError = err return err } stream <- listServiceAccountsResult{x, err} return nil }) } go exec(oldCAClient) go exec(newCAClient) return stream }(c.oldCAClient, c.newCAClient) for { select { case res := <-result: return res.x, res.err case <-time.After(c.timeout): return nil, lastError } } } type listDeploymentsResult struct { x *appsv1.DeploymentList err error } // ListDeployments returns a list of deployments in the provided namespace. func (c *CompositeClientSet) ListDeployments(namespace string, opts metav1.ListOptions) (*appsv1.DeploymentList, error) { lastError := wait.ErrorInterrupted(errors.New("timed out waiting for the condition")) result := func(oldCAClient, newCAClient internal.Client) <-chan listDeploymentsResult { stream := make(chan listDeploymentsResult) exec := func(client internal.Client) { _ = retry.OnError(c.backoff, c.retryFunc, func() error { x, err := client.ListDeployments(namespace, opts) if err != nil { lastError = err return err } stream <- listDeploymentsResult{x, err} return nil }) } go exec(oldCAClient) go exec(newCAClient) return stream }(c.oldCAClient, c.newCAClient) for { select { case res := <-result: return res.x, res.err case <-time.After(c.timeout): return nil, lastError } } } type listDaemonSetsResult struct { x *appsv1.DaemonSetList err error } // ListDaemonSets returns a list of daemonsets in the provided namespace. func (c *CompositeClientSet) ListDaemonSets(namespace string, opts metav1.ListOptions) (*appsv1.DaemonSetList, error) { lastError := wait.ErrorInterrupted(errors.New("timed out waiting for the condition")) result := func(oldCAClient, newCAClient internal.Client) <-chan listDaemonSetsResult { stream := make(chan listDaemonSetsResult) exec := func(client internal.Client) { _ = retry.OnError(c.backoff, c.retryFunc, func() error { x, err := client.ListDaemonSets(namespace, opts) if err != nil { lastError = err return err } stream <- listDaemonSetsResult{x, err} return nil }) } go exec(oldCAClient) go exec(newCAClient) return stream }(c.oldCAClient, c.newCAClient) for { select { case res := <-result: return res.x, res.err case <-time.After(c.timeout): return nil, lastError } } } type listSecretsResult struct { x *v1.SecretList err error } // ListSecrets returns a list of secrets in the provided namespace. func (c *CompositeClientSet) ListSecrets(namespace string, opts metav1.ListOptions) (*v1.SecretList, error) { lastError := wait.ErrorInterrupted(errors.New("timed out waiting for the condition")) result := func(oldCAClient, newCAClient internal.Client) <-chan listSecretsResult { stream := make(chan listSecretsResult) exec := func(client internal.Client) { _ = retry.OnError(c.backoff, c.retryFunc, func() error { x, err := client.ListSecrets(namespace, opts) if err != nil { lastError = err return err } stream <- listSecretsResult{x, err} return nil }) } go exec(oldCAClient) go exec(newCAClient) return stream }(c.oldCAClient, c.newCAClient) for { select { case res := <-result: return res.x, res.err case <-time.After(c.timeout): return nil, lastError } } } type deploymentResult struct { x *appsv1.Deployment err error } // GetDeployment blah. func (c *CompositeClientSet) GetDeployment(namespace, name string) (*appsv1.Deployment, error) { lastError := wait.ErrorInterrupted(errors.New("timed out waiting for the condition")) result := func(oldCAClient, newCAClient internal.Client) <-chan deploymentResult { stream := make(chan deploymentResult) exec := func(client internal.Client) { _ = retry.OnError(c.backoff, c.retryFunc, func() error { x, err := client.GetDeployment(namespace, name) if err == nil || apierrors.IsNotFound(err) { stream <- deploymentResult{x, err} return nil } lastError = err return err }) } go exec(oldCAClient) go exec(newCAClient) return stream }(c.oldCAClient, c.newCAClient) for { select { case res := <-result: return res.x, res.err case <-time.After(c.timeout): return nil, lastError } } } // PatchDeployment applies a JSON patch to a deployment in the provided namespace. func (c *CompositeClientSet) PatchDeployment(namespace, name, jsonPatch string) (*appsv1.Deployment, error) { lastError := wait.ErrorInterrupted(errors.New("timed out waiting for the condition")) result := func(oldCAClient, newCAClient internal.Client) <-chan deploymentResult { stream := make(chan deploymentResult) exec := func(client internal.Client) { _ = retry.OnError(c.backoff, c.retryFunc, func() error { x, err := client.PatchDeployment(namespace, name, jsonPatch) if err == nil || apierrors.IsNotFound(err) { stream <- deploymentResult{x, err} return nil } lastError = err return err }) } go exec(oldCAClient) go exec(newCAClient) return stream }(c.oldCAClient, c.newCAClient) for { select { case res := <-result: return res.x, res.err case <-time.After(c.timeout): return nil, lastError } } } type daemonsetResult struct { x *appsv1.DaemonSet err error } // PatchDaemonSet applies a JSON patch to a daemonset in the provided namespace. func (c *CompositeClientSet) PatchDaemonSet(namespace, name, jsonPatch string) (*appsv1.DaemonSet, error) { lastError := wait.ErrorInterrupted(errors.New("timed out waiting for the condition")) result := func(oldCAClient, newCAClient internal.Client) <-chan daemonsetResult { stream := make(chan daemonsetResult) exec := func(client internal.Client) { _ = retry.OnError(c.backoff, c.retryFunc, func() error { x, err := client.PatchDaemonSet(namespace, name, jsonPatch) if err == nil || apierrors.IsNotFound(err) { stream <- daemonsetResult{x, err} return nil } lastError = err return err }) } go exec(oldCAClient) go exec(newCAClient) return stream }(c.oldCAClient, c.newCAClient) for { select { case res := <-result: return res.x, res.err case <-time.After(c.timeout): return nil, lastError } } } // DeletePods deletes all pods in a namespace that match the option filters. func (c *CompositeClientSet) DeletePods(namespace string, opts metav1.ListOptions) error { lastError := wait.ErrorInterrupted(errors.New("timed out waiting for the condition")) result := func(oldCAClient, newCAClient internal.Client) <-chan error { stream := make(chan error) exec := func(client internal.Client) { _ = retry.OnError(c.backoff, c.retryFunc, func() error { err := client.DeletePods(namespace, opts) if err == nil || apierrors.IsNotFound(err) { stream <- err return nil } lastError = err return err }) } go exec(oldCAClient) go exec(newCAClient) return stream }(c.oldCAClient, c.newCAClient) for { select { case err := <-result: return err case <-time.After(c.timeout): return lastError } } } // DeleteServiceAccount deletes the passed in service account. func (c *CompositeClientSet) DeleteServiceAccount(serviceAccount *v1.ServiceAccount) error { lastError := wait.ErrorInterrupted(errors.New("timed out waiting for the condition")) result := func(oldCAClient, newCAClient internal.Client) <-chan error { stream := make(chan error) exec := func(client internal.Client) { _ = retry.OnError(c.backoff, c.retryFunc, func() error { err := client.DeleteServiceAccount(serviceAccount) if err == nil || apierrors.IsNotFound(err) { stream <- err return nil } lastError = err return err }) } go exec(oldCAClient) go exec(newCAClient) return stream }(c.oldCAClient, c.newCAClient) for { select { case err := <-result: return err case <-time.After(c.timeout): return lastError } } } // DeleteSecret deletes the passed in secret. func (c *CompositeClientSet) DeleteSecret(secret *v1.Secret) error { lastError := wait.ErrorInterrupted(errors.New("timed out waiting for the condition")) result := func(oldCAClient, newCAClient internal.Client) <-chan error { stream := make(chan error) exec := func(client internal.Client) { _ = retry.OnError(c.backoff, c.retryFunc, func() error { err := client.DeleteSecret(secret) if err == nil || apierrors.IsNotFound(err) { stream <- err return nil } lastError = err return err }) } go exec(oldCAClient) go exec(newCAClient) return stream }(c.oldCAClient, c.newCAClient) for { select { case err := <-result: return err case <-time.After(c.timeout): return lastError } } }