service/feature/feature_service.go (182 lines of code) (raw):

package feature

import (
	"encoding/json"
	"fmt"
	"sync"
	"time"

	"github.com/alibaba/pairec/v2/context"
	"github.com/alibaba/pairec/v2/log"
	"github.com/alibaba/pairec/v2/module"
	"github.com/alibaba/pairec/v2/recconf"
	"github.com/alibaba/pairec/v2/service/metrics"
	"github.com/alibaba/pairec/v2/utils"
)

// featureService is the package-level default service, created in init.
var featureService *FeatureService

// FeatureFunc is a per-scene feature-engineering hook. LoadFeatures invokes it
// after the scene's features have been loaded, and its return value replaces
// the item list that LoadFeatures returns.
type FeatureFunc func(user *module.User, items []*module.Item, context *context.RecommendContext) []*module.Item

// LoadFeatureFunc is a custom, per-scene feature loader. LoadFeatures runs it
// in addition to the configured Feature loaders of the scene.
type LoadFeatureFunc func(user *module.User, items []*module.Item, context *context.RecommendContext)

func init() {
	featureService = NewFeatureService()
}

// DefaultFeatureService returns the package-level FeatureService.
func DefaultFeatureService() *FeatureService {
	return featureService
}

// RegisterFeatureFunc registers f as the FeatureFunc of sceneName on the
// default service, replacing any previous registration.
func RegisterFeatureFunc(sceneName string, f FeatureFunc) {
	DefaultFeatureService().AddFeatureFunc(sceneName, f)
}

// RegisterLoadFeatureFunc registers f as the LoadFeatureFunc of sceneName on
// the default service, replacing any previous registration.
func RegisterLoadFeatureFunc(sceneName string, f LoadFeatureFunc) {
	DefaultFeatureService().AddLoadFeatureFunc(sceneName, f)
}

// FeatureService holds the per-scene feature loading configuration. All maps
// are keyed by scene name.
//
// NOTE(review): the maps are accessed without any locking in this file;
// whether writers and readers can run concurrently depends on the callers.
type FeatureService struct {
	// FeatureSceneMap lists the Feature loaders configured for a scene.
	FeatureSceneMap map[string][]*Feature
	// FeatureSceneSigns stores the signature of the FeatureLoadConfs a scene
	// was last built from (see LoadFeatureConfig).
	FeatureSceneSigns map[string]string
	// FeatureSceneAsyncMap tells whether the loaders of a scene run
	// concurrently (true) or one after another (false / missing).
	FeatureSceneAsyncMap map[string]bool
	// FeatureFuncMap holds the optional FeatureFunc of a scene.
	FeatureFuncMap map[string]FeatureFunc
	// LoadFeatureFuncMap holds the optional LoadFeatureFunc of a scene.
	LoadFeatureFuncMap map[string]LoadFeatureFunc
}

// NewFeatureService returns a FeatureService with all maps initialized.
func NewFeatureService() *FeatureService {
	return &FeatureService{
		FeatureSceneMap:      make(map[string][]*Feature),
		FeatureSceneSigns:    make(map[string]string),
		FeatureSceneAsyncMap: make(map[string]bool),
		FeatureFuncMap:       make(map[string]FeatureFunc),
		LoadFeatureFuncMap:   make(map[string]LoadFeatureFunc),
	}
}

// SetFeatureSceneAsync sets whether the loaders of sceneName run concurrently.
func (s *FeatureService) SetFeatureSceneAsync(sceneName string, async bool) {
	s.FeatureSceneAsyncMap[sceneName] = async
}

// SetFeatures replaces the Feature loaders of sceneName.
func (s *FeatureService) SetFeatures(sceneName string, features []*Feature) {
	s.FeatureSceneMap[sceneName] = features
}

// AddFeatureFunc sets the FeatureFunc of sceneName.
func (s *FeatureService) AddFeatureFunc(sceneName string, f FeatureFunc) {
	s.FeatureFuncMap[sceneName] = f
}

// AddLoadFeatureFunc sets the LoadFeatureFunc of sceneName.
func (s *FeatureService) AddLoadFeatureFunc(sceneName string, f LoadFeatureFunc) {
	s.LoadFeatureFuncMap[sceneName] = f
}

// sceneParameter returns the "scene" request parameter, or "" when the
// parameter is absent or is not a string. The comma-ok assertion avoids a
// panic on the request path for malformed requests.
func sceneParameter(ctx *context.RecommendContext) string {
	name, _ := ctx.GetParameter("scene").(string)
	return name
}

// runFeatureLoaders executes every loader in features plus the optional custom
// loader. With async set, each loader runs in its own goroutine and the call
// returns only after all of them have finished; otherwise the loaders run
// sequentially in slice order, followed by custom.
func runFeatureLoaders(features []*Feature, custom LoadFeatureFunc, async bool, user *module.User, items []*module.Item, ctx *context.RecommendContext) {
	if !async {
		for _, f := range features {
			f.LoadFeatures(user, items, ctx)
		}
		if custom != nil {
			custom(user, items, ctx)
		}
		return
	}

	var wg sync.WaitGroup
	for _, f := range features {
		wg.Add(1)
		// f is passed as an argument so each goroutine gets its own copy
		// regardless of the Go version's loop-variable semantics.
		go func(f *Feature) {
			defer wg.Done()
			f.LoadFeatures(user, items, ctx)
		}(f)
	}
	if custom != nil {
		wg.Add(1)
		go func() {
			defer wg.Done()
			custom(user, items, ctx)
		}()
	}
	wg.Wait()
}

// loadSceneFeatures runs the loaders configured for sceneName and reports
// whether the scene has an entry in FeatureSceneMap. When withCustomLoader is
// set, the scene's registered LoadFeatureFunc (if any) is run as well.
func (s *FeatureService) loadSceneFeatures(sceneName string, withCustomLoader bool, user *module.User, items []*module.Item, ctx *context.RecommendContext) bool {
	features, ok := s.FeatureSceneMap[sceneName]
	if !ok {
		return false
	}

	var custom LoadFeatureFunc
	if withCustomLoader {
		custom = s.LoadFeatureFuncMap[sceneName]
	}
	runFeatureLoaders(features, custom, s.FeatureSceneAsyncMap[sceneName], user, items, ctx)
	return true
}

// LoadFeatures loads user and item features for the request's scene and
// returns the resulting item list.
//
// The scene name is taken from the experiment parameter "features.scene.name"
// when an experiment result is present and the parameter is non-empty,
// otherwise from the "scene" request parameter. If the scene has configured
// features, its Feature loaders and registered LoadFeatureFunc are run, and
// then the registered FeatureFunc (if any) is applied to items. The duration
// is reported to metrics (when enabled) and logged.
func (s *FeatureService) LoadFeatures(user *module.User, items []*module.Item, context *context.RecommendContext) []*module.Item {
	start := time.Now()

	var sceneName string
	if context.ExperimentResult != nil {
		sceneName = context.ExperimentResult.GetExperimentParams().GetString("features.scene.name", "")
	}
	if sceneName == "" {
		sceneName = sceneParameter(context)
	}

	if s.loadSceneFeatures(sceneName, true, user, items, context) {
		// The feature-engineering hook only runs for scenes that have
		// configured features.
		if featureFunc, ok := s.FeatureFuncMap[sceneName]; ok {
			items = featureFunc(user, items, context)
		}
	}

	if metrics.Enabled() {
		metrics.LoadFeatureDurSecs.WithLabelValues(sceneName, "", "before_rank").Observe(time.Since(start).Seconds())
	}
	log.Info(fmt.Sprintf("requestId=%s\tmodule=LoadFeatures\tscene=%s\tcost=%d", context.RecommendId, sceneName, utils.CostTime(start)))
	return items
}

// LoadFeaturesForGeneralRank runs the Feature loaders configured for the
// "scene" request parameter and logs the elapsed time. Registered
// LoadFeatureFunc and FeatureFunc hooks are not invoked. pipeline is only used
// for logging and is omitted from the log line when empty.
func (s *FeatureService) LoadFeaturesForGeneralRank(user *module.User, items []*module.Item, context *context.RecommendContext, pipeline string) {
	start := time.Now()
	s.loadSceneFeatures(sceneParameter(context), false, user, items, context)

	if pipeline != "" {
		log.Info(fmt.Sprintf("requestId=%s\tmodule=LoadFeaturesForGeneralRank\tpipeline=%s\tcost=%d", context.RecommendId, pipeline, utils.CostTime(start)))
		return
	}
	log.Info(fmt.Sprintf("requestId=%s\tmodule=LoadFeaturesForGeneralRank\tcost=%d", context.RecommendId, utils.CostTime(start)))
}

// LoadFeaturesForPipelineRank runs the Feature loaders configured for the
// "scene" request parameter and logs the elapsed time together with pipeline.
// Registered LoadFeatureFunc and FeatureFunc hooks are not invoked.
func (s *FeatureService) LoadFeaturesForPipelineRank(user *module.User, items []*module.Item, context *context.RecommendContext, pipeline string) {
	start := time.Now()
	s.loadSceneFeatures(sceneParameter(context), false, user, items, context)

	log.Info(fmt.Sprintf("requestId=%s\tmodule=LoadFeatures\tpipeline=%s\tcost=%d", context.RecommendId, pipeline, utils.CostTime(start)))
}

// LoadFeatureConfig (re)builds the Feature loaders of the default service from
// config.FeatureConfs.
//
// A scene that is already registered and whose FeatureLoadConfs signature is
// unchanged is skipped entirely. NOTE(review): for such a scene the async flag
// is not refreshed either, because the signature only covers FeatureLoadConfs.
func LoadFeatureConfig(config *recconf.RecommendConfig) {
	for name, sceneConf := range config.FeatureConfs {
		sign := signOfFeatureLoadConfs(sceneConf.FeatureLoadConfs)
		if _, ok := featureService.FeatureSceneMap[name]; ok && sign == featureService.FeatureSceneSigns[name] {
			continue
		}

		var features []*Feature
		for _, conf := range sceneConf.FeatureLoadConfs {
			features = append(features, LoadWithConfig(conf))
		}

		featureService.FeatureSceneAsyncMap[name] = sceneConf.AsynLoadFeature
		featureService.FeatureSceneMap[name] = features
		featureService.FeatureSceneSigns[name] = sign
	}
}

// signOfFeatureLoadConfs returns utils.Md5 of the concatenated JSON encodings
// of confs, in slice order. It is used to detect configuration changes.
func signOfFeatureLoadConfs(confs []recconf.FeatureLoadConfig) string {
	var buf []byte
	for _, conf := range confs {
		// A marshal error is deliberately ignored: the config then contributes
		// nothing to the signature, which keeps the function total.
		sign, _ := json.Marshal(conf)
		buf = append(buf, sign...)
	}
	return utils.Md5(string(buf))
}