components/otelopscol/receiver/mongodbreceiver/scraper.go (160 lines of code) (raw):

// Copyright The OpenTelemetry Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package mongodbreceiver // import "github.com/GoogleCloudPlatform/opentelemetry-operations-collector/components/otelopscol/receiver/mongodbreceiver" import ( "context" "errors" "fmt" "time" "github.com/hashicorp/go-version" "go.mongodb.org/mongo-driver/bson" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/scraper/scrapererror" "go.uber.org/zap" "github.com/GoogleCloudPlatform/opentelemetry-operations-collector/components/otelopscol/receiver/mongodbreceiver/internal/metadata" ) type mongodbScraper struct { logger *zap.Logger config *Config client client mongoVersion *version.Version mb *metadata.MetricsBuilder } func newMongodbScraper(settings receiver.Settings, config *Config) *mongodbScraper { mbConfig := metadata.DefaultMetricsBuilderConfig() mbConfig.Metrics = config.Metrics return &mongodbScraper{ logger: settings.Logger, config: config, mb: metadata.NewMetricsBuilder(mbConfig, settings), } } func (s *mongodbScraper) start(ctx context.Context, _ component.Host) error { c, err := NewClient(ctx, s.config, s.logger) if err != nil { return fmt.Errorf("create mongo client: %w", err) } s.client = c return nil } func (s *mongodbScraper) shutdown(ctx context.Context) error { if s.client != nil { return s.client.Disconnect(ctx) } return nil } func (s *mongodbScraper) scrape(ctx context.Context) (pmetric.Metrics, error) { if s.client == nil { return pmetric.NewMetrics(), errors.New("no client was initialized before calling scrape") } if s.mongoVersion == nil { version, err := s.client.GetVersion(ctx) if err != nil { return pmetric.NewMetrics(), fmt.Errorf("unable to determine version of mongo scraping against: %w", err) } s.mongoVersion = version } errs := &scrapererror.ScrapeErrors{} s.collectMetrics(ctx, errs) return s.mb.Emit(), errs.Combine() } func (s *mongodbScraper) collectMetrics(ctx context.Context, errs *scrapererror.ScrapeErrors) { dbNames, err := s.client.ListDatabaseNames(ctx, bson.D{}) if err != nil { errs.AddPartial(1, fmt.Errorf("failed to fetch database names: %w", err)) return } now := pcommon.NewTimestampFromTime(time.Now()) s.mb.RecordMongodbDatabaseCountDataPoint(now, int64(len(dbNames))) s.collectAdminDatabase(ctx, now, errs) s.collectTopStats(ctx, now, errs) for _, dbName := range dbNames { s.collectDatabase(ctx, now, dbName, errs) collectionNames, err := s.client.ListCollectionNames(ctx, dbName) if err != nil { errs.AddPartial(1, fmt.Errorf("failed to fetch collection names: %w", err)) return } // The indexStats aggregation is only available if version is >= 3.2 // https://www.mongodb.com/docs/v3.2/reference/operator/aggregation/indexStats/ mongo32, _ := version.NewVersion("3.2") if s.mongoVersion.GreaterThanOrEqual(mongo32) { for _, collectionName := range collectionNames { s.collectIndexStats(ctx, now, dbName, collectionName, errs) } } } } func (s *mongodbScraper) collectDatabase(ctx context.Context, now pcommon.Timestamp, databaseName string, errs *scrapererror.ScrapeErrors) { dbStats, err := s.client.DBStats(ctx, databaseName) if err != nil { errs.AddPartial(1, fmt.Errorf("failed to fetch database stats metrics: %w", err)) } else { s.recordDBStats(now, dbStats, databaseName, errs) } serverStatus, err := s.client.ServerStatus(ctx, databaseName) if err != nil { errs.AddPartial(1, fmt.Errorf("failed to fetch server status metrics: %w", err)) return } s.recordNormalServerStats(now, serverStatus, databaseName, errs) rb := s.mb.NewResourceBuilder() rb.SetDatabase(databaseName) s.mb.EmitForResource(metadata.WithResource(rb.Emit())) } func (s *mongodbScraper) collectAdminDatabase(ctx context.Context, now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) { serverStatus, err := s.client.ServerStatus(ctx, "admin") if err != nil { errs.AddPartial(1, fmt.Errorf("failed to fetch admin server status metrics: %w", err)) return } s.recordAdminStats(now, serverStatus, errs) s.mb.EmitForResource() } func (s *mongodbScraper) collectTopStats(ctx context.Context, now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) { topStats, err := s.client.TopStats(ctx) if err != nil { errs.AddPartial(1, fmt.Errorf("failed to fetch top stats metrics: %w", err)) return } s.recordOperationTime(now, topStats, errs) s.mb.EmitForResource() } func (s *mongodbScraper) collectIndexStats(ctx context.Context, now pcommon.Timestamp, databaseName string, collectionName string, errs *scrapererror.ScrapeErrors) { indexStats, err := s.client.IndexStats(ctx, databaseName, collectionName) if err != nil { errs.AddPartial(1, fmt.Errorf("failed to fetch index stats metrics: %w", err)) return } s.recordIndexStats(now, indexStats, databaseName, collectionName, errs) s.mb.EmitForResource() } func (s *mongodbScraper) recordDBStats(now pcommon.Timestamp, doc bson.M, dbName string, errs *scrapererror.ScrapeErrors) { s.recordCollections(now, doc, dbName, errs) s.recordDataSize(now, doc, dbName, errs) s.recordExtentCount(now, doc, dbName, errs) s.recordIndexSize(now, doc, dbName, errs) s.recordIndexCount(now, doc, dbName, errs) s.recordObjectCount(now, doc, dbName, errs) s.recordStorageSize(now, doc, dbName, errs) } func (s *mongodbScraper) recordNormalServerStats(now pcommon.Timestamp, doc bson.M, dbName string, errs *scrapererror.ScrapeErrors) { s.recordConnections(now, doc, dbName, errs) s.recordDocumentOperations(now, doc, dbName, errs) s.recordMemoryUsage(now, doc, dbName, errs) s.recordLockAcquireCounts(now, doc, dbName, errs) s.recordLockAcquireWaitCounts(now, doc, dbName, errs) s.recordLockTimeAcquiringMicros(now, doc, dbName, errs) s.recordLockDeadlockCount(now, doc, dbName, errs) } func (s *mongodbScraper) recordAdminStats(now pcommon.Timestamp, document bson.M, errs *scrapererror.ScrapeErrors) { s.recordCacheOperations(now, document, errs) s.recordCursorCount(now, document, errs) s.recordCursorTimeoutCount(now, document, errs) s.recordGlobalLockTime(now, document, errs) s.recordNetworkCount(now, document, errs) s.recordOperations(now, document, errs) s.recordSessionCount(now, document, errs) } func (s *mongodbScraper) recordIndexStats(now pcommon.Timestamp, indexStats []bson.M, databaseName string, collectionName string, errs *scrapererror.ScrapeErrors) { s.recordIndexAccess(now, indexStats, databaseName, collectionName, errs) }