pkg/plugins/resources/k8s/store.go (363 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package k8s
import (
"context"
"regexp"
"strings"
"time"
)
import (
"github.com/pkg/errors"
"golang.org/x/exp/maps"
kube_apierrs "k8s.io/apimachinery/pkg/api/errors"
kube_meta "k8s.io/apimachinery/pkg/apis/meta/v1"
kube_runtime "k8s.io/apimachinery/pkg/runtime"
kube_client "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)
import (
"github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
"github.com/apache/dubbo-kubernetes/pkg/core/logger"
core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/registry"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/store"
k8s_common "github.com/apache/dubbo-kubernetes/pkg/plugins/common/k8s"
k8s_model "github.com/apache/dubbo-kubernetes/pkg/plugins/resources/k8s/native/pkg/model"
k8s_registry "github.com/apache/dubbo-kubernetes/pkg/plugins/resources/k8s/native/pkg/registry"
util_store "github.com/apache/dubbo-kubernetes/pkg/plugins/resources/k8s/util"
util_k8s "github.com/apache/dubbo-kubernetes/pkg/util/k8s"
)
func typeIsUnregistered(err error) bool {
var typeErr *k8s_registry.UnknownTypeError
return errors.As(err, &typeErr)
}
var _ store.ResourceStore = &KubernetesStore{}
type KubernetesStore struct {
Client kube_client.Client
Converter k8s_common.Converter
Scheme *kube_runtime.Scheme
}
func NewStore(client kube_client.Client, scheme *kube_runtime.Scheme, converter k8s_common.Converter) (store.ResourceStore, error) {
return &KubernetesStore{
Client: client,
Converter: converter,
Scheme: scheme,
}, nil
}
func (s *KubernetesStore) Create(ctx context.Context, r core_model.Resource, fs ...store.CreateOptionsFunc) error {
opts := store.NewCreateOptions(fs...)
obj, err := s.Converter.ToKubernetesObject(r)
if err != nil {
if typeIsUnregistered(err) {
return errors.Errorf("cannot create instance of unregistered type %q", r.Descriptor().Name)
}
return errors.Wrap(err, "failed to convert core model into k8s counterpart")
}
name, namespace, err := k8sNameNamespace(opts.Name, obj.Scope())
if err != nil {
return err
}
// for k8s metadata.name require
if name, err = EncodeK8sResName(name); err != nil {
return err
}
obj.GetObjectMeta().SetLabels(opts.Labels)
obj.SetMesh(opts.Mesh)
obj.GetObjectMeta().SetName(name)
obj.GetObjectMeta().SetNamespace(namespace)
if opts.Owner != nil {
k8sOwner, err := s.Converter.ToKubernetesObject(opts.Owner)
if err != nil {
return errors.Wrap(err, "failed to convert core model into k8s counterpart")
}
if err := controllerutil.SetOwnerReference(k8sOwner, obj, s.Scheme); err != nil {
return errors.Wrap(err, "failed to set owner reference for object")
}
}
if err := s.Client.Create(ctx, obj); err != nil {
if kube_apierrs.IsAlreadyExists(err) {
// 如果资源已经存在了就直接返回空即可
logger.Sugar().Warn("资源已经存在了")
return nil
}
return errors.Wrap(err, "failed to create k8s resource")
}
if name, err = DecodeK8sResName(obj.GetName()); err != nil {
return err
} else {
obj.SetName(name)
}
err = s.Converter.ToCoreResource(obj, r)
if err != nil {
return errors.Wrap(err, "failed to convert k8s model into core counterpart")
}
return nil
}
func (s *KubernetesStore) Update(ctx context.Context, r core_model.Resource, fs ...store.UpdateOptionsFunc) error {
opts := store.NewUpdateOptions(fs...)
obj, err := s.Converter.ToKubernetesObject(r)
if err != nil {
if typeIsUnregistered(err) {
return errors.Errorf("cannot update instance of unregistered type %q", r.Descriptor().Name)
}
return errors.Wrapf(err, "failed to convert core model of type %s into k8s counterpart", r.Descriptor().Name)
}
obj.GetObjectMeta().SetLabels(opts.Labels)
name, namespace, err := k8sNameNamespace(opts.Name, obj.Scope())
if err != nil {
return err
}
// for k8s metadata.name require
if name, err = EncodeK8sResName(name); err != nil {
return err
}
if r.GetMeta() == nil {
if err = s.Client.Get(ctx, kube_client.ObjectKey{Namespace: namespace, Name: name}, obj); err != nil {
if kube_apierrs.IsNotFound(err) {
return store.ErrorResourceNotFound(r.Descriptor().Name, opts.Name, opts.Mesh)
}
return errors.Wrap(err, "failed to get k8s resource")
}
obj.SetSpec(r.GetSpec())
} else {
obj.SetName(name)
obj.SetMesh(r.GetMeta().GetMesh())
}
if err = s.Client.Update(ctx, obj); err != nil {
if kube_apierrs.IsConflict(err) {
return store.ErrorResourceConflict(r.Descriptor().Name, r.GetMeta().GetName(), r.GetMeta().GetMesh())
}
return errors.Wrap(err, "failed to update k8s resource")
}
if name, err = DecodeK8sResName(obj.GetName()); err != nil {
return err
} else {
obj.SetName(name)
}
err = s.Converter.ToCoreResource(obj, r)
if err != nil {
return errors.Wrap(err, "failed to convert k8s model into core counterpart")
}
return nil
}
func (s *KubernetesStore) Delete(ctx context.Context, r core_model.Resource, fs ...store.DeleteOptionsFunc) error {
opts := store.NewDeleteOptions(fs...)
// get object and validate mesh
if err := s.Get(ctx, r, store.GetByKey(opts.Name, opts.Mesh)); err != nil {
return err
}
obj, err := s.Converter.ToKubernetesObject(r)
if err != nil {
// Unregistered types can't exist in the first place, so deletion would automatically succeed.
if typeIsUnregistered(err) {
return nil
}
return errors.Wrapf(err, "failed to convert core model of type %s into k8s counterpart", r.Descriptor().Name)
}
name, namespace, err := k8sNameNamespace(opts.Name, obj.Scope())
if err != nil {
return err
}
// for k8s metadata.name require
if name, err = EncodeK8sResName(name); err != nil {
return err
}
obj.GetObjectMeta().SetName(name)
obj.GetObjectMeta().SetNamespace(namespace)
if name, err = DecodeK8sResName(obj.GetName()); err != nil {
return err
} else {
obj.SetName(name)
}
if err := s.Client.Delete(ctx, obj); err != nil {
if kube_apierrs.IsNotFound(err) {
return nil
}
return errors.Wrap(err, "failed to delete k8s resource")
}
return nil
}
func (s *KubernetesStore) Get(ctx context.Context, r core_model.Resource, fs ...store.GetOptionsFunc) error {
opts := store.NewGetOptions(fs...)
obj, err := s.Converter.ToKubernetesObject(r)
if err != nil {
if typeIsUnregistered(err) {
return store.ErrorResourceNotFound(r.Descriptor().Name, opts.Name, opts.Mesh)
}
return errors.Wrapf(err, "failed to convert core model of type %s into k8s counterpart", r.Descriptor().Name)
}
name, namespace, err := k8sNameNamespace(opts.Name, obj.Scope())
if err != nil {
return err
}
// for k8s metadata.name require
if name, err = EncodeK8sResName(name); err != nil {
return err
}
if err := s.Client.Get(ctx, kube_client.ObjectKey{Namespace: namespace, Name: name}, obj); err != nil {
if kube_apierrs.IsNotFound(err) {
return store.ErrorResourceNotFound(r.Descriptor().Name, opts.Name, opts.Mesh)
}
return errors.Wrap(err, "failed to get k8s resource")
}
if name, err = DecodeK8sResName(obj.GetName()); err != nil {
return err
} else {
obj.SetName(name)
}
if err := s.Converter.ToCoreResource(obj, r); err != nil {
return errors.Wrap(err, "failed to convert k8s model into core counterpart")
}
if opts.Version != "" && r.GetMeta().GetVersion() != opts.Version {
return store.ErrorResourceConflict(r.Descriptor().Name, opts.Name, opts.Mesh)
}
if r.GetMeta().GetMesh() != opts.Mesh {
return store.ErrorResourceNotFound(r.Descriptor().Name, opts.Name, opts.Mesh)
}
return nil
}
func (s *KubernetesStore) List(ctx context.Context, rs core_model.ResourceList, fs ...store.ListOptionsFunc) error {
opts := store.NewListOptions(fs...)
obj, err := s.Converter.ToKubernetesList(rs)
if err != nil {
if typeIsUnregistered(err) {
return nil
}
return errors.Wrapf(err, "failed to convert core list model of type %s into k8s counterpart", rs.GetItemType())
}
if err := s.Client.List(ctx, obj); err != nil {
return errors.Wrap(err, "failed to list k8s resources")
}
predicate := func(r core_model.Resource) bool {
if opts.Mesh != "" && r.GetMeta().GetMesh() != opts.Mesh {
return false
}
if opts.NameContains != "" && !strings.Contains(r.GetMeta().GetName(), opts.NameContains) {
return false
}
return true
}
fullList, err := registry.Global().NewList(rs.GetItemType())
if err != nil {
return err
}
for _, object := range obj.GetItems() {
name, _ := DecodeK8sResName(object.GetName())
object.SetName(name)
}
if err := s.Converter.ToCoreList(obj, fullList, predicate); err != nil {
return errors.Wrap(err, "failed to convert k8s model into core counterpart")
}
for _, item := range fullList.GetItems() {
_ = rs.AddItem(item)
}
rs.GetPagination().SetTotal(uint32(len(fullList.GetItems())))
return nil
}
func k8sNameNamespace(coreName string, scope k8s_model.Scope) (string, string, error) {
if coreName == "" {
return "", "", store.PreconditionFormatError("name can't be empty")
}
switch scope {
case k8s_model.ScopeCluster:
return coreName, "", nil
case k8s_model.ScopeNamespace:
name, ns, err := util_k8s.CoreNameToK8sName(coreName)
if err != nil {
return "", "", store.PreconditionFormatError(err.Error())
}
return name, ns, nil
default:
return "", "", errors.Errorf("unknown scope %s", scope)
}
}
var _ core_model.ResourceMeta = &KubernetesMetaAdapter{}
type KubernetesMetaAdapter struct {
kube_meta.ObjectMeta
Mesh string
}
func (m *KubernetesMetaAdapter) GetName() string {
if m.Namespace == "" { // it's cluster scoped object
return m.ObjectMeta.Name
}
return util_k8s.K8sNamespacedNameToCoreName(m.ObjectMeta.Name, m.ObjectMeta.Namespace)
}
func (m *KubernetesMetaAdapter) GetNameExtensions() core_model.ResourceNameExtensions {
return k8s_common.ResourceNameExtensions(m.ObjectMeta.Namespace, m.ObjectMeta.Name)
}
func (m *KubernetesMetaAdapter) GetVersion() string {
return m.ObjectMeta.GetResourceVersion()
}
func (m *KubernetesMetaAdapter) GetMesh() string {
return m.Mesh
}
func (m *KubernetesMetaAdapter) GetCreationTime() time.Time {
return m.GetObjectMeta().GetCreationTimestamp().Time
}
func (m *KubernetesMetaAdapter) GetModificationTime() time.Time {
return m.GetObjectMeta().GetCreationTimestamp().Time
}
func (m *KubernetesMetaAdapter) GetLabels() map[string]string {
labels := maps.Clone(m.GetObjectMeta().GetLabels())
if labels == nil {
labels = map[string]string{}
}
if _, ok := labels[v1alpha1.DisplayName]; !ok {
labels[v1alpha1.DisplayName] = m.GetObjectMeta().GetName()
}
if m.Namespace != "" {
labels[v1alpha1.KubeNamespaceTag] = m.Namespace
}
return labels
}
type KubeFactory interface {
NewObject(r core_model.Resource) (k8s_model.KubernetesObject, error)
NewList(rl core_model.ResourceList) (k8s_model.KubernetesList, error)
}
var _ KubeFactory = &SimpleKubeFactory{}
type SimpleKubeFactory struct {
KubeTypes k8s_registry.TypeRegistry
}
func (f *SimpleKubeFactory) NewObject(r core_model.Resource) (k8s_model.KubernetesObject, error) {
return f.KubeTypes.NewObject(r.GetSpec())
}
func (f *SimpleKubeFactory) NewList(rl core_model.ResourceList) (k8s_model.KubernetesList, error) {
return f.KubeTypes.NewList(rl.NewItem().GetSpec())
}
// Define the regex pattern for a valid RFC 1123 subdomain
// for k8s [metadata.name] require.
var k8sNameCheck = regexp.MustCompile(`^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$`)
var (
_Aa = []byte{'A': 'a', 'B': 'b', 'C': 'c', 'D': 'd', 'E': 'e', 'F': 'f', 'G': 'g', 'H': 'h', 'I': 'i', 'J': 'j', 'K': 'k', 'L': 'l', 'M': 'm', 'N': 'n', 'O': 'o', 'P': 'p', 'Q': 'q', 'R': 'r', 'S': 's', 'T': 't', 'U': 'u', 'V': 'v', 'W': 'w', 'X': 'x', 'Y': 'y', 'Z': 'z', ':': '0', '_': '1', '@': '2'}
_aA = []byte{'a': 'A', 'b': 'B', 'c': 'C', 'd': 'D', 'e': 'E', 'f': 'F', 'g': 'G', 'h': 'H', 'i': 'I', 'j': 'J', 'k': 'K', 'l': 'L', 'm': 'M', 'n': 'N', 'o': 'O', 'p': 'P', 'q': 'Q', 'r': 'R', 's': 'S', 't': 'T', 'u': 'U', 'v': 'V', 'w': 'W', 'x': 'X', 'y': 'Y', 'z': 'Z', '0': ':', '1': '_', '2': '@'}
)
func EncodeK8sResName(name string) (string, error) {
// if match success, return
if k8sNameCheck.MatchString(name) {
return name, nil
}
bs := util_store.NewBitset()
bf := []byte(name)
for index, char := range bf {
if char < uint8(len(_Aa)) && _Aa[char] != 0 {
bf[index] = _Aa[char]
bs.Set(int32(index))
}
}
// return transString.base32toTransIndex.re
return string(bf) + "." + bs.Encode() + ".re", nil
}
func DecodeK8sResName(name string) (string, error) {
// check is encoded name
raw, ok := strings.CutSuffix(name, ".re")
if !ok {
return name, nil
}
i := strings.LastIndex(raw, ".")
if i == -1 {
return "", store.PreconditionFormatError("DecodeK8sResName fail")
}
bs := util_store.NewBitset()
err := bs.Decode(raw[i+1:])
if err != nil {
return "", store.PreconditionFormatError(err.Error())
}
bt := []byte(raw[:i])
bs.Range(func(idx int, val bool) bool {
if idx >= len(bt) {
return false
}
if val && len(_aA) > int(bt[idx]) && _aA[bt[idx]] != 0 {
bt[idx] = _aA[bt[idx]]
}
return true
})
return string(bt), nil
}