x-pack/metricbeat/module/azure/monitor/client_helper_concurrent.go (65 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package monitor
import (
"fmt"
"sync"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources"
"github.com/elastic/beats/v7/x-pack/metricbeat/module/azure"
)
// concurrentMapMetrics fetches concurrently metric definitions and writes them in MetricDefinitionsChan channel
func concurrentMapMetrics(client *azure.BatchClient, resources []*armresources.GenericResourceExpanded, resourceConfig azure.ResourceConfig, wg *sync.WaitGroup) {
go func() {
defer wg.Done()
for _, resource := range resources {
res, err := getMappedResourceDefinitions(client, *resource.ID, *resource.Location, client.Config.SubscriptionId, resourceConfig)
if err != nil {
client.ResourceConfigurations.ErrorChan <- err // Send error and stop processing
return
}
client.ResourceConfigurations.MetricDefinitionsChan <- res
}
}()
}
// getMappedResourceDefinitions fetches metric definitions and maps the metric related configuration to relevant azure monitor api parameters
func getMappedResourceDefinitions(client *azure.BatchClient, resourceId string, location string, subscriptionId string, resourceConfig azure.ResourceConfig) ([]azure.Metric, error) {
var metrics []azure.Metric
// We use this map to avoid calling the metrics definition function for the same namespace and same resource
// multiple times.
namespaceMetrics := make(map[string]armmonitor.MetricDefinitionCollection)
for _, metric := range resourceConfig.Metrics {
var err error
metricDefinitions, exists := namespaceMetrics[metric.Namespace]
if !exists {
metricDefinitions, err = client.AzureMonitorService.GetMetricDefinitionsWithRetry(resourceId, metric.Namespace)
if err != nil {
return nil, err
}
namespaceMetrics[metric.Namespace] = metricDefinitions
}
if len(metricDefinitions.Value) == 0 {
if metric.IgnoreUnsupported {
client.Log.Infof(missingMetricDefinitions, resourceId, metric.Namespace)
continue
}
return nil, fmt.Errorf(missingMetricDefinitions, resourceId, metric.Namespace)
}
// validate metric names and filter on the supported metrics
supportedMetricNames, err := filterMetricNames(resourceId, metric, metricDefinitions.Value)
if err != nil {
return nil, err
}
//validate aggregations and filter on supported aggregations
metricGroups, err := filterOnSupportedAggregations(supportedMetricNames, metric, metricDefinitions.Value)
if err != nil {
return nil, err
}
// map dimensions
var dim []azure.Dimension
if len(metric.Dimensions) > 0 {
for _, dimension := range metric.Dimensions {
dim = append(dim, azure.Dimension(dimension))
}
}
for key, metricGroup := range metricGroups {
var metricNames []string
for _, metricName := range metricGroup {
metricNames = append(metricNames, *metricName.Name.Value)
}
metrics = append(metrics, client.CreateMetric(resourceId, "", metric.Namespace, location, subscriptionId, metricNames, key, dim, metric.Timegrain))
}
}
return metrics, nil
}