func()

in clusterloader2/pkg/measurement/common/slos/api_responsiveness_prometheus.go [169:249]


// gatherAPICalls runs the Prometheus queries that describe API call latency
// and call counts, all evaluated at endTime, and hands the resulting samples
// to newFromSamples.
//
// Latency samples are collected in one of two ways, selected by the boolean
// "useSimpleLatencyQuery" entry of config.Params (false when absent):
//   - true: for each of the quantiles 0.5, 0.9 and 0.99, simpleLatencyQuery
//     and watchListLatencyQuery are executed over the full measurement
//     duration (endTime - startTime). A "quantile" label is attached to every
//     returned sample, and watch-list samples additionally get
//     verb="WATCHLIST".
//   - false: a single latencyQuery is executed over the measurement duration
//     reduced by latencyWindowSize, clamped to at least one minute.
//
// Count samples come from countQuery for the apiserver SLI metric and for the
// watch-list metric (the latter labelled verb="WATCHLIST"). "Fast" count
// samples come from countFastQuery, executed once per scope filter.
//
// The first error returned by parameter parsing or by any query is returned
// unchanged, together with a nil result.
func (a *apiResponsivenessGatherer) gatherAPICalls(executor common.QueryExecutor, startTime, endTime time.Time, config *measurement.Config) (*apiCallMetrics, error) {
	// Verb label value used to mark samples that come from the watch-list metric.
	const watchListVerb = "WATCHLIST"

	measurementDuration := endTime.Sub(startTime)
	promDuration := measurementutil.ToPrometheusTime(measurementDuration)
	apiserverSLI := measurementutil.GetApiserverSLI(config.ClusterVersion)
	apiserverLatency := measurementutil.GetApiserverLatency(config.ClusterVersion)

	useSimple, err := util.GetBoolOrDefault(config.Params, "useSimpleLatencyQuery", false)
	if err != nil {
		return nil, err
	}

	// setLabel writes a single label on a sample. Assigning into a nil map
	// panics, so the label map is allocated first when the executor returned a
	// sample without one.
	setLabel := func(sample *model.Sample, name model.LabelName, value model.LabelValue) {
		if sample.Metric == nil {
			sample.Metric = model.Metric{}
		}
		sample.Metric[name] = value
	}

	var latencySamples []*model.Sample
	if useSimple {
		for _, q := range []float64{0.5, 0.9, 0.99} {
			// Underlying code assumes presence of 'quantile' label, so adding it manually.
			quantile := model.LabelValue(fmt.Sprintf("%.2f", q))

			query := fmt.Sprintf(simpleLatencyQuery, q, apiserverSLI, filters, promDuration)
			samples, err := executor.Query(query, endTime)
			if err != nil {
				return nil, err
			}
			for _, sample := range samples {
				setLabel(sample, "quantile", quantile)
			}
			latencySamples = append(latencySamples, samples...)

			query = fmt.Sprintf(watchListLatencyQuery, q, watchListLatencyMetricName, promDuration)
			samples, err = executor.Query(query, endTime)
			if err != nil {
				return nil, err
			}
			for _, sample := range samples {
				setLabel(sample, "quantile", quantile)
				setLabel(sample, "verb", watchListVerb)
			}
			latencySamples = append(latencySamples, samples...)
		}
	} else {
		// Latency measurement is based on 5m window aggregation,
		// therefore first 5 minutes of the test should be skipped.
		latencyMeasurementDuration := measurementDuration - latencyWindowSize
		if latencyMeasurementDuration < time.Minute {
			latencyMeasurementDuration = time.Minute
		}
		duration := measurementutil.ToPrometheusTime(latencyMeasurementDuration)

		query := fmt.Sprintf(latencyQuery, apiserverLatency, filters, duration)
		latencySamples, err = executor.Query(query, endTime)
		if err != nil {
			return nil, err
		}
	}

	query := fmt.Sprintf(countQuery, apiserverSLI, filters, promDuration)
	countSamples, err := executor.Query(query, endTime)
	if err != nil {
		return nil, err
	}
	// The watch-list count query is issued with an empty filter string.
	query = fmt.Sprintf(countQuery, watchListLatencyMetricName, "", promDuration)
	watchListCountSamples, err := executor.Query(query, endTime)
	if err != nil {
		return nil, err
	}
	for _, sample := range watchListCountSamples {
		setLabel(sample, "verb", watchListVerb)
		countSamples = append(countSamples, sample)
	}

	// Deliberately not named "filters": that would shadow the outer filters
	// value used in the queries above and invite misuse in later edits.
	countFastSamples := make([]*model.Sample, 0)
	fastFilters := []string{filterGetAndMutating, filterNamespaceList, filterClusterList}
	for _, filter := range fastFilters {
		query := fmt.Sprintf(countFastQuery, apiserverSLI, filter, promDuration)
		samples, err := executor.Query(query, endTime)
		if err != nil {
			return nil, err
		}
		countFastSamples = append(countFastSamples, samples...)
	}

	return newFromSamples(latencySamples, countSamples, countFastSamples)
}