shardingsphere-operator/pkg/reconcile/chaos/job.go (279 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package chaos import ( "fmt" "reflect" "strconv" "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1" "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/container" v1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) const ( DefaultImageName = "agoiyanzsa/tools-runtime:2.0" DefaultContainerName = "tools-runtime" DefaultWorkPath = "/app/start" DefaultConfigName = "cmd-conf" ) var ( DefaultTTLSecondsAfterFinished int32 = 300 ) var DefaultFileMode int32 = 493 const ( AnnoJobCompletions = "jobs.batch/completions" AnnoJobActiveDeadlineSeconds = "jobs.batch/activeDeadlineSeconds" AnnoJobParallelism = "job.batch/parallelism" AnnoJobBackoffLimit = "job.batch/backoffLimit" AnnoJobTTLSecondsAfterFinished = "job.batch/ttlSecondsAfterFinished" AnnoJobSuspend = "job.batch/suspend" ) type JobType string var ( InSteady JobType = "steady" InChaos JobType = "chaos" ) func MakeJobName(name string, requirement JobType) string { return fmt.Sprintf("%s-%s", name, string(requirement)) } func NewJob(ssChaos *v1alpha1.Chaos, requirement JobType) (*v1.Job, error) { jbd := NewJobBuilder() jbd.SetNamespace(ssChaos.Namespace).SetLabels(ssChaos.Labels).SetName(MakeJobName(ssChaos.Name, requirement)) c, _ := MustInt32(ssChaos.Annotations[AnnoJobCompletions]) jbd.SetCompletions(c) c, _ = MustInt32(ssChaos.Annotations[AnnoJobParallelism]) jbd.SetParallelism(c) c, _ = MustInt32(ssChaos.Annotations[AnnoJobBackoffLimit]) jbd.SetBackoffLimit(c) c, _ = MustInt32(ssChaos.Annotations[AnnoJobTTLSecondsAfterFinished]) jbd.SetTTLSecondsAfterFinished(c) t, _ := MustInt64(ssChaos.Annotations[AnnoJobActiveDeadlineSeconds]) jbd.SetActiveDeadlineSeconds(t) if v, ok := ssChaos.Annotations[AnnoJobSuspend]; ok { if v == "true" { jbd.SetSuspend(true) } if v == "false" { jbd.SetSuspend(false) } } v := &corev1.Volume{Name: DefaultConfigName} v.ConfigMap = &corev1.ConfigMapVolumeSource{} v.ConfigMap.LocalObjectReference.Name = ssChaos.Name v.ConfigMap.DefaultMode = &DefaultFileMode jbd.SetVolumes(v) vm := &corev1.VolumeMount{Name: DefaultConfigName, MountPath: DefaultWorkPath} cbd := container.NewContainerBuilder() cbd.SetImage(DefaultImageName) cbd.SetName(DefaultContainerName) // cbd.SetVolumeMount(vm) cbd.AppendVolumeMounts([]corev1.VolumeMount{*vm}) cbd.SetCommand([]string{"sh", "-c"}) container := cbd.BuildContainer() container.Args = NewCmds(requirement) jbd.SetContainers(container) rjob := jbd.Build() return rjob, nil } func NewCmds(requirement JobType) []string { var cmds []string if requirement == InSteady { cmds = append(cmds, fmt.Sprintf("%s/%s", DefaultWorkPath, configExperimental)) } if requirement == InChaos { cmds = append(cmds, fmt.Sprintf("%s/%s;%s/%s", DefaultWorkPath, configPressure, DefaultWorkPath, configExperimental)) } return cmds } func MustInt32(s string) (int32, error) { v, err := strconv.ParseInt(s, 10, 32) if err != nil { return 0, err } return int32(v), nil } func MustInt64(s string) (int64, error) { v, err := strconv.ParseInt(s, 10, 64) if err != nil { return 0, err } return int64(v), nil } func IsJobChanged(ssChaos *v1alpha1.Chaos, requirement JobType, cur *v1.Job) (bool, error) { now, err := NewJob(ssChaos, requirement) if err != nil { return false, err } isEqual := judgeJobEqual(cur, now) if isEqual { return true, nil } return false, nil } func judgeJobEqual(now *v1.Job, exp *v1.Job) bool { if !judgeJobConfigEqual(now, exp) { return false } if !judgeContainerEqual(&now.Spec.Template.Spec.Containers[0], &exp.Spec.Template.Spec.Containers[0]) { return false } return true } func judgeJobConfigEqual(now *v1.Job, exp *v1.Job) bool { if !judgeTTLSecondsAfterFinished(now.Spec.TTLSecondsAfterFinished, exp.Spec.TTLSecondsAfterFinished) { return false } if exp.Spec.BackoffLimit != nil && *now.Spec.BackoffLimit != *exp.Spec.BackoffLimit { return false } if exp.Spec.Suspend != nil && *now.Spec.Suspend != *exp.Spec.Suspend { return false } if exp.Spec.Parallelism != nil && *now.Spec.Parallelism != *exp.Spec.Parallelism { return false } if exp.Spec.Completions != nil && *now.Spec.Completions != *exp.Spec.Completions { return false } if !judgeActiveDeadlineSeconds(now.Spec.ActiveDeadlineSeconds, exp.Spec.ActiveDeadlineSeconds) { return false } return true } func judgeTTLSecondsAfterFinished(cur *int32, exp *int32) bool { if cur == nil && exp == nil { return true } if cur != nil && exp != nil { if *cur == *exp { return true } } return false } func judgeActiveDeadlineSeconds(cur *int64, exp *int64) bool { if exp != nil && *cur != *exp { return false } return true } func judgeContainerEqual(now *corev1.Container, exp *corev1.Container) bool { if now.Name != exp.Name { return false } if !reflect.DeepEqual(now.Command, exp.Command) { return false } if !reflect.DeepEqual(now.Args, exp.Args) { return false } if now.Image != exp.Image { return false } if !reflect.DeepEqual(now.VolumeMounts, now.VolumeMounts) { return false } return true } type JobBuilder interface { SetName(string) JobBuilder SetNamespace(string) JobBuilder SetLabels(map[string]string) JobBuilder SetCompletions(int32) JobBuilder SetActiveDeadlineSeconds(int64) JobBuilder SetParallelism(int32) JobBuilder SetBackoffLimit(int32) JobBuilder SetContainers(*corev1.Container) JobBuilder SetTTLSecondsAfterFinished(int32) JobBuilder SetSuspend(bool) JobBuilder SetVolumes(*corev1.Volume) JobBuilder Build() *v1.Job } func NewJobBuilder() JobBuilder { return &jobBuilder{ defaultJob(), } } type jobBuilder struct { job *v1.Job } func (j *jobBuilder) SetName(name string) JobBuilder { j.job.ObjectMeta.Name = name return j } func (j *jobBuilder) SetNamespace(namespace string) JobBuilder { j.job.ObjectMeta.Namespace = namespace return j } func (j *jobBuilder) SetLabels(labels map[string]string) JobBuilder { j.job.ObjectMeta.Labels = labels return j } func (j *jobBuilder) SetCompletions(i int32) JobBuilder { j.job.Spec.Completions = &i return j } func (j *jobBuilder) SetActiveDeadlineSeconds(i int64) JobBuilder { j.job.Spec.ActiveDeadlineSeconds = &i return j } func (j *jobBuilder) SetParallelism(i int32) JobBuilder { j.job.Spec.Parallelism = &i return j } func (j *jobBuilder) SetBackoffLimit(i int32) JobBuilder { j.job.Spec.BackoffLimit = &i return j } func (j *jobBuilder) SetContainers(container *corev1.Container) JobBuilder { if j.job.Spec.Template.Spec.Containers == nil { j.job.Spec.Template.Spec.Containers = []corev1.Container{*container} } for i := range j.job.Spec.Template.Spec.Containers { if j.job.Spec.Template.Spec.Containers[i].Name == DefaultContainerName { j.job.Spec.Template.Spec.Containers[i] = *container return j } } j.job.Spec.Template.Spec.Containers = append(j.job.Spec.Template.Spec.Containers, *container) return j } func (j *jobBuilder) SetTTLSecondsAfterFinished(i int32) JobBuilder { ret := i j.job.Spec.TTLSecondsAfterFinished = &ret return j } func (j *jobBuilder) SetSuspend(b bool) JobBuilder { j.job.Spec.Suspend = &b return j } func (j *jobBuilder) SetVolumes(volume *corev1.Volume) JobBuilder { if j.job.Spec.Template.Spec.Volumes == nil || len(j.job.Spec.Template.Spec.Volumes) == 0 { j.job.Spec.Template.Spec.Volumes = []corev1.Volume{} } for i := range j.job.Spec.Template.Spec.Volumes { if j.job.Spec.Template.Spec.Volumes[i].Name == volume.Name { j.job.Spec.Template.Spec.Volumes[i] = *volume return j } } j.job.Spec.Template.Spec.Volumes = append(j.job.Spec.Template.Spec.Volumes, *volume) return j } func (j *jobBuilder) Build() *v1.Job { return j.job } func defaultJob() *v1.Job { return &v1.Job{ ObjectMeta: metav1.ObjectMeta{ Namespace: "default", Name: "shardingsphere-proxy", }, Spec: v1.JobSpec{ Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ Containers: []corev1.Container{}, RestartPolicy: corev1.RestartPolicyOnFailure, }, }, }, } }