shardingsphere-operator/pkg/controllers/compute_node_controller.go (356 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package controllers
import (
"context"
"fmt"
"reflect"
"time"
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes"
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/configmap"
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/deployment"
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/service"
reconcile "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/reconcile/computenode"
"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)
const (
computeNodeControllerName = "compute-node-controller"
defaultRequeueTime = 10 * time.Second
)
// ComputeNodeReconciler is a controller for the compute node
type ComputeNodeReconciler struct {
client.Client
Scheme *runtime.Scheme
Log logr.Logger
Builder reconcile.Builder
Resources kubernetes.Resources
Deployment deployment.Deployment
Service service.Service
ConfigMap configmap.ConfigMap
}
// SetupWithManager sets up the controller with the Manager
func (r *ComputeNodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.ComputeNode{}).
Owns(&appsv1.Deployment{}).
Owns(&corev1.Pod{}).
Owns(&corev1.Service{}).
Owns(&corev1.ConfigMap{}).
Complete(r)
}
// +kubebuilder:rbac:groups=shardingsphere.apache.org,resources=computenodes,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=shardingsphere.apache.org,resources=computenodes/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
// Reconcile handles main function of this controller
func (r *ComputeNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := r.Log.WithValues(computeNodeControllerName, req.NamespacedName)
cn := &v1alpha1.ComputeNode{}
if err := r.Get(ctx, req.NamespacedName, cn); err != nil {
if apierrors.IsNotFound(err) {
return ctrl.Result{RequeueAfter: defaultRequeueTime}, nil
}
logger.Error(err, "Failed to get the compute node")
return ctrl.Result{Requeue: true}, err
}
if err := r.reconcileStatus(ctx, cn); err != nil {
logger.Error(err, "Failed to reconcile status")
}
errors := []error{}
if err := r.reconcileDeployment(ctx, cn); err != nil {
logger.Error(err, "Failed to reconcile deployement")
errors = append(errors, err)
}
if err := r.reconcileService(ctx, cn); err != nil {
logger.Error(err, "Failed to reconcile service")
errors = append(errors, err)
}
if err := r.reconcileConfigMap(ctx, cn); err != nil {
logger.Error(err, "Failed to reconcile configmap")
errors = append(errors, err)
}
if len(errors) != 0 {
return ctrl.Result{Requeue: true}, errors[0]
}
return ctrl.Result{RequeueAfter: defaultRequeueTime}, nil
}
func (r *ComputeNodeReconciler) reconcileDeployment(ctx context.Context, cn *v1alpha1.ComputeNode) error {
deploy, err := r.getDeploymentByNamespacedName(ctx, types.NamespacedName{Namespace: cn.Namespace, Name: cn.Name})
if err != nil {
return err
}
if deploy != nil {
return r.updateDeployment(ctx, cn, deploy)
}
return r.createDeployment(ctx, cn)
}
func (r *ComputeNodeReconciler) createDeployment(ctx context.Context, cn *v1alpha1.ComputeNode) error {
deploy := r.Builder.BuildDeployment(ctx, cn)
err := r.Resources.Deployment().Create(ctx, deploy)
if err != nil && apierrors.IsAlreadyExists(err) || err == nil {
return nil
}
return err
}
func (r *ComputeNodeReconciler) updateDeployment(ctx context.Context, cn *v1alpha1.ComputeNode, deploy *appsv1.Deployment) error {
exp := r.Builder.BuildDeployment(ctx, cn)
exp.ObjectMeta = deploy.ObjectMeta
exp.Labels = deploy.Labels
exp.Annotations = deploy.Annotations
if !reflect.DeepEqual(deploy.Spec, exp.Spec) {
return r.Resources.Deployment().Update(ctx, exp)
}
return nil
}
func (r *ComputeNodeReconciler) getDeploymentByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (*appsv1.Deployment, error) {
dp, err := r.Resources.Deployment().GetByNamespacedName(ctx, namespacedName)
if err != nil {
return nil, err
}
return dp, nil
}
func (r *ComputeNodeReconciler) reconcileService(ctx context.Context, cn *v1alpha1.ComputeNode) error {
svc, err := r.getServiceByNamespacedName(ctx, types.NamespacedName{Namespace: cn.Namespace, Name: cn.Name})
if err != nil {
return err
}
if svc != nil {
return r.updateService(ctx, cn, svc)
}
return r.createService(ctx, cn)
}
func (r *ComputeNodeReconciler) createService(ctx context.Context, cn *v1alpha1.ComputeNode) error {
svc := r.Builder.BuildService(ctx, cn)
err := r.Resources.Service().Create(ctx, svc)
if err != nil && apierrors.IsAlreadyExists(err) || err == nil {
return nil
}
return err
}
func (r *ComputeNodeReconciler) updateComputeNodePortBindings(ctx context.Context, cn *v1alpha1.ComputeNode) error {
if rt, err := r.getRuntimeComputeNode(ctx, types.NamespacedName{
Namespace: cn.Namespace,
Name: cn.Name,
}); err == nil {
rt.Spec.PortBindings = cn.Spec.PortBindings
if err := r.Update(ctx, rt); err != nil {
return err
}
}
return nil
}
func (r *ComputeNodeReconciler) updateService(ctx context.Context, cn *v1alpha1.ComputeNode, s *corev1.Service) error {
pbs := []v1alpha1.PortBinding{}
copy(cn.Spec.PortBindings, pbs)
switch cn.Spec.ServiceType {
case corev1.ServiceTypeClusterIP:
updateServiceClusterIP(cn.Spec.PortBindings)
if !reflect.DeepEqual(cn.Spec.PortBindings, pbs) {
return r.updateComputeNodePortBindings(ctx, cn)
}
case corev1.ServiceTypeExternalName:
fallthrough
case corev1.ServiceTypeLoadBalancer:
fallthrough
case corev1.ServiceTypeNodePort:
updateServiceNodePort(cn.Spec.PortBindings, s.Spec.Ports)
if !reflect.DeepEqual(cn.Spec.PortBindings, pbs) {
return r.updateComputeNodePortBindings(ctx, cn)
}
}
exp := r.Builder.BuildService(ctx, cn)
exp.ObjectMeta = s.ObjectMeta
exp.Spec.ClusterIP = s.Spec.ClusterIP
exp.Spec.ClusterIPs = s.Spec.ClusterIPs
if cn.Spec.ServiceType == corev1.ServiceTypeNodePort {
exp.Spec.Ports = updateNodePorts(cn.Spec.PortBindings, s.Spec.Ports)
}
if !reflect.DeepEqual(exp.Spec, s.Spec) {
return r.Update(ctx, exp)
}
return nil
}
func updateServiceNodePort(pbs []v1alpha1.PortBinding, ports []corev1.ServicePort) {
for idx := range ports {
for i := range pbs {
if ports[idx].Name == pbs[i].Name {
if pbs[i].NodePort == 0 {
pbs[i].NodePort = ports[idx].NodePort
}
break
}
}
}
}
func updateServiceClusterIP(portBindings []v1alpha1.PortBinding) {
for idx := range portBindings {
if portBindings[idx].NodePort != 0 {
portBindings[idx].NodePort = 0
break
}
}
}
func updateNodePorts(portbindings []v1alpha1.PortBinding, svcports []corev1.ServicePort) []corev1.ServicePort {
ports := []corev1.ServicePort{}
for pb := range portbindings {
for sp := range svcports {
if portbindings[pb].Name == svcports[sp].Name {
port := corev1.ServicePort{
Name: portbindings[pb].Name,
TargetPort: intstr.FromInt(int(portbindings[pb].ContainerPort)),
Port: portbindings[pb].ServicePort,
Protocol: portbindings[pb].Protocol,
}
if svcports[sp].NodePort != 0 {
port.NodePort = svcports[sp].NodePort
}
ports = append(ports, port)
}
}
}
return ports
}
func (r *ComputeNodeReconciler) getServiceByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (*corev1.Service, error) {
svc, err := r.Resources.Service().GetByNamespacedName(ctx, namespacedName)
if err != nil {
return nil, err
}
return svc, nil
}
func (r *ComputeNodeReconciler) createConfigMap(ctx context.Context, cn *v1alpha1.ComputeNode) error {
cm := r.Builder.BuildConfigMap(ctx, cn)
err := r.Resources.ConfigMap().Create(ctx, cm)
if err != nil && apierrors.IsAlreadyExists(err) || err == nil {
return nil
}
return err
}
func (r *ComputeNodeReconciler) updateConfigMap(ctx context.Context, cn *v1alpha1.ComputeNode, cm *corev1.ConfigMap) error {
exp := r.Builder.BuildConfigMap(ctx, cn)
exp.ObjectMeta = cm.ObjectMeta
exp.Labels = cm.Labels
exp.Annotations = cm.Annotations
if !reflect.DeepEqual(cm.Data, exp.Data) {
return r.Resources.ConfigMap().Update(ctx, exp)
}
return nil
}
func (r *ComputeNodeReconciler) getConfigMapByNamespacedName(ctx context.Context, namespacedName types.NamespacedName) (*corev1.ConfigMap, error) {
cm, err := r.Resources.ConfigMap().GetByNamespacedName(ctx, namespacedName)
if err != nil {
return nil, err
}
return cm, nil
}
func (r *ComputeNodeReconciler) reconcileConfigMap(ctx context.Context, cn *v1alpha1.ComputeNode) error {
cm, err := r.getConfigMapByNamespacedName(ctx, types.NamespacedName{Namespace: cn.Namespace, Name: cn.Name})
if err != nil {
return err
}
if cm != nil {
return r.updateConfigMap(ctx, cn, cm)
}
return r.createConfigMap(ctx, cn)
}
func (r *ComputeNodeReconciler) reconcileStatus(ctx context.Context, cn *v1alpha1.ComputeNode) error {
selector, err := metav1.LabelSelectorAsSelector(cn.Spec.Selector)
if err != nil {
err := fmt.Errorf("error retrieving ComputeNode labels")
return err
}
podlist := &corev1.PodList{}
if err := r.List(ctx, podlist, client.InNamespace(cn.Namespace), client.MatchingLabels(cn.Spec.Selector.MatchLabels)); err != nil {
return err
}
service := &corev1.Service{}
if err := r.Get(ctx, types.NamespacedName{
Namespace: cn.Namespace,
Name: cn.Name,
}, service); err != nil {
if !apierrors.IsNotFound(err) {
return err
}
}
status := reconcileComputeNodeStatus(podlist, service, cn)
rt, err := r.getRuntimeComputeNode(ctx, types.NamespacedName{
Namespace: cn.Namespace,
Name: cn.Name,
})
if err != nil {
return err
}
status.Selector = selector.String()
rt.Status = *status
return r.Status().Update(ctx, rt)
}
func getReadyProxyInstances(podlist *corev1.PodList) int32 {
var cnt int32
findRunningPod := func(pod *corev1.Pod) {
if pod.Status.Phase != corev1.PodRunning {
return
}
if isTrueReadyPod(pod) {
for j := range pod.Status.ContainerStatuses {
if pod.Status.ContainerStatuses[j].Name == "shardingsphere-proxy" && pod.Status.ContainerStatuses[j].Ready {
cnt++
}
}
}
}
for idx := range podlist.Items {
findRunningPod(&podlist.Items[idx])
}
return cnt
}
func isTrueReadyPod(pod *corev1.Pod) bool {
for i := range pod.Status.Conditions {
if pod.Status.Conditions[i].Type == corev1.PodReady && pod.Status.Conditions[i].Status == corev1.ConditionTrue {
return true
}
}
return false
}
func updateComputeNodeStatusCondition(conditions []v1alpha1.ComputeNodeCondition, conds []v1alpha1.ComputeNodeCondition) []v1alpha1.ComputeNodeCondition {
for idx := range conds {
var found bool
for i := range conditions {
conditions[i].LastUpdateTime = conds[idx].LastUpdateTime
if conditions[i].Type == conds[idx].Type {
found = true
conditions[i].Type = conds[idx].Type
conditions[i].Status = conds[idx].Status
conditions[i].Message = conds[idx].Message
conditions[i].Reason = conds[idx].Reason
} else if conds[idx].Type == v1alpha1.ComputeNodeConditionUnknown || conditions[i].Type == v1alpha1.ComputeNodeConditionUnknown {
conditions[i].Status = v1alpha1.ConditionStatusFalse
} else {
continue
}
}
// check current conditions
if len(conditions) == 0 || !found {
conditions = append(conditions, conds[idx])
}
}
return conditions
}
func reconcileComputeNodeStatus(podlist *corev1.PodList, svc *corev1.Service, cn *v1alpha1.ComputeNode) *v1alpha1.ComputeNodeStatus {
conds := reconcile.GetConditionFromPods(podlist)
cn.Status.Conditions = updateComputeNodeStatusCondition(cn.Status.Conditions, conds)
ready := getReadyProxyInstances(podlist)
cn.Status.Ready = fmt.Sprintf("%d/%d", ready, len(podlist.Items))
cn.Status.Replicas = int32(len(podlist.Items))
if ready > 0 {
cn.Status.Phase = v1alpha1.ComputeNodeStatusReady
} else {
cn.Status.Phase = v1alpha1.ComputeNodeStatusNotReady
}
cn.Status.LoadBalancer.ClusterIP = svc.Spec.ClusterIP
cn.Status.LoadBalancer.Ingress = svc.Status.LoadBalancer.Ingress
return &cn.Status
}
func (r *ComputeNodeReconciler) getRuntimeComputeNode(ctx context.Context, namespacedName types.NamespacedName) (*v1alpha1.ComputeNode, error) {
rt := &v1alpha1.ComputeNode{}
err := r.Get(ctx, namespacedName, rt)
return rt, err
}