lambda/telemetry/events_api.go (104 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package telemetry
import (
"fmt"
"time"
"go.amzn.com/lambda/interop"
"go.amzn.com/lambda/metering"
)
// GetRuntimeDoneInvokeMetrics builds the metrics attached to a runtimeDone event.
//
// A runtimeStartedTime of -1 is treated as "never recorded". The result is
// never nil, so callers always have some metrics to send to FluxPump:
//   - start time recorded and the runtime called /response: the produced bytes
//     from invokeResponseMetrics plus the measured duration;
//   - start time recorded but no /response yet (e.g. a reset arrived first):
//     zero produced bytes plus the measured duration;
//   - start time not recorded (crash/reset too early for the runtime to
//     actually run): zero produced bytes and zero duration.
func GetRuntimeDoneInvokeMetrics(runtimeStartedTime int64, invokeResponseMetrics *interop.InvokeResponseMetrics, runtimeDoneTime int64) *interop.RuntimeDoneInvokeMetrics {
	var (
		producedBytes int64
		durationMs    float64
	)

	if runtimeStartedTime != -1 {
		// time taken from sending the invoke to the sandbox until the runtime calls GET /next
		durationMs = CalculateDuration(runtimeStartedTime, runtimeDoneTime)

		// Produced bytes are only meaningful once the runtime has called /response.
		if invokeResponseMetrics != nil && invokeResponseMetrics.RuntimeCalledResponse {
			producedBytes = invokeResponseMetrics.ProducedBytes
		}
	}

	return &interop.RuntimeDoneInvokeMetrics{
		ProducedBytes: producedBytes,
		DurationMs:    durationMs,
	}
}
// InitPhase values produced by InitPhaseFromLifecyclePhase.
const (
// InitInsideInitPhase is the InitPhase returned for interop.LifecyclePhaseInit.
InitInsideInitPhase interop.InitPhase = "init"
// InitInsideInvokePhase is the InitPhase returned for interop.LifecyclePhaseInvoke.
InitInsideInvokePhase interop.InitPhase = "invoke"
)
// InitPhaseFromLifecyclePhase translates a lifecycle phase into the matching
// InitPhase: LifecyclePhaseInit maps to InitInsideInitPhase and
// LifecyclePhaseInvoke maps to InitInsideInvokePhase. Any other phase yields
// an empty InitPhase together with a non-nil error naming the phase.
func InitPhaseFromLifecyclePhase(phase interop.LifecyclePhase) (interop.InitPhase, error) {
	if phase == interop.LifecyclePhaseInit {
		return InitInsideInitPhase, nil
	}
	if phase == interop.LifecyclePhaseInvoke {
		return InitInsideInvokePhase, nil
	}

	var none interop.InitPhase
	return none, fmt.Errorf("unexpected lifecycle phase: %v", phase)
}
// GetRuntimeDoneSpans returns the spans reported with a runtimeDone event.
//
// Spans are produced only when invokeResponseMetrics is non-nil, the runtime
// called /response, and runtimeStartedTime is not -1; otherwise the result is
// an empty, non-nil slice. When produced, the slice holds, in order:
//   - "responseLatency": from when the invoke is received in the sandbox to
//     the moment the runtime calls PUT /response;
//   - "responseDuration": from when the runtime called PUT /response to the
//     moment the body of the response is fully sent.
//
// NOTE(review): the metric field names end in "MonoTimeMs" while
// CalculateDuration documents its inputs as nanoseconds — confirm the units.
func GetRuntimeDoneSpans(runtimeStartedTime int64, invokeResponseMetrics *interop.InvokeResponseMetrics) []interop.Span {
	if invokeResponseMetrics == nil || !invokeResponseMetrics.RuntimeCalledResponse || runtimeStartedTime == -1 {
		return []interop.Span{}
	}

	readStart := invokeResponseMetrics.StartReadingResponseMonoTimeMs
	readFinish := invokeResponseMetrics.FinishReadingResponseMonoTimeMs

	return []interop.Span{
		{
			Name:       "responseLatency",
			Start:      GetEpochTimeInISO8601FormatFromMonotime(runtimeStartedTime),
			DurationMs: CalculateDuration(runtimeStartedTime, readStart),
		},
		{
			Name:       "responseDuration",
			Start:      GetEpochTimeInISO8601FormatFromMonotime(readStart),
			DurationMs: CalculateDuration(readStart, readFinish),
		},
	}
}
// CalculateDuration returns the time elapsed between startNs and endNs,
// expressed in milliseconds with microsecond precision (any sub-microsecond
// remainder is truncated).
//
// The function relies on two assumptions:
//  1. both arguments are expressed in nanoseconds
//  2. endNs > startNs
func CalculateDuration(startNs, endNs int64) float64 {
	elapsed := time.Duration(endNs - startNs)
	// Integer division drops everything below one microsecond.
	wholeMicros := elapsed / time.Microsecond
	return float64(wholeMicros) / 1000
}
// InitType values returned by InferInitType.
const (
// InitTypeOnDemand is the default, used when neither of the other cases applies.
InitTypeOnDemand interop.InitType = "on-demand"
// InitTypeProvisionedConcurrency is selected for a pre-warmed sandbox
// (interop.SandboxPreWarmed) when init caching is not enabled.
InitTypeProvisionedConcurrency interop.InitType = "provisioned-concurrency"
// InitTypeInitCaching is selected when init caching is enabled; note that
// its string value is "snap-start".
InitTypeInitCaching interop.InitType = "snap-start"
)
// InferInitType picks the InitType to report. Init caching takes precedence:
// when initCachingEnabled is true the result is InitTypeInitCaching regardless
// of sandboxType. Otherwise a pre-warmed sandbox yields
// InitTypeProvisionedConcurrency, and everything else is InitTypeOnDemand.
func InferInitType(initCachingEnabled bool, sandboxType interop.SandboxType) interop.InitType {
	// ToDo: Unify this selection of SandboxType by using the START message
	// after having a roadmap on the combination of INIT modes
	switch {
	case initCachingEnabled:
		return InitTypeInitCaching
	case sandboxType == interop.SandboxPreWarmed:
		return InitTypeProvisionedConcurrency
	default:
		return InitTypeOnDemand
	}
}
// GetEpochTimeInISO8601FormatFromMonotime converts monotime to an epoch
// timestamp via metering.MonoToEpoch and renders it as an ISO 8601 string with
// millisecond precision, e.g. "2006-01-02T15:04:05.000Z".
//
// The value returned by metering.MonoToEpoch is passed to time.Unix as the
// nanosecond argument (presumably nanoseconds since the Unix epoch — confirm
// against the metering package).
func GetEpochTimeInISO8601FormatFromMonotime(monotime int64) string {
	epochNs := metering.MonoToEpoch(monotime)
	// The trailing "Z" in the layout is a literal UTC designator, not a zone
	// verb, and time.Unix yields a Time in the local zone. Convert to UTC
	// explicitly so the rendered wall-clock matches the "Z" suffix even when
	// the process time zone is not UTC.
	return time.Unix(0, epochNs).UTC().Format("2006-01-02T15:04:05.000Z")
}
// Untyped string constants; presumably the status values reported with
// runtimeDone events — not referenced in this file, confirm against callers.
const (
// RuntimeDoneSuccess is the "success" status string.
RuntimeDoneSuccess = "success"
// RuntimeDoneError is the "error" status string.
RuntimeDoneError = "error"
)
// NoOpEventsAPI is a stateless events API stand-in: every method ignores its
// arguments, performs no work, and reports success. Presumably it is used
// where telemetry events must be accepted but not emitted — confirm against
// callers.
type NoOpEventsAPI struct{}

// SetCurrentRequestID ignores the request ID.
func (*NoOpEventsAPI) SetCurrentRequestID(_ interop.RequestID) {}

// SendInitStart discards the event and returns nil.
func (*NoOpEventsAPI) SendInitStart(_ interop.InitStartData) error { return nil }

// SendInitRuntimeDone discards the event and returns nil.
func (*NoOpEventsAPI) SendInitRuntimeDone(_ interop.InitRuntimeDoneData) error { return nil }

// SendInitReport discards the event and returns nil.
func (*NoOpEventsAPI) SendInitReport(_ interop.InitReportData) error { return nil }

// SendRestoreRuntimeDone discards the event and returns nil.
func (*NoOpEventsAPI) SendRestoreRuntimeDone(_ interop.RestoreRuntimeDoneData) error { return nil }

// SendInvokeStart discards the event and returns nil.
func (*NoOpEventsAPI) SendInvokeStart(_ interop.InvokeStartData) error { return nil }

// SendInvokeRuntimeDone discards the event and returns nil.
func (*NoOpEventsAPI) SendInvokeRuntimeDone(_ interop.InvokeRuntimeDoneData) error { return nil }

// SendExtensionInit discards the event and returns nil.
func (*NoOpEventsAPI) SendExtensionInit(_ interop.ExtensionInitData) error { return nil }

// SendEnd discards the event and returns nil.
func (*NoOpEventsAPI) SendEnd(_ interop.EndData) error { return nil }

// SendReportSpan discards the span and returns nil.
func (*NoOpEventsAPI) SendReportSpan(_ interop.Span) error { return nil }

// SendReport discards the report and returns nil.
func (*NoOpEventsAPI) SendReport(_ interop.ReportData) error { return nil }

// SendFault discards the fault and returns nil.
func (*NoOpEventsAPI) SendFault(_ interop.FaultData) error { return nil }

// SendImageErrorLog discards the log data.
func (*NoOpEventsAPI) SendImageErrorLog(_ interop.ImageErrorLogData) {}

// FetchTailLogs ignores its argument and returns an empty string with a nil error.
func (*NoOpEventsAPI) FetchTailLogs(_ string) (string, error) { return "", nil }

// GetRuntimeDoneSpans ignores all of its arguments and always returns an
// empty, non-nil slice of spans.
func (*NoOpEventsAPI) GetRuntimeDoneSpans(
	runtimeStartedTime int64,
	invokeResponseMetrics *interop.InvokeResponseMetrics,
	runtimeOverheadStartedTime int64,
	runtimeReadyTime int64,
) []interop.Span {
	return []interop.Span{}
}