func()

in deploy/kubernetes/operator/pkg/webhook/inspector/rss.go [39:102]


func (i *inspector) validateRSS(ar *admissionv1.AdmissionReview) *admissionv1.AdmissionReview {
	if !i.ignoreRSS && ar.Request.Operation == admissionv1.Update {
		oldRSS := &unifflev1alpha1.RemoteShuffleService{}
		if err := json.Unmarshal(ar.Request.OldObject.Raw, oldRSS); err != nil {
			klog.Errorf("unmarshal old object of rss (%v) failed: %v",
				string(ar.Request.OldObject.Raw), err)
			return util.AdmissionReviewFailed(ar, err)
		}
		// for security purposes, we forbid updating rss objects when they are in upgrading phase.
		// generally speaking, we should also deny updating when rss is terminating. However, it would introduce more
		// complexity and controller's current terminating logic can tolerate the rss object update.
		if oldRSS.Status.Phase == unifflev1alpha1.RSSUpgrading {
			message := "can not update upgrading rss object: " + utils.UniqueName(oldRSS)
			return util.AdmissionReviewForbidden(ar, message)
		}
	}
	newRSS := &unifflev1alpha1.RemoteShuffleService{}
	if err := json.Unmarshal(ar.Request.Object.Raw, newRSS); err != nil {
		klog.Errorf("unmarshal object of rss (%v) failed: %v",
			string(ar.Request.Object.Raw), err)
		return util.AdmissionReviewFailed(ar, err)
	}
	if err := validateRuntimeClassNames(newRSS, i.kubeClient); err != nil {
		klog.Errorf("validate runtime class of rss (%v) failed: %v",
			utils.UniqueName(newRSS), err)
		return util.AdmissionReviewFailed(ar, err)
	}
	if err := validateCoordinator(newRSS.Spec.Coordinator); err != nil {
		klog.Errorf("validate coordinator config of rss (%v) failed: %v",
			utils.UniqueName(newRSS), err)
		return util.AdmissionReviewFailed(ar, err)
	}
	// validate configurations of logHostPath for shuffle servers.
	shuffleServerLogPath := newRSS.Spec.ShuffleServer.LogHostPath
	if len(shuffleServerLogPath) > 0 && len(newRSS.Spec.ShuffleServer.HostPathMounts[shuffleServerLogPath]) == 0 {
		return util.AdmissionReviewFailed(ar, fmt.Errorf("empty log volume mount path for shuffle servers"))
	}
	// validate configurations of different upgrade modes for shuffle servers.
	upgradeStrategy := newRSS.Spec.ShuffleServer.UpgradeStrategy
	switch upgradeStrategy.Type {
	case unifflev1alpha1.FullUpgrade:
	case unifflev1alpha1.PartitionUpgrade:
		var err error
		if upgradeStrategy.Partition == nil {
			err = fmt.Errorf("empty partition for %v", upgradeStrategy.Type)
		} else if *upgradeStrategy.Partition <= 0 {
			err = fmt.Errorf("invalid partition (%v) for %v", *upgradeStrategy.Partition,
				upgradeStrategy.Type)
		}
		if err != nil {
			return util.AdmissionReviewFailed(ar, err)
		}
	case unifflev1alpha1.SpecificUpgrade:
		if len(upgradeStrategy.SpecificNames) == 0 {
			return util.AdmissionReviewFailed(ar,
				fmt.Errorf("empty specific copies for %v", upgradeStrategy.Type))
		}
	case unifflev1alpha1.FullRestart:
	default:
		return util.AdmissionReviewFailed(ar,
			fmt.Errorf("invalid upgrade stragety type (%v)", upgradeStrategy.Type))
	}
	return util.AdmissionReviewAllow(ar)
}