operatortrace-go/pkg/client/tracing_client.go (254 lines of code) (raw):

// Copyright (c) Microsoft Corporation. // Licensed under the MIT License. // pkg/client/tracing_client.go package client import ( "context" "fmt" constants "github.com/Azure/operatortrace/operatortrace-go/pkg/constants" "github.com/Azure/operatortrace/operatortrace-go/pkg/predicates" "github.com/go-logr/logr" "go.opentelemetry.io/otel/trace" "k8s.io/apimachinery/pkg/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" ) // TracingClient wraps the Kubernetes client to add tracing functionality type tracingClient struct { scheme *runtime.Scheme client.Client client.Reader trace.Tracer Logger logr.Logger options Options } // Options holds the configuration for TracingClient type Options struct { LinkedTraceIDLocation string } // Option is a function that configures Options type Option func(*Options) var _ TracingClient = (*tracingClient)(nil) // NewTracingClient initializes and returns a new TracingClient // optional scheme. If not, it will use client-go scheme func NewTracingClient(c client.Client, r client.Reader, t trace.Tracer, l logr.Logger, scheme ...*runtime.Scheme) TracingClient { tracingScheme := clientgoscheme.Scheme if len(scheme) > 0 { tracingScheme = scheme[0] } return &tracingClient{ scheme: tracingScheme, Client: c, Reader: r, Tracer: t, Logger: l, } } // Create adds tracing and traceID annotation around the original client's Create method func (tc *tracingClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { gvk, err := apiutil.GVKForObject(obj, tc.scheme) if err != nil { return fmt.Errorf("problem getting the scheme: %w", err) } kind := gvk.GroupKind().Kind ctx, span := startSpanFromContext(ctx, tc.Logger, tc.Tracer, obj, tc.scheme, fmt.Sprintf("Create %s %s", kind, obj.GetName())) defer span.End() addTraceIDAnnotation(ctx, obj) tc.Logger.Info("Creating object", "object", obj.GetName()) err = tc.Client.Create(ctx, obj, opts...) if err != nil { span.RecordError(err) } return err } // Update adds tracing and traceID annotation around the original client's Update method func (tc *tracingClient) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { gvk, err := apiutil.GVKForObject(obj, tc.scheme) if err != nil { return fmt.Errorf("problem getting the scheme: %w", err) } kind := gvk.GroupKind().Kind ctx, span := startSpanFromContext(ctx, tc.Logger, tc.Tracer, obj, tc.scheme, fmt.Sprintf("Update %s %s", kind, obj.GetName())) defer span.End() existingObj := obj.DeepCopyObject().(client.Object) if err := tc.Client.Get(ctx, client.ObjectKeyFromObject(obj), existingObj); err != nil { return err } if !predicates.HasSignificantUpdate(existingObj, obj) { tc.Logger.Info("Skipping update as object content has not changed", "object", obj.GetName()) return nil } addTraceIDAnnotation(ctx, obj) tc.Logger.Info("Updating object", "object", obj.GetName()) err = tc.Client.Update(ctx, obj, opts...) if err != nil { span.RecordError(err) } return err } func (tc *tracingClient) StartSpan(ctx context.Context, operationName string) (context.Context, trace.Span) { return startSpanFromContext(ctx, tc.Logger, tc.Tracer, nil, tc.scheme, operationName) } // EmbedTraceIDInNamespacedName embeds the traceID and spanID in the key.Name func (tc *tracingClient) EmbedTraceIDInNamespacedName(key *client.ObjectKey, obj client.Object) error { traceID := obj.GetAnnotations()[constants.TraceIDAnnotation] spanID := obj.GetAnnotations()[constants.SpanIDAnnotation] if traceID == "" || spanID == "" { return nil } gvk, err := apiutil.GVKForObject(obj, tc.scheme) if err != nil { return fmt.Errorf("problem getting the scheme: %w", err) } objectKind := gvk.GroupKind().Kind objectName := obj.GetName() embedTraceID := &EmbedTraceID{ TraceID: traceID, SpanID: spanID, ObjectKind: objectKind, ObjectName: objectName, KeyName: key.Name, } key.Name = embedTraceID.ToString() tc.Logger.Info("EmbedTraceIDInNamespacedName", "objectName", key.Name) return nil } // Get adds tracing around the original client's Get method // IMPORTANT: Caller MUST call `defer span.End()` to end the trace from the calling function func (tc *tracingClient) StartTrace(ctx context.Context, key *client.ObjectKey, obj client.Object, opts ...client.GetOption) (context.Context, trace.Span, error) { name := getNameFromNamespacedName(*key) incomingKey := *key key.Name = name // Create or retrieve the span from the context getErr := tc.Reader.Get(ctx, *key, obj, opts...) if getErr != nil { ctx, span := startSpanFromContext(ctx, tc.Logger, tc.Tracer, obj, tc.scheme, fmt.Sprintf("StartTrace Unknown Object %s", name)) return trace.ContextWithSpan(ctx, span), span, getErr } overrideTraceIDFromNamespacedName(incomingKey, obj) gvk, err := apiutil.GVKForObject(obj, tc.scheme) objectKind := "" if err == nil { objectKind = gvk.GroupKind().Kind } callerName := getCallerNameFromNamespacedName(incomingKey) callerKind := getCallerKindFromNamespacedName(incomingKey) operationName := "" if callerKind != "" && callerName != "" { operationName = fmt.Sprintf("StartTrace %s/%s Triggered By Changed Object %s/%s", objectKind, name, callerKind, callerName) } else { operationName = fmt.Sprintf("StartTrace %s %s", objectKind, name) } ctx, span := startSpanFromContext(ctx, tc.Logger, tc.Tracer, obj, tc.scheme, operationName) if err != nil { span.RecordError(err) } tc.Logger.Info("Getting object", "object", key.Name) return trace.ContextWithSpan(ctx, span), span, err } // Ends the trace by clearing the traceid from the object func (tc *tracingClient) EndTrace(ctx context.Context, obj client.Object, opts ...client.PatchOption) (client.Object, error) { ctx, span := startSpanFromContext(ctx, tc.Logger, tc.Tracer, obj, tc.scheme, fmt.Sprintf("EndTrace %s %s", obj.GetObjectKind().GroupVersionKind().Kind, obj.GetName())) defer span.End() annotations := obj.GetAnnotations() if annotations == nil { return obj, nil } // get the current object and ensure that current object has the expected traceid and spanid annotations currentObjFromServer := obj.DeepCopyObject().(client.Object) err := tc.Reader.Get(ctx, client.ObjectKeyFromObject(obj), currentObjFromServer) if err != nil { span.RecordError(err) } // compare the traceid and spanid from currentobj to ensure that the traceid and spanid are not changed if currentObjFromServer.GetAnnotations()[constants.TraceIDAnnotation] != obj.GetAnnotations()[constants.TraceIDAnnotation] { tc.Logger.Info("TraceID has changed, skipping patch", "object", obj.GetName()) span.RecordError(fmt.Errorf("TraceID has changed, skipping patch: object %s", obj.GetName())) return obj, nil } // Remove the traceid and spanid annotations and create a patch original := obj.DeepCopyObject().(client.Object) patch := client.MergeFrom(original) delete(annotations, constants.TraceIDAnnotation) delete(annotations, constants.SpanIDAnnotation) obj.SetAnnotations(annotations) tc.Logger.Info("Patching object", "object", obj.GetName()) // Use the Patch function to apply the patch err = tc.Client.Patch(ctx, obj, patch, opts...) if err != nil { span.RecordError(err) } original = obj.DeepCopyObject().(client.Object) // remove the traceid and spanid conditions from the object and create a status().patch deleteConditionAsMap("TraceID", obj, tc.scheme) deleteConditionAsMap("SpanID", obj, tc.scheme) patch = client.MergeFrom(original) tc.Logger.Info("Patching object status", "object", obj.GetName()) err = tc.Client.Status().Patch(ctx, obj, patch) if err != nil { span.RecordError(err) } return obj, err } // Get adds tracing around the original client's Get method func (tc *tracingClient) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { // Create or retrieve the span from the context gvk, err := apiutil.GVKForObject(obj, tc.scheme) if err != nil { return fmt.Errorf("problem getting the scheme: %w", err) } kind := gvk.GroupKind().Kind ctx, span := startSpanFromContext(ctx, tc.Logger, tc.Tracer, obj, tc.scheme, fmt.Sprintf("Get %s %s", kind, key.Name)) defer span.End() tc.Logger.Info("Getting object", "object", key.Name) err = tc.Client.Get(ctx, key, obj, opts...) if err != nil { span.RecordError(err) } return err } func (tc *tracingClient) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { gvk, _ := apiutil.GVKForObject(list, tc.scheme) kind := gvk.GroupKind().Kind ctx, span := startSpanFromContextList(ctx, tc.Logger, tc.Tracer, list, kind) defer span.End() tc.Logger.Info("Getting List", "object", kind) err := tc.Client.List(ctx, list, opts...) if err != nil { span.RecordError(err) } return err } // Patch adds tracing and traceID annotation around the original client's Patch method func (tc *tracingClient) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) error { gvk, err := apiutil.GVKForObject(obj, tc.scheme) if err != nil { return fmt.Errorf("problem getting the scheme: %w", err) } kind := gvk.GroupKind().Kind ctx, span := startSpanFromContext(ctx, tc.Logger, tc.Tracer, obj, tc.scheme, fmt.Sprintf("Patch %s %s", kind, obj.GetName())) defer span.End() existingObj := obj.DeepCopyObject().(client.Object) if err := tc.Client.Get(ctx, client.ObjectKeyFromObject(obj), existingObj); err != nil { return err } if !predicates.HasSignificantUpdate(existingObj, obj) { tc.Logger.Info("Skipping update as object content has not changed", "object", obj.GetName()) return nil } addTraceIDAnnotation(ctx, obj) tc.Logger.Info("Patching object", "object", obj.GetName()) err = tc.Client.Patch(ctx, obj, patch, opts...) if err != nil { span.RecordError(err) } return err } // Delete adds tracing around the original client's Delete method func (tc *tracingClient) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error { gvk, err := apiutil.GVKForObject(obj, tc.scheme) if err != nil { return fmt.Errorf("problem getting the scheme: %w", err) } kind := gvk.GroupKind().Kind ctx, span := startSpanFromContext(ctx, tc.Logger, tc.Tracer, obj, tc.scheme, fmt.Sprintf("Delete %s %s", kind, obj.GetName())) defer span.End() tc.Logger.Info("Deleting object", "object", obj.GetName()) err = tc.Client.Delete(ctx, obj, opts...) if err != nil { span.RecordError(err) } return err } func (tc *tracingClient) DeleteAllOf(ctx context.Context, obj client.Object, opts ...client.DeleteAllOfOption) error { gvk, err := apiutil.GVKForObject(obj, tc.scheme) if err != nil { return fmt.Errorf("problem getting the scheme: %w", err) } kind := gvk.GroupKind().Kind ctx, span := startSpanFromContext(ctx, tc.Logger, tc.Tracer, obj, tc.scheme, fmt.Sprintf("DeleteAllOf %s %s", kind, obj.GetName())) defer span.End() tc.Logger.Info("Deleting all of object", "object", obj.GetName()) err = tc.Client.DeleteAllOf(ctx, obj, opts...) if err != nil { span.RecordError(err) } return err }