pkg/common/utils/k8s/client.go (296 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package k8s
import (
"context"
"errors"
"fmt"
"github.com/FoundationDB/fdb-kubernetes-operator/api/v1beta2"
dorisv1 "github.com/apache/doris-operator/api/doris/v1"
"github.com/apache/doris-operator/pkg/common/utils"
"github.com/apache/doris-operator/pkg/common/utils/resource"
appv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/autoscaling/v1"
v2 "k8s.io/api/autoscaling/v2"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// ServiceEqual reports whether two services are equal in the fields the caller
// cares about. Developers can supply a custom implementation.
type ServiceEqual func(svc1 *corev1.Service, svc2 *corev1.Service) bool
// StatefulSetEqual reports whether two statefulsets are equal in the fields the
// caller cares about. Developers can supply a custom implementation.
type StatefulSetEqual func(st1 *appv1.StatefulSet, st2 *appv1.StatefulSet) bool
// ApplyService creates svc when it does not exist yet; otherwise it patches the
// existing service unless equal reports that the desired and live objects match.
//
// svc is mutated: Spec.ClusterIPs is cleared, and ResourceVersion is either
// reset (before a create) or copied from the live object (before a patch).
// An AlreadyExists error from the create is treated as success. Any error from
// the initial Get other than NotFound is returned as is. Errors are returned
// unwrapped so callers can inspect them with the apierrors helpers.
func ApplyService(ctx context.Context, k8sclient client.Client, svc *corev1.Service, equal ServiceEqual) error {
	var esvc corev1.Service
	// Avoid the "clusterIPs: Invalid value" failure.
	svc.Spec.ClusterIPs = nil

	err := k8sclient.Get(ctx, types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}, &esvc)
	if apierrors.IsNotFound(err) {
		// When the client version does not match the k8s version, resourceVersion
		// may be non-empty and the create fails with "resourceVersion should not be set".
		svc.ResourceVersion = ""
		if err = CreateClientObject(ctx, k8sclient, svc); err == nil || apierrors.IsAlreadyExists(err) {
			return nil
		}
		return err
	}
	if err != nil {
		// Any other lookup failure must abort here: continuing would compare and
		// patch against a zero-valued esvc with an empty resourceVersion.
		return err
	}

	if equal(svc, &esvc) {
		klog.Info("CreateOrUpdateService service Name, Ports, Selector, ServiceType, Labels have not change ", "namespace ", svc.Namespace, " name ", svc.Name)
		return nil
	}

	// Resolve: "metadata.resourceVersion: Invalid value '' must be specified for an update".
	svc.ResourceVersion = esvc.ResourceVersion
	return PatchClientObject(ctx, k8sclient, svc)
}
// ListServicesInNamespace lists the services in namespace whose labels match
// selector. It returns the listed items, or nil and the error when List fails.
func ListServicesInNamespace(ctx context.Context, k8sclient client.Client, namespace string, selector map[string]string) ([]corev1.Service, error) {
	services := corev1.ServiceList{}
	opts := []client.ListOption{
		client.InNamespace(namespace),
		client.MatchingLabels(selector),
	}
	err := k8sclient.List(ctx, &services, opts...)
	if err != nil {
		return nil, err
	}
	return services.Items, nil
}
// ListStatefulsetInNamespace lists the statefulsets in namespace whose labels
// match selector. It returns the listed items, or nil and the error when List fails.
func ListStatefulsetInNamespace(ctx context.Context, k8sclient client.Client, namespace string, selector map[string]string) ([]appv1.StatefulSet, error) {
	found := &appv1.StatefulSetList{}
	inNamespace := client.InNamespace(namespace)
	byLabels := client.MatchingLabels(selector)
	if listErr := k8sclient.List(ctx, found, inNamespace, byLabels); listErr != nil {
		return nil, listErr
	}
	return found.Items, nil
}
// ApplyStatefulSet creates st when no statefulset with its name and namespace
// exists. When one exists and equal reports a difference, st is patched using
// the live object's ResourceVersion (st is mutated accordingly).
//
// A Conflict error from the patch is swallowed and reported as success; every
// other error is returned unchanged.
func ApplyStatefulSet(ctx context.Context, k8sclient client.Client, st *appv1.StatefulSet, equal StatefulSetEqual) error {
	existing := &appv1.StatefulSet{}
	key := types.NamespacedName{Namespace: st.Namespace, Name: st.Name}

	switch getErr := k8sclient.Get(ctx, key, existing); {
	case apierrors.IsNotFound(getErr):
		return CreateClientObject(ctx, k8sclient, st)
	case getErr != nil:
		return getErr
	}

	// The equal callback decides which fields matter. The original note here says
	// a restart annotation should be excluded from its impact on the hash.
	if equal(st, existing) {
		klog.Infof("ApplyStatefulSet Sync exist statefulset name=%s, namespace=%s, equals to new statefulset.", existing.Name, existing.Namespace)
		return nil
	}

	st.ResourceVersion = existing.ResourceVersion
	if patchErr := PatchClientObject(ctx, k8sclient, st); patchErr != nil && !apierrors.IsConflict(patchErr) {
		return patchErr
	}
	return nil
}
// ApplyDorisCluster patches dcr through PatchClientObject. A Conflict error is
// treated as success; any other error is returned unchanged.
func ApplyDorisCluster(ctx context.Context, k8sclient client.Client, dcr *dorisv1.DorisCluster) error {
	patchErr := PatchClientObject(ctx, k8sclient, dcr)
	switch {
	case patchErr == nil:
		return nil
	case apierrors.IsConflict(patchErr):
		return nil
	default:
		return patchErr
	}
}
// GetStatefulSet fetches the statefulset identified by namespace and name.
// The returned pointer is never nil, even when the accompanying error is
// non-nil, so callers must check the error first.
func GetStatefulSet(ctx context.Context, k8sclient client.Client, namespace, name string) (*appv1.StatefulSet, error) {
	sts := new(appv1.StatefulSet)
	key := types.NamespacedName{Namespace: namespace, Name: name}
	getErr := k8sclient.Get(ctx, key, sts)
	return sts, getErr
}
// CreateClientObject logs the namespace, name and kind of object, then creates
// it through k8sclient and returns the result of Create.
func CreateClientObject(ctx context.Context, k8sclient client.Client, object client.Object) error {
	kind := object.GetObjectKind().GroupVersionKind().Kind
	klog.Info("Creating resource service ", "namespace ", object.GetNamespace(), " name ", object.GetName(), " kind ", kind)
	return k8sclient.Create(ctx, object)
}
// UpdateClientObject logs the namespace, name and object kind of object, then
// updates it through k8sclient and returns the result of Update.
func UpdateClientObject(ctx context.Context, k8sclient client.Client, object client.Object) error {
	klog.Info("Updating resource service ", "namespace ", object.GetNamespace(), " name ", object.GetName(), " kind ", object.GetObjectKind())
	return k8sclient.Update(ctx, object)
}
// CreateOrUpdateClientObject tries to update object first. When the update
// reports NotFound, the object is created instead. Any other update error is
// returned unchanged.
func CreateOrUpdateClientObject(ctx context.Context, k8sclient client.Client, object client.Object) error {
	klog.Infof("create or update resource namespace=%s,name=%s,kind=%s.", object.GetNamespace(), object.GetName(), object.GetObjectKind())

	updateErr := k8sclient.Update(ctx, object)
	if apierrors.IsNotFound(updateErr) {
		return k8sclient.Create(ctx, object)
	}
	return updateErr
}
// PatchClientObject logs the target and patches object through k8sclient with
// the client.Merge patch. The error from Patch is returned as is; no create
// fallback is attempted here.
func PatchClientObject(ctx context.Context, k8sclient client.Client, object client.Object) error {
	klog.Infof("patch resource namespace=%s,name=%s,kind=%s.", object.GetNamespace(), object.GetName(), object.GetObjectKind())
	return k8sclient.Patch(ctx, object, client.Merge)
}
// PatchOrCreate patches object with the client.Merge patch. When the patch
// reports NotFound, the object is created instead. Any other patch error is
// returned unchanged. The log line is emitted at verbosity level 4.
func PatchOrCreate(ctx context.Context, k8sclient client.Client, object client.Object) error {
	klog.V(4).Infof("patch or create resource namespace=%s,name=%s,kind=%s.", object.GetNamespace(), object.GetName(), object.GetObjectKind())

	patchErr := k8sclient.Patch(ctx, object, client.Merge)
	switch {
	case apierrors.IsNotFound(patchErr):
		return k8sclient.Create(ctx, object)
	case patchErr != nil:
		return patchErr
	default:
		return nil
	}
}
// DeleteClientObject deletes object through k8sclient and returns the result
// of Delete unchanged (including NotFound).
func DeleteClientObject(ctx context.Context, k8sclient client.Client, object client.Object) error {
	return k8sclient.Delete(ctx, object)
}
// DeleteStatefulset deletes the statefulset identified by namespace and name.
// It returns nil when the statefulset does not exist, and any other lookup or
// delete error unchanged.
func DeleteStatefulset(ctx context.Context, k8sclient client.Client, namespace, name string) error {
	target := &appv1.StatefulSet{}
	key := types.NamespacedName{Name: name, Namespace: namespace}

	getErr := k8sclient.Get(ctx, key, target)
	if getErr == nil {
		return k8sclient.Delete(ctx, target)
	}
	if apierrors.IsNotFound(getErr) {
		// Nothing to delete.
		return nil
	}
	return getErr
}
// DeleteService deletes the service identified by namespace and name.
// It returns nil when the service does not exist, and any other lookup or
// delete error unchanged.
func DeleteService(ctx context.Context, k8sclient client.Client, namespace, name string) error {
	existing := new(corev1.Service)
	lookupErr := k8sclient.Get(ctx, types.NamespacedName{Name: name, Namespace: namespace}, existing)
	switch {
	case apierrors.IsNotFound(lookupErr):
		// Already gone.
		return nil
	case lookupErr != nil:
		return lookupErr
	}
	return k8sclient.Delete(ctx, existing)
}
// DeleteAutoscaler deletes the HorizontalPodAutoscaler identified by namespace
// and name, using the API type selected by autoscalerVersion
// (autoscaling/v1 or autoscaling/v2).
//
// It returns an error for an unsupported autoscalerVersion, nil when the
// autoscaler does not exist, and any other lookup or delete error unchanged.
func DeleteAutoscaler(ctx context.Context, k8sclient client.Client, namespace, name string, autoscalerVersion dorisv1.AutoScalerVersion) error {
	var autoscaler client.Object
	switch autoscalerVersion {
	case dorisv1.AutoScalerV1:
		autoscaler = &v1.HorizontalPodAutoscaler{}
	case dorisv1.AutoSclaerV2:
		autoscaler = &v2.HorizontalPodAutoscaler{}
	default:
		return fmt.Errorf("the autoscaler type %s is not supported.", autoscalerVersion)
	}

	if err := k8sclient.Get(ctx, types.NamespacedName{Name: name, Namespace: namespace}, autoscaler); err != nil {
		if apierrors.IsNotFound(err) {
			// Nothing to delete.
			return nil
		}
		return err
	}
	return k8sclient.Delete(ctx, autoscaler)
}
// PodIsReady reports whether every container status in status is marked Ready.
// It returns false when status is nil or carries no container statuses at all
// (nil or empty slice), so a pod without reported containers is never
// considered ready.
func PodIsReady(status *corev1.PodStatus) bool {
	if status == nil || len(status.ContainerStatuses) == 0 {
		return false
	}

	for i := range status.ContainerStatuses {
		if !status.ContainerStatuses[i].Ready {
			return false
		}
	}
	return true
}
// GetSecret fetches the secret identified by namespace and name.
// It returns nil and the error when the lookup fails.
func GetSecret(ctx context.Context, k8sclient client.Client, namespace, name string) (*corev1.Secret, error) {
	found := &corev1.Secret{}
	key := types.NamespacedName{Name: name, Namespace: namespace}
	getErr := k8sclient.Get(ctx, key, found)
	if getErr != nil {
		return nil, getErr
	}
	return found, nil
}
// CreateSecret creates secret through k8sclient and returns the result of Create.
func CreateSecret(ctx context.Context, k8sclient client.Client, secret *corev1.Secret) error {
	createErr := k8sclient.Create(ctx, secret)
	return createErr
}
// UpdateSecret updates secret through k8sclient and returns the result of Update.
func UpdateSecret(ctx context.Context, k8sclient client.Client, secret *corev1.Secret) error {
	return k8sclient.Update(ctx, secret)
}
// GetConfigMap fetches the configmap identified by namespace and name.
// It returns nil and the error when the lookup fails.
func GetConfigMap(ctx context.Context, k8scient client.Client, namespace, name string) (*corev1.ConfigMap, error) {
	cm := new(corev1.ConfigMap)
	lookupErr := k8scient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, cm)
	if lookupErr == nil {
		return cm, nil
	}
	return nil, lookupErr
}
// GetConfigMaps fetches, from namespace, one configmap per entry of cms (looked
// up by ConfigMapName).
//
// Lookups are not short-circuited: every entry is attempted, and the returned
// slice always has one element per entry. For an entry whose lookup failed the
// element is whatever Get left in a freshly allocated ConfigMap. When at least
// one lookup failed, the slice is returned together with a single error that
// aggregates the name, namespace and message of each failure.
func GetConfigMaps(ctx context.Context, k8scient client.Client, namespace string, cms []dorisv1.MountConfigMapInfo) ([]*corev1.ConfigMap, error) {
	var (
		fetched  []*corev1.ConfigMap
		failures string
	)

	for i := range cms {
		cmName := cms[i].ConfigMapName
		current := &corev1.ConfigMap{}
		key := types.NamespacedName{Namespace: namespace, Name: cmName}
		if err := k8scient.Get(ctx, key, current); err != nil {
			failures += fmt.Sprintf("(name: %s, namespace: %s, err: %s), ", cmName, namespace, err.Error())
		}
		// Appended regardless of the lookup outcome, to keep one element per entry.
		fetched = append(fetched, current)
	}

	if failures == "" {
		return fetched, nil
	}
	return fetched, errors.New("Failed to get configmap: " + failures)
}
// GetService fetches the service identified by namespace and name.
// It returns nil and the error when the lookup fails.
func GetService(ctx context.Context, k8sclient client.Client, namespace, name string) (*corev1.Service, error) {
	found := &corev1.Service{}
	key := types.NamespacedName{Name: name, Namespace: namespace}
	if getErr := k8sclient.Get(ctx, key, found); getErr != nil {
		return nil, getErr
	}
	return found, nil
}
// GetPods lists the pods in namespace whose labels match labels.
// The PodList is returned by value together with the error from List; on
// failure the list holds whatever List left in it.
func GetPods(ctx context.Context, k8sclient client.Client, namespace string, labels map[string]string) (corev1.PodList, error) {
	var podList corev1.PodList
	opts := []client.ListOption{
		client.InNamespace(namespace),
		client.MatchingLabels(labels),
	}
	listErr := k8sclient.List(ctx, &podList, opts...)
	return podList, listErr
}
// GetConfig resolves the configuration of componentType from the configmaps
// referenced by configMapInfo in namespace.
//
// When resource.GetMountConfigMapInfo yields no mounts, an empty map and a nil
// error are returned. Otherwise the configmaps are fetched with GetConfigMaps;
// a fetch failure is logged but does not stop resolution, and the fetch error
// and the resolve error are combined with utils.MergeError.
//
// configMapInfo is dereferenced unconditionally and must not be nil.
func GetConfig(ctx context.Context, k8sclient client.Client, configMapInfo *dorisv1.ConfigMapInfo, namespace string, componentType dorisv1.ComponentType) (map[string]interface{}, error) {
	mounts := resource.GetMountConfigMapInfo(*configMapInfo)
	if len(mounts) == 0 {
		return map[string]interface{}{}, nil
	}

	configMaps, getErr := GetConfigMaps(ctx, k8sclient, namespace, mounts)
	if getErr != nil {
		klog.Errorf("GetConfig get configmap failed, namespace: %s,err: %s \n", namespace, getErr.Error())
	}

	resolved, resolveErr := resource.ResolveConfigMaps(configMaps, componentType)
	return resolved, utils.MergeError(getErr, resolveErr)
}
// ApplyFoundationDBCluster applies fdb to the apiserver: it creates the object
// when it does not exist, and otherwise patches it with the client.Merge patch
// using the live object's ResourceVersion (fdb is mutated accordingly).
//
// Any error from the initial Get other than NotFound is returned as is, so a
// failed lookup never leads to a patch against a zero-valued live object.
func ApplyFoundationDBCluster(ctx context.Context, k8sclient client.Client, fdb *v1beta2.FoundationDBCluster) error {
	var efdb v1beta2.FoundationDBCluster
	key := types.NamespacedName{
		Name:      fdb.Name,
		Namespace: fdb.Namespace,
	}
	if err := k8sclient.Get(ctx, key, &efdb); err != nil {
		if apierrors.IsNotFound(err) {
			return k8sclient.Create(ctx, fdb)
		}
		return err
	}

	fdb.ResourceVersion = efdb.ResourceVersion
	return k8sclient.Patch(ctx, fdb, client.Merge)
}
// DeleteFoundationDBCluster deletes the FoundationDBCluster identified by
// namespace and name. It returns nil when the cluster does not exist, and any
// other lookup or delete error unchanged.
func DeleteFoundationDBCluster(ctx context.Context, k8sclient client.Client, namespace, name string) error {
	existing, lookupErr := GetFoundationDBCluster(ctx, k8sclient, namespace, name)
	switch {
	case lookupErr == nil:
		return k8sclient.Delete(ctx, existing)
	case apierrors.IsNotFound(lookupErr):
		// Nothing to delete.
		return nil
	default:
		return lookupErr
	}
}
// GetFoundationDBCluster fetches the FoundationDBCluster identified by
// namespace and name. It returns nil and the error when the lookup fails.
func GetFoundationDBCluster(ctx context.Context, k8sclient client.Client, namespace, name string) (*v1beta2.FoundationDBCluster, error) {
	cluster := new(v1beta2.FoundationDBCluster)
	key := types.NamespacedName{Name: name, Namespace: namespace}
	getErr := k8sclient.Get(ctx, key, cluster)
	if getErr == nil {
		return cluster, nil
	}
	return nil, getErr
}
// DeletePVC deletes the PersistentVolumeClaim named pvcName in namespace.
// A NotFound result is treated as success; any other error is returned.
//
// labels are only attached to the object handed to Delete.
// NOTE(review): presumably they do not act as a selector here — confirm.
func DeletePVC(ctx context.Context, k8sclient client.Client, namespace, pvcName string, labels map[string]string) error {
	meta := metav1.ObjectMeta{
		Name:      pvcName,
		Namespace: namespace,
		Labels:    labels,
	}
	target := &corev1.PersistentVolumeClaim{ObjectMeta: meta}

	deleteErr := k8sclient.Delete(ctx, target)
	if deleteErr == nil || apierrors.IsNotFound(deleteErr) {
		return nil
	}
	return deleteErr
}
// GetPVC fetches the PersistentVolumeClaim identified by name and namespace.
// It returns nil and the error when the lookup fails.
//
// Note the parameter order (name, namespace) is the reverse of the other
// getters in this file.
func GetPVC(ctx context.Context, k8sclient client.Client, name, namespace string) (*corev1.PersistentVolumeClaim, error) {
	claim := &corev1.PersistentVolumeClaim{}
	key := types.NamespacedName{Name: name, Namespace: namespace}
	if getErr := k8sclient.Get(ctx, key, claim); getErr != nil {
		return nil, getErr
	}
	return claim, nil
}