in logsapi/event.go [105:176]
// handleEvent dispatches a single log event according to its type and reports
// whether that event is the last one the caller has to wait for.
//
// It returns true in exactly two situations:
//   - a PlatformRuntimeDone event arrives for requestID while isShutdown is false;
//   - a PlatformReport event has been handled while isShutdown is true and the
//     invocation lifecycler reports a size of zero.
//
// Every other event yields false. Payloads produced for PlatformReport and
// FunctionLog events are sent on dataChan; a send that cannot complete is
// abandoned once ctx is done. Processing failures are logged and never
// returned to the caller.
func (lc *Client) handleEvent(ctx context.Context,
	logEvent LogEvent,
	requestID string,
	invokedFnArn string,
	dataChan chan []byte,
	isShutdown bool,
) bool {
	reqID := logEvent.Record.RequestID
	lc.logger.Debugf("Received log event %v for request ID %s", logEvent.Type, reqID)

	// forward blocks until the payload is accepted on dataChan or ctx is done,
	// whichever happens first.
	forward := func(payload []byte) {
		select {
		case dataChan <- payload:
		case <-ctx.Done():
		}
	}

	switch logEvent.Type {
	case PlatformStart:
		lc.invocationLifecycler.OnPlatformStart(reqID)
		return false

	case PlatformRuntimeDone:
		err := lc.invocationLifecycler.OnLambdaLogRuntimeDone(reqID, logEvent.Record.Status, logEvent.Time)
		if err != nil {
			lc.logger.Warnf("Failed to finalize invocation with request ID %s: %v", reqID, err)
		}
		// Outside of shutdown, platform.runtimeDone for the awaited request is
		// the final event of the invocation; anything else keeps us waiting.
		if isShutdown || reqID != requestID {
			return false
		}
		lc.logger.Debugf(
			"Processed runtime done event for reqID %s as the last log event for the invocation",
			reqID,
		)
		return true

	case PlatformReport:
		fnARN, deadlineMs, ts, err := lc.invocationLifecycler.OnPlatformReport(reqID)
		if err != nil {
			lc.logger.Warnf("Failed to process platform report: %v", err)
		} else {
			lc.logger.Debugf("Received platform report for %s", reqID)
			metrics, procErr := ProcessPlatformReport(fnARN, deadlineMs, ts, logEvent)
			if procErr != nil {
				lc.logger.Errorf("Error processing Lambda platform metrics: %v", procErr)
			} else {
				forward(metrics)
			}
		}
		// During shutdown the platform report of the previous invocation is the
		// final event to expect; by then the lifecycler's cache should be empty.
		// The size is only consulted when shutting down.
		if !isShutdown || lc.invocationLifecycler.Size() != 0 {
			return false
		}
		lc.logger.Debugf(
			"Processed platform report event for reqID %s as the last log event before shutdown",
			reqID,
		)
		return true

	case PlatformLogsDropped:
		lc.logger.Warnf("Logs dropped due to extension falling behind: %v", logEvent.Record)
		return false

	case FunctionLog:
		startReqID := lc.invocationLifecycler.PlatformStartReqID()
		fnLog, err := ProcessFunctionLog(startReqID, invokedFnArn, logEvent)
		if err != nil {
			lc.logger.Warnf("Error processing function log : %v", err)
			return false
		}
		forward(fnLog)
		return false
	}

	// Event types without a dedicated case are ignored.
	return false
}