module/apmmongo/monitor.go (144 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package apmmongo // import "go.elastic.co/apm/module/apmmongo/v2"
import (
"context"
"reflect"
"sync"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/bsoncodec"
"go.mongodb.org/mongo-driver/bson/bsonrw"
"go.mongodb.org/mongo-driver/event"
"go.mongodb.org/mongo-driver/mongo"
"go.elastic.co/apm/v2"
)
// extjPool is a pool of Extended JSON value writers, used when encoding
// commands for the database statement recorded on a span.
var extjPool = bsonrw.NewExtJSONValueWriterPool()

// swPool is a pool of reusable slice writers, which serve as the output
// buffers for value writers obtained from extjPool.
var swPool = sync.Pool{
	New: func() interface{} {
		return &bsonrw.SliceWriter{}
	},
}
// init registers an error detailer for the mongo.CommandError type. The
// detailer copies the command error's name into the error code and, when
// the command error carries any labels, attaches them as the "labels"
// attribute.
func init() {
	detailer := func(err error, details *apm.ErrorDetails) {
		// The detailer is registered for exactly this type below.
		mongoErr := err.(mongo.CommandError)
		details.Code.String = mongoErr.Name
		if len(mongoErr.Labels) == 0 {
			return
		}
		details.SetAttr("labels", mongoErr.Labels)
	}
	apm.RegisterTypeErrorDetailer(
		reflect.TypeOf(mongo.CommandError{}),
		apm.ErrorDetailerFunc(detailer),
	)
}
// CommandMonitor returns a new event.CommandMonitor which will report a span
// for each command executed within a context containing a sampled transaction.
//
// The monitor starts out using bson.DefaultRegistry for encoding commands;
// each of opts is then applied, in order, before the monitor is returned.
func CommandMonitor(opts ...Option) *event.CommandMonitor {
	monitor := &commandMonitor{
		bsonRegistry: bson.DefaultRegistry,
		spans:        make(map[commandKey]*apm.Span),
	}
	for i := range opts {
		opts[i](monitor)
	}

	result := new(event.CommandMonitor)
	result.Started = monitor.started
	result.Succeeded = monitor.succeeded
	result.Failed = monitor.failed
	return result
}
// commandMonitor holds the state shared by the started, succeeded, and
// failed callbacks that CommandMonitor installs.
type commandMonitor struct {
// TODO(axw) record number of active commands and report as a
// metric so users can, for example, identify unclosed cursors.
//
// bsonRegistry is the registry used when encoding a command as
// Extended JSON for the span's database statement.
bsonRegistry *bsoncodec.Registry
// mu guards spans.
mu sync.Mutex
// spans holds the span of each command that has started but not yet
// finished, keyed by the command's connection ID and request ID.
spans map[commandKey]*apm.Span
}
// commandKey identifies an in-flight command in commandMonitor.spans. It is
// built from the ConnectionID and RequestID fields of the started and
// finished events, so that a finished event can be matched to the span
// created for the corresponding started event.
type commandKey struct {
connectionID string
requestID int64
}
// started handles a command-started event by starting an exit span of type
// "db.mongodb.query" for the command. The span is named after the command,
// prefixed with the collection name when one can be determined. Nothing
// further is done if the span is dropped; otherwise the span is given a
// database context and stored until the matching finished event arrives.
func (c *commandMonitor) started(ctx context.Context, event *event.CommandStartedEvent) {
	name := event.CommandName
	if coll, found := collectionName(event.CommandName, event.Command); found {
		name = coll + "." + name
	}

	opts := apm.SpanOptions{ExitSpan: true}
	span, _ := apm.StartSpanOptions(ctx, name, "db.mongodb.query", opts)
	if span.Dropped() {
		return
	}

	dbContext := apm.DatabaseSpanContext{
		Instance: event.DatabaseName,
		Type:     "mongodb",
	}
	if len(event.Command) != 0 {
		// Encode the command as MongoDB Extended JSON (non-canonical,
		// without HTML escaping) for the "statement" in the database
		// span context. If encoding fails, the statement is left empty.
		buf := swPool.Get().(*bsonrw.SliceWriter)
		jsonWriter := extjPool.Get(buf, false, false)
		encoder, err := bson.NewEncoderWithContext(
			bsoncodec.EncodeContext{Registry: c.bsonRegistry},
			jsonWriter,
		)
		if err == nil {
			err = encoder.Encode(event.Command)
		}
		if err == nil {
			dbContext.Statement = string(*buf)
		}

		// Truncate the buffer, keeping its capacity, before handing
		// the writers back to their pools.
		*buf = (*buf)[:0]
		extjPool.Put(jsonWriter)
		swPool.Put(buf)
	}
	span.Context.SetDatabase(dbContext)

	// The command/event monitoring API does not provide a means of associating
	// arbitrary data with a request, so we must maintain our own map.
	//
	// https://jira.mongodb.org/browse/GODRIVER-837
	id := commandKey{
		connectionID: event.ConnectionID,
		requestID:    event.RequestID,
	}
	c.mu.Lock()
	c.spans[id] = span
	c.mu.Unlock()
}
// succeeded handles a command-succeeded event by passing its embedded
// CommandFinishedEvent to finished.
func (c *commandMonitor) succeeded(ctx context.Context, event *event.CommandSucceededEvent) {
	finishedEvent := &event.CommandFinishedEvent
	c.finished(ctx, finishedEvent)
}
// failed handles a command-failed event by passing its embedded
// CommandFinishedEvent to finished.
func (c *commandMonitor) failed(ctx context.Context, event *event.CommandFailedEvent) {
	finishedEvent := &event.CommandFinishedEvent
	c.finished(ctx, finishedEvent)
}
// finished ends the span that started recorded for the command identified
// by the event's connection ID and request ID, using the duration reported
// by the event. Events with no recorded span (for example, because the span
// was dropped in started) are ignored.
func (c *commandMonitor) finished(ctx context.Context, event *event.CommandFinishedEvent) {
	id := commandKey{
		connectionID: event.ConnectionID,
		requestID:    event.RequestID,
	}

	// Take the span out of the map while holding the lock, and end it
	// only after the lock has been released.
	c.mu.Lock()
	span, tracked := c.spans[id]
	if tracked {
		delete(c.spans, id)
	}
	c.mu.Unlock()

	if !tracked {
		return
	}
	span.Duration = time.Duration(event.DurationNanos)
	span.End()
}
// collectionName reports the name of the collection that the given command
// operates on. For the recognised collection-level commands, the name is
// looked up in the command document under the field named after the command
// itself; for "getMore" it is looked up under the "collection" field.
//
// The second result is false if the command is not recognised, or if the
// looked-up value is not available as a string.
func collectionName(commandName string, command bson.Raw) (string, bool) {
	// Work out which field of the command document holds the collection.
	var field string
	switch commandName {
	case "getMore":
		field = "collection"
	case
		// Aggregation Commands
		"aggregate",
		"count",
		"distinct",
		"mapReduce",

		// Geospatial Commands
		"geoNear",
		"geoSearch",

		// Query and Write Operation Commands
		"delete",
		"find",
		"findAndModify",
		"insert",
		"parallelCollectionScan",
		"update",

		// Administration Commands
		"compact",
		"convertToCapped",
		"create",
		"createIndexes",
		"drop",
		"dropIndexes",
		"killCursors",
		"listIndexes",
		"reIndex",

		// Diagnostic Commands
		"collStats":
		field = commandName
	default:
		return "", false
	}
	return command.Lookup(field).StringValueOK()
}
// Option sets options for tracing MongoDB commands.
//
// Options are passed to CommandMonitor, which applies each of them, in
// order, to the monitor it is constructing.
type Option func(*commandMonitor)