module/apmazure/storage.go (139 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package apmazure // import "go.elastic.co/apm/module/apmazure/v2" import ( "context" "errors" "strings" "github.com/Azure/azure-pipeline-go/pipeline" "go.elastic.co/apm/module/apmhttp/v2" "go.elastic.co/apm/v2" "go.elastic.co/apm/v2/stacktrace" ) func init() { stacktrace.RegisterLibraryPackage( "github.com/Azure/azure-pipeline-go", "github.com/Azure/azure-storage-blob-go/azblob", "github.com/Azure/azure-storage-file-go/azfile", "github.com/Azure/azure-storage-queue-go/azqueue", ) } // WrapPipeline wraps the provided pipeline.Pipeline, returning a new one that // instruments requests and responses. func WrapPipeline(next pipeline.Pipeline, options ...ServerOption) pipeline.Pipeline { p := &apmPipeline{next: next} for _, opt := range options { opt(p) } if p.tracer == nil { p.tracer = apm.DefaultTracer() } return p } // ServerOption sets options for tracing requests. type ServerOption func(*apmPipeline) // WithTracer returns a ServerOption which sets t as the tracer // to use for tracing server requests. func WithTracer(t *apm.Tracer) ServerOption { if t == nil { panic("t == nil") } return func(h *apmPipeline) { h.tracer = t } } type apmPipeline struct { next pipeline.Pipeline tracer *apm.Tracer } func (p *apmPipeline) Do( ctx context.Context, methodFactory pipeline.Factory, req pipeline.Request, ) (pipeline.Response, error) { rpc, err := newAzureRPC(req) if err != nil { return p.next.Do(ctx, methodFactory, req) } var span *apm.Span if rpc._type() == "messaging" && (req.Method == "GET" || req.Method == "") { // A new transaction is created when one or more messages are // received from a queue tx := p.tracer.StartTransaction(rpc.name(), rpc._type()) ctx := apm.ContextWithTransaction(req.Context(), tx) r := req.Request.WithContext(ctx) req.Request = r defer tx.End() span = tx.StartExitSpan(rpc.name(), rpc._type(), apm.SpanFromContext(ctx)) } else { span, ctx = apm.StartSpanOptions(ctx, rpc.name(), rpc._type(), apm.SpanOptions{ ExitSpan: true, }) } defer span.End() if !span.Dropped() { ctx = apm.ContextWithSpan(ctx, span) req.Request = apmhttp.RequestWithContext(ctx, req.Request) span.Context.SetHTTPRequest(req.Request) } else { return p.next.Do(ctx, methodFactory, req) } span.Action = rpc.operation() span.Subtype = rpc.subtype() resource := rpc.subtype() + "/" + rpc.storageAccountName() if rpc.subtype() == "azurequeue" { resource = rpc.subtype() + "/" + rpc.targetName() } span.Context.SetDestinationService(apm.DestinationServiceSpanContext{ Resource: resource, }) span.Context.SetServiceTarget(apm.ServiceTargetSpanContext{ Type: rpc.subtype(), Name: rpc.targetName(), }) resp, err := p.next.Do(ctx, methodFactory, req) if err != nil { apm.CaptureError(ctx, err).Send() } // We may still have a response even if err != nil // eg., the client library considers 4XX as an error but still returns // the response to us. if resp.Response() != nil { span.Context.SetHTTPStatusCode(resp.Response().StatusCode) } return resp, err } type azureRPC interface { name() string _type() string subtype() string targetName() string storageAccountName() string resource() string operation() string } func newAzureRPC(req pipeline.Request) (azureRPC, error) { split := strings.Split(req.Host, ".") accountName, storage := split[0], split[1] var rpc azureRPC switch storage { case "blob": rpc = &blobRPC{ resourceName: strings.TrimPrefix(req.URL.Path, "/"), accountName: accountName, req: req, } case "queue": rpc = &queueRPC{ resourceName: strings.TrimPrefix(req.URL.Path, "/"), accountName: accountName, req: req, queueName: queueNameFromURL(req.URL.Path), } case "file": rpc = &fileRPC{ resourceName: strings.TrimPrefix(req.URL.Path, "/"), accountName: accountName, req: req, } } if rpc == nil { return nil, errors.New("unsupported service") } return rpc, nil } func queueNameFromURL(urlPath string) string { urlPath = strings.TrimPrefix(urlPath, "/") if i := strings.Index(urlPath, "/"); i >= 0 { return urlPath[:i] } return "" }