pkg/scheduler/placement/placement.go (157 lines of code) (raw):
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package placement
import (
"fmt"
"strings"
"sync"
"go.uber.org/zap"
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/scheduler/objects"
)
type AppPlacementManager struct {
name string
rules []rule
initialised bool
queueFn func(string) *objects.Queue
sync.RWMutex
}
func NewPlacementManager(rules []configs.PlacementRule, queueFunc func(string) *objects.Queue) *AppPlacementManager {
m := &AppPlacementManager{}
if queueFunc == nil {
log.Log(log.Config).Info("Placement manager created without queue function: not active")
return m
}
m.queueFn = queueFunc
if len(rules) > 0 {
if err := m.initialise(rules); err != nil {
log.Log(log.Config).Info("Placement manager created without rules: not active",
zap.Error(err))
}
}
return m
}
// Update the rules for an active placement manager
// Note that this will only be called when the manager is created earlier and the config is updated.
func (m *AppPlacementManager) UpdateRules(rules []configs.PlacementRule) error {
if len(rules) > 0 {
log.Log(log.Config).Info("Building new rule list for placement manager")
if err := m.initialise(rules); err != nil {
log.Log(log.Config).Info("Placement manager rules not reloaded",
zap.Error(err))
return err
}
}
// if there are no rules in the config we should turn off the placement manager
if len(rules) == 0 && m.initialised {
m.Lock()
defer m.Unlock()
log.Log(log.Config).Info("Placement manager rules removed on config reload")
m.initialised = false
m.rules = make([]rule, 0)
}
return nil
}
// Return the state of the placement manager
func (m *AppPlacementManager) IsInitialised() bool {
m.RLock()
defer m.RUnlock()
return m.initialised
}
// Initialise the rules from a parsed config.
func (m *AppPlacementManager) initialise(rules []configs.PlacementRule) error {
log.Log(log.Config).Info("Building new rule list for placement manager")
// build temp list from new config
tempRules, err := m.buildRules(rules)
if err != nil {
return err
}
m.Lock()
defer m.Unlock()
if m.queueFn == nil {
return fmt.Errorf("placement manager queue function nil")
}
log.Log(log.Config).Info("Activated rule set in placement manager")
m.rules = tempRules
// all done manager is initialised
m.initialised = true
for rule := range m.rules {
log.Log(log.Config).Debug("rule set",
zap.Int("ruleNumber", rule),
zap.String("ruleName", m.rules[rule].getName()))
}
return nil
}
// Build the rule set based on the config.
// If the rule set is correct and can be used the new set is returned.
// If any error is encountered a nil array is returned and the error set
func (m *AppPlacementManager) buildRules(rules []configs.PlacementRule) ([]rule, error) {
// catch an empty list
if len(rules) == 0 {
return nil, fmt.Errorf("placement manager rule list request is empty")
}
// build temp list from new config
var newRules []rule
for _, conf := range rules {
buildRule, err := newRule(conf)
if err != nil {
return nil, err
}
newRules = append(newRules, buildRule)
}
return newRules, nil
}
func (m *AppPlacementManager) PlaceApplication(app *objects.Application) error {
// Placement manager not initialised cannot place application, just return
m.RLock()
defer m.RUnlock()
if !m.initialised {
return nil
}
var queueName string
var err error
for _, checkRule := range m.rules {
log.Log(log.Config).Debug("Executing rule for placing application",
zap.String("ruleName", checkRule.getName()),
zap.String("application", app.ApplicationID))
queueName, err = checkRule.placeApplication(app, m.queueFn)
if err != nil {
log.Log(log.Config).Error("rule execution failed",
zap.String("ruleName", checkRule.getName()),
zap.Error(err))
app.SetQueuePath("")
return err
}
// queueName returned make sure ACL allows access and create the queueName if not exist
if queueName != "" {
// get the queue object
queue := m.queueFn(queueName)
// walk up the tree if the queue does not exist
if queue == nil {
current := queueName
for queue == nil {
current = current[0:strings.LastIndex(current, configs.DOT)]
// check if the queue exist
queue = m.queueFn(current)
}
// Check if the user is allowed to submit to this queueName, if not next rule
if !queue.CheckSubmitAccess(app.GetUser()) {
log.Log(log.Config).Debug("Submit access denied on queue",
zap.String("queueName", queue.GetQueuePath()),
zap.String("ruleName", checkRule.getName()),
zap.String("application", app.ApplicationID))
// reset the queue name for the last rule in the chain
queueName = ""
continue
}
} else {
// Check if this final queue is a leaf queue, if not next rule
if !queue.IsLeafQueue() {
log.Log(log.Config).Debug("Rule returned parent queue",
zap.String("queueName", queueName),
zap.String("ruleName", checkRule.getName()),
zap.String("application", app.ApplicationID))
// reset the queue name for the last rule in the chain
queueName = ""
continue
}
// Check if the user is allowed to submit to this queueName, if not next rule
if !queue.CheckSubmitAccess(app.GetUser()) {
log.Log(log.Config).Debug("Submit access denied on queue",
zap.String("queueName", queueName),
zap.String("ruleName", checkRule.getName()),
zap.String("application", app.ApplicationID))
// reset the queue name for the last rule in the chain
queueName = ""
continue
}
}
// we have a queue that allows submitting and can be created: app placed
break
}
}
log.Log(log.Config).Debug("Rule result for placing application",
zap.String("application", app.ApplicationID),
zap.String("queueName", queueName))
// no more rules to check no queueName found reject placement
if queueName == "" {
app.SetQueuePath("")
return fmt.Errorf("application rejected: no placement rule matched")
}
// Add the queue into the application, overriding what was submitted
app.SetQueuePath(queueName)
return nil
}