components/otelopscol/receiver/mongodbreceiver/metrics.go (488 lines of code) (raw):

// Copyright The OpenTelemetry Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package mongodbreceiver // import "github.com/GoogleCloudPlatform/opentelemetry-operations-collector/components/otelopscol/receiver/mongodbreceiver" import ( "errors" "fmt" "reflect" "github.com/hashicorp/go-version" "go.mongodb.org/mongo-driver/bson" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/scraper/scrapererror" "github.com/GoogleCloudPlatform/opentelemetry-operations-collector/components/otelopscol/receiver/mongodbreceiver/internal/metadata" ) var errKeyNotFound = errors.New("could not find key for metric") var operationsMap = map[string]metadata.AttributeOperation{ "insert": metadata.AttributeOperationInsert, "queries": metadata.AttributeOperationQuery, "update": metadata.AttributeOperationUpdate, "remove": metadata.AttributeOperationDelete, "getmore": metadata.AttributeOperationGetmore, "commands": metadata.AttributeOperationCommand, } var documentMap = map[string]metadata.AttributeOperation{ "inserted": metadata.AttributeOperationInsert, "updated": metadata.AttributeOperationUpdate, "deleted": metadata.AttributeOperationDelete, } var lockTypeMap = map[string]metadata.AttributeLockType{ "ParallelBatchWriterMode": metadata.AttributeLockTypeParallelBatchWriteMode, "ReplicationStateTransition": metadata.AttributeLockTypeReplicationStateTransition, "Global": metadata.AttributeLockTypeGlobal, "Database": metadata.AttributeLockTypeDatabase, "Collection": metadata.AttributeLockTypeCollection, "Mutex": metadata.AttributeLockTypeMutex, "Metadata": metadata.AttributeLockTypeMetadata, "oplog": metadata.AttributeLockTypeOplog, } var lockModeMap = map[string]metadata.AttributeLockMode{ "R": metadata.AttributeLockModeShared, "W": metadata.AttributeLockModeExclusive, "r": metadata.AttributeLockModeIntentShared, "w": metadata.AttributeLockModeIntentExclusive, } const ( collectMetricError = "failed to collect metric %s: %w" collectMetricWithAttributes = "failed to collect metric %s with attribute(s) %s: %w" ) // DBStats func (s *mongodbScraper) recordCollections(now pcommon.Timestamp, doc bson.M, dbName string, errs *scrapererror.ScrapeErrors) { metricPath := []string{"collections"} metricName := "mongodb.collection.count" val, err := collectMetric(doc, metricPath) if err != nil { errs.AddPartial(1, fmt.Errorf(collectMetricWithAttributes, metricName, dbName, err)) return } s.mb.RecordMongodbCollectionCountDataPoint(now, val, dbName) } func (s *mongodbScraper) recordDataSize(now pcommon.Timestamp, doc bson.M, dbName string, errs *scrapererror.ScrapeErrors) { metricPath := []string{"dataSize"} metricName := "mongodb.data.size" val, err := collectMetric(doc, metricPath) if err != nil { errs.AddPartial(1, fmt.Errorf(collectMetricWithAttributes, metricName, dbName, err)) return } s.mb.RecordMongodbDataSizeDataPoint(now, val, dbName) } func (s *mongodbScraper) recordStorageSize(now pcommon.Timestamp, doc bson.M, dbName string, errs *scrapererror.ScrapeErrors) { metricPath := []string{"storageSize"} metricName := "mongodb.storage.size" val, err := collectMetric(doc, metricPath) if err != nil { errs.AddPartial(1, fmt.Errorf(collectMetricWithAttributes, metricName, dbName, err)) return } s.mb.RecordMongodbStorageSizeDataPoint(now, val, dbName) } func (s *mongodbScraper) recordObjectCount(now pcommon.Timestamp, doc bson.M, dbName string, errs *scrapererror.ScrapeErrors) { metricPath := []string{"objects"} metricName := "mongodb.object.count" val, err := collectMetric(doc, metricPath) if err != nil { errs.AddPartial(1, fmt.Errorf(collectMetricWithAttributes, metricName, dbName, err)) return } s.mb.RecordMongodbObjectCountDataPoint(now, val, dbName) } func (s *mongodbScraper) recordIndexCount(now pcommon.Timestamp, doc bson.M, dbName string, errs *scrapererror.ScrapeErrors) { metricPath := []string{"indexes"} metricName := "mongodb.index.count" val, err := collectMetric(doc, metricPath) if err != nil { errs.AddPartial(1, fmt.Errorf(collectMetricWithAttributes, metricName, dbName, err)) return } s.mb.RecordMongodbIndexCountDataPoint(now, val, dbName) } func (s *mongodbScraper) recordIndexSize(now pcommon.Timestamp, doc bson.M, dbName string, errs *scrapererror.ScrapeErrors) { metricPath := []string{"indexSize"} metricName := "mongodb.index.size" val, err := collectMetric(doc, metricPath) if err != nil { errs.AddPartial(1, fmt.Errorf(collectMetricWithAttributes, metricName, dbName, err)) return } s.mb.RecordMongodbIndexSizeDataPoint(now, val, dbName) } func (s *mongodbScraper) recordExtentCount(now pcommon.Timestamp, doc bson.M, dbName string, errs *scrapererror.ScrapeErrors) { // Mongo version 4.4+ no longer returns numExtents since it is part of the obsolete MMAPv1 // https://www.mongodb.com/docs/manual/release-notes/4.4-compatibility/#mmapv1-cleanup mongo44, _ := version.NewVersion("4.4") if s.mongoVersion.LessThan(mongo44) { metricPath := []string{"numExtents"} metricName := "mongodb.extent.count" val, err := collectMetric(doc, metricPath) if err != nil { errs.AddPartial(1, fmt.Errorf(collectMetricWithAttributes, metricName, dbName, err)) return } s.mb.RecordMongodbExtentCountDataPoint(now, val, dbName) } } // ServerStatus func (s *mongodbScraper) recordConnections(now pcommon.Timestamp, doc bson.M, dbName string, errs *scrapererror.ScrapeErrors) { mongo40, _ := version.NewVersion("4.0") for ctVal, ct := range metadata.MapAttributeConnectionType { // Mongo version 4.0 added active // reference: https://www.mongodb.com/docs/v4.0/reference/command/serverStatus/#serverstatus.connections.active if s.mongoVersion.LessThan(mongo40) && ctVal == "active" { continue } metricPath := []string{"connections", ctVal} metricName := "mongodb.connection.count" metricAttributes := fmt.Sprintf("%s, %s", ctVal, dbName) val, err := collectMetric(doc, metricPath) if err != nil { errs.AddPartial(1, fmt.Errorf(collectMetricWithAttributes, metricName, metricAttributes, err)) continue } s.mb.RecordMongodbConnectionCountDataPoint(now, val, dbName, ct) } } func (s *mongodbScraper) recordMemoryUsage(now pcommon.Timestamp, doc bson.M, dbName string, errs *scrapererror.ScrapeErrors) { for mtVal, mt := range metadata.MapAttributeMemoryType { metricPath := []string{"mem", mtVal} metricName := "mongodb.memory.usage" metricAttributes := fmt.Sprintf("%s, %s", mtVal, dbName) val, err := collectMetric(doc, metricPath) if err != nil { errs.AddPartial(1, fmt.Errorf(collectMetricWithAttributes, metricName, metricAttributes, err)) continue } // convert from mebibytes to bytes memUsageBytes := val * int64(1048576) s.mb.RecordMongodbMemoryUsageDataPoint(now, memUsageBytes, dbName, mt) } } func (s *mongodbScraper) recordDocumentOperations(now pcommon.Timestamp, doc bson.M, dbName string, errs *scrapererror.ScrapeErrors) { for operationKey, metadataKey := range documentMap { metricPath := []string{"metrics", "document", operationKey} metricName := "mongodb.document.operation.count" metricAttributes := fmt.Sprintf("%s, %s", operationKey, dbName) val, err := collectMetric(doc, metricPath) if err != nil { errs.AddPartial(1, fmt.Errorf(collectMetricWithAttributes, metricName, metricAttributes, err)) continue } s.mb.RecordMongodbDocumentOperationCountDataPoint(now, val, dbName, metadataKey) } } func (s *mongodbScraper) recordSessionCount(now pcommon.Timestamp, doc bson.M, errs *scrapererror.ScrapeErrors) { // Collect session count for version 3.0+ // https://www.mongodb.com/docs/v3.0/reference/command/serverStatus/#serverStatus.wiredTiger.session mongo30, _ := version.NewVersion("3.0") if s.mongoVersion.LessThan(mongo30) { return } storageEngine, err := dig(doc, []string{"storageEngine", "name"}) if err != nil { errs.AddPartial(1, errors.New("failed to find storage engine for session count")) return } if storageEngine != "wiredTiger" { // mongodb is using a different storage engine and this metric can not be collected return } metricPath := []string{"wiredTiger", "session", "open session count"} metricName := "mongodb.session.count" val, err := collectMetric(doc, metricPath) if err != nil { errs.AddPartial(1, fmt.Errorf(collectMetricError, metricName, err)) return } s.mb.RecordMongodbSessionCountDataPoint(now, val) } // Admin Stats func (s *mongodbScraper) recordOperations(now pcommon.Timestamp, doc bson.M, errs *scrapererror.ScrapeErrors) { for operationVal, operation := range metadata.MapAttributeOperation { metricPath := []string{"opcounters", operationVal} metricName := "mongodb.operation.count" val, err := collectMetric(doc, metricPath) if err != nil { errs.AddPartial(1, fmt.Errorf(collectMetricWithAttributes, metricName, operationVal, err)) continue } s.mb.RecordMongodbOperationCountDataPoint(now, val, operation) } } func (s *mongodbScraper) recordCacheOperations(now pcommon.Timestamp, doc bson.M, errs *scrapererror.ScrapeErrors) { // Collect Cache Hits & Misses if wiredTiger storage engine is used // WiredTiger.cache metrics are available in 3.0+ // https://www.mongodb.com/docs/v4.0/reference/command/serverStatus/#serverstatus.wiredTiger.cache mongo30, _ := version.NewVersion("3.0") if s.mongoVersion.LessThan(mongo30) { return } storageEngine, err := dig(doc, []string{"storageEngine", "name"}) if err != nil { errs.AddPartial(1, errors.New("failed to find storage engine for cache operations")) return } if storageEngine != "wiredTiger" { // mongodb is using a different storage engine and this metric can not be collected return } metricPath := []string{"wiredTiger", "cache", "pages read into cache"} metricName := "mongodb.cache.operations" cacheMissVal, err := collectMetric(doc, metricPath) if err != nil { errs.AddPartial(2, fmt.Errorf(collectMetricWithAttributes, metricName, "miss, hit", err)) return } s.mb.RecordMongodbCacheOperationsDataPoint(now, cacheMissVal, metadata.AttributeTypeMiss) cacheHitPath := []string{"wiredTiger", "cache", "pages requested from the cache"} cacheHitName := "mongodb.cache.operations" cacheHitVal, err := collectMetric(doc, cacheHitPath) if err != nil { errs.AddPartial(1, fmt.Errorf(collectMetricWithAttributes, cacheHitName, "hit", err)) return } cacheHits := cacheHitVal - cacheMissVal s.mb.RecordMongodbCacheOperationsDataPoint(now, cacheHits, metadata.AttributeTypeHit) } func (s *mongodbScraper) recordGlobalLockTime(now pcommon.Timestamp, doc bson.M, errs *scrapererror.ScrapeErrors) { metricPath := []string{"globalLock", "totalTime"} metricName := "mongodb.global_lock.time" val, err := collectMetric(doc, metricPath) if err != nil { errs.AddPartial(1, fmt.Errorf(collectMetricError, metricName, err)) return } heldTimeMilliseconds := val / 1000 s.mb.RecordMongodbGlobalLockTimeDataPoint(now, heldTimeMilliseconds) } func (s *mongodbScraper) recordCursorCount(now pcommon.Timestamp, doc bson.M, errs *scrapererror.ScrapeErrors) { metricPath := []string{"metrics", "cursor", "open", "total"} metricName := "mongodb.cursor.count" val, err := collectMetric(doc, metricPath) if err != nil { errs.AddPartial(1, fmt.Errorf(collectMetricError, metricName, err)) return } s.mb.RecordMongodbCursorCountDataPoint(now, val) } func (s *mongodbScraper) recordCursorTimeoutCount(now pcommon.Timestamp, doc bson.M, errs *scrapererror.ScrapeErrors) { metricPath := []string{"metrics", "cursor", "timedOut"} metricName := "mongodb.cursor.timeout.count" val, err := collectMetric(doc, metricPath) if err != nil { errs.AddPartial(1, fmt.Errorf(collectMetricError, metricName, err)) return } s.mb.RecordMongodbCursorTimeoutCountDataPoint(now, val) } func (s *mongodbScraper) recordNetworkCount(now pcommon.Timestamp, doc bson.M, errs *scrapererror.ScrapeErrors) { networkRecorderMap := map[string]func(pcommon.Timestamp, int64){ "bytesIn": s.mb.RecordMongodbNetworkIoReceiveDataPoint, "bytesOut": s.mb.RecordMongodbNetworkIoTransmitDataPoint, "numRequests": s.mb.RecordMongodbNetworkRequestCountDataPoint, } for networkKey, recorder := range networkRecorderMap { metricPath := []string{"network", networkKey} val, err := collectMetric(doc, metricPath) if err != nil { errs.AddPartial(1, fmt.Errorf(collectMetricError, networkKey, err)) continue } recorder(now, val) } } // Lock Metrics are only supported by MongoDB v3.2+ func (s *mongodbScraper) recordLockAcquireCounts(now pcommon.Timestamp, doc bson.M, dBName string, errs *scrapererror.ScrapeErrors) { mongo32, _ := version.NewVersion("3.2") if s.mongoVersion.LessThan(mongo32) { return } mongo42, _ := version.NewVersion("4.2") for lockTypeKey, lockTypeAttribute := range lockTypeMap { for lockModeKey, lockModeAttribute := range lockModeMap { // Continue if the lock type is not supported by current server's MongoDB version if s.mongoVersion.LessThan(mongo42) && (lockTypeKey == "ParallelBatchWriterMode" || lockTypeKey == "ReplicationStateTransition") { continue } metricPath := []string{"locks", lockTypeKey, "acquireCount", lockModeKey} metricName := "mongodb.lock.acquire.count" metricAttributes := fmt.Sprintf("%s, %s, %s", dBName, lockTypeAttribute.String(), lockModeAttribute.String()) val, err := collectMetric(doc, metricPath) // MongoDB only publishes this lock metric is it is available. // Do not raise error when key is not found if errors.Is(err, errKeyNotFound) { continue } if err != nil { errs.AddPartial(1, fmt.Errorf(collectMetricWithAttributes, metricName, metricAttributes, err)) continue } s.mb.RecordMongodbLockAcquireCountDataPoint(now, val, dBName, lockTypeAttribute, lockModeAttribute) } } } func (s *mongodbScraper) recordLockAcquireWaitCounts(now pcommon.Timestamp, doc bson.M, dBName string, errs *scrapererror.ScrapeErrors) { mongo32, _ := version.NewVersion("3.2") if s.mongoVersion.LessThan(mongo32) { return } mongo42, _ := version.NewVersion("4.2") for lockTypeKey, lockTypeAttribute := range lockTypeMap { for lockModeKey, lockModeAttribute := range lockModeMap { // Continue if the lock type is not supported by current server's MongoDB version if s.mongoVersion.LessThan(mongo42) && (lockTypeKey == "ParallelBatchWriterMode" || lockTypeKey == "ReplicationStateTransition") { continue } metricPath := []string{"locks", lockTypeKey, "acquireWaitCount", lockModeKey} metricName := "mongodb.lock.acquire.wait_count" metricAttributes := fmt.Sprintf("%s, %s, %s", dBName, lockTypeAttribute.String(), lockModeAttribute.String()) val, err := collectMetric(doc, metricPath) // MongoDB only publishes this lock metric is it is available. // Do not raise error when key is not found if errors.Is(err, errKeyNotFound) { continue } if err != nil { errs.AddPartial(1, fmt.Errorf(collectMetricWithAttributes, metricName, metricAttributes, err)) continue } s.mb.RecordMongodbLockAcquireWaitCountDataPoint(now, val, dBName, lockTypeAttribute, lockModeAttribute) } } } func (s *mongodbScraper) recordLockTimeAcquiringMicros(now pcommon.Timestamp, doc bson.M, dBName string, errs *scrapererror.ScrapeErrors) { mongo32, _ := version.NewVersion("3.2") if s.mongoVersion.LessThan(mongo32) { return } mongo42, _ := version.NewVersion("4.2") for lockTypeKey, lockTypeAttribute := range lockTypeMap { for lockModeKey, lockModeAttribute := range lockModeMap { // Continue if the lock type is not supported by current server's MongoDB version if s.mongoVersion.LessThan(mongo42) && (lockTypeKey == "ParallelBatchWriterMode" || lockTypeKey == "ReplicationStateTransition") { continue } metricPath := []string{"locks", lockTypeKey, "timeAcquiringMicros", lockModeKey} metricName := "mongodb.lock.acquire.time" metricAttributes := fmt.Sprintf("%s, %s, %s", dBName, lockTypeAttribute.String(), lockModeAttribute.String()) val, err := collectMetric(doc, metricPath) // MongoDB only publishes this lock metric is it is available. // Do not raise error when key is not found if errors.Is(err, errKeyNotFound) { continue } if err != nil { errs.AddPartial(1, fmt.Errorf(collectMetricWithAttributes, metricName, metricAttributes, err)) continue } s.mb.RecordMongodbLockAcquireTimeDataPoint(now, val, dBName, lockTypeAttribute, lockModeAttribute) } } } func (s *mongodbScraper) recordLockDeadlockCount(now pcommon.Timestamp, doc bson.M, dBName string, errs *scrapererror.ScrapeErrors) { mongo32, _ := version.NewVersion("3.2") if s.mongoVersion.LessThan(mongo32) { return } mongo42, _ := version.NewVersion("4.2") for lockTypeKey, lockTypeAttribute := range lockTypeMap { for lockModeKey, lockModeAttribute := range lockModeMap { // Continue if the lock type is not supported by current server's MongoDB version if s.mongoVersion.LessThan(mongo42) && (lockTypeKey == "ParallelBatchWriterMode" || lockTypeKey == "ReplicationStateTransition") { continue } metricPath := []string{"locks", lockTypeKey, "deadlockCount", lockModeKey} metricName := "mongodb.lock.deadlock.count" metricAttributes := fmt.Sprintf("%s, %s, %s", dBName, lockTypeAttribute.String(), lockModeAttribute.String()) val, err := collectMetric(doc, metricPath) // MongoDB only publishes this lock metric is it is available. // Do not raise error when key is not found if errors.Is(err, errKeyNotFound) { continue } if err != nil { errs.AddPartial(1, fmt.Errorf(collectMetricWithAttributes, metricName, metricAttributes, err)) continue } s.mb.RecordMongodbLockDeadlockCountDataPoint(now, val, dBName, lockTypeAttribute, lockModeAttribute) } } } // Index Stats func (s *mongodbScraper) recordIndexAccess(now pcommon.Timestamp, documents []bson.M, dbName string, collectionName string, errs *scrapererror.ScrapeErrors) { // Collect the index access given a collection and database if version is >= 3.2 // https://www.mongodb.com/docs/v3.2/reference/operator/aggregation/indexStats/ mongo32, _ := version.NewVersion("3.2") if s.mongoVersion.GreaterThanOrEqual(mongo32) { metricName := "mongodb.index.access.count" var indexAccessTotal int64 for _, doc := range documents { metricAttributes := fmt.Sprintf("%s, %s", dbName, collectionName) indexAccess, ok := doc["accesses"].(bson.M)["ops"] if !ok { err := errors.New("could not find key for index access metric") errs.AddPartial(1, fmt.Errorf(collectMetricWithAttributes, metricName, metricAttributes, err)) return } indexAccessValue, err := parseInt(indexAccess) if err != nil { errs.AddPartial(1, fmt.Errorf(collectMetricWithAttributes, metricName, metricAttributes, err)) return } indexAccessTotal += indexAccessValue } s.mb.RecordMongodbIndexAccessCountDataPoint(now, indexAccessTotal, dbName, collectionName) } } // Top Stats func (s *mongodbScraper) recordOperationTime(now pcommon.Timestamp, doc bson.M, errs *scrapererror.ScrapeErrors) { metricName := "mongodb.operation.time" collectionPathNames, err := digForCollectionPathNames(doc) if err != nil { errs.AddPartial(len(operationsMap), fmt.Errorf(collectMetricError, metricName, err)) return } operationTimeValues, err := aggregateOperationTimeValues(doc, collectionPathNames, operationsMap) if err != nil { errs.AddPartial(len(operationsMap), fmt.Errorf(collectMetricError, metricName, err)) return } for operationName, metadataOperationName := range operationsMap { operationValue, ok := operationTimeValues[operationName] if !ok { err := errors.New("could not find key for operation name") errs.AddPartial(1, fmt.Errorf(collectMetricWithAttributes, metricName, operationName, err)) continue } s.mb.RecordMongodbOperationTimeDataPoint(now, operationValue, metadataOperationName) } } func aggregateOperationTimeValues(document bson.M, collectionPathNames []string, operationMap map[string]metadata.AttributeOperation) (map[string]int64, error) { operationTotals := map[string]int64{} for _, collectionPathName := range collectionPathNames { for operationName := range operationMap { value, err := getOperationTimeValues(document, collectionPathName, operationName) if err != nil { return nil, err } operationTotals[operationName] += value } } return operationTotals, nil } func getOperationTimeValues(document bson.M, collectionPathName, operation string) (int64, error) { rawValue, err := dig(document, []string{"totals", collectionPathName, operation, "time"}) if err != nil { return 0, err } return parseInt(rawValue) } func digForCollectionPathNames(document bson.M) ([]string, error) { docTotals, ok := document["totals"].(bson.M) if !ok { return nil, errKeyNotFound } var collectionPathNames []string for collectionPathName := range docTotals { if collectionPathName != "note" { collectionPathNames = append(collectionPathNames, collectionPathName) } } return collectionPathNames, nil } func collectMetric(document bson.M, path []string) (int64, error) { metric, err := dig(document, path) if err != nil { return 0, err } return parseInt(metric) } func dig(document bson.M, path []string) (interface{}, error) { curItem, remainingPath := path[0], path[1:] value := document[curItem] if value == nil { return 0, errKeyNotFound } if len(remainingPath) == 0 { return value, nil } return dig(value.(bson.M), remainingPath) } func parseInt(val interface{}) (int64, error) { switch v := val.(type) { case int: return int64(v), nil case int32: return int64(v), nil case int64: return v, nil case float64: return int64(v), nil default: return 0, fmt.Errorf("could not parse value as int: %v", reflect.TypeOf(val)) } }