service/general_rank/base_general_rank.go (184 lines of code) (raw):

package general_rank import ( "fmt" "sync" "time" "github.com/alibaba/pairec/v2/algorithm" "github.com/alibaba/pairec/v2/algorithm/eas" "github.com/alibaba/pairec/v2/algorithm/response" "github.com/alibaba/pairec/v2/context" "github.com/alibaba/pairec/v2/log" "github.com/alibaba/pairec/v2/module" "github.com/alibaba/pairec/v2/recconf" "github.com/alibaba/pairec/v2/service/debug" "github.com/alibaba/pairec/v2/service/feature" "github.com/alibaba/pairec/v2/service/rank" "github.com/alibaba/pairec/v2/utils" "github.com/alibaba/pairec/v2/utils/ast" ) type BaseGeneralRank struct { scene string featureService *feature.FeatureService rankConfig *recconf.RankConfig actions []*Action featureConsistencyJobService *FeatureConsistencyJobService } func NewBaseGeneralRankWithConfig(scene string, config recconf.GeneralRankConfig) *BaseGeneralRank { preRank := BaseGeneralRank{ featureService: feature.NewFeatureService(), rankConfig: &config.RankConf, scene: scene, featureConsistencyJobService: new(FeatureConsistencyJobService), } var features []*feature.Feature for _, conf := range config.FeatureLoadConfs { f := feature.LoadWithConfig(conf) features = append(features, f) } preRank.featureService.SetFeatureSceneAsync(scene, true) preRank.featureService.SetFeatures(scene, features) for _, conf := range config.ActionConfs { if action, err := NewAction(&conf); err == nil { preRank.actions = append(preRank.actions, action) } else { log.Error(fmt.Sprintf("create action error:%v", err)) } } if preRank.rankConfig.BatchCount <= 0 { preRank.rankConfig.BatchCount = 100 } return &preRank } // Rank GeneralRank for items , return the items for Rank // 1. first load user features // 2. construct data use AlgoDataGenerator, if processor is easyrec, create easyrecRequest // 3. use goroutines to invoke eas module // 4. iterator actions invoke Do function to apply items func (r *BaseGeneralRank) DoRank(user *module.User, items []*module.Item, context *context.RecommendContext, pipeline string) (ret []*module.Item) { debugService := debug.NewDebugService(user, context) rankItems := items // get user feature by the FeatureService r.featureService.LoadFeaturesForGeneralRank(user, rankItems, context, pipeline) if context.Debug { data, _ := json.Marshal(user) size := len(data) for i := 0; i < size; { end := i + 4096 if end >= size { end = size } else { for end > i { if data[end] == ',' { end++ break } end-- } if end == i { end = i + 4096 } } log.Info(fmt.Sprintf("requestId=%s\tmodule=general_rank\tuser=%s", context.RecommendId, string(data[i:end]))) i = end } } if len(r.rankConfig.RankAlgoList) > 0 { rankItems = r.doRankWithAlgo(user, rankItems, context) } debugService.WriteGeneralLog(user, rankItems, context) for _, action := range r.actions { rankItems = action.Do(user, rankItems, context) } ret = append(ret, rankItems...) return } func (r *BaseGeneralRank) doRankWithAlgo(user *module.User, items []*module.Item, context *context.RecommendContext) []*module.Item { start := time.Now() algoDataList := make([]rank.IAlgoData, 0) i := 0 algoGenerator := rank.CreateAlgoDataGenerator(r.rankConfig.Processor, r.rankConfig.ContextFeatures) algoGenerator.SetItemFeatures(r.rankConfig.ItemFeatures) userFeatures := user.MakeUserFeatures2() //emptyFeatures := make(map[string]interface{}, 0) /** go func() { for _, item := range items { item.AddRecallNameFeature() } }() **/ for _, item := range items { features := item.GetFeatures() if r.rankConfig.Processor == eas.Eas_Processor_EASYREC { algoGenerator.AddFeatures(item, features, userFeatures) } else { algoGenerator.AddFeatures(item, features, userFeatures) } i++ if i%r.rankConfig.BatchCount == 0 { algoData := algoGenerator.GeneratorAlgoData() algoDataList = append(algoDataList, algoData) } } if algoGenerator.HasFeatures() { algoData := algoGenerator.GeneratorAlgoData() algoDataList = append(algoDataList, algoData) } if len(algoDataList) == 0 { return items } requestCh := make(chan rank.IAlgoData, len(algoDataList)) responseCh := make(chan rank.IAlgoData, len(algoDataList)) defer close(requestCh) defer close(responseCh) for _, data := range algoDataList { requestCh <- data } rankConfig := r.rankConfig gCount := len(algoDataList) for i := 0; i < gCount; i++ { go func() { algoData := <-requestCh var wg sync.WaitGroup for _, algoName := range rankConfig.RankAlgoList { wg.Add(1) go func(algo string) { defer wg.Done() algoResponses, err := algorithm.Run(algo, algoData.GetFeatures()) if err != nil { log.Error(fmt.Sprintf("requestId=%s\terror=run algorithm error(%v)", context.RecommendId, err)) algoData.SetError(err) } else { if result, ok := algoResponses.([]response.AlgoResponse); ok { algoData.SetAlgoResult(algo, result) } } }(algoName) } wg.Wait() responseCh <- algoData }() } exprAst, err := ast.GetExpAST(rankConfig.RankScore) if err != nil { log.Error(fmt.Sprintf("requestId=%s\trankscore=%s\terror=%v", context.RecommendId, rankConfig.RankScore, err)) } for i := 0; i < gCount; i++ { algoData := <-responseCh if algoData.Error() == nil && algoData.GetAlgoResult() != nil { for name, algoResult := range algoData.GetAlgoResult() { itemList := algoData.GetItems() for j := 0; j < len(algoResult) && j < len(itemList); j++ { if algoResult[j].GetModuleType() { scoreMap := algoResult[j].GetScoreMap() for k, v := range scoreMap { itemList[j].AddAlgoScore(name+"_"+k, v) } } else { itemList[j].AddAlgoScore(name, algoResult[j].GetScore()) } } } } if rankConfig.RankScore != "" { itemList := algoData.GetItems() for k := range itemList { if exprAst != nil { itemList[k].Score = ast.ExprASTResult(exprAst, itemList[k]) } } } } if context.Debug && len(items) > 0 { fmt.Println("general rank", items[0]) } go r.featureConsistencyJobService.LogRankResult(user, items, context) log.Info(fmt.Sprintf("requestId=%s\tmodule=GeneralRankWithAlgo\tcount=%d\tcost=%d", context.RecommendId, len(items), utils.CostTime(start))) return items }