func()

in app/run.go [30:142]


func (app *App) Run(ctx context.Context) error {
	// register extension with AWS Extension API
	res, err := app.extensionClient.Register(ctx, app.extensionName)
	if err != nil {
		app.logger.Errorf("Error: %s", err)

		status, errRuntime := app.extensionClient.InitError(ctx, err.Error())
		if errRuntime != nil {
			return errRuntime
		}

		app.logger.Infof("Init error signal sent to runtime : %s", status)
		app.logger.Infof("Exiting")
		return err
	}
	app.logger.Debugf("Register response: %v", extension.PrettyPrint(res))

	// start http server to receive data from agent
	err = app.apmClient.StartReceiver()
	if err != nil {
		return fmt.Errorf("failed to start the APM data receiver : %w", err)
	}
	defer func() {
		if err := app.apmClient.Shutdown(); err != nil {
			app.logger.Warnf("Error while shutting down the apm receiver: %v", err)
		}
	}()

	// Flush all data before shutting down.
	defer func() {
		flushCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		app.apmClient.FlushAPMData(flushCtx)
	}()

	if app.logsClient != nil {
		if err := app.logsClient.StartService(app.extensionClient.ExtensionID); err != nil {
			app.logger.Warnf("Error while subscribing to the Logs API: %v", err)

			// disable logs API if the service failed to start
			app.logsClient = nil
		} else {
			// Remember to shutdown the log service if available.
			defer func() {
				if err := app.logsClient.Shutdown(); err != nil {
					app.logger.Warnf("failed to shutdown the log service: %v", err)
				}
			}()
		}
	}

	for {
		select {
		case <-ctx.Done():
			app.logger.Info("Received a signal, exiting...")
			return nil
		default:
			// Use a wait group to ensure the background go routine sending to the APM server
			// completes before signaling that the extension is ready for the next invocation.
			var backgroundDataSendWg sync.WaitGroup
			event, err := app.processEvent(ctx, &backgroundDataSendWg)
			if err != nil {
				return err
			}
			app.logger.Debug("Waiting for background data send to end")
			backgroundDataSendWg.Wait()
			if event.EventType == extension.Shutdown {
				app.logger.Infof("Exiting due to shutdown event with reason %s", event.ShutdownReason)
				if app.logsClient != nil {
					// Flush buffered logs if any
					app.logsClient.FlushData(ctx, event.RequestID, event.InvokedFunctionArn, app.apmClient.LambdaDataChannel, true)
				}
				// Since we have waited for the processEvent loop to finish we
				// already have received all the data we can from the agent. So, we
				// flush all the data to make sure that shutdown can correctly deduce
				// any pending transactions.
				app.apmClient.FlushAPMData(ctx)
				// At shutdown we can not expect platform.runtimeDone events to be
				// reported for the remaining invocations. If we haven't received the
				// transaction from agents at this point then it is safe to assume
				// that the function failed. We will create proxy transaction for all
				// invocations that haven't received a full transaction from the agent
				// yet. If extension doesn't have enough CPU time it is possible that
				// the extension might not receive the shutdown signal for timeouts
				// or runtime crashes. In these cases we will miss the transaction.
				//
				// TODO (lahsivjar): Any partial transaction remaining will be added
				// to a new batch by OnShutdown and flushed from the defer call to
				// flush all data when this function exits. This causes 2 triggers
				// of flush, we can optimize this by clearing all buffered channel
				// then calling OnShutdown and finally flushing any remaining data.
				if err := app.batch.OnShutdown(event.ShutdownReason); err != nil {
					app.logger.Errorf("Error finalizing invocation on shutdown: %v", err)
				}
				return nil
			}
			if app.apmClient.ShouldFlush() {
				// Use a new cancellable context for flushing APM data to make sure
				// that the underlying transport is reset for next invocation without
				// waiting for grace period if it got to unhealthy state.
				flushCtx, cancel := context.WithCancel(ctx)
				if app.logsClient != nil {
					// Flush buffered logs if any
					app.logsClient.FlushData(ctx, event.RequestID, event.InvokedFunctionArn, app.apmClient.LambdaDataChannel, false)
				}
				// Flush APM data now that the function invocation has completed
				app.apmClient.FlushAPMData(flushCtx)
				cancel()
			}
		}
	}
}