in pkg/resmgr/hostmover/batch_scorer.go [172:344]
// sortOnce recomputes the ordered list of shared (batch) pool hosts and
// stores it in s.orderedHosts under s.hostsLock.
//
// The hosts of the pool named common.SharedHostPoolID are fetched from the
// host manager and split into two groups:
//   - hosts for which the tracker reports no batch tasks; these come first,
//     in the order returned by the host manager;
//   - hosts with batch tasks, ordered by ascending score. Hosts that have
//     any non-preemptible task in LAUNCHED, STARTING or RUNNING state are
//     left out of the result.
//
// Hosts with tasks are compared, in order, by:
//  1. number of non-preemptible tasks,
//  2. number of controller tasks,
//  3. number of tasks,
//  4. aggregated normalized task priority,
//  5. average task run time.
//
// Only tasks in LAUNCHED, STARTING or RUNNING state contribute to these
// metrics. Hosts that tie on every metric are ordered by host name, so the
// result is deterministic.
//
// An error is returned if ListHostPools fails or the shared pool is not
// present. In that case s.orderedHosts is left unchanged.
func (s *batchScorer) sortOnce() error {
	log.Debug("sorting batch hosts by host scores")

	// Ask the host pool manager for the hosts of the shared (batch) pool.
	ctx, cancelFunc := context.WithTimeout(context.Background(), _timeout)
	defer cancelFunc()
	resp, err := s.hostClient.ListHostPools(ctx, &hostsvc.ListHostPoolsRequest{})
	if err != nil {
		log.WithError(err).Error("error in ListHostPools")
		return err
	}
	var batchHosts []string
	poolFound := false
	for _, p := range resp.GetPools() {
		if p.GetName() == common.SharedHostPoolID {
			batchHosts = append(batchHosts, p.GetHosts()...)
			poolFound = true
		}
	}
	log.WithFields(log.Fields{
		"number of batch hosts": len(batchHosts),
		"hosts":                 batchHosts,
	}).Info("all batch hosts from host manager")
	if !poolFound {
		return fmt.Errorf("pool %q not found", common.SharedHostPoolID)
	}

	hostTasks := s.rmTracker.TasksByHosts(batchHosts, resmgr.TaskType_BATCH)
	currentTime := time.Now()

	// Hosts absent from hostTasks have no batch tasks at all.
	var noTasksHosts []string
	for _, batchHost := range batchHosts {
		if _, ok := hostTasks[batchHost]; !ok {
			noTasksHosts = append(noTasksHosts, batchHost)
		}
	}
	log.WithFields(log.Fields{
		"number of hosts": len(noTasksHosts),
		"hosts":           noTasksHosts,
	}).Info("all batch hosts which does not have tasks")

	// Collect per-host metrics over tasks that are launched, starting or
	// running. Tasks in any other state are ignored.
	hostMetrics := make([]*batchHostMetrics, 0, len(hostTasks))
	for host, tasksPerHost := range hostTasks {
		metrics := newBatchHostMetrics(host)
		numActive := 0
		for _, task := range tasksPerHost {
			state := task.GetCurrentState().State
			if state != pbtask.TaskState_LAUNCHED &&
				state != pbtask.TaskState_STARTING &&
				state != pbtask.TaskState_RUNNING {
				continue
			}
			info := task.Task()
			if info.Controller {
				metrics.numControllers++
			}
			if !info.Preemptible {
				metrics.numNonpreemptible++
			}
			metrics.numTasks++
			poolID := task.Respool().ID()
			metrics.tasksPriority[poolID] = append(metrics.tasksPriority[poolID], info.Priority)
			metrics.tasksAvgRunTime += float64(currentTime.Sub(task.RunTimeStats().StartTime))
			numActive++
		}
		// Average over the tasks that were actually summed. Dividing by
		// len(tasksPerHost) would dilute the average with ignored tasks.
		if numActive > 0 {
			metrics.tasksAvgRunTime /= float64(numActive)
		}
		hostMetrics = append(hostMetrics, metrics)
	}

	// Normalize task priorities: each resource pool's priorities are scaled
	// so that the pool's maximum maps onto the cluster-wide maximum.
	poolMaxPriorities := make(map[string]float64)
	clusterMaxPriority := 0.0
	for _, metrics := range hostMetrics {
		for poolID, priorities := range metrics.tasksPriority {
			for _, priority := range priorities {
				p := float64(priority)
				clusterMaxPriority = math.Max(clusterMaxPriority, p)
				// A missing key reads as 0, which never exceeds an unsigned priority.
				poolMaxPriorities[poolID] = math.Max(poolMaxPriorities[poolID], p)
			}
		}
	}

	// Keep each host's aggregated priority next to its metrics. A separate
	// positional slice would fall out of step with hostMetrics as the sort
	// swaps elements.
	type scoredHost struct {
		metrics      *batchHostMetrics
		aggrPriority float64
	}
	scored := make([]scoredHost, len(hostMetrics))
	for i, metrics := range hostMetrics {
		aggr := 0.0
		for poolID, priorities := range metrics.tasksPriority {
			ratio := 0.0
			if poolMax := poolMaxPriorities[poolID]; poolMax != 0 {
				ratio = clusterMaxPriority / poolMax
			}
			for _, priority := range priorities {
				aggr += float64(priority) * ratio
			}
		}
		scored[i] = scoredHost{metrics: metrics, aggrPriority: aggr}
	}

	// Sort the hosts by the metrics, lowest score first.
	sort.Slice(scored, func(i, j int) bool {
		a, b := scored[i].metrics, scored[j].metrics
		switch {
		case a.numNonpreemptible != b.numNonpreemptible:
			return a.numNonpreemptible < b.numNonpreemptible
		case a.numControllers != b.numControllers:
			return a.numControllers < b.numControllers
		case a.numTasks != b.numTasks:
			return a.numTasks < b.numTasks
		case scored[i].aggrPriority != scored[j].aggrPriority:
			return scored[i].aggrPriority < scored[j].aggrPriority
		case a.tasksAvgRunTime != b.tasksAvgRunTime:
			return a.tasksAvgRunTime < b.tasksAvgRunTime
		default:
			// The less function must return false for equal elements.
			// Break full ties by name so the order is stable across runs.
			return a.host < b.host
		}
	})

	// Hosts running non-preemptible tasks are not candidates.
	var hosts []string
	for _, sh := range scored {
		if sh.metrics.numNonpreemptible == 0 {
			hosts = append(hosts, sh.metrics.host)
		}
	}
	log.WithFields(log.Fields{
		"no task hosts":      noTasksHosts,
		"len no task hosts ": len(noTasksHosts),
		"taks hosts":         hosts,
		"len task hosts":     len(hosts),
	}).Info("hosts breakdown before appending")

	// Hosts with no tasks at all go to the front of the list.
	hosts = append(noTasksHosts, hosts...)
	log.WithFields(log.Fields{
		"all hosts":     hosts,
		"len all hosts": len(hosts),
	}).Info("final hosts list")

	// Publish the new order; only the assignment needs the lock.
	s.hostsLock.Lock()
	defer s.hostsLock.Unlock()
	s.orderedHosts = hosts
	return nil
}