service/general_rank/feature_consistency_job_service.go (150 lines of code) (raw):

package general_rank import ( "fmt" "math/rand" "strconv" "strings" "time" jsoniter "github.com/json-iterator/go" "github.com/alibaba/pairec/v2/abtest" "github.com/alibaba/pairec/v2/context" "github.com/alibaba/pairec/v2/log" "github.com/alibaba/pairec/v2/module" "github.com/alibaba/pairec/v2/recconf" "github.com/alibaba/pairec/v2/utils" "github.com/aliyun/aliyun-pairec-config-go-sdk/v2/common" "github.com/aliyun/aliyun-pairec-config-go-sdk/v2/model" ) var json = jsoniter.ConfigCompatibleWithStandardLibrary type FeatureConsistencyJobService struct { } func (r *FeatureConsistencyJobService) LogRankResult(user *module.User, items []*module.Item, context *context.RecommendContext) { if abtest.GetExperimentClient() != nil { scene := context.Param.GetParameter("scene").(string) jobs := abtest.GetExperimentClient().GetSceneParams(scene).GetFeatureConsistencyJobs() for _, job := range jobs { if r.checkFeatureConsistencyJobForRunning(job, user, items, context) { log.Info(fmt.Sprintf("requestId=%s\tevent=logRankResult\tname=%s", context.RecommendId, job.JobName)) r.logRankResultToPaiConfigServer(user, items, context, job) } } } } func (s *FeatureConsistencyJobService) checkFeatureConsistencyJobForRunning(job *model.FeatureConsistencyJob, user *module.User, items []*module.Item, context *context.RecommendContext) bool { scene := context.Param.GetParameter("scene").(string) if job.Status != common.Feature_Consistency_Job_State_RUNNING { return false } currTime := time.Now().Unix() if currTime >= job.StartTime && currTime <= job.EndTime { rankAlgoNames := s.findRankAlgoNames(scene, context) var easModelAlgoNames []string for _, algoConfig := range context.Config.AlgoConfs { urls := strings.Split(algoConfig.EasConf.Url, "/api/predict/") name := urls[1] if name == job.EasModelServiceName { easModelAlgoNames = append(easModelAlgoNames, algoConfig.Name) user.AddProperty("_algo_", algoConfig.Name) break } } found := false for _, name := range easModelAlgoNames { for _, rankAlgoName := range rankAlgoNames { if name == rankAlgoName { found = true break } } if found { break } } if !found { return false } // sample rate check if job.SampleRate >= 100 { return true } if rand.Intn(100) < job.SampleRate { return true } } return false } func (s *FeatureConsistencyJobService) findRankAlgoNames(scene string, context *context.RecommendContext) []string { // find rank config var rankConfig recconf.GeneralRankConfig found := false if context.ExperimentResult != nil { rankconf := context.ExperimentResult.GetExperimentParams().Get("generalRankConf", "") if rankconf != "" { d, _ := json.Marshal(rankconf) if err := json.Unmarshal(d, &rankConfig); err == nil { found = true } } } if !found { if rankConfigs, ok := recconf.Config.GeneralRankConfs[scene]; ok { rankConfig = rankConfigs } } return rankConfig.RankConf.RankAlgoList } func (r *FeatureConsistencyJobService) logRankResultToPaiConfigServer(user *module.User, items []*module.Item, context *context.RecommendContext, job *model.FeatureConsistencyJob) { userProperties := user.MakeUserFeatures2() userProperties["_module_"] = "general_rank" userFeatures := utils.ConvertFeatures(userProperties) userData, _ := json.Marshal(userFeatures) userDataStr := utils.Byte2string(userData) scene := context.Param.GetParameter("scene").(string) i := 0 backflowData := model.FeatureConsistencyBackflowData{} backflowData.FeatureConsistencyCheckJobConfigId = strconv.Itoa(job.JobId) backflowData.LogRequestId = context.RecommendId backflowData.SceneName = scene backflowData.LogRequestTime = time.Now().Unix() backflowData.LogUserId = string(user.Id) backflowData.UserFeatures = userDataStr var itemIds []string var itemFeatures []string var itemScores []string for _, item := range items { i++ itemId := string(item.Id) itemIds = append(itemIds, itemId) j, _ := json.Marshal(utils.ConvertFeatures(item.GetCloneFeatures())) itemFeatures = append(itemFeatures, string(j)) j, _ = json.Marshal(item.CloneAlgoScores()) itemScores = append(itemScores, string(j)) if i%20 == 0 { j, _ := json.Marshal(itemIds) backflowData.LogItemId = string(j) j, _ = json.Marshal(itemFeatures) backflowData.ItemFeatures = string(j) j, _ = json.Marshal(itemScores) backflowData.Scores = string(j) resp, err := abtest.GetExperimentClient().BackflowFeatureConsistencyCheckJobData(&backflowData) if err != nil { log.Error(fmt.Sprintf("requestId=%s\tevent=logRankResultToPaiConfigServer\tresponse=%v\terror=%v", context.RecommendId, resp, err)) continue } itemIds = itemIds[:0] itemFeatures = itemFeatures[:0] itemScores = itemScores[:0] i = 0 } } if i > 0 { j, _ := json.Marshal(itemIds) backflowData.LogItemId = string(j) j, _ = json.Marshal(itemFeatures) backflowData.ItemFeatures = string(j) j, _ = json.Marshal(itemScores) backflowData.Scores = string(j) resp, err := abtest.GetExperimentClient().BackflowFeatureConsistencyCheckJobData(&backflowData) if err != nil { log.Error(fmt.Sprintf("requestId=%s\tevent=logRankResultToPaiConfigServer\tresponse=%v\terror=%v", context.RecommendId, resp, err)) } } }