module/apmawssdkgo/session.go (141 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package apmawssdkgo // import "go.elastic.co/apm/module/apmawssdkgo/v2" import ( "context" "go.elastic.co/apm/v2" "go.elastic.co/apm/v2/stacktrace" "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/session" ) func init() { stacktrace.RegisterLibraryPackage( "github.com/aws/aws-sdk-go", ) } // WrapSession wraps the provided AWS session with handlers that hook into the // AWS SDK's request lifecycle. Supported services are listed in serviceTypeMap // variable below. func WrapSession(s *session.Session) *session.Session { s.Handlers.Build.PushFrontNamed(request.NamedHandler{ Name: "go.elastic.co/apm/module/apmawssdkgo/build", Fn: build, }) s.Handlers.Send.PushFrontNamed(request.NamedHandler{ Name: "go.elastic.co/apm/module/apmawssdkgo/send", Fn: send, }) s.Handlers.Complete.PushBackNamed(request.NamedHandler{ Name: "go.elastic.co/apm/module/apmawssdkgo/complete", Fn: complete, }) return s } // We add AWS spans to context using a separate context key, to avoid // modifying spans not created by the "build" handler. type awsSpanKey struct{} const ( serviceS3 = "s3" serviceDynamoDB = "dynamodb" serviceSQS = "sqs" serviceSNS = "sns" ) var ( serviceTypeMap = map[string]string{ serviceS3: "storage", serviceDynamoDB: "db", serviceSQS: "messaging", serviceSNS: "messaging", } ) type service interface { spanName() string resource() string targetName() string setAdditional(*apm.Span) } func build(req *request.Request) { spanSubtype := req.ClientInfo.ServiceName spanType, ok := serviceTypeMap[spanSubtype] if !ok { // Not a supported service. return } if spanSubtype == serviceSNS && !supportedSNSMethod(req) { return } if spanSubtype == serviceSQS && !supportedSQSMethod(req) { return } ctx := req.Context() tx := apm.TransactionFromContext(ctx) if tx == nil { return } // The span name is added in the `send()` function, after other // handlers have generated the necessary information on the request. span := tx.StartExitSpan("", spanType, apm.SpanFromContext(ctx)) if span.Dropped() { span.End() return } switch spanSubtype { case serviceSQS: addMessageAttributesSQS(req, span, tx.ShouldPropagateLegacyHeader()) case serviceSNS: addMessageAttributesSNS(req, span, tx.ShouldPropagateLegacyHeader()) } ctx = apm.ContextWithSpan(ctx, span) ctx = context.WithValue(ctx, awsSpanKey{}, span) req.SetContext(ctx) } func send(req *request.Request) { if req.RetryCount > 0 { return } span, ok := req.Context().Value(awsSpanKey{}).(*apm.Span) if !ok { return } var ( svc service err error ) spanSubtype := req.ClientInfo.ServiceName switch spanSubtype { case serviceS3: svc = newS3(req) case serviceDynamoDB: svc = newDynamoDB(req) case serviceSQS: if svc, err = newSQS(req); err != nil { // Unsupported method type or queue name. return } case serviceSNS: if svc, err = newSNS(req); err != nil { // Unsupported method type or queue name. return } default: // Unsupported type return } span.Name = svc.spanName() span.Subtype = spanSubtype span.Action = req.Operation.Name span.Context.SetHTTPRequest(req.HTTPRequest) span.Context.SetDestinationService(apm.DestinationServiceSpanContext{ Name: spanSubtype, Resource: svc.resource(), }) span.Context.SetServiceTarget(apm.ServiceTargetSpanContext{ Type: spanSubtype, Name: svc.targetName(), }) if region := req.Config.Region; region != nil { span.Context.SetDestinationCloud(apm.DestinationCloudSpanContext{ Region: *region, }) } svc.setAdditional(span) } func complete(req *request.Request) { ctx := req.Context() span, ok := ctx.Value(awsSpanKey{}).(*apm.Span) if !ok { return } defer span.End() span.Context.SetHTTPStatusCode(req.HTTPResponse.StatusCode) if err := req.Error; err != nil { apm.CaptureError(ctx, err).Send() } }