service/rank/rank_service.go (316 lines of code) (raw):
package rank
import (
"encoding/json"
"fmt"
"strconv"
"sync"
"time"
"github.com/alibaba/pairec/v2/algorithm"
"github.com/alibaba/pairec/v2/algorithm/eas"
"github.com/alibaba/pairec/v2/algorithm/response"
"github.com/alibaba/pairec/v2/context"
"github.com/alibaba/pairec/v2/log"
"github.com/alibaba/pairec/v2/module"
"github.com/alibaba/pairec/v2/recconf"
"github.com/alibaba/pairec/v2/utils"
"github.com/alibaba/pairec/v2/utils/ast"
)
var rankService *RankService
func init() {
rankService = NewRankService()
}
type RankService struct {
rankInters map[string][]IRank
featureConsistencyJobService *FeatureConsistencyJobService
}
func NewRankService() *RankService {
rank := RankService{
rankInters: make(map[string][]IRank, 0),
featureConsistencyJobService: new(FeatureConsistencyJobService),
}
return &rank
}
func DefaultRankService() *RankService {
return rankService
}
func RegisterRank(sceneName string, ranks ...IRank) {
DefaultRankService().AddRanks(sceneName, ranks...)
}
func (r *RankService) AddRanks(sceneName string, ranks ...IRank) {
var rankInters []IRank
if preRanks, ok := r.rankInters[sceneName]; ok {
rankInters = preRanks
}
for _, rank := range ranks {
rankInters = append(rankInters, rank)
}
r.rankInters[sceneName] = rankInters
}
func (r *RankService) GetRanks(sceneName string, context *context.RecommendContext) (ret []IRank) {
// first find experiment info
if context.ExperimentResult != nil {
rankconf := context.ExperimentResult.GetExperimentParams().Get("coldStartRankConf", "")
if rankconf != "" {
var rankConfig recconf.ColdStartRankConfig
d, _ := json.Marshal(rankconf)
if err := json.Unmarshal(d, &rankConfig); err == nil {
rank := NewColdStartRank(&rankConfig)
ret = append(ret, rank)
// when rank is not ColdStartRank, add it
if ranks, exist := r.rankInters[sceneName]; exist {
for _, rank := range ranks {
if _, ok := rank.(*ColdStartRank); !ok {
ret = append(ret, rank)
}
}
}
return
} else {
context.LogError(fmt.Sprintf("Unmarshal rank config error\terr=%v\tconfig=%s", err, rankconf))
}
}
}
if ranks, ok := r.rankInters[sceneName]; ok {
ret = ranks
}
return
}
type boostFunc func(score float64, user *module.User, item *module.Item, context *context.RecommendContext) float64
var boostScoreFunc boostFunc
func SetBoostFunc(bf boostFunc) {
boostScoreFunc = bf
}
func (r *RankService) Rank(user *module.User, items []*module.Item, context *context.RecommendContext) {
start := time.Now()
if context.Debug {
data, _ := json.Marshal(user)
size := len(data)
for i := 0; i < size; {
end := i + 4096
if end >= size {
end = size
} else {
for end > i {
if data[end] == ',' {
end++
break
}
end--
}
if end == i {
end = i + 4096
}
}
log.Info(fmt.Sprintf("requestId=%s\tuser=%s", context.RecommendId, string(data[i:end])))
i = end
}
}
rankItems := items
algoDataList := make([]IAlgoData, 0)
scene := context.GetParameter("scene").(string)
i := 0
var customRanks []*customRank
for _, rank := range r.GetRanks(scene, context) {
customRank := newCustomRank(rank)
customRanks = append(customRanks, customRank)
}
// find rank config
var rankConfig recconf.RankConfig
var rankscore string
found := false
if context.ExperimentResult != nil {
rankscore = context.ExperimentResult.GetExperimentParams().GetString("rankscore", "")
rankconf := context.ExperimentResult.GetExperimentParams().Get("rankconf", "")
if rankconf != "" {
d, _ := json.Marshal(rankconf)
if err := json.Unmarshal(d, &rankConfig); err == nil {
found = true
}
}
}
if !found {
if rankConfigs, ok := recconf.Config.RankConf[scene]; ok {
rankConfig = rankConfigs
}
}
if rankscore != "" {
rankConfig.RankScore = rankscore
}
batchCount := 100
if rankConfig.BatchCount > 0 {
batchCount = rankConfig.BatchCount
}
algoGenerator := CreateAlgoDataGenerator(rankConfig.Processor, rankConfig.ContextFeatures)
var userFeatures map[string]interface{}
if rankConfig.Processor == eas.Eas_Processor_EASYREC {
userFeatures = user.MakeUserFeatures2()
algoGenerator.SetItemFeatures(rankConfig.ItemFeatures)
} else {
userFeatures = user.MakeUserFeatures()
}
var filter bool
for _, item := range rankItems {
filter = false
if len(customRanks) > 0 {
for _, rank := range customRanks {
if rank.rankInter.Filter(user, item, context) {
filter = true
if _, ok := rank.rankInter.(*ColdStartRank); ok {
rank.appendFeature(nil, item, context)
} else {
rank.appendFeature(userFeatures, item, context)
}
break
}
}
}
if filter {
continue
}
var features map[string]any
if rankConfig.Processor == eas.Eas_Processor_EASYREC {
if len(rankConfig.ContextFeatures) > 0 || len(rankConfig.ItemFeatures) > 0 {
features = item.GetFeatures()
}
} else {
features = item.GetFeatures()
}
algoGenerator.AddFeatures(item, features, userFeatures)
i++
if i%batchCount == 0 {
var algoData IAlgoData
if context.Debug {
algoData = algoGenerator.GeneratorAlgoDataDebugWithLevel(100)
} else {
algoData = algoGenerator.GeneratorAlgoData()
}
algoDataList = append(algoDataList, algoData)
}
}
if algoGenerator.HasFeatures() {
var algoData IAlgoData
if context.Debug {
algoData = algoGenerator.GeneratorAlgoDataDebugWithLevel(100)
} else {
algoData = algoGenerator.GeneratorAlgoData()
}
algoDataList = append(algoDataList, algoData)
}
var rankWG sync.WaitGroup
// invoke custom rank
if len(customRanks) > 0 {
for _, rank := range customRanks {
rankWG.Add(1)
go func(customRank *customRank) {
defer rankWG.Done()
customRank.rankInter.Rank(user, customRank.items, customRank.requestData, context)
}(rank)
}
}
if len(algoDataList) == 0 {
if len(customRanks) > 0 {
rankWG.Wait()
}
return
}
requestCh := make(chan IAlgoData, len(algoDataList))
responseCh := make(chan IAlgoData, len(algoDataList))
defer close(requestCh)
defer close(responseCh)
for _, data := range algoDataList {
requestCh <- data
}
gCount := len(algoDataList)
for i := 0; i < gCount; i++ {
go func() {
algoData := <-requestCh
var wg sync.WaitGroup
for _, algoName := range rankConfig.RankAlgoList {
wg.Add(1)
go func(algo string) {
defer wg.Done()
ret, err := algorithm.Run(algo, algoData.GetFeatures())
if err != nil {
log.Error(fmt.Sprintf("requestId=%s\terror=run algorithm error(%v)", context.RecommendId, err))
algoData.SetError(err)
} else {
if result, ok := ret.([]response.AlgoResponse); ok {
algoData.SetAlgoResult(algo, result)
}
}
}(algoName)
}
wg.Wait()
responseCh <- algoData
}()
}
exprAst, err := ast.GetExpASTWithType(rankConfig.RankScore, rankConfig.ASTType)
if err != nil {
log.Error(fmt.Sprintf("requestId=%s\tmodule=rank\trankscore=%s\terror=%v", context.RecommendId, rankConfig.RankScore, err))
}
var scoreRewriteAst map[string]ast.ExprAST
if len(rankConfig.ScoreRewrite) > 0 {
scoreRewriteAst = make(map[string]ast.ExprAST, len(rankConfig.ScoreRewrite))
for source, sourceExpr := range rankConfig.ScoreRewrite {
ast, err := ast.GetExpASTWithType(sourceExpr, rankConfig.ASTType)
if err != nil {
log.Info(fmt.Sprintf("requestId=%s\tmodule=rank\tscorerewrite=%s\terror=%v", context.RecommendId, rankConfig.RankScore, err))
continue
}
scoreRewriteAst[source] = ast
}
}
for i := 0; i < gCount; i++ {
algoData := <-responseCh
if algoData.Error() == nil && algoData.GetAlgoResult() != nil {
for name, algoResult := range algoData.GetAlgoResult() {
itemList := algoData.GetItems()
for j := 0; j < len(algoResult) && j < len(itemList); j++ {
if algoResult[j].GetModuleType() {
arr_score := algoResult[j].GetScoreMap()
for k, v := range arr_score {
itemList[j].AddAlgoScore(name+"_"+k, v)
}
} else if resp, ok := algoResult[j].(response.AlgoMultiClassifyResponse); ok {
arr_score := resp.GetClassifyMap()
for k, scores := range arr_score {
if len(scores) == 1 {
itemList[j].AddAlgoScore(name+"_"+k, scores[0])
} else if len(scores) > 1 {
for i, score := range scores {
itemList[j].AddAlgoScore(name+"_"+k+"_"+strconv.Itoa(i), score)
}
itemList[j].AddProperty(name+"_"+k, scores)
}
}
} else {
itemList[j].AddAlgoScore(name, algoResult[j].GetScore())
}
}
}
}
if rankConfig.RankScore != "" {
itemList := algoData.GetItems()
for k := range itemList {
// score rewrite 重写
if len(rankConfig.ScoreRewrite) > 0 {
scores := make(map[string]float64, len(rankConfig.ScoreRewrite))
for source := range rankConfig.ScoreRewrite {
if exprAst, ok := scoreRewriteAst[source]; ok {
scores[source] = ast.ExprASTResultWithType(exprAst, itemList[k], rankConfig.ASTType)
} else {
scores[source] = 0
}
}
itemList[k].AddAlgoScores(scores)
}
if exprAst != nil {
itemList[k].Score = ast.ExprASTResultWithType(exprAst, itemList[k], rankConfig.ASTType)
}
if boostScoreFunc != nil {
itemList[k].Score = boostScoreFunc(itemList[k].Score, user, itemList[k], context)
}
}
}
}
if len(customRanks) > 0 {
rankWG.Wait()
}
go r.featureConsistencyJobService.LogRankResult(user, items, context)
log.Info(fmt.Sprintf("requestId=%s\tmodule=rank\tcost=%d", context.RecommendId, utils.CostTime(start)))
}