module/apmpgx/tracer.go (138 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package apmpgx // import "go.elastic.co/apm/module/apmpgx/v2" import ( "context" "errors" "fmt" "strings" "time" "go.elastic.co/apm/module/apmsql/v2" "go.elastic.co/apm/v2" "github.com/jackc/pgx/v4" ) const ( //querySpanType is setting action for query expression trace in APM server. querySpanType = "db.postgresql.query" //querySpanType is setting action for copy expression trace in APM server. copySpanType = "db.postgresql.copy" //querySpanType is setting action for batch expression trace in APM server. batchSpanType = "db.postgresql.batch" ) var ( // ErrUnsupportedPgxVersion is indicating that data doesn't contain value for "time" key. This field appeared in pgx v4.17 ErrUnsupportedPgxVersion = errors.New("this version of pgx is unsupported, please upgrade to v4.17+") // ErrInvalidType is indicating that field type doesn't meet expected one ErrInvalidType = errors.New("invalid field type") ) // tracer struct contains pgx.ConnConfig inside and pgx.Logger implementation. type tracer struct { // cfg is the pgx.ConnConfig which used for setting metadata in spans such as host, port, etc. cfg *pgx.ConnConfig // logger used for writing data to log. If it's nil, then data won't be written to log, and only spans will be created. logger pgx.Logger } // Instrument is getting pgx.ConnConfig and wrap logger into tracer. // It's safe to pass nil logger into pgx.ConnConfig, if so, then only spans will be created func Instrument(cfg *pgx.ConnConfig) { cfg.Logger = &tracer{ cfg: cfg, logger: cfg.Logger, } } // Log is getting type of SQL expression from msg and run suitable trace. // If logger in tracer struct isn't nil, than log will be // written to your logger that implements pgx.Logger interface. func (t *tracer) Log(ctx context.Context, level pgx.LogLevel, msg string, data map[string]interface{}) { if t.logger != nil { t.logger.Log(ctx, level, msg, data) } switch msg { case "Query", "Exec": t.QueryTrace(ctx, data) case "CopyFrom": t.CopyTrace(ctx, data) case "SendBatch": t.BatchTrace(ctx, data) } } // QueryTrace traces query and creates spans for them. func (t *tracer) QueryTrace(ctx context.Context, data map[string]interface{}) { statement, ok := data["sql"].(string) if !ok { apm.CaptureError(ctx, fmt.Errorf("%w: expect string, got: %T", ErrInvalidType, data["sql"], ), ).Send() return } span, ok := t.startSpan(ctx, apmsql.QuerySignature(statement), querySpanType, statement, data) if !ok { return } defer span.End() } // CopyTrace traces copy queries and creates spans for them. func (t *tracer) CopyTrace(ctx context.Context, data map[string]interface{}) { tableName, ok := data["tableName"].(pgx.Identifier) if !ok { return } var columnNames []string switch data["columnNames"].(type) { case pgx.Identifier: columnNames = data["columnNames"].(pgx.Identifier) case []string: columnNames = data["columnNames"].([]string) default: return } statement := fmt.Sprintf("COPY TO %s(%s)", strings.Join(tableName, ", "), strings.Join(columnNames, ", "), ) span, ok := t.startSpan(ctx, fmt.Sprintf("COPY TO %s", strings.Join(tableName, ", ")), copySpanType, statement, data, ) if !ok { return } defer span.End() } // BatchTrace traces batch execution and creates spans for the whole batch. func (t *tracer) BatchTrace(ctx context.Context, data map[string]interface{}) { span, ok := t.startSpan(ctx, "BATCH", batchSpanType, "", data) if !ok { return } defer span.End() if batchLen, ok := data["batchLen"].(int); ok { span.Context.SetLabel("batch.length", batchLen) } } func (t *tracer) startSpan(ctx context.Context, spanName, spanType, statement string, data map[string]interface{}) (*apm.Span, bool) { stop := time.Now() duration, ok := data["time"].(time.Duration) if !ok { apm.CaptureError(ctx, ErrUnsupportedPgxVersion).Send() return nil, false } span, _ := apm.StartSpanOptions(ctx, spanName, spanType, apm.SpanOptions{ Start: stop.Add(-duration), ExitSpan: true, }) if span.Dropped() { span.End() return nil, false } span.Duration = duration span.Context.SetDatabase(apm.DatabaseSpanContext{ Instance: t.cfg.Database, Statement: statement, Type: "sql", User: t.cfg.User, }) span.Context.SetDestinationAddress(t.cfg.Host, int(t.cfg.Port)) span.Context.SetServiceTarget(apm.ServiceTargetSpanContext{ Type: "postgresql", Name: t.cfg.Database, }) span.Context.SetDestinationService(apm.DestinationServiceSpanContext{ Name: "postgresql", Resource: "postgresql", }) if err, ok := data["err"].(error); ok { e := apm.CaptureError(ctx, err) e.Timestamp = stop e.SetSpan(span) e.Send() } return span, true }