in app/run.go [30:142]
// Run registers the extension with the AWS Extension API, starts the APM data
// receiver and, when a logs client is configured, the logs service. It then
// processes events in a loop until one of the following happens:
//
//   - ctx is done: Run returns nil.
//   - a shutdown event is received: buffered logs and APM data are flushed,
//     the batch is finalized via OnShutdown, and Run returns nil.
//   - processEvent fails: Run returns that error.
//
// Run also returns an error when registration fails or when the APM data
// receiver cannot be started. If registration fails and the init error cannot
// be signalled to the runtime either, the returned error wraps the signalling
// failure and carries the registration failure in its message.
//
// Cleanup is deferred and therefore runs in reverse order of registration on
// every return path after the receiver has started: the logs service is shut
// down first (if it was started), then APM data is flushed with a 5 second
// timeout, and finally the APM receiver is shut down.
func (app *App) Run(ctx context.Context) error {
	// register extension with AWS Extension API
	res, err := app.extensionClient.Register(ctx, app.extensionName)
	if err != nil {
		app.logger.Errorf("Error: %s", err)

		status, errRuntime := app.extensionClient.InitError(ctx, err.Error())
		if errRuntime != nil {
			// Keep errRuntime as the wrapped error so errors.Is/errors.As
			// checks against it keep working, but do not drop the
			// registration failure that caused us to get here.
			return fmt.Errorf(
				"failed to send init error signal to runtime after registration error (%v): %w",
				err, errRuntime,
			)
		}

		app.logger.Infof("Init error signal sent to runtime : %s", status)
		app.logger.Infof("Exiting")
		return err
	}
	app.logger.Debugf("Register response: %v", extension.PrettyPrint(res))

	// start http server to receive data from agent
	err = app.apmClient.StartReceiver()
	if err != nil {
		return fmt.Errorf("failed to start the APM data receiver : %w", err)
	}
	// Registered first, so it runs last: the receiver is shut down only after
	// the final flush below has completed.
	defer func() {
		if err := app.apmClient.Shutdown(); err != nil {
			app.logger.Warnf("Error while shutting down the apm receiver: %v", err)
		}
	}()

	// Flush all data before shutting down. A fresh background context is used
	// so that the final flush still gets its 5 second budget even when ctx has
	// already been cancelled.
	defer func() {
		flushCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		app.apmClient.FlushAPMData(flushCtx)
	}()

	if app.logsClient != nil {
		if err := app.logsClient.StartService(app.extensionClient.ExtensionID); err != nil {
			app.logger.Warnf("Error while subscribing to the Logs API: %v", err)

			// disable logs API if the service failed to start
			app.logsClient = nil
		} else {
			// Remember to shutdown the log service if available.
			defer func() {
				if err := app.logsClient.Shutdown(); err != nil {
					app.logger.Warnf("failed to shutdown the log service: %v", err)
				}
			}()
		}
	}

	for {
		select {
		case <-ctx.Done():
			app.logger.Info("Received a signal, exiting...")
			return nil
		default:
			// Use a wait group to ensure the background go routine sending to the APM server
			// completes before signaling that the extension is ready for the next invocation.
			var backgroundDataSendWg sync.WaitGroup
			event, err := app.processEvent(ctx, &backgroundDataSendWg)
			if err != nil {
				return err
			}
			app.logger.Debug("Waiting for background data send to end")
			backgroundDataSendWg.Wait()

			if event.EventType == extension.Shutdown {
				app.logger.Infof("Exiting due to shutdown event with reason %s", event.ShutdownReason)
				if app.logsClient != nil {
					// Flush buffered logs if any
					app.logsClient.FlushData(ctx, event.RequestID, event.InvokedFunctionArn, app.apmClient.LambdaDataChannel, true)
				}
				// Since we have waited for the processEvent loop to finish we
				// already have received all the data we can from the agent. So, we
				// flush all the data to make sure that shutdown can correctly deduce
				// any pending transactions.
				app.apmClient.FlushAPMData(ctx)
				// At shutdown we can not expect platform.runtimeDone events to be
				// reported for the remaining invocations. If we haven't received the
				// transaction from agents at this point then it is safe to assume
				// that the function failed. We will create proxy transaction for all
				// invocations that haven't received a full transaction from the agent
				// yet. If extension doesn't have enough CPU time it is possible that
				// the extension might not receive the shutdown signal for timeouts
				// or runtime crashes. In these cases we will miss the transaction.
				//
				// TODO (lahsivjar): Any partial transaction remaining will be added
				// to a new batch by OnShutdown and flushed from the defer call to
				// flush all data when this function exits. This causes 2 triggers
				// of flush, we can optimize this by clearing all buffered channel
				// then calling OnShutdown and finally flushing any remaining data.
				if err := app.batch.OnShutdown(event.ShutdownReason); err != nil {
					app.logger.Errorf("Error finalizing invocation on shutdown: %v", err)
				}
				return nil
			}

			if app.apmClient.ShouldFlush() {
				// Use a new cancellable context for flushing APM data to make sure
				// that the underlying transport is reset for next invocation without
				// waiting for grace period if it got to unhealthy state.
				flushCtx, cancel := context.WithCancel(ctx)
				if app.logsClient != nil {
					// Flush buffered logs if any. This uses the parent ctx; only
					// the APM data flush below is bound to flushCtx.
					app.logsClient.FlushData(ctx, event.RequestID, event.InvokedFunctionArn, app.apmClient.LambdaDataChannel, false)
				}
				// Flush APM data now that the function invocation has completed
				app.apmClient.FlushAPMData(flushCtx)
				// Cancelled explicitly rather than deferred: a defer inside this
				// loop would not run until Run itself returns.
				cancel()
			}
		}
	}
}