x-pack/metricbeat/module/azure/client_batch.go
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package azure
import (
"fmt"
"strings"
"sync"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/elastic-agent-libs/logp"
)
// BatchClient represents the azure batch client, which uses the metrics-related clients of the Azure SDK for Go
type BatchClient struct {
*BaseClient
ResourceConfigurations ConcurrentResourceConfig
}
// ResDefGroupingCriteria holds the criteria used to group resource metric definitions
type ResDefGroupingCriteria struct {
Namespace string
SubscriptionID string
Location string
Names string
Aggregations string
TimeGrain string
Dimensions string
}
// concurrentMapResourceMetrics is the function type that maps the configuration options to batch client metrics (the mapping depends on the metricset)
type concurrentMapResourceMetrics func(client *BatchClient, resources []*armresources.GenericResourceExpanded, resourceConfig ResourceConfig, wg *sync.WaitGroup)
// NewBatchClient instantiates the Azure monitoring batch client
func NewBatchClient(config Config) (*BatchClient, error) {
azureMonitorService, err := NewService(config)
if err != nil {
return nil, err
}
logger := logp.NewLogger("azure monitor client")
client := &BatchClient{
BaseClient: &BaseClient{
AzureMonitorService: azureMonitorService,
Config: config,
Log: logger,
MetricRegistry: NewMetricRegistry(logger),
},
}
client.ResourceConfigurations.MetricDefinitions = MetricDefinitions{
Update: true,
Metrics: make(map[string][]Metric),
}
client.ResourceConfigurations.RefreshInterval = config.RefreshListInterval
client.ResourceConfigurations.MetricDefinitionsChan = nil
client.ResourceConfigurations.ErrorChan = nil
return client, nil
}
// InitResources retrieves and validates the resources configured by the user and then maps the configured information to client metrics.
// The mapping function passed in handles that mapping, since different metric and aggregation options apply to different metricsets.
func (client *BatchClient) InitResources(fn concurrentMapResourceMetrics) error {
if len(client.Config.Resources) == 0 {
return fmt.Errorf("no resource options defined")
}
// check if refresh interval has been set and if it has expired
if !client.ResourceConfigurations.Expired() {
client.Log.Debug("MetricDefinitions are not expired. Writing metrics to MetricDefinitionsChan")
client.ResourceConfigurations.MetricDefinitionsChan = make(chan []Metric)
client.ResourceConfigurations.ErrorChan = make(chan error, 1)
// MetricDefinitions do not need update
client.ResourceConfigurations.MetricDefinitions.Update = false
go func() {
defer close(client.ResourceConfigurations.MetricDefinitionsChan)
defer close(client.ResourceConfigurations.ErrorChan)
for _, metrics := range client.ResourceConfigurations.MetricDefinitions.Metrics {
client.ResourceConfigurations.MetricDefinitionsChan <- metrics
}
}()
return nil
}
// MetricDefinitions need update
client.ResourceConfigurations.MetricDefinitions.Update = true
// Initialize a WaitGroup to track all goroutines
var wg sync.WaitGroup
// reset client resources
client.Resources = []Resource{}
for _, resourceConfig := range client.Config.Resources {
// retrieve azure resources information
resourceList, err := client.AzureMonitorService.GetResourceDefinitions(resourceConfig.Id, resourceConfig.Group, resourceConfig.Type, resourceConfig.Query)
if err != nil {
err = fmt.Errorf("failed to retrieve resources: %w", err)
return err
}
if len(resourceList) == 0 {
err = fmt.Errorf("failed to retrieve resources: No resources returned using the configuration options resource ID %s, resource group %s, resource type %s, resource query %s",
resourceConfig.Id, resourceConfig.Group, resourceConfig.Type, resourceConfig.Query)
client.Log.Error(err)
continue
}
// create the channels if they have not already been created by a previous iteration
if client.ResourceConfigurations.MetricDefinitionsChan == nil && client.ResourceConfigurations.ErrorChan == nil {
client.ResourceConfigurations.MetricDefinitionsChan = make(chan []Metric)
client.ResourceConfigurations.ErrorChan = make(chan error, 1)
}
// Map resources to the client
for _, resource := range resourceList {
if !containsResource(*resource.ID, client.Resources) {
client.Resources = append(client.Resources, Resource{
Id: *resource.ID,
Name: *resource.Name,
Location: *resource.Location,
Type: *resource.Type,
Group: getResourceGroupFromId(*resource.ID),
Tags: mapTags(resource.Tags),
Subscription: client.Config.SubscriptionId})
}
}
// Collect and store metric definitions for the cloud resources.
wg.Add(1)
fn(client, resourceList, resourceConfig, &wg)
client.Log.Infof("Finished collection with %d metric definitions", len(resourceList))
}
go func() {
wg.Wait() // Wait for all the resource collection goroutines to finish
// Once all the goroutines are done, close the channels
client.Log.Debug("All collections finished. Closing channels ")
close(client.ResourceConfigurations.MetricDefinitionsChan)
close(client.ResourceConfigurations.ErrorChan)
}()
return nil
}
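// Illustrative sketch, not part of the original file: one way a metricset
// could drain the channels that InitResources fills. Reading both channels in
// a single select loop avoids blocking the producer goroutines, which send on
// an unbuffered definitions channel and a single-slot error channel. The
// helper name is hypothetical.
func exampleDrainMetricDefinitions(client *BatchClient) ([]Metric, error) {
	defsChan := client.ResourceConfigurations.MetricDefinitionsChan
	errChan := client.ResourceConfigurations.ErrorChan
	var collected []Metric
	var firstErr error
	// Both channels are closed by InitResources once every resource
	// configuration has been processed; a nil channel blocks forever in a
	// select, so clearing the local variables ends the loop.
	for defsChan != nil || errChan != nil {
		select {
		case metrics, ok := <-defsChan:
			if !ok {
				defsChan = nil
				continue
			}
			collected = append(collected, metrics...)
		case err, ok := <-errChan:
			if !ok {
				errChan = nil
				continue
			}
			if firstErr == nil {
				firstErr = err
			}
		}
	}
	return collected, firstErr
}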
// GetMetricsInBatch will query the batch API for each group
func (client *BatchClient) GetMetricsInBatch(groupedMetrics map[ResDefGroupingCriteria][]Metric, referenceTime time.Time, reporter mb.ReporterV2) []Metric {
var result []Metric
for criteria, metricsDefinitions := range groupedMetrics {
// Same end time for all metrics in the same batch.
interval := client.Config.Period
// Fetch in the range [{-2 x INTERVAL},{-1 x INTERVAL}) with a delay of {INTERVAL}.
endTime := referenceTime
timespanDuration := max(asDuration(criteria.TimeGrain), interval)
startTime := endTime.Add(timespanDuration * -1)
// Build the dimension filter from the first definition; all definitions in the same group share the same dimension criteria.
filter := ""
if len(metricsDefinitions[0].Dimensions) > 0 {
var filterList []string
for _, dim := range metricsDefinitions[0].Dimensions {
filterList = append(filterList, dim.Name+" eq '"+dim.Value+"'")
}
filter = strings.Join(filterList, " AND ")
}
// The batch API accepts a limited number of resources per call (BatchApiResourcesLimit, 50), so query the definitions in chunks.
for i := 0; i < len(metricsDefinitions); i += BatchApiResourcesLimit {
end := i + BatchApiResourcesLimit
if end > len(metricsDefinitions) {
end = len(metricsDefinitions)
}
// Slice the metrics to form the batch request
batchMetrics := metricsDefinitions[i:end]
// Slice the metric names into batches of 20 (metricNameLimit) due to a batch API limitation
names := strings.Split(criteria.Names, ",")
for j := 0; j < len(names); j += metricNameLimit {
endMetric := j + metricNameLimit
if endMetric > len(names) {
endMetric = len(names)
}
// Make the batch API call for this chunk of resources and metric names
response, err := client.AzureMonitorService.QueryResources(
getResourceIDs(batchMetrics), // Get the resource IDs from the batch
criteria.SubscriptionID,
criteria.Namespace,
criteria.TimeGrain,
startTime.Format("2006-01-02T15:04:05.000Z07:00"),
endTime.Format("2006-01-02T15:04:05.000Z07:00"),
names[j:endMetric],
strings.ToLower(batchMetrics[0].Aggregations),
filter,
criteria.Location,
)
if err != nil {
err = fmt.Errorf("error while listing metric values by resource ID %s and namespace %s: %w", metricsDefinitions[0].ResourceSubId, metricsDefinitions[0].Namespace, err)
client.Log.Error(err)
reporter.Error(err)
continue
}
// Process the response: record the collection info and map the returned values onto the metric definitions
for i, v := range response {
client.MetricRegistry.Update(metricsDefinitions[i], MetricCollectionInfo{
timeGrain: *response[i].Interval,
timestamp: referenceTime,
})
values := mapBatchMetricValues(client, v)
metricsDefinitions[i].Values = append(metricsDefinitions[i].Values, values...)
if metricsDefinitions[i].TimeGrain == "" {
metricsDefinitions[i].TimeGrain = *response[i].Interval
}
}
result = append(result, metricsDefinitions...)
}
}
}
return result
}
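// Illustrative sketch, not part of the original file: the number of batch API
// calls GetMetricsInBatch issues for a single group, given the two limits used
// above (BatchApiResourcesLimit resources and metricNameLimit metric names per
// call). Purely arithmetic, it performs no API calls; the helper name is
// hypothetical.
func exampleBatchCallCount(resourceCount, metricNameCount int) int {
	// Integer ceiling division mirrors the chunking loops above.
	resourceChunks := (resourceCount + BatchApiResourcesLimit - 1) / BatchApiResourcesLimit
	nameChunks := (metricNameCount + metricNameLimit - 1) / metricNameLimit
	// Each chunk of resources is queried once per chunk of metric names.
	return resourceChunks * nameChunks
}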
// GroupAndStoreMetrics groups the received metric definitions and stores them in an in-memory store
func (client *BatchClient) GroupAndStoreMetrics(metricsDefinitions []Metric, referenceTime time.Time, store map[ResDefGroupingCriteria]*MetricStore) {
for _, metric := range metricsDefinitions {
criteria := ResDefGroupingCriteria{
Namespace: metric.Namespace,
SubscriptionID: metric.SubscriptionId,
Location: metric.Location,
Names: strings.Join(metric.Names, ","),
TimeGrain: metric.TimeGrain,
Dimensions: getDimensionKey(metric.Dimensions),
}
//
// Before fetching the metric values, check if the metric
// has been collected within the time grain.
//
// Why do we need this?
//
// Some metricsets contain metrics with long time grains (e.g. 1 hour).
//
// If we collect the metric values every 5 minutes, we will end up fetching
// the same data over and over again for all metrics with a time grain
// larger than 5 minutes.
//
// The registry keeps track of the last timestamp the metricset collected
// the metric values and the time grain used.
//
// By comparing the last collection time with the current time, and
// the time grain of the metric, we can determine if the metric needs
// to be collected again, or if we can skip it.
//
if !client.MetricRegistry.NeedsUpdate(referenceTime, metric) {
continue
}
if _, exists := store[criteria]; !exists {
store[criteria] = &MetricStore{}
}
store[criteria].AddMetric(metric)
}
}
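// Illustrative sketch, not part of the original file: building the grouped map
// that GetMetricsInBatch consumes directly from a slice of metric definitions.
// The real flow goes through GroupAndStoreMetrics and a MetricStore per group;
// this variant only shows how the grouping criteria and the registry check
// above translate into the batch-query input. The helper name is hypothetical.
func exampleGroupForBatch(client *BatchClient, metricsDefinitions []Metric, referenceTime time.Time) map[ResDefGroupingCriteria][]Metric {
	grouped := make(map[ResDefGroupingCriteria][]Metric)
	for _, metric := range metricsDefinitions {
		// Skip metrics whose time grain has not yet elapsed since the last
		// collection, exactly as GroupAndStoreMetrics does above.
		if !client.MetricRegistry.NeedsUpdate(referenceTime, metric) {
			continue
		}
		criteria := ResDefGroupingCriteria{
			Namespace:      metric.Namespace,
			SubscriptionID: metric.SubscriptionId,
			Location:       metric.Location,
			Names:          strings.Join(metric.Names, ","),
			TimeGrain:      metric.TimeGrain,
			Dimensions:     getDimensionKey(metric.Dimensions),
		}
		grouped[criteria] = append(grouped[criteria], metric)
	}
	return grouped
}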
// CreateMetric creates a client metric based on the configured resource and metrics
func (client *BatchClient) CreateMetric(resourceId string, subResourceId string, namespace string, location string, subscriptionId string, metrics []string, aggregations string, dimensions []Dimension, timeGrain string) Metric {
if subResourceId == "" {
subResourceId = resourceId
}
met := Metric{
ResourceId: resourceId,
ResourceSubId: subResourceId,
Namespace: namespace,
Names: metrics,
Dimensions: dimensions,
Aggregations: aggregations,
TimeGrain: timeGrain,
Location: location,
SubscriptionId: subscriptionId,
}
if prevMetrics, ok := client.ResourceConfigurations.MetricDefinitions.Metrics[resourceId]; ok {
for _, prevMet := range prevMetrics {
if len(prevMet.Values) != 0 && matchMetrics(prevMet, met) {
met.Values = prevMet.Values
}
}
}
return met
}
// MapMetricByPrimaryAggregation groups the metric definitions by primary aggregation type and maps each group to a client metric
func (client *BatchClient) MapMetricByPrimaryAggregation(metrics []armmonitor.MetricDefinition, resourceId string, location string, subscriptionId string, subResourceId string, namespace string, dim []Dimension, timeGrain string) []Metric {
clientMetrics := make([]Metric, 0)
metricGroups := make(map[string][]armmonitor.MetricDefinition)
for _, met := range metrics {
metricGroups[string(*met.PrimaryAggregationType)] = append(metricGroups[string(*met.PrimaryAggregationType)], met)
}
for key, metricGroup := range metricGroups {
var metricNames []string
for _, metricName := range metricGroup {
metricNames = append(metricNames, *metricName.Name.Value)
}
clientMetrics = append(clientMetrics, client.CreateMetric(resourceId, subResourceId, namespace, location, subscriptionId, metricNames, key, dim, timeGrain))
}
return clientMetrics
}
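// Illustrative sketch, not part of the original file: the shape of a
// concurrentMapResourceMetrics function that InitResources expects. The
// namespace, metric names, aggregation, and time grain literals are
// placeholders; a real implementation would look the definitions up through
// the monitor service and map them with MapMetricByPrimaryAggregation.
func exampleMapResourceMetrics(client *BatchClient, resources []*armresources.GenericResourceExpanded, resourceConfig ResourceConfig, wg *sync.WaitGroup) {
	go func() {
		// InitResources calls wg.Add(1) before invoking this function and only
		// closes the channels after wg.Wait(), so Done must be deferred here.
		defer wg.Done()
		for _, resource := range resources {
			metric := client.CreateMetric(*resource.ID, "", "Microsoft.Storage/storageAccounts",
				*resource.Location, client.Config.SubscriptionId,
				[]string{"Transactions"}, "Total", nil, "PT5M")
			// MetricDefinitionsChan is unbuffered, so this send blocks until the
			// metricset consumer reads; that is why the mapping runs in its own
			// goroutine. Lookup failures would be reported on ErrorChan instead.
			client.ResourceConfigurations.MetricDefinitionsChan <- []Metric{metric}
		}
	}()
}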
// NewMockBatchClient instantiates a new batch client with the mock azure service
func NewMockBatchClient() *BatchClient {
azureMockService := new(MockService)
logger := logp.NewLogger("test azure monitor")
client := &BatchClient{
BaseClient: &BaseClient{
AzureMonitorService: azureMockService,
Config: Config{},
Log: logger,
MetricRegistry: NewMetricRegistry(logger),
},
}
return client
}
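// Illustrative sketch, not part of the original file: using the mock batch
// client in a test. Only CreateMetric is exercised here, so no expectations on
// MockService are needed; the resource ID and metric values are placeholders.
func exampleCreateMetricWithMockClient() Metric {
	client := NewMockBatchClient()
	return client.CreateMetric(
		"/subscriptions/sub/resourceGroups/rg/providers/Microsoft.Storage/storageAccounts/sa",
		"", // the sub-resource ID defaults to the resource ID when empty
		"Microsoft.Storage/storageAccounts",
		"westeurope",
		"sub",
		[]string{"Transactions"},
		"Total",
		nil,
		"PT5M",
	)
}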