lambda/rapidcore/standalone/telemetry/tracer.go (174 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package telemetry import ( "context" "encoding/json" "fmt" "time" "go.amzn.com/lambda/appctx" "go.amzn.com/lambda/interop" "go.amzn.com/lambda/metering" "go.amzn.com/lambda/rapi/model" "go.amzn.com/lambda/telemetry" "github.com/sirupsen/logrus" ) // InitSubsegmentName provides name attribute for Init subsegment const InitSubsegmentName = "Initialization" // RestoreSubsegmentName provides name attribute for Restore subsegment const RestoreSubsegmentName = "Restore" // InvokeSubsegmentName provides name attribute for Invoke subsegment const InvokeSubsegmentName = "Invocation" // OverheadSubsegmentName provides name attribute for Overhead subsegment const OverheadSubsegmentName = "Overhead" type StandaloneTracer struct { startFunction func(ctx context.Context, invoke *interop.Invoke, segmentName string, timestamp int64) endFunction func(ctx context.Context, invoke *interop.Invoke, segmentName string, timestamp int64) invoke *interop.Invoke tracingHeader string rootTraceID string parent string sampled string lineage string invocationSubsegmentID string initStartTime int64 initEndTime int64 restoreStartTime int64 restoreEndTime int64 restorePresent bool } type TracingEvent struct { Message string `json:"message"` TraceID string `json:"trace_id"` SegmentName string `json:"segment_name"` SegmentID string `json:"segment_id"` Timestamp int64 `json:"timestamp"` } func (t *StandaloneTracer) Configure(invoke *interop.Invoke) { t.invoke = invoke t.tracingHeader = invoke.TraceID t.invocationSubsegmentID = "" t.rootTraceID, t.parent, t.sampled, t.lineage = telemetry.ParseTracingHeader(invoke.TraceID) if invoke.RestoreDurationNs == 0 { t.restorePresent = false } else { t.restorePresent = true t.restoreStartTime = metering.MonoToEpoch(invoke.RestoreStartTimeMonotime) t.restoreEndTime = t.restoreStartTime + invoke.RestoreDurationNs } } func (t *StandaloneTracer) CaptureInvokeSegment(ctx context.Context, criticalFunction func(context.Context) error) error { return t.withStartAndEnd(ctx, criticalFunction, "STANDALONE_FUNCTION_NAME") } func (t *StandaloneTracer) CaptureInitSubsegment(ctx context.Context, criticalFunction func(context.Context) error) error { return t.withStartAndEnd(ctx, criticalFunction, InitSubsegmentName) } func (t *StandaloneTracer) CaptureInvokeSubsegment(ctx context.Context, criticalFunction func(context.Context) error) error { t.invocationSubsegmentID = InvokeSubsegmentName return t.withStartAndEnd(ctx, criticalFunction, InvokeSubsegmentName) } func (t *StandaloneTracer) CaptureOverheadSubsegment(ctx context.Context, criticalFunction func(context.Context) error) error { return t.withStartAndEnd(ctx, criticalFunction, OverheadSubsegmentName) } func (t *StandaloneTracer) withStartAndEnd(ctx context.Context, criticalFunction func(context.Context) error, segmentName string) error { ctx = telemetry.NewTraceContext(ctx, t.rootTraceID, segmentName) t.startFunction(ctx, t.invoke, segmentName, time.Now().UnixNano()) err := criticalFunction(ctx) t.endFunction(ctx, t.invoke, segmentName, time.Now().UnixNano()) return err } func (t *StandaloneTracer) RecordInitStartTime() { t.initStartTime = time.Now().UnixNano() } func (t *StandaloneTracer) RecordInitEndTime() { t.initEndTime = time.Now().UnixNano() } func (t *StandaloneTracer) sendPrepSubsegment(ctx context.Context, subsegmentName string, startTime int64, endTime int64) { ctx = telemetry.NewTraceContext(ctx, t.rootTraceID, subsegmentName) t.startFunction(ctx, t.invoke, subsegmentName, startTime) t.endFunction(ctx, t.invoke, subsegmentName, endTime) } func (t *StandaloneTracer) SendInitSubsegmentWithRecordedTimesOnce(ctx context.Context) { t.sendPrepSubsegment(ctx, InitSubsegmentName, t.initStartTime, t.initEndTime) } func (t *StandaloneTracer) SendRestoreSubsegmentWithRecordedTimesOnce(ctx context.Context) { if t.restorePresent { t.sendPrepSubsegment(ctx, RestoreSubsegmentName, t.restoreStartTime, t.restoreEndTime) } } func (t *StandaloneTracer) MarkError(ctx context.Context) {} func (t *StandaloneTracer) AttachErrorCause(ctx context.Context, errorCause json.RawMessage) {} func (t *StandaloneTracer) WithErrorCause(ctx context.Context, appCtx appctx.ApplicationContext, criticalFunction func(ctx context.Context) error) func(ctx context.Context) error { return criticalFunction } func (t *StandaloneTracer) WithError(ctx context.Context, appCtx appctx.ApplicationContext, criticalFunction func(ctx context.Context) error) func(ctx context.Context) error { return criticalFunction } func (t *StandaloneTracer) BuildTracingHeader() func(ctx context.Context) string { // extract root trace ID and parent from context and build the tracing header return func(ctx context.Context) string { var parent string var ok bool if parent, ok = ctx.Value(telemetry.DocumentIDKey).(string); !ok || parent == "" { return t.invoke.TraceID } if t.rootTraceID == "" || t.sampled == "" { return "" } var tracingHeader = "Root=%s;Parent=%s;Sampled=%s" if t.lineage == "" { return fmt.Sprintf(tracingHeader, t.rootTraceID, parent, t.sampled) } return fmt.Sprintf(tracingHeader+";Lineage=%s", t.rootTraceID, parent, t.sampled, t.lineage) } } func (t *StandaloneTracer) BuildTracingCtxForStart() *interop.TracingCtx { if t.rootTraceID == "" || t.sampled != model.XRaySampled { return nil } return &interop.TracingCtx{ SpanID: t.parent, Type: model.XRayTracingType, Value: telemetry.BuildFullTraceID(t.rootTraceID, t.invoke.LambdaSegmentID, t.sampled), } } func (t *StandaloneTracer) BuildTracingCtxAfterInvokeComplete() *interop.TracingCtx { if t.rootTraceID == "" || t.sampled != model.XRaySampled || t.invocationSubsegmentID == "" { return nil } return &interop.TracingCtx{ SpanID: t.invocationSubsegmentID, Type: model.XRayTracingType, Value: t.tracingHeader, } } func isTracingEnabled(root, parent, sampled string) bool { return len(root) != 0 && len(parent) != 0 && sampled == "1" } func NewStandaloneTracer(api *StandaloneEventsAPI) *StandaloneTracer { startCaptureFn := func(ctx context.Context, i *interop.Invoke, segmentName string, timestamp int64) { root, parent, sampled, _ := telemetry.ParseTracingHeader(i.TraceID) if isTracingEnabled(root, parent, sampled) { e := TracingEvent{ Message: "START", TraceID: root, SegmentName: segmentName, SegmentID: parent, Timestamp: timestamp / int64(time.Millisecond), } api.LogTrace(e) log.WithFields(logrus.Fields{"trace": e}).Info("sandbox trace") } } endCaptureFn := func(ctx context.Context, i *interop.Invoke, segmentName string, timestamp int64) { root, parent, sampled, _ := telemetry.ParseTracingHeader(i.TraceID) if isTracingEnabled(root, parent, sampled) { e := TracingEvent{ Message: "END", TraceID: root, SegmentName: "", SegmentID: parent, Timestamp: timestamp / int64(time.Millisecond), } api.LogTrace(e) log.WithFields(logrus.Fields{"trace": e}).Info("sandbox trace") } } return &StandaloneTracer{ startFunction: startCaptureFn, endFunction: endCaptureFn, } }