app/run.go (153 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package app
import (
"context"
"fmt"
"sync"
"time"
"github.com/elastic/apm-aws-lambda/extension"
)
// Run runs the app.
func (app *App) Run(ctx context.Context) error {
// register extension with AWS Extension API
res, err := app.extensionClient.Register(ctx, app.extensionName)
if err != nil {
app.logger.Errorf("Error: %s", err)
status, errRuntime := app.extensionClient.InitError(ctx, err.Error())
if errRuntime != nil {
return errRuntime
}
app.logger.Infof("Init error signal sent to runtime : %s", status)
app.logger.Infof("Exiting")
return err
}
app.logger.Debugf("Register response: %v", extension.PrettyPrint(res))
// start http server to receive data from agent
err = app.apmClient.StartReceiver()
if err != nil {
return fmt.Errorf("failed to start the APM data receiver : %w", err)
}
defer func() {
if err := app.apmClient.Shutdown(); err != nil {
app.logger.Warnf("Error while shutting down the apm receiver: %v", err)
}
}()
// Flush all data before shutting down.
defer func() {
flushCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
app.apmClient.FlushAPMData(flushCtx)
}()
if app.logsClient != nil {
if err := app.logsClient.StartService(app.extensionClient.ExtensionID); err != nil {
app.logger.Warnf("Error while subscribing to the Logs API: %v", err)
// disable logs API if the service failed to start
app.logsClient = nil
} else {
// Remember to shutdown the log service if available.
defer func() {
if err := app.logsClient.Shutdown(); err != nil {
app.logger.Warnf("failed to shutdown the log service: %v", err)
}
}()
}
}
for {
select {
case <-ctx.Done():
app.logger.Info("Received a signal, exiting...")
return nil
default:
// Use a wait group to ensure the background go routine sending to the APM server
// completes before signaling that the extension is ready for the next invocation.
var backgroundDataSendWg sync.WaitGroup
event, err := app.processEvent(ctx, &backgroundDataSendWg)
if err != nil {
return err
}
app.logger.Debug("Waiting for background data send to end")
backgroundDataSendWg.Wait()
if event.EventType == extension.Shutdown {
app.logger.Infof("Exiting due to shutdown event with reason %s", event.ShutdownReason)
if app.logsClient != nil {
// Flush buffered logs if any
app.logsClient.FlushData(ctx, event.RequestID, event.InvokedFunctionArn, app.apmClient.LambdaDataChannel, true)
}
// Since we have waited for the processEvent loop to finish we
// already have received all the data we can from the agent. So, we
// flush all the data to make sure that shutdown can correctly deduce
// any pending transactions.
app.apmClient.FlushAPMData(ctx)
// At shutdown we can not expect platform.runtimeDone events to be
// reported for the remaining invocations. If we haven't received the
// transaction from agents at this point then it is safe to assume
// that the function failed. We will create proxy transaction for all
// invocations that haven't received a full transaction from the agent
// yet. If extension doesn't have enough CPU time it is possible that
// the extension might not receive the shutdown signal for timeouts
// or runtime crashes. In these cases we will miss the transaction.
//
// TODO (lahsivjar): Any partial transaction remaining will be added
// to a new batch by OnShutdown and flushed from the defer call to
// flush all data when this function exits. This causes 2 triggers
// of flush, we can optimize this by clearing all buffered channel
// then calling OnShutdown and finally flushing any remaining data.
if err := app.batch.OnShutdown(event.ShutdownReason); err != nil {
app.logger.Errorf("Error finalizing invocation on shutdown: %v", err)
}
return nil
}
if app.apmClient.ShouldFlush() {
// Use a new cancellable context for flushing APM data to make sure
// that the underlying transport is reset for next invocation without
// waiting for grace period if it got to unhealthy state.
flushCtx, cancel := context.WithCancel(ctx)
if app.logsClient != nil {
// Flush buffered logs if any
app.logsClient.FlushData(ctx, event.RequestID, event.InvokedFunctionArn, app.apmClient.LambdaDataChannel, false)
}
// Flush APM data now that the function invocation has completed
app.apmClient.FlushAPMData(flushCtx)
cancel()
}
}
}
}
func (app *App) processEvent(
ctx context.Context,
backgroundDataSendWg *sync.WaitGroup,
) (*extension.NextEventResponse, error) {
// Reset flush state for future events.
defer app.apmClient.ResetFlush()
// Invocation context
invocationCtx, invocationCancel := context.WithCancel(ctx)
defer invocationCancel()
// call Next method of extension API. This long polling HTTP method
// will block until there's an invocation of the function
app.logger.Info("Waiting for next event...")
event, err := app.extensionClient.NextEvent(ctx)
if err != nil {
app.logger.Errorf("Error: %s", err)
status, errRuntime := app.extensionClient.ExitError(ctx, err.Error())
if errRuntime != nil {
return nil, errRuntime
}
app.logger.Infof("Exit signal sent to runtime : %s", status)
app.logger.Info("Exiting")
return nil, err
}
// Used to compute Lambda Timeout
event.Timestamp = time.Now()
app.logger.Debug("Received event.")
app.logger.Debugf("%v", extension.PrettyPrint(event))
switch event.EventType {
case extension.Invoke:
app.batch.RegisterInvocation(
event.RequestID,
event.InvokedFunctionArn,
event.DeadlineMs,
event.Timestamp,
)
case extension.Shutdown:
// platform.report metric (and some other metrics) might not have been
// reported by the logs API even till shutdown. At shutdown we will make
// a last attempt to collect and report these metrics. However, it is
// also possible that lambda has init a few execution env preemptively,
// for such cases the extension will see only a SHUTDOWN event and
// there is no need to wait for any log event.
if app.batch.Size() == 0 {
return event, nil
}
}
// APM Data Processing
backgroundDataSendWg.Add(1)
go func() {
defer backgroundDataSendWg.Done()
if err := app.apmClient.ForwardApmData(invocationCtx); err != nil {
app.logger.Error(err)
}
}()
// Lambda Service Logs Processing, also used to extract metrics from APM logs
// This goroutine should not be started if subscription failed
logProcessingDone := make(chan struct{})
if app.logsClient != nil {
go func() {
defer close(logProcessingDone)
app.logsClient.ProcessLogs(
invocationCtx,
event.RequestID,
event.InvokedFunctionArn,
app.apmClient.LambdaDataChannel,
event.EventType == extension.Shutdown,
)
}()
} else {
app.logger.Warn("Logs collection not started due to earlier subscription failure")
}
// Calculate how long to wait for a runtimeDoneSignal or AgentDoneSignal signal
flushDeadlineMs := event.DeadlineMs - 200
durationUntilFlushDeadline := time.Until(time.Unix(flushDeadlineMs/1000, 0))
// Create a timer that expires after durationUntilFlushDeadline
timer := time.NewTimer(durationUntilFlushDeadline)
defer timer.Stop()
// The extension relies on 3 independent mechanisms to minimize the time interval
// between the end of the execution of the lambda function and the end of the
// execution of processEvent():
// 1) AgentDoneSignal triggered upon reception of a `flushed=true` query from the agent
// 2) [Backup 1] All expected log events are processed.
// 3) [Backup 2] If all else fails, the extension relies of the timeout of the Lambda
// function to interrupt itself 200ms before the specified deadline to give the extension
// time to flush data before shutdown.
select {
case <-app.apmClient.WaitForFlush():
app.logger.Debug("APM client has sent flush signal")
case <-logProcessingDone:
app.logger.Debug("Received runtimeDone signal")
case <-timer.C:
app.logger.Info("Time expired while waiting for agent done signal or final log event")
}
return event, nil
}