lambda/rapidcore/standalone/telemetry/events_api.go (216 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package telemetry import ( "encoding/json" "sort" "sync" "time" "go.amzn.com/lambda/interop" "go.amzn.com/lambda/telemetry" ) type EventType = string const ( PlatformInitStart = EventType("platform.initStart") PlatformInitRuntimeDone = EventType("platform.initRuntimeDone") PlatformInitReport = EventType("platform.initReport") PlatformRestoreRuntimeDone = EventType("platform.restoreRuntimeDone") PlatformStart = EventType("platform.start") PlatformRuntimeDone = EventType("platform.runtimeDone") PlatformExtension = EventType("platform.extension") PlatformEnd = EventType("platform.end") PlatformReport = EventType("platform.report") PlatformFault = EventType("platform.fault") ) /* SandboxEvent represents a generic sandbox event. For example: { "time": "2021-03-16T13:10:42.358Z", "type": "platform.extension", "platformEvent": { "name": "foo bar", "state": "Ready", "events": ["INVOKE", "SHUTDOWN"]} } Or: { "time": "2021-03-16T13:10:42.358Z", "type": "extension", "logMessage": "raw agent console output" } FluxPump produces entries with a single field 'record', containing either an object or a string. We make the distinction explicit by providing separate fields for the two cases, 'PlatformEvent' and 'LogMessage'. Either one of the two would be populated, but not both. This makes code cleaner, but requires test client to merge two fields back, producing a single 'record' entry again -- to match the FluxPump format that tests actually check. */ type SandboxEvent struct { Time string `json:"time"` Type EventType `json:"type"` PlatformEvent map[string]interface{} `json:"platformEvent,omitempty"` LogMessage string `json:"logMessage,omitempty"` } type tailLogs struct { Events []SandboxEvent `json:"events,omitempty"` } type StandaloneEventsAPI struct { lock sync.Mutex requestID interop.RequestID eventLog EventLog } func (s *StandaloneEventsAPI) LogTrace(entry TracingEvent) { s.lock.Lock() defer s.lock.Unlock() s.eventLog.Traces = append(s.eventLog.Traces, entry) } func (s *StandaloneEventsAPI) EventLog() *EventLog { return &s.eventLog } func (s *StandaloneEventsAPI) SetCurrentRequestID(requestID interop.RequestID) { s.requestID = requestID } func (s *StandaloneEventsAPI) SendInitStart(data interop.InitStartData) error { record := map[string]interface{}{ "initializationType": data.InitializationType, "runtimeVersion": data.RuntimeVersion, "runtimeArn": data.RuntimeVersionArn, "runtimeVersionArn": data.RuntimeVersionArn, "functionArn": data.FunctionArn, "functionName": data.FunctionName, "functionVersion": data.FunctionVersion, "instanceId": data.InstanceID, "instanceMaxMemory": data.InstanceMaxMemory, "phase": data.Phase, } s.addTracingToRecord(data.Tracing, record) return s.sendPlatformEvent(PlatformInitStart, record) } func (s *StandaloneEventsAPI) SendInitRuntimeDone(data interop.InitRuntimeDoneData) error { record := map[string]interface{}{ "initializationType": data.InitializationType, "status": data.Status, "phase": data.Phase, } s.addTracingToRecord(data.Tracing, record) if data.ErrorType != nil { record["errorType"] = data.ErrorType } return s.sendPlatformEvent(PlatformInitRuntimeDone, record) } func (s *StandaloneEventsAPI) SendInitReport(data interop.InitReportData) error { record := map[string]interface{}{ "initializationType": data.InitializationType, "metrics": data.Metrics, "phase": data.Phase, } s.addTracingToRecord(data.Tracing, record) return s.sendPlatformEvent(PlatformInitReport, record) } func (s *StandaloneEventsAPI) SendRestoreRuntimeDone(data interop.RestoreRuntimeDoneData) error { record := map[string]interface{}{"status": data.Status} s.addTracingToRecord(data.Tracing, record) if data.ErrorType != nil { record["errorType"] = data.ErrorType } return s.sendPlatformEvent(PlatformRestoreRuntimeDone, record) } func (s *StandaloneEventsAPI) SendInvokeStart(data interop.InvokeStartData) error { record := map[string]interface{}{ "version": data.Version, "requestId": data.RequestID, } s.addTracingToRecord(data.Tracing, record) return s.sendPlatformEvent(PlatformStart, record) } func (s *StandaloneEventsAPI) SendInvokeRuntimeDone(data interop.InvokeRuntimeDoneData) error { record := map[string]interface{}{ "requestId": s.requestID, "status": data.Status, "metrics": data.Metrics, "internalMetrics": data.InternalMetrics, "spans": data.Spans, } if data.ErrorType != nil { record["errorType"] = data.ErrorType } s.addTracingToRecord(data.Tracing, record) return s.sendPlatformEvent(PlatformRuntimeDone, record) } func (s *StandaloneEventsAPI) SendExtensionInit(data interop.ExtensionInitData) error { sort.Strings(data.Subscriptions) record := map[string]interface{}{ "name": data.AgentName, "state": data.State, "events": data.Subscriptions, } if len(data.ErrorType) > 0 { record["errorType"] = data.ErrorType } return s.sendPlatformEvent(PlatformExtension, record) } func (s *StandaloneEventsAPI) SendImageErrorLog(interop.ImageErrorLogData) { // Called on bootstrap exec errors for OCI error modes, e.g. InvalidEntrypoint etc. } func (s *StandaloneEventsAPI) SendEnd(data interop.EndData) error { record := map[string]interface{}{ "requestId": data.RequestID, } return s.sendPlatformEvent(PlatformEnd, record) } func (s *StandaloneEventsAPI) SendReportSpan(interop.Span) error { return nil } func (s *StandaloneEventsAPI) SendReport(data interop.ReportData) error { record := map[string]interface{}{ "requestId": s.requestID, "status": data.Status, "metrics": data.Metrics, "spans": data.Spans, "tracing": data.Tracing, } if data.ErrorType != nil { record["errorType"] = data.ErrorType } return s.sendPlatformEvent(PlatformReport, record) } func (s *StandaloneEventsAPI) SendFault(data interop.FaultData) error { record := map[string]interface{}{ "fault": data.String(), } return s.sendPlatformEvent(PlatformFault, record) } func (s *StandaloneEventsAPI) FetchTailLogs(string) (string, error) { s.lock.Lock() defer s.lock.Unlock() if len(s.eventLog.Events) == 0 { return "", nil } logs := tailLogs{Events: s.eventLog.Events} logsBytes, err := json.Marshal(logs) if err != nil { return "", err } s.eventLog.Events = nil return string(logsBytes), nil } func (s *StandaloneEventsAPI) GetRuntimeDoneSpans( runtimeStartedTime int64, invokeResponseMetrics *interop.InvokeResponseMetrics, runtimeOverheadStartedTime int64, runtimeReadyTime int64, ) []interop.Span { spans := telemetry.GetRuntimeDoneSpans(runtimeStartedTime, invokeResponseMetrics) return spans } func (s *StandaloneEventsAPI) sendPlatformEvent(eventType string, record map[string]interface{}) error { e := SandboxEvent{ Time: time.Now().Format(time.RFC3339), Type: eventType, PlatformEvent: record, } s.appendEvent(e) s.logEvent(e) return nil } func (s *StandaloneEventsAPI) sendLogEvent(eventType, logMessage string) error { e := SandboxEvent{ Time: time.Now().Format(time.RFC3339), Type: eventType, LogMessage: logMessage, } s.appendEvent(e) s.logEvent(e) return nil } func (s *StandaloneEventsAPI) appendEvent(event SandboxEvent) { s.lock.Lock() defer s.lock.Unlock() s.eventLog.Events = append(s.eventLog.Events, event) } func (s *StandaloneEventsAPI) logEvent(e SandboxEvent) { log.WithField("event", e).Info("sandbox event") } func (s *StandaloneEventsAPI) addTracingToRecord(tracingData *interop.TracingCtx, record map[string]interface{}) { if tracingData != nil { record["tracing"] = map[string]string{ "spanId": tracingData.SpanID, "type": string(tracingData.Type), "value": tracingData.Value, } } }