func()

in pkg/resmgr/hostmover/batch_scorer.go [172:344]


// sortOnce rebuilds s.orderedHosts from the current state of the shared
// (batch) host pool.
//
// It performs the following steps:
//  1. Lists the host pools through s.hostClient and collects the hosts of
//     the pool named common.SharedHostPoolID.
//  2. Fetches the batch tasks on those hosts from s.rmTracker and computes
//     per-host metrics over the tasks that are LAUNCHED, STARTING or RUNNING.
//  3. Normalizes task priorities across resource pools and aggregates them
//     per host.
//  4. Sorts the hosts ascending by: number of non-preemptible tasks, number
//     of controller tasks, number of tasks, aggregated priority, average
//     task run time, and finally host name as a deterministic tie-break.
//  5. Stores the result in s.orderedHosts under s.hostsLock: hosts without
//     any tracked batch tasks first, followed by the sorted hosts that have
//     no non-preemptible tasks.
//
// It returns the error from ListHostPools, or an error when the shared host
// pool is not present in the response.
func (s *batchScorer) sortOnce() error {
	log.Debug("sorting batch hosts by host scores")

	// Make call to host pool manager to get the list of batch hosts
	ctx, cancelFunc := context.WithTimeout(context.Background(), _timeout)
	defer cancelFunc()
	resp, err := s.hostClient.ListHostPools(
		ctx,
		&hostsvc.ListHostPoolsRequest{})
	if err != nil {
		log.WithError(err).Error("error in ListHostPools")
		return err
	}

	var batchHosts []string
	poolFound := false
	for _, p := range resp.GetPools() {
		if p.GetName() == common.SharedHostPoolID {
			batchHosts = append(batchHosts, p.GetHosts()...)
			poolFound = true
		}
	}
	log.WithFields(log.Fields{
		"number of batch hosts": len(batchHosts),
		"hosts":                 batchHosts,
	}).Info("all batch hosts from host manager")

	if !poolFound {
		return fmt.Errorf("pool %q not "+
			"found", common.SharedHostPoolID)
	}

	hostTasks := s.rmTracker.TasksByHosts(batchHosts, resmgr.TaskType_BATCH)
	currentTime := time.Now()

	// Hosts that the tracker returned no entry for go to the front of the
	// final list.
	var noTasksHosts []string
	for _, batchHost := range batchHosts {
		if _, ok := hostTasks[batchHost]; !ok {
			noTasksHosts = append(noTasksHosts, batchHost)
		}
	}

	log.WithFields(log.Fields{
		"number of hosts": len(noTasksHosts),
		"hosts":           noTasksHosts,
	}).Info("all batch hosts which does not have tasks")

	// scoredHost keeps the aggregated priority next to the metrics it was
	// computed from, so both move together while sorting.
	type scoredHost struct {
		metrics      *batchHostMetrics
		aggrPriority float64
	}

	scored := make([]scoredHost, 0, len(hostTasks))
	for host, tasksPerHost := range hostTasks {
		m := newBatchHostMetrics(host)

		for _, task := range tasksPerHost {
			// Only tasks that are launched, starting or running
			// contribute to the host metrics.
			switch task.GetCurrentState().State {
			case pbtask.TaskState_LAUNCHED,
				pbtask.TaskState_STARTING,
				pbtask.TaskState_RUNNING:
			default:
				continue
			}

			if task.Task().Controller {
				m.numControllers++
			}
			if !task.Task().Preemptible {
				m.numNonpreemptible++
			}
			m.numTasks++

			poolID := task.Respool().ID()
			m.tasksPriority[poolID] = append(m.tasksPriority[poolID], task.Task().Priority)

			m.tasksAvgRunTime += float64(currentTime.Sub(task.RunTimeStats().StartTime))
		}

		// Average over the tasks that were actually summed above, not over
		// every task on the host regardless of state.
		if m.numTasks > 0 {
			m.tasksAvgRunTime /= float64(m.numTasks)
		}

		scored = append(scored, scoredHost{metrics: m})
	}

	// Normalize the task priorities
	// The priorities in each resource pool are normalized to
	// same scale and aggregated
	poolMaxPriorities := make(map[string]float64)
	clusterMaxPriority := 0.0
	for _, sh := range scored {
		for poolID, priorities := range sh.metrics.tasksPriority {
			for _, priority := range priorities {
				p := float64(priority)
				clusterMaxPriority = math.Max(clusterMaxPriority, p)
				// A missing key reads as 0, which never exceeds a
				// priority converted from an unsigned value.
				poolMaxPriorities[poolID] = math.Max(poolMaxPriorities[poolID], p)
			}
		}
	}

	// Aggregated normalized priority of all the tasks
	for i := range scored {
		for poolID, priorities := range scored[i].metrics.tasksPriority {
			ratio := 0.0
			if poolMaxPriority := poolMaxPriorities[poolID]; poolMaxPriority != 0 {
				ratio = clusterMaxPriority / poolMaxPriority
			}

			for _, priority := range priorities {
				scored[i].aggrPriority += float64(priority) * ratio
			}
		}
	}

	// Sort the hosts by the metrics. The comparison must be a strict
	// ordering, so fully equal hosts are ordered by name rather than
	// reported as "less" in both directions.
	sort.Slice(scored, func(i, j int) bool {
		a, b := scored[i], scored[j]
		if a.metrics.numNonpreemptible != b.metrics.numNonpreemptible {
			return a.metrics.numNonpreemptible < b.metrics.numNonpreemptible
		}
		if a.metrics.numControllers != b.metrics.numControllers {
			return a.metrics.numControllers < b.metrics.numControllers
		}
		if a.metrics.numTasks != b.metrics.numTasks {
			return a.metrics.numTasks < b.metrics.numTasks
		}
		if a.aggrPriority != b.aggrPriority {
			return a.aggrPriority < b.aggrPriority
		}
		if a.metrics.tasksAvgRunTime != b.metrics.tasksAvgRunTime {
			return a.metrics.tasksAvgRunTime < b.metrics.tasksAvgRunTime
		}
		return a.metrics.host < b.metrics.host
	})

	// Hosts running any non-preemptible task are left out of the result.
	var taskHosts []string
	for _, sh := range scored {
		if sh.metrics.numNonpreemptible == 0 {
			taskHosts = append(taskHosts, sh.metrics.host)
		}
	}

	log.WithFields(log.Fields{
		"no task hosts":      noTasksHosts,
		"len no task hosts ": len(noTasksHosts),
		"taks hosts":         taskHosts,
		"len task hosts":     len(taskHosts),
	}).Info("hosts breakdown before appending")

	// Append hosts with no task running on top of the list
	orderedHosts := append(noTasksHosts, taskHosts...)

	log.WithFields(log.Fields{
		"all hosts":     orderedHosts,
		"len all hosts": len(orderedHosts),
	}).Info("final hosts list")

	// Publish the new ordering; the lock is held only for the assignment.
	s.hostsLock.Lock()
	s.orderedHosts = orderedHosts
	s.hostsLock.Unlock()

	return nil
}