receiver/mongodbatlasreceiver/receiver.go (301 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package mongodbatlasreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mongodbatlasreceiver" import ( "context" "fmt" "strconv" "strings" "time" "go.mongodb.org/atlas/mongodbatlas" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/scraper" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mongodbatlasreceiver/internal" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mongodbatlasreceiver/internal/metadata" ) type mongodbatlasreceiver struct { log *zap.Logger cfg *Config client *internal.MongoDBAtlasClient lastRun time.Time mb *metadata.MetricsBuilder stopperChan chan struct{} } type timeconstraints struct { start string end string resolution string } func newMongoDBAtlasReceiver(settings receiver.Settings, cfg *Config) *mongodbatlasreceiver { client := internal.NewMongoDBAtlasClient(cfg.PublicKey, string(cfg.PrivateKey), cfg.BackOffConfig, settings.Logger) for _, p := range cfg.Projects { p.populateIncludesAndExcludes() } return &mongodbatlasreceiver{ log: settings.Logger, cfg: cfg, client: client, mb: metadata.NewMetricsBuilder(cfg.MetricsBuilderConfig, settings), stopperChan: make(chan struct{}), } } func newMongoDBAtlasScraper(recv *mongodbatlasreceiver) (scraper.Metrics, error) { return scraper.NewMetrics(recv.scrape, scraper.WithShutdown(recv.shutdown)) } func (s *mongodbatlasreceiver) scrape(ctx context.Context) (pmetric.Metrics, error) { now := time.Now() if err := s.poll(ctx, s.timeConstraints(now)); err != nil { return pmetric.Metrics{}, err } s.lastRun = now return s.mb.Emit(), nil } func (s *mongodbatlasreceiver) timeConstraints(now time.Time) timeconstraints { var start time.Time if s.lastRun.IsZero() { start = now.Add(s.cfg.CollectionInterval * -1) } else { start = s.lastRun } return timeconstraints{ start.UTC().Format(time.RFC3339), now.UTC().Format(time.RFC3339), s.cfg.Granularity, } } func (s *mongodbatlasreceiver) shutdown(context.Context) error { return s.client.Shutdown() } // poll decides whether to poll all projects or a specific project based on the configuration. func (s *mongodbatlasreceiver) poll(ctx context.Context, time timeconstraints) error { if len(s.cfg.Projects) == 0 { return s.pollAllProjects(ctx, time) } return s.pollProjects(ctx, time) } // pollAllProjects handles polling across all projects within the organizations. func (s *mongodbatlasreceiver) pollAllProjects(ctx context.Context, time timeconstraints) error { orgs, err := s.client.Organizations(ctx) if err != nil { return fmt.Errorf("error retrieving organizations: %w", err) } for _, org := range orgs { proj, err := s.client.Projects(ctx, org.ID) if err != nil { s.log.Error("error retrieving projects", zap.String("orgID", org.ID), zap.Error(err)) continue } for _, project := range proj { // Since there is no specific ProjectConfig for these projects, pass nil. if err := s.processProject(ctx, time, org.Name, project, nil); err != nil { s.log.Error("error processing project", zap.String("projectID", project.ID), zap.Error(err)) } } } return nil } // pollProject handles polling for specific projects as configured. func (s *mongodbatlasreceiver) pollProjects(ctx context.Context, time timeconstraints) error { for _, projectCfg := range s.cfg.Projects { project, err := s.client.GetProject(ctx, projectCfg.Name) if err != nil { s.log.Error("error retrieving project", zap.String("projectName", projectCfg.Name), zap.Error(err)) continue } org, err := s.client.GetOrganization(ctx, project.OrgID) if err != nil { s.log.Error("error retrieving organization from project", zap.String("projectName", projectCfg.Name), zap.Error(err)) continue } if err := s.processProject(ctx, time, org.Name, project, projectCfg); err != nil { s.log.Error("error processing project", zap.String("projectID", project.ID), zap.Error(err)) } } return nil } func (s *mongodbatlasreceiver) processProject(ctx context.Context, time timeconstraints, orgName string, project *mongodbatlas.Project, projectCfg *ProjectConfig) error { nodeClusterMap, providerMap, err := s.getNodeClusterNameMap(ctx, project.ID) if err != nil { return fmt.Errorf("error collecting clusters from project %s: %w", project.ID, err) } processes, err := s.client.Processes(ctx, project.ID) if err != nil { return fmt.Errorf("error retrieving MongoDB Atlas processes for project %s: %w", project.ID, err) } for _, process := range processes { clusterName := nodeClusterMap[process.UserAlias] providerValues := providerMap[clusterName] if !shouldProcessCluster(projectCfg, clusterName) { // Skip processing for this cluster continue } if err := s.extractProcessMetrics(ctx, time, orgName, project, process, clusterName, providerValues); err != nil { return fmt.Errorf("error when polling process metrics from MongoDB Atlas for process %s: %w", process.ID, err) } if err := s.extractProcessDatabaseMetrics(ctx, time, orgName, project, process, clusterName, providerValues); err != nil { return fmt.Errorf("error when polling process database metrics from MongoDB Atlas for process %s: %w", process.ID, err) } if err := s.extractProcessDiskMetrics(ctx, time, orgName, project, process, clusterName, providerValues); err != nil { return fmt.Errorf("error when polling process disk metrics from MongoDB Atlas for process %s: %w", process.ID, err) } } return nil } // shouldProcessCluster checks whether a given cluster should be processed based on the project configuration. func shouldProcessCluster(projectCfg *ProjectConfig, clusterName string) bool { if projectCfg == nil { // If there is no project config, process all clusters. return true } _, isIncluded := projectCfg.includesByClusterName[clusterName] _, isExcluded := projectCfg.excludesByClusterName[clusterName] // Return false immediately if the cluster is excluded. if isExcluded { return false } // If IncludeClusters is empty, or the cluster is explicitly included, return true. return len(projectCfg.IncludeClusters) == 0 || isIncluded } type providerValues struct { RegionName string ProviderName string } func (s *mongodbatlasreceiver) getNodeClusterNameMap( ctx context.Context, projectID string, ) (map[string]string, map[string]providerValues, error) { providerMap := make(map[string]providerValues) clusterMap := make(map[string]string) clusters, err := s.client.GetClusters(ctx, projectID) if err != nil { return nil, nil, err } for _, cluster := range clusters { // URI in the form mongodb://host1.mongodb.net:27017,host2.mongodb.net:27017,host3.mongodb.net:27017 nodes := strings.Split(strings.TrimPrefix(cluster.MongoURI, "mongodb://"), ",") for _, node := range nodes { // Remove the port from the node n, _, _ := strings.Cut(node, ":") clusterMap[n] = cluster.Name } providerMap[cluster.Name] = providerValues{ RegionName: cluster.ProviderSettings.RegionName, ProviderName: cluster.ProviderSettings.ProviderName, } } return clusterMap, providerMap, nil } func (s *mongodbatlasreceiver) extractProcessMetrics( ctx context.Context, time timeconstraints, orgName string, project *mongodbatlas.Project, process *mongodbatlas.Process, clusterName string, providerValues providerValues, ) error { if err := s.client.ProcessMetrics( ctx, s.mb, project.ID, process.Hostname, process.Port, time.start, time.end, time.resolution, ); err != nil { return fmt.Errorf("error when polling process metrics from MongoDB Atlas: %w", err) } rb := s.mb.NewResourceBuilder() rb.SetMongodbAtlasOrgName(orgName) rb.SetMongodbAtlasProjectName(project.Name) rb.SetMongodbAtlasProjectID(project.ID) rb.SetMongodbAtlasHostName(process.Hostname) rb.SetMongodbAtlasUserAlias(process.UserAlias) rb.SetMongodbAtlasClusterName(clusterName) rb.SetMongodbAtlasProcessPort(strconv.Itoa(process.Port)) rb.SetMongodbAtlasProcessTypeName(process.TypeName) rb.SetMongodbAtlasProcessID(process.ID) rb.SetMongodbAtlasRegionName(providerValues.RegionName) rb.SetMongodbAtlasProviderName(providerValues.ProviderName) s.mb.EmitForResource(metadata.WithResource(rb.Emit())) return nil } func (s *mongodbatlasreceiver) extractProcessDatabaseMetrics( ctx context.Context, time timeconstraints, orgName string, project *mongodbatlas.Project, process *mongodbatlas.Process, clusterName string, providerValues providerValues, ) error { processDatabases, err := s.client.ProcessDatabases( ctx, project.ID, process.Hostname, process.Port, ) if err != nil { return fmt.Errorf("error retrieving process databases: %w", err) } for _, db := range processDatabases { if err := s.client.ProcessDatabaseMetrics( ctx, s.mb, project.ID, process.Hostname, process.Port, db.DatabaseName, time.start, time.end, time.resolution, ); err != nil { return fmt.Errorf("error when polling database metrics from MongoDB Atlas: %w", err) } rb := s.mb.NewResourceBuilder() rb.SetMongodbAtlasOrgName(orgName) rb.SetMongodbAtlasProjectName(project.Name) rb.SetMongodbAtlasProjectID(project.ID) rb.SetMongodbAtlasHostName(process.Hostname) rb.SetMongodbAtlasUserAlias(process.UserAlias) rb.SetMongodbAtlasClusterName(clusterName) rb.SetMongodbAtlasProcessPort(strconv.Itoa(process.Port)) rb.SetMongodbAtlasProcessTypeName(process.TypeName) rb.SetMongodbAtlasProcessID(process.ID) rb.SetMongodbAtlasDbName(db.DatabaseName) rb.SetMongodbAtlasRegionName(providerValues.RegionName) rb.SetMongodbAtlasProviderName(providerValues.ProviderName) s.mb.EmitForResource(metadata.WithResource(rb.Emit())) } return nil } func (s *mongodbatlasreceiver) extractProcessDiskMetrics( ctx context.Context, time timeconstraints, orgName string, project *mongodbatlas.Project, process *mongodbatlas.Process, clusterName string, providerValues providerValues, ) error { for _, disk := range s.client.ProcessDisks(ctx, project.ID, process.Hostname, process.Port) { if err := s.client.ProcessDiskMetrics( ctx, s.mb, project.ID, process.Hostname, process.Port, disk.PartitionName, time.start, time.end, time.resolution, ); err != nil { return fmt.Errorf("error when polling disk metrics from MongoDB Atlas: %w", err) } rb := s.mb.NewResourceBuilder() rb.SetMongodbAtlasOrgName(orgName) rb.SetMongodbAtlasProjectName(project.Name) rb.SetMongodbAtlasProjectID(project.ID) rb.SetMongodbAtlasHostName(process.Hostname) rb.SetMongodbAtlasUserAlias(process.UserAlias) rb.SetMongodbAtlasClusterName(clusterName) rb.SetMongodbAtlasProcessPort(strconv.Itoa(process.Port)) rb.SetMongodbAtlasProcessTypeName(process.TypeName) rb.SetMongodbAtlasProcessID(process.ID) rb.SetMongodbAtlasDiskPartition(disk.PartitionName) rb.SetMongodbAtlasRegionName(providerValues.RegionName) rb.SetMongodbAtlasProviderName(providerValues.ProviderName) s.mb.EmitForResource(metadata.WithResource(rb.Emit())) } return nil }