pkg/scheduler/context.go

/*
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements. See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
 The ASF licenses this file to you under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License. You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
*/

package scheduler

import (
	"errors"
	"fmt"
	"math"
	"time"

	"go.uber.org/zap"

	"github.com/apache/yunikorn-core/pkg/common"
	"github.com/apache/yunikorn-core/pkg/common/configs"
	"github.com/apache/yunikorn-core/pkg/common/resources"
	"github.com/apache/yunikorn-core/pkg/handler"
	"github.com/apache/yunikorn-core/pkg/locking"
	"github.com/apache/yunikorn-core/pkg/log"
	"github.com/apache/yunikorn-core/pkg/metrics"
	"github.com/apache/yunikorn-core/pkg/rmproxy/rmevent"
	"github.com/apache/yunikorn-core/pkg/scheduler/objects"
	"github.com/apache/yunikorn-core/pkg/webservice/dao"

	siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)

const disableReservation = "DISABLE_RESERVATION"

type ClusterContext struct {
	partitions     map[string]*PartitionContext
	policyGroup    string
	rmEventHandler handler.EventHandler
	uuid           string

	// config values that change scheduling behaviour
	needPreemption      bool
	reservationDisabled bool

	rmInfo    map[string]*RMInformation
	startTime time.Time

	locking.RWMutex

	lastHealthCheckResult *dao.SchedulerHealthDAOInfo
}

type RMInformation struct {
	RMBuildInformation map[string]string
}

// Create a new cluster context to be used outside of the event system.
// test only
func NewClusterContext(rmID, policyGroup string, config []byte) (*ClusterContext, error) {
	// load the config this returns a validated configuration
	conf, err := configs.LoadSchedulerConfigFromByteArray(config)
	if err != nil {
		return nil, err
	}
	// create the context and set the policyGroup
	cc := &ClusterContext{
		partitions:          make(map[string]*PartitionContext),
		policyGroup:         policyGroup,
		reservationDisabled: common.GetBoolEnvVar(disableReservation, false),
		startTime:           time.Now(),
		uuid:                common.GetNewUUID(),
	}
	// If reservation is turned off set the reservation delay to the maximum duration defined.
	// The time package does not export maxDuration so use the equivalent from the math package.
	if cc.reservationDisabled {
		objects.SetReservationDelay(math.MaxInt64)
	}
	err = cc.updateSchedulerConfig(conf, rmID)
	if err != nil {
		return nil, err
	}
	// update the global config
	configs.ConfigContext.Set(policyGroup, conf)
	return cc, nil
}

func newClusterContext() *ClusterContext {
	cc := &ClusterContext{
		partitions:          make(map[string]*PartitionContext),
		reservationDisabled: common.GetBoolEnvVar(disableReservation, false),
		startTime:           time.Now(),
		uuid:                common.GetNewUUID(),
	}
	// If reservation is turned off set the reservation delay to the maximum duration defined.
	// The time package does not export maxDuration so use the equivalent from the math package.
	if cc.reservationDisabled {
		objects.SetReservationDelay(math.MaxInt64)
	}
	return cc
}

func (cc *ClusterContext) setEventHandler(rmHandler handler.EventHandler) {
	cc.rmEventHandler = rmHandler
}

// schedule is the main scheduling routine.
// Process each partition in the scheduler, walk over each queue and app to check if anything can be scheduled.
// This can be forked into a go routine per partition if needed to increase parallel allocations.
// Returns true if an allocation was able to be scheduled.
func (cc *ClusterContext) schedule() bool {
	// schedule each partition defined in the cluster
	activity := false
	for _, psc := range cc.GetPartitionMapClone() {
		// if there are no resources in the partition just skip
		if psc.root.GetMaxResource() == nil {
			continue
		}
		// a stopped partition does not allocate
		if psc.isStopped() {
			continue
		}
		// try reservations first
		schedulingStart := time.Now()
		result := psc.tryReservedAllocate()
		if result == nil {
			// placeholder replacement second
			result = psc.tryPlaceholderAllocate()
			// nothing reserved that can be allocated try normal allocate
			if result == nil {
				result = psc.tryAllocate()
			}
		}
		metrics.GetSchedulerMetrics().ObserveSchedulingLatency(schedulingStart)
		if result != nil {
			if result.ResultType == objects.Replaced {
				// communicate the removal to the RM
				cc.notifyRMAllocationReleased(psc.RmID, psc.Name, []*objects.Allocation{result.Request.GetRelease()},
					si.TerminationType_PLACEHOLDER_REPLACED, "replacing allocationKey: "+result.Request.GetAllocationKey())
			} else {
				cc.notifyRMNewAllocation(psc.RmID, result.Request)
			}
			activity = true
		}
	}
	return activity
}

func (cc *ClusterContext) processRMRegistrationEvent(event *rmevent.RMRegistrationEvent) {
	cc.Lock()
	defer cc.Unlock()
	rmID := event.Registration.RmID
	// we should not have any partitions set at this point
	if len(cc.partitions) > 0 {
		event.Channel <- &rmevent.Result{
			Reason:    fmt.Sprintf("RM %s has been registered before, active partitions %d", rmID, len(cc.partitions)),
			Succeeded: false,
		}
		return
	}
	policyGroup := event.Registration.PolicyGroup
	config := event.Registration.Config
	configs.SetConfigMap(event.Registration.ExtraConfig)

	// load the config this returns a validated configuration
	if len(config) == 0 {
		log.Log(log.SchedContext).Info("No scheduler configuration supplied, using defaults", zap.String("rmID", rmID))
		config = configs.DefaultSchedulerConfig
	}
	conf, err := configs.LoadSchedulerConfigFromByteArray([]byte(config))
	if err != nil {
		event.Channel <- &rmevent.Result{Succeeded: false, Reason: err.Error()}
		return
	}
	err = cc.updateSchedulerConfig(conf, rmID)
	if err != nil {
		event.Channel <- &rmevent.Result{Succeeded: false, Reason: err.Error()}
		return
	}
	// update global scheduler configs, set the policyGroup for this cluster
	cc.policyGroup = policyGroup
	configs.ConfigContext.Set(policyGroup, conf)

	// store the build information of RM
	cc.SetRMInfo(rmID, event.Registration.BuildInfo)

	// Done, notify channel
	event.Channel <- &rmevent.Result{
		Succeeded: true,
	}
}

func (cc *ClusterContext) processRMConfigUpdateEvent(event *rmevent.RMConfigUpdateEvent) {
	cc.Lock()
	defer cc.Unlock()
	rmID := event.RmID
	// need to enhance the check to support multiple RMs
	if len(cc.partitions) == 0 {
		event.Channel <- &rmevent.Result{
			Reason:    fmt.Sprintf("RM %s has no active partitions, make sure it is registered", rmID),
			Succeeded: false,
		}
		return
	}
	// set extra configuration
	configs.SetConfigMap(event.ExtraConfig)

	// load the config this returns a validated configuration
	config := event.Config
	if len(config) == 0 {
		log.Log(log.SchedContext).Info("No scheduler configuration supplied, using defaults", zap.String("rmID", rmID))
		config = configs.DefaultSchedulerConfig
	}
	conf, err := configs.LoadSchedulerConfigFromByteArray([]byte(config))
	if err != nil {
		event.Channel <- &rmevent.Result{Succeeded: false, Reason: err.Error()}
		return
	}
	// skip update if config has not changed
	oldConf := configs.ConfigContext.Get(cc.policyGroup)
	if conf.Checksum == oldConf.Checksum {
		event.Channel <- &rmevent.Result{
			Succeeded: true,
		}
		return
	}
	// update scheduler configuration
	err = cc.updateSchedulerConfig(conf, rmID)
	if err != nil {
		event.Channel <- &rmevent.Result{Succeeded: false, Reason: err.Error()}
		return
	}
	// Done, notify channel
	event.Channel <- &rmevent.Result{
		Succeeded: true,
	}
	// update global scheduler configs
	configs.ConfigContext.Set(cc.policyGroup, conf)
}

func (cc *ClusterContext) handleRMUpdateNodeEvent(event *rmevent.RMUpdateNodeEvent) {
	request := event.Request
	cc.processNodes(request)
}

// processNodes process all node requests: add, remove and update.
func (cc *ClusterContext) processNodes(request *si.NodeRequest) {
	nodeCount := len(request.GetNodes())
	if nodeCount > 0 {
		acceptedNodes := make([]*si.AcceptedNode, 0)
		rejectedNodes := make([]*si.RejectedNode, 0)
		for _, nodeInfo := range request.GetNodes() {
			// nil nodes must be skipped
			if nodeInfo == nil {
				continue
			}
			create := false
			schedulable := false
			switch nodeInfo.Action {
			case si.NodeInfo_CREATE:
				create = true
				schedulable = true
			case si.NodeInfo_CREATE_DRAIN:
				create = true
			}
			if create {
				err := cc.addNode(nodeInfo, schedulable)
				if err == nil {
					acceptedNodes = append(acceptedNodes, &si.AcceptedNode{
						NodeID: nodeInfo.NodeID,
					})
				} else {
					rejectedNodes = append(rejectedNodes, &si.RejectedNode{
						NodeID: nodeInfo.NodeID,
						Reason: err.Error(),
					})
				}
			} else {
				cc.updateNode(nodeInfo)
			}
		}
		if len(acceptedNodes) > 0 || len(rejectedNodes) > 0 {
			// inform the RM which nodes have been accepted/rejected
			cc.rmEventHandler.HandleEvent(
				&rmevent.RMNodeUpdateEvent{
					RmID:          request.RmID,
					AcceptedNodes: acceptedNodes,
					RejectedNodes: rejectedNodes,
				})
		}
	}
}

// Called when a RM re-registers. This triggers a full clean up.
// Registration expects everything to be clean.
func (cc *ClusterContext) removePartitionsByRMID(event *rmevent.RMPartitionsRemoveEvent) {
	cc.Lock()
	defer cc.Unlock()
	partitionToRemove := make(map[string]bool)
	// Just remove corresponding partitions
	for k, partition := range cc.partitions {
		if partition.RmID == event.RmID {
			partition.partitionManager.Stop()
			partitionToRemove[k] = true
		}
	}
	for partitionName := range partitionToRemove {
		delete(cc.partitions, partitionName)
	}
	// Done, notify channel
	event.Channel <- &rmevent.Result{
		Succeeded: true,
	}
}

// Locked version of the configuration update called outside of event system.
// Updates the current config via the config loader.
// Used in test only, normal updates use the internal call
func (cc *ClusterContext) UpdateRMSchedulerConfig(rmID string, config []byte) error {
	cc.Lock()
	defer cc.Unlock()
	if len(cc.partitions) == 0 {
		return fmt.Errorf("RM %s has no active partitions, make sure it is registered", rmID)
	}
	// load the config this returns a validated configuration
	conf, err := configs.LoadSchedulerConfigFromByteArray(config)
	if err != nil {
		return err
	}
	err = cc.updateSchedulerConfig(conf, rmID)
	if err != nil {
		return err
	}
	// update global scheduler configs
	configs.ConfigContext.Set(cc.policyGroup, conf)
	return nil
}

// Update or set the scheduler config.
// If the partitions list does not contain the specific partition it creates a new
// partition otherwise it performs an update.
// Called if the config file is updated, indirectly when the webservice is called.
// During tests this is called outside of the event system to init.
// unlocked call must only be called holding the ClusterContext lock
func (cc *ClusterContext) updateSchedulerConfig(conf *configs.SchedulerConfig, rmID string) error {
	visited := map[string]bool{}
	var err error
	// walk over the partitions in the config: update existing ones
	for _, p := range conf.Partitions {
		partitionName := common.GetNormalizedPartitionName(p.Name, rmID)
		p.Name = partitionName
		part, ok := cc.partitions[p.Name]
		if ok {
			// make sure the new info passes all checks
			_, err = newPartitionContext(p, rmID, nil, true)
			if err != nil {
				return err
			}
			// checks passed perform the real update
			log.Log(log.SchedContext).Info("updating partitions",
				zap.String("partitionName", partitionName))
			err = part.updatePartitionDetails(p)
			if err != nil {
				return err
			}
		} else {
			// not found: new partition, no checks needed
			log.Log(log.SchedContext).Info("added partitions",
				zap.String("partitionName", partitionName))
			part, err = newPartitionContext(p, rmID, cc, false)
			if err != nil {
				return err
			}
			go part.partitionManager.Run()
			cc.partitions[partitionName] = part
		}
		// add it to the partitions to update
		visited[p.Name] = true
	}
	// get the removed partitions, mark them as deleted
	for _, part := range cc.partitions {
		if !visited[part.Name] {
			part.partitionManager.Stop()
			log.Log(log.SchedContext).Info("marked partition for removal",
				zap.String("partitionName", part.Name))
		}
	}
	return nil
}

// Get the config name.
func (cc *ClusterContext) GetPolicyGroup() string {
	cc.RLock()
	defer cc.RUnlock()
	return cc.policyGroup
}

func (cc *ClusterContext) GetStartTime() time.Time {
	cc.RLock()
	defer cc.RUnlock()
	return cc.startTime
}

func (cc *ClusterContext) GetRMInfoMapClone() map[string]*RMInformation {
	cc.RLock()
	defer cc.RUnlock()
	newMap := make(map[string]*RMInformation)
	for k, v := range cc.rmInfo {
		newMap[k] = v
	}
	return newMap
}

func (cc *ClusterContext) GetPartitionMapClone() map[string]*PartitionContext {
	cc.RLock()
	defer cc.RUnlock()
	newMap := make(map[string]*PartitionContext)
	for k, v := range cc.partitions {
		newMap[k] = v
	}
	return newMap
}

func (cc *ClusterContext) GetPartition(partitionName string) *PartitionContext {
	cc.RLock()
	defer cc.RUnlock()
	return cc.partitions[partitionName]
}

func (cc *ClusterContext) GetPartitionWithoutClusterID(partitionName string) *PartitionContext {
	cc.RLock()
	defer cc.RUnlock()
	for k, v := range cc.partitions {
		if len(partitionName) > 0 && common.GetPartitionNameWithoutClusterID(k) == partitionName {
			return v
		}
	}
	return nil
}

// Get the scheduling application based on the ID from the partition.
// Returns nil if the partition or app cannot be found.
// Visible for tests
func (cc *ClusterContext) GetApplication(appID, partitionName string) *objects.Application {
	cc.RLock()
	defer cc.RUnlock()
	if partition := cc.partitions[partitionName]; partition != nil {
		return partition.getApplication(appID)
	}
	return nil
}

// Get the scheduling queue based on the queue path name from the partition.
// Returns nil if the partition or queue cannot be found.
// Visible for tests
func (cc *ClusterContext) GetQueue(queueName string, partitionName string) *objects.Queue {
	cc.RLock()
	defer cc.RUnlock()
	if partition := cc.partitions[partitionName]; partition != nil {
		return partition.GetQueue(queueName)
	}
	return nil
}

// Process the application update. Add and remove applications from the partitions.
// Lock free call, all updates occur on the underlying partition which is locked, or via events.
func (cc *ClusterContext) handleRMUpdateApplicationEvent(event *rmevent.RMUpdateApplicationEvent) {
	request := event.Request
	if len(request.New) == 0 && len(request.Remove) == 0 {
		return
	}
	acceptedApps := make([]*si.AcceptedApplication, 0)
	rejectedApps := make([]*si.RejectedApplication, 0)
	for _, app := range request.New {
		partition := cc.GetPartition(app.PartitionName)
		if partition == nil {
			msg := fmt.Sprintf("Failed to add application %s to partition %s, partition doesn't exist", app.ApplicationID, app.PartitionName)
			rejectedApps = append(rejectedApps, &si.RejectedApplication{
				ApplicationID: app.ApplicationID,
				Reason:        msg,
			})
			log.Log(log.SchedContext).Error("Failed to add application to non existing partition",
				zap.String("applicationID", app.ApplicationID),
				zap.String("partitionName", app.PartitionName))
			continue
		}
		// convert and resolve the user: cache can be set per partition
		// need to do this before we create the application
		ugi, err := partition.convertUGI(app.Ugi, common.IsAppCreationForced(app.Tags))
		if err != nil {
			rejectedApps = append(rejectedApps, &si.RejectedApplication{
				ApplicationID: app.ApplicationID,
				Reason:        err.Error(),
			})
			partition.AddRejectedApplication(objects.NewApplication(app, ugi, cc.rmEventHandler, request.RmID), err.Error())
			log.Log(log.SchedContext).Error("Failed to add application to partition (user rejected)",
				zap.String("applicationID", app.ApplicationID),
				zap.String("partitionName", app.PartitionName),
				zap.Error(err))
			continue
		}
		// create a new app object and add it to the partition (partition logs details)
		schedApp := objects.NewApplication(app, ugi, cc.rmEventHandler, request.RmID)
		if err = partition.AddApplication(schedApp); err != nil {
			rejectedApps = append(rejectedApps, &si.RejectedApplication{
				ApplicationID: app.ApplicationID,
				Reason:        err.Error(),
			})
			partition.AddRejectedApplication(schedApp, err.Error())
			log.Log(log.SchedContext).Error("Failed to add application to partition (placement rejected)",
				zap.String("applicationID", app.ApplicationID),
				zap.String("partitionName", app.PartitionName),
				zap.Error(err))
			continue
		}
		acceptedApps = append(acceptedApps, &si.AcceptedApplication{
			ApplicationID: schedApp.ApplicationID,
		})
		log.Log(log.SchedContext).Info("Added application to partition",
			zap.String("applicationID", app.ApplicationID),
			zap.String("partitionName", app.PartitionName),
			zap.String("requested queue", app.QueueName),
			zap.String("placed queue", schedApp.GetQueuePath()))
	}
	// Respond to RMProxy with accepted and rejected apps if needed
	if len(rejectedApps) > 0 || len(acceptedApps) > 0 {
		cc.rmEventHandler.HandleEvent(
			&rmevent.RMApplicationUpdateEvent{
				RmID:                 request.RmID,
				AcceptedApplications: acceptedApps,
				RejectedApplications: rejectedApps,
			})
	}
	// Update metrics with removed applications
	if len(request.Remove) > 0 {
		for _, app := range request.Remove {
			partition := cc.GetPartition(app.PartitionName)
			if partition == nil {
				continue
			}
			allocations := partition.removeApplication(app.ApplicationID)
			if len(allocations) > 0 {
				cc.notifyRMAllocationReleased(partition.RmID, partition.Name, allocations,
					si.TerminationType_STOPPED_BY_RM, fmt.Sprintf("Application %s Removed", app.ApplicationID))
			}
			log.Log(log.SchedContext).Info("Application removed from partition",
				zap.String("applicationID", app.ApplicationID),
				zap.String("partitionName", app.PartitionName),
				zap.Int("allocations released", len(allocations)))
		}
	}
}

func (cc *ClusterContext) NeedPreemption() bool {
	cc.RLock()
	defer cc.RUnlock()
	return cc.needPreemption
}

// Callback from the partition manager to finalise the removal of the partition
func (cc *ClusterContext) removePartition(partitionName string) {
	cc.Lock()
	defer cc.Unlock()
	delete(cc.partitions, partitionName)
}

// addNode adds a new node to the cluster enforcing just one unlimited node in the cluster.
// nil nodeInfo objects must be filtered out before calling this function
func (cc *ClusterContext) addNode(nodeInfo *si.NodeInfo, schedulable bool) error {
	sn := objects.NewNode(nodeInfo)
	sn.SetSchedulable(schedulable)
	partition := cc.GetPartition(sn.Partition)
	if partition == nil {
		err := fmt.Errorf("failed to find partition %s for new node %s", sn.Partition, sn.NodeID)
		//nolint:godox
		//TODO: assess impact of partition metrics (this never hit the partition)
		metrics.GetSchedulerMetrics().IncFailedNodes()
		log.Log(log.SchedContext).Error("Failed to add node to non existing partition",
			zap.String("nodeID", sn.NodeID),
			zap.String("partitionName", sn.Partition))
		return err
	}
	err := partition.AddNode(sn)
	sn.SendNodeAddedEvent()
	if err != nil {
		wrapped := errors.Join(errors.New("failure while adding new node, node rejected with error: "), err)
		log.Log(log.SchedContext).Error("Failed to add node to partition (rejected)",
			zap.String("nodeID", sn.NodeID),
			zap.String("partitionName", sn.Partition),
			zap.Error(err))
		return wrapped
	}
	if !sn.IsSchedulable() {
		metrics.GetSchedulerMetrics().IncDrainingNodes()
	}
	log.Log(log.SchedContext).Info("successfully added node",
		zap.String("nodeID", sn.NodeID),
		zap.String("partition", sn.Partition),
		zap.Bool("schedulable", sn.IsSchedulable()))
	return nil
}

// updateNode updates an existing node of the cluster.
// nil nodeInfo objects must be filtered out before calling this function
func (cc *ClusterContext) updateNode(nodeInfo *si.NodeInfo) {
	var partition *PartitionContext
	if p, ok := nodeInfo.Attributes[siCommon.NodePartition]; ok {
		partition = cc.GetPartition(p)
	} else {
		log.Log(log.SchedContext).Error("node partition not specified",
			zap.String("nodeID", nodeInfo.NodeID),
			zap.Stringer("nodeAction", nodeInfo.Action))
		return
	}
	if partition == nil {
		log.Log(log.SchedContext).Error("Failed to update node on non existing partition",
			zap.String("nodeID", nodeInfo.NodeID),
			zap.String("partitionName", nodeInfo.Attributes[siCommon.NodePartition]),
			zap.Stringer("nodeAction", nodeInfo.Action))
		return
	}
	node := partition.GetNode(nodeInfo.NodeID)
	if node == nil {
		log.Log(log.SchedContext).Error("Failed to update non existing node",
			zap.String("nodeID", nodeInfo.NodeID),
			zap.String("partitionName", nodeInfo.Attributes[siCommon.NodePartition]),
			zap.Stringer("nodeAction", nodeInfo.Action))
		return
	}
	switch nodeInfo.Action {
	case si.NodeInfo_UPDATE:
		if sr := nodeInfo.SchedulableResource; sr != nil {
			partition.updatePartitionResource(node.SetCapacity(resources.NewResourceFromProto(sr)))
		}
	case si.NodeInfo_DRAIN_NODE:
		if node.IsSchedulable() {
			// set the state to not schedulable
			node.SetSchedulable(false)
			metrics.GetSchedulerMetrics().IncDrainingNodes()
		}
	case si.NodeInfo_DRAIN_TO_SCHEDULABLE:
		if !node.IsSchedulable() {
			metrics.GetSchedulerMetrics().DecDrainingNodes()
			// set the state to schedulable
			node.SetSchedulable(true)
		}
	case si.NodeInfo_DECOMISSION:
		if !node.IsSchedulable() {
			metrics.GetSchedulerMetrics().DecDrainingNodes()
		}
		metrics.GetSchedulerMetrics().IncTotalDecommissionedNodes()
		// set the state to not schedulable then tell the partition to clean up
		node.SetSchedulable(false)
		released, confirmed := partition.removeNode(node.NodeID)
		node.SendNodeRemovedEvent()
		// notify the shim allocations have been released from node
		if len(released) != 0 {
			cc.notifyRMAllocationReleased(partition.RmID, partition.Name, released,
				si.TerminationType_STOPPED_BY_RM, fmt.Sprintf("Node %s Removed", node.NodeID))
		}
		for _, confirm := range confirmed {
			cc.notifyRMNewAllocation(partition.RmID, confirm)
		}
	default:
		log.Log(log.SchedContext).Debug("unknown action for node update",
			zap.String("nodeID", nodeInfo.NodeID),
			zap.String("partitionName", nodeInfo.Attributes[siCommon.NodePartition]),
			zap.Stringer("nodeAction", nodeInfo.Action))
	}
}

// Process an ask and allocation update request.
// - Add new allocations for the application(s).
// - Add new asks and remove released asks for the application(s).
// - Release allocations for the application(s).
// Lock free call, all updates occur on the underlying application which is locked or via events.
func (cc *ClusterContext) handleRMUpdateAllocationEvent(event *rmevent.RMUpdateAllocationEvent) {
	request := event.Request
	if len(request.Allocations) != 0 {
		cc.processAllocations(request)
	}
	if request.Releases != nil {
		if len(request.Releases.AllocationsToRelease) > 0 {
			cc.processAllocationReleases(request.Releases.AllocationsToRelease, request.RmID)
		}
	}
}

func (cc *ClusterContext) processAllocations(request *si.AllocationRequest) {
	// Send rejected allocations back to RM
	rejectedAllocs := make([]*si.RejectedAllocation, 0)
	// Send to scheduler
	for _, siAlloc := range request.Allocations {
		// try to get partition
		partition := cc.GetPartition(siAlloc.PartitionName)
		if partition == nil {
			msg := fmt.Sprintf("Failed to find partition %s, for application %s and allocation %s", siAlloc.PartitionName, siAlloc.ApplicationID, siAlloc.AllocationKey)
			log.Log(log.SchedContext).Error("Invalid allocation add requested by shim, partition not found",
				zap.String("partition", siAlloc.PartitionName),
				zap.String("nodeID", siAlloc.NodeID),
				zap.String("applicationID", siAlloc.ApplicationID),
				zap.String("allocationKey", siAlloc.AllocationKey))
			rejectedAllocs = append(rejectedAllocs, &si.RejectedAllocation{
				AllocationKey: siAlloc.AllocationKey,
				ApplicationID: siAlloc.ApplicationID,
				Reason:        msg,
			})
			continue
		}
		alloc := objects.NewAllocationFromSI(siAlloc)
		_, newAlloc, err := partition.UpdateAllocation(alloc)
		if err != nil {
			rejectedAllocs = append(rejectedAllocs, &si.RejectedAllocation{
				AllocationKey: siAlloc.AllocationKey,
				ApplicationID: siAlloc.ApplicationID,
				Reason:        err.Error(),
			})
			log.Log(log.SchedContext).Error("Invalid allocation update requested by shim",
				zap.String("partition", siAlloc.PartitionName),
				zap.String("nodeID", siAlloc.NodeID),
				zap.String("applicationID", siAlloc.ApplicationID),
				zap.String("allocationKey", siAlloc.AllocationKey),
				zap.Error(err))
			continue
		}
		// at some point, we may need to handle new requests as well
		if newAlloc && !alloc.IsForeign() {
			cc.notifyRMNewAllocation(request.RmID, alloc)
		}
	}
	// Reject allocs returned to RM proxy for the apps and partitions not found
	if len(rejectedAllocs) > 0 {
		cc.rmEventHandler.HandleEvent(&rmevent.RMRejectedAllocationEvent{
			RmID:                request.RmID,
			RejectedAllocations: rejectedAllocs,
		})
	}
}

func (cc *ClusterContext) processAllocationReleases(releases []*si.AllocationRelease, rmID string) {
	for _, toRelease := range releases {
		partition := cc.GetPartition(toRelease.PartitionName)
		if partition != nil {
			allocs, confirmed := partition.removeAllocation(toRelease)
			// notify the RM of the exact released allocations
			if len(allocs) > 0 {
				cc.notifyRMAllocationReleased(rmID, partition.Name, allocs, si.TerminationType_STOPPED_BY_RM, "allocation remove as per RM request")
			}
			// notify the RM of the confirmed allocations (placeholder swap & preemption)
			if confirmed != nil {
				cc.notifyRMNewAllocation(rmID, confirmed)
			}
		}
	}
}

// Create a RM update event to notify RM of new allocations
// Lock free call, all updates occur via events.
func (cc *ClusterContext) notifyRMNewAllocation(rmID string, alloc *objects.Allocation) {
	c := make(chan *rmevent.Result)
	// communicate the allocation to the RM synchronously
	cc.rmEventHandler.HandleEvent(&rmevent.RMNewAllocationsEvent{
		Allocations: []*si.Allocation{alloc.NewSIFromAllocation()},
		RmID:        rmID,
		Channel:     c,
	})
	// Wait from channel
	result := <-c
	if result.Succeeded {
		log.Log(log.SchedContext).Debug("Successfully synced shim on new allocation. response: " + result.Reason)
	} else {
		log.Log(log.SchedContext).Info("failed to sync shim on new allocation",
			zap.String("Allocation key: ", alloc.GetAllocationKey()))
	}
}

// Create a RM update event to notify RM of released allocations
// Lock free call, all updates occur via events.
func (cc *ClusterContext) notifyRMAllocationReleased(rmID string, partitionName string, released []*objects.Allocation, terminationType si.TerminationType, message string) {
	c := make(chan *rmevent.Result)
	releaseEvent := &rmevent.RMReleaseAllocationEvent{
		ReleasedAllocations: make([]*si.AllocationRelease, 0),
		RmID:                rmID,
		Channel:             c,
	}
	for _, alloc := range released {
		releaseEvent.ReleasedAllocations = append(releaseEvent.ReleasedAllocations, &si.AllocationRelease{
			ApplicationID:   alloc.GetApplicationID(),
			PartitionName:   partitionName,
			TerminationType: terminationType,
			Message:         message,
			AllocationKey:   alloc.GetAllocationKey(),
		})
	}
	cc.rmEventHandler.HandleEvent(releaseEvent)
	// Wait from channel
	result := <-c
	if result.Succeeded {
		log.Log(log.SchedContext).Debug("Successfully synced shim on released allocations. response: " + result.Reason)
	} else {
		log.Log(log.SchedContext).Info("failed to sync shim on released allocations")
	}
}

// Get a scheduling node based on its name from the partition.
// Returns nil if the partition or node cannot be found.
// Visible for tests
func (cc *ClusterContext) GetNode(nodeID, partitionName string) *objects.Node {
	cc.Lock()
	defer cc.Unlock()
	partition := cc.partitions[partitionName]
	if partition == nil {
		log.Log(log.SchedContext).Info("partition not found for scheduling node",
			zap.String("nodeID", nodeID),
			zap.String("partitionName", partitionName))
		return nil
	}
	return partition.GetNode(nodeID)
}

func (cc *ClusterContext) SetRMInfo(rmID string, rmBuildInformation map[string]string) {
	if cc.rmInfo == nil {
		cc.rmInfo = make(map[string]*RMInformation)
	}
	buildInfo := make(map[string]string)
	for k, v := range rmBuildInformation {
		buildInfo[k] = v
	}
	buildInfo["rmId"] = rmID
	cc.rmInfo[rmID] = &RMInformation{
		RMBuildInformation: buildInfo,
	}
}

func (cc *ClusterContext) GetLastHealthCheckResult() *dao.SchedulerHealthDAOInfo {
	cc.RLock()
	defer cc.RUnlock()
	return cc.lastHealthCheckResult
}

func (cc *ClusterContext) SetLastHealthCheckResult(c *dao.SchedulerHealthDAOInfo) {
	cc.Lock()
	defer cc.Unlock()
	cc.lastHealthCheckResult = c
}

func (cc *ClusterContext) GetUUID() string {
	return cc.uuid
}

func (cc *ClusterContext) Stop() {
	log.Log(log.SchedContext).Info("Stopping background services of partitions")
	for _, part := range cc.GetPartitionMapClone() {
		part.partitionManager.Stop()
		part.userGroupCache.Stop()
	}
}
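
// Usage sketch (illustrative only, not taken from the upstream sources): how a test might
// build a ClusterContext directly through the test-only NewClusterContext constructor and
// look up the resulting partition. The YAML below is an assumed minimal scheduler
// configuration, and the rmID "rm-1" and policy group "default-policy" are placeholder
// values; real tests in this repository may use a different setup.
//
//	config := []byte(`
//	partitions:
//	  - name: default
//	    queues:
//	      - name: root
//	        submitacl: "*"
//	`)
//	cc, err := NewClusterContext("rm-1", "default-policy", config)
//	if err != nil {
//		// the configuration failed validation
//	}
//	// partition names are normalised with the RM ID before they are stored
//	defaultPartition := cc.GetPartition(common.GetNormalizedPartitionName("default", "rm-1"))
//	_ = defaultPartition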