configloader.go (141 lines of code) (raw):

package pairec import ( "encoding/json" "fmt" "os" "strings" "time" "github.com/alibaba/pairec/v2/datasource/graph" "github.com/alibaba/pairec/v2/datasource/hbase_thrift" "github.com/alibaba/pairec/v2/datasource/kafka" "github.com/alibaba/pairec/v2/datasource/opensearch" "github.com/alibaba/pairec/v2/abtest" "github.com/alibaba/pairec/v2/algorithm" "github.com/alibaba/pairec/v2/config" "github.com/alibaba/pairec/v2/config/pairec_config" "github.com/alibaba/pairec/v2/datasource/beengine" "github.com/alibaba/pairec/v2/datasource/datahub" "github.com/alibaba/pairec/v2/datasource/ha3engine" "github.com/alibaba/pairec/v2/datasource/hbase" "github.com/alibaba/pairec/v2/datasource/sls" "github.com/alibaba/pairec/v2/filter" "github.com/alibaba/pairec/v2/log" "github.com/alibaba/pairec/v2/persist/clickhouse" "github.com/alibaba/pairec/v2/persist/fs" "github.com/alibaba/pairec/v2/persist/holo" "github.com/alibaba/pairec/v2/persist/lindorm" "github.com/alibaba/pairec/v2/persist/mysqldb" "github.com/alibaba/pairec/v2/persist/redisdb" "github.com/alibaba/pairec/v2/persist/tablestoredb" "github.com/alibaba/pairec/v2/recconf" "github.com/alibaba/pairec/v2/service" "github.com/alibaba/pairec/v2/service/feature" "github.com/alibaba/pairec/v2/service/general_rank" "github.com/alibaba/pairec/v2/service/pipeline" "github.com/alibaba/pairec/v2/service/recall/berecall" "github.com/alibaba/pairec/v2/sort" ) var ( loader *ConfigLoader pairecConfigAdapterName = "pairec_config" ) type ConfigLoader struct { configName string configVersion string configVersionValue string } func NewConfigLoader(configName string) *ConfigLoader { l := &ConfigLoader{configName: configName, configVersion: configName + "_version"} return l } func (l *ConfigLoader) loopLoadConfig() { for { time.Sleep(time.Second * 10) version := l.loadConfigVersion() // version changed if version != "" && version != l.configVersionValue { config, err := l.loadConfigFromConfigServer() if err != nil { fmt.Println(err) continue } l.configVersionValue = version log.Info(fmt.Sprintf("config version changed, version:%s, reload config", l.configVersionValue)) func() { defer func() { if err := recover(); err != nil { log.Error(fmt.Sprintf("reload config error, error:%v", err)) } }() l.reloadConfig(config) }() recconf.UpdateConf(config) } } } func (l *ConfigLoader) reloadConfig(config *recconf.RecommendConfig) { mysqldb.Load(config) redisdb.Load(config) tablestoredb.Load(config) sls.Load(config) kafka.Load(config) datahub.Load(config) beengine.Load(config) graph.Load(config) ha3engine.Load(recconf.Config) opensearch.Load(recconf.Config) hbase.Load(config) holo.Load(config) lindorm.Load(config) hbase_thrift.Load(config) fs.Load(config) clickhouse.Load(config) algorithm.Load(config) // holo must be loaded before loading some algorithm register(config) filter.RegisterFilterWithConfig(config) filter.Load(config) berecall.RegisterFilterWithConfig(config) sort.Load(config) service.Load(config) feature.UserLoadFeatureConfig(config) feature.LoadFeatureConfig(config) general_rank.LoadGeneralRankWithConfig(config) pipeline.LoadPipelineConfigs(config) } func (l *ConfigLoader) loadConfigFromConfigServer() (*recconf.RecommendConfig, error) { configer, err := config.NewConfig(pairecConfigAdapterName, l.configName) if err != nil { return nil, err } rawdata := configer.RawData() accessId := os.Getenv("AccessKey") accessSecret := os.Getenv("AccessSecret") data := string(rawdata) data = strings.ReplaceAll(data, "${AccessKey}", accessId) data = strings.ReplaceAll(data, "${AccessSecret}", accessSecret) configD := &recconf.RecommendConfig{} err = json.Unmarshal([]byte(data), configD) if err != nil { return nil, err } runMode := os.Getenv("PAIREC_ENVIRONMENT") if runMode == "" { configD.RunMode = "product" } else { configD.RunMode = runMode } return configD, nil } func (l *ConfigLoader) loadConfigVersion() string { return abtest.GetParams(pairec_config.Pairec_Config_Scene_Name).GetString(l.configVersion, "") } // ListenConfig init a instace of ConfigLoader // ConfigLoader will loop load paire config from server when the config version change func ListenConfig(configName string) { loader = NewConfigLoader(configName) config, err := loader.loadConfigFromConfigServer() if err != nil { panic(err) } recconf.UpdateConf(config) version := loader.loadConfigVersion() loader.configVersionValue = version go loader.loopLoadConfig() }