pkg/config/api_config.go (371 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package config import ( "regexp" "strconv" "strings" "sync" "time" ) import ( fc "github.com/dubbo-go-pixiu/pixiu-api/pkg/api/config" etcdv3 "github.com/dubbogo/gost/database/kv/etcd/v3" perrors "github.com/pkg/errors" "go.etcd.io/etcd/api/v3/mvccpb" ) import ( "github.com/apache/dubbo-go-pixiu/pkg/common/yaml" "github.com/apache/dubbo-go-pixiu/pkg/logger" "github.com/apache/dubbo-go-pixiu/pkg/model" ) var ( apiConfig *fc.APIConfig client *etcdv3.Client listener APIConfigResourceListener lock sync.RWMutex ) var ( BASE_INFO_NAME = "name" BASE_INFO_DESC = "description" ) // APIConfigResourceListener defines api resource and method config listener interface type APIConfigResourceListener interface { // ResourceChange handle modify resource event ResourceChange(new fc.Resource, old fc.Resource) bool // bool is return for interface implement is interesting // ResourceAdd handle add resource event ResourceAdd(res fc.Resource) bool // ResourceDelete handle delete resource event ResourceDelete(deleted fc.Resource) bool // MethodChange handle modify method event MethodChange(res fc.Resource, method fc.Method, old fc.Method) bool // MethodAdd handle add method below one resource event MethodAdd(res fc.Resource, method fc.Method) bool // MethodDelete handle delete method event MethodDelete(res fc.Resource, method fc.Method) bool } // LoadAPIConfigFromFile load the api config from file func LoadAPIConfigFromFile(path string) (*fc.APIConfig, error) { if len(path) == 0 { return nil, perrors.Errorf("Config file not specified") } logger.Infof("Load API configuration file form %s", path) apiConf := &fc.APIConfig{} err := yaml.UnmarshalYMLConfig(path, apiConf) if err != nil { return nil, perrors.Errorf("unmarshalYmlConfig error %s", perrors.WithStack(err)) } apiConfig = apiConf return apiConf, nil } // LoadAPIConfig load the api config from config center func LoadAPIConfig(metaConfig *model.APIMetaConfig) (*fc.APIConfig, error) { tmpClient, err := etcdv3.NewConfigClientWithErr( etcdv3.WithName(etcdv3.RegistryETCDV3Client), etcdv3.WithTimeout(10*time.Second), etcdv3.WithEndpoints(strings.Split(metaConfig.Address, ",")...), ) if err != nil { return nil, perrors.Errorf("Init etcd client fail error %s", err) } client = tmpClient kList, vList, err := client.GetChildren(metaConfig.APIConfigPath) if err != nil { return nil, perrors.Errorf("Get remote config fail error %s", err) } if err = initAPIConfigFromKVList(kList, vList); err != nil { return nil, err } // TODO: init other setting which need fetch from remote go listenResourceAndMethodEvent(metaConfig.APIConfigPath) // TODO: watch other setting which need fetch from remote return apiConfig, nil } func initAPIConfigFromKVList(kList, vList []string) error { var skList, svList, mkList, mvList []string var baseInfo string for i, k := range kList { v := vList[i] //handle base info re := getCheckBaseInfoRegexp() if m := re.Match([]byte(k)); m { baseInfo = v continue } // handle resource re = getCheckResourceRegexp() if m := re.Match([]byte(k)); m { skList = append(skList, k) svList = append(svList, v) continue } // handle method re = getExtractMethodRegexp() if m := re.Match([]byte(k)); m { mkList = append(mkList, k) mvList = append(mvList, v) continue } } lock.Lock() defer lock.Unlock() tmpApiConf := &fc.APIConfig{} if err := initBaseInfoFromString(tmpApiConf, baseInfo); err != nil { logger.Errorf("initBaseInfoFromString error %s", err) return err } if err := initAPIConfigServiceFromKvList(tmpApiConf, skList, svList); err != nil { logger.Errorf("initAPIConfigServiceFromKvList error %s", err) return err } if err := initAPIConfigMethodFromKvList(tmpApiConf, mkList, mvList); err != nil { logger.Errorf("initAPIConfigMethodFromKvList error %s", err) return err } apiConfig = tmpApiConf return nil } func initBaseInfoFromString(conf *fc.APIConfig, str string) error { properties := make(map[string]string, 8) if err := yaml.UnmarshalYML([]byte(str), properties); err != nil { logger.Errorf("unmarshalYmlConfig error %s", err) return err } if v, ok := properties[BASE_INFO_NAME]; ok { conf.Name = v } if v, ok := properties[BASE_INFO_DESC]; ok { conf.Description = v } return nil } func initAPIConfigMethodFromKvList(config *fc.APIConfig, kList, vList []string) error { for i := range kList { v := vList[i] method := &fc.Method{} err := yaml.UnmarshalYML([]byte(v), method) if err != nil { logger.Errorf("unmarshalYmlConfig error %s", err) return err } found := false for r, resource := range config.Resources { if method.ResourcePath != resource.Path { continue } for j, old := range resource.Methods { if old.HTTPVerb == method.HTTPVerb { // modify one method resource.Methods[j] = *method found = true } } if !found { resource.Methods = append(resource.Methods, *method) config.Resources[r] = resource found = true } } // not found one resource, so need add empty resource first if !found { resource := &fc.Resource{} resource.Methods = append(resource.Methods, *method) resource.Path = method.ResourcePath config.Resources = append(config.Resources, *resource) } } return nil } func initAPIConfigServiceFromKvList(config *fc.APIConfig, kList, vList []string) error { for i := range kList { v := vList[i] resource := &fc.Resource{} err := yaml.UnmarshalYML([]byte(v), resource) if err != nil { logger.Errorf("unmarshalYmlConfig error %s", err) return err } found := false if config.Resources == nil { config.Resources = make([]fc.Resource, 0) } for i, old := range config.Resources { if old.Path != resource.Path { continue } // replace old with new one except method list resource.Methods = old.Methods config.Resources[i] = *resource found = true } if !found { config.Resources = append(config.Resources, *resource) } continue } return nil } func listenResourceAndMethodEvent(key string) bool { for { wc, err := client.WatchWithPrefix(key) if err != nil { logger.Warnf("Watch api config {key:%s} = error{%s}", key, err) return false } select { // client stopped case <-client.Done(): logger.Warnf("client stopped") return false // client ctx stop // handle etcd events case e, ok := <-wc: if !ok { logger.Warnf("watch-chan closed") return false } if e.Err() != nil { logger.Errorf("watch ERR {err: %s}", e.Err()) continue } for _, event := range e.Events { switch event.Type { case mvccpb.PUT: logger.Infof("get event (key{%s}) = event{EventNodePut}", event.Kv.Key) handlePutEvent(event.Kv.Key, event.Kv.Value) case mvccpb.DELETE: logger.Infof("get event (key{%s}) = event{EventNodeDeleted}", event.Kv.Key) handleDeleteEvent(event.Kv.Key, event.Kv.Value) default: logger.Infof("get event (key{%s}) = event{%d}", event.Kv.Key, event.Type) } } } } } func handleDeleteEvent(key, val []byte) { lock.Lock() defer lock.Unlock() keyStr := string(key) keyStr = strings.TrimSuffix(keyStr, "/") re := getCheckResourceRegexp() if m := re.Match(key); m { pathArray := strings.Split(keyStr, "/") if len(pathArray) == 0 { logger.Errorf("handleDeleteEvent key format error") return } resourceIdStr := pathArray[len(pathArray)-1] ID, err := strconv.Atoi(resourceIdStr) if err != nil { logger.Errorf("handleDeleteEvent ID is not int error %s", err) return } deleteApiConfigResource(ID) return } re = getExtractMethodRegexp() if m := re.Match(key); m { pathArray := strings.Split(keyStr, "/") if len(pathArray) < 3 { logger.Errorf("handleDeleteEvent key format error") return } resourceIdStr := pathArray[len(pathArray)-3] resourceId, err := strconv.Atoi(resourceIdStr) if err != nil { logger.Errorf("handleDeleteEvent ID is not int error %s", err) return } methodIdStr := pathArray[len(pathArray)-1] methodId, err := strconv.Atoi(methodIdStr) if err != nil { logger.Errorf("handleDeleteEvent ID is not int error %s", err) return } deleteApiConfigMethod(resourceId, methodId) } } func handlePutEvent(key, val []byte) { lock.Lock() defer lock.Unlock() re := getCheckResourceRegexp() if m := re.Match(key); m { res := &fc.Resource{} err := yaml.UnmarshalYML(val, res) if err != nil { logger.Errorf("handlePutEvent UnmarshalYML error %s", err) return } mergeApiConfigResource(*res) return } re = getExtractMethodRegexp() if m := re.Match(key); m { res := &fc.Method{} err := yaml.UnmarshalYML(val, res) if err != nil { logger.Errorf("handlePutEvent UnmarshalYML error %s", err) return } mergeApiConfigMethod(res.ResourcePath, *res) return } //handle base info re = getCheckBaseInfoRegexp() if m := re.Match(key); m { mergeBaseInfo(val) return } } func deleteApiConfigResource(resourceId int) { for i := 0; i < len(apiConfig.Resources); i++ { itr := apiConfig.Resources[i] if itr.ID == resourceId { apiConfig.Resources = append(apiConfig.Resources[:i], apiConfig.Resources[i+1:]...) listener.ResourceDelete(itr) return } } } func mergeApiConfigResource(val fc.Resource) { for i, resource := range apiConfig.Resources { if val.ID != resource.ID { continue } // modify one resource val.Methods = resource.Methods apiConfig.Resources[i] = val listener.ResourceChange(val, resource) return } // add one resource apiConfig.Resources = append(apiConfig.Resources, val) listener.ResourceAdd(val) } func mergeBaseInfo(val []byte) { _ = initBaseInfoFromString(apiConfig, string(val)) } func deleteApiConfigMethod(resourceId, methodId int) { for _, resource := range apiConfig.Resources { if resource.ID != resourceId { continue } for i := 0; i < len(resource.Methods); i++ { method := resource.Methods[i] if method.ID == methodId { resource.Methods = append(resource.Methods[:i], resource.Methods[i+1:]...) listener.MethodDelete(resource, method) return } } } } func mergeApiConfigMethod(path string, val fc.Method) { for i, resource := range apiConfig.Resources { if path != resource.Path { continue } for j, method := range resource.Methods { if method.ID == val.ID { // modify one method resource.Methods[j] = val listener.MethodChange(resource, val, method) apiConfig.Resources[i] = resource return } } // add one method resource.Methods = append(resource.Methods, val) apiConfig.Resources[i] = resource listener.MethodAdd(resource, val) } } func getCheckBaseInfoRegexp() *regexp.Regexp { return regexp.MustCompile(".+/base$") } func getCheckResourceRegexp() *regexp.Regexp { return regexp.MustCompile(".+/resources/[^/]+/?$") } func getExtractMethodRegexp() *regexp.Regexp { return regexp.MustCompile(".+/resources/([^/]+)/method/[^/]+/?$") } // nolint func getCheckRatelimitRegexp() *regexp.Regexp { return regexp.MustCompile(".+/filter/ratelimit") } // RegisterConfigListener register APIConfigListener func RegisterConfigListener(li APIConfigResourceListener) { listener = li }