in app/run.go [144:249]
func (app *App) processEvent(
ctx context.Context,
backgroundDataSendWg *sync.WaitGroup,
) (*extension.NextEventResponse, error) {
// Reset flush state for future events.
defer app.apmClient.ResetFlush()
// Invocation context
invocationCtx, invocationCancel := context.WithCancel(ctx)
defer invocationCancel()
// call Next method of extension API. This long polling HTTP method
// will block until there's an invocation of the function
app.logger.Info("Waiting for next event...")
event, err := app.extensionClient.NextEvent(ctx)
if err != nil {
app.logger.Errorf("Error: %s", err)
status, errRuntime := app.extensionClient.ExitError(ctx, err.Error())
if errRuntime != nil {
return nil, errRuntime
}
app.logger.Infof("Exit signal sent to runtime : %s", status)
app.logger.Info("Exiting")
return nil, err
}
// Used to compute Lambda Timeout
event.Timestamp = time.Now()
app.logger.Debug("Received event.")
app.logger.Debugf("%v", extension.PrettyPrint(event))
switch event.EventType {
case extension.Invoke:
app.batch.RegisterInvocation(
event.RequestID,
event.InvokedFunctionArn,
event.DeadlineMs,
event.Timestamp,
)
case extension.Shutdown:
// platform.report metric (and some other metrics) might not have been
// reported by the logs API even till shutdown. At shutdown we will make
// a last attempt to collect and report these metrics. However, it is
// also possible that lambda has init a few execution env preemptively,
// for such cases the extension will see only a SHUTDOWN event and
// there is no need to wait for any log event.
if app.batch.Size() == 0 {
return event, nil
}
}
// APM Data Processing
backgroundDataSendWg.Add(1)
go func() {
defer backgroundDataSendWg.Done()
if err := app.apmClient.ForwardApmData(invocationCtx); err != nil {
app.logger.Error(err)
}
}()
// Lambda Service Logs Processing, also used to extract metrics from APM logs
// This goroutine should not be started if subscription failed
logProcessingDone := make(chan struct{})
if app.logsClient != nil {
go func() {
defer close(logProcessingDone)
app.logsClient.ProcessLogs(
invocationCtx,
event.RequestID,
event.InvokedFunctionArn,
app.apmClient.LambdaDataChannel,
event.EventType == extension.Shutdown,
)
}()
} else {
app.logger.Warn("Logs collection not started due to earlier subscription failure")
}
// Calculate how long to wait for a runtimeDoneSignal or AgentDoneSignal signal
flushDeadlineMs := event.DeadlineMs - 200
durationUntilFlushDeadline := time.Until(time.Unix(flushDeadlineMs/1000, 0))
// Create a timer that expires after durationUntilFlushDeadline
timer := time.NewTimer(durationUntilFlushDeadline)
defer timer.Stop()
// The extension relies on 3 independent mechanisms to minimize the time interval
// between the end of the execution of the lambda function and the end of the
// execution of processEvent():
// 1) AgentDoneSignal triggered upon reception of a `flushed=true` query from the agent
// 2) [Backup 1] All expected log events are processed.
// 3) [Backup 2] If all else fails, the extension relies of the timeout of the Lambda
// function to interrupt itself 200ms before the specified deadline to give the extension
// time to flush data before shutdown.
select {
case <-app.apmClient.WaitForFlush():
app.logger.Debug("APM client has sent flush signal")
case <-logProcessingDone:
app.logger.Debug("Received runtimeDone signal")
case <-timer.C:
app.logger.Info("Time expired while waiting for agent done signal or final log event")
}
return event, nil
}