in deploy/kubernetes/operator/pkg/webhook/inspector/rss.go [39:102]
// validateRSS validates an admission request for a RemoteShuffleService (rss) object.
//
// For update requests, unless i.ignoreRSS is set, the old object is decoded first and
// the request is forbidden while that object is in the upgrading phase. The new object
// is then decoded and checked in order: runtime class names, coordinator configuration,
// the log host path mount of shuffle servers, and the shuffle server upgrade strategy.
//
// It returns the review built by util.AdmissionReviewAllow when every check passes,
// util.AdmissionReviewForbidden for an update of an upgrading rss, and
// util.AdmissionReviewFailed carrying the first error encountered otherwise.
func (i *inspector) validateRSS(ar *admissionv1.AdmissionReview) *admissionv1.AdmissionReview {
	if !i.ignoreRSS && ar.Request.Operation == admissionv1.Update {
		oldRSS := &unifflev1alpha1.RemoteShuffleService{}
		if err := json.Unmarshal(ar.Request.OldObject.Raw, oldRSS); err != nil {
			klog.Errorf("unmarshal old object of rss (%v) failed: %v",
				string(ar.Request.OldObject.Raw), err)
			return util.AdmissionReviewFailed(ar, err)
		}
		// for security purposes, we forbid updating rss objects when they are in upgrading phase.
		// generally speaking, we should also deny updating when rss is terminating. However, it would
		// introduce more complexity and controller's current terminating logic can tolerate the rss
		// object update.
		if oldRSS.Status.Phase == unifflev1alpha1.RSSUpgrading {
			message := "can not update upgrading rss object: " + utils.UniqueName(oldRSS)
			return util.AdmissionReviewForbidden(ar, message)
		}
	}

	newRSS := &unifflev1alpha1.RemoteShuffleService{}
	if err := json.Unmarshal(ar.Request.Object.Raw, newRSS); err != nil {
		klog.Errorf("unmarshal object of rss (%v) failed: %v",
			string(ar.Request.Object.Raw), err)
		return util.AdmissionReviewFailed(ar, err)
	}
	if err := validateRuntimeClassNames(newRSS, i.kubeClient); err != nil {
		klog.Errorf("validate runtime class of rss (%v) failed: %v",
			utils.UniqueName(newRSS), err)
		return util.AdmissionReviewFailed(ar, err)
	}
	if err := validateCoordinator(newRSS.Spec.Coordinator); err != nil {
		klog.Errorf("validate coordinator config of rss (%v) failed: %v",
			utils.UniqueName(newRSS), err)
		return util.AdmissionReviewFailed(ar, err)
	}

	// validate configurations of logHostPath for shuffle servers: when a log host path is
	// configured, it must have a non-empty entry in the host path mounts.
	shuffleServerLogPath := newRSS.Spec.ShuffleServer.LogHostPath
	if len(shuffleServerLogPath) > 0 && len(newRSS.Spec.ShuffleServer.HostPathMounts[shuffleServerLogPath]) == 0 {
		return util.AdmissionReviewFailed(ar, fmt.Errorf("empty log volume mount path for shuffle servers"))
	}

	// validate configurations of different upgrade modes for shuffle servers.
	upgradeStrategy := newRSS.Spec.ShuffleServer.UpgradeStrategy
	switch upgradeStrategy.Type {
	case unifflev1alpha1.FullUpgrade, unifflev1alpha1.FullRestart:
		// no extra fields are checked for these modes.
	case unifflev1alpha1.PartitionUpgrade:
		// partition must be present and strictly positive.
		if upgradeStrategy.Partition == nil {
			return util.AdmissionReviewFailed(ar,
				fmt.Errorf("empty partition for %v", upgradeStrategy.Type))
		}
		if *upgradeStrategy.Partition <= 0 {
			return util.AdmissionReviewFailed(ar,
				fmt.Errorf("invalid partition (%v) for %v", *upgradeStrategy.Partition,
					upgradeStrategy.Type))
		}
	case unifflev1alpha1.SpecificUpgrade:
		// at least one specific name must be given.
		if len(upgradeStrategy.SpecificNames) == 0 {
			return util.AdmissionReviewFailed(ar,
				fmt.Errorf("empty specific copies for %v", upgradeStrategy.Type))
		}
	default:
		return util.AdmissionReviewFailed(ar,
			fmt.Errorf("invalid upgrade strategy type (%v)", upgradeStrategy.Type))
	}
	return util.AdmissionReviewAllow(ar)
}