func()

in pkg/scheduler/context.go [487:574]


func (cc *ClusterContext) handleRMUpdateApplicationEvent(event *rmevent.RMUpdateApplicationEvent) {
	request := event.Request
	if len(request.New) == 0 && len(request.Remove) == 0 {
		return
	}
	acceptedApps := make([]*si.AcceptedApplication, 0)
	rejectedApps := make([]*si.RejectedApplication, 0)

	for _, app := range request.New {
		partition := cc.GetPartition(app.PartitionName)
		if partition == nil {
			msg := fmt.Sprintf("Failed to add application %s to partition %s, partition doesn't exist", app.ApplicationID, app.PartitionName)
			rejectedApps = append(rejectedApps, &si.RejectedApplication{
				ApplicationID: app.ApplicationID,
				Reason:        msg,
			})
			log.Log(log.SchedContext).Error("Failed to add application to non existing partition",
				zap.String("applicationID", app.ApplicationID),
				zap.String("partitionName", app.PartitionName))
			continue
		}
		// convert and resolve the user: cache can be set per partition
		// need to do this before we create the application
		ugi, err := partition.convertUGI(app.Ugi, common.IsAppCreationForced(app.Tags))
		if err != nil {
			rejectedApps = append(rejectedApps, &si.RejectedApplication{
				ApplicationID: app.ApplicationID,
				Reason:        err.Error(),
			})
			partition.AddRejectedApplication(objects.NewApplication(app, ugi, cc.rmEventHandler, request.RmID), err.Error())
			log.Log(log.SchedContext).Error("Failed to add application to partition (user rejected)",
				zap.String("applicationID", app.ApplicationID),
				zap.String("partitionName", app.PartitionName),
				zap.Error(err))
			continue
		}
		// create a new app object and add it to the partition (partition logs details)
		schedApp := objects.NewApplication(app, ugi, cc.rmEventHandler, request.RmID)
		if err = partition.AddApplication(schedApp); err != nil {
			rejectedApps = append(rejectedApps, &si.RejectedApplication{
				ApplicationID: app.ApplicationID,
				Reason:        err.Error(),
			})
			partition.AddRejectedApplication(schedApp, err.Error())
			log.Log(log.SchedContext).Error("Failed to add application to partition (placement rejected)",
				zap.String("applicationID", app.ApplicationID),
				zap.String("partitionName", app.PartitionName),
				zap.Error(err))
			continue
		}
		acceptedApps = append(acceptedApps, &si.AcceptedApplication{
			ApplicationID: schedApp.ApplicationID,
		})
		log.Log(log.SchedContext).Info("Added application to partition",
			zap.String("applicationID", app.ApplicationID),
			zap.String("partitionName", app.PartitionName),
			zap.String("requested queue", app.QueueName),
			zap.String("placed queue", schedApp.GetQueuePath()))
	}

	// Respond to RMProxy with accepted and rejected apps if needed
	if len(rejectedApps) > 0 || len(acceptedApps) > 0 {
		cc.rmEventHandler.HandleEvent(
			&rmevent.RMApplicationUpdateEvent{
				RmID:                 request.RmID,
				AcceptedApplications: acceptedApps,
				RejectedApplications: rejectedApps,
			})
	}
	// Update metrics with removed applications
	if len(request.Remove) > 0 {
		for _, app := range request.Remove {
			partition := cc.GetPartition(app.PartitionName)
			if partition == nil {
				continue
			}
			allocations := partition.removeApplication(app.ApplicationID)
			if len(allocations) > 0 {
				cc.notifyRMAllocationReleased(partition.RmID, partition.Name, allocations, si.TerminationType_STOPPED_BY_RM,
					fmt.Sprintf("Application %s Removed", app.ApplicationID))
			}
			log.Log(log.SchedContext).Info("Application removed from partition",
				zap.String("applicationID", app.ApplicationID),
				zap.String("partitionName", app.PartitionName),
				zap.Int("allocations released", len(allocations)))
		}
	}
}