pkg/scheduler/partition_manager.go (127 lines of code) (raw):
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"time"
"go.uber.org/zap"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/scheduler/objects"
)
const (
DefaultCleanRootInterval = 10000 * time.Millisecond // sleep between queue removal checks
DefaultCleanExpiredAppsInterval = 24 * time.Hour // sleep between apps removal checks
)
type partitionManager struct {
pc *PartitionContext
cc *ClusterContext
stopCleanRoot chan struct{}
stopCleanExpiredApps chan struct{}
cleanRootInterval time.Duration
cleanExpiredAppsInterval time.Duration
}
func newPartitionManager(pc *PartitionContext, cc *ClusterContext) *partitionManager {
return &partitionManager{
pc: pc,
cc: cc,
stopCleanRoot: make(chan struct{}),
stopCleanExpiredApps: make(chan struct{}),
cleanRootInterval: DefaultCleanRootInterval,
cleanExpiredAppsInterval: DefaultCleanExpiredAppsInterval,
}
}
// Run the manager for the partition.
// The manager has four tasks:
// - clean up the managed queues that are empty and removed from the configuration
// - remove empty unmanaged queues
// - remove completed applications from the partition
// - remove rejected applications from the partition
// When the manager exits the partition is removed from the system and must be cleaned up
func (manager *partitionManager) Run() {
log.Log(log.SchedPartition).Info("starting partition manager",
zap.String("partition", manager.pc.Name),
zap.Stringer("cleanRootInterval", manager.cleanRootInterval))
go manager.cleanExpiredApps()
go manager.cleanRoot()
}
func (manager *partitionManager) cleanRoot() {
log.Log(log.SchedPartition).Info("Starting partition queue cleaner")
// exit only when the partition this manager belongs to exits
for {
cleanRootInterval := manager.cleanRootInterval
if cleanRootInterval <= 0 {
cleanRootInterval = DefaultCleanRootInterval
}
select {
case <-manager.stopCleanRoot:
return
case <-time.After(cleanRootInterval):
runStart := time.Now()
manager.cleanQueues(manager.pc.root)
log.Log(log.SchedPartition).Debug("time consumed for queue cleaner",
zap.Stringer("duration", time.Since(runStart)))
}
}
}
// Set the flag that the will allow the manager to exit.
// No locking needed as there is just one place where this is called which is already locked.
func (manager *partitionManager) Stop() {
log.Log(log.SchedPartition).Info("Stopping partition manager",
zap.String("partition", manager.pc.Name))
close(manager.stopCleanExpiredApps)
close(manager.stopCleanRoot)
manager.remove()
}
// Remove drained managed and empty unmanaged queues. Perform the action recursively.
// Only called internally and recursive, no locking
func (manager *partitionManager) cleanQueues(queue *objects.Queue) {
if queue == nil {
return
}
// check the children first: call recursive
if children := queue.GetCopyOfChildren(); len(children) != 0 {
for _, child := range children {
manager.cleanQueues(child)
}
}
// when we have done the children (or have none) this queue might be removable
if queue.IsDraining() || !queue.IsManaged() {
log.Log(log.SchedPartition).Debug("removing queue",
zap.String("queueName", queue.QueuePath),
zap.String("partitionName", manager.pc.Name))
// make sure the queue is empty
if queue.IsEmpty() {
// all OK update the queue hierarchy and partition
if !queue.RemoveQueue() {
log.Log(log.SchedPartition).Debug("unexpected failure removing the queue",
zap.String("partitionName", manager.pc.Name),
zap.String("queue", queue.QueuePath))
}
} else {
log.Log(log.SchedPartition).Debug("skip removing the queue",
zap.String("reason", "there are existing assigned apps or leaf queues"),
zap.String("queue", queue.QueuePath),
zap.String("partitionName", manager.pc.Name))
}
}
}
// The partition has been removed from the configuration and must be removed.
// Clean up all linked objects:
// - queues
// - applications
// - nodes
// last action is to remove the cluster links
//
//nolint:errcheck
func (manager *partitionManager) remove() {
log.Log(log.SchedPartition).Info("marking all queues for removal",
zap.String("partitionName", manager.pc.Name))
// mark all queues for removal
manager.pc.root.MarkQueueForRemoval()
// remove applications: we do not care about return values or issues
apps := manager.pc.GetApplications()
log.Log(log.SchedPartition).Info("removing all applications from partition",
zap.Int("numOfApps", len(apps)),
zap.String("partitionName", manager.pc.Name))
for i := range apps {
_ = apps[i].FailApplication("PartitionRemoved")
appID := apps[i].ApplicationID
_ = manager.pc.removeApplication(appID)
}
// remove the nodes
nodes := manager.pc.GetNodes()
log.Log(log.SchedPartition).Info("removing all nodes from partition",
zap.Int("numOfNodes", len(nodes)),
zap.String("partitionName", manager.pc.Name))
for i := range nodes {
_, _ = manager.pc.removeNode(nodes[i].NodeID)
}
log.Log(log.SchedPartition).Info("removing partition",
zap.String("partitionName", manager.pc.Name))
// remove the scheduler object
manager.cc.removePartition(manager.pc.Name)
}
func (manager *partitionManager) cleanExpiredApps() {
log.Log(log.SchedPartition).Info("Starting partition expired apps cleaner")
for {
cleanExpiredAppsInterval := manager.cleanExpiredAppsInterval
if cleanExpiredAppsInterval <= 0 {
cleanExpiredAppsInterval = DefaultCleanExpiredAppsInterval
}
select {
case <-manager.stopCleanExpiredApps:
return
case <-time.After(cleanExpiredAppsInterval):
manager.pc.cleanupExpiredApps()
}
}
}