func()

in service/rank/feature_consistency_job_service.go [299:412]


func (r *FeatureConsistencyJobService) logRecallResultToDatahub(user *module.User, items []*module.Item, context *context.RecommendContext, triggerType string, job *model.FeatureConsistencyJob, userEmbedding, triggerItem, recallAlgo, recallAlgoType, beName, bizName, recallName string) {
	name := fmt.Sprintf("%s-%s-%s-%s-%s", job.FeatureBackflowQueueDatahubAccessId, job.FeatureBackflowQueueDatahubAccessKey, job.FeatureBackflowQueueDatahubEndpoint,
		job.FeatureBackflowQueueDatahubProject, job.FeatureBackflowQueueDatahubTopic)
	var dh *datahub.Datahub
	dh, err := datahub.GetDatahub(name)
	if err != nil {
		r.mutex.Lock()
		dh, err = datahub.GetDatahub(name)
		if err != nil {
			dh = datahub.NewDatahub(job.FeatureBackflowQueueDatahubAccessId, job.FeatureBackflowQueueDatahubAccessKey, job.FeatureBackflowQueueDatahubEndpoint,
				job.FeatureBackflowQueueDatahubProject, job.FeatureBackflowQueueDatahubTopic, nil)
			err = dh.Init()
			datahub.RegisterDatahub(name, dh)
		}
		r.mutex.Unlock()
	}

	if dh == nil {
		log.Error(fmt.Sprintf("requestId=%s\tevent=logRecallResultToDatahub\tmsg=create datahub error\terror=%v", context.RecommendId, err))
		return
	}

	message := make(map[string]interface{})

	message["request_id"] = context.RecommendId
	message["uid"] = string(user.Id)

	features := user.MakeUserFeatures2()
	j, _ := json.Marshal(utils.ConvertFeatures(features))
	message["user_features"] = string(j)
	message["trigger"] = triggerItem
	message["module"] = triggerType

	if triggerType == DssmO2o || triggerType == MindO2o {
		readRequest := be.NewReadRequest(bizName, 10)
		readRequest.IsRawRequest = true
		readRequest.IsPost = true

		params := make(map[string]string)
		params[fmt.Sprintf("%s_qinfo", recallName)] = triggerItem
		params[fmt.Sprintf("%s_return_count", recallName)] = "10"
		params["trace"] = "ALL"
		readRequest.SetQueryParams(params)

		if context.Debug {
			uri := readRequest.BuildUri()
			log.Info(fmt.Sprintf("requestId=%s\tevent=FeatureConsistencyJobService\tbizName=%s\turl=%s", context.RecommendId, bizName, uri.RequestURI()))
		}
		if r.client == nil {
			client, err := beengine.GetBeClient(beName)
			if err != nil {
				log.Error(fmt.Sprintf("error=%v", err))
				return
			}
			r.client = client
		}
		readResponse, err := r.client.BeClient.Read(*readRequest)
		if err != nil {
			log.Error(fmt.Sprintf("requestId=%s\tevent=FeatureConsistencyJobService\terror=be error(%v)", context.RecommendId, err))
		} else {
			traceInfo := *readResponse.Result.TraceInfo
			if traceInfo != nil {
				traceInfoMap := traceInfo.(map[string]interface{})
				valueMap := traceInfoMap["0[]"].(map[string]interface{})
				key := fmt.Sprintf("ExtractTensorFromRawTensorOpV2[recall/%s/extract_%s_user_embedding]", recallName, recallName)
				userEmbeddingArr := valueMap[key].(map[string]interface{})
				arr := userEmbeddingArr["__arr"].([]interface{})
				v := arr[0].(string)
				splitArr := strings.Split(v, "[[")
				result := strings.TrimRight(splitArr[len(splitArr)-1], "]]")
				embeddingArr := strings.Split(result, " ")
				userEmbedding = strings.Join(embeddingArr, ",")
			}
			message["user_embedding"] = userEmbedding
			message["generate_features"] = ""
			dh.SendMessage([]map[string]interface{}{message})
		}
	} else {
		message["user_embedding"] = userEmbedding
		algoGenerator := CreateAlgoDataGenerator(recallAlgoType, nil)
		algoGenerator.SetItemFeatures(nil)
		item := module.NewItem("1")
		algoGenerator.AddFeatures(item, nil, features)
		algoData := algoGenerator.GeneratorAlgoDataDebugWithLevel(1)
		easyrecRequest := algoData.GetFeatures().(*easyrec.PBRequest)
		algoRet, err := algorithm.Run(recallAlgo, easyrecRequest)

		if err != nil {
			log.Error(fmt.Sprintf("requestId=%s\tevent=logRecallResultToDatahub\terr=%v", context.RecommendId, err))
		} else {
			// eas model invoke success
			if triggerType == "dssm" {
				// eas model invoke success
				if result, ok := algoRet.([]response.AlgoResponse); ok && len(result) > 0 {
					if embeddingResponse, ok := result[0].(*eas.EasyrecUserRealtimeEmbeddingResponse); ok {
						if embeddingResponse.GenerateFeatures != nil {
							message["generate_features"] = embeddingResponse.GenerateFeatures.String()
						}
					}
				}
			} else if triggerType == "mind" {
				if result, ok := algoRet.([]response.AlgoResponse); ok && len(result) > 0 {
					if embeddingMindResponse, ok := result[0].(*eas.EasyrecUserRealtimeEmbeddingMindResponse); ok {
						if embeddingMindResponse.GenerateFeatures != nil {
							message["generate_features"] = embeddingMindResponse.GenerateFeatures.String()
						}
					}
				}

			}
		}
		dh.SendMessage([]map[string]interface{}{message})
	}
}