pkg/instrumentation/auto/callback.go (123 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package auto
import (
"context"
"encoding/json"
"fmt"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"sigs.k8s.io/controller-runtime/pkg/client"
)
type objectCallbackFunc func(client.Object, any) (any, bool)
// chainCallbacks is a func that invokes functions in a callback chain one after another as long as each function
// returns true. The result of each function in the callback is passed along to the next. Eventually returns true if
// all callbacks in chain are executed and false otherwise.
func chainCallbacks(fns ...objectCallbackFunc) objectCallbackFunc {
return func(obj client.Object, passToNext any) (any, bool) {
var ok bool
for _, fn := range fns {
passToNext, ok = fn(obj, passToNext)
if !ok {
return passToNext, false
}
}
return passToNext, true
}
}
// basicPatch based on client.mergeFromPatch. Takes in a pre-marshalled JSON instead of the original object.
type basicPatch struct {
originalJSON []byte
}
var _ client.Patch = (*basicPatch)(nil)
func (p *basicPatch) Type() types.PatchType {
return types.MergePatchType
}
func (p *basicPatch) Data(obj client.Object) ([]byte, error) {
modifiedJSON, err := json.Marshal(obj)
if err != nil {
return nil, err
}
data, err := strategicpatch.CreateTwoWayMergePatch(p.originalJSON, modifiedJSON, obj)
if err != nil {
return nil, err
}
return data, nil
}
func createPatch(obj client.Object) (client.Patch, error) {
originalJSON, err := json.Marshal(obj)
if err != nil {
return nil, err
}
return &basicPatch{originalJSON: originalJSON}, nil
}
func (m *AnnotationMutators) patchFunc(ctx context.Context, callback objectCallbackFunc) objectCallbackFunc {
return func(obj client.Object, _ any) (any, bool) {
patch, err := createPatch(obj)
if err != nil {
m.logger.Error(err, "Unable to create patch",
"kind", fmt.Sprintf("%T", obj),
"name", obj.GetName(),
"namespace", obj.GetNamespace(),
)
return nil, false
}
ret, ok := callback(obj, nil)
if !ok {
return ret, false
}
if err = m.clientWriter.Patch(ctx, obj, patch); err != nil {
m.logger.Error(err, "Unable to send patch",
"kind", fmt.Sprintf("%T", obj),
"name", obj.GetName(),
"namespace", obj.GetNamespace(),
)
return ret, false
}
return ret, true
}
}
func (m *AnnotationMutators) restartNamespaceFunc(ctx context.Context) objectCallbackFunc {
return func(obj client.Object, previousResult any) (any, bool) {
mutatedAnnotations, ok := previousResult.(map[string]string)
if !ok {
return nil, false
}
namespace, ok := obj.(*corev1.Namespace)
if !ok {
return nil, false
}
m.RestartNamespace(ctx, namespace, mutatedAnnotations)
return nil, true
}
}
// shouldRestartFunc returns a func that determines if a resource should be restarted
func (m *AnnotationMutators) shouldRestartFunc(namespaceMutatedAnnotations map[string]string) objectCallbackFunc {
return func(obj client.Object, _ any) (any, bool) {
switch o := obj.(type) {
case *appsv1.Deployment:
return nil, m.shouldRestartResource(namespaceMutatedAnnotations, o.Spec.Template.GetObjectMeta())
case *appsv1.DaemonSet:
return nil, m.shouldRestartResource(namespaceMutatedAnnotations, o.Spec.Template.GetObjectMeta())
case *appsv1.StatefulSet:
return nil, m.shouldRestartResource(namespaceMutatedAnnotations, o.Spec.Template.GetObjectMeta())
default:
return nil, false
}
}
}
// shouldRestartResource returns true if a resource requires a restart corresponding to the mutated annotations on its namespace
func (m *AnnotationMutators) shouldRestartResource(namespaceMutatedAnnotations map[string]string, obj metav1.Object) bool {
var shouldRestart bool
if resourceAnnotations := obj.GetAnnotations(); resourceAnnotations != nil {
// For each of the namespace mutated annotations,
for namespaceMutatedAnnotation, namespaceMutatedAnnotationValue := range namespaceMutatedAnnotations {
if _, ok := m.injectAnnotations[namespaceMutatedAnnotation]; !ok {
// If it is not an inject-* annotation, we can ignore it
continue
}
resourceAnnotationValue, ok := resourceAnnotations[namespaceMutatedAnnotation]
if ok && namespaceMutatedAnnotationValue == resourceAnnotationValue {
// If the resource already has the same annotation with the same value, do not restart it since it
// was explicitly annotated on the resource and hence the annotation on the namespace being mutated
// should have no overall impact
continue
} else {
// Else the resource needs to be instrumented/un-instrumented via the namespace and hence needs a restart
shouldRestart = true
}
}
} else {
shouldRestart = true
}
return shouldRestart
}