pkg/scheduler/partition.go (1,118 lines of code) (raw):

/*
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
 distributed with this work for additional information
 regarding copyright ownership.  The ASF licenses this file
 to you under the Apache License, Version 2.0 (the
 "License"); you may not use this file except in compliance
 with the License.  You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
*/

package scheduler

import (
	"context"
	"fmt"
	"math"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/looplab/fsm"
	"go.uber.org/zap"

	"github.com/apache/yunikorn-core/pkg/common"
	"github.com/apache/yunikorn-core/pkg/common/configs"
	"github.com/apache/yunikorn-core/pkg/common/resources"
	"github.com/apache/yunikorn-core/pkg/common/security"
	"github.com/apache/yunikorn-core/pkg/log"
	"github.com/apache/yunikorn-core/pkg/metrics"
	"github.com/apache/yunikorn-core/pkg/scheduler/objects"
	"github.com/apache/yunikorn-core/pkg/scheduler/placement"
	"github.com/apache/yunikorn-core/pkg/scheduler/policies"
	"github.com/apache/yunikorn-core/pkg/scheduler/ugm"
	"github.com/apache/yunikorn-core/pkg/webservice/dao"
	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)

// PartitionContext tracks the scheduling state of one partition: the queue
// hierarchy, the applications and nodes assigned to it, the placement manager
// and a set of counters. The embedded RWMutex protects the private fields.
type PartitionContext struct {
	RmID string // the RM the partition belongs to
	Name string // name of the partition (logging mainly)

	// Private fields need protection
	root                   *objects.Queue                  // start of the queue hierarchy
	applications           map[string]*objects.Application // applications assigned to this partition
	completedApplications  map[string]*objects.Application // completed applications from this partition
	rejectedApplications   map[string]*objects.Application // rejected applications from this partition
	nodes                  objects.NodeCollection          // nodes assigned to this partition
	placementManager       *placement.AppPlacementManager  // placement manager for this partition
	partitionManager       *partitionManager               // manager for this partition
	stateMachine           *fsm.FSM                        // the state of the partition for scheduling
	stateTime              time.Time                       // last time the state was updated (needed for cleanup)
	rules                  *[]configs.PlacementRule        // placement rules to be loaded by the scheduler
	userGroupCache         *security.UserGroupCache        // user cache per partition
	totalPartitionResource *resources.Resource             // Total node resources
	allocations            int                             // Number of allocations on the partition
	reservations           int                             // number of reservations
	placeholderAllocations int                             // number of placeholder allocations

	// The partition write lock must not be held while manipulating an application.
	// Scheduling is running continuously as a lock free background task. Scheduling an application
	// acquires a write lock of the application object. While holding the write lock a list of nodes is
	// requested from the partition. This requires a read lock on the partition.
	// If the partition write lock is held while manipulating an application a dead lock could occur.
	// Since application objects handle their own locks there is no requirement to hold the partition lock
	// while manipulating the application.
	// Similarly adding, updating or removing a node or a queue should only hold the partition write lock
	// while manipulating the partition information not while manipulating the underlying objects.
	sync.RWMutex
}

// newPartitionContext builds a partition context for the given RM from the
// partition configuration. It returns an error when the partition name or the
// RM ID is empty, or when the queue configuration cannot be applied.
func newPartitionContext(conf configs.PartitionConfig, rmID string, cc *ClusterContext) (*PartitionContext, error) {
	if conf.Name == "" || rmID == "" {
		log.Log(log.SchedPartition).Info("partition cannot be created",
			zap.String("partition name", conf.Name),
			zap.String("rmID", rmID),
			zap.Any("cluster context", cc))
		return nil, fmt.Errorf("partition cannot be created without name or RM, one is not set")
	}
	// NOTE(review): rejectedApplications is not initialised here. Reading a nil map is safe
	// but a write panics - confirm that the code adding rejected applications creates the map.
	pc := &PartitionContext{
		Name:                  conf.Name,
		RmID:                  rmID,
		stateMachine:          objects.NewObjectState(),
		stateTime:             time.Now(),
		applications:          make(map[string]*objects.Application),
		completedApplications: make(map[string]*objects.Application),
		nodes:                 objects.NewNodeCollection(conf.Name),
	}
	pc.partitionManager = newPartitionManager(pc, cc)
	if err := pc.initialPartitionFromConfig(conf); err != nil {
		return nil, err
	}
	return pc, nil
}

// Initialise the partition: create the queue hierarchy from the configuration,
// set up the placement manager, the user group cache and the node sorting policy
// and pass the limit configuration to the user manager.
// Returns an error if the first configured queue is not the root queue or if
// any of the queues cannot be created.
func (pc *PartitionContext) initialPartitionFromConfig(conf configs.PartitionConfig) error {
	if len(conf.Queues) == 0 || conf.Queues[0].Name != configs.RootQueue {
		return fmt.Errorf("partition cannot be created without root queue")
	}

	// Setup the queue structure: root first it should be the only queue at this level
	// Add the rest of the queue structure recursively
	queueConf := conf.Queues[0]
	var err error
	if pc.root, err = objects.NewConfiguredQueue(queueConf, nil); err != nil {
		return err
	}
	// recursively add the queues to the root
	if err = pc.addQueue(queueConf.Queues, pc.root); err != nil {
		return err
	}
	log.Log(log.SchedPartition).Info("root queue added",
		zap.String("partitionName", pc.Name),
		zap.String("rmID", pc.RmID))

	// conf is passed by value: the pointer refers to the rules in this function's copy
	pc.rules = &conf.PlacementRules
	// We need to pass in the locked version of the GetQueue function.
	// Placing an application will not have a lock on the partition context.
	pc.placementManager = placement.NewPlacementManager(*pc.rules, pc.GetQueue)
	// get the user group cache for the partition
	// TODO get the resolver from the config
	pc.userGroupCache = security.GetUserGroupCache("")
	pc.updateNodeSortingPolicy(conf)

	// update limit settings: start at the root
	return ugm.GetUserManager().UpdateConfig(queueConf, conf.Queues[0].Name)
}

// Set the node sorting policy on the node collection based on the configuration.
// A policy type that cannot be parsed is logged only: the type and resource weights
// from the configuration are passed to objects.NewNodeSortingPolicy in all cases.
//
// NOTE: this is a lock free call. It should only be called holding the PartitionContext lock.
func (pc *PartitionContext) updateNodeSortingPolicy(conf configs.PartitionConfig) {
	var configuredPolicy policies.SortingPolicy
	configuredPolicy, err := policies.SortingPolicyFromString(conf.NodeSortPolicy.Type)
	if err != nil {
		log.Log(log.SchedPartition).Debug("NodeSorting policy incorrectly set or unknown",
			zap.Error(err))
		log.Log(log.SchedPartition).Info(fmt.Sprintf("NodeSorting policy not set using '%s' as default", configuredPolicy))
	} else {
		log.Log(log.SchedPartition).Info("NodeSorting policy set from config",
			zap.Stringer("policyName", configuredPolicy))
	}
	pc.nodes.SetNodeSortingPolicy(objects.NewNodeSortingPolicy(conf.NodeSortPolicy.Type, conf.NodeSortPolicy.ResourceWeights))
}

// Apply an updated configuration to the partition while holding the partition write lock.
// The placement rules are updated (or a new placement manager is created), followed by the
// node sorting policy, the root queue, the child queues and finally the user manager limits.
// The first error encountered is returned: changes made before the failure are not reversed here.
func (pc *PartitionContext) updatePartitionDetails(conf configs.PartitionConfig) error {
	pc.Lock()
	defer pc.Unlock()
	if len(conf.Queues) == 0 || conf.Queues[0].Name != configs.RootQueue {
		return fmt.Errorf("partition cannot be created without root queue")
	}

	if pc.placementManager.IsInitialised() {
		log.Log(log.SchedPartition).Info("Updating placement manager rules on config reload")
		err := pc.placementManager.UpdateRules(conf.PlacementRules)
		if err != nil {
			log.Log(log.SchedPartition).Info("New placement rules not activated, config reload failed", zap.Error(err))
			return err
		}
		// only track the new rules after the placement manager accepted them
		pc.rules = &conf.PlacementRules
	} else {
		log.Log(log.SchedPartition).Info("Creating new placement manager on config reload")
		pc.rules = &conf.PlacementRules
		// We need to pass in the locked version of the GetQueue function.
		// Placing an application will not have a lock on the partition context.
		pc.placementManager = placement.NewPlacementManager(*pc.rules, pc.GetQueue)
	}
	pc.updateNodeSortingPolicy(conf)
	// start at the root: there is only one queue
	queueConf := conf.Queues[0]
	root := pc.root
	// update the root queue
	if err := root.ApplyConf(queueConf); err != nil {
		return err
	}
	root.UpdateQueueProperties()
	// update the rest of the queues recursively
	if err := pc.updateQueues(queueConf.Queues, root); err != nil {
		return err
	}
	// update limit settings: start at the root
	return ugm.GetUserManager().UpdateConfig(queueConf, conf.Queues[0].Name)
}

// Process the config structure and create a queue info tree for this partition.
// Each configured queue is created below the parent, children are added recursively.
// Processing stops at the first queue that cannot be created.
func (pc *PartitionContext) addQueue(conf []configs.QueueConfig, parent *objects.Queue) error {
	// create the queue at this level
	for _, queueConf := range conf {
		thisQueue, err := objects.NewConfiguredQueue(queueConf, parent)
		if err != nil {
			return err
		}
		// recursive create the queues below
		if len(queueConf.Queues) > 0 {
			err = pc.addQueue(queueConf.Queues, thisQueue)
			if err != nil {
				return err
			}
		}
	}
	return nil
}

// Update the passed in queues and then do this recursively for the children
//
// NOTE: this is a lock free call. It should only be called holding the PartitionContext lock.
func (pc *PartitionContext) updateQueues(config []configs.QueueConfig, parent *objects.Queue) error { // get the name of the passed in queue parentPath := parent.QueuePath + configs.DOT // keep track of which children we have updated visited := map[string]bool{} // walk over the queues recursively for _, queueConfig := range config { pathName := parentPath + queueConfig.Name queue := pc.getQueueInternal(pathName) var err error if queue == nil { queue, err = objects.NewConfiguredQueue(queueConfig, parent) } else { err = queue.ApplyConf(queueConfig) } if err != nil { return err } // special call to convert to a real policy from the property queue.UpdateQueueProperties() if err = pc.updateQueues(queueConfig.Queues, queue); err != nil { return err } visited[queue.Name] = true } // remove all children that were not visited for childName, childQueue := range parent.GetCopyOfChildren() { if !visited[childName] { childQueue.MarkQueueForRemoval() } } return nil } // Mark the partition for removal from the system. // This can be executed multiple times and is only effective the first time. // The current cleanup sequence is "immediate". This is implemented to allow a graceful cleanup. func (pc *PartitionContext) markPartitionForRemoval() { if err := pc.handlePartitionEvent(objects.Remove); err != nil { log.Log(log.SchedPartition).Error("failed to mark partition for deletion", zap.String("partitionName", pc.Name), zap.Error(err)) } } // Get the state of the partition. // No new nodes and applications will be accepted if stopped or being removed. func (pc *PartitionContext) isDraining() bool { return pc.stateMachine.Current() == objects.Draining.String() } func (pc *PartitionContext) isRunning() bool { return pc.stateMachine.Current() == objects.Active.String() } func (pc *PartitionContext) isStopped() bool { return pc.stateMachine.Current() == objects.Stopped.String() } // Handle the state event for the partition. // The state machine handles the locking. 
func (pc *PartitionContext) handlePartitionEvent(event objects.ObjectEvent) error { err := pc.stateMachine.Event(context.Background(), event.String(), pc.Name) if err == nil { pc.stateTime = time.Now() return nil } // handle the same state transition not nil error (limit of fsm). if err.Error() == "no transition" { return nil } return err } // Get the placement manager. The manager could change when we process the configuration changes // we thus need to lock. func (pc *PartitionContext) getPlacementManager() *placement.AppPlacementManager { pc.RLock() defer pc.RUnlock() return pc.placementManager } // Add a new application to the partition. // NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock. func (pc *PartitionContext) AddApplication(app *objects.Application) error { if pc.isDraining() || pc.isStopped() { return fmt.Errorf("partition %s is stopped cannot add a new application %s", pc.Name, app.ApplicationID) } // Check if the app exists appID := app.ApplicationID if pc.getApplication(appID) != nil { return fmt.Errorf("adding application %s to partition %s, but application already existed", appID, pc.Name) } // Put app under the queue queueName := app.GetQueuePath() pm := pc.getPlacementManager() if pm.IsInitialised() { err := pm.PlaceApplication(app) if err != nil { return fmt.Errorf("failed to place application %s: %v", appID, err) } queueName = app.GetQueuePath() if queueName == "" { return fmt.Errorf("application rejected by placement rules: %s", appID) } } // lock the partition and make the last change: we need to do this before creating the queues. 
// queue cleanup might otherwise remove the queue again before we can add the application pc.Lock() defer pc.Unlock() // we have a queue name either from placement or direct, get the queue queue := pc.getQueueInternal(queueName) if queue == nil { // queue must exist if not using placement rules if !pm.IsInitialised() { return fmt.Errorf("application '%s' rejected, cannot create queue '%s' without placement rules", appID, queueName) } // with placement rules the hierarchy might not exist so try and create it var err error queue, err = pc.createQueue(queueName, app.GetUser()) if err != nil { return fmt.Errorf("failed to create rule based queue %s for application %s", queueName, appID) } } // check the queue: is a leaf queue with submit access if !queue.IsLeafQueue() || !queue.CheckSubmitAccess(app.GetUser()) { return fmt.Errorf("failed to find queue %s for application %s", queueName, appID) } // add the app to the queue to set the quota on the queue if needed queue.AddApplication(app) // check only for gang request // - make sure the taskgroup request fits in the maximum set for the queue hierarchy // - task groups should only be used in FIFO or StateAware queues // if the check fails remove the app from the queue again if placeHolder := app.GetPlaceholderAsk(); !resources.IsZero(placeHolder) { // check the queue sorting if !queue.SupportTaskGroup() { queue.RemoveApplication(app) return fmt.Errorf("queue %s cannot run application %s with task group request: unsupported sort type", queueName, appID) } // retrieve the max set if maxQueue := queue.GetMaxQueueSet(); maxQueue != nil { if !maxQueue.FitInMaxUndef(placeHolder) { queue.RemoveApplication(app) return fmt.Errorf("queue %s cannot fit application %s: task group request %s larger than max queue allocation %s", queueName, appID, placeHolder.String(), maxQueue.String()) } } } // all is OK update the app and add it to the partition app.SetQueue(queue) app.SetTerminatedCallback(pc.moveTerminatedApp) 
pc.applications[appID] = app return nil } // Remove the application from the partition. // This does not fail and handles missing app/queue/node/allocations internally func (pc *PartitionContext) removeApplication(appID string) []*objects.Allocation { // update the partition details, must be locked but all other updates should not hold partition lock app := pc.removeAppInternal(appID) if app == nil { return nil } // Remove all asks and thus all reservations and pending resources (queue included) _ = app.RemoveAllocationAsk("") // Remove app from queue if queue := app.GetQueue(); queue != nil { queue.RemoveApplication(app) } // Remove all allocations allocations := app.RemoveAllAllocations() // Remove all allocations from node(s) (queues have been updated already) if len(allocations) != 0 { // track the number of allocations pc.updateAllocationCount(-len(allocations)) for _, alloc := range allocations { currentUUID := alloc.GetUUID() node := pc.GetNode(alloc.GetNodeID()) if node == nil { log.Log(log.SchedPartition).Warn("unknown node: not found in active node list", zap.String("appID", appID), zap.String("nodeID", alloc.GetNodeID())) continue } if nodeAlloc := node.RemoveAllocation(currentUUID); nodeAlloc == nil { log.Log(log.SchedPartition).Warn("unknown allocation: not found on the node", zap.String("appID", appID), zap.String("allocationId", currentUUID), zap.String("nodeID", alloc.GetNodeID())) } } } return allocations } // Locked updates of the partition tracking info func (pc *PartitionContext) removeAppInternal(appID string) *objects.Application { pc.Lock() defer pc.Unlock() // Remove from applications map app := pc.applications[appID] if app == nil { return nil } // remove from partition then cleanup underlying objects delete(pc.applications, appID) return app } func (pc *PartitionContext) getApplication(appID string) *objects.Application { pc.RLock() defer pc.RUnlock() return pc.applications[appID] } func (pc *PartitionContext) getRejectedApplication(appID 
string) *objects.Application { pc.RLock() defer pc.RUnlock() return pc.rejectedApplications[appID] } // GetQueue returns queue from the structure based on the fully qualified name. // Wrapper around the unlocked version getQueueInternal() // Visible by tests func (pc *PartitionContext) GetQueue(name string) *objects.Queue { pc.RLock() defer pc.RUnlock() return pc.getQueueInternal(name) } // Get the queue from the structure based on the fully qualified name. // The name is not syntax checked and must be valid. // Returns nil if the queue is not found otherwise the queue object. // // NOTE: this is a lock free call. It should only be called holding the PartitionContext lock. func (pc *PartitionContext) getQueueInternal(name string) *objects.Queue { // start at the root queue := pc.root part := strings.Split(strings.ToLower(name), configs.DOT) // no input if len(part) == 0 || part[0] != configs.RootQueue { return nil } // walk over the parts going down towards the requested queue for i := 1; i < len(part); i++ { // if child not found break out and return if queue = queue.GetChildQueue(part[i]); queue == nil { break } } return queue } // Get the queue info for the whole queue structure to pass to the webservice func (pc *PartitionContext) GetQueueInfo() dao.QueueDAOInfo { return pc.root.GetQueueInfo() } // Get the queue info for the whole queue structure to pass to the webservice func (pc *PartitionContext) GetPartitionQueues() dao.PartitionQueueDAOInfo { var PartitionQueueDAOInfo = dao.PartitionQueueDAOInfo{} PartitionQueueDAOInfo = pc.root.GetPartitionQueueDAOInfo() PartitionQueueDAOInfo.Partition = common.GetPartitionNameWithoutClusterID(pc.Name) return PartitionQueueDAOInfo } // Create a queue with full hierarchy. This is called when a new queue is created from a placement rule. // The final leaf queue does not exist otherwise we would not get here. 
// This means that at least 1 queue (a leaf queue) will be created func (pc *PartitionContext) createQueue(name string, user security.UserGroup) (*objects.Queue, error) { // find the queue furthest down the hierarchy that exists var toCreate []string if !strings.HasPrefix(name, configs.RootQueue) || !strings.Contains(name, configs.DOT) { return nil, fmt.Errorf("illegal queue name passed in: %s", name) } current := name queue := pc.getQueueInternal(current) log.Log(log.SchedPartition).Debug("Checking queue creation") for queue == nil { toCreate = append(toCreate, current[strings.LastIndex(current, configs.DOT)+1:]) current = current[0:strings.LastIndex(current, configs.DOT)] queue = pc.getQueueInternal(current) } // Check the ACL before we really create // The existing parent queue is the lowest we need to look at if !queue.CheckSubmitAccess(user) { return nil, fmt.Errorf("submit access to queue %s denied during create of: %s", current, name) } if queue.IsLeafQueue() { return nil, fmt.Errorf("creation of queue %s failed parent is already a leaf: %s", name, current) } log.Log(log.SchedPartition).Debug("Creating queue(s)", zap.String("parent", current), zap.String("fullPath", name)) for i := len(toCreate) - 1; i >= 0; i-- { // everything is checked and there should be no errors var err error queue, err = objects.NewDynamicQueue(toCreate[i], i == 0, queue) if err != nil { log.Log(log.SchedPartition).Warn("Queue auto create failed unexpected", zap.String("queueName", toCreate[i]), zap.Error(err)) return nil, err } } return queue, nil } // Get a node from the partition by nodeID. func (pc *PartitionContext) GetNode(nodeID string) *objects.Node { pc.RLock() defer pc.RUnlock() return pc.nodes.GetNode(nodeID) } // Add the node to the partition and process the allocations that are reported by the node. // NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock. 
func (pc *PartitionContext) AddNode(node *objects.Node, existingAllocations []*objects.Allocation) error { if node == nil { return fmt.Errorf("cannot add 'nil' node to partition %s", pc.Name) } log.Log(log.SchedPartition).Info("adding node to partition", zap.String("partition", pc.Name), zap.String("nodeID", node.NodeID)) if pc.isDraining() || pc.isStopped() { return fmt.Errorf("partition %s is stopped cannot add a new node %s", pc.Name, node.NodeID) } if err := pc.addNodeToList(node); err != nil { return err } // Add allocations that exist on the node when added if len(existingAllocations) > 0 { for current, alloc := range existingAllocations { if err := pc.addAllocation(alloc); err != nil { // not expecting any inflight replacements on node recovery released, _ := pc.removeNode(node.NodeID) log.Log(log.SchedPartition).Info("Failed to add existing allocations, changes reversed", zap.String("nodeID", node.NodeID), zap.Int("existingAllocations", len(existingAllocations)), zap.Int("releasedAllocations", len(released)), zap.Int("processingAlloc", current), zap.Stringer("allocation", alloc), zap.Error(err)) // update failed metric, active metrics are tracked in add/remove from list metrics.GetSchedulerMetrics().IncFailedNodes() return err } } } return nil } // Update the partition resources based on the change of the node information func (pc *PartitionContext) updatePartitionResource(delta *resources.Resource) { pc.Lock() defer pc.Unlock() if delta != nil { if pc.totalPartitionResource == nil { pc.totalPartitionResource = delta.Clone() } else { pc.totalPartitionResource.AddTo(delta) } pc.root.SetMaxResource(pc.totalPartitionResource) } } // Update the partition details when removing a node. // This locks the partition. 
The partition may not be locked when we process the allocation // additions to the node as that takes further app, queue or node locks func (pc *PartitionContext) addNodeToList(node *objects.Node) error { pc.Lock() defer pc.Unlock() // Node can be added to the system to allow processing of the allocations if err := pc.nodes.AddNode(node); err != nil { return fmt.Errorf("failed to add node %s to partition %s, error: %v", node.NodeID, pc.Name, err) } metrics.GetSchedulerMetrics().IncActiveNodes() // update/set the resources available in the cluster if pc.totalPartitionResource == nil { pc.totalPartitionResource = node.GetCapacity().Clone() } else { pc.totalPartitionResource.AddTo(node.GetCapacity()) } pc.root.SetMaxResource(pc.totalPartitionResource) log.Log(log.SchedPartition).Info("Updated available resources from added node", zap.String("partitionName", pc.Name), zap.String("nodeID", node.NodeID), zap.Stringer("partitionResource", pc.totalPartitionResource)) return nil } // Update the partition details when removing a node. // This locks the partition. 
// The partition may not be locked when we process the allocation
// removal from the node as that takes further app, queue or node locks
// Returns the removed node, nil if the node was not found in the partition.
func (pc *PartitionContext) removeNodeFromList(nodeID string) *objects.Node {
	pc.Lock()
	defer pc.Unlock()
	node := pc.nodes.RemoveNode(nodeID)
	if node == nil {
		log.Log(log.SchedPartition).Debug("node was not found, node already removed",
			zap.String("nodeID", nodeID),
			zap.String("partitionName", pc.Name))
		return nil
	}

	// Remove node from list of tracked nodes
	metrics.GetSchedulerMetrics().DecActiveNodes()

	// found the node cleanup the available resources, partition resources cannot be nil at this point
	pc.totalPartitionResource.SubFrom(node.GetCapacity())
	pc.root.SetMaxResource(pc.totalPartitionResource)
	log.Log(log.SchedPartition).Info("Updated available resources from removed node",
		zap.String("partitionName", pc.Name),
		zap.String("nodeID", node.NodeID),
		zap.Stringer("partitionResource", pc.totalPartitionResource))
	return node
}

// Remove a node from the partition. It returns all removed and confirmed allocations.
// The removed allocations are all linked to the current node.
// The confirmed allocations are real allocations that are linked to placeholders on the current node and are linked to
// other nodes.
// Both returned slices are nil if the node is not found in the partition.
// NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock.
func (pc *PartitionContext) removeNode(nodeID string) ([]*objects.Allocation, []*objects.Allocation) {
	log.Log(log.SchedPartition).Info("removing node from partition",
		zap.String("partition", pc.Name),
		zap.String("nodeID", nodeID))

	node := pc.removeNodeFromList(nodeID)
	if node == nil {
		return nil, nil
	}

	// unreserve all the apps that were reserved on the node.
	// The node is not reachable anymore unless you have the pointer.
	for _, r := range node.GetReservations() {
		_, app, ask := r.GetObjects()
		pc.unReserve(app, node, ask)
	}
	// cleanup the allocations linked to the node
	return pc.removeNodeAllocations(node)
}

// Remove all allocations that are assigned to a node as part of the node removal. This is not part of the node object
// as updating the applications and queues is the only goal. Applications and queues are not accessible from the node.
// The removed and confirmed allocations are returned.
// Allocations for which the application, or the allocation on the application, is no longer found are skipped
// and not returned.
// NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock.
func (pc *PartitionContext) removeNodeAllocations(node *objects.Node) ([]*objects.Allocation, []*objects.Allocation) {
	released := make([]*objects.Allocation, 0)
	confirmed := make([]*objects.Allocation, 0)
	// walk over all allocations still registered for this node
	for _, alloc := range node.GetAllAllocations() {
		allocID := alloc.GetUUID()
		// since we are not locking the node and or application we could have had an update while processing
		// note that we do not return the allocation if the app or allocation is not found and assume that it
		// was already removed
		app := pc.getApplication(alloc.GetApplicationID())
		if app == nil {
			log.Log(log.SchedPartition).Info("app is not found, skipping while removing the node",
				zap.String("appID", alloc.GetApplicationID()),
				zap.String("nodeID", node.NodeID))
			continue
		}
		// check for an inflight replacement.
		if alloc.GetReleaseCount() != 0 {
			release := alloc.GetFirstRelease()
			// allocation to update the ask on: this needs to happen on the real alloc never the placeholder
			askAlloc := alloc
			// placeholder gets handled differently from normal
			if alloc.IsPlaceholder() {
				// Check if the real allocation is made on the same node if not we should trigger a confirmation of
				// the replacement. Trigger the replacement only if it is NOT on the same node.
				// If it is on the same node we just keep going as the real allocation will be unlinked as a result of
				// the removal of this placeholder. The ask update will trigger rescheduling later for the real alloc.
				if alloc.GetNodeID() != release.GetNodeID() {
					// ignore the return as that is the same as alloc, the alloc is gone after this call
					_ = app.ReplaceAllocation(allocID)
					// we need to check the resources equality
					delta := resources.Sub(release.GetAllocatedResource(), alloc.GetAllocatedResource())
					// Any negative value in the delta means that at least one of the requested resource in the
					// placeholder is larger than the real allocation. The nodes are correct the queue needs adjusting.
					// The reverse case is handled during allocation.
					if delta.HasNegativeValue() {
						// this looks incorrect but the delta is negative and the result will be a real decrease
						err := app.GetQueue().IncAllocatedResource(delta, false)
						// this should not happen as we really decrease the value
						if err != nil {
							log.Log(log.SchedPartition).Warn("unexpected failure during queue update: replacing placeholder",
								zap.String("appID", alloc.GetApplicationID()),
								zap.String("placeholderID", alloc.GetUUID()),
								zap.String("allocationID", release.GetUUID()),
								zap.Error(err))
						}
						log.Log(log.SchedPartition).Warn("replacing placeholder: placeholder is larger than real allocation",
							zap.String("allocationID", release.GetUUID()),
							zap.Stringer("requested resource", release.GetAllocatedResource()),
							zap.String("placeholderID", alloc.GetUUID()),
							zap.Stringer("placeholder resource", alloc.GetAllocatedResource()))
					}
					// track what we confirm on the other node to confirm it in the shim and get is bound
					confirmed = append(confirmed, release)
					// the allocation is removed so add it to the list that we return
					released = append(released, alloc)
					log.Log(log.SchedPartition).Info("allocation removed from node and replacement confirmed",
						zap.String("nodeID", node.NodeID),
						zap.String("allocationId", allocID),
						zap.String("replacement nodeID", release.GetNodeID()),
						zap.String("replacement allocationId", release.GetUUID()))
					// the placeholder is fully handled: skip the normal removal below
					continue
				}
				// placeholder and real allocation are on the same node: the ask of the real allocation is updated
				askAlloc = release
			}
			// unlink the placeholder and allocation
			release.ClearReleases()
			alloc.ClearReleases()
			// update the repeat on the real alloc to get it re-scheduled
			_, err := app.UpdateAskRepeat(askAlloc.GetAsk().GetAllocationKey(), 1)
			if err == nil {
				log.Log(log.SchedPartition).Info("inflight placeholder replacement reversed due to node removal",
					zap.String("appID", askAlloc.GetApplicationID()),
					zap.String("allocationKey", askAlloc.GetAsk().GetAllocationKey()),
					zap.String("nodeID", node.NodeID),
					zap.String("replacement allocationId", askAlloc.GetUUID()))
			} else {
				log.Log(log.SchedPartition).Error("node removal: repeat update failure for inflight replacement",
					zap.String("appID", askAlloc.GetApplicationID()),
					zap.String("allocationKey", askAlloc.GetAsk().GetAllocationKey()),
					zap.String("nodeID", node.NodeID),
					zap.Error(err))
			}
		}
		// check allocations on the app
		if app.RemoveAllocation(allocID, si.TerminationType_UNKNOWN_TERMINATION_TYPE) == nil {
			log.Log(log.SchedPartition).Info("allocation is not found, skipping while removing the node",
				zap.String("allocationId", allocID),
				zap.String("appID", app.ApplicationID),
				zap.String("nodeID", node.NodeID))
			continue
		}
		// the released container metric is only updated if the queue update succeeded
		if err := app.GetQueue().DecAllocatedResource(alloc.GetAllocatedResource()); err != nil {
			log.Log(log.SchedPartition).Warn("failed to release resources from queue",
				zap.String("appID", alloc.GetApplicationID()),
				zap.Error(err))
		} else {
			metrics.GetQueueMetrics(app.GetQueuePath()).IncReleasedContainer()
		}
		// remove preempted resources
		if alloc.IsPreempted() {
			app.GetQueue().DecPreemptingResource(alloc.GetAllocatedResource())
		}
		if alloc.IsPlaceholder() {
			pc.decPhAllocationCount(1)
		}

		// the allocation is removed so add it to the list that we return
		released = append(released, alloc)
		log.Log(log.SchedPartition).Info("allocation removed from node",
			zap.String("nodeID", node.NodeID),
			zap.String("allocationId", allocID))
	}
	// track the number of allocations: decrement the released allocation AND increment with the confirmed
	pc.updateAllocationCount(len(confirmed) - len(released))
	return released, confirmed
}

// calculateOutstandingRequests collects the outstanding requests from the queue hierarchy starting at the root.
// Returns nil if the pending resource of the root queue is not strictly greater than zero.
func (pc *PartitionContext) calculateOutstandingRequests() []*objects.AllocationAsk {
	if !resources.StrictlyGreaterThanZero(pc.root.GetPendingResource()) {
		return nil
	}
	outstanding := make([]*objects.AllocationAsk, 0)
	// the root queue fills the slice via the pointer
	pc.root.GetQueueOutstandingRequests(&outstanding)
	return outstanding
}

// Try regular allocation for the partition
// Lock free call this all locks are taken when needed in called functions
// Returns nil if there is nothing pending or no allocation was made, otherwise the result of
// processing the allocation in allocate().
func (pc *PartitionContext) tryAllocate() *objects.Allocation {
	if !resources.StrictlyGreaterThanZero(pc.root.GetPendingResource()) {
		// nothing to do just return
		return nil
	}
	// try allocating from the root down
	alloc := pc.root.TryAllocate(pc.GetNodeIterator, pc.GetFullNodeIterator, pc.GetNode)
	if alloc != nil {
		return pc.allocate(alloc)
	}
	return nil
}

// Try process reservations for the partition
// Lock free call this all locks are taken when needed in called functions
// Returns nil if there are no reservations, nothing is pending or no allocation was made,
// otherwise the result of processing the allocation in allocate().
func (pc *PartitionContext) tryReservedAllocate() *objects.Allocation {
	if pc.getReservationCount() == 0 {
		return nil
	}
	if !resources.StrictlyGreaterThanZero(pc.root.GetPendingResource()) {
		// nothing to do just return
		return nil
	}
	// try allocating from the root down
	alloc := pc.root.TryReservedAllocate(pc.GetNodeIterator)
	if alloc != nil {
		return pc.allocate(alloc)
	}
	return nil
}

// Try process placeholder for the partition
// Lock free call this all locks are taken when needed in called functions
// Returns nil if there are no placeholder allocations, nothing is pending or no replacement was made.
// Unlike the other try calls the allocation is returned directly and not passed through allocate().
func (pc *PartitionContext) tryPlaceholderAllocate() *objects.Allocation {
	if pc.getPhAllocationCount() == 0 {
		return nil
	}
	if !resources.StrictlyGreaterThanZero(pc.root.GetPendingResource()) {
		// nothing to do just return
		return nil
	}
	// try allocating from the root down
	alloc := pc.root.TryPlaceholderAllocate(pc.GetNodeIterator, pc.GetNode)
	if alloc != nil {
		log.Log(log.SchedPartition).Info("scheduler replace placeholder processed",
			zap.String("appID", alloc.GetApplicationID()),
			zap.String("allocationKey", alloc.GetAllocationKey()),
			zap.String("uuid", alloc.GetUUID()),
			zap.String("placeholder released uuid", alloc.GetFirstRelease().GetUUID()))
		// pass the release back to the RM via the cluster context
		return alloc
	}
	return nil
}

// Process the allocation and make the left over changes in the partition.
// Returns nil if the application or node is no longer part of the partition, or if the result was a
// reservation or an unreserve: in those cases there is nothing to pass back.
// NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock.
func (pc *PartitionContext) allocate(alloc *objects.Allocation) *objects.Allocation {
	// find the app make sure it still exists
	appID := alloc.GetApplicationID()
	app := pc.getApplication(appID)
	if app == nil {
		log.Log(log.SchedPartition).Info("Application was removed while allocating",
			zap.String("appID", appID))
		return nil
	}
	// find the node make sure it still exists
	// if the node was passed in use that ID instead of the one from the allocation
	// the node ID is set when a reservation is allocated on a non-reserved node
	var nodeID string
	if alloc.GetReservedNodeID() == "" {
		nodeID = alloc.GetNodeID()
	} else {
		nodeID = alloc.GetReservedNodeID()
		log.Log(log.SchedPartition).Debug("Reservation allocated on different node",
			zap.String("current node", alloc.GetNodeID()),
			zap.String("reserved node", nodeID),
			zap.String("appID", appID))
	}
	node := pc.GetNode(nodeID)
	if node == nil {
		log.Log(log.SchedPartition).Info("Node was removed while allocating",
			zap.String("nodeID", nodeID),
			zap.String("appID", appID))
		return nil
	}
	alloc.SetInstanceType(node.GetInstanceType())
	// reservation
	if alloc.GetResult() == objects.Reserved {
		pc.reserve(app, node, alloc.GetAsk())
		return nil
	}
	// unreserve
	if alloc.GetResult() == objects.Unreserved || alloc.GetResult() == objects.AllocatedReserved {
		pc.unReserve(app, node, alloc.GetAsk())
		if alloc.GetResult() == objects.Unreserved {
			return nil
		}
		// remove the link to the reserved node
		alloc.SetReservedNodeID("")
	}

	// track the number of allocations
	pc.updateAllocationCount(1)
	if alloc.IsPlaceholder() {
		pc.incPhAllocationCount()
	}

	log.Log(log.SchedPartition).Info("scheduler allocation processed",
		zap.String("appID", alloc.GetApplicationID()),
		zap.String("allocationKey", alloc.GetAllocationKey()),
		zap.String("uuid", alloc.GetUUID()),
		zap.Stringer("allocatedResource", alloc.GetAllocatedResource()),
		zap.Bool("placeholder", alloc.IsPlaceholder()),
		zap.String("targetNode", alloc.GetNodeID()))
	// pass the allocation back to the RM via the cluster context
	return alloc
}

// Process the reservation in the scheduler
// Lock free call this must be called holding the context lock
// NOTE(review): the caller allocate() is documented as NOT holding the PartitionContext lock,
// which contradicts the line above - confirm which lock requirement is correct.
// Nothing is changed if the application is already reserved on the node or if the reservation
// on the application fails.
func (pc *PartitionContext) reserve(app *objects.Application, node *objects.Node, ask *objects.AllocationAsk) {
	appID := app.ApplicationID
	// app has node already reserved cannot reserve again
	if app.IsReservedOnNode(node.NodeID) {
		log.Log(log.SchedPartition).Info("Application is already reserved on node",
			zap.String("appID", appID),
			zap.String("nodeID", node.NodeID))
		return
	}
	// all ok, add the reservation to the app, this will also reserve the node
	if err := app.Reserve(node, ask); err != nil {
		log.Log(log.SchedPartition).Debug("Failed to handle reservation, error during update of app",
			zap.Error(err))
		return
	}

	// add the reservation to the queue list
	app.GetQueue().Reserve(appID)
	pc.incReservationCount()

	log.Log(log.SchedPartition).Info("allocation ask is reserved",
		zap.String("appID", appID),
		zap.String("queue", app.GetQueuePath()),
		zap.String("allocationKey", ask.GetAllocationKey()),
		zap.String("node", node.NodeID))
}

// unReserve removes the reservation from the objects in the scheduler
// NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock.
func (pc *PartitionContext) unReserve(app *objects.Application, node *objects.Node, ask *objects.AllocationAsk) { // remove the reservation of the app, this will also unReserve the node var err error var num int if num, err = app.UnReserve(node, ask); err != nil { log.Log(log.SchedPartition).Info("Failed to unreserve, error during allocate on the app", zap.Error(err)) return } // remove the reservation of the queue appID := app.ApplicationID app.GetQueue().UnReserve(appID, num) pc.decReservationCount(num) log.Log(log.SchedPartition).Info("allocation ask is unreserved", zap.String("appID", appID), zap.String("queue", app.GetQueuePath()), zap.String("allocationKey", ask.GetAllocationKey()), zap.String("node", node.NodeID), zap.Int("reservationsRemoved", num)) } // Create an ordered node iterator based on the node sort policy set for this partition. // The iterator is nil if there are no unreserved nodes available. func (pc *PartitionContext) GetNodeIterator() objects.NodeIterator { return pc.nodes.GetNodeIterator() } // Create an ordered node iterator based on the node sort policy set for this partition. // The iterator is nil if there are no nodes available. 
func (pc *PartitionContext) GetFullNodeIterator() objects.NodeIterator {
	iterator := pc.nodes.GetFullNodeIterator()
	return iterator
}

// updateAllocationCount adds delta, which may be negative, to the allocation counter of the partition.
// The partition write lock is taken for the update.
func (pc *PartitionContext) updateAllocationCount(allocs int) {
	pc.Lock()
	pc.allocations += allocs
	pc.Unlock()
}

// GetTotalPartitionResource returns the total resource tracked for the partition.
// The stored pointer is returned as is, no copy is made.
func (pc *PartitionContext) GetTotalPartitionResource() *resources.Resource {
	pc.RLock()
	total := pc.totalPartitionResource
	pc.RUnlock()
	return total
}

// GetAllocatedResource returns the allocated resource as reported by the root queue.
func (pc *PartitionContext) GetAllocatedResource() *resources.Resource {
	pc.RLock()
	defer pc.RUnlock()
	allocated := pc.root.GetAllocatedResource()
	return allocated
}

// GetTotalAllocationCount returns the current value of the allocation counter of the partition.
func (pc *PartitionContext) GetTotalAllocationCount() int {
	pc.RLock()
	count := pc.allocations
	pc.RUnlock()
	return count
}

// GetTotalNodeCount returns the node count as reported by the node collection of the partition.
func (pc *PartitionContext) GetTotalNodeCount() int {
	pc.RLock()
	defer pc.RUnlock()
	count := pc.nodes.GetNodeCount()
	return count
}

// GetApplications returns a slice of the current applications tracked by the partition.
// The slice is nil when no applications are tracked, the order of the applications is not defined.
func (pc *PartitionContext) GetApplications() []*objects.Application {
	pc.RLock()
	defer pc.RUnlock()
	var current []*objects.Application
	for _, application := range pc.applications {
		current = append(current, application)
	}
	return current
}

// GetCompletedApplications returns a slice of the completed applications tracked by the partition.
// The slice is nil when no applications are tracked, the order of the applications is not defined.
func (pc *PartitionContext) GetCompletedApplications() []*objects.Application {
	pc.RLock()
	defer pc.RUnlock()
	var completed []*objects.Application
	for _, application := range pc.completedApplications {
		completed = append(completed, application)
	}
	return completed
}

// GetRejectedApplications returns a slice of the rejected applications tracked by the partition.
// The slice is nil when no applications are tracked, the order of the applications is not defined.
func (pc *PartitionContext) GetRejectedApplications() []*objects.Application {
	pc.RLock()
	defer pc.RUnlock()
	var rejected []*objects.Application
	for _, application := range pc.rejectedApplications {
		rejected = append(rejected, application)
	}
	return rejected
}

// getAppsByState returns a slice of applicationIDs for the current applications filtered by state
// Completed and Rejected applications are tracked in a separate map and will never be included.
func (pc *PartitionContext) getAppsByState(state string) []string { pc.RLock() defer pc.RUnlock() var apps []string for appID, app := range pc.applications { if app.CurrentState() == state { apps = append(apps, appID) } } return apps } // getRejectedAppsByState returns a slice of applicationIDs for the rejected applications filtered by state. func (pc *PartitionContext) getRejectedAppsByState(state string) []string { pc.RLock() defer pc.RUnlock() var apps []string for appID, app := range pc.rejectedApplications { if app.CurrentState() == state { apps = append(apps, appID) } } return apps } // getCompletedAppsByState returns a slice of applicationIDs for the completed applicationIDs filtered by state. func (pc *PartitionContext) getCompletedAppsByState(state string) []string { pc.RLock() defer pc.RUnlock() var apps []string for appID, app := range pc.completedApplications { if app.CurrentState() == state { apps = append(apps, appID) } } return apps } // cleanupExpiredApps cleans up applications in the Expired state from the three tracking maps func (pc *PartitionContext) cleanupExpiredApps() { for _, appID := range pc.getAppsByState(objects.Expired.String()) { pc.Lock() delete(pc.applications, appID) pc.Unlock() } for _, appID := range pc.getRejectedAppsByState(objects.Expired.String()) { pc.Lock() delete(pc.rejectedApplications, appID) pc.Unlock() } for _, appID := range pc.getCompletedAppsByState(objects.Expired.String()) { pc.Lock() delete(pc.completedApplications, appID) pc.Unlock() } } // GetNodes returns a slice of all nodes unfiltered from the iterator func (pc *PartitionContext) GetNodes() []*objects.Node { pc.RLock() defer pc.RUnlock() return pc.nodes.GetNodes() } // Add an allocation to the partition/node/application/queue during node registration. // Queue max allocation is not checked as the allocation is part of a new node addition. // // NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock. 
func (pc *PartitionContext) addAllocation(alloc *objects.Allocation) error { // cannot do anything with a nil alloc, should only happen if the shim broke things badly if alloc == nil { return nil } if pc.isStopped() { return fmt.Errorf("partition %s is stopped cannot add new allocation %s", pc.Name, alloc.GetAllocationKey()) } // We must not generate a new uuid for it, we directly use the uuid reported by shim // to track this allocation, a missing uuid is a broken allocation if alloc.GetUUID() == "" { metrics.GetSchedulerMetrics().IncSchedulingError() return fmt.Errorf("failing to restore allocation %s for application %s: missing uuid", alloc.GetAllocationKey(), alloc.GetApplicationID()) } log.Log(log.SchedPartition).Info("adding recovered allocation", zap.String("partitionName", pc.Name), zap.String("appID", alloc.GetApplicationID()), zap.String("allocKey", alloc.GetAllocationKey()), zap.String("uuid", alloc.GetUUID())) // Check if allocation violates any resource restriction, or allocate on a // non-existent application or nodes. node := pc.GetNode(alloc.GetNodeID()) if node == nil { metrics.GetSchedulerMetrics().IncSchedulingError() return fmt.Errorf("failed to find node %s", alloc.GetNodeID()) } app := pc.getApplication(alloc.GetApplicationID()) if app == nil { metrics.GetSchedulerMetrics().IncSchedulingError() return fmt.Errorf("failed to find application %s", alloc.GetApplicationID()) } queue := app.GetQueue() // Do not check if the new allocation goes beyond the queue's max resource (recursive). // still handle a returned error but they should never happen. 
if err := queue.IncAllocatedResource(alloc.GetAllocatedResource(), true); err != nil { metrics.GetSchedulerMetrics().IncSchedulingError() return fmt.Errorf("cannot allocate resource from application %s: %v ", alloc.GetApplicationID(), err) } metrics.GetQueueMetrics(queue.GetQueuePath()).IncAllocatedContainer() node.AddAllocation(alloc) app.RecoverAllocationAsk(alloc.GetAsk()) app.AddAllocation(alloc) // track the number of allocations pc.updateAllocationCount(1) if alloc.IsPlaceholder() { pc.incPhAllocationCount() } log.Log(log.SchedPartition).Info("recovered allocation", zap.String("partitionName", pc.Name), zap.String("appID", alloc.GetApplicationID()), zap.String("allocKey", alloc.GetAllocationKey()), zap.String("allocationUid", alloc.GetUUID()), zap.Bool("placeholder", alloc.IsPlaceholder())) return nil } func (pc *PartitionContext) convertUGI(ugi *si.UserGroupInformation) (security.UserGroup, error) { pc.RLock() defer pc.RUnlock() return pc.userGroupCache.ConvertUGI(ugi) } // calculate overall nodes resource usage and returns a map as the result, // where the key is the resource name, e.g memory, and the value is a []int, // which is a slice with 10 elements, // each element represents a range of resource usage, // such as // // 0: 0%->10% // 1: 10% -> 20% // ... // 9: 90% -> 100% // // the element value represents number of nodes fall into this bucket. // if slice[9] = 3, this means there are 3 nodes resource usage is in the range 80% to 90%. // // NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock. func (pc *PartitionContext) calculateNodesResourceUsage() map[string][]int { nodesCopy := pc.GetNodes() mapResult := make(map[string][]int) for _, node := range nodesCopy { capacity := node.GetCapacity() allocated := node.GetAllocatedResource() for name, total := range capacity.Resources { if total > 0 { resourceAllocated := float64(allocated.Resources[name]) // Consider over-allocated node as 100% utilized. 
v := math.Min(resourceAllocated/float64(total), 1) idx := int(math.Dim(math.Ceil(v*10), 1)) if dist, ok := mapResult[name]; !ok { newDist := make([]int, 10) for i := range newDist { newDist[i] = 0 } mapResult[name] = newDist mapResult[name][idx]++ } else { dist[idx]++ } } } } return mapResult } // removeAllocation removes the referenced allocation(s) from the applications and nodes // NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock. func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*objects.Allocation, *objects.Allocation) { if release == nil { return nil, nil } appID := release.ApplicationID uuid := release.GetUUID() app := pc.getApplication(appID) // no app nothing to do everything should already be clean if app == nil { log.Log(log.SchedPartition).Info("Application not found while releasing allocation", zap.String("appID", appID), zap.String("allocationId", uuid), zap.Stringer("terminationType", release.TerminationType)) return nil, nil } // temp store for allocations manipulated released := make([]*objects.Allocation, 0) var confirmed *objects.Allocation // when uuid is not specified, remove all allocations from the app if uuid == "" { log.Log(log.SchedPartition).Info("remove all allocations", zap.String("appID", appID)) released = append(released, app.RemoveAllAllocations()...) 
total := 0 for _, r := range released { if r.IsPlaceholder() { total++ } } pc.decPhAllocationCount(total) } else { // if we have an uuid the termination type is important if release.TerminationType == si.TerminationType_PLACEHOLDER_REPLACED { log.Log(log.SchedPartition).Info("replacing placeholder allocation", zap.String("appID", appID), zap.String("allocationId", uuid)) if alloc := app.ReplaceAllocation(uuid); alloc != nil { released = append(released, alloc) if alloc.IsPlaceholder() { pc.decPhAllocationCount(1) } } } else { log.Log(log.SchedPartition).Info("removing allocation from application", zap.String("appID", appID), zap.String("allocationId", uuid), zap.Stringer("terminationType", release.TerminationType)) if alloc := app.RemoveAllocation(uuid, release.TerminationType); alloc != nil { released = append(released, alloc) } } } // for each allocation to release, update the node and queue. total := resources.NewResource() totalPreempting := resources.NewResource() for _, alloc := range released { node := pc.GetNode(alloc.GetNodeID()) if node == nil { log.Log(log.SchedPartition).Info("node not found while releasing allocation", zap.String("appID", appID), zap.String("allocationId", alloc.GetUUID()), zap.String("nodeID", alloc.GetNodeID())) continue } if release.TerminationType == si.TerminationType_PLACEHOLDER_REPLACED { confirmed = alloc.GetFirstRelease() // we need to check the resources equality delta := resources.Sub(confirmed.GetAllocatedResource(), alloc.GetAllocatedResource()) // Any negative value in the delta means that at least one of the requested resource in the // placeholder is larger than the real allocation. The node and queue need adjusting. // The reverse case is handled during allocation. if delta.HasNegativeValue() { // This looks incorrect but the delta is negative and the result will be an increase of the // total tracked. The total will later be deducted from the queue usage. 
total.SubFrom(delta) log.Log(log.SchedPartition).Warn("replacing placeholder: placeholder is larger than real allocation", zap.String("allocationID", confirmed.GetUUID()), zap.Stringer("requested resource", confirmed.GetAllocatedResource()), zap.String("placeholderID", alloc.GetUUID()), zap.Stringer("placeholder resource", alloc.GetAllocatedResource())) } // replacements could be on a different node and different size handle all cases if confirmed.GetNodeID() == alloc.GetNodeID() { // this is the real swap on the node, adjust usage if needed node.ReplaceAllocation(alloc.GetUUID(), confirmed, delta) } else { // we have already added the real allocation to the new node, just remove the placeholder node.RemoveAllocation(alloc.GetUUID()) } log.Log(log.SchedPartition).Info("replacing placeholder allocation on node", zap.String("nodeID", alloc.GetNodeID()), zap.String("allocationId", alloc.GetUUID()), zap.String("allocation nodeID", confirmed.GetNodeID())) } else if node.RemoveAllocation(alloc.GetUUID()) != nil { // all non replacement are real removes: must update the queue usage total.AddTo(alloc.GetAllocatedResource()) log.Log(log.SchedPartition).Info("removing allocation from node", zap.String("nodeID", alloc.GetNodeID()), zap.String("allocationId", alloc.GetUUID())) } if alloc.IsPreempted() { totalPreempting.AddTo(alloc.GetAllocatedResource()) } } if resources.StrictlyGreaterThanZero(total) { queue := app.GetQueue() if err := queue.DecAllocatedResource(total); err != nil { log.Log(log.SchedPartition).Warn("failed to release resources from queue", zap.String("appID", appID), zap.String("allocationId", uuid), zap.Error(err)) } else { metrics.GetQueueMetrics(queue.GetQueuePath()).IncReleasedContainer() } } if resources.StrictlyGreaterThanZero(totalPreempting) { app.GetQueue().DecPreemptingResource(totalPreempting) } // if confirmed is set we can assume there will just be one alloc in the released // that allocation was already released by the shim, so clean up released 
if confirmed != nil { released = nil } // track the number of allocations, when we replace the result is no change pc.updateAllocationCount(-len(released)) // if the termination type is timeout, we don't notify the shim, because it's // originated from that side if release.TerminationType == si.TerminationType_TIMEOUT { released = nil } return released, confirmed } // Remove the allocation ask from the specified application // NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock. func (pc *PartitionContext) removeAllocationAsk(release *si.AllocationAskRelease) { if release == nil { return } appID := release.ApplicationID allocKey := release.AllocationKey // A timeout termination is send by the core to the shim, ignore on return. if release.TerminationType == si.TerminationType_TIMEOUT { log.Log(log.SchedPartition).Debug("Ignoring ask release with termination type Timeout", zap.String("appID", appID), zap.String("ask", allocKey)) return } app := pc.getApplication(appID) if app == nil { log.Log(log.SchedPartition).Info("Invalid ask release requested by shim", zap.String("appID", appID), zap.String("ask", allocKey), zap.Stringer("terminationType", release.TerminationType)) return } // remove the allocation asks from the app _ = app.RemoveAllocationAsk(allocKey) } // Add the allocation ask to the specified application // NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock. 
func (pc *PartitionContext) addAllocationAsk(siAsk *si.AllocationAsk) error { if siAsk == nil { return nil } app := pc.getApplication(siAsk.ApplicationID) if app == nil { return fmt.Errorf("failed to find application %s, for allocation ask %s", siAsk.ApplicationID, siAsk.AllocationKey) } // add the allocation asks to the app return app.AddAllocationAsk(objects.NewAllocationAskFromSI(siAsk)) } func (pc *PartitionContext) GetCurrentState() string { return pc.stateMachine.Current() } func (pc *PartitionContext) GetStateTime() time.Time { pc.RLock() defer pc.RUnlock() return pc.stateTime } func (pc *PartitionContext) GetNodeSortingPolicyType() policies.SortingPolicy { pc.RLock() defer pc.RUnlock() policy := pc.nodes.GetNodeSortingPolicy() return policy.PolicyType() } func (pc *PartitionContext) GetNodeSortingResourceWeights() map[string]float64 { pc.RLock() defer pc.RUnlock() policy := pc.nodes.GetNodeSortingPolicy() return policy.ResourceWeights() } func (pc *PartitionContext) moveTerminatedApp(appID string) { app := pc.getApplication(appID) // nothing to do if the app is not found on the partition if app == nil { log.Log(log.SchedPartition).Debug("Application already removed from app list", zap.String("appID", appID)) return } app.UnSetQueue() // new ID as completedApplications map key, use negative value to get a divider newID := appID + strconv.FormatInt(-(time.Now()).Unix(), 10) log.Log(log.SchedPartition).Info("Removing terminated application from the application list", zap.String("appID", appID), zap.String("app status", app.CurrentState())) app.LogAppSummary(pc.RmID) app.CleanupUsedResource() pc.Lock() defer pc.Unlock() delete(pc.applications, appID) pc.completedApplications[newID] = app } func (pc *PartitionContext) AddRejectedApplication(rejectedApplication *objects.Application, rejectedMessage string) { if err := rejectedApplication.RejectApplication(rejectedMessage); err != nil { log.Log(log.SchedPartition).Warn("BUG: Unexpected failure: Application state 
not changed to Rejected", zap.String("currentState", rejectedApplication.CurrentState()), zap.Error(err)) } if pc.rejectedApplications == nil { pc.rejectedApplications = make(map[string]*objects.Application) } pc.rejectedApplications[rejectedApplication.ApplicationID] = rejectedApplication } func (pc *PartitionContext) incPhAllocationCount() { pc.Lock() defer pc.Unlock() pc.placeholderAllocations++ } func (pc *PartitionContext) decPhAllocationCount(num int) { pc.Lock() defer pc.Unlock() pc.placeholderAllocations -= num } func (pc *PartitionContext) getPhAllocationCount() int { pc.RLock() defer pc.RUnlock() return pc.placeholderAllocations } func (pc *PartitionContext) incReservationCount() { pc.Lock() defer pc.Unlock() pc.reservations++ } func (pc *PartitionContext) decReservationCount(num int) { pc.Lock() defer pc.Unlock() pc.reservations -= num } func (pc *PartitionContext) getReservationCount() int { pc.RLock() defer pc.RUnlock() return pc.reservations }