func()

in app/run.go [144:249]


// processEvent waits for the next event from the extension API, starts the
// per-event background work and then blocks until one of the completion
// signals for that event fires.
//
// ctx is used for the extension API calls and is the parent of a per-event
// context that is cancelled when processEvent returns; that per-event context
// is handed to the APM forwarding and log processing goroutines.
//
// backgroundDataSendWg is incremented once for the APM data forwarding
// goroutine, which marks itself done when ForwardApmData returns.
// NOTE(review): the caller is presumably expected to Wait on it - confirm.
//
// It returns the received event on success. If NextEvent fails, the failure
// is reported through ExitError and a nil event is returned together with the
// NextEvent error, or with the ExitError error if reporting failed as well.
// A Shutdown event received while the batch is empty is returned immediately
// without starting any background work.
func (app *App) processEvent(
	ctx context.Context,
	backgroundDataSendWg *sync.WaitGroup,
) (*extension.NextEventResponse, error) {
	// Safety margin, in milliseconds, kept between the end of the wait below
	// and the event deadline so that data can still be flushed.
	const flushMarginMs = 200

	// Reset flush state for future events.
	defer app.apmClient.ResetFlush()

	// Invocation context
	invocationCtx, invocationCancel := context.WithCancel(ctx)
	defer invocationCancel()

	// Call the Next method of the extension API. This long polling HTTP method
	// will block until there's an invocation of the function.
	app.logger.Info("Waiting for next event...")
	event, err := app.extensionClient.NextEvent(ctx)
	if err != nil {
		app.logger.Errorf("Error: %s", err)

		status, errRuntime := app.extensionClient.ExitError(ctx, err.Error())
		if errRuntime != nil {
			return nil, errRuntime
		}

		app.logger.Infof("Exit signal sent to runtime : %s", status)
		app.logger.Info("Exiting")
		return nil, err
	}

	// Used to compute Lambda Timeout
	event.Timestamp = time.Now()
	app.logger.Debug("Received event.")
	app.logger.Debugf("%v", extension.PrettyPrint(event))

	switch event.EventType {
	case extension.Invoke:
		app.batch.RegisterInvocation(
			event.RequestID,
			event.InvokedFunctionArn,
			event.DeadlineMs,
			event.Timestamp,
		)
	case extension.Shutdown:
		// platform.report metric (and some other metrics) might not have been
		// reported by the logs API even till shutdown. At shutdown we will make
		// a last attempt to collect and report these metrics. However, it is
		// also possible that lambda has init a few execution env preemptively,
		// for such cases the extension will see only a SHUTDOWN event and
		// there is no need to wait for any log event.
		if app.batch.Size() == 0 {
			return event, nil
		}
	}

	// APM Data Processing
	backgroundDataSendWg.Add(1)
	go func() {
		defer backgroundDataSendWg.Done()
		if err := app.apmClient.ForwardApmData(invocationCtx); err != nil {
			app.logger.Error(err)
		}
	}()

	// Lambda Service Logs Processing, also used to extract metrics from APM logs.
	// This goroutine should not be started if subscription failed; in that case
	// logProcessingDone is never closed and the select below relies on the
	// other two signals.
	logProcessingDone := make(chan struct{})
	if app.logsClient != nil {
		go func() {
			defer close(logProcessingDone)
			app.logsClient.ProcessLogs(
				invocationCtx,
				event.RequestID,
				event.InvokedFunctionArn,
				app.apmClient.LambdaDataChannel,
				event.EventType == extension.Shutdown,
			)
		}()
	} else {
		app.logger.Warn("Logs collection not started due to earlier subscription failure")
	}

	// Calculate how long to wait for a runtimeDoneSignal or AgentDoneSignal signal.
	// The deadline is expressed in milliseconds since the Unix epoch; keep the
	// sub-second remainder instead of truncating to whole seconds, otherwise
	// the timer could fire up to a second earlier than the intended margin.
	flushDeadlineMs := event.DeadlineMs - flushMarginMs
	flushDeadline := time.Unix(
		flushDeadlineMs/1000,
		(flushDeadlineMs%1000)*int64(time.Millisecond),
	)

	// Create a timer that expires at the flush deadline. A deadline that is
	// already in the past yields a non-positive duration, so the timer fires
	// immediately.
	timer := time.NewTimer(time.Until(flushDeadline))
	defer timer.Stop()

	// The extension relies on 3 independent mechanisms to minimize the time interval
	// between the end of the execution of the lambda function and the end of the
	// execution of processEvent():
	// 1) AgentDoneSignal triggered upon reception of a `flushed=true` query from the agent
	// 2) [Backup 1] All expected log events are processed.
	// 3) [Backup 2] If all else fails, the extension relies on the timeout of the Lambda
	// function to interrupt itself 200ms before the specified deadline to give the extension
	// time to flush data before shutdown.
	select {
	case <-app.apmClient.WaitForFlush():
		app.logger.Debug("APM client has sent flush signal")
	case <-logProcessingDone:
		app.logger.Debug("Received runtimeDone signal")
	case <-timer.C:
		app.logger.Info("Time expired while waiting for agent done signal or final log event")
	}
	return event, nil
}