in pkg/scheduler/partition.go [1149:1317]
// UpdateAllocation processes an allocation sent to this partition. Depending on what the
// application already tracks under the allocation key, the call does one of the following:
//   - no entry, and no node found for the allocation: the allocation is added as a new request
//   - no entry, and a node found: the allocation is registered on the queue, node and application
//   - existing entry: a resource change is applied first, then, if the existing entry is not yet
//     allocated and the incoming allocation is, the existing entry is placed on the node
//
// Foreign allocations are handed to handleForeignAllocation and its results are returned as is.
//
// Returns:
//   - requestCreated: true when the allocation was added to the application as a new request
//   - allocCreated: true when an allocation was added to a node by this call
//   - err: non-nil when the partition is stopped, the application or a required node cannot be
//     found, the resources are zero or not strictly greater than zero, or the application
//     rejects the request, the resource update or the placement
//
// A nil alloc is ignored and yields (false, false, nil).
func (pc *PartitionContext) UpdateAllocation(alloc *objects.Allocation) (requestCreated bool, allocCreated bool, err error) { //nolint:funlen
	// cannot do anything with a nil alloc, should only happen if the shim broke things badly
	if alloc == nil {
		return false, false, nil
	}
	if pc.isStopped() {
		return false, false, fmt.Errorf("partition %s is stopped; cannot process allocation %s", pc.Name, alloc.GetAllocationKey())
	}

	allocationKey := alloc.GetAllocationKey()
	applicationID := alloc.GetApplicationID()
	nodeID := alloc.GetNodeID()
	// node may be nil here; that is only treated as an error for an allocated alloc (checked below)
	node := pc.GetNode(nodeID)

	log.Log(log.SchedPartition).Info("processing allocation",
		zap.String("partitionName", pc.Name),
		zap.String("appID", applicationID),
		zap.String("allocationKey", allocationKey))

	// foreign allocations take a separate path, before any application lookup
	if alloc.IsForeign() {
		return pc.handleForeignAllocation(allocationKey, applicationID, nodeID, node, alloc)
	}

	// find application
	app := pc.getApplication(applicationID)
	if app == nil {
		metrics.GetSchedulerMetrics().IncSchedulingError()
		return false, false, fmt.Errorf("failed to find application %s", applicationID)
	}
	queue := app.GetQueue()

	// an allocated alloc must reference a node known to this partition
	allocated := alloc.IsAllocated()
	if allocated && node == nil {
		metrics.GetSchedulerMetrics().IncSchedulingError()
		return false, false, fmt.Errorf("failed to find node %s", nodeID)
	}

	// reject allocations without resources or with a negative quantity
	res := alloc.GetAllocatedResource()
	if resources.IsZero(res) {
		metrics.GetSchedulerMetrics().IncSchedulingError()
		return false, false, fmt.Errorf("allocation contains no resources")
	}
	if !resources.StrictlyGreaterThanZero(res) {
		metrics.GetSchedulerMetrics().IncSchedulingError()
		return false, false, fmt.Errorf("allocation contains negative resources")
	}

	// check to see if allocation exists already on app
	existing := app.GetAllocationAsk(allocationKey)

	// handle new allocation
	if existing == nil {
		// new request: no node could be resolved for the allocation
		if node == nil {
			log.Log(log.SchedPartition).Info("handling new request",
				zap.String("partitionName", pc.Name),
				zap.String("appID", applicationID),
				zap.String("allocationKey", allocationKey))
			if err = app.AddAllocationAsk(alloc); err != nil {
				log.Log(log.SchedPartition).Info("failed to add request",
					zap.String("partitionName", pc.Name),
					zap.String("appID", applicationID),
					zap.String("allocationKey", allocationKey),
					zap.Error(err))
				return false, false, err
			}
			log.Log(log.SchedPartition).Info("added new request",
				zap.String("partitionName", pc.Name),
				zap.String("appID", applicationID),
				zap.String("allocationKey", allocationKey))
			return true, false, nil
		}

		// new allocation already assigned: register it on queue, node and application
		log.Log(log.SchedPartition).Info("handling existing allocation",
			zap.String("partitionName", pc.Name),
			zap.String("appID", applicationID),
			zap.String("allocationKey", allocationKey))
		queue.IncAllocatedResource(res)
		metrics.GetQueueMetrics(queue.GetQueuePath()).IncAllocatedContainer()
		node.AddAllocation(alloc)
		alloc.SetInstanceType(node.GetInstanceType())
		app.RecoverAllocationAsk(alloc)
		app.AddAllocation(alloc)
		pc.updateAllocationCount(1)
		if alloc.IsPlaceholder() {
			pc.incPhAllocationCount()
		}
		log.Log(log.SchedPartition).Info("added existing allocation",
			zap.String("partitionName", pc.Name),
			zap.String("appID", applicationID),
			zap.String("allocationKey", allocationKey),
			zap.Bool("placeholder", alloc.IsPlaceholder()))
		return false, true, nil
	}

	// the tracked entry is being updated: if it is already allocated its node must still exist
	var existingNode *objects.Node
	if existing.IsAllocated() {
		existingNode = pc.GetNode(existing.GetNodeID())
		if existingNode == nil {
			metrics.GetSchedulerMetrics().IncSchedulingError()
			return false, false, fmt.Errorf("failed to find node %s", existing.GetNodeID())
		}
	}

	// since this is an update, check for resource change and process that first
	existingResource := existing.GetAllocatedResource().Clone()
	newResource := res.Clone()
	delta := resources.Sub(newResource, existingResource)
	delta.Prune()
	// NOTE(review): a zero res was already rejected above, so the second condition looks
	// redundant for a clone of res; kept as is, confirm before removing
	if !resources.IsZero(delta) && !resources.IsZero(newResource) {
		// resources have changed, update them on application, which also handles queue and user tracker updates
		if err = app.UpdateAllocationResources(alloc); err != nil {
			metrics.GetSchedulerMetrics().IncSchedulingError()
			// wrap with %w so callers can still inspect the underlying error
			return false, false, fmt.Errorf("cannot update alloc resources on application %s: %w",
				applicationID, err)
		}

		// update node if allocation was previously allocated
		if existingNode != nil {
			existingNode.UpdateAllocatedResource(delta)
		}
	}

	// transitioning from requested to allocated
	if !existing.IsAllocated() && allocated {
		log.Log(log.SchedPartition).Info("handling allocation placement",
			zap.String("partitionName", pc.Name),
			zap.String("appID", applicationID),
			zap.String("allocationKey", allocationKey))
		// placement details are copied onto the tracked entry, which is the object that gets registered
		existing.SetNodeID(nodeID)
		existing.SetBindTime(alloc.GetBindTime())
		if _, err = app.AllocateAsk(allocationKey); err != nil {
			log.Log(log.SchedPartition).Info("failed to allocate ask for allocation placement",
				zap.String("partitionName", pc.Name),
				zap.String("appID", applicationID),
				zap.String("allocationKey", allocationKey),
				zap.Error(err))
			return false, false, err
		}
		queue.IncAllocatedResource(alloc.GetAllocatedResource())
		metrics.GetQueueMetrics(queue.GetQueuePath()).IncAllocatedContainer()
		node.AddAllocation(existing)
		existing.SetInstanceType(node.GetInstanceType())
		app.AddAllocation(existing)
		pc.updateAllocationCount(1)
		if existing.IsPlaceholder() {
			pc.incPhAllocationCount()
		}
		log.Log(log.SchedPartition).Info("external allocation placed",
			zap.String("partitionName", pc.Name),
			zap.String("appID", applicationID),
			zap.String("allocationKey", allocationKey),
			zap.Bool("placeholder", alloc.IsPlaceholder()))
		return false, true, nil
	}

	// nothing created: either nothing changed or only the resources were updated
	return false, false, nil
}