in pkg/callback/scheduler_callback.go [47:105]
func (callback *AsyncRMCallback) UpdateAllocation(response *si.AllocationResponse) error {
log.Log(log.ShimRMCallback).Debug("UpdateAllocation callback received",
zap.Stringer("UpdateAllocationResponse", response))
// handle new allocations
for _, alloc := range response.New {
// got allocation for pod, bind pod to the scheduled node
log.Log(log.ShimRMCallback).Debug("callback: response to new allocation",
zap.String("allocationKey", alloc.AllocationKey),
zap.String("UUID", alloc.UUID),
zap.String("applicationID", alloc.ApplicationID),
zap.String("nodeID", alloc.NodeID))
// update cache
if err := callback.context.AssumePod(alloc.AllocationKey, alloc.NodeID); err != nil {
return err
}
if app := callback.context.GetApplication(alloc.ApplicationID); app != nil {
ev := cache.NewAllocateTaskEvent(app.GetApplicationID(), alloc.AllocationKey, alloc.UUID, alloc.NodeID)
dispatcher.Dispatch(ev)
}
}
for _, reject := range response.Rejected {
// request rejected by the scheduler, put it back and try scheduling again
log.Log(log.ShimRMCallback).Debug("callback: response to rejected allocation",
zap.String("allocationKey", reject.AllocationKey))
if app := callback.context.GetApplication(reject.ApplicationID); app != nil {
dispatcher.Dispatch(cache.NewRejectTaskEvent(app.GetApplicationID(), reject.AllocationKey,
fmt.Sprintf("task %s from application %s is rejected by scheduler",
reject.AllocationKey, reject.ApplicationID)))
}
}
for _, release := range response.Released {
log.Log(log.ShimRMCallback).Debug("callback: response to released allocations",
zap.String("UUID", release.UUID))
// update cache
callback.context.ForgetPod(release.GetAllocationKey())
// TerminationType 0 mean STOPPED_BY_RM
if release.TerminationType != si.TerminationType_STOPPED_BY_RM {
// send release app allocation to application states machine
ev := cache.NewReleaseAppAllocationEvent(release.ApplicationID, release.TerminationType, release.UUID)
dispatcher.Dispatch(ev)
}
}
for _, ask := range response.ReleasedAsks {
log.Log(log.ShimRMCallback).Debug("callback: response to released allocations",
zap.String("allocation key", ask.AllocationKey))
if ask.TerminationType == si.TerminationType_TIMEOUT {
ev := cache.NewReleaseAppAllocationAskEvent(ask.ApplicationID, ask.TerminationType, ask.AllocationKey)
dispatcher.Dispatch(ev)
}
}
return nil
}