func collectAndSendRemoteMetrics()

in internal/workloadmanager/remote.go [68:151]


// collectAndSendRemoteMetrics collects workload validation metrics from every
// instance listed in the remote collection configuration and sends them.
//
// Collection uses SSH when remote_collection_ssh is configured, otherwise
// gcloud. Instances are processed on a worker pool sized by the configured
// concurrent_collections value. A temporary workload validation file is
// created for the collectors and removed before returning.
//
// It returns the sum of the values returned by sendMetrics for all instances.
// It returns 0 if neither remote collection method is configured or if the
// temporary file cannot be created.
func collectAndSendRemoteMetrics(ctx context.Context, params Parameters) int {
	rc := params.Config.GetCollectionConfiguration().GetWorkloadValidationRemoteCollection()
	// make sure collection via gcloud or ssh is defined
	if rc.GetRemoteCollectionSsh() == nil && rc.GetRemoteCollectionGcloud() == nil {
		log.CtxLogger(ctx).Error("One of remote_collection_gcloud or remote_collection_ssh must be defined for remote collection")
		return 0
	}

	tempFile, err := createWorkloadValidationFile(ctx, params.WorkloadConfig)
	if err != nil {
		log.CtxLogger(ctx).Errorw("Could not create temporary file for workload validation", "error", err)
		return 0
	}
	defer os.Remove(tempFile.Name())

	// These settings are identical for every instance, so read them once.
	// SSH takes precedence when both transports are configured; the guard above
	// guarantees that at least one of them is set.
	useSSH := rc.GetRemoteCollectionSsh() != nil
	bareMetal := params.Config.GetBareMetal()
	sendToCloudMonitoring := params.Config.GetSupportConfiguration().GetSendWorkloadValidationMetricsToCloudMonitoring().GetValue()

	wp := workerpool.New(int(rc.GetConcurrentCollections()))
	mu := &sync.Mutex{}
	metricsSent := 0
	for _, inst := range rc.GetRemoteCollectionInstances() {
		inst := inst
		// Capacity 1 so a collector's send does not block forever if this
		// worker has already stopped waiting because ctx was cancelled.
		ch := make(chan WorkloadMetrics, 1)
		wp.Submit(func() {
			log.CtxLogger(ctx).Infow("Collecting metrics from", "instance", inst)
			r := &recovery.RecoverableRoutine{
				RoutineArg: collectOptions{
					exists:     params.Exists,
					execute:    params.Execute,
					configPath: tempFile.Name(),
					rc:         rc,
					i:          inst,
					wm:         ch,
				},
				UsageLogger:         *usagemetrics.Logger,
				ExpectedMinDuration: time.Minute,
			}
			if useSSH {
				r.Routine = collectRemoteSSH
				r.ErrorCode = usagemetrics.RemoteCollectSSHFailure
			} else {
				r.Routine = collectRemoteGcloud
				r.ErrorCode = usagemetrics.RemoteCollectGcloudFailure
			}
			r.StartRoutine(ctx)

			// Wait for the collector's result, but do not block the pool (and
			// wp.StopWait below) forever once the context is cancelled.
			var wm WorkloadMetrics
			select {
			case wm = <-ch:
			case <-ctx.Done():
				log.CtxLogger(ctx).Infow("Context cancelled before remote metrics were collected", "instance", inst, "error", ctx.Err())
				return
			}

			remoteCp := &ipb.CloudProperties{
				ProjectId:    inst.GetProjectId(),
				InstanceId:   inst.GetInstanceId(),
				Zone:         inst.GetZone(),
				InstanceName: inst.GetInstanceName(),
			}
			// The lock guards metricsSent. It is deliberately held across
			// sendMetrics, which keeps sends serialized as before.
			// NOTE(review): narrow this critical section only after confirming
			// that sendMetrics and its clients are safe for concurrent use.
			mu.Lock()
			defer mu.Unlock()
			metricsSent += sendMetrics(ctx, sendMetricsParams{
				wm:                    wm,
				cp:                    remoteCp,
				bareMetal:             bareMetal,
				sendToCloudMonitoring: sendToCloudMonitoring,
				timeSeriesCreator:     params.TimeSeriesCreator,
				backOffIntervals:      params.BackOffs,
				wlmService:            params.WLMService,
			})
		})
	}
	// StopWait blocks until every submitted task has finished, so metricsSent
	// is no longer being written when it is read below.
	wp.StopWait()
	return metricsSent
}