service/user_recommend.go (131 lines of code) (raw):

package service import ( "fmt" "sync" "time" "github.com/alibaba/pairec/v2/log/feature_log" "github.com/alibaba/pairec/v2/context" "github.com/alibaba/pairec/v2/log" "github.com/alibaba/pairec/v2/module" "github.com/alibaba/pairec/v2/service/debug" "github.com/alibaba/pairec/v2/service/feature" "github.com/alibaba/pairec/v2/service/general_rank" "github.com/alibaba/pairec/v2/service/hook" "github.com/alibaba/pairec/v2/service/metrics" "github.com/alibaba/pairec/v2/service/pipeline" "github.com/alibaba/pairec/v2/service/rank" ) type UserRecommendService struct { RecommendService recallService *RecallService generalRankService *general_rank.GeneralRankService rankService *rank.RankService userFeatureService *feature.UserFeatureService featureService *feature.FeatureService featureConsistencyJobService *rank.FeatureConsistencyJobService } func NewUserRecommendService() *UserRecommendService { service := UserRecommendService{ recallService: &RecallService{}, rankService: rank.DefaultRankService(), userFeatureService: feature.DefaultUserFeatureService(), featureService: feature.DefaultFeatureService(), featureConsistencyJobService: new(rank.FeatureConsistencyJobService), generalRankService: general_rank.DefaultGeneralRankService(), } return &service } func (r *UserRecommendService) Recommend(context *context.RecommendContext) []*module.Item { start := time.Now() var scene, expId string if context.ExperimentResult != nil { scene = context.ExperimentResult.SceneName //expId = context.ExperimentResult.ExpId } else { scene, _ = context.Param.GetParameter("scene").(string) } userId := r.GetUID(context) user := module.NewUserWithContext(userId, context) //loadFeatureStart := time.Now() // load user features r.userFeatureService.LoadUserFeatures(user, context) //if metrics.Enabled() { //metrics.LoadFeatureDurSecs.WithLabelValues(scene, expId, "before_recall").Observe(time.Since(loadFeatureStart).Seconds()) //} debugService := debug.NewDebugService(user, context) var pipelineItems []*module.Item var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() pipelineItems = pipeline.Recommend(user, context, debugService) }() recallStart := time.Now() items := r.recallService.GetItems(user, context) if metrics.Enabled() { metrics.RecallDurSecs.WithLabelValues(scene, expId).Observe(time.Since(recallStart).Seconds()) recallCountMap := map[string]int{} for _, item := range items { recallCountMap[item.RetrieveId]++ } for src, count := range recallCountMap { metrics.RecallItemsPercentage.WithLabelValues(src).Set(float64(count) / float64(len(items))) } } debugService.WriteRecallLog(user, items, context) filterStart := time.Now() // filter items = r.Filter(user, items, context) if metrics.Enabled() { metrics.FilterDurSecs.WithLabelValues(scene, expId).Observe(time.Since(filterStart).Seconds()) } debugService.WriteFilterLog(user, items, context) generalRankStart := time.Now() // general rank items = r.generalRankService.Rank(user, items, context) if metrics.Enabled() { metrics.GeneralRankDurSecs.WithLabelValues(scene, expId).Observe(time.Since(generalRankStart).Seconds()) } //debugService.WriteGeneralLog(user, items, context) //loadFeatureStart = time.Now() // load user or item features // can load data from datasource(holo, ots, redis) // after load data, use feature engine to create or modify features items = r.featureService.LoadFeatures(user, items, context) //if metrics.Enabled() { //metrics.LoadFeatureDurSecs.WithLabelValues(scene, expId, "before_rank").Observe(time.Since(loadFeatureStart).Seconds()) //} rankStart := time.Now() r.rankService.Rank(user, items, context) if metrics.Enabled() { metrics.RankDurSecs.WithLabelValues(scene, expId).Observe(time.Since(rankStart).Seconds()) } wg.Wait() items = r.mergePipelineItems(items, pipelineItems) debugService.WriteRankLog(user, items, context) sortStart := time.Now() // sort items items = r.Sort(user, items, context) if metrics.Enabled() { metrics.SortDurSecs.WithLabelValues(scene, expId).Observe(time.Since(sortStart).Seconds()) } debugService.WriteSortLog(user, items, context) size := context.Size if size > len(items) { size = len(items) log.Warning(fmt.Sprintf("requestId=%s\tmodule=recommend\tevent=filter\tuid=%s\tmsg=length of items less than size\tcount=%d", context.RecommendId, userId, len(items))) if metrics.Enabled() { metrics.SizeNotEnoughTotal.WithLabelValues(scene, expId).Inc() } } items = items[:size] go feature_log.FeatureLog(user, items, context) go r.featureConsistencyJobService.LogSampleResult(user, items, context) debugService.WriteRecommendLog(user, items, context) // asynchronous clean hook func for _, hf := range hook.RecommendCleanHooks { go hf(context, user, items) } if metrics.Enabled() { metrics.RecTotal.WithLabelValues(scene, expId).Inc() metrics.RecDurSecs.WithLabelValues(scene, expId).Observe(time.Since(start).Seconds()) } return items } func (r *UserRecommendService) mergePipelineItems(items []*module.Item, pipelineItems []*module.Item) []*module.Item { itemMap := make(map[module.ItemId]*module.Item, len(items)) for _, item := range items { itemMap[item.Id] = item } // need to merge item properties of different pipelines for _, item := range pipelineItems { if exist, ok := itemMap[item.Id]; ok { exist.AddProperties(item.Properties) } else { itemMap[item.Id] = item items = append(items, item) } } return items }