module/apmgoredis/client.go (110 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package apmgoredis // import "go.elastic.co/apm/module/apmgoredis/v2" import ( "context" "strings" "github.com/go-redis/redis" "go.elastic.co/apm/v2" ) // Client is the interface returned by Wrap. // // Client implements redis.UniversalClient type Client interface { redis.UniversalClient // Client() returns the wrapped *redis.Client, // or nil if a non-normal client is wrapped RedisClient() *redis.Client // ClusterClient returns the wrapped *redis.ClusterClient, // or nil if a non-cluster client is wrapped. Cluster() *redis.ClusterClient // Ring returns the wrapped *redis.Ring, // or nil if a non-ring client is wrapped. RingClient() *redis.Ring // WithContext returns a shallow copy of the client with // its context changed to ctx and will add instrumentation // with client.WrapProcess and client.WrapProcessPipeline // // To report commands as spans, ctx must contain a transaction or span. WithContext(ctx context.Context) Client } // Wrap wraps client such that executed commands are reported as spans to Elastic APM, // using the client's associated context. // A context-specific client may be obtained by using Client.WithContext. func Wrap(client redis.UniversalClient) Client { switch client.(type) { case *redis.Client: return contextClient{Client: client.(*redis.Client)} case *redis.ClusterClient: return contextClusterClient{ClusterClient: client.(*redis.ClusterClient)} case *redis.Ring: return contextRingClient{Ring: client.(*redis.Ring)} } return client.(Client) } type contextClient struct { *redis.Client } func (c contextClient) WithContext(ctx context.Context) Client { c.Client = c.Client.WithContext(ctx) c.WrapProcess(process(ctx)) c.WrapProcessPipeline(processPipeline(ctx)) return c } func (c contextClient) RedisClient() *redis.Client { return c.Client } func (c contextClient) Cluster() *redis.ClusterClient { return nil } func (c contextClient) RingClient() *redis.Ring { return nil } type contextClusterClient struct { *redis.ClusterClient } func (c contextClusterClient) RedisClient() *redis.Client { return nil } func (c contextClusterClient) Cluster() *redis.ClusterClient { return c.ClusterClient } func (c contextClusterClient) RingClient() *redis.Ring { return nil } func (c contextClusterClient) WithContext(ctx context.Context) Client { c.ClusterClient = c.ClusterClient.WithContext(ctx) c.WrapProcess(process(ctx)) c.WrapProcessPipeline(processPipeline(ctx)) return c } type contextRingClient struct { *redis.Ring } func (c contextRingClient) RedisClient() *redis.Client { return nil } func (c contextRingClient) Cluster() *redis.ClusterClient { return nil } func (c contextRingClient) RingClient() *redis.Ring { return c.Ring } func (c contextRingClient) WithContext(ctx context.Context) Client { c.Ring = c.Ring.WithContext(ctx) c.WrapProcess(process(ctx)) c.WrapProcessPipeline(processPipeline(ctx)) return c } func process(ctx context.Context) func(oldProcess func(cmd redis.Cmder) error) func(cmd redis.Cmder) error { return func(oldProcess func(cmd redis.Cmder) error) func(cmd redis.Cmder) error { return func(cmd redis.Cmder) error { spanName := strings.ToUpper(cmd.Name()) span, _ := apm.StartSpanOptions(ctx, spanName, "db.redis", apm.SpanOptions{ ExitSpan: true, }) defer span.End() return oldProcess(cmd) } } } func processPipeline(ctx context.Context) func(oldProcess func(cmds []redis.Cmder) error) func(cmds []redis.Cmder) error { return func(oldProcess func(cmds []redis.Cmder) error) func(cmds []redis.Cmder) error { return func(cmds []redis.Cmder) error { pipelineSpan, ctx := apm.StartSpanOptions(ctx, "(pipeline)", "db.redis", apm.SpanOptions{ ExitSpan: true, }) defer pipelineSpan.End() for i := len(cmds); i > 0; i-- { cmdName := strings.ToUpper(cmds[i-1].Name()) if cmdName == "" { cmdName = "(empty command)" } span, _ := apm.StartSpan(ctx, cmdName, "db.redis") defer span.End() } return oldProcess(cmds) } } }