logsapi/event.go (138 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package logsapi import ( "context" "time" ) // LogEventType represents the log type that is received in the log messages type LogEventType string const ( // PlatformRuntimeDone event is sent when lambda function is finished it's execution PlatformRuntimeDone LogEventType = "platform.runtimeDone" PlatformFault LogEventType = "platform.fault" PlatformReport LogEventType = "platform.report" PlatformLogsDropped LogEventType = "platform.logsDropped" PlatformStart LogEventType = "platform.start" PlatformEnd LogEventType = "platform.end" FunctionLog LogEventType = "function" ) // LogEvent represents an event received from the Logs API type LogEvent struct { Time time.Time `json:"time"` Type LogEventType `json:"type"` StringRecord string Record LogEventRecord } // LogEventRecord is a sub-object in a Logs API event type LogEventRecord struct { RequestID string `json:"requestId"` Status string `json:"status"` Metrics PlatformMetrics `json:"metrics"` } // ProcessLogs consumes log events until there are no more log events that // can be consumed or ctx is canceled. For INVOKE event this state is // reached when runtimeDone event for the current requestID is processed // whereas for SHUTDOWN event this state is reached when the platformReport // event for the previous requestID is processed. func (lc *Client) ProcessLogs( ctx context.Context, requestID string, invokedFnArn string, dataChan chan []byte, isShutdown bool, ) { for { select { case logEvent := <-lc.logsChannel: if shouldExit := lc.handleEvent(ctx, logEvent, requestID, invokedFnArn, dataChan, isShutdown); shouldExit { return } case <-ctx.Done(): lc.logger.Debug("Current invocation over. Interrupting logs processing goroutine") return } } } func (lc *Client) FlushData( ctx context.Context, requestID string, invokedFnArn string, dataChan chan []byte, isShutdown bool, ) { lc.logger.Infof("flushing %d buffered logs", len(lc.logsChannel)) for { select { case logEvent := <-lc.logsChannel: if shouldExit := lc.handleEvent(ctx, logEvent, requestID, invokedFnArn, dataChan, isShutdown); shouldExit { return } case <-ctx.Done(): lc.logger.Debug("Current invocation over. Interrupting logs flushing") return default: if len(lc.logsChannel) == 0 { lc.logger.Debug("Flush ended for logs - no data in buffer") return } } } } func (lc *Client) handleEvent(ctx context.Context, logEvent LogEvent, requestID string, invokedFnArn string, dataChan chan []byte, isShutdown bool, ) bool { lc.logger.Debugf("Received log event %v for request ID %s", logEvent.Type, logEvent.Record.RequestID) switch logEvent.Type { case PlatformStart: lc.invocationLifecycler.OnPlatformStart(logEvent.Record.RequestID) case PlatformRuntimeDone: if err := lc.invocationLifecycler.OnLambdaLogRuntimeDone( logEvent.Record.RequestID, logEvent.Record.Status, logEvent.Time, ); err != nil { lc.logger.Warnf("Failed to finalize invocation with request ID %s: %v", logEvent.Record.RequestID, err) } // For invocation events the platform.runtimeDone would be the last possible event. if !isShutdown && logEvent.Record.RequestID == requestID { lc.logger.Debugf( "Processed runtime done event for reqID %s as the last log event for the invocation", logEvent.Record.RequestID, ) return true } case PlatformReport: fnARN, deadlineMs, ts, err := lc.invocationLifecycler.OnPlatformReport(logEvent.Record.RequestID) if err != nil { lc.logger.Warnf("Failed to process platform report: %v", err) } else { lc.logger.Debugf("Received platform report for %s", logEvent.Record.RequestID) processedMetrics, err := ProcessPlatformReport(fnARN, deadlineMs, ts, logEvent) if err != nil { lc.logger.Errorf("Error processing Lambda platform metrics: %v", err) } else { select { case dataChan <- processedMetrics: case <-ctx.Done(): } } } // For shutdown event the platform report metrics for the previous log event // would be the last possible log event. After processing this metric the // invocation lifecycler's cache should be empty. if isShutdown && lc.invocationLifecycler.Size() == 0 { lc.logger.Debugf( "Processed platform report event for reqID %s as the last log event before shutdown", logEvent.Record.RequestID, ) return true } case PlatformLogsDropped: lc.logger.Warnf("Logs dropped due to extension falling behind: %v", logEvent.Record) case FunctionLog: processedLog, err := ProcessFunctionLog( lc.invocationLifecycler.PlatformStartReqID(), invokedFnArn, logEvent, ) if err != nil { lc.logger.Warnf("Error processing function log : %v", err) } else { select { case dataChan <- processedLog: case <-ctx.Done(): } } } return false }