pkg/scheduler/ugm/group_tracker.go (137 lines of code) (raw):
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ugm
import (
"strings"
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
)
// GroupTracker tracks the applications and the per-queue resource usage of a single group.
// The embedded RWMutex is taken by the methods of this type before the fields are accessed.
type GroupTracker struct {
groupName string // Name of the group whose usage is tracked
applications map[string]string // Applications currently run by users belonging to this group: application ID -> user
queueTracker *QueueTracker // Root queue tracker, holds the actual resource usage per queue path where applications run
events *ugmEvents // Used to send events when usage or limits of this group change
locking.RWMutex
}
// newGroupTracker creates a GroupTracker for the given group name.
// The tracker starts with an empty application map and a fresh root queue
// tracker of the group tracking type; events is stored for later event sending.
func newGroupTracker(groupName string, events *ugmEvents) *GroupTracker {
	return &GroupTracker{
		groupName:    groupName,
		applications: make(map[string]string),
		queueTracker: newRootQueueTracker(group),
		events:       events,
	}
}
// increaseTrackedResource records the usage increase for the application in the given queue path.
// It sends the usage increase event for the group, registers the application with its user
// and passes the increase on to the root queue tracker using the queue path split on configs.DOT.
// Calling it on a nil GroupTracker is a no-op.
func (gt *GroupTracker) increaseTrackedResource(queuePath, applicationID string, usage *resources.Resource, user string) {
	if gt == nil {
		return
	}

	gt.Lock()
	defer gt.Unlock()

	gt.events.sendIncResourceUsageForGroup(gt.groupName, queuePath, usage)
	gt.applications[applicationID] = user

	hierarchy := strings.Split(queuePath, configs.DOT)
	gt.queueTracker.increaseTrackedResource(hierarchy, applicationID, group, usage)
}
// decreaseTrackedResource records the usage decrease for the application in the given queue path.
// It sends the usage decrease event for the group, drops the application from the tracked
// applications when removeApp is set, and passes the decrease on to the root queue tracker
// using the queue path split on configs.DOT. The result of the queue tracker call is returned.
// Calling it on a nil GroupTracker returns false.
func (gt *GroupTracker) decreaseTrackedResource(queuePath, applicationID string, usage *resources.Resource, removeApp bool) bool {
	if gt == nil {
		return false
	}

	gt.Lock()
	defer gt.Unlock()

	gt.events.sendDecResourceUsageForGroup(gt.groupName, queuePath, usage)
	if removeApp {
		delete(gt.applications, applicationID)
	}

	hierarchy := strings.Split(queuePath, configs.DOT)
	return gt.queueTracker.decreaseTrackedResource(hierarchy, applicationID, usage, removeApp)
}
// getTrackedApplications returns a snapshot of the applications tracked for this group.
// The key is the application ID and the value is the user that was registered for it.
// A copy is returned instead of the internal map: the read lock is released as soon as
// this method returns, so handing out the internal map would allow a caller to iterate
// over it while increaseTrackedResource or decreaseTrackedResource modify it under the
// write lock.
func (gt *GroupTracker) getTrackedApplications() map[string]string {
	gt.RLock()
	defer gt.RUnlock()

	apps := make(map[string]string, len(gt.applications))
	for appID, user := range gt.applications {
		apps[appID] = user
	}
	return apps
}
// setLimits sets the resource and max applications limit of the group for the given queue path.
// It sends the limit set event for the group and applies the limit through the root queue
// tracker using the queue path split on configs.DOT.
func (gt *GroupTracker) setLimits(queuePath string, resource *resources.Resource, maxApps uint64) {
	gt.Lock()
	defer gt.Unlock()

	gt.events.sendLimitSetForGroup(gt.groupName, queuePath)

	hierarchy := strings.Split(queuePath, configs.DOT)
	gt.queueTracker.setLimit(hierarchy, resource, maxApps, false, group, false)
}
// clearLimits removes the limit of the group for the given queue path.
// It sends the limit remove event for the group and then sets a nil resource
// and zero max applications through the root queue tracker.
func (gt *GroupTracker) clearLimits(queuePath string) {
	gt.Lock()
	defer gt.Unlock()

	gt.events.sendLimitRemoveForGroup(gt.groupName, queuePath)

	hierarchy := strings.Split(queuePath, configs.DOT)
	gt.queueTracker.setLimit(hierarchy, nil, 0, false, group, false)
}
// headroom calculates the resource headroom for the group in the given queue hierarchy.
// Note: headroom of the queue tracker is not read-only. It traverses the queue hierarchy
// and creates a childQueueTracker if it does not exist, which is why the write lock is taken.
func (gt *GroupTracker) headroom(hierarchy []string) *resources.Resource {
	gt.Lock()
	defer gt.Unlock()

	available := gt.queueTracker.headroom(hierarchy, group)
	return available
}
// GetResourceUsageDAOInfo returns the DAO object used in the REST API for this group tracker.
// The DAO holds the group name, the IDs of the tracked applications (in map iteration order,
// so unordered) and the queue usage info provided by the root queue tracker.
func (gt *GroupTracker) GetResourceUsageDAOInfo() *dao.GroupResourceUsageDAOInfo {
	gt.RLock()
	defer gt.RUnlock()

	// left nil on purpose when no applications are tracked
	var appIDs []string
	for appID := range gt.applications {
		appIDs = append(appIDs, appID)
	}

	info := &dao.GroupResourceUsageDAOInfo{
		GroupName:    gt.groupName,
		Applications: appIDs,
		Queues:       gt.queueTracker.getResourceUsageDAOInfo(),
	}
	return info
}
// isQueuePathTrackedCompletely returns the result of the root queue tracker's
// isQueuePathTrackedCompletely check for the given hierarchy, taken under the read lock.
func (gt *GroupTracker) isQueuePathTrackedCompletely(hierarchy []string) bool {
	gt.RLock()
	defer gt.RUnlock()

	tracked := gt.queueTracker.isQueuePathTrackedCompletely(hierarchy)
	return tracked
}
// isUnlinkRequired returns the result of the root queue tracker's isUnlinkRequired
// check for the given hierarchy, taken under the read lock.
func (gt *GroupTracker) isUnlinkRequired(hierarchy []string) bool {
	gt.RLock()
	defer gt.RUnlock()

	required := gt.queueTracker.isUnlinkRequired(hierarchy)
	return required
}
// unlinkQT calls unlink on the root queue tracker for the given hierarchy while
// holding the write lock and returns the result of that call.
func (gt *GroupTracker) unlinkQT(hierarchy []string) bool {
	gt.Lock()
	defer gt.Unlock()

	result := gt.queueTracker.unlink(hierarchy)
	return result
}
// canBeRemoved returns the result of the root queue tracker's canBeRemoved check,
// taken under the read lock.
func (gt *GroupTracker) canBeRemoved() bool {
	gt.RLock()
	defer gt.RUnlock()

	removable := gt.queueTracker.canBeRemoved()
	return removable
}
// decreaseAllTrackedResourceUsage calls decreaseTrackedResourceUsageDownwards on the root
// queue tracker for the given hierarchy. For every application reported back by that call
// that is also tracked by this group, the application ID and its user are added to the
// returned map. The group's own application map is only read here, not modified.
// Calling it on a nil GroupTracker returns nil.
func (gt *GroupTracker) decreaseAllTrackedResourceUsage(hierarchy []string) map[string]string {
	if gt == nil {
		return nil
	}

	gt.Lock()
	defer gt.Unlock()

	removed := make(map[string]string)
	for appID := range gt.queueTracker.decreaseTrackedResourceUsageDownwards(hierarchy) {
		user, tracked := gt.applications[appID]
		if !tracked {
			continue
		}
		removed[appID] = user
	}
	return removed
}
// canRunApp checks if the group is allowed to run the application in the queue defined in hierarchy.
// Note: canRunApp of the queue tracker is not read-only. It traverses the queue hierarchy
// and creates a childQueueTracker if it does not exist, which is why the write lock is taken.
func (gt *GroupTracker) canRunApp(hierarchy []string, applicationID string) bool {
	gt.Lock()
	defer gt.Unlock()

	allowed := gt.queueTracker.canRunApp(hierarchy, applicationID, group)
	return allowed
}
// GetMaxResources returns the maxResources of all queues registered under this group tracker,
// keyed by queue path. The map is obtained from the root queue tracker under the read lock.
// This should only be used in tests.
func (gt *GroupTracker) GetMaxResources() map[string]*resources.Resource {
	gt.RLock()
	defer gt.RUnlock()

	maxResources := gt.queueTracker.getMaxResources()
	return maxResources
}
// GetMaxApplications returns the maxRunningApps of all queues registered under this group tracker,
// keyed by queue path. The map is obtained from the root queue tracker under the read lock.
// This should only be used in tests.
func (gt *GroupTracker) GetMaxApplications() map[string]uint64 {
	gt.RLock()
	defer gt.RUnlock()

	maxApps := gt.queueTracker.getMaxApplications()
	return maxApps
}
// getUsedResources returns the usedResources of all queues registered under this group tracker,
// keyed by queue path. The map is obtained from the root queue tracker under the read lock.
// This should only be used in tests.
func (gt *GroupTracker) getUsedResources() map[string]*resources.Resource {
	gt.RLock()
	defer gt.RUnlock()

	used := gt.queueTracker.getUsedResources()
	return used
}