pkg/scheduler/ugm/queue_tracker.go (357 lines of code) (raw):
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ugm
import (
"go.uber.org/zap"
"golang.org/x/exp/maps"
"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
)
// The QueueTracker is designed to be lock free and should remain as such.
// Each QueueTracker object is always only linked to single UserTracker or GroupTracker. The responsibility of managing locks is delegated to those objects.
type QueueTracker struct {
queueName string
queuePath string
resourceUsage *resources.Resource
runningApplications map[string]bool
maxResources *resources.Resource
maxRunningApps uint64
childQueueTrackers map[string]*QueueTracker
useWildCard bool
}
func newRootQueueTracker(trackType trackingType) *QueueTracker {
qt := newQueueTracker(common.Empty, configs.RootQueue, trackType)
return qt
}
func newQueueTracker(queuePath string, queueName string, trackType trackingType) *QueueTracker {
fullPath := queueName
if queuePath != common.Empty {
fullPath = queuePath + "." + queueName
}
queueTracker := &QueueTracker{
queueName: queueName,
queuePath: fullPath,
resourceUsage: nil,
runningApplications: make(map[string]bool),
maxResources: nil,
maxRunningApps: 0,
childQueueTrackers: make(map[string]*QueueTracker),
}
// Override user/group specific limits with wild card limit settings
if trackType == user {
if config := m.getUserWildCardLimitsConfig(fullPath); config != nil {
log.Log(log.SchedUGM).Debug("Use wild card limit settings as there is no limit set explicitly",
zap.String("queue name", queueName),
zap.String("queue path", queuePath),
zap.Uint64("max applications", config.maxApplications),
zap.Stringer("max resources", config.maxResources))
queueTracker.maxResources = config.maxResources.Clone()
queueTracker.maxRunningApps = config.maxApplications
queueTracker.useWildCard = true
}
}
log.Log(log.SchedUGM).Debug("Created queue tracker object for queue",
zap.String("queue", queueName))
return queueTracker
}
type trackingType int
const (
none trackingType = iota
user
group
)
func (tt trackingType) String() string {
return [...]string{"none", "user", "group"}[tt]
}
// Note: Lock free call. The Lock of the linked tracker (UserTracker and GroupTracker) should be held before calling this function.
func (qt *QueueTracker) increaseTrackedResource(hierarchy []string, applicationID string, trackType trackingType, usage *resources.Resource) {
log.Log(log.SchedUGM).Debug("Increasing resource usage",
zap.Stringer("tracking type", trackType),
zap.String("queue path", qt.queuePath),
zap.Strings("hierarchy", hierarchy),
zap.String("application", applicationID),
zap.Stringer("resource", usage),
zap.Bool("use wild card", qt.useWildCard))
// depth first: all the way to the leaf, create if not exists
// more than 1 in the slice means we need to recurse down
if len(hierarchy) > 1 {
childName := hierarchy[1]
if qt.childQueueTrackers[childName] == nil {
qt.childQueueTrackers[childName] = newQueueTracker(qt.queuePath, childName, trackType)
}
qt.childQueueTrackers[childName].increaseTrackedResource(hierarchy[1:], applicationID, trackType, usage)
}
if qt.resourceUsage == nil {
qt.resourceUsage = resources.NewResource()
}
qt.resourceUsage.AddTo(usage)
qt.resourceUsage.Prune() // needed as we might be both adding and removing
qt.runningApplications[applicationID] = true
log.Log(log.SchedUGM).Debug("Successfully increased resource usage",
zap.Stringer("tracking type", trackType),
zap.String("queue path", qt.queuePath),
zap.String("application", applicationID),
zap.Stringer("resource", usage),
zap.Uint64("max running applications", qt.maxRunningApps),
zap.Stringer("max resource usage", qt.maxResources),
zap.Bool("use wild card", qt.useWildCard),
zap.Stringer("total resource after increasing", qt.resourceUsage),
zap.Int("total applications after increasing", len(qt.runningApplications)))
}
// Note: Lock free call. The Lock of the linked tracker (UserTracker and GroupTracker) should be held before calling this function.
func (qt *QueueTracker) decreaseTrackedResource(hierarchy []string, applicationID string, usage *resources.Resource, removeApp bool) bool {
log.Log(log.SchedUGM).Debug("Decreasing resource usage",
zap.String("queue path", qt.queuePath),
zap.Strings("hierarchy", hierarchy),
zap.String("application", applicationID),
zap.Stringer("resource", usage),
zap.Bool("removeApp", removeApp))
// depth first: all the way to the leaf, return false if not exists
// more than 1 in the slice means we need to recurse down
if len(hierarchy) > 1 {
childName := hierarchy[1]
if qt.childQueueTrackers[childName] == nil {
log.Log(log.SchedUGM).Error("Child queueTracker tracker must be available in child queues map",
zap.String("child queueTracker name", childName))
return false
}
removeQT := qt.childQueueTrackers[childName].decreaseTrackedResource(hierarchy[1:], applicationID, usage, removeApp)
if removeQT {
log.Log(log.SchedUGM).Debug("Removed queue tracker linkage from its parent",
zap.String("queue path ", qt.queuePath),
zap.String("removed queue name", childName),
zap.String("parent queue name", qt.queueName))
delete(qt.childQueueTrackers, childName)
}
}
qt.resourceUsage.SubFrom(usage)
qt.resourceUsage.Prune()
if removeApp {
log.Log(log.SchedUGM).Debug("Removed application from running applications",
zap.String("application", applicationID),
zap.String("queue path", qt.queuePath),
zap.String("queue name", qt.queueName))
delete(qt.runningApplications, applicationID)
}
log.Log(log.SchedUGM).Debug("Successfully decreased resource usage",
zap.String("queue path", qt.queuePath),
zap.String("application", applicationID),
zap.Stringer("resource", usage),
zap.Stringer("total resource after decreasing", qt.resourceUsage),
zap.Int("total applications after decreasing", len(qt.runningApplications)))
// Determine if the queue tracker should be removed
removeQT := len(qt.childQueueTrackers) == 0 && len(qt.runningApplications) == 0 && resources.IsZero(qt.resourceUsage) &&
qt.maxRunningApps == 0 && resources.IsZero(qt.maxResources)
log.Log(log.SchedUGM).Debug("Remove queue tracker",
zap.String("queue path ", qt.queuePath),
zap.Bool("remove QT", removeQT))
return removeQT
}
// Note: Lock free call. The Lock of the linked tracker (UserTracker and GroupTracker) should be held before calling this function.
func (qt *QueueTracker) setLimit(hierarchy []string, maxResource *resources.Resource, maxApps uint64, useWildCard bool, trackType trackingType, doWildCardCheck bool) {
log.Log(log.SchedUGM).Debug("Setting limits",
zap.String("queue path", qt.queuePath),
zap.Strings("hierarchy", hierarchy),
zap.Uint64("max applications", maxApps),
zap.Stringer("max resources", maxResource),
zap.Bool("use wild card", useWildCard))
// depth first: all the way to the leaf, create if not exists
// more than 1 in the slice means we need to recurse down
if len(hierarchy) > 1 {
childName := hierarchy[1]
if qt.childQueueTrackers[childName] == nil {
qt.childQueueTrackers[childName] = newQueueTracker(qt.queuePath, childName, trackType)
}
qt.childQueueTrackers[childName].setLimit(hierarchy[1:], maxResource, maxApps, useWildCard, trackType, doWildCardCheck)
} else if len(hierarchy) == 1 {
// don't override named user/group specific limits with wild card limits
if doWildCardCheck && !qt.useWildCard {
return
}
qt.maxRunningApps = maxApps
qt.maxResources = maxResource
qt.useWildCard = useWildCard
}
}
// Note: Lock free call. The Lock of the linked tracker (UserTracker and GroupTracker) should be held before calling this function.
// Note: headroom is not read-only, it also traverses the queue hierarchy and creates childQueueTracker if it does not exist.
func (qt *QueueTracker) headroom(hierarchy []string, trackType trackingType) *resources.Resource {
// depth first: all the way to the leaf, create if not exists
// more than 1 in the slice means we need to recurse down
var headroom, childHeadroom *resources.Resource
if len(hierarchy) > 1 {
childName := hierarchy[1]
if qt.childQueueTrackers[childName] == nil {
qt.childQueueTrackers[childName] = newQueueTracker(qt.queuePath, childName, trackType)
}
childHeadroom = qt.childQueueTrackers[childName].headroom(hierarchy[1:], trackType)
}
// arrived at the leaf or on the way out: check against current max if set
if !resources.IsZero(qt.maxResources) {
headroom = resources.SubOnlyExisting(qt.maxResources, qt.resourceUsage)
}
if headroom == nil {
return childHeadroom
}
return resources.ComponentWiseMin(headroom, childHeadroom)
}
// getResourceUsageDAOInfo returns the REST representation of the queue tracker
// Note: Lock free call. The RLock of the linked tracker (UserTracker and GroupTracker) should be held before calling this function.
func (qt *QueueTracker) getResourceUsageDAOInfo() *dao.ResourceUsageDAOInfo {
if qt == nil {
return &dao.ResourceUsageDAOInfo{}
}
apps := make([]string, len(qt.runningApplications))
i := 0
for app := range qt.runningApplications {
apps[i] = app
i++
}
children := make([]*dao.ResourceUsageDAOInfo, len(qt.childQueueTrackers))
i = 0
for _, cqt := range qt.childQueueTrackers {
children[i] = cqt.getResourceUsageDAOInfo()
i++
}
return &dao.ResourceUsageDAOInfo{
QueuePath: qt.queuePath,
ResourceUsage: qt.resourceUsage.DAOMap(),
MaxResources: qt.maxResources.DAOMap(),
MaxApplications: qt.maxRunningApps,
RunningApplications: apps,
Children: children,
}
}
// getMaxResources returns a map of all maxResources defined in the queue hierarchy.
// Note: Lock free call. The RLock of the linked tracker (UserTracker and GroupTracker) should be held before calling this function.
func (qt *QueueTracker) getMaxResources() map[string]*resources.Resource {
if qt == nil {
return nil
}
maxRes := map[string]*resources.Resource{qt.queuePath: qt.maxResources}
for _, cqt := range qt.childQueueTrackers {
childUsage := cqt.getMaxResources()
maps.Copy(maxRes, childUsage)
}
return maxRes
}
// getMaxApplications returns a map of all maxRunningApps defined in the queue hierarchy.
// Note: Lock free call. The RLock of the linked tracker (UserTracker and GroupTracker) should be held before calling this function.
func (qt *QueueTracker) getMaxApplications() map[string]uint64 {
if qt == nil {
return nil
}
maxApps := map[string]uint64{qt.queuePath: qt.maxRunningApps}
for _, cqt := range qt.childQueueTrackers {
childApps := cqt.getMaxApplications()
maps.Copy(maxApps, childApps)
}
return maxApps
}
// getUsedResources returns a map of all resourceUsage defined in the queue hierarchy.
// Note: Lock free call. The RLock of the linked tracker (UserTracker and GroupTracker) should be held before calling this function.
func (qt *QueueTracker) getUsedResources() map[string]*resources.Resource {
if qt == nil {
return nil
}
maxRes := map[string]*resources.Resource{qt.queuePath: qt.resourceUsage}
for _, cqt := range qt.childQueueTrackers {
childUsage := cqt.getUsedResources()
maps.Copy(maxRes, childUsage)
}
return maxRes
}
// isQueuePathTrackedCompletely Traverse queue path upto the end queue through its linkage
// to confirm entire queuePath has been tracked completely or not
// Note: Lock free call. The RLock of the linked tracker (UserTracker and GroupTracker) should be held before calling this function.
func (qt *QueueTracker) isQueuePathTrackedCompletely(hierarchy []string) bool {
// depth first: all the way to the leaf, ignore if not exists
// more than 1 in the slice means we need to recurse down
if len(hierarchy) > 1 {
childName := hierarchy[1]
if qt.childQueueTrackers[childName] != nil {
return qt.childQueueTrackers[childName].isQueuePathTrackedCompletely(hierarchy[1:])
}
} else if len(hierarchy) == 1 {
// reach end of hierarchy
if hierarchy[0] == configs.RootQueue || hierarchy[0] == qt.queueName {
return true
}
}
return false
}
// isUnlinkRequired Traverse queue path upto the leaf queue and decide whether
// linkage needs to be removed or not based on the running applications.
// If there are any running applications in end leaf queue, we should remove the linkage between
// the leaf and its parent queue using unlink method. Otherwise, we should leave as it is.
// Note: Lock free call. The RLock of the linked tracker (UserTracker and GroupTracker) should be held before calling this function.
func (qt *QueueTracker) isUnlinkRequired(hierarchy []string) bool {
// depth first: all the way to the leaf, ignore if not exists
// more than 1 in the slice means we need to recurse down
if len(hierarchy) > 1 {
childName := hierarchy[1]
if qt.childQueueTrackers[childName] != nil {
return qt.childQueueTrackers[childName].isUnlinkRequired(hierarchy[1:])
}
} else if len(hierarchy) == 1 {
// reach end of hierarchy
if hierarchy[0] == configs.RootQueue || hierarchy[0] == qt.queueName {
if len(qt.runningApplications) == 0 {
log.Log(log.SchedUGM).Debug("Is Unlink Required?",
zap.String("queue path", qt.queuePath),
zap.Int("no. of applications", len(qt.runningApplications)))
return true
}
}
}
return false
}
// unlink Traverse queue path upto the end queue. If end queue has any more child queue trackers,
// then goes upto each child queue and removes the linkage with its immediate parent
// Note: Lock free call. The Lock of the linked tracker (UserTracker and GroupTracker) should be held before calling this function.
func (qt *QueueTracker) unlink(hierarchy []string) bool {
log.Log(log.SchedUGM).Debug("Unlinking current queue tracker from its parent",
zap.String("current queue ", qt.queueName),
zap.String("queue path", qt.queuePath),
zap.Strings("hierarchy", hierarchy),
zap.Int("no. of child queue trackers", len(qt.childQueueTrackers)))
// depth first: all the way to the leaf, ignore if not exists
// more than 1 in the slice means we need to recurse down
if len(hierarchy) > 1 {
childName := hierarchy[1]
if qt.childQueueTrackers[childName] != nil {
if qt.childQueueTrackers[childName].unlink(hierarchy[1:]) {
delete(qt.childQueueTrackers, childName)
// returning false, so that it comes out when end queue detach itself from its immediate parent.
// i.e., once leaf detached from root.parent for root.parent.leaf queue path.
// otherwise, detachment continues all the way upto the root, even parent from root. which is not needed.
return false
}
}
} else if len(hierarchy) <= 1 {
// reach end of hierarchy, unlink all queues under this queue
for childName, childQT := range qt.childQueueTrackers {
if childQT.unlink([]string{childName}) {
delete(qt.childQueueTrackers, childName)
}
}
}
if len(qt.runningApplications) == 0 && len(qt.childQueueTrackers) == 0 {
return true
}
return false
}
// decreaseTrackedResourceUsageDownwards queuePath either could be parent or leaf queue path.
// If it is parent queue path, then reset resourceUsage and runningApplications for all child queues,
// If it is leaf queue path, reset resourceUsage and runningApplications for queue trackers in this queue path.
// Note: Lock free call. The Lock of the linked tracker (UserTracker and GroupTracker) should be held before calling this function.
func (qt *QueueTracker) decreaseTrackedResourceUsageDownwards(hierarchy []string) map[string]bool {
// depth first: all the way to the leaf, ignore if not exists
// more than 1 in the slice means we need to recurse down
removedApplications := make(map[string]bool)
if len(hierarchy) > 1 {
childName := hierarchy[1]
if qt.childQueueTrackers[childName] != nil {
removedApplications = qt.childQueueTrackers[childName].decreaseTrackedResourceUsageDownwards(hierarchy[1:])
}
} else if len(hierarchy) <= 1 {
// reach end of hierarchy, remove all resources under this queue
removedApplications = qt.runningApplications
for childName, childQT := range qt.childQueueTrackers {
if len(childQT.runningApplications) > 0 && !resources.IsZero(childQT.resourceUsage) {
// runningApplications in parent queue should contain all the running applications in child queues,
// so we don't need to update removedApplications from child queue result.
childQT.decreaseTrackedResourceUsageDownwards([]string{childName})
}
}
}
if len(qt.runningApplications) > 0 && !resources.IsZero(qt.resourceUsage) {
qt.resourceUsage = nil
qt.runningApplications = make(map[string]bool)
}
return removedApplications
}
// Note: Lock free call. The Lock of the linked tracker (UserTracker and GroupTracker) should be held before calling this function.
// Note: canRunApp is not read-only, it also traverses the queue hierarchy and creates a childQueueTracker if it does not exist.
func (qt *QueueTracker) canRunApp(hierarchy []string, applicationID string, trackType trackingType) bool {
// depth first: all the way to the leaf, create if not exists
// more than 1 in the slice means we need to recurse down
childCanRunApp := true
if len(hierarchy) > 1 {
childName := hierarchy[1]
if qt.childQueueTrackers[childName] == nil {
qt.childQueueTrackers[childName] = newQueueTracker(qt.queuePath, childName, trackType)
}
childCanRunApp = qt.childQueueTrackers[childName].canRunApp(hierarchy[1:], applicationID, trackType)
}
if !childCanRunApp {
return false
}
// arrived at the leaf or on the way out: check against current max if set
var running int
if existingApp := qt.runningApplications[applicationID]; existingApp {
return true
} else {
running = len(qt.runningApplications) + 1
}
// apply user/group specific limit settings set if configured, otherwise use wild card limit settings
if qt.maxRunningApps != 0 && running > int(qt.maxRunningApps) { //nolint: gosec
return false
}
return true
}
// canBeRemoved Start from root and reach all levels of queue hierarchy to confirm whether corresponding queue tracker
// object can be removed from ugm or not. Based on running applications, resource usage, child queue trackers, max running apps, max resources etc
// it decides the removal. It returns false the moment it sees any unexpected values for any queue in any levels.
// Note: Lock free call. The RLock of the linked tracker (UserTracker and GroupTracker) should be held before calling this function.
func (qt *QueueTracker) canBeRemoved() bool {
for _, childQT := range qt.childQueueTrackers {
// quick check to avoid further traversal
if childQT.canBeRemovedInternal() {
if !childQT.canBeRemoved() {
return false
}
} else {
return false
}
}
// reached leaf queues, no more to traverse
return qt.canBeRemovedInternal()
}
func (qt *QueueTracker) canBeRemovedInternal() bool {
if len(qt.runningApplications) == 0 && resources.IsZero(qt.resourceUsage) && len(qt.childQueueTrackers) == 0 &&
qt.maxRunningApps == 0 && resources.IsZero(qt.maxResources) {
return true
}
return false
}