func()

in pkg/callback/scheduler_callback.go [47:105]


func (callback *AsyncRMCallback) UpdateAllocation(response *si.AllocationResponse) error {
	log.Log(log.ShimRMCallback).Debug("UpdateAllocation callback received",
		zap.Stringer("UpdateAllocationResponse", response))
	// handle new allocations
	for _, alloc := range response.New {
		// got allocation for pod, bind pod to the scheduled node
		log.Log(log.ShimRMCallback).Debug("callback: response to new allocation",
			zap.String("allocationKey", alloc.AllocationKey),
			zap.String("UUID", alloc.UUID),
			zap.String("applicationID", alloc.ApplicationID),
			zap.String("nodeID", alloc.NodeID))

		// update cache
		if err := callback.context.AssumePod(alloc.AllocationKey, alloc.NodeID); err != nil {
			return err
		}
		if app := callback.context.GetApplication(alloc.ApplicationID); app != nil {
			ev := cache.NewAllocateTaskEvent(app.GetApplicationID(), alloc.AllocationKey, alloc.UUID, alloc.NodeID)
			dispatcher.Dispatch(ev)
		}
	}

	for _, reject := range response.Rejected {
		// request rejected by the scheduler, put it back and try scheduling again
		log.Log(log.ShimRMCallback).Debug("callback: response to rejected allocation",
			zap.String("allocationKey", reject.AllocationKey))
		if app := callback.context.GetApplication(reject.ApplicationID); app != nil {
			dispatcher.Dispatch(cache.NewRejectTaskEvent(app.GetApplicationID(), reject.AllocationKey,
				fmt.Sprintf("task %s from application %s is rejected by scheduler",
					reject.AllocationKey, reject.ApplicationID)))
		}
	}

	for _, release := range response.Released {
		log.Log(log.ShimRMCallback).Debug("callback: response to released allocations",
			zap.String("UUID", release.UUID))

		// update cache
		callback.context.ForgetPod(release.GetAllocationKey())

		// TerminationType 0 mean STOPPED_BY_RM
		if release.TerminationType != si.TerminationType_STOPPED_BY_RM {
			// send release app allocation to application states machine
			ev := cache.NewReleaseAppAllocationEvent(release.ApplicationID, release.TerminationType, release.UUID)
			dispatcher.Dispatch(ev)
		}
	}

	for _, ask := range response.ReleasedAsks {
		log.Log(log.ShimRMCallback).Debug("callback: response to released allocations",
			zap.String("allocation key", ask.AllocationKey))

		if ask.TerminationType == si.TerminationType_TIMEOUT {
			ev := cache.NewReleaseAppAllocationAskEvent(ask.ApplicationID, ask.TerminationType, ask.AllocationKey)
			dispatcher.Dispatch(ev)
		}
	}
	return nil
}