in service/rank/feature_consistency_job_service.go [299:412]
// logRecallResultToDatahub builds one feature-backflow record for a recall
// request and sends it to the Datahub topic configured on job.
//
// The record always carries request_id, uid, user_features (JSON), trigger and
// module. The remaining fields depend on triggerType:
//   - DssmO2o / MindO2o: the BE engine is queried with trace enabled. When the
//     response carries trace info, user_embedding is extracted from it;
//     otherwise (or when the trace cannot be parsed) the userEmbedding argument
//     is used. generate_features is set to "". Nothing is sent when the BE
//     client cannot be obtained or the read fails.
//   - anything else: user_embedding is the userEmbedding argument and, for
//     "dssm" / "mind", generate_features is filled from the response of
//     algorithm.Run(recallAlgo, ...). The record is sent even when the
//     algorithm call fails.
//
// All failures are logged; nothing is returned to the caller. items is not
// used by this method.
func (r *FeatureConsistencyJobService) logRecallResultToDatahub(user *module.User, items []*module.Item, context *context.RecommendContext, triggerType string, job *model.FeatureConsistencyJob, userEmbedding, triggerItem, recallAlgo, recallAlgoType, beName, bizName, recallName string) {
	dh, err := r.featureBackflowDatahub(job)
	if err != nil || dh == nil {
		log.Error(fmt.Sprintf("requestId=%s\tevent=logRecallResultToDatahub\tmsg=create datahub error\terror=%v", context.RecommendId, err))
		return
	}

	features := user.MakeUserFeatures2()
	userFeatures, err := json.Marshal(utils.ConvertFeatures(features))
	if err != nil {
		// Best effort: the record is still sent, with an empty user_features.
		log.Error(fmt.Sprintf("requestId=%s\tevent=logRecallResultToDatahub\tmsg=marshal user features error\terror=%v", context.RecommendId, err))
	}

	message := map[string]interface{}{
		"request_id":    context.RecommendId,
		"uid":           string(user.Id),
		"user_features": string(userFeatures),
		"trigger":       triggerItem,
		"module":        triggerType,
	}

	if triggerType == DssmO2o || triggerType == MindO2o {
		readRequest := be.NewReadRequest(bizName, 10)
		readRequest.IsRawRequest = true
		readRequest.IsPost = true
		readRequest.SetQueryParams(map[string]string{
			fmt.Sprintf("%s_qinfo", recallName):        triggerItem,
			fmt.Sprintf("%s_return_count", recallName): "10",
			// The user embedding is only available through the trace output.
			"trace": "ALL",
		})
		if context.Debug {
			uri := readRequest.BuildUri()
			log.Info(fmt.Sprintf("requestId=%s\tevent=FeatureConsistencyJobService\tbizName=%s\turl=%s", context.RecommendId, bizName, uri.RequestURI()))
		}

		// NOTE(review): the client is cached on first use without holding
		// r.mutex, and later calls reuse it regardless of beName — confirm
		// this is intended.
		if r.client == nil {
			client, err := beengine.GetBeClient(beName)
			if err != nil {
				log.Error(fmt.Sprintf("error=%v", err))
				return
			}
			r.client = client
		}

		readResponse, err := r.client.BeClient.Read(*readRequest)
		if err != nil {
			log.Error(fmt.Sprintf("requestId=%s\tevent=FeatureConsistencyJobService\terror=be error(%v)", context.RecommendId, err))
			return
		}

		// Check the pointer before dereferencing it: a response without trace
		// info must not panic.
		if trace := readResponse.Result.TraceInfo; trace != nil && *trace != nil {
			embedding, err := extractTraceUserEmbedding(*trace, recallName)
			if err != nil {
				// Keep the caller-supplied embedding when the trace layout is
				// not what we expect.
				log.Error(fmt.Sprintf("requestId=%s\tevent=FeatureConsistencyJobService\terror=parse trace info error(%v)", context.RecommendId, err))
			} else {
				userEmbedding = embedding
			}
		}

		message["user_embedding"] = userEmbedding
		message["generate_features"] = ""
		dh.SendMessage([]map[string]interface{}{message})
		return
	}

	message["user_embedding"] = userEmbedding

	algoGenerator := CreateAlgoDataGenerator(recallAlgoType, nil)
	algoGenerator.SetItemFeatures(nil)
	algoGenerator.AddFeatures(module.NewItem("1"), nil, features)
	algoData := algoGenerator.GeneratorAlgoDataDebugWithLevel(1)

	algoFeatures := algoData.GetFeatures()
	if easyrecRequest, ok := algoFeatures.(*easyrec.PBRequest); !ok {
		log.Error(fmt.Sprintf("requestId=%s\tevent=logRecallResultToDatahub\terr=unexpected algo features type %T", context.RecommendId, algoFeatures))
	} else if algoRet, err := algorithm.Run(recallAlgo, easyrecRequest); err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tevent=logRecallResultToDatahub\terr=%v", context.RecommendId, err))
	} else if result, ok := algoRet.([]response.AlgoResponse); ok && len(result) > 0 {
		// The model call succeeded; pick the generated features out of the
		// response type that matches the trigger.
		switch triggerType {
		case "dssm":
			if resp, ok := result[0].(*eas.EasyrecUserRealtimeEmbeddingResponse); ok && resp.GenerateFeatures != nil {
				message["generate_features"] = resp.GenerateFeatures.String()
			}
		case "mind":
			if resp, ok := result[0].(*eas.EasyrecUserRealtimeEmbeddingMindResponse); ok && resp.GenerateFeatures != nil {
				message["generate_features"] = resp.GenerateFeatures.String()
			}
		}
	}

	dh.SendMessage([]map[string]interface{}{message})
}

// featureBackflowDatahub returns the Datahub client for the feature-backflow
// queue configured on job, creating, initialising and registering it on first
// use. A client whose Init fails is not registered, so a later call retries
// instead of reusing a broken instance.
func (r *FeatureConsistencyJobService) featureBackflowDatahub(job *model.FeatureConsistencyJob) (*datahub.Datahub, error) {
	// The registry key embeds the access key; do not log it.
	name := fmt.Sprintf("%s-%s-%s-%s-%s", job.FeatureBackflowQueueDatahubAccessId, job.FeatureBackflowQueueDatahubAccessKey, job.FeatureBackflowQueueDatahubEndpoint,
		job.FeatureBackflowQueueDatahubProject, job.FeatureBackflowQueueDatahubTopic)

	if dh, err := datahub.GetDatahub(name); err == nil {
		return dh, nil
	}

	r.mutex.Lock()
	defer r.mutex.Unlock()

	// Look again under the lock: another caller may have registered the
	// client while we were waiting.
	if dh, err := datahub.GetDatahub(name); err == nil {
		return dh, nil
	}

	dh := datahub.NewDatahub(job.FeatureBackflowQueueDatahubAccessId, job.FeatureBackflowQueueDatahubAccessKey, job.FeatureBackflowQueueDatahubEndpoint,
		job.FeatureBackflowQueueDatahubProject, job.FeatureBackflowQueueDatahubTopic, nil)
	if err := dh.Init(); err != nil {
		return nil, err
	}
	datahub.RegisterDatahub(name, dh)
	return dh, nil
}

// extractTraceUserEmbedding pulls the user embedding for recallName out of a
// decoded BE trace and returns it as a comma-separated string.
//
// It follows the path
//
//	trace["0[]"]["ExtractTensorFromRawTensorOpV2[recall/<name>/extract_<name>_user_embedding]"]["__arr"][0]
//
// takes the text after the last "[[" in that string, strips trailing "]"
// characters and replaces spaces with commas. An error is returned when any
// step of the path is missing or has an unexpected type.
func extractTraceUserEmbedding(traceInfo interface{}, recallName string) (string, error) {
	traceInfoMap, ok := traceInfo.(map[string]interface{})
	if !ok {
		return "", fmt.Errorf("trace info has unexpected type %T", traceInfo)
	}
	valueMap, ok := traceInfoMap["0[]"].(map[string]interface{})
	if !ok {
		return "", fmt.Errorf("trace info entry %q is missing or not an object", "0[]")
	}
	key := fmt.Sprintf("ExtractTensorFromRawTensorOpV2[recall/%s/extract_%s_user_embedding]", recallName, recallName)
	userEmbeddingArr, ok := valueMap[key].(map[string]interface{})
	if !ok {
		return "", fmt.Errorf("trace info entry %q is missing or not an object", key)
	}
	arr, ok := userEmbeddingArr["__arr"].([]interface{})
	if !ok || len(arr) == 0 {
		return "", fmt.Errorf("trace info entry %q has no %q values", key, "__arr")
	}
	v, ok := arr[0].(string)
	if !ok {
		return "", fmt.Errorf("trace info embedding has unexpected type %T", arr[0])
	}

	splitArr := strings.Split(v, "[[")
	// TrimRight takes a cutset, so this strips every trailing ']'.
	result := strings.TrimRight(splitArr[len(splitArr)-1], "]")
	return strings.ReplaceAll(result, " ", ","), nil
}