x-pack/metricbeat/module/azure/storage/client_helper_concurrent.go (63 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package storage
import (
"fmt"
"sync"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources"
"github.com/elastic/beats/v7/x-pack/metricbeat/module/azure"
)
// concurrentMapMetrics fetches concurrently metric definitions and writes them in MetricDefinitionsChan channel
func concurrentMapMetrics(client *azure.BatchClient, resources []*armresources.GenericResourceExpanded, resourceConfig azure.ResourceConfig, wg *sync.WaitGroup) {
// list all storage account namespaces for this metricset
namespaces := []string{defaultStorageAccountNamespace}
// if serviceType is configured, add only the selected serviceType namespaces
if len(resourceConfig.ServiceType) > 0 {
for _, selectedServiceNamespace := range resourceConfig.ServiceType {
namespaces = append(namespaces, fmt.Sprintf("%s/%s%s", defaultStorageAccountNamespace, selectedServiceNamespace, serviceTypeNamespaceExtension))
}
} else {
for _, service := range storageServiceNamespaces {
namespaces = append(namespaces, fmt.Sprintf("%s%s", defaultStorageAccountNamespace, service))
}
}
go func() {
defer wg.Done()
for _, resource := range resources {
res, err := getStorageMappedResourceDefinitions(client, *resource.ID, *resource.Location, client.Config.SubscriptionId, namespaces)
if err != nil {
client.ResourceConfigurations.ErrorChan <- err // Send error and stop processing
return
}
client.ResourceConfigurations.MetricDefinitionsChan <- res
}
}()
}
// getStorageMappedResourceDefinitions fetches metric definitions and maps the metric related configuration to relevant azure monitor api parameters
func getStorageMappedResourceDefinitions(client *azure.BatchClient, resourceId string, location string, subscriptionId string, namespaces []string) ([]azure.Metric, error) {
var metrics []azure.Metric
for _, namespace := range namespaces {
// resourceID will be different for a serviceType namespace, format will be resourceID/service/default
var resourceID = resourceId
if i := retrieveServiceNamespace(namespace); i != "" {
resourceID += i + resourceIDExtension
}
// get all metric definitions supported by the namespace provided
metricDefinitions, err := client.AzureMonitorService.GetMetricDefinitionsWithRetry(resourceID, namespace)
if err != nil {
return nil, err
}
if len(metricDefinitions.Value) == 0 {
return nil, fmt.Errorf("no metric definitions were found for resource %s and namespace %s", resourceID, namespace)
}
var filteredMetricDefinitions []armmonitor.MetricDefinition
for _, metricDefinition := range metricDefinitions.Value {
filteredMetricDefinitions = append(filteredMetricDefinitions, *metricDefinition)
}
// some metrics do not support the default PT5M timegrain so they will have to be grouped in a different API call, else call will fail
groupedMetrics := groupOnTimeGrain(filteredMetricDefinitions)
for time, groupedMetricList := range groupedMetrics {
// metrics will have to be grouped by allowed dimensions
dimMetrics := groupMetricsByAllowedDimensions(groupedMetricList)
for dimension, mets := range dimMetrics {
var dimensions []azure.Dimension
if dimension != azure.NoDimension {
dimensions = []azure.Dimension{{Name: dimension, Value: "*"}}
}
metrics = append(metrics, client.MapMetricByPrimaryAggregation(mets, resourceId, location, subscriptionId, resourceID, namespace, dimensions, time)...)
}
}
}
return metrics, nil
}