pkg/placement/plugins/mimir/strategy.go (138 lines of code) (raw):
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mimir
import (
"math"
log "github.com/sirupsen/logrus"
"github.com/uber/peloton/.gen/peloton/private/resmgr"
"github.com/uber/peloton/pkg/placement/config"
"github.com/uber/peloton/pkg/placement/plugins"
common "github.com/uber/peloton/pkg/placement/plugins/mimir/common"
"github.com/uber/peloton/pkg/placement/plugins/mimir/lib/algorithms"
"github.com/uber/peloton/pkg/placement/plugins/mimir/lib/model/placement"
)
var _offersFactor = map[resmgr.TaskType]float64{
resmgr.TaskType_UNKNOWN: 1.0,
resmgr.TaskType_BATCH: 1.0,
resmgr.TaskType_STATELESS: 1.0,
resmgr.TaskType_DAEMON: 1.0,
resmgr.TaskType_STATEFUL: 1.0,
}
// New will create a new strategy using Mimir-lib to do the placement logic.
func New(placer algorithms.Placer, config *config.PlacementConfig) plugins.Strategy {
log.Info("Using Mimir placement strategy.")
return &mimir{
placer: placer,
config: config,
}
}
// mimir is a placement strategy that uses the mimir library to decide on how to assign tasks to offers.
// TODO: mimir plugin should use plugins.Config as what batch plugin does,
// instead of directly using config.PlacementConfig.
type mimir struct {
placer algorithms.Placer
config *config.PlacementConfig
}
func (mimir *mimir) convertAssignments(
tasks []plugins.Task) (
[]*placement.Assignment,
map[*placement.Entity]int) {
// Convert the Peloton assignments to mimir assignments and keep a map
// from entities to Peloton assignments.
assignments := make([]*placement.Assignment, 0, len(tasks))
entitiesToAssignments := make(map[*placement.Entity]int, len(tasks))
for idx, p := range tasks {
entity := p.ToMimirEntity()
assignments = append(assignments, placement.NewAssignment(entity))
entitiesToAssignments[entity] = idx
}
return assignments, entitiesToAssignments
}
func (mimir *mimir) convertHosts(hosts []plugins.Host) (
[]*placement.Group,
map[*placement.Group]int,
) {
// Convert the hosts to groups and keep a map from groups to hosts
groups := make([]*placement.Group, 0, len(hosts))
groupsToHosts := make(map[*placement.Group]int, len(hosts))
for i, host := range hosts {
group := host.ToMimirGroup()
groupsToHosts[group] = i
groups = append(groups, group)
}
return groups, groupsToHosts
}
func (mimir *mimir) getPlacements(
tasks []plugins.Task,
assignments []*placement.Assignment,
entitiesToAssignments map[*placement.Entity]int,
groupsToHosts map[*placement.Group]int,
) map[int]int {
placements := map[int]int{}
for i, assignment := range assignments {
taskIdx := entitiesToAssignments[assignment.Entity]
task := tasks[taskIdx]
if assignment.Failed {
task.SetPlacementFailure(assignment.Transcript.String())
placements[i] = -1
continue
}
hostIndex := groupsToHosts[assignment.AssignedGroup]
placements[i] = hostIndex
}
return placements
}
// GetTaskPlacements is an implementation of the placement.Strategy interface.
func (mimir *mimir) GetTaskPlacements(
tasks []plugins.Task,
hosts []plugins.Host,
) map[int]int {
assignments, entitiesToAssignments := mimir.convertAssignments(tasks)
groups, groupsToHosts := mimir.convertHosts(hosts)
scopeSet := placement.NewScopeSet(groups)
log.WithFields(log.Fields{
"peloton_assignments": tasks,
"peloton_hosts": hosts,
}).Debug("GetTaskPlacements Mimir strategy called")
// Place the assignments onto the groups
mimir.placer.Place(assignments, groups, scopeSet)
for _, assignment := range assignments {
if assignment.AssignedGroup != nil {
log.WithField("group", common.DumpGroup(assignment.AssignedGroup)).
WithField("entity", common.DumpEntity(assignment.Entity)).
WithField("transcript", assignment.Transcript.String()).
Debug("Placed Mimir assignment")
} else {
log.WithField("entity", common.DumpEntity(assignment.Entity)).
WithField("transcript", assignment.Transcript.String()).
Debug("Did not place Mimir assignment")
}
}
placements := mimir.getPlacements(tasks, assignments, entitiesToAssignments, groupsToHosts)
log.WithFields(log.Fields{
"placements": placements,
"assignments": tasks,
"hosts": hosts,
}).Debug("GetTaskPlacements Mimir strategy returned")
return placements
}
// GroupTasksByPlacementNeeds is an implementation of the placement.Strategy interface.
// Constructs host-filter for a set of assignments that have the same
// scheduling constraints, resource constraints and revocability.
func (mimir *mimir) GroupTasksByPlacementNeeds(
tasks []plugins.Task,
) []*plugins.TasksByPlacementNeeds {
if len(tasks) == 0 {
return nil
}
pluginsConfig := &plugins.Config{
TaskType: mimir.config.TaskType,
UseHostPool: mimir.config.UseHostPool,
}
tasksByNeeds := plugins.GroupByPlacementNeeds(tasks, pluginsConfig)
factor := _offersFactor[mimir.config.TaskType]
for _, group := range tasksByNeeds {
maxOffers := mimir.config.OfferDequeueLimit
neededOffers := math.Ceil(float64(len(group.Tasks)) * factor)
if float64(maxOffers) > neededOffers {
maxOffers = int(neededOffers)
}
group.PlacementNeeds.MaxHosts = uint32(maxOffers)
for _, taskIdx := range group.Tasks {
task := tasks[taskIdx]
if hostname := task.PreferredHost(); hostname != "" {
group.PlacementNeeds.HostHints[task.PelotonID()] = hostname
}
}
}
return tasksByNeeds
}
// ConcurrencySafe is an implementation of the placement.Strategy interface.
func (mimir *mimir) ConcurrencySafe() bool {
return false
}