in pkg/scheduler/partition.go [866:955]
// allocate post-processes a scheduling result for this partition.
//
// It first checks that the application and the target node referenced by the
// result are still known to the partition. After that it applies the
// reservation bookkeeping that matches the result type and, for results that
// carry a real allocation, stamps the request with its bind time, node ID and
// instance type and bumps the partition allocation counters.
//
// The passed in result is returned when an allocation was finalised. nil is
// returned in all other cases: the application or node was removed, or the
// result only reserved or unreserved a node.
func (pc *PartitionContext) allocate(result *objects.AllocationResult) *objects.AllocationResult {
	// the application could have been removed since the result was created
	appID := result.Request.GetApplicationID()
	app := pc.getApplication(appID)
	if app == nil {
		log.Log(log.SchedPartition).Info("Application was removed while allocating",
			zap.String("appID", appID))
		return nil
	}

	// the node to allocate on is always taken from the result, not from the request:
	// it is set when a reservation is allocated on a non-reserved node
	request := result.Request
	nodeID := result.NodeID
	node := pc.GetNode(nodeID)
	if node == nil {
		log.Log(log.SchedPartition).Info("Target node was removed while allocating",
			zap.String("nodeID", nodeID),
			zap.String("appID", appID))
		// nothing to unwind unless the request was already marked as allocated
		if !request.IsAllocated() {
			return nil
		}
		key := request.GetAllocationKey()
		_, err := app.DeallocateAsk(key)
		if err != nil {
			log.Log(log.SchedPartition).Warn("Failed to unwind allocation",
				zap.String("nodeID", nodeID),
				zap.String("appID", appID),
				zap.String("allocationKey", key),
				zap.Error(err))
		}
		return nil
	}

	// account for the reservations that were cancelled during the processing
	// NOTE(review): this is skipped by the early returns above (app or node gone) - confirm that is intended
	pc.decReservationCount(result.CancelledReservations)

	switch result.ResultType {
	case objects.Reserved:
		// new reservation only: no allocation to report back
		pc.reserve(app, node, request)
		return nil
	case objects.Unreserved, objects.AllocatedReserved:
		// the reservation lives on the target node unless the result names another node
		reservedNodeID := result.NodeID
		if result.ReservedNodeID != "" {
			reservedNodeID = result.ReservedNodeID
			log.Log(log.SchedPartition).Debug("Reservation allocated on different node",
				zap.String("current node", result.NodeID),
				zap.String("reserved node", reservedNodeID),
				zap.String("appID", appID))
		}
		if reservedNode := pc.GetNode(reservedNodeID); reservedNode != nil {
			pc.unReserve(app, reservedNode, request)
		} else {
			log.Log(log.SchedPartition).Info("Reserved node was removed while allocating",
				zap.String("nodeID", reservedNodeID),
				zap.String("appID", appID))
		}
		// a plain unreserve stops here, an allocated reservation continues below
		if result.ResultType == objects.Unreserved {
			return nil
		}
		// drop the link to the reserved node before the result is handed back
		result.ReservedNodeID = ""
	}

	// finalise the allocation on the target node
	request.SetBindTime(time.Now())
	request.SetNodeID(nodeID)
	request.SetInstanceType(node.GetInstanceType())

	// keep the partition counters in sync, placeholders are tracked separately
	pc.updateAllocationCount(1)
	if request.IsPlaceholder() {
		pc.incPhAllocationCount()
	}

	log.Log(log.SchedPartition).Info("scheduler allocation processed",
		zap.String("appID", request.GetApplicationID()),
		zap.String("allocationKey", request.GetAllocationKey()),
		zap.Stringer("allocatedResource", request.GetAllocatedResource()),
		zap.Bool("placeholder", request.IsPlaceholder()),
		zap.String("targetNode", nodeID))

	// hand the result back to the caller so it can be passed on to the RM
	return result
}