v2/internal/util/kubeclient/kube_client.go (107 lines of code) (raw):

/* Copyright (c) Microsoft Corporation. Licensed under the MIT license. */ package kubeclient import ( "context" "github.com/rotisserie/eris" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" ) type CommitType string const ( SpecOnly = CommitType("SpecOnly") SpecAndStatus = CommitType("SpecAndStatus") ) type Client interface { client.Client // Additional helpers GetObject(ctx context.Context, namespacedName types.NamespacedName, gvk schema.GroupVersionKind) (client.Object, error) GetObjectOrDefault(ctx context.Context, namespacedName types.NamespacedName, gvk schema.GroupVersionKind) (client.Object, error) CommitObject(ctx context.Context, obj client.Object, commitType CommitType) error } type clientHelper struct { client client.Client } var _ Client = &clientHelper{} func NewClient(client client.Client) Client { return &clientHelper{ client: client, } } func (c *clientHelper) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { return c.client.Get(ctx, key, obj, opts...) } func (c *clientHelper) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { return c.client.List(ctx, list, opts...) } func (c *clientHelper) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { return c.client.Create(ctx, obj, opts...) } func (c *clientHelper) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error { return c.client.Delete(ctx, obj, opts...) } func (c *clientHelper) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { return c.client.Update(ctx, obj, opts...) } func (c *clientHelper) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) error { return c.client.Patch(ctx, obj, patch, opts...) } func (c *clientHelper) DeleteAllOf(ctx context.Context, obj client.Object, opts ...client.DeleteAllOfOption) error { return c.client.DeleteAllOf(ctx, obj, opts...) } func (c *clientHelper) Status() client.StatusWriter { return c.client.Status() } func (c *clientHelper) Scheme() *runtime.Scheme { return c.client.Scheme() } func (c *clientHelper) RESTMapper() meta.RESTMapper { return c.client.RESTMapper() } func (c *clientHelper) SubResource(subResource string) client.SubResourceClient { return c.client.SubResource(subResource) } func (c *clientHelper) GroupVersionKindFor(obj runtime.Object) (schema.GroupVersionKind, error) { return c.client.GroupVersionKindFor(obj) } func (c *clientHelper) IsObjectNamespaced(obj runtime.Object) (bool, error) { return c.client.IsObjectNamespaced(obj) } func (c *clientHelper) GetObject(ctx context.Context, namespacedName types.NamespacedName, gvk schema.GroupVersionKind) (client.Object, error) { obj, err := c.Scheme().New(gvk) if err != nil { return nil, eris.Wrapf(err, "unable to create object from gvk %s with", gvk) } clientObj, ok := obj.(client.Object) if !ok { return nil, eris.Errorf("gvk %s doesn't implement client.Object", gvk) } if err := c.Get(ctx, namespacedName, clientObj); err != nil { return nil, err } // Ensure GVK is populated clientObj.GetObjectKind().SetGroupVersionKind(gvk) return clientObj, nil } func (c *clientHelper) GetObjectOrDefault(ctx context.Context, namespacedName types.NamespacedName, gvk schema.GroupVersionKind) (client.Object, error) { result, err := c.GetObject(ctx, namespacedName, gvk) if apierrors.IsNotFound(err) { return nil, nil } return result, err } // CommitObject persists the contents of obj to etcd by using the Kubernetes client. // Note that after this method has been called, obj contains the result of the update // from APIServer (including an updated resourceVersion). Both Spec and Status are written func (c *clientHelper) CommitObject(ctx context.Context, obj client.Object, commitType CommitType) error { // Order of updates (spec first or status first) matters here. // If the status is updated first: clients that are waiting on status // Condition Ready == true might see that quickly enough, and make a spec // update fast enough, to conflict with the second write (that of the spec). // This will trigger extra requests to Azure and fail our recording tests but is // otherwise harmless in an actual deployment. // We update the spec first to avoid the above problem. // We must clone here because the result of this update could contain // fields such as status.location that may not be set but are not omitempty. // This will cause the contents we have in Status.Location to be overwritten. clone := obj.DeepCopyObject().(client.Object) err := c.Update(ctx, clone) if err != nil { return eris.Wrapf(err, "updating %s/%s resource", obj.GetNamespace(), obj.GetName()) } obj.SetResourceVersion(clone.GetResourceVersion()) if commitType == SpecAndStatus { // Note that subsequent calls to GET can (if using a cached client) can miss the updates we've just done. // See: https://github.com/kubernetes-sigs/controller-runtime/issues/1464. err = c.Status().Update(ctx, obj) if err != nil { return eris.Wrapf(err, "updating %s/%s resource status", obj.GetNamespace(), obj.GetName()) } } return nil }