shardingsphere-operator/pkg/reconcile/proxy/deployment.go (428 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package proxy import ( "fmt" "reflect" "strconv" "strings" "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1" v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" ) // NewDeployment creates a new Deployment func NewDeployment(ssproxy *v1alpha1.ShardingSphereProxy) *v1.Deployment { return ConstructCascadingDeployment(ssproxy) } const ( // AnnoRollingUpdateMaxSurge refers to Deployment RollingUpdate Strategy AnnoRollingUpdateMaxSurge = "shardingsphereproxy.shardingsphere.org/rolling-update-max-surge" // AnnoRollingUpdateMaxUnavailable refers to Deployment RollingUpdate Strategy AnnoRollingUpdateMaxUnavailable = "shardingsphereproxy.shardingsphere.org/rolling-update-max-unavailable" // miniReadyCount Minimum number of replicas that can be served miniReadyCount = 1 // mysqlConnectorJarVolumeMountName refers to the name of the volume mount for the mysql connector jar mysqlConnectorJarVolumeMountName = "mysql-connector-jar" // downloadMySQLConnectorJarContainerName refers to the name of the init container for downloading the mysql connector jar downloadMySQLConnectorJarContainerName = "download-mysql-connect" ) // ConstructCascadingDeployment construct a Deployment from crd ShardingSphereProxy func ConstructCascadingDeployment(proxy *v1alpha1.ShardingSphereProxy) *v1.Deployment { if proxy == nil || reflect.DeepEqual(proxy, &v1alpha1.ShardingSphereProxy{}) { return &v1.Deployment{} } var ( maxUnavailable intstr.IntOrString maxSurge intstr.IntOrString ) if proxy.Annotations[AnnoRollingUpdateMaxUnavailable] != "" { n, _ := strconv.Atoi(proxy.Annotations[AnnoRollingUpdateMaxUnavailable]) maxUnavailable = intstr.FromInt(n) } else { maxUnavailable = intstr.FromInt(0) } if proxy.Annotations[AnnoRollingUpdateMaxSurge] != "" { n, _ := strconv.Atoi(proxy.Annotations[AnnoRollingUpdateMaxSurge]) maxSurge = intstr.FromInt(n) } else { maxSurge = intstr.FromInt(1) } dp := &v1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: proxy.Name, Namespace: proxy.Namespace, OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(proxy.GetObjectMeta(), proxy.GroupVersionKind()), }, }, Spec: v1.DeploymentSpec{ Strategy: v1.DeploymentStrategy{ Type: v1.RollingUpdateDeploymentStrategyType, RollingUpdate: &v1.RollingUpdateDeployment{ MaxUnavailable: &maxUnavailable, MaxSurge: &maxSurge, }, }, Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{ "apps": proxy.Name, }, }, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ "apps": proxy.Name, }, }, Spec: corev1.PodSpec{ Containers: []corev1.Container{ { Name: "proxy", Image: fmt.Sprintf("%s:%s", imageName, proxy.Spec.Version), ImagePullPolicy: corev1.PullIfNotPresent, Ports: []corev1.ContainerPort{ { ContainerPort: proxy.Spec.Port, }, }, Env: []corev1.EnvVar{ { Name: "PORT", Value: strconv.FormatInt(int64(proxy.Spec.Port), 10), }, }, VolumeMounts: []corev1.VolumeMount{ { Name: "config", MountPath: "/opt/shardingsphere-proxy/conf", }, }, }, }, Volumes: []corev1.Volume{ { Name: "config", VolumeSource: corev1.VolumeSource{ ConfigMap: &corev1.ConfigMapVolumeSource{ LocalObjectReference: corev1.LocalObjectReference{ Name: proxy.Spec.ProxyConfigName, }, }, }, }, }, }, }, }, } if proxy.Spec.AutomaticScaling == nil { dp.Spec.Replicas = &proxy.Spec.Replicas } dp.Spec.Template.Spec.Containers[0].Resources = proxy.Spec.Resources if proxy.Spec.LivenessProbe != nil { dp.Spec.Template.Spec.Containers[0].LivenessProbe = proxy.Spec.LivenessProbe } if proxy.Spec.ReadinessProbe != nil { dp.Spec.Template.Spec.Containers[0].ReadinessProbe = proxy.Spec.ReadinessProbe } if proxy.Spec.StartupProbe != nil { dp.Spec.Template.Spec.Containers[0].StartupProbe = proxy.Spec.StartupProbe } if len(proxy.Spec.ImagePullSecrets) > 0 { dp.Spec.Template.Spec.ImagePullSecrets = proxy.Spec.ImagePullSecrets } return processOptionalParameter(proxy, dp) } func processOptionalParameter(proxy *v1alpha1.ShardingSphereProxy, dp *v1.Deployment) *v1.Deployment { if proxy.Spec.MySQLDriver != nil { addInitContainer(dp, proxy.Spec.MySQLDriver) } return dp } const script = `wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/${VERSION}/mysql-connector-java-${VERSION}.jar; wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/${VERSION}/mysql-connector-java-${VERSION}.jar.md5; if [ $(md5sum /mysql-connector-java-${VERSION}.jar | cut -d ' ' -f1) = $(cat /mysql-connector-java-${VERSION}.jar.md5) ]; then echo success; else echo failed;exit 1;fi;mv /mysql-connector-java-${VERSION}.jar /opt/shardingsphere-proxy/ext-lib` func addInitContainer(dp *v1.Deployment, mysql *v1alpha1.MySQLDriver) { if len(dp.Spec.Template.Spec.InitContainers) == 0 { dp.Spec.Template.Spec.Containers[0].VolumeMounts = append(dp.Spec.Template.Spec.Containers[0].VolumeMounts, corev1.VolumeMount{ Name: mysqlConnectorJarVolumeMountName, MountPath: "/opt/shardingsphere-proxy/ext-lib", }) dp.Spec.Template.Spec.Volumes = append(dp.Spec.Template.Spec.Volumes, corev1.Volume{ Name: mysqlConnectorJarVolumeMountName, VolumeSource: corev1.VolumeSource{ EmptyDir: &corev1.EmptyDirVolumeSource{}, }, }) } dp.Spec.Template.Spec.InitContainers = []corev1.Container{ { Name: downloadMySQLConnectorJarContainerName, Image: "busybox:1.35.0", Command: []string{"/bin/sh", "-c", script}, Env: []corev1.EnvVar{ { Name: "VERSION", Value: mysql.Version, }, }, VolumeMounts: []corev1.VolumeMount{ { Name: mysqlConnectorJarVolumeMountName, MountPath: "/opt/shardingsphere-proxy/ext-lib", }, }, }, } } // UpdateDeployment FIXME:merge UpdateDeployment and ConstructCascadingDeployment func UpdateDeployment(proxy *v1alpha1.ShardingSphereProxy, act *v1.Deployment) *v1.Deployment { exp := act.DeepCopy() var ( maxUnavailable intstr.IntOrString maxSurge intstr.IntOrString ) if proxy.Annotations[AnnoRollingUpdateMaxUnavailable] != "" { n, _ := strconv.Atoi(proxy.Annotations[AnnoRollingUpdateMaxUnavailable]) maxUnavailable = intstr.FromInt(n) } else { maxUnavailable = intstr.FromInt(0) } if proxy.Annotations[AnnoRollingUpdateMaxSurge] != "" { n, _ := strconv.Atoi(proxy.Annotations[AnnoRollingUpdateMaxSurge]) maxSurge = intstr.FromInt(n) } else { maxSurge = intstr.FromInt(1) } exp.Spec.Strategy.Type = v1.RollingUpdateDeploymentStrategyType if exp.Spec.Strategy.RollingUpdate == nil { exp.Spec.Strategy.RollingUpdate = &v1.RollingUpdateDeployment{} } exp.Spec.Strategy.RollingUpdate.MaxSurge = &maxSurge exp.Spec.Strategy.RollingUpdate.MaxUnavailable = &maxUnavailable if proxy.Spec.AutomaticScaling == nil || !proxy.Spec.AutomaticScaling.Enable { exp.Spec.Replicas = updateReplicas(proxy, act) } exp.Spec.Template = updatePodTemplateSpec(proxy, act) return exp } func updateReplicas(proxy *v1alpha1.ShardingSphereProxy, act *v1.Deployment) *int32 { if *act.Spec.Replicas != proxy.Spec.Replicas { return &proxy.Spec.Replicas } return act.Spec.Replicas } func updatePodTemplateSpec(proxy *v1alpha1.ShardingSphereProxy, act *v1.Deployment) corev1.PodTemplateSpec { exp := act.Spec.Template.DeepCopy() ssProxyContainer := updateSSProxyContainer(proxy, act) for i := range exp.Spec.Containers { if exp.Spec.Containers[i].Name == "proxy" { exp.Spec.Containers[i] = *ssProxyContainer } } if proxy.Spec.MySQLDriver != nil { initContainer := updateInitContainer(proxy, act) for i := range exp.Spec.InitContainers { if exp.Spec.InitContainers[i].Name == downloadMySQLConnectorJarContainerName { exp.Spec.InitContainers[i] = *initContainer } } } configName := updateConfigName(proxy, act) exp.Spec.Volumes[0].ConfigMap.Name = configName return *exp } func updateConfigName(proxy *v1alpha1.ShardingSphereProxy, act *v1.Deployment) string { if act.Spec.Template.Spec.Volumes[0].ConfigMap.Name != proxy.Spec.ProxyConfigName { return proxy.Spec.ProxyConfigName } return act.Spec.Template.Spec.Volumes[0].ConfigMap.Name } func updateInitContainer(proxy *v1alpha1.ShardingSphereProxy, act *v1.Deployment) *corev1.Container { var exp *corev1.Container for idx := range act.Spec.Template.Spec.InitContainers { if act.Spec.Template.Spec.InitContainers[idx].Name == downloadMySQLConnectorJarContainerName { updateEnv(act.Spec.Template.Spec.InitContainers[idx].Env, proxy.Spec.MySQLDriver.Version) exp = act.Spec.Template.Spec.InitContainers[idx].DeepCopy() } } return exp } func updateEnv(envs []corev1.EnvVar, version string) { for i := range envs { if envs[i].Name == "VERSION" { if envs[i].Value != version { envs[i].Value = version } } } } func setProbes(spec *v1alpha1.ProxySpec, act, exp *corev1.Container) { if checkProbe(spec.LivenessProbe, act.LivenessProbe) != nil { exp.LivenessProbe = spec.LivenessProbe } if checkProbe(spec.ReadinessProbe, act.ReadinessProbe) != nil { exp.ReadinessProbe = spec.ReadinessProbe } if checkProbe(spec.StartupProbe, act.StartupProbe) != nil { exp.StartupProbe = spec.StartupProbe } } func updateSSProxyContainer(proxy *v1alpha1.ShardingSphereProxy, act *v1.Deployment) *corev1.Container { var exp *corev1.Container for idx := range act.Spec.Template.Spec.Containers { if act.Spec.Template.Spec.Containers[idx].Name != "proxy" { continue } exp = act.Spec.Template.Spec.Containers[idx].DeepCopy() tag := strings.Split(act.Spec.Template.Spec.Containers[idx].Image, ":")[1] if tag != proxy.Spec.Version { exp.Image = fmt.Sprintf("%s:%s", imageName, proxy.Spec.Version) } exp.Resources = proxy.Spec.Resources setProbes(&proxy.Spec, &act.Spec.Template.Spec.Containers[idx], exp) for i := range act.Spec.Template.Spec.Containers[idx].Env { if act.Spec.Template.Spec.Containers[idx].Env[i].Name == "PORT" { proxyPort := strconv.FormatInt(int64(proxy.Spec.Port), 10) if act.Spec.Template.Spec.Containers[idx].Env[i].Value != proxyPort { act.Spec.Template.Spec.Containers[idx].Env[i].Value = proxyPort exp.Ports[0].ContainerPort = proxy.Spec.Port } } } exp.Env = act.Spec.Template.Spec.Containers[idx].Env } return exp } func checkProbe(proxy, act *corev1.Probe) *corev1.Probe { if proxy != nil && !reflect.DeepEqual(act, proxy) { return proxy } return nil } func getReadyNodes(podlist *corev1.PodList) int32 { var cnt int32 findRunningPod := func(pod *corev1.Pod) { if pod.Status.Phase != corev1.PodRunning { return } if isTrueReadyPod(pod) { for j := range pod.Status.ContainerStatuses { if pod.Status.ContainerStatuses[j].Name == "shardingsphere-proxy" && pod.Status.ContainerStatuses[j].Ready { cnt++ } } } } for idx := range podlist.Items { findRunningPod(&podlist.Items[idx]) } return cnt } func isTrueReadyPod(pod *corev1.Pod) bool { for i := range pod.Status.Conditions { if pod.Status.Conditions[i].Type == corev1.PodReady && pod.Status.Conditions[i].Status == corev1.ConditionTrue { return true } } return false } // ReconcileStatus returns the status of ShardingSphereProxy func ReconcileStatus(podlist *corev1.PodList, rt *v1alpha1.ShardingSphereProxy) v1alpha1.ProxyStatus { readyNodes := getReadyNodes(podlist) rt.Status.ReadyNodes = readyNodes if rt.Spec.Replicas == 0 { rt.Status.Phase = v1alpha1.StatusNotReady } else { if readyNodes < miniReadyCount { rt.Status.Phase = v1alpha1.StatusNotReady } else { rt.Status.Phase = v1alpha1.StatusReady } } if rt.Status.Phase == v1alpha1.StatusReady { rt.Status.Conditions = updateReadyConditions(rt.Status.Conditions, v1alpha1.Condition{ Type: v1alpha1.ConditionReady, Status: metav1.ConditionTrue, LastUpdateTime: metav1.Now(), }) } else { cond := clusterCondition(podlist) rt.Status.Conditions = updateNotReadyConditions(rt.Status.Conditions, cond) } return rt.Status } func newConditions(conditions []v1alpha1.Condition, cond v1alpha1.Condition) []v1alpha1.Condition { if conditions == nil { conditions = []v1alpha1.Condition{} } if cond.Type == "" { return conditions } found := false for idx := range conditions { if conditions[idx].Type != cond.Type { continue } conditions[idx].LastUpdateTime = cond.LastUpdateTime conditions[idx].Status = cond.Status found = true break } if !found { conditions = append(conditions, cond) } return conditions } func updateReadyConditions(conditions []v1alpha1.Condition, cond v1alpha1.Condition) []v1alpha1.Condition { return newConditions(conditions, cond) } func updateNotReadyConditions(conditions []v1alpha1.Condition, cond v1alpha1.Condition) []v1alpha1.Condition { cur := newConditions(conditions, cond) for idx := range cur { if cur[idx].Type == v1alpha1.ConditionReady { cur[idx].LastUpdateTime = metav1.Now() cur[idx].Status = metav1.ConditionFalse } } return cur } func clusterCondition(podlist *corev1.PodList) v1alpha1.Condition { cond := v1alpha1.Condition{} if len(podlist.Items) == 0 { return cond } condStarted := v1alpha1.Condition{ Type: v1alpha1.ConditionStarted, Status: metav1.ConditionTrue, LastUpdateTime: metav1.Now(), } condSucceed := v1alpha1.Condition{ Type: v1alpha1.ConditionSucceed, Status: metav1.ConditionTrue, LastUpdateTime: metav1.Now(), } condUnknown := v1alpha1.Condition{ Type: v1alpha1.ConditionUnknown, Status: metav1.ConditionTrue, LastUpdateTime: metav1.Now(), } condDeployed := v1alpha1.Condition{ Type: v1alpha1.ConditionDeployed, Status: metav1.ConditionTrue, LastUpdateTime: metav1.Now(), } condFailed := v1alpha1.Condition{ Type: v1alpha1.ConditionFailed, Status: metav1.ConditionTrue, LastUpdateTime: metav1.Now(), } //FIXME: do not capture ConditionStarted in some cases for i := range podlist.Items { switch podlist.Items[i].Status.Phase { case corev1.PodSucceeded: return condSucceed case corev1.PodRunning: return condStarted case corev1.PodUnknown: return condUnknown case corev1.PodPending: return condDeployed case corev1.PodFailed: return condFailed } } return cond }