in app/app.go [56:187]
func New(ctx context.Context, opts ...ConfigOption) (*App, error) {
c := appConfig{}
for _, opt := range opts {
opt(&c)
}
app := &App{
extensionName: c.extensionName,
batch: accumulator.NewBatch(defaultMaxBatchSize, defaultMaxBatchAge),
}
var err error
if app.logger, err = buildLogger(c.logLevel); err != nil {
return nil, err
}
apmServerAPIKey, apmServerSecretToken := loadAWSOptions(ctx, c.awsConfig, app.logger)
app.extensionClient = extension.NewClient(c.awsLambdaRuntimeAPI, app.logger)
if !c.disableLogsAPI {
addr := "sandbox.localdomain:0"
if c.logsapiAddr != "" {
addr = c.logsapiAddr
}
subscriptionLogStreams := []logsapi.SubscriptionType{logsapi.Platform}
if c.enableFunctionLogSubscription {
subscriptionLogStreams = append(subscriptionLogStreams, logsapi.Function)
}
app.logsClient, err = logsapi.NewClient(
logsapi.WithLogsAPIBaseURL("http://"+c.awsLambdaRuntimeAPI),
logsapi.WithListenerAddress(addr),
logsapi.WithLogBuffer(100),
logsapi.WithLogger(app.logger),
logsapi.WithSubscriptionTypes(subscriptionLogStreams...),
logsapi.WithInvocationLifecycler(app.batch),
)
if err != nil {
return nil, err
}
}
var apmOpts []apmproxy.Option
if receiverTimeout, ok, err := parseDurationTimeout(app.logger, "ELASTIC_APM_DATA_RECEIVER_TIMEOUT", "ELASTIC_APM_DATA_RECEIVER_TIMEOUT_SECONDS"); err != nil || ok {
if err != nil {
return nil, err
}
apmOpts = append(apmOpts, apmproxy.WithReceiverTimeout(receiverTimeout))
}
if dataForwarderTimeout, ok, err := parseDurationTimeout(app.logger, "ELASTIC_APM_DATA_FORWARDER_TIMEOUT", "ELASTIC_APM_DATA_FORWARDER_TIMEOUT_SECONDS"); err != nil || ok {
if err != nil {
return nil, err
}
apmOpts = append(apmOpts, apmproxy.WithDataForwarderTimeout(dataForwarderTimeout))
}
if port := os.Getenv("ELASTIC_APM_DATA_RECEIVER_SERVER_PORT"); port != "" {
apmOpts = append(apmOpts, apmproxy.WithReceiverAddress(":"+port))
}
if strategy, ok := parseStrategy(os.Getenv("ELASTIC_APM_SEND_STRATEGY")); ok {
apmOpts = append(apmOpts, apmproxy.WithSendStrategy(strategy))
}
if bufferSize := os.Getenv("ELASTIC_APM_LAMBDA_AGENT_DATA_BUFFER_SIZE"); bufferSize != "" {
size, err := strconv.Atoi(bufferSize)
if err != nil {
return nil, err
}
apmOpts = append(apmOpts, apmproxy.WithAgentDataBufferSize(size))
}
if verifyCertsString := os.Getenv("ELASTIC_APM_LAMBDA_VERIFY_SERVER_CERT"); verifyCertsString != "" {
verifyCerts, err := strconv.ParseBool(verifyCertsString)
if err != nil {
return nil, err
}
if !verifyCerts {
app.logger.Infof("Ignoring Certificates.")
}
apmOpts = append(apmOpts, apmproxy.WithVerifyCerts(verifyCerts))
}
if encodedCertPem := os.Getenv("ELASTIC_APM_LAMBDA_SERVER_CA_CERT_PEM"); encodedCertPem != "" {
certPem := strings.ReplaceAll(encodedCertPem, "\\n", "\n")
app.logger.Infof("Using CA certificates from environment variable.")
apmOpts = append(apmOpts, apmproxy.WithRootCerts(certPem))
}
if certFile := os.Getenv("ELASTIC_APM_SERVER_CA_CERT_FILE"); certFile != "" {
cert, err := os.ReadFile(certFile)
if err != nil {
return nil, err
}
app.logger.Infof("Using CA certificate loaded from file %s", certFile)
apmOpts = append(apmOpts, apmproxy.WithRootCerts(string(cert)))
}
if acmCertArn := os.Getenv("ELASTIC_APM_SERVER_CA_CERT_ACM_ID"); acmCertArn != "" {
cert, err := loadAcmCertificate(ctx, acmCertArn, c.awsConfig)
if err != nil {
return nil, err
}
app.logger.Infof("Using CA certificate %s", acmCertArn)
apmOpts = append(apmOpts, apmproxy.WithRootCerts(*cert))
}
apmOpts = append(apmOpts,
apmproxy.WithURL(os.Getenv("ELASTIC_APM_LAMBDA_APM_SERVER")),
apmproxy.WithLogger(app.logger),
apmproxy.WithAPIKey(apmServerAPIKey),
apmproxy.WithSecretToken(apmServerSecretToken),
apmproxy.WithBatch(app.batch),
)
ac, err := apmproxy.NewClient(apmOpts...)
if err != nil {
return nil, err
}
app.apmClient = ac
return app, nil
}