service/rank/feature_consistency_job_service.go
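// FeatureConsistencyJobService backfills online features and scores produced at
// recall and rank time to DataHub or the PAI config server, so that feature
// consistency check jobs configured on the experiment platform can consume them.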

package rank

import (
    "encoding/json"
    "fmt"
    "math/rand"
    "os"
    "strconv"
    "strings"
    "sync"
    "time"

    "github.com/alibaba/pairec/v2/abtest"
    "github.com/alibaba/pairec/v2/algorithm"
    "github.com/alibaba/pairec/v2/algorithm/eas"
    "github.com/alibaba/pairec/v2/algorithm/eas/easyrec"
    "github.com/alibaba/pairec/v2/algorithm/response"
    "github.com/alibaba/pairec/v2/context"
    "github.com/alibaba/pairec/v2/datasource/beengine"
    "github.com/alibaba/pairec/v2/datasource/datahub"
    "github.com/alibaba/pairec/v2/log"
    "github.com/alibaba/pairec/v2/module"
    "github.com/alibaba/pairec/v2/recconf"
    "github.com/alibaba/pairec/v2/utils"
    be "github.com/aliyun/aliyun-be-go-sdk"
    "github.com/aliyun/aliyun-pairec-config-go-sdk/v2/common"
    "github.com/aliyun/aliyun-pairec-config-go-sdk/v2/model"
    jsoniter "github.com/json-iterator/go"
)

const (
    DssmO2o = "dssm_o2o"
    MindO2o = "mind_o2o"
)

var serviceName string

func init() {
    serviceName = os.Getenv("SERVICE_NAME")
    // serviceName: name@region => name
    ss := strings.Split(serviceName, "@")
    if len(ss) > 1 {
        serviceName = strings.Join(ss[:len(ss)-1], "")
    }
}

type FeatureConsistencyJobService struct {
    mutex  sync.Mutex
    client *beengine.BeClient
}

func (r *FeatureConsistencyJobService) LogRecallResult(user *module.User, items []*module.Item, context *context.RecommendContext, triggerType, userEmbedding, triggerItem, recallAlgo, recallAlgoType, beName, bizName, recallName string) {
    if abtest.GetExperimentClient() != nil {
        scene := context.Param.GetParameter("scene").(string)
        jobs := abtest.GetExperimentClient().GetSceneParams(scene).GetFeatureConsistencyJobs()
        for _, job := range jobs {
            if r.checkRecallFeatureConsistencyJobForRunning(job, user, items, context, triggerType, recallAlgo) {
                log.Info(fmt.Sprintf("requestId=%s\tevent=logRecallResult\tname=%s", context.RecommendId, job.JobName))
                if job.FeatureBackflowQueueType == "datahub" {
                    r.logRecallResultToDatahub(user, items, context, triggerType, job, userEmbedding, triggerItem, recallAlgo, recallAlgoType, beName, bizName, recallName)
                }
            }
        }
    }
}

func (r *FeatureConsistencyJobService) checkRecallFeatureConsistencyJobForRunning(job *model.FeatureConsistencyJob, user *module.User, items []*module.Item, context *context.RecommendContext, triggerType, recallAlgo string) bool {
    if job.Status != common.Feature_Consistency_Job_State_RUNNING {
        return false
    }
    if triggerType == DssmO2o || triggerType == MindO2o {
        return true
    }

    currTime := time.Now().Unix()
    if currTime >= job.StartTime && currTime <= job.EndTime {
        var easModelAlgoNames []string
        for _, algoConfig := range context.Config.AlgoConfs {
            if algoConfig.EasConf.Url == job.EasModelUrl {
                easModelAlgoNames = append(easModelAlgoNames, algoConfig.Name)
                user.AddProperty("_algo_", algoConfig.Name)
                break
            }
        }

        found := false
        for _, name := range easModelAlgoNames {
            if name == recallAlgo {
                found = true
                break
            }
        }
        if !found {
            return false
        }

        // sample rate check
        if job.SampleRate >= 100 {
            return true
        }
        if rand.Intn(100) < job.SampleRate {
            return true
        }
    }

    return false
}
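// LogRankResult backfills rank results (user features, item features and
// algorithm scores) to the PAI config server for every running feature
// consistency job whose ModelType is not "rank_sample".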
func (r *FeatureConsistencyJobService) LogRankResult(user *module.User, items []*module.Item, context *context.RecommendContext) {
    if abtest.GetExperimentClient() != nil {
        scene := context.Param.GetParameter("scene").(string)
        jobs := abtest.GetExperimentClient().GetSceneParams(scene).GetFeatureConsistencyJobs()
        for _, job := range jobs {
            if job.ModelType == "rank_sample" {
                continue
            }
            if r.checkFeatureConsistencyJobForRunning(job, user, items, context) {
                log.Info(fmt.Sprintf("requestId=%s\tevent=logRankResult\tname=%s", context.RecommendId, job.JobName))
                r.logRankResultToPaiConfigServer(user, items, context, job)
            }
        }
    }
}

func (r *FeatureConsistencyJobService) checkFeatureConsistencyJobForRunning(job *model.FeatureConsistencyJob, user *module.User, items []*module.Item, context *context.RecommendContext) bool {
    scene := context.Param.GetParameter("scene").(string)
    if job.Status != common.Feature_Consistency_Job_State_RUNNING {
        return false
    }

    currTime := time.Now().Unix()
    if currTime >= job.StartTime && currTime <= job.EndTime {
        rankAlgoNames := r.findRankAlgoNames(scene, context)
        var easModelAlgoNames []string
        for _, algoConfig := range context.Config.AlgoConfs {
            urls := strings.Split(algoConfig.EasConf.Url, "/api/predict/")
            if len(urls) < 2 {
                // the EAS url carries no service name segment, skip this algo config
                continue
            }
            name := urls[1]
            if name == job.EasModelServiceName {
                easModelAlgoNames = append(easModelAlgoNames, algoConfig.Name)
                user.AddProperty("_algo_", algoConfig.Name)
                break
            }
        }

        found := false
        for _, name := range easModelAlgoNames {
            for _, rankAlgoName := range rankAlgoNames {
                if name == rankAlgoName {
                    found = true
                    break
                }
            }
        }
        if !found {
            return false
        }

        // sample rate check
        if job.SampleRate >= 100 {
            return true
        }
        if rand.Intn(100) < job.SampleRate {
            return true
        }
    }

    return false
}

// LogSampleResult is the "rank_sample" counterpart of LogRankResult: it only
// handles jobs whose ModelType is "rank_sample" and backfills them to the PAI
// config server as well.
func (r *FeatureConsistencyJobService) LogSampleResult(user *module.User, items []*module.Item, context *context.RecommendContext) {
    if abtest.GetExperimentClient() != nil {
        scene := context.Param.GetParameter("scene").(string)
        jobs := abtest.GetExperimentClient().GetSceneParams(scene).GetFeatureConsistencyJobs()
        for _, job := range jobs {
            if job.ModelType != "rank_sample" {
                continue
            }
            if r.checkFeatureConsistencyJobForRunning(job, user, items, context) {
                log.Info(fmt.Sprintf("requestId=%s\tevent=LogSampleResult\tname=%s", context.RecommendId, job.JobName))
                r.logRankResultToPaiConfigServer(user, items, context, job)
            }
        }
    }
}

func (r *FeatureConsistencyJobService) findRankAlgoNames(scene string, context *context.RecommendContext) []string {
    // find rank config
    var rankConfig recconf.RankConfig
    found := false
    if context.ExperimentResult != nil {
        rankconf := context.ExperimentResult.GetExperimentParams().Get("rankconf", "")
        if rankconf != "" {
            d, _ := json.Marshal(rankconf)
            if err := json.Unmarshal(d, &rankConfig); err == nil {
                found = true
            }
        }
    }

    if !found {
        if rankConfigs, ok := recconf.Config.RankConf[scene]; ok {
            rankConfig = rankConfigs
        }
    }

    return rankConfig.RankAlgoList
}
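// logRankResultToPaiConfigServer serializes the user features together with the
// item ids, item features and algorithm scores and sends them to the PAI config
// server through the experiment client, flushing in batches of 20 items.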
func (r *FeatureConsistencyJobService) logRankResultToPaiConfigServer(user *module.User, items []*module.Item, context *context.RecommendContext, job *model.FeatureConsistencyJob) {
    var json = jsoniter.ConfigCompatibleWithStandardLibrary
    userFeatures := utils.ConvertFeatures(user.MakeUserFeatures2())
    userData, _ := json.Marshal(userFeatures)
    userDataStr := utils.Byte2string(userData)

    scene := context.Param.GetParameter("scene").(string)

    backflowData := model.FeatureConsistencyBackflowData{}
    backflowData.FeatureConsistencyCheckJobConfigId = strconv.Itoa(job.JobId)
    backflowData.LogRequestId = context.RecommendId
    backflowData.SceneName = scene
    backflowData.LogRequestTime = time.Now().UnixMilli()
    backflowData.LogUserId = string(user.Id)
    backflowData.UserFeatures = userDataStr
    if job.ModelType == "rank_sample" {
        backflowData.ServiceName = serviceName
    }

    i := 0
    var itemIds []string
    var itemFeatures []string
    var itemScores []string
    for _, item := range items {
        i++
        itemId := string(item.Id)
        itemIds = append(itemIds, itemId)
        j, _ := json.Marshal(utils.ConvertFeatures(item.GetCloneFeatures()))
        itemFeatures = append(itemFeatures, string(j))
        j, _ = json.Marshal(item.GetAlgoScores())
        itemScores = append(itemScores, string(j))

        // flush a full batch of 20 items
        if i%20 == 0 {
            j, _ := json.Marshal(itemIds)
            backflowData.LogItemId = string(j)
            j, _ = json.Marshal(itemFeatures)
            backflowData.ItemFeatures = string(j)
            j, _ = json.Marshal(itemScores)
            backflowData.Scores = string(j)

            resp, err := abtest.GetExperimentClient().BackflowFeatureConsistencyCheckJobData(&backflowData)
            if err != nil {
                log.Error(fmt.Sprintf("requestId=%s\tevent=logRankResultToPaiConfigServer\tresponse=%v\terror=%v", context.RecommendId, resp, err))
                continue
            }

            itemIds = itemIds[:0]
            itemFeatures = itemFeatures[:0]
            itemScores = itemScores[:0]
            i = 0
        }
    }

    // flush any remaining items
    if i > 0 {
        j, _ := json.Marshal(itemIds)
        backflowData.LogItemId = string(j)
        j, _ = json.Marshal(itemFeatures)
        backflowData.ItemFeatures = string(j)
        j, _ = json.Marshal(itemScores)
        backflowData.Scores = string(j)

        resp, err := abtest.GetExperimentClient().BackflowFeatureConsistencyCheckJobData(&backflowData)
        if err != nil {
            log.Error(fmt.Sprintf("requestId=%s\tevent=logRankResultToPaiConfigServer\tresponse=%v\terror=%v", context.RecommendId, resp, err))
            return
        }
    }
}
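// logRecallResultToDatahub sends the recall result (user features, trigger item
// and user embedding) to the DataHub topic configured on the job. For the
// "dssm_o2o"/"mind_o2o" trigger types the user embedding is re-read from the BE
// engine trace info; otherwise the recall EAS model is invoked again to obtain
// the generated features.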
func (r *FeatureConsistencyJobService) logRecallResultToDatahub(user *module.User, items []*module.Item, context *context.RecommendContext, triggerType string, job *model.FeatureConsistencyJob, userEmbedding, triggerItem, recallAlgo, recallAlgoType, beName, bizName, recallName string) {
    name := fmt.Sprintf("%s-%s-%s-%s-%s", job.FeatureBackflowQueueDatahubAccessId, job.FeatureBackflowQueueDatahubAccessKey, job.FeatureBackflowQueueDatahubEndpoint, job.FeatureBackflowQueueDatahubProject, job.FeatureBackflowQueueDatahubTopic)

    var dh *datahub.Datahub
    dh, err := datahub.GetDatahub(name)
    if err != nil {
        r.mutex.Lock()
        dh, err = datahub.GetDatahub(name)
        if err != nil {
            dh = datahub.NewDatahub(job.FeatureBackflowQueueDatahubAccessId, job.FeatureBackflowQueueDatahubAccessKey, job.FeatureBackflowQueueDatahubEndpoint, job.FeatureBackflowQueueDatahubProject, job.FeatureBackflowQueueDatahubTopic, nil)
            err = dh.Init()
            datahub.RegisterDatahub(name, dh)
        }
        r.mutex.Unlock()
    }
    if dh == nil {
        log.Error(fmt.Sprintf("requestId=%s\tevent=logRecallResultToDatahub\tmsg=create datahub error\terror=%v", context.RecommendId, err))
        return
    }

    message := make(map[string]interface{})
    message["request_id"] = context.RecommendId
    message["uid"] = string(user.Id)
    features := user.MakeUserFeatures2()
    j, _ := json.Marshal(utils.ConvertFeatures(features))
    message["user_features"] = string(j)
    message["trigger"] = triggerItem
    message["module"] = triggerType

    if triggerType == DssmO2o || triggerType == MindO2o {
        readRequest := be.NewReadRequest(bizName, 10)
        readRequest.IsRawRequest = true
        readRequest.IsPost = true
        params := make(map[string]string)
        params[fmt.Sprintf("%s_qinfo", recallName)] = triggerItem
        params[fmt.Sprintf("%s_return_count", recallName)] = "10"
        params["trace"] = "ALL"
        readRequest.SetQueryParams(params)

        if context.Debug {
            uri := readRequest.BuildUri()
            log.Info(fmt.Sprintf("requestId=%s\tevent=FeatureConsistencyJobService\tbizName=%s\turl=%s", context.RecommendId, bizName, uri.RequestURI()))
        }

        if r.client == nil {
            client, err := beengine.GetBeClient(beName)
            if err != nil {
                log.Error(fmt.Sprintf("error=%v", err))
                return
            }
            r.client = client
        }

        readResponse, err := r.client.BeClient.Read(*readRequest)
        if err != nil {
            log.Error(fmt.Sprintf("requestId=%s\tevent=FeatureConsistencyJobService\terror=be error(%v)", context.RecommendId, err))
        } else {
            traceInfo := *readResponse.Result.TraceInfo
            if traceInfo != nil {
                // Extract the user embedding from the BE trace info: the raw tensor
                // string is expected to end in a block like "[[0.1 0.2 0.3]]", which is
                // reduced to the comma separated form "0.1,0.2,0.3".
                traceInfoMap := traceInfo.(map[string]interface{})
                valueMap := traceInfoMap["0[]"].(map[string]interface{})
                key := fmt.Sprintf("ExtractTensorFromRawTensorOpV2[recall/%s/extract_%s_user_embedding]", recallName, recallName)
                userEmbeddingArr := valueMap[key].(map[string]interface{})
                arr := userEmbeddingArr["__arr"].([]interface{})
                v := arr[0].(string)
                splitArr := strings.Split(v, "[[")
                result := strings.TrimRight(splitArr[len(splitArr)-1], "]]")
                embeddingArr := strings.Split(result, " ")
                userEmbedding = strings.Join(embeddingArr, ",")
            }

            message["user_embedding"] = userEmbedding
            message["generate_features"] = ""
            dh.SendMessage([]map[string]interface{}{message})
        }
    } else {
        message["user_embedding"] = userEmbedding

        algoGenerator := CreateAlgoDataGenerator(recallAlgoType, nil)
        algoGenerator.SetItemFeatures(nil)
        item := module.NewItem("1")
        algoGenerator.AddFeatures(item, nil, features)
        algoData := algoGenerator.GeneratorAlgoDataDebugWithLevel(1)
        easyrecRequest := algoData.GetFeatures().(*easyrec.PBRequest)

        algoRet, err := algorithm.Run(recallAlgo, easyrecRequest)
        if err != nil {
            log.Error(fmt.Sprintf("requestId=%s\tevent=logRecallResultToDatahub\terr=%v", context.RecommendId, err))
        } else {
            // eas model invoke success
            if triggerType == "dssm" {
                if result, ok := algoRet.([]response.AlgoResponse); ok && len(result) > 0 {
                    if embeddingResponse, ok := result[0].(*eas.EasyrecUserRealtimeEmbeddingResponse); ok {
                        if embeddingResponse.GenerateFeatures != nil {
                            message["generate_features"] = embeddingResponse.GenerateFeatures.String()
                        }
                    }
                }
            } else if triggerType == "mind" {
                if result, ok := algoRet.([]response.AlgoResponse); ok && len(result) > 0 {
                    if embeddingMindResponse, ok := result[0].(*eas.EasyrecUserRealtimeEmbeddingMindResponse); ok {
                        if embeddingMindResponse.GenerateFeatures != nil {
                            message["generate_features"] = embeddingMindResponse.GenerateFeatures.String()
                        }
                    }
                }
            }
        }

        dh.SendMessage([]map[string]interface{}{message})
    }
}