in pkg/scheduler/context.go [487:574]
// handleRMUpdateApplicationEvent processes an application update request sent by a resource manager.
//
// Every application in request.New is validated and added to its partition. An application is
// rejected when its partition does not exist, when its user information cannot be converted, or
// when the partition refuses to add it (for example a placement failure). When the partition
// exists, a rejected application is also recorded on that partition. If at least one new
// application was processed, a single RMApplicationUpdateEvent listing the accepted and rejected
// applications is sent back to the RM.
//
// Every application in request.Remove is removed from its partition. Any allocations it still
// held are reported to the RM as released with termination type STOPPED_BY_RM. Removal requests
// that reference a non-existing partition are logged and skipped.
func (cc *ClusterContext) handleRMUpdateApplicationEvent(event *rmevent.RMUpdateApplicationEvent) {
	request := event.Request
	if len(request.New) == 0 && len(request.Remove) == 0 {
		return
	}
	acceptedApps := make([]*si.AcceptedApplication, 0, len(request.New))
	rejectedApps := make([]*si.RejectedApplication, 0)
	for _, app := range request.New {
		partition := cc.GetPartition(app.PartitionName)
		if partition == nil {
			msg := fmt.Sprintf("Failed to add application %s to partition %s, partition doesn't exist", app.ApplicationID, app.PartitionName)
			rejectedApps = append(rejectedApps, &si.RejectedApplication{
				ApplicationID: app.ApplicationID,
				Reason:        msg,
			})
			log.Log(log.SchedContext).Error("Failed to add application to non existing partition",
				zap.String("applicationID", app.ApplicationID),
				zap.String("partitionName", app.PartitionName))
			continue
		}
		// convert and resolve the user: cache can be set per partition
		// need to do this before we create the application
		ugi, err := partition.convertUGI(app.Ugi, common.IsAppCreationForced(app.Tags))
		if err != nil {
			rejectedApps = append(rejectedApps, &si.RejectedApplication{
				ApplicationID: app.ApplicationID,
				Reason:        err.Error(),
			})
			// The rejected record is built with whatever ugi convertUGI returned alongside the
			// error (presumably the zero value — confirm against convertUGI).
			partition.AddRejectedApplication(objects.NewApplication(app, ugi, cc.rmEventHandler, request.RmID), err.Error())
			log.Log(log.SchedContext).Error("Failed to add application to partition (user rejected)",
				zap.String("applicationID", app.ApplicationID),
				zap.String("partitionName", app.PartitionName),
				zap.Error(err))
			continue
		}
		// create a new app object and add it to the partition (partition logs details)
		schedApp := objects.NewApplication(app, ugi, cc.rmEventHandler, request.RmID)
		if err = partition.AddApplication(schedApp); err != nil {
			rejectedApps = append(rejectedApps, &si.RejectedApplication{
				ApplicationID: app.ApplicationID,
				Reason:        err.Error(),
			})
			partition.AddRejectedApplication(schedApp, err.Error())
			log.Log(log.SchedContext).Error("Failed to add application to partition (placement rejected)",
				zap.String("applicationID", app.ApplicationID),
				zap.String("partitionName", app.PartitionName),
				zap.Error(err))
			continue
		}
		acceptedApps = append(acceptedApps, &si.AcceptedApplication{
			ApplicationID: schedApp.ApplicationID,
		})
		log.Log(log.SchedContext).Info("Added application to partition",
			zap.String("applicationID", app.ApplicationID),
			zap.String("partitionName", app.PartitionName),
			zap.String("requested queue", app.QueueName),
			zap.String("placed queue", schedApp.GetQueuePath()))
	}
	// Respond to RMProxy with accepted and rejected apps if needed
	if len(rejectedApps) > 0 || len(acceptedApps) > 0 {
		cc.rmEventHandler.HandleEvent(
			&rmevent.RMApplicationUpdateEvent{
				RmID:                 request.RmID,
				AcceptedApplications: acceptedApps,
				RejectedApplications: rejectedApps,
			})
	}
	// Remove applications from their partitions and report any allocations they still held as
	// released. Ranging over an empty request.Remove is a no-op, so no length guard is needed.
	for _, app := range request.Remove {
		partition := cc.GetPartition(app.PartitionName)
		if partition == nil {
			// Nothing to remove or release, but surface the unexpected request instead of
			// dropping it silently (the add path above logs the same situation).
			log.Log(log.SchedContext).Warn("Cannot remove application from non existing partition",
				zap.String("applicationID", app.ApplicationID),
				zap.String("partitionName", app.PartitionName))
			continue
		}
		allocations := partition.removeApplication(app.ApplicationID)
		if len(allocations) > 0 {
			cc.notifyRMAllocationReleased(partition.RmID, partition.Name, allocations, si.TerminationType_STOPPED_BY_RM,
				fmt.Sprintf("Application %s Removed", app.ApplicationID))
		}
		log.Log(log.SchedContext).Info("Application removed from partition",
			zap.String("applicationID", app.ApplicationID),
			zap.String("partitionName", app.PartitionName),
			zap.Int("allocations released", len(allocations)))
	}
}