func()

in x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go [542:675]


// createEvents builds metricset events from CloudWatch GetMetricData results.
//
// It turns listMetricWithStatsTotal into metric data queries, fetches the
// results for [startTime, endTime] through svcCloudwatch and groups every
// returned value into an event. Events are keyed by the identifier value found
// in the result label plus the index of the value inside the result; when the
// label carries no identifier value a key derived from the account, region and
// namespace labels is used instead.
//
// When resourceTypeTagFilters is empty, events are created without tags.
// Otherwise, for every resource type the resource tags are fetched through
// svcResourceAPI, filtered with the configured tags filter, and attached to the
// matching events. infoAPImap optionally translates a sub-identifier into the
// key under which its tags are stored.
//
// A new event is stamped with the account name and ID from the result label;
// if the label has no account ID, the monitoring account of the metricset is
// used instead.
//
// The returned map is never nil. A non-nil error is returned only when
// fetching the metric data fails.
func (m *MetricSet) createEvents(svcCloudwatch cloudwatch.GetMetricDataAPIClient, svcResourceAPI resourcegroupstaggingapi.GetResourcesAPIClient, listMetricWithStatsTotal []metricsWithStatistics, resourceTypeTagFilters map[string][]aws.Tag, infoAPImap map[string]string, regionName string, startTime time.Time, endTime time.Time) (map[string]mb.Event, error) {
	// Initialize events for each identifier.
	events := make(map[string]mb.Event)

	// Construct metricDataQueries
	metricDataQueries := createMetricDataQueries(listMetricWithStatsTotal, m.DataGranularity)
	m.logger.Debugf("Number of MetricDataQueries = %d", len(metricDataQueries))
	if len(metricDataQueries) == 0 {
		return events, nil
	}

	// Use metricDataQueries to make GetMetricData API calls
	metricDataResults, err := aws.GetMetricDataResults(metricDataQueries, svcCloudwatch, startTime, endTime)
	m.logger.Debugf("Number of metricDataResults = %d", len(metricDataResults))
	if err != nil {
		return events, fmt.Errorf("getMetricDataResults failed: %w", err)
	}

	// eventAccount picks the account name and ID for a new event: the values
	// from the label when the label has an account ID, otherwise the
	// monitoring account. Using it for every new event keeps events with and
	// without an identifier value consistent.
	eventAccount := func(labels []string) (name, id string) {
		if labelID := labels[aws.LabelConst.AccountIdIdx]; labelID != "" {
			return labels[aws.LabelConst.AccountLabelIdx], labelID
		}
		return m.MonitoringAccountName, m.MonitoringAccountID
	}

	// fallbackKey builds the events key for a label without identifier value:
	// id+label+region+accountID+namespace+index.
	fallbackKey := func(labels []string, valI int) string {
		return labels[aws.LabelConst.AccountIdIdx] + labels[aws.LabelConst.AccountLabelIdx] + regionName + m.MonitoringAccountID + labels[aws.LabelConst.NamespaceIdx] + fmt.Sprint("-", valI)
	}

	// Create events when there is no tags_filter or resource_type specified.
	if len(resourceTypeTagFilters) == 0 {
		for _, metricDataResult := range metricDataResults {
			if len(metricDataResult.Values) == 0 {
				continue
			}
			if metricDataResult.Label == nil {
				// Without a label there is nothing to derive the event key and
				// fields from; skip instead of dereferencing a nil pointer.
				m.logger.Debugf("Skipping metricDataResult without label in region %s", regionName)
				continue
			}

			labels := strings.Split(*metricDataResult.Label, aws.LabelConst.LabelSeparator)
			// The label shape is the same for every value of this result.
			hasIdentifier := len(labels) == aws.LabelConst.LabelLengthTotal

			for valI, metricDataResultValue := range metricDataResult.Values {
				var key string
				if hasIdentifier {
					key = labels[aws.LabelConst.IdentifierValueIdx] + fmt.Sprint("-", valI)
				} else {
					key = fallbackKey(labels, valI)
				}

				if _, ok := events[key]; !ok {
					accountName, accountID := eventAccount(labels)
					events[key] = aws.InitEvent(regionName, accountName, accountID, metricDataResult.Timestamps[valI], labels[aws.LabelConst.PeriodLabelIdx])
				}
				events[key] = insertRootFields(events[key], metricDataResultValue, labels)
			}
		}
		return events, nil
	}

	// Create events with tags
	for resourceType, tagsFilter := range resourceTypeTagFilters {
		m.logger.Debugf("resourceType = %s", resourceType)
		m.logger.Debugf("tagsFilter = %s", tagsFilter)
		resourceTagMap, err := aws.GetResourcesTags(svcResourceAPI, []string{resourceType})
		if err != nil {
			// If GetResourcesTags failed, continue report event just without tags.
			// NOTE(review): resourceTagMap is presumably empty on failure, in
			// which case a non-empty tagsFilter skips this resource type
			// below — confirm against GetResourcesTags.
			m.logger.Info(fmt.Errorf("getResourcesTags failed, skipping region %s: %w", regionName, err))
		}

		hasTagsFilter := len(tagsFilter) != 0
		if hasTagsFilter && len(resourceTagMap) == 0 {
			continue
		}

		// filter resourceTagMap (deleting entries while ranging over a map is
		// permitted in Go).
		for identifier, tags := range resourceTagMap {
			if exists := aws.CheckTagFiltersExist(tagsFilter, tags); !exists {
				m.logger.Debugf("In region %s, service %s tags does not match tags_filter", regionName, identifier)
				delete(resourceTagMap, identifier)
				continue
			}
			m.logger.Debugf("In region %s, service %s tags match tags_filter", regionName, identifier)
		}

		for _, output := range metricDataResults {
			if len(output.Values) == 0 {
				continue
			}
			if output.Label == nil {
				// Same nil-label guard as in the branch without tag filters.
				m.logger.Debugf("Skipping metricDataResult without label in region %s", regionName)
				continue
			}

			labels := strings.Split(*output.Label, aws.LabelConst.LabelSeparator)

			if len(labels) != aws.LabelConst.LabelLengthTotal {
				// if there is no tag in labels but there is a tagsFilter, then no event should be reported.
				if hasTagsFilter {
					continue
				}

				// when there is no identifier value in label, use id+label+region+accountID+namespace+index instead
				for valI, metricDataResultValue := range output.Values {
					key := fallbackKey(labels, valI)
					if _, ok := events[key]; !ok {
						accountName, accountID := eventAccount(labels)
						events[key] = aws.InitEvent(regionName, accountName, accountID, output.Timestamps[valI], labels[aws.LabelConst.PeriodLabelIdx])
					}
					events[key] = insertRootFields(events[key], metricDataResultValue, labels)
				}
				continue
			}

			identifierValue := labels[aws.LabelConst.IdentifierValueIdx]

			// add tags to event based on identifierValue
			// Check if identifier includes dimensionSeparator (comma in this case),
			// split the identifier and check for each sub-identifier.
			// For example, identifier might be [storageType, s3BucketName].
			// And tags are only store under s3BucketName in resourceTagMap.
			// The split does not depend on the value index, so do it once per result.
			subIdentifiers := strings.Split(identifierValue, dimensionSeparator)

			for valI, metricDataResultValue := range output.Values {
				uniqueIdentifierValue := identifierValue + fmt.Sprint("-", valI)

				for _, subIdentifier := range subIdentifiers {
					// Translate the sub-identifier when infoAPImap has an
					// entry for it (a lookup in an empty or nil map finds
					// nothing).
					if valAPIName, ok := infoAPImap[subIdentifier]; ok {
						subIdentifier = valAPIName
					}

					if _, ok := events[uniqueIdentifierValue]; !ok {
						// when tagsFilter is not empty but no entry in
						// resourceTagMap for this identifier, do not initialize
						// an event for this identifier.
						if hasTagsFilter && resourceTagMap[subIdentifier] == nil {
							continue
						}
						accountName, accountID := eventAccount(labels)
						events[uniqueIdentifierValue] = aws.InitEvent(regionName, accountName, accountID, output.Timestamps[valI], labels[aws.LabelConst.PeriodLabelIdx])
					}
					events[uniqueIdentifierValue] = insertRootFields(events[uniqueIdentifierValue], metricDataResultValue, labels)
					insertTags(events, uniqueIdentifierValue, subIdentifier, resourceTagMap)
				}
			}
		}
	}
	return events, nil
}