pkg/apisix/utils.go (264 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package apisix import ( "bytes" "context" "crypto/aes" "crypto/cipher" "encoding/base64" "encoding/json" "errors" "reflect" "go.uber.org/zap" "github.com/apache/apisix-ingress-controller/pkg/apisix/cache" "github.com/apache/apisix-ingress-controller/pkg/log" v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" ) var ( ErrUnknownApisixResourceType = errors.New("unknown apisix resource type") ) type ResourceTypes interface { *v1.Route | *v1.Ssl | *v1.Upstream | *v1.StreamRoute | *v1.GlobalRule | *v1.Consumer | *v1.PluginConfig } func skipRequest[T ResourceTypes](cluster *cluster, shouldCompare bool, url, id string, obj T) (T, bool) { if cluster.syncComparison && shouldCompare { var generatedObj T resourceType := "" // GlobalRule and Consumer has Plugins field which type will mismatch in DeepEqual switch (interface{})(generatedObj).(type) { case *v1.GlobalRule: generatedObj = obj resourceType = "global_rule" case *v1.Consumer: generatedObj = obj resourceType = "consumer" } if generatedObj == nil { generatedObj = obj } else { j, err := json.Marshal(generatedObj) if err != nil { log.Debugw("sync comparison continue operation", zap.String("reason", "failed to marshal object"), zap.Error(err), zap.Any("resource", resourceType), zap.Any("obj", generatedObj), ) return nil, false } err = json.Unmarshal(j, &generatedObj) if err != nil { log.Debugw("sync comparison continue operation", zap.String("reason", "failed to unmarshal object"), zap.Error(err), zap.Any("resource", resourceType), zap.Any("json", j), ) return nil, false } } var ( // generated object may be different from server response object, // so we need another cache to store generated objs cachedGeneratedObj interface{} err error ) // type-switch on parametric types is not implemented yet switch (interface{})(generatedObj).(type) { case *v1.Route: cachedGeneratedObj, err = cluster.generatedObjCache.GetRoute(id) resourceType = "route" case *v1.Ssl: cachedGeneratedObj, err = cluster.generatedObjCache.GetSSL(id) resourceType = "ssl" case *v1.Upstream: cachedGeneratedObj, err = cluster.generatedObjCache.GetUpstream(id) resourceType = "upstream" case *v1.StreamRoute: cachedGeneratedObj, err = cluster.generatedObjCache.GetStreamRoute(id) resourceType = "stream_route" case *v1.GlobalRule: cachedGeneratedObj, err = cluster.generatedObjCache.GetGlobalRule(id) resourceType = "global_rule" case *v1.Consumer: cachedGeneratedObj, err = cluster.generatedObjCache.GetConsumer(id) resourceType = "consumer" case *v1.PluginConfig: cachedGeneratedObj, err = cluster.generatedObjCache.GetPluginConfig(id) resourceType = "plugin_config" //case *v1.PluginMetadata: default: log.Errorw("resource comparison aborted", zap.Error(ErrUnknownApisixResourceType), zap.Any("obj", generatedObj), ) return nil, false } if err == nil && cachedGeneratedObj != nil { if reflect.DeepEqual(cachedGeneratedObj, generatedObj) { var ( expectedServerObj interface{} ) switch (interface{})(generatedObj).(type) { case *v1.Route: expectedServerObj, err = cluster.cache.GetRoute(id) case *v1.Ssl: expectedServerObj, err = cluster.cache.GetSSL(id) if err == nil { expectedServerObj.(*v1.Ssl).Key = "" } case *v1.Upstream: expectedServerObj, err = cluster.cache.GetUpstream(id) case *v1.StreamRoute: expectedServerObj, err = cluster.cache.GetStreamRoute(id) case *v1.GlobalRule: expectedServerObj, err = cluster.cache.GetGlobalRule(id) case *v1.Consumer: expectedServerObj, err = cluster.cache.GetConsumer(id) case *v1.PluginConfig: expectedServerObj, err = cluster.cache.GetPluginConfig(id) } if err == nil && expectedServerObj != nil { // Now we have the expected server obj, compare to actual object in APISIX var ( serverObj interface{} ) switch (interface{})(generatedObj).(type) { case *v1.Route: serverObj, err = cluster.GetRoute(context.Background(), url, id) case *v1.Ssl: serverObj, err = cluster.GetSSL(context.Background(), url, id) case *v1.Upstream: serverObj, err = cluster.GetUpstream(context.Background(), url, id) case *v1.StreamRoute: serverObj, err = cluster.GetStreamRoute(context.Background(), url, id) case *v1.GlobalRule: serverObj, err = cluster.GetGlobalRule(context.Background(), url, id) case *v1.Consumer: serverObj, err = cluster.GetConsumer(context.Background(), url, id) case *v1.PluginConfig: serverObj, err = cluster.GetPluginConfig(context.Background(), url, id) } if err == nil && serverObj != nil { if reflect.DeepEqual(expectedServerObj, serverObj) { log.Debugw("sync comparison skipped same resource", zap.String("reason", "equal"), zap.String("resource", resourceType), zap.Any("obj", generatedObj), zap.Any("cached", cachedGeneratedObj), ) return expectedServerObj.(T), true } log.Debugw("sync comparison continue operation", zap.String("reason", "cached server object doesn't match APISIX object"), zap.String("resource", resourceType), zap.Error(err), zap.String("id", id), zap.Any("cached_obj", expectedServerObj), zap.Any("server_obj", serverObj), ) return nil, false } log.Debugw("sync comparison continue operation", zap.String("reason", "failed to get object from APISIX"), zap.String("resource", resourceType), zap.Error(err), zap.String("id", id), ) return nil, false } log.Debugw("sync comparison continue operation", zap.String("reason", "failed to get cached server object"), zap.String("resource", resourceType), zap.Error(err), zap.String("id", id), ) return nil, false } log.Debugw("sync comparison continue operation", zap.String("reason", "controller generated object doesn't match"), zap.String("resource", resourceType), zap.Any("obj", generatedObj), zap.Any("cached", cachedGeneratedObj), ) return nil, false } else if err == cache.ErrNotFound { log.Debugw("sync comparison continue operation", zap.String("reason", "not in cache"), zap.String("resource", resourceType), zap.String("id", id), zap.Any("obj", generatedObj), zap.Any("cached", cachedGeneratedObj), ) return nil, false } else { log.Debugw("sync comparison continue operation", zap.Error(err), zap.String("reason", "failed to get cached generated object"), zap.String("resource", resourceType), zap.String("id", id), zap.Any("obj", generatedObj), zap.Any("cached", cachedGeneratedObj), ) return nil, false } } return nil, false } func CompareResourceEqualFromCluster[T ResourceTypes](cluster *cluster, id string, Resource T) bool { var old any switch (interface{})(Resource).(type) { case *v1.Route: old, _ = cluster.cache.GetRoute(id) case *v1.Ssl: old, _ = cluster.cache.GetSSL(id) case *v1.Upstream: old, _ = cluster.cache.GetUpstream(id) case *v1.StreamRoute: old, _ = cluster.cache.GetStreamRoute(id) case *v1.GlobalRule: old, _ = cluster.cache.GetGlobalRule(id) case *v1.Consumer: old, _ = cluster.cache.GetConsumer(id) case *v1.PluginConfig: old, _ = cluster.cache.GetPluginConfig(id) } if old == nil { return false } return reflect.DeepEqual(old, Resource) } func PKCS5Padding(plaintext []byte, blockSize int) []byte { padding := blockSize - len(plaintext)%blockSize padtext := bytes.Repeat([]byte{byte(padding)}, padding) return append(plaintext, padtext...) } func PKCS5UnPadding(origData []byte) []byte { length := len(origData) unpadding := int(origData[length-1]) return origData[:(length - unpadding)] } func AesEncrypt(origData, key []byte) ([]byte, error) { block, err := aes.NewCipher(key) if err != nil { return nil, err } blockSize := block.BlockSize() origData = PKCS5Padding(origData, blockSize) blockMode := cipher.NewCBCEncrypter(block, key[:blockSize]) crypted := make([]byte, len(origData)) blockMode.CryptBlocks(crypted, origData) return crypted, nil } func AesDecrypt(crypted, key []byte) ([]byte, error) { block, err := aes.NewCipher(key) if err != nil { return nil, err } blockSize := block.BlockSize() blockMode := cipher.NewCBCDecrypter(block, key[:blockSize]) origData := make([]byte, len(crypted)) blockMode.CryptBlocks(origData, crypted) origData = PKCS5UnPadding(origData) return origData, nil } func AesEencryptPrivatekey(data []byte, aeskey []byte) (string, error) { xcode, err := AesEncrypt(data, aeskey) if err != nil { return "", err } return base64.StdEncoding.EncodeToString(xcode), nil }