module/apmlambda/lambda.go (118 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package apmlambda // import "go.elastic.co/apm/module/apmlambda/v2" import ( "log" "net" "net/rpc" "os" "unicode/utf8" "github.com/aws/aws-lambda-go/lambda/messages" "github.com/aws/aws-lambda-go/lambdacontext" "go.elastic.co/apm/v2" "go.elastic.co/apm/v2/stacktrace" ) const ( // TODO(axw) make this configurable via environment payloadLimit = 1024 ) var ( // nonBlocking is passed to Tracer.Flush so it does not block functions. nonBlocking = make(chan struct{}) // Globals below used during tracing, to avoid reallocating for each // invocation. Only one invocation will happen at a time. lambdaContext struct { RequestID string `json:"request_id,omitempty"` Region string `json:"region,omitempty"` XAmznTraceID string `json:"x_amzn_trace_id,omitempty"` FunctionVersion string `json:"function_version,omitempty"` MemoryLimit int `json:"memory_limit,omitempty"` Request string `json:"request,omitempty"` Response string `json:"response,omitempty"` } ) func init() { close(nonBlocking) lambdaContext.FunctionVersion = lambdacontext.FunctionVersion lambdaContext.MemoryLimit = lambdacontext.MemoryLimitInMB lambdaContext.Region = os.Getenv("AWS_REGION") } // Function is type exposed via net/rpc, to match the signature implemented // by the aws-lambda-go package. type Function struct { client *rpc.Client tracer *apm.Tracer } // Ping pings the function implementation. func (f *Function) Ping(req *messages.PingRequest, response *messages.PingResponse) error { return f.client.Call("Function.Ping", req, response) } // Invoke invokes the Lambda function. This is our main trace point. func (f *Function) Invoke(req *messages.InvokeRequest, response *messages.InvokeResponse) error { tx := f.tracer.StartTransaction(lambdacontext.FunctionName, "function") defer f.tracer.Flush(nonBlocking) defer tx.End() defer func() { if v := recover(); v != nil { e := f.tracer.Recovered(v) e.SetTransaction(tx) e.Send() } }() // TODO(axw) define a schema for lambda/serverless/FaaS. lambdaContext.RequestID = req.RequestId lambdaContext.XAmznTraceID = req.XAmznTraceId lambdaContext.Request = formatPayload(req.Payload) lambdaContext.Response = "" err := f.client.Call("Function.Invoke", req, response) if err != nil { e := f.tracer.NewError(err) e.SetTransaction(tx) e.Send() return err } if response.Payload != nil { lambdaContext.Response = formatPayload(response.Payload) } if response.Error != nil { e := f.tracer.NewError(invokeResponseError{response.Error}) e.SetTransaction(tx) e.Send() } return nil } type invokeResponseError struct { err *messages.InvokeResponse_Error } func (e invokeResponseError) Error() string { return e.err.Message } func (e invokeResponseError) Type() string { return e.err.Type } func (e invokeResponseError) StackTrace() []stacktrace.Frame { frames := make([]stacktrace.Frame, len(e.err.StackTrace)) for i, f := range e.err.StackTrace { frames[i] = stacktrace.Frame{ Function: f.Label, File: f.Path, Line: int(f.Line), } } return frames } func formatPayload(payload []byte) string { if len(payload) > payloadLimit { payload = payload[:payloadLimit] } if !utf8.Valid(payload) { return "" } return string(payload) } func init() { pipeClient, pipeServer := net.Pipe() rpcClient := rpc.NewClient(pipeClient) go rpc.DefaultServer.ServeConn(pipeServer) origPort := os.Getenv("_LAMBDA_SERVER_PORT") lis, err := net.Listen("tcp", "localhost:"+origPort) if err != nil { log.Fatal(err) } srv := rpc.NewServer() srv.Register(&Function{ client: rpcClient, tracer: apm.DefaultTracer(), }) go srv.Accept(lis) // Setting _LAMBDA_SERVER_PORT causes lambda.Start // to listen on any free port. We don't care which; // we don't use it. os.Setenv("_LAMBDA_SERVER_PORT", "0") } // TODO(axw) Start() function, which wraps a given function // such that its context is updated with the transaction.