x-pack/metricbeat/module/gcp/metrics/dataproc/metadata.go (190 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. package dataproc import ( "context" "errors" "fmt" "strings" "sync" "time" compute "cloud.google.com/go/compute/apiv1" "cloud.google.com/go/compute/apiv1/computepb" monitoringpb "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" "google.golang.org/api/dataproc/v1" "google.golang.org/api/iterator" "google.golang.org/api/option" "github.com/elastic/beats/v7/x-pack/metricbeat/module/gcp" "github.com/elastic/elastic-agent-libs/logp" ) // NewMetadataService returns the specific Metadata service for a GCP Dataproc cluster func NewMetadataService(projectID string, regions []string, organizationID, organizationName string, projectName string, collectUserLabels bool, opt ...option.ClientOption) (gcp.MetadataService, error) { return &metadataCollector{ projectID: projectID, projectName: projectName, organizationID: organizationID, organizationName: organizationName, regions: regions, collectUserLabels: collectUserLabels, opt: opt, clusters: make(map[string]*dataproc.Cluster), logger: logp.NewLogger("metrics-dataproc"), }, nil } // dataprocMetadata is an object to store data in between the extraction and the writing in the destination (to uncouple // reading and writing in the same method) type dataprocMetadata struct { region string clusterID string clusterName string machineType string User map[string]string Metadata map[string]string Metrics interface{} System interface{} } type metadataCollector struct { projectID string projectName string organizationID string organizationName string regions []string collectUserLabels bool opt []option.ClientOption // NOTE: clusters holds data used for all metrics collected in a given period // this avoids calling the remote endpoint for each metric, which would take a long time overall clusters map[string]*dataproc.Cluster logger *logp.Logger } // Metadata implements googlecloud.MetadataCollector to the known set of labels from a Dataproc TimeSeries single point of data. func (s *metadataCollector) Metadata(ctx context.Context, resp *monitoringpb.TimeSeries) (gcp.MetadataCollectorData, error) { stackdriverLabels := gcp.NewStackdriverMetadataServiceForTimeSeries(resp, s.organizationID, s.organizationName, s.projectName) metadataCollectorData, err := stackdriverLabels.Metadata(ctx, resp) if err != nil { return gcp.MetadataCollectorData{}, err } if resp.Resource != nil && resp.Resource.Labels != nil { _, _ = metadataCollectorData.ECS.Put(gcp.ECSCloudInstanceIDKey, resp.Resource.Labels["cluster_uuid"]) } _, _ = metadataCollectorData.ECS.Put(gcp.ECSCloudInstanceNameKey, resp.Resource.Labels["cluster_name"]) if s.collectUserLabels { metadata, err := s.instanceMetadata(ctx, s.instanceID(resp), s.instanceRegion(resp)) if err != nil { return metadataCollectorData, err } if metadata.machineType != "" { lastIndex := strings.LastIndex(metadata.machineType, "/") _, _ = metadataCollectorData.ECS.Put(gcp.ECSCloudMachineTypeKey, metadata.machineType[lastIndex+1:]) } metadata.Metrics = metadataCollectorData.Labels[gcp.LabelMetrics] metadata.System = metadataCollectorData.Labels[gcp.LabelSystem] if metadata.User != nil { metadataCollectorData.Labels[gcp.LabelUser] = metadata.User } } return metadataCollectorData, nil } // instanceMetadata returns the labels of an instance func (s *metadataCollector) instanceMetadata(ctx context.Context, instanceID, region string) (*dataprocMetadata, error) { cluster, err := s.instance(ctx, instanceID) if err != nil { return nil, fmt.Errorf("error trying to get data from instance '%s' %w", instanceID, err) } metadata := &dataprocMetadata{ clusterID: instanceID, region: region, } if cluster == nil { s.logger.Debugf("couldn't get instance '%s' call ListInstances API", instanceID) return metadata, nil } if cluster.ClusterName != "" { parts := strings.Split(cluster.ClusterName, "/") metadata.clusterName = parts[len(parts)-1] } if cluster.Labels != nil { metadata.User = cluster.Labels } return metadata, nil } // instance returns data from an instance ID from the cache. func (s *metadataCollector) instance(ctx context.Context, instanceID string) (*dataproc.Cluster, error) { s.getInstances(ctx) instance, ok := s.clusters[instanceID] if ok { return instance, nil } s.clusters = make(map[string]*dataproc.Cluster) return nil, nil } func (s *metadataCollector) instanceID(ts *monitoringpb.TimeSeries) string { if ts.Resource != nil && ts.Resource.Labels != nil { return ts.Resource.Labels["cluster_uuid"] } return "" } func (s *metadataCollector) instanceRegion(ts *monitoringpb.TimeSeries) string { if ts.Resource != nil && ts.Resource.Labels != nil { return ts.Resource.Labels["region"] } return "" } func (s *metadataCollector) getInstances(ctx context.Context) { if len(s.clusters) > 0 { return } var wg sync.WaitGroup var mu sync.Mutex regionsToQuery := s.regions if len(regionsToQuery) == 0 { regions, err := s.fetchAvailableRegions(ctx) if err != nil { s.logger.Errorf("error fetching available regions: %v", err) return } regionsToQuery = regions } dataprocService, err := dataproc.NewService(ctx, s.opt...) if err != nil { s.logger.Errorf("error creating dataproc service %v", err) return } clustersService := dataproc.NewProjectsRegionsClustersService(dataprocService) s.logger.Debugf("querying dataproc clusters across %d regions: %v", len(regionsToQuery), regionsToQuery) queryCtx, cancel := context.WithTimeout(ctx, time.Minute) defer cancel() for _, region := range regionsToQuery { wg.Add(1) go func(region string) { defer wg.Done() listCall := clustersService.List(s.projectID, region).Fields("clusters.labels", "clusters.clusterUuid").Context(queryCtx) resp, err := listCall.Do() if err != nil { s.logger.Errorf("dataproc ListClusters error in region %s: %v", region, err) return } for _, cluster := range resp.Clusters { mu.Lock() s.clusters[cluster.ClusterUuid] = cluster mu.Unlock() } }(region) } wg.Wait() s.logger.Debugf("completed fetching dataproc clusters, found %d clusters", len(s.clusters)) } // fetchAvailableRegions gets all available GCP regions func (s *metadataCollector) fetchAvailableRegions(ctx context.Context) ([]string, error) { restClient, err := compute.NewRegionsRESTClient(ctx, s.opt...) if err != nil { return nil, fmt.Errorf("error getting client from compute regions service: %w", err) } defer restClient.Close() regionsIt := restClient.List(ctx, &computepb.ListRegionsRequest{ Project: s.projectID, }) var regions []string for { region, err := regionsIt.Next() if errors.Is(err, iterator.Done) { break } if err != nil { return nil, fmt.Errorf("error getting next region from regions iterator: %w", err) } // Only include regions that are UP if region.GetStatus() == "UP" { regions = append(regions, region.GetName()) } } s.logger.Debugf("found %d available regions: %v", len(regions), regions) return regions, nil }