func()

in service/rank/rank_service.go [102:366]


func (r *RankService) Rank(user *module.User, items []*module.Item, context *context.RecommendContext) {
	start := time.Now()
	if context.Debug {
		data, _ := json.Marshal(user)
		size := len(data)
		for i := 0; i < size; {
			end := i + 4096
			if end >= size {
				end = size
			} else {
				for end > i {
					if data[end] == ',' {
						end++
						break
					}
					end--
				}
				if end == i {
					end = i + 4096
				}
			}
			log.Info(fmt.Sprintf("requestId=%s\tuser=%s", context.RecommendId, string(data[i:end])))
			i = end
		}
	}

	rankItems := items
	algoDataList := make([]IAlgoData, 0)
	scene := context.GetParameter("scene").(string)

	i := 0
	var customRanks []*customRank
	for _, rank := range r.GetRanks(scene, context) {
		customRank := newCustomRank(rank)
		customRanks = append(customRanks, customRank)
	}

	// find rank config
	var rankConfig recconf.RankConfig
	var rankscore string
	found := false
	if context.ExperimentResult != nil {
		rankscore = context.ExperimentResult.GetExperimentParams().GetString("rankscore", "")
		rankconf := context.ExperimentResult.GetExperimentParams().Get("rankconf", "")
		if rankconf != "" {
			d, _ := json.Marshal(rankconf)
			if err := json.Unmarshal(d, &rankConfig); err == nil {
				found = true
			}
		}
	}
	if !found {
		if rankConfigs, ok := recconf.Config.RankConf[scene]; ok {
			rankConfig = rankConfigs
		}
	}

	if rankscore != "" {
		rankConfig.RankScore = rankscore
	}

	batchCount := 100
	if rankConfig.BatchCount > 0 {
		batchCount = rankConfig.BatchCount
	}
	algoGenerator := CreateAlgoDataGenerator(rankConfig.Processor, rankConfig.ContextFeatures)

	var userFeatures map[string]interface{}

	if rankConfig.Processor == eas.Eas_Processor_EASYREC {
		userFeatures = user.MakeUserFeatures2()
		algoGenerator.SetItemFeatures(rankConfig.ItemFeatures)
	} else {
		userFeatures = user.MakeUserFeatures()
	}

	var filter bool
	for _, item := range rankItems {
		filter = false
		if len(customRanks) > 0 {
			for _, rank := range customRanks {
				if rank.rankInter.Filter(user, item, context) {
					filter = true
					if _, ok := rank.rankInter.(*ColdStartRank); ok {
						rank.appendFeature(nil, item, context)
					} else {
						rank.appendFeature(userFeatures, item, context)
					}
					break
				}
			}
		}

		if filter {
			continue
		}

		var features map[string]any
		if rankConfig.Processor == eas.Eas_Processor_EASYREC {
			if len(rankConfig.ContextFeatures) > 0 || len(rankConfig.ItemFeatures) > 0 {
				features = item.GetFeatures()
			}
		} else {
			features = item.GetFeatures()
		}
		algoGenerator.AddFeatures(item, features, userFeatures)
		i++
		if i%batchCount == 0 {
			var algoData IAlgoData
			if context.Debug {
				algoData = algoGenerator.GeneratorAlgoDataDebugWithLevel(100)
			} else {
				algoData = algoGenerator.GeneratorAlgoData()
			}
			algoDataList = append(algoDataList, algoData)
		}
	}

	if algoGenerator.HasFeatures() {
		var algoData IAlgoData
		if context.Debug {
			algoData = algoGenerator.GeneratorAlgoDataDebugWithLevel(100)
		} else {
			algoData = algoGenerator.GeneratorAlgoData()
		}
		algoDataList = append(algoDataList, algoData)
	}

	var rankWG sync.WaitGroup
	// invoke custom rank
	if len(customRanks) > 0 {
		for _, rank := range customRanks {
			rankWG.Add(1)
			go func(customRank *customRank) {
				defer rankWG.Done()
				customRank.rankInter.Rank(user, customRank.items, customRank.requestData, context)
			}(rank)
		}
	}

	if len(algoDataList) == 0 {
		if len(customRanks) > 0 {
			rankWG.Wait()
		}
		return
	}
	requestCh := make(chan IAlgoData, len(algoDataList))
	responseCh := make(chan IAlgoData, len(algoDataList))
	defer close(requestCh)
	defer close(responseCh)

	for _, data := range algoDataList {
		requestCh <- data
	}

	gCount := len(algoDataList)
	for i := 0; i < gCount; i++ {
		go func() {
			algoData := <-requestCh

			var wg sync.WaitGroup
			for _, algoName := range rankConfig.RankAlgoList {
				wg.Add(1)
				go func(algo string) {
					defer wg.Done()
					ret, err := algorithm.Run(algo, algoData.GetFeatures())
					if err != nil {
						log.Error(fmt.Sprintf("requestId=%s\terror=run algorithm error(%v)", context.RecommendId, err))
						algoData.SetError(err)
					} else {
						if result, ok := ret.([]response.AlgoResponse); ok {
							algoData.SetAlgoResult(algo, result)
						}
					}

				}(algoName)

			}
			wg.Wait()
			responseCh <- algoData
		}()
	}

	exprAst, err := ast.GetExpASTWithType(rankConfig.RankScore, rankConfig.ASTType)
	if err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=rank\trankscore=%s\terror=%v", context.RecommendId, rankConfig.RankScore, err))
	}

	var scoreRewriteAst map[string]ast.ExprAST
	if len(rankConfig.ScoreRewrite) > 0 {
		scoreRewriteAst = make(map[string]ast.ExprAST, len(rankConfig.ScoreRewrite))
		for source, sourceExpr := range rankConfig.ScoreRewrite {
			ast, err := ast.GetExpASTWithType(sourceExpr, rankConfig.ASTType)
			if err != nil {
				log.Info(fmt.Sprintf("requestId=%s\tmodule=rank\tscorerewrite=%s\terror=%v", context.RecommendId, rankConfig.RankScore, err))
				continue
			}
			scoreRewriteAst[source] = ast
		}
	}

	for i := 0; i < gCount; i++ {
		algoData := <-responseCh
		if algoData.Error() == nil && algoData.GetAlgoResult() != nil {
			for name, algoResult := range algoData.GetAlgoResult() {
				itemList := algoData.GetItems()
				for j := 0; j < len(algoResult) && j < len(itemList); j++ {
					if algoResult[j].GetModuleType() {
						arr_score := algoResult[j].GetScoreMap()
						for k, v := range arr_score {
							itemList[j].AddAlgoScore(name+"_"+k, v)
						}
					} else if resp, ok := algoResult[j].(response.AlgoMultiClassifyResponse); ok {
						arr_score := resp.GetClassifyMap()
						for k, scores := range arr_score {
							if len(scores) == 1 {
								itemList[j].AddAlgoScore(name+"_"+k, scores[0])
							} else if len(scores) > 1 {
								for i, score := range scores {
									itemList[j].AddAlgoScore(name+"_"+k+"_"+strconv.Itoa(i), score)
								}
								itemList[j].AddProperty(name+"_"+k, scores)
							}
						}
					} else {
						itemList[j].AddAlgoScore(name, algoResult[j].GetScore())
					}
				}
			}
		}

		if rankConfig.RankScore != "" {
			itemList := algoData.GetItems()
			for k := range itemList {
				// score rewrite 重写
				if len(rankConfig.ScoreRewrite) > 0 {
					scores := make(map[string]float64, len(rankConfig.ScoreRewrite))
					for source := range rankConfig.ScoreRewrite {
						if exprAst, ok := scoreRewriteAst[source]; ok {
							scores[source] = ast.ExprASTResultWithType(exprAst, itemList[k], rankConfig.ASTType)
						} else {
							scores[source] = 0
						}
					}
					itemList[k].AddAlgoScores(scores)
				}

				if exprAst != nil {
					itemList[k].Score = ast.ExprASTResultWithType(exprAst, itemList[k], rankConfig.ASTType)
				}

				if boostScoreFunc != nil {
					itemList[k].Score = boostScoreFunc(itemList[k].Score, user, itemList[k], context)
				}
			}
		}
	}

	if len(customRanks) > 0 {
		rankWG.Wait()
	}

	go r.featureConsistencyJobService.LogRankResult(user, items, context)
	log.Info(fmt.Sprintf("requestId=%s\tmodule=rank\tcost=%d", context.RecommendId, utils.CostTime(start)))
}