func()

in internal/hostmetrics/cloudmetricreader/cloudmetricreader.go [169:247]


func (r *CloudMetricReader) createDiskMetrics(ctx context.Context, disks []*instancepb.Disk, config *configpb.Configuration, refresh time.Time) []*metricspb.Metric {
	var metrics []*metricspb.Metric
	if len(disks) == 0 {
		return metrics
	}

	var deviceNames, deviceNamesFilter []string
	for _, disk := range disks {
		name := disk.GetDiskName()
		if name == "" {
			name = disk.GetDeviceName()
		}
		deviceNames = append(deviceNames, name)
		deviceNamesFilter = append(deviceNamesFilter, fmt.Sprintf("metric.device_name='%s'", name))
		if disk.GetProvisionedIops() > 0 {
			log.CtxLogger(ctx).Infow("Adding Guaranteed IOps for disk", "disk", name,
				"iops", disk.GetProvisionedIops())
			metrics = append(metrics, buildMetric(ctx, metricDiskGuaranteedIops, float64(disk.GetProvisionedIops()), refresh, name))
		}
		if disk.GetProvisionedThroughput() > 0 {
			log.CtxLogger(ctx).Infow("Adding Guaranteed Throughput for disk", "disk", name,
				"throughput", disk.GetProvisionedThroughput())
			metrics = append(metrics, buildMetric(ctx, metricDiskGuaranteedThroughput, float64(disk.GetProvisionedThroughput()), refresh, name))
		}
	}

	diskDeltaMetrics := []sapMetricKey{metricDiskReadBytesCount, metricDiskWriteBytesCount, metricDiskReadOpsCount, metricDiskWriteOpsCount}
	diskRateMetrics := []sapMetricKey{metricDiskReadOpsCountRate, metricDiskWriteOpsCountRate}
	diskMaxMetrics := []sapMetricKey{metricDiskMaxReadOpsCount, metricDiskMaxWriteOpsCount}

	var diskDeltaData []*mrpb.TimeSeriesData
	var diskRateData []*mrpb.TimeSeriesData
	var diskMaxData []*mrpb.TimeSeriesData
	cp := config.GetCloudProperties()
	if cp != nil {
		filter := fmt.Sprintf("resource.instance_id='%s' && (%s)", cp.GetInstanceId(), strings.Join(deviceNamesFilter, " || "))
		diskDeltaData = r.queryTimeSeriesData(ctx, diskDeltaMetrics, cp.GetProjectId(), queryCallOptions{
			filter:          filter,
			start:           refresh.Add(-3 * time.Minute),
			end:             refresh,
			alignmentPeriod: "60s",
			alignment:       "delta",
		})
		diskRateData = r.queryTimeSeriesData(ctx, diskRateMetrics, cp.GetProjectId(), queryCallOptions{
			filter:          filter,
			start:           refresh.Add(-3 * time.Minute),
			end:             refresh,
			alignmentPeriod: "60s",
			alignment:       "rate",
		})
		diskMaxData = r.queryTimeSeriesData(ctx, diskMaxMetrics, cp.GetProjectId(), queryCallOptions{
			filter:          filter,
			start:           refresh.Add(-3 * time.Minute),
			end:             refresh,
			alignmentPeriod: "60s",
			aggregation:     "max",
		})
	}
	diskDeltaMetricValues := parseTimeSeriesDataByDisk(ctx, deviceNames, diskDeltaMetrics, diskDeltaData)
	log.CtxLogger(ctx).Debugw("Disk delta metric values", "values", diskDeltaMetricValues)
	diskRateMetricValues := parseTimeSeriesDataByDisk(ctx, deviceNames, diskRateMetrics, diskRateData)
	log.CtxLogger(ctx).Debugw("Disk rate metric values", "values", diskRateMetricValues)
	diskMaxMetricValues := parseTimeSeriesDataByDisk(ctx, deviceNames, diskMaxMetrics, diskMaxData)
	log.CtxLogger(ctx).Debugw("Disk max metric values", "values", diskMaxMetricValues)

	for _, deviceID := range deviceNames {
		for _, k := range diskDeltaMetrics {
			if v, ok := diskDeltaMetricValues[deviceID][k]; ok {
				metrics = append(metrics, buildMetric(ctx, k, v, refresh, deviceID))
			}
		}
		readOpsRate := diskRateMetricValues[deviceID][metricDiskReadOpsCountRate]
		writeOpsRate := diskRateMetricValues[deviceID][metricDiskWriteOpsCountRate]
		maxReadOps := diskMaxMetricValues[deviceID][metricDiskMaxReadOpsCount]
		maxWriteOps := diskMaxMetricValues[deviceID][metricDiskMaxWriteOpsCount]
		metrics = append(metrics, buildVolumeUtilization(readOpsRate, writeOpsRate, maxReadOps, maxWriteOps, refresh, deviceID))
	}
	return metrics
}