func()

in pkg/scheduler/partition.go [1149:1317]


// UpdateAllocation processes an allocation update for this partition. Depending on what the
// application already tracks under the allocation key it either registers a new request, adds
// an allocation that already has a node assigned, applies a resource change to a known
// allocation, or places a previously requested allocation on a node.
//
// A nil alloc is ignored and reported as (false, false, nil). Foreign allocations are handed to
// handleForeignAllocation before any application lookup takes place and its results are returned
// unchanged.
//
// For non-foreign allocations the results are:
//   - requestCreated: true when a new request without a resolved node was added to the application.
//   - allocCreated: true when an allocation was added to a node and the application, either as a
//     new allocation that came with a node or by placing an existing request.
//   - err: non-nil when the partition is stopped, the application or a required node cannot be
//     found, the allocated resource is zero or not strictly greater than zero, or the application
//     rejects the request, the resource update or the placement.
func (pc *PartitionContext) UpdateAllocation(alloc *objects.Allocation) (requestCreated bool, allocCreated bool, err error) { //nolint:funlen
	// cannot do anything with a nil alloc, should only happen if the shim broke things badly
	if alloc == nil {
		return false, false, nil
	}
	if pc.isStopped() {
		return false, false, fmt.Errorf("partition %s is stopped; cannot process allocation %s", pc.Name, alloc.GetAllocationKey())
	}

	// capture the identifiers once so every lookup, log line and error below uses the same values
	allocationKey := alloc.GetAllocationKey()
	applicationID := alloc.GetApplicationID()
	nodeID := alloc.GetNodeID()
	// node may be nil here; whether that is an error is decided further down
	node := pc.GetNode(nodeID)

	log.Log(log.SchedPartition).Info("processing allocation",
		zap.String("partitionName", pc.Name),
		zap.String("appID", applicationID),
		zap.String("allocationKey", allocationKey))

	// foreign allocations take a separate path and never reach the application lookup
	if alloc.IsForeign() {
		return pc.handleForeignAllocation(allocationKey, applicationID, nodeID, node, alloc)
	}

	// find application
	app := pc.getApplication(applicationID)
	if app == nil {
		metrics.GetSchedulerMetrics().IncSchedulingError()
		return false, false, fmt.Errorf("failed to find application %s", applicationID)
	}
	queue := app.GetQueue()

	// an allocated allocation must reference a node known to this partition
	allocated := alloc.IsAllocated()
	if allocated && node == nil {
		metrics.GetSchedulerMetrics().IncSchedulingError()
		return false, false, fmt.Errorf("failed to find node %s", nodeID)
	}

	// reject allocations without a usable resource
	res := alloc.GetAllocatedResource()
	if resources.IsZero(res) {
		metrics.GetSchedulerMetrics().IncSchedulingError()
		return false, false, fmt.Errorf("allocation contains no resources")
	}
	if !resources.StrictlyGreaterThanZero(res) {
		metrics.GetSchedulerMetrics().IncSchedulingError()
		return false, false, fmt.Errorf("allocation contains negative resources")
	}

	// check to see if allocation exists already on app
	existing := app.GetAllocationAsk(allocationKey)

	// handle new allocation
	if existing == nil {
		// new request: no node could be resolved for it
		if node == nil {
			log.Log(log.SchedPartition).Info("handling new request",
				zap.String("partitionName", pc.Name),
				zap.String("appID", applicationID),
				zap.String("allocationKey", allocationKey))

			if err := app.AddAllocationAsk(alloc); err != nil {
				log.Log(log.SchedPartition).Info("failed to add request",
					zap.String("partitionName", pc.Name),
					zap.String("appID", applicationID),
					zap.String("allocationKey", allocationKey),
					zap.Error(err))
				return false, false, err
			}

			log.Log(log.SchedPartition).Info("added new request",
				zap.String("partitionName", pc.Name),
				zap.String("appID", applicationID),
				zap.String("allocationKey", allocationKey))
			return true, false, nil
		}

		// new allocation already assigned
		log.Log(log.SchedPartition).Info("handling existing allocation",
			zap.String("partitionName", pc.Name),
			zap.String("appID", applicationID),
			zap.String("allocationKey", allocationKey))

		// account for the allocation on the queue, the node and the application, then on the
		// partition level counters
		queue.IncAllocatedResource(res)
		metrics.GetQueueMetrics(queue.GetQueuePath()).IncAllocatedContainer()
		node.AddAllocation(alloc)
		alloc.SetInstanceType(node.GetInstanceType())
		app.RecoverAllocationAsk(alloc)
		app.AddAllocation(alloc)
		pc.updateAllocationCount(1)
		if alloc.IsPlaceholder() {
			pc.incPhAllocationCount()
		}

		log.Log(log.SchedPartition).Info("added existing allocation",
			zap.String("partitionName", pc.Name),
			zap.String("appID", applicationID),
			zap.String("allocationKey", allocationKey),
			zap.Bool("placeholder", alloc.IsPlaceholder()))
		return false, true, nil
	}

	// the allocation is already known: if it was placed before, its node must still exist
	var existingNode *objects.Node
	if existing.IsAllocated() {
		existingNode = pc.GetNode(existing.GetNodeID())
		if existingNode == nil {
			metrics.GetSchedulerMetrics().IncSchedulingError()
			return false, false, fmt.Errorf("failed to find node %s", existing.GetNodeID())
		}
	}

	// since this is an update, check for resource change and process that first
	existingResource := existing.GetAllocatedResource().Clone()
	newResource := res.Clone()
	delta := resources.Sub(newResource, existingResource)
	delta.Prune()
	// NOTE(review): a zero res was already rejected above, so the second condition looks purely
	// defensive; confirm before removing it.
	if !resources.IsZero(delta) && !resources.IsZero(newResource) {
		// resources have changed, update them on application, which also handles queue and user tracker updates
		if err := app.UpdateAllocationResources(alloc); err != nil {
			metrics.GetSchedulerMetrics().IncSchedulingError()
			// wrap with %w so callers can still inspect the underlying error
			return false, false, fmt.Errorf("cannot update alloc resources on application %s: %w",
				applicationID, err)
		}

		// update node if allocation was previously allocated
		if existingNode != nil {
			existingNode.UpdateAllocatedResource(delta)
		}
	}

	// transitioning from requested to allocated
	if !existing.IsAllocated() && allocated {
		log.Log(log.SchedPartition).Info("handling allocation placement",
			zap.String("partitionName", pc.Name),
			zap.String("appID", applicationID),
			zap.String("allocationKey", allocationKey))

		// NOTE(review): the tracked allocation is modified before AllocateAsk is attempted; when
		// AllocateAsk fails the node ID and bind time set here are not reverted. Confirm that
		// this is acceptable for callers that retry.
		existing.SetNodeID(nodeID)
		existing.SetBindTime(alloc.GetBindTime())
		if _, err := app.AllocateAsk(allocationKey); err != nil {
			log.Log(log.SchedPartition).Info("failed to allocate ask for allocation placement",
				zap.String("partitionName", pc.Name),
				zap.String("appID", applicationID),
				zap.String("allocationKey", allocationKey),
				zap.Error(err))
			return false, false, err
		}

		// from here on the tracked allocation (existing), not the incoming object, is the one
		// registered with the node and the application
		queue.IncAllocatedResource(alloc.GetAllocatedResource())
		metrics.GetQueueMetrics(queue.GetQueuePath()).IncAllocatedContainer()
		node.AddAllocation(existing)
		existing.SetInstanceType(node.GetInstanceType())
		app.AddAllocation(existing)
		pc.updateAllocationCount(1)
		if existing.IsPlaceholder() {
			pc.incPhAllocationCount()
		}

		log.Log(log.SchedPartition).Info("external allocation placed",
			zap.String("partitionName", pc.Name),
			zap.String("appID", applicationID),
			zap.String("allocationKey", allocationKey),
			zap.Bool("placeholder", alloc.IsPlaceholder()))
		return false, true, nil
	}

	// nothing was created: either only the resources changed or the update was a no-op
	return false, false, nil
}