recconf/recconf.go (890 lines of code) (raw):

// Package recconf declares the recommendation-engine configuration schema
// (RecommendConfig and the types it references) together with helpers that
// load it from a JSON file and publish replacement configurations to
// subscribers.
package recconf

import (
	"encoding/json"
	"errors"
	"os"
	"reflect"
	"sync"

	"github.com/alibaba/pairec/v2/config"
)

var (
	// Config is the process-wide active configuration. It is initialised with
	// defaults in init, filled in place by LoadConfig and replaced by
	// UpdateConf.
	Config *RecommendConfig

	// adapterName is the config adapter passed to config.NewConfig.
	adapterName = "json"
	//pairecConfigAdapterName = "pairec_config"
)

const (
	// DAO adapter names.
	// NOTE(review): presumably the valid values of DaoConfig.Adapter — confirm
	// against the DAO implementations.
	DaoConf_Adapter_Mysql = "mysql"
	DaoConf_Adapter_Redis = "redis"
	DaoConf_Adapter_TableStore = "tablestore"
	DaoConf_Adapter_HBase = "hbase"
	DaoConf_Adapter_Hologres = "hologres"

	// Data source type names.
	// NOTE(review): Datasource_Type_Graph is spelled with a lower-case 's',
	// unlike its siblings; it is exported, so the name is kept as is.
	DataSource_Type_Kafka = "kafka"
	DataSource_Type_Datahub = "datahub"
	DataSource_Type_ClickHouse = "clickhouse"
	DataSource_Type_BE = "be"
	DataSource_Type_Lindorm = "lindorm"
	DataSource_Type_HBase_Thrift = "hbase_thrift"
	DataSource_Type_FeatureStore = "featurestore"
	Datasource_Type_Graph = "graph"

	// BE recall type names.
	// NOTE(review): presumably the valid values of BeConfig.BeRecallType and
	// BeRecallParam.RecallType — confirm against the BE recall code.
	BE_RecallType_X2I = "x2i_recall"
	BE_RecallType_Vector = "vector_recall"
	BE_RecallType_MultiMerge = "multi_merge_recall"
)

// init installs the default configuration so that Config is never nil before
// LoadConfig or UpdateConf is called.
func init() {
	Config = newRecommendConfig()
}

// RecommendConfig is the root configuration document. loadConfigFromFile
// decodes the JSON config file into the package-level Config value of this
// type.
type RecommendConfig struct {
	RunMode string // Run Mode: daily | product
	Region string
	ListenConf ListenConfig
	UserFeatureConfs map[string]SceneFeatureConfig
	FeatureConfs map[string]SceneFeatureConfig
	SortNames map[string][]string
	FilterNames map[string][]string
	AlgoConfs []AlgoConfig
	RecallConfs []RecallConfig
	FilterConfs []FilterConfig
	BeFilterConfs []BeFilterConfig
	SortConfs []SortConfig
	RedisConfs map[string]RedisConfig
	MysqlConfs map[string]MysqlConfig
	ClickHouseConfs map[string]ClickHouseConfig
	HologresConfs map[string]HologresConfig
	LindormConfs map[string]LindormConfig
	GraphConfs map[string]GraphConfig
	FeatureStoreConfs map[string]FeatureStoreConfig
	KafkaConfs map[string]KafkaConfig
	SlsConfs map[string]SlsConfig
	DatahubConfs map[string]DatahubConfig
	BEConfs map[string]BEConfig
	Ha3EngineConfs map[string]Ha3EngineConfig
	OpenSearchConfs map[string]OpenSearchConfig
	HBaseConfs map[string]HBaseConfig
	HBaseThriftConfs map[string]HBaseThriftConfig
	TableStoreConfs map[string]TableStoreConfig
	SceneConfs map[string]map[string]CategoryConfig
	RankConf map[string]RankConfig
	LogConf LogConfig
	ABTestConf ABTestConfig
	CallBackConfs map[string]CallBackConfig
	EmbeddingConfs map[string]EmbeddingConfig
	GeneralRankConfs map[string]GeneralRankConfig
	ColdStartGeneralRankConfs map[string]ColdStartGeneralRankConfig
	ColdStartRankConfs map[string]ColdStartRankConfig
	DPPConf []DPPSortConfig
	DebugConfs map[string]DebugConfig
	FeatureLogConfs map[string]FeatureLogConfig
	PipelineConfs map[string][]PipelineConfig
	PrometheusConfig PrometheusConfig
	// UserDefineConfs is kept as raw JSON; decoding is left to the consumer.
	UserDefineConfs json.RawMessage
}

// ListenConfig holds the HTTP listen address and port. newRecommendConfig
// defaults HttpPort to 8000.
type ListenConfig struct {
	HttpAddr string
	HttpPort int
}

// PrometheusConfig holds the Prometheus metrics settings found in
// RecommendConfig.PrometheusConfig.
type PrometheusConfig struct {
	Enable bool
	Subsystem string
	PushGatewayURL string
	PushIntervalSecs int
	Job string
	ReqDurBuckets []float64
	ReqSizeBuckets []float64
	RespSizeBuckets []float64
}

// DaoConfig is the common data-access settings block. It is embedded by the
// more specific *DaoConfig types in this file and groups the per-backend
// fields (redis, mysql, tablestore, hbase, hologres, ...) in one struct.
type DaoConfig struct {
	Adapter string
	AdapterType string
	RedisName string
	RedisPrefix string
	RedisDataType string
	RedisFieldType string
	RedisDefaultKey string
	RedisValueDelimeter string
	MysqlName string
	MysqlTable string
	Config string
	TableStoreName string
	TableStoreTableName string
	HBasePrefix string
	HBaseName string
	HBaseTable string
	ColumnFamily string
	Qualifier string
	ItemIdField string
	ItemScoreField string

	// hologres
	HologresName string
	HologresTableName string

	// clickhouse
	ClickHouseName string
	ClickHouseTableName string

	// be engine
	BeName string
	BizName string
	BeRecallName string
	BeTableName string
	BeExposureUserIdName string
	BeExposureItemIdName string

	// feature store
	FeatureStoreName string
	FeatureStoreModelName string
	FeatureStoreEntityName string
	FeatureStoreViewName string

	// graph
	GraphName string
	InstanceId string
	TableName string
	UserNode string
	ItemNode string
	Edge string

	// lindorm
	LindormTableName string
	LindormName string
}

// SceneFeatureConfig is the value type of RecommendConfig.UserFeatureConfs and
// RecommendConfig.FeatureConfs: a list of feature loaders plus a flag.
type SceneFeatureConfig struct {
	FeatureLoadConfs []FeatureLoadConfig
	AsynLoadFeature bool
}

// FeatureLoadConfig pairs one feature DAO configuration with the feature
// definitions associated with it.
type FeatureLoadConfig struct {
	FeatureDaoConf FeatureDaoConfig
	Features []FeatureConfig
}

// BeRTCntFieldConfig is the element type of FeatureDaoConfig.BeRTCntFieldInfo.
type BeRTCntFieldConfig struct {
	FieldNames []string
	// if not set, Delims is empty string by default
	// which indicates it is single value with no delimiter
	Delims []string
	// if not set, Alias is set to FieldNames[0] by default
	Alias string
}

// FeatureDaoConfig extends DaoConfig with the settings used when loading
// features (plain, sequence and real-time count features).
type FeatureDaoConfig struct {
	DaoConfig
	NoUsePlayTimeField bool
	FeatureKey string
	FeatureStore string // user or item
	UserFeatureKeyName string
	ItemFeatureKeyName string
	TimestampFeatureKeyName string
	EventFeatureKeyName string
	PlayTimeFeatureKeyName string
	TsFeatureKeyName string
	UserSelectFields string
	ItemSelectFields string
	// FeatureAsyncLoad use async goroutine to load feature
	FeatureAsyncLoad bool

	FeatureType string // per feature type has different way of build
	SequenceLength int
	SequenceName string
	SequenceEvent string
	SequenceDelim string
	// SequencePlayTime filters events by at least this play time.
	// E.g. a play event must be longer than 10s, so the value is "play:10000"; the time unit is ms.
	// If more than one event is filtered, use ';' as the delimiter, like "play:10000;read:50000".
	SequencePlayTime string
	SequenceOfflineTableName string
	// SequenceDimFields fetch other dimension fields from db
	SequenceDimFields string

	BeItemFeatureKeyName string
	BeTimestampFeatureKeyName string
	BeEventFeatureKeyName string
	BePlayTimeFeatureKeyName string
	BeIsHomeField string
	// separate by , if combined by multiple fields separate by :
	//
	// Deprecated: set BeRTCntFieldInfo.FieldNames instead.
	BeRTCntFields string
	BeRTCntFieldInfo []BeRTCntFieldConfig
	BeRTTable string
	RTCntWins string
	RTCntMaxKey int
	RTCntWinDelay int
	OutRTCntFeaPattern string
	OutHomeRTCntFeaPattern string
	// Deprecated: set BeRTCntFieldInfo.Alias instead.
	OutRTCntFieldAlias string
	OutRTCntWinNames string
	OutEventName string

	// CacheFeaturesName When use UserFeatureConfs, can cache load features to the user cacheFeatures map.
	// Only valid when FeatureStore = user
	CacheFeaturesName string
	// LoadFromCacheFeaturesName load features from user cacheFeatures map
	// Only valid when FeatureStore = user
	LoadFromCacheFeaturesName string

	// CacheSize cache feature data size
	CacheSize int
	// CacheTime cache feature data time in second
	CacheTime int
}

// FeatureConfig describes a single feature entry of a FeatureLoadConfig.
type FeatureConfig struct {
	FeatureType string
	FeatureName string
	FeatureSource string
	FeatureValue string
	FeatureStore string // user or item
	RemoveFeatureSource bool // delete feature source
	Normalizer string
	Expression string
}

// AlgoConfig is one named algorithm entry of RecommendConfig.AlgoConfs. It
// carries one sub-config per supported backend (EAS, vector, Milvus, lookup,
// Seldon, TF Serving).
type AlgoConfig struct {
	Name string
	Type string
	EasConf EasConfig
	VectorConf VectorConfig
	MilvusConf MilvusConfig
	LookupConf LookupConfig
	SeldonConf SeldonConfig
	TFservingConf TFservingConfig
}

// PIDControllerConfig is the type of SortConfig.PIDConf.
type PIDControllerConfig struct {
	AllocateExperimentWise bool
	DefaultKp float64
	DefaultKi float64
	DefaultKd float64
	Timestamp int64
	AheadMinutes int
	IntegralMin float64
	IntegralMax float64
	IntegralThreshold float64
	ErrThreshold float64
	ErrDiscount float64
	BoostScoreConditions []BoostScoreCondition
}

// LookupConfig is the type of AlgoConfig.LookupConf.
type LookupConfig struct {
	FieldName string
}

// EasConfig is the type of AlgoConfig.EasConf.
type EasConfig struct {
	Processor string
	Url string
	Auth string
	EndpointType string
	SignatureName string
	Timeout int
	RetryTimes int
	ResponseFuncName string
	Outputs []string
	ModelName string
}

// TFservingConfig is the type of AlgoConfig.TFservingConf.
type TFservingConfig struct {
	Url string
	SignatureName string
	Timeout int
	RetryTimes int
	ResponseFuncName string
	Outputs []string
}

// SeldonConfig is the type of AlgoConfig.SeldonConf.
type SeldonConfig struct {
	Url string
	ResponseFuncName string
}

// VectorConfig is the type of AlgoConfig.VectorConf.
type VectorConfig struct {
	ServerAddress string
	Timeout int64
}

// MilvusConfig is the type of AlgoConfig.MilvusConf.
type MilvusConfig struct {
	ServerAddress string
	Timeout int64
}

// RecallConfig is one named recall entry of RecommendConfig.RecallConfs. It
// carries cache settings and one sub-config per supported recall backend.
type RecallConfig struct {
	Name string
	RecallType string
	RecallCount int
	RecallAlgo string
	VectorAlgoType string
	ItemType string
	CacheAdapter string
	CacheConfig string
	CachePrefix string
	CacheTime int // cache time by seconds
	Triggers []TriggerConfig
	HologresVectorConf HologresVectorConfig
	BeVectorConf BeVectorConfig
	MilvusVectorConf MilvusVectorConfig
	UserCollaborativeDaoConf UserCollaborativeDaoConfig
	ItemCollaborativeDaoConf ItemCollaborativeDaoConfig
	User2ItemDaoConf User2ItemDaoConfig
	UserTopicDaoConf UserTopicDaoConfig
	DaoConf DaoConfig
	VectorDaoConf VectorDaoConfig
	ColdStartDaoConf ColdStartDaoConfig
	RealTimeUser2ItemDaoConf RealTimeUser2ItemDaoConfig

	UserFeatureConfs []FeatureLoadConfig // get user features

	// be recall config
	BeConf BeConfig

	GraphConf GraphConf

	OpenSearchConf OpenSearchConf
}

// GraphConf is the type of RecallConfig.GraphConf.
type GraphConf struct {
	GraphName string
	ItemId string
	QueryString string
	Params []string
}

// OpenSearchConf is the type of RecallConfig.OpenSearchConf.
type OpenSearchConf struct {
	OpenSearchName string
	AppName string
	ItemId string
	RequestParams map[string]any
	Params []string
}

// BeConfig is the type of RecallConfig.BeConf.
type BeConfig struct {
	Count int
	BeName string
	BizName string
	BeRecallType string
	RecallNameMapping map[string]RecallNameMappingConfig
	BeRecallParams []BeRecallParam
	BeFilterNames []string
	BeABParams map[string]any
}

// RecallNameMappingConfig is the value type of BeConfig.RecallNameMapping.
type RecallNameMappingConfig struct {
	Format string
	Fields []string
}

// BeRecallParam is the element type of BeConfig.BeRecallParams.
type BeRecallParam struct {
	Count int
	Priority int
	RecallType string
	RecallName string
	ScorerClause string
	TriggerType string // user or be or fixvalue or user_vector
	UserTriggers []TriggerConfig
	TriggerValue string
	// TriggerParam BeTriggerParam
	//RecallParamName string
	UserVectorTrigger UserVectorTriggerConfig
	UserTriggerDaoConf UserTriggerDaoConfig // online table for u2i
	UserTriggerRulesConf UserTriggerRulesConfig // be recall diversity trigger, trigger have diff recall count
	UserCollaborativeDaoConf UserCollaborativeDaoConfig // offline table for u2i
	UserRealtimeEmbeddingTrigger UserRealtimeEmbeddingTriggerConfig // get user feature and invoke eas model, get item embedding sink to be
	UserEmbeddingO2OTrigger UserEmbeddingO2OTriggerConfig
	ItemIdName string
	TriggerIdName string
	RecallTableName string
	DiversityParam string
	CustomParams map[string]any
}

// UserTriggerRulesConfig is the type of BeRecallParam.UserTriggerRulesConf.
type UserTriggerRulesConfig struct {
	DefaultValue int
	TriggerCounts []int
}

// UserVectorTriggerConfig is the type of BeRecallParam.UserVectorTrigger.
type UserVectorTriggerConfig struct {
	CacheTime int
	CachePrefix string
	RecallAlgo string
	UserFeatureConfs []FeatureLoadConfig // get user features
}

// UserEmbeddingO2OTriggerConfig is the type of
// BeRecallParam.UserEmbeddingO2OTrigger.
type UserEmbeddingO2OTriggerConfig struct {
	BizName string
	RecallName string
	BeName string
	SeqDelimiter string // seq feature delimiter
	MultiValueDelimiter string // multi value feature delimiter
	UserFeatureConfs []FeatureLoadConfig // get user features
}

// UserRealtimeEmbeddingTriggerConfig is the type of
// BeRecallParam.UserRealtimeEmbeddingTrigger.
type UserRealtimeEmbeddingTriggerConfig struct {
	Debug bool
	DebugLogDatahub string
	EmbeddingNum int
	RecallAlgo string
	DistinctParamName string
	DistinctParamValue string
	UserFeatureConfs []FeatureLoadConfig // get user features
}

// BeTriggerParam is not referenced by any active field in this file; the only
// mention is a commented-out field in BeRecallParam.
type BeTriggerParam struct {
	BizName string
	FieldName string
}

// ColdStartDaoConfig is the type of RecallConfig.ColdStartDaoConf.
type ColdStartDaoConfig struct {
	SqlDaoConfig
	TimeInterval int // second
}

// SqlDaoConfig extends DaoConfig with SQL query pieces (limit, where clause,
// primary key, select list, ordering).
type SqlDaoConfig struct {
	DaoConfig
	Limit int
	WhereClause string
	PrimaryKey string
	SelectFields string
	OrderBy string
}

// RealTimeUser2ItemDaoConfig is the type of
// RecallConfig.RealTimeUser2ItemDaoConf.
type RealTimeUser2ItemDaoConfig struct {
	UserTriggerDaoConf UserTriggerDaoConfig
	Item2ItemTable string
	SimilarItemIdField string
	SimilarItemScoreField string
	Item2XTable string
	X2ItemTable string
	XKey string
	XDelimiter string
	// FeatureStore i2i feature view name
	Item2ItemFeatureViewName string
}

// UserTriggerDaoConfig extends SqlDaoConfig with the settings used to build
// user triggers.
type UserTriggerDaoConfig struct {
	SqlDaoConfig
	NoUsePlayTimeField bool
	ItemCount int
	TriggerCount int
	EventPlayTime string
	EventWeight string
	WeightExpression string
	WeightMode string
	PropertyFields []string
	DiversityRules []TriggerDiversityRuleConfig

	BeItemFeatureKeyName string
	BeTimestampFeatureKeyName string
	BeEventFeatureKeyName string
	BePlayTimeFeatureKeyName string

	ItemIdFieldName string
	TimestampFieldName string
	EventFieldName string
	PlayTimeFieldName string
}

// TriggerDiversityRuleConfig is the element type of
// UserTriggerDaoConfig.DiversityRules.
type TriggerDiversityRuleConfig struct {
	Dimensions []string
	Size int
}

// HologresVectorConfig is the type of RecallConfig.HologresVectorConf.
type HologresVectorConfig struct {
	HologresName string
	VectorTable string // example: "item_emb_{partition}", '{partition}' will be replaced by partition info
	VectorKeyField string
	VectorEmbeddingField string
	WhereClause string
	TimeInterval int
}

// BeVectorConfig is the type of RecallConfig.BeVectorConf.
type BeVectorConfig struct {
	BizName string
	// VectorKeyField string
	VectorEmbeddingField string
}

// MilvusVectorConfig is the type of RecallConfig.MilvusVectorConf.
type MilvusVectorConfig struct {
	VectorKeyField string
	VectorEmbeddingField string
	CollectionName string
	MetricType string
	SearchParams map[string]any
}

// UserCollaborativeDaoConfig extends DaoConfig with the table and feature-view
// names used by user-collaborative recall.
type UserCollaborativeDaoConfig struct {
	DaoConfig
	User2ItemTable string
	Item2ItemTable string
	User2ItemFeatureViewName string
	Item2ItemFeatureViewName string
	Item2XTable string
	X2ItemTable string
	XKey string
	XDelimiter string

	Normalization string // set "on" to enable it, otherwise set "off", enabled by default
}

// ItemCollaborativeDaoConfig is the type of
// RecallConfig.ItemCollaborativeDaoConf.
type ItemCollaborativeDaoConfig struct {
	DaoConfig
	Item2ItemTable string
}

// User2ItemDaoConfig is the type of RecallConfig.User2ItemDaoConf.
type User2ItemDaoConfig struct {
	DaoConfig
	User2ItemTable string
	Item2ItemTable string
}

// UserTopicDaoConfig is the type of RecallConfig.UserTopicDaoConf.
type UserTopicDaoConfig struct {
	DaoConfig
	UserTopicTable string
	TopicItemTable string
	IndexName string
}

// VectorDaoConfig is the type of RecallConfig.VectorDaoConf.
type VectorDaoConfig struct {
	DaoConfig
	EmbeddingField string
	KeyField string
	// set the following fields to get partition info,
	// if not set, '{partition}' in table name won't be replaced (if it exists)
	PartitionInfoTable string
	PartitionInfoField string
}

// GraphConfig is the value type of RecommendConfig.GraphConfs.
type GraphConfig struct {
	Host string
	UserName string
	Passwd string
}

// RedisConfig is the value type of RecommendConfig.RedisConfs.
// NOTE(review): the unit of the *Timeout fields is not visible here — confirm
// against the redis client setup.
type RedisConfig struct {
	Host string
	Port int
	Password string
	DbNum int
	MaxIdle int
	ConnectTimeout int
	ReadTimeout int
	WriteTimeout int
}

// MysqlConfig is the value type of RecommendConfig.MysqlConfs.
type MysqlConfig struct {
	DSN string
}

// ClickHouseConfig is the value type of RecommendConfig.ClickHouseConfs.
type ClickHouseConfig struct {
	DSN string
}

// HologresConfig is the value type of RecommendConfig.HologresConfs.
type HologresConfig struct {
	DSN string
}

// LindormConfig is the value type of RecommendConfig.LindormConfs.
type LindormConfig struct {
	Url string
	User string
	Password string
	Database string
}

// FeatureStoreConfig is the value type of RecommendConfig.FeatureStoreConfs.
type FeatureStoreConfig struct {
	AccessId string
	AccessKey string
	RegionId string
	ProjectName string
	FeatureDBUsername string
	FeatureDBPassword string
	HologresPort int
}

// KafkaConfig is the value type of RecommendConfig.KafkaConfs.
type KafkaConfig struct {
	BootstrapServers string
	Topic string
}

// DatahubConfig is the value type of RecommendConfig.DatahubConfs.
type DatahubConfig struct {
	AccessId string
	AccessKey string
	Endpoint string
	ProjectName string
	TopicName string
	Schemas []DatahubTopicSchema
}

// BEConfig is the value type of RecommendConfig.BEConfs.
type BEConfig struct {
	Username string
	Password string
	Endpoint string
	ReleaseType string // values: product or dev or prepub
}

// Ha3EngineConfig is the value type of RecommendConfig.Ha3EngineConfs.
type Ha3EngineConfig struct {
	Username string
	Password string
	Endpoint string
	InstanceId string
}

// OpenSearchConfig is the value type of RecommendConfig.OpenSearchConfs.
type OpenSearchConfig struct {
	EndPoint string
	AccessKeyId string
	AccessKeySecret string
}

// DatahubTopicSchema is the element type of DatahubConfig.Schemas.
type DatahubTopicSchema struct {
	Field string
	// Type is the type of the datahub tuple field; valid values are string, integer
	Type string
}

// HBaseConfig is the value type of RecommendConfig.HBaseConfs.
type HBaseConfig struct {
	ZKQuorum string
}

// HBaseThriftConfig is the value type of RecommendConfig.HBaseThriftConfs.
type HBaseThriftConfig struct {
	Host string
	User string
	Password string
}

// TableStoreConfig is the value type of RecommendConfig.TableStoreConfs.
type TableStoreConfig struct {
	EndPoint string
	InstanceName string
	AccessKeyId string
	AccessKeySecret string
	RoleArn string
}

// SlsConfig is the value type of RecommendConfig.SlsConfs.
type SlsConfig struct {
	EndPoint string
	AccessKeyId string
	AccessKeySecret string
	ProjectName string
	LogstoreName string
}

// SceneConfig holds a list of category names. It is not referenced by any
// other type in this file.
type SceneConfig struct {
	Categories []string
}

// CategoryConfig is the innermost value type of RecommendConfig.SceneConfs.
type CategoryConfig struct {
	RecallNames []string
}

// RankConfig is the value type of RecommendConfig.RankConf and is reused by
// the callback, embedding, general-rank and pipeline configs.
type RankConfig struct {
	RankAlgoList []string
	RankScore string
	Processor string
	ContextFeatures []string
	ItemFeatures []string
	BatchCount int
	ScoreRewrite map[string]string
	ASTType string
}

// ActionConfig is the element type of GeneralRankConfig.ActionConfs.
type ActionConfig struct {
	ActionType string
	ActionName string
}

// OperatorValueConfig is not referenced by any other type in this file.
type OperatorValueConfig struct {
	Type string // "property", "function"
	Name string
	From string // item or user
}

// LogConfig is the type of RecommendConfig.LogConf.
type LogConfig struct {
	RetensionDays int
	DiskSize int // unit : G, if value = 20, the true size is 20G
	LogLevel string // valid value is DEBUG, INFO , ERROR , FATAL
	Output string // valid value is file, console
	SLSName string
}

// ABTestConfig is the type of RecommendConfig.ABTestConf.
type ABTestConfig struct {
	Host string
	Token string
}

// FilterConfig is one named filter entry of RecommendConfig.FilterConfs.
type FilterConfig struct {
	Name string
	FilterType string
	DaoConf DaoConfig
	MaxItems int
	TimeInterval int // second
	RetainNum int
	ShuffleItem bool
	WriteLog bool
	ClearLogIfNotEnoughScene string
	OnlyLogUserExposeFlag bool
	Dimension string
	ScoreWeight float64
	GroupMinNum int
	GroupMaxNum int
	GroupWeightStrategy string
	GroupWeightDimensionLimit map[string]int
	WriteLogExcludeScenes []string
	GenerateItemDataFuncName string
	AdjustCountConfs []AdjustCountConfig
	ItemStateDaoConf ItemStateDaoConfig
	FilterEvaluableExpression string
	FilterParams []FilterParamConfig
	DiversityDaoConf DiversityDaoConfig
	DiversityMinCount int
	EnsureDiversity bool
	FilterVal FilterValue
	ItemStateCacheSize int
	ItemStateCacheTime int
	Conditions []FilterParamConfig
	ConditionFilterConfs struct {
		FilterConfs []struct {
			Conditions []FilterParamConfig
			FilterName string
		}
		DefaultFilterName string
	}
}

// BeFilterConfig is the element type of RecommendConfig.BeFilterConfs; it adds
// nothing to the embedded FilterConfig.
type BeFilterConfig struct {
	FilterConfig
}

// FilterValue is the type of FilterConfig.FilterVal.
type FilterValue struct {
	SelectCol string
	WhereClause string
}

// SortConfig is one named sort entry of RecommendConfig.SortConfs.
type SortConfig struct {
	Debug bool
	RemainItem bool
	Name string
	SortType string
	SortByField string
	SwitchThreshold float64
	DiversitySize int
	Size int
	DPPConf DPPSortConfig
	SSDConf SSDSortConfig
	PIDConf PIDControllerConfig
	MixSortRules []MixSortConfig
	BoostScoreConditionsFilterAll bool
	BoostScoreConditions []BoostScoreCondition
	DistinctIdConditions []DistinctIdCondition
	Conditions []FilterParamConfig
	ExcludeRecalls []string
	DiversityRules []DiversityRuleConfig
	TimeInterval int
	BoostScoreByWeightDao BoostScoreByWeightDaoConfig
}

// BoostScoreByWeightDaoConfig is the type of SortConfig.BoostScoreByWeightDao.
type BoostScoreByWeightDaoConfig struct {
	DaoConfig
	ItemFieldName string
	WeightFieldName string
}

// MixSortConfig is the element type of SortConfig.MixSortRules.
type MixSortConfig struct {
	MixStrategy string // fix_position, random_position
	Positions []int
	PositionField string
	Number int
	NumberRate float64
	RecallNames []string
	Conditions []FilterParamConfig
}

// DiversityRuleConfig is the element type of SortConfig.DiversityRules.
type DiversityRuleConfig struct {
	Dimensions []string
	IntervalSize int
	WindowSize int
	FrequencySize int
}

// FilterParamConfig describes one condition. Configs allows conditions to be
// nested.
type FilterParamConfig struct {
	Name string
	Domain string
	Operator string
	Type string // string, int, int64
	Value any
	Configs []FilterParamConfig
}

// BoostScoreCondition pairs an expression with the conditions attached to it.
type BoostScoreCondition struct {
	Expression string
	Conditions []FilterParamConfig
}

// DistinctIdCondition is the element type of SortConfig.DistinctIdConditions.
type DistinctIdCondition struct {
	DistinctId int
	Conditions []FilterParamConfig
}

// ItemStateDaoConfig is the type of FilterConfig.ItemStateDaoConf.
type ItemStateDaoConfig struct {
	DaoConfig
	ItemFieldName string
	WhereClause string
	SelectFields string
}

// DiversityDaoConfig is the type of FilterConfig.DiversityDaoConf.
type DiversityDaoConfig struct {
	DaoConfig
	ItemKeyField string
	DistinctFields []string
	CacheTimeInMinutes int
}

// AdjustCountConfig is the element type of FilterConfig.AdjustCountConfs.
type AdjustCountConfig struct {
	RecallName string
	Count int
	Type string
	Expression string
}

// CallBackConfig is the value type of RecommendConfig.CallBackConfs.
type CallBackConfig struct {
	DataSource DataSourceConfig
	RankConf RankConfig
	RawFeatures bool
	RawFeaturesRate int
}

// EmbeddingConfig is the value type of RecommendConfig.EmbeddingConfs.
type EmbeddingConfig struct {
	DataSource DataSourceConfig
	RankConf RankConfig
}

// GeneralRankConfig is the value type of RecommendConfig.GeneralRankConfs.
type GeneralRankConfig struct {
	FeatureLoadConfs []FeatureLoadConfig
	RankConf RankConfig
	ActionConfs []ActionConfig
}

// ColdStartGeneralRankConfig is the value type of
// RecommendConfig.ColdStartGeneralRankConfs.
type ColdStartGeneralRankConfig struct {
	GeneralRankConfig
	RecallNames []string
}

// ColdStartRankConfig is the value type of RecommendConfig.ColdStartRankConfs.
type ColdStartRankConfig struct {
	RecallName string
	AlgoName string
	OnlyEmbeddingFeature bool
}

// DataSourceConfig names a data source and its type.
// NOTE(review): Type presumably takes one of the DataSource_Type_* constants —
// confirm against the data source loader.
type DataSourceConfig struct {
	Name string
	Type string
}

// TriggerConfig is the element type of RecallConfig.Triggers and
// BeRecallParam.UserTriggers.
type TriggerConfig struct {
	TriggerKey string
	DefaultValue string
	Boundaries []int
}

// DPPSortConfig is the type of SortConfig.DPPConf and the element type of
// RecommendConfig.DPPConf.
type DPPSortConfig struct {
	Name string
	DaoConf DaoConfig
	TableName string
	TableSuffixParam string
	TablePKey string
	EmbeddingColumn string
	EmbeddingSeparator string
	Alpha float64
	CacheTimeInMinutes int
	EmbeddingHookNames []string
	NormalizeEmb string
	WindowSize int
	AbortRunCount int
	CandidateCount int
	MinScorePercent float64
	EmbMissedThreshold float64
	FilterRetrieveIds []string
	EnsurePositiveSim string
}

// SSDSortConfig is the type of SortConfig.SSDConf.
type SSDSortConfig struct {
	Name string
	DaoConf DaoConfig
	TableName string
	TableSuffixParam string
	TablePKey string
	EmbeddingColumn string
	EmbeddingSeparator string
	Gamma float64
	UseSSDStar bool
	CacheTimeInMinutes int
	NormalizeEmb string
	WindowSize int
	AbortRunCount int
	CandidateCount int
	MinScorePercent float64
	EmbMissedThreshold float64
	FilterRetrieveIds []string
	EnsurePositiveSim string
	Condition *BoostScoreCondition
}

// DebugConfig is the value type of RecommendConfig.DebugConfs.
type DebugConfig struct {
	Rate int
	DebugUsers []string
	// OutputType represent log write to console or datahub or file
	OutputType string
	DatahubName string
	KafKaName string
	FilePath string
	MaxFileNum int
}

// FeatureLogConfig is the value type of RecommendConfig.FeatureLogConfs.
type FeatureLogConfig struct {
	OutputType string
	DatahubName string
	KafKaName string
	UserFeatures string
	ItemFeatures string
}

// PipelineConfig is the element type of RecommendConfig.PipelineConfs.
type PipelineConfig struct {
	Name string
	RecallNames []string
	FilterNames []string
	GeneralRankConf GeneralRankConfig
	FeatureLoadConfs []FeatureLoadConfig
	RankConf RankConfig
	ColdStartRankConf ColdStartRankConfig
	SortNames []string
}

// newRecommendConfig returns a configuration holding the built-in defaults:
// run mode "daily" and HTTP port 8000 on an empty listen address.
func newRecommendConfig() *RecommendConfig {
	return &RecommendConfig{
		RunMode: "daily",
		ListenConf: ListenConfig{
			HttpAddr: "",
			HttpPort: 8000,
		},
	}
}

// CopyConfig copies the top-level fields of src into dst.
//
// Each filter is called with the field name; a field is copied only when every
// filter returns true (with no filters, all fields are copied). Filters are
// evaluated in order and evaluation stops at the first one that returns false.
// The copy is shallow: maps, slices and json.RawMessage values in dst share
// their backing storage with src afterwards.
//
// A nil src or dst makes the call a no-op.
func CopyConfig(src, dst *RecommendConfig, filters ...func(string) bool) {
	if src == nil || dst == nil {
		return
	}

	srcVal := reflect.ValueOf(src).Elem()
	dstVal := reflect.ValueOf(dst).Elem()
	srcType := srcVal.Type()

fields:
	for i := 0; i < srcVal.NumField(); i++ {
		name := srcType.Field(i).Name
		for _, accept := range filters {
			if !accept(name) {
				continue fields
			}
		}
		// src and dst have the same struct type, so field i in src is field i
		// in dst; no lookup by name is needed.
		if field := dstVal.Field(i); field.CanSet() {
			field.Set(srcVal.Field(i))
		}
	}
}

// loadConfigFromFile reads filePath through the "json" config adapter and
// decodes its raw data into the existing Config value. Fields absent from the
// file keep whatever value Config already holds.
func loadConfigFromFile(filePath string) error {
	configer, err := config.NewConfig(adapterName, filePath)
	if err != nil {
		return err
	}
	return json.Unmarshal(configer.RawData(), Config)
}

// LoadConfig loads the JSON configuration file at filePath into Config.
//
// When filePath is empty, the path is taken from the CONFIG_PATH environment
// variable. If that is empty as well, an error is returned.
func LoadConfig(filePath string) error {
	if filePath == "" {
		filePath = os.Getenv("CONFIG_PATH")
	}
	if filePath == "" {
		return errors.New("config file path empty")
	}
	return loadConfigFromFile(filePath)
}

var (
	// notifyMu guards notifyCh. Subscribe appends to it while the goroutine
	// started by UpdateConf may be about to read it.
	notifyMu sync.Mutex
	// notifyCh holds one channel per Subscribe call; entries are never removed.
	notifyCh = make([]chan *RecommendConfig, 0)
)

// Subscribe registers a new subscriber and returns the channel on which every
// configuration passed to a later UpdateConf call is delivered.
//
// The channel is unbuffered and never closed. Deliveries for one update are
// made sequentially in subscription order, so a subscriber that does not
// receive blocks delivery to the subscribers registered after it.
func Subscribe() <-chan *RecommendConfig {
	ch := make(chan *RecommendConfig)

	notifyMu.Lock()
	notifyCh = append(notifyCh, ch)
	notifyMu.Unlock()

	return ch
}

// UpdateConf replaces Config with conf and then, in a background goroutine,
// sends conf to every channel that was subscribed at the time of the call.
//
// NOTE(review): the write to Config itself is not synchronized; readers on
// other goroutines need their own coordination.
func UpdateConf(conf *RecommendConfig) {
	Config = conf

	// Snapshot the subscriber list under the lock so the goroutine below never
	// touches notifyCh while Subscribe is appending to it.
	notifyMu.Lock()
	subscribers := append([]chan *RecommendConfig(nil), notifyCh...)
	notifyMu.Unlock()

	go func() {
		for _, ch := range subscribers {
			ch <- conf
		}
	}()
}