in pkg/scheduler/placement/placement.go [106:217]
func (m *AppPlacementManager) PlaceApplication(app *objects.Application) error {
m.RLock()
defer m.RUnlock()
var queueName string
var err error
var remainingRules = len(m.rules)
for _, checkRule := range m.rules {
remainingRules--
log.Log(log.SchedApplication).Debug("Executing rule for placing application",
zap.String("ruleName", checkRule.getName()),
zap.String("application", app.ApplicationID))
queueName, err = checkRule.placeApplication(app, m.queueFn)
if err != nil {
log.Log(log.SchedApplication).Error("rule execution failed",
zap.String("ruleName", checkRule.getName()),
zap.Error(err))
app.SetQueuePath("")
return err
}
// if no queue found even after the last rule, try to place in the default queue
if remainingRules == 0 && queueName == "" {
log.Log(log.Config).Info("No rule matched, placing application in default queue",
zap.String("application", app.ApplicationID),
zap.String("defaultQueue", common.DefaultPlacementQueue))
// get the queue object
queue := m.queueFn(common.DefaultPlacementQueue)
if queue != nil {
// default queue exist
queueName = common.DefaultPlacementQueue
}
}
// no queue name next rule
if queueName == "" {
continue
}
// We have the recovery queue bail out: only if we are doing forced placement
// Recovery rule is last in the list. Recovery queue cannot be returned by other rules.
// We do not want to trigger any checks for this queue.
if queueName == common.RecoveryQueueFull && app.IsCreateForced() {
log.Log(log.SchedApplication).Info("Placing application in recovery queue",
zap.String("application", app.ApplicationID))
break
}
// queueName returned make sure ACL allows access and set the queueName in the app
queue := m.queueFn(queueName)
// walk up the tree if the queue does not exist
if queue == nil {
current := queueName
for queue == nil {
current = current[0:strings.LastIndex(current, configs.DOT)]
// check if the queue exist
queue = m.queueFn(current)
}
// Check if the user is allowed to submit to this queueName, if not next rule
if !queue.CheckSubmitAccess(app.GetUser()) {
log.Log(log.SchedApplication).Debug("Submit access denied on queue",
zap.String("queueName", queue.GetQueuePath()),
zap.String("ruleName", checkRule.getName()),
zap.String("application", app.ApplicationID))
// reset the queue name for the last rule in the chain
queueName = ""
continue
}
} else {
// Check if this final queue is a leaf queue, if not next rule
if !queue.IsLeafQueue() {
log.Log(log.SchedApplication).Debug("Rule returned parent queue",
zap.String("queueName", queueName),
zap.String("ruleName", checkRule.getName()),
zap.String("application", app.ApplicationID))
// reset the queue name for the last rule in the chain
queueName = ""
continue
}
// Check if the user is allowed to submit to this queueName, if not next rule
if !queue.CheckSubmitAccess(app.GetUser()) {
log.Log(log.SchedApplication).Debug("Submit access denied on queue",
zap.String("queueName", queueName),
zap.String("ruleName", checkRule.getName()),
zap.String("application", app.ApplicationID))
// reset the queue name for the last rule in the chain
queueName = ""
continue
}
// Check if the queue in Draining state, and if so, proceed to the next rule
if queue.IsDraining() {
log.Log(log.SchedApplication).Debug("Cannot place application in draining queue",
zap.String("queueName", queueName),
zap.String("ruleName", checkRule.getName()),
zap.String("application", app.ApplicationID))
// reset the queue name for the last rule in the chain
queueName = ""
continue
}
}
// we have a queue that allows submitting and can be created: app placed
log.Log(log.SchedApplication).Info("Rule result for placing application",
zap.String("application", app.ApplicationID),
zap.String("ruleName", checkRule.getName()),
zap.String("queueName", queueName))
break
}
// no more rules to check no queueName found reject placement
if queueName == "" {
app.SetQueuePath("")
return RejectedError
}
// Add the queue into the application, overriding what was submitted
app.SetQueuePath(queueName)
return nil
}