app/app.go (175 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package app

import (
	"context"
	"fmt"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/elastic/apm-aws-lambda/accumulator"
	"github.com/elastic/apm-aws-lambda/apmproxy"
	"github.com/elastic/apm-aws-lambda/extension"
	"github.com/elastic/apm-aws-lambda/logger"
	"github.com/elastic/apm-aws-lambda/logsapi"
	"go.elastic.co/ecszap"
	"go.uber.org/zap"
)

// Defaults handed to accumulator.NewBatch when New creates the shared batch.
const (
	// defaultMaxBatchSize is the size argument passed to accumulator.NewBatch
	// (presumably the maximum number of entries per batch; confirm against
	// the accumulator package).
	defaultMaxBatchSize = 50
	// defaultMaxBatchAge is the age argument passed to accumulator.NewBatch.
	defaultMaxBatchAge = 2 * time.Second
)

// App is the main application.
type App struct {
	// extensionName is copied from the extension name set via ConfigOption.
	extensionName string
	// extensionClient is created by New with extension.NewClient.
	extensionClient *extension.Client
	// logsClient is created by New with logsapi.NewClient; it stays nil when
	// the Logs API is disabled through the configuration.
	logsClient *logsapi.Client
	// apmClient is created by New with apmproxy.NewClient.
	apmClient *apmproxy.Client
	// logger is built by buildLogger from the configured log level.
	logger *zap.SugaredLogger
	// batch is shared by New between the logs client (as its invocation
	// lifecycler) and the APM client.
	batch *accumulator.Batch
}

// New returns an App or an error if the creation failed.
// //nolint:govet func New(ctx context.Context, opts ...ConfigOption) (*App, error) { c := appConfig{} for _, opt := range opts { opt(&c) } app := &App{ extensionName: c.extensionName, batch: accumulator.NewBatch(defaultMaxBatchSize, defaultMaxBatchAge), } var err error if app.logger, err = buildLogger(c.logLevel); err != nil { return nil, err } apmServerAPIKey, apmServerSecretToken := loadAWSOptions(ctx, c.awsConfig, app.logger) app.extensionClient = extension.NewClient(c.awsLambdaRuntimeAPI, app.logger) if !c.disableLogsAPI { addr := "sandbox.localdomain:0" if c.logsapiAddr != "" { addr = c.logsapiAddr } subscriptionLogStreams := []logsapi.SubscriptionType{logsapi.Platform} if c.enableFunctionLogSubscription { subscriptionLogStreams = append(subscriptionLogStreams, logsapi.Function) } app.logsClient, err = logsapi.NewClient( logsapi.WithLogsAPIBaseURL("http://"+c.awsLambdaRuntimeAPI), logsapi.WithListenerAddress(addr), logsapi.WithLogBuffer(100), logsapi.WithLogger(app.logger), logsapi.WithSubscriptionTypes(subscriptionLogStreams...), logsapi.WithInvocationLifecycler(app.batch), ) if err != nil { return nil, err } } var apmOpts []apmproxy.Option if receiverTimeout, ok, err := parseDurationTimeout(app.logger, "ELASTIC_APM_DATA_RECEIVER_TIMEOUT", "ELASTIC_APM_DATA_RECEIVER_TIMEOUT_SECONDS"); err != nil || ok { if err != nil { return nil, err } apmOpts = append(apmOpts, apmproxy.WithReceiverTimeout(receiverTimeout)) } if dataForwarderTimeout, ok, err := parseDurationTimeout(app.logger, "ELASTIC_APM_DATA_FORWARDER_TIMEOUT", "ELASTIC_APM_DATA_FORWARDER_TIMEOUT_SECONDS"); err != nil || ok { if err != nil { return nil, err } apmOpts = append(apmOpts, apmproxy.WithDataForwarderTimeout(dataForwarderTimeout)) } if port := os.Getenv("ELASTIC_APM_DATA_RECEIVER_SERVER_PORT"); port != "" { apmOpts = append(apmOpts, apmproxy.WithReceiverAddress(":"+port)) } if strategy, ok := parseStrategy(os.Getenv("ELASTIC_APM_SEND_STRATEGY")); ok { apmOpts = append(apmOpts, 
apmproxy.WithSendStrategy(strategy)) } if bufferSize := os.Getenv("ELASTIC_APM_LAMBDA_AGENT_DATA_BUFFER_SIZE"); bufferSize != "" { size, err := strconv.Atoi(bufferSize) if err != nil { return nil, err } apmOpts = append(apmOpts, apmproxy.WithAgentDataBufferSize(size)) } if verifyCertsString := os.Getenv("ELASTIC_APM_LAMBDA_VERIFY_SERVER_CERT"); verifyCertsString != "" { verifyCerts, err := strconv.ParseBool(verifyCertsString) if err != nil { return nil, err } if !verifyCerts { app.logger.Infof("Ignoring Certificates.") } apmOpts = append(apmOpts, apmproxy.WithVerifyCerts(verifyCerts)) } if encodedCertPem := os.Getenv("ELASTIC_APM_LAMBDA_SERVER_CA_CERT_PEM"); encodedCertPem != "" { certPem := strings.ReplaceAll(encodedCertPem, "\\n", "\n") app.logger.Infof("Using CA certificates from environment variable.") apmOpts = append(apmOpts, apmproxy.WithRootCerts(certPem)) } if certFile := os.Getenv("ELASTIC_APM_SERVER_CA_CERT_FILE"); certFile != "" { cert, err := os.ReadFile(certFile) if err != nil { return nil, err } app.logger.Infof("Using CA certificate loaded from file %s", certFile) apmOpts = append(apmOpts, apmproxy.WithRootCerts(string(cert))) } if acmCertArn := os.Getenv("ELASTIC_APM_SERVER_CA_CERT_ACM_ID"); acmCertArn != "" { cert, err := loadAcmCertificate(ctx, acmCertArn, c.awsConfig) if err != nil { return nil, err } app.logger.Infof("Using CA certificate %s", acmCertArn) apmOpts = append(apmOpts, apmproxy.WithRootCerts(*cert)) } apmOpts = append(apmOpts, apmproxy.WithURL(os.Getenv("ELASTIC_APM_LAMBDA_APM_SERVER")), apmproxy.WithLogger(app.logger), apmproxy.WithAPIKey(apmServerAPIKey), apmproxy.WithSecretToken(apmServerSecretToken), apmproxy.WithBatch(app.batch), ) ac, err := apmproxy.NewClient(apmOpts...) 
if err != nil { return nil, err } app.apmClient = ac return app, nil } func parseDurationTimeout(l *zap.SugaredLogger, flag, deprecatedFlag string) (time.Duration, bool, error) { if strValue, ok := os.LookupEnv(flag); ok { d, err := time.ParseDuration(strValue) if err != nil { return 0, false, fmt.Errorf("failed to parse %s: %w", flag, err) } return d, true, nil } if strValueSeconds, ok := os.LookupEnv(deprecatedFlag); ok { l.Warnf("%s is deprecated, please consider moving to %s", deprecatedFlag, flag) seconds, err := strconv.Atoi(strValueSeconds) if err != nil { return 0, false, fmt.Errorf("failed to parse %s: %w", deprecatedFlag, err) } return time.Duration(seconds) * time.Second, true, nil } return 0, false, nil } func parseStrategy(value string) (apmproxy.SendStrategy, bool) { switch strings.ToLower(value) { case "background": return apmproxy.Background, true case "syncflush": return apmproxy.SyncFlush, true } return "", false } func buildLogger(level string) (*zap.SugaredLogger, error) { if level == "" { level = "info" } l, err := logger.ParseLogLevel(level) if err != nil { return nil, err } return logger.New( logger.WithEncoderConfig(ecszap.NewDefaultEncoderConfig().ToZapCoreEncoderConfig()), logger.WithLevel(l), ) }