func()

in service/general_rank/base_general_rank.go [111:228]


// doRankWithAlgo scores items with the algorithms listed in
// r.rankConfig.RankAlgoList and returns the same items slice it was given.
//
// Steps:
//  1. Item features and the user's features are fed into an algo-data
//     generator, which is flushed into a batch every r.rankConfig.BatchCount
//     items; whatever the generator still holds after the loop becomes a
//     final batch. A BatchCount of 0 disables the periodic flush, so all items
//     end up in that single trailing batch instead of triggering a
//     divide-by-zero panic.
//  2. One goroutine per batch runs every configured algorithm concurrently and
//     waits for all of them before reporting the batch back.
//  3. For batches without an error, each algorithm's results are attached to
//     the batch's items through AddAlgoScore (keyed "<algo>_<scoreKey>" for
//     results whose GetModuleType reports true, "<algo>" otherwise).
//  4. When r.rankConfig.RankScore is non-empty and parses, every batch item's
//     Score is set from that expression, including items of failed batches.
//
// If no batch was produced, items is returned immediately without logging.
// Otherwise the rank result is handed to the feature-consistency job service
// in a separate goroutine and the elapsed cost is logged before returning.
func (r *BaseGeneralRank) doRankWithAlgo(user *module.User, items []*module.Item, context *context.RecommendContext) []*module.Item {
	start := time.Now()
	rankConfig := r.rankConfig

	algoGenerator := rank.CreateAlgoDataGenerator(rankConfig.Processor, rankConfig.ContextFeatures)
	algoGenerator.SetItemFeatures(rankConfig.ItemFeatures)

	userFeatures := user.MakeUserFeatures2()

	// batchSize == 0 means "do not flush inside the loop"; the modulo below is
	// skipped in that case because it would panic with a division by zero.
	batchSize := rankConfig.BatchCount

	var batches []rank.IAlgoData
	for idx, item := range items {
		features := item.GetFeatures()
		// NOTE(review): both branches are identical today. The processor check
		// is kept as-is because it may be the only use of the eas package in
		// this file; confirm whether EASYREC was meant to be handled differently.
		if rankConfig.Processor == eas.Eas_Processor_EASYREC {
			algoGenerator.AddFeatures(item, features, userFeatures)
		} else {
			algoGenerator.AddFeatures(item, features, userFeatures)
		}

		if batchSize != 0 && (idx+1)%batchSize == 0 {
			batches = append(batches, algoGenerator.GeneratorAlgoData())
		}
	}

	// Flush the partially filled (or, with batchSize == 0, the only) batch.
	if algoGenerator.HasFeatures() {
		batches = append(batches, algoGenerator.GeneratorAlgoData())
	}

	if len(batches) == 0 {
		return items
	}

	// Buffered to the number of batches so no worker ever blocks on send.
	// The channel is not closed: exactly len(batches) values are received
	// below and nothing ranges over it.
	responseCh := make(chan rank.IAlgoData, len(batches))

	for _, batch := range batches {
		// The batch is passed as an argument so each goroutine owns its own
		// copy of the loop value regardless of the Go version in use.
		go func(algoData rank.IAlgoData) {
			var wg sync.WaitGroup
			for _, algoName := range rankConfig.RankAlgoList {
				wg.Add(1)
				// NOTE(review): with more than one algorithm, SetError and
				// SetAlgoResult are called concurrently on the same algoData;
				// confirm the implementation is safe for concurrent use.
				go func(algo string) {
					defer wg.Done()
					algoResponses, err := algorithm.Run(algo, algoData.GetFeatures())
					if err != nil {
						log.Error(fmt.Sprintf("requestId=%s\terror=run algorithm error(%v)", context.RecommendId, err))
						algoData.SetError(err)
						return
					}
					// Responses of any other dynamic type are ignored.
					if result, ok := algoResponses.([]response.AlgoResponse); ok {
						algoData.SetAlgoResult(algo, result)
					}
				}(algoName)
			}
			wg.Wait()
			responseCh <- algoData
		}(batch)
	}

	// Parsed while the workers are running. A parse failure is logged and
	// simply disables the final-score step below.
	exprAst, err := ast.GetExpAST(rankConfig.RankScore)
	if err != nil {
		log.Error(fmt.Sprintf("requestId=%s\trankscore=%s\terror=%v", context.RecommendId, rankConfig.RankScore, err))
	}
	applyRankScore := rankConfig.RankScore != "" && exprAst != nil

	for received := 0; received < len(batches); received++ {
		algoData := <-responseCh

		if algoData.Error() == nil {
			if results := algoData.GetAlgoResult(); results != nil {
				batchItems := algoData.GetItems()
				for name, algoResult := range results {
					// Results are matched to items by position; any surplus
					// on either side is ignored.
					for j := 0; j < len(algoResult) && j < len(batchItems); j++ {
						if algoResult[j].GetModuleType() {
							for key, score := range algoResult[j].GetScoreMap() {
								batchItems[j].AddAlgoScore(name+"_"+key, score)
							}
						} else {
							batchItems[j].AddAlgoScore(name, algoResult[j].GetScore())
						}
					}
				}
			}
		}

		if applyRankScore {
			for _, item := range algoData.GetItems() {
				item.Score = ast.ExprASTResult(exprAst, item)
			}
		}
	}

	if context.Debug && len(items) > 0 {
		fmt.Println("general rank", items[0])
	}

	// Fire-and-forget: the consistency log must not delay the response.
	go r.featureConsistencyJobService.LogRankResult(user, items, context)
	log.Info(fmt.Sprintf("requestId=%s\tmodule=GeneralRankWithAlgo\tcount=%d\tcost=%d", context.RecommendId, len(items), utils.CostTime(start)))
	return items
}