service/feature_reply.go (276 lines of code) (raw):
package service
import (
"encoding/json"
"fmt"
"sync"
"time"
"github.com/alibaba/pairec/v2/abtest"
"github.com/alibaba/pairec/v2/algorithm"
"github.com/alibaba/pairec/v2/algorithm/eas"
"github.com/alibaba/pairec/v2/algorithm/response"
"github.com/alibaba/pairec/v2/context"
"github.com/alibaba/pairec/v2/log"
"github.com/alibaba/pairec/v2/module"
"github.com/alibaba/pairec/v2/recconf"
"github.com/alibaba/pairec/v2/service/rank"
"github.com/alibaba/pairec/v2/utils"
"github.com/aliyun/aliyun-pairec-config-go-sdk/v2/model"
jsoniter "github.com/json-iterator/go"
)
type FeatureReplyService struct {
RecommendService
}
func NewFeatureReplyService() *FeatureReplyService {
service := FeatureReplyService{}
return &service
}
func (r *FeatureReplyService) FeatureReply(userFeatures string, itemFeatures, itemids []string, context *context.RecommendContext) {
var json = jsoniter.ConfigCompatibleWithStandardLibrary
userId := r.GetUID(context)
user := module.NewUserWithContext(userId, context)
userProperties := make(map[string]interface{}, 0)
if userFeatures != "" {
features := make(map[string]*utils.FeatureInfo, 0)
if err := json.Unmarshal([]byte(userFeatures), &features); err != nil {
log.Info(fmt.Sprintf("requestId=%s\tmodule=FeatureReply\terror=%v", context.RecommendId, err))
return
}
for name, feature := range features {
userProperties[name] = utils.GetValueByType(feature.Value, feature.Type)
}
}
user.SetProperties(userProperties)
items := make([]*module.Item, 0, len(itemids))
for i, itemFeature := range itemFeatures {
itemProperties := make(map[string]interface{}, 0)
if itemFeature != "" {
features := make(map[string]*utils.FeatureInfo, 0)
if err := json.Unmarshal([]byte(itemFeature), &features); err != nil {
log.Info(fmt.Sprintf("requestId=%s\tmodule=FeatureReply\terror=%v", context.RecommendId, err))
return
}
for name, feature := range features {
itemProperties[name] = utils.GetValueByType(feature.Value, feature.Type)
}
}
item := module.NewItemWithProperty(itemids[i], itemProperties)
items = append(items, item)
}
if module, ok := userProperties["_module_"]; ok && module.(string) == "general_rank" {
r.generalRank(user, items, context)
} else {
r.rank(user, items, context)
}
}
func (r *FeatureReplyService) generalRank(user *module.User, items []*module.Item, context *context.RecommendContext) {
start := time.Now()
rankItems := items
scene := context.GetParameter("scene").(string)
// find rank config
var rankConfig recconf.GeneralRankConfig
found := false
if context.ExperimentResult != nil {
rankconf := context.ExperimentResult.GetExperimentParams().Get("generalRankConf", "")
if rankconf != "" {
d, _ := json.Marshal(rankconf)
if err := json.Unmarshal(d, &rankConfig); err == nil {
found = true
}
}
}
if !found {
if rankConfigs, ok := recconf.Config.GeneralRankConfs[scene]; ok {
rankConfig = rankConfigs
}
}
algoGenerator := rank.CreateAlgoDataGenerator(rankConfig.RankConf.Processor, rankConfig.RankConf.ContextFeatures)
var userFeatures map[string]interface{}
if rankConfig.RankConf.Processor == eas.Eas_Processor_EASYREC {
algoGenerator.SetItemFeatures(rankConfig.RankConf.ItemFeatures)
userFeatures = user.MakeUserFeatures2()
} else {
userFeatures = user.MakeUserFeatures()
}
for _, item := range rankItems {
features := item.GetFeatures()
algoGenerator.AddFeatures(item, features, userFeatures)
}
var algoData rank.IAlgoData
if algoGenerator.HasFeatures() {
algoData = algoGenerator.GeneratorAlgoDataDebugWithLevel(1)
}
var wg sync.WaitGroup
for _, algoName := range rankConfig.RankConf.RankAlgoList {
wg.Add(1)
go func(algo string) {
defer wg.Done()
newAlgoName := algo + "_feature_reply"
found := false
var processor string
for _, config := range context.Config.AlgoConfs {
if config.Name == newAlgoName {
found = true
processor = config.EasConf.Processor
break
}
}
if !found {
var algoConfig recconf.AlgoConfig
for _, config := range context.Config.AlgoConfs {
if config.Name == algo {
algoConfig = config
processor = config.EasConf.Processor
// change algoname and response function name
algoConfig.Name = newAlgoName
if algoConfig.EasConf.ResponseFuncName != "" {
algoConfig.EasConf.ResponseFuncName += "Debug"
}
algorithm.AddAlgoWithSign(algoConfig)
break
}
}
}
// run 返回原始的值,然后处理返回数据// 注册配置
ret, err := algorithm.Run(newAlgoName, algoData.GetFeatures())
if err != nil {
log.Error(fmt.Sprintf("requestId=%s\terror=run algorithm error(%v)", context.RecommendId, err))
algoData.SetError(err)
} else {
if result, ok := ret.([]response.AlgoResponse); ok {
algoData.SetAlgoResult(algo, result)
if processor == eas.Eas_Processor_EASYREC {
itemList := algoData.GetItems()
for j := 0; j < len(result) && j < len(itemList); j++ {
response, _ := (result[j]).(*eas.EasyrecResponse)
itemList[j].AddProperty("raw_features", response.RawFeatures)
itemList[j].AddProperty("generate_features", response.GenerateFeatures.String())
itemList[j].AddProperty("context_features", response.ContextFeatures)
}
}
}
}
}(algoName)
}
wg.Wait()
if algoData.Error() == nil && algoData.GetAlgoResult() != nil {
go r.logFeatureReplyResult(user, items, context)
}
log.Info(fmt.Sprintf("requestId=%s\tmodule=general_rank\tcost=%d", context.RecommendId, utils.CostTime(start)))
}
func (r *FeatureReplyService) rank(user *module.User, items []*module.Item, context *context.RecommendContext) {
start := time.Now()
rankItems := items
scene := context.GetParameter("scene").(string)
// find rank config
var rankConfig recconf.RankConfig
found := false
if context.ExperimentResult != nil {
rankconf := context.ExperimentResult.GetExperimentParams().Get("rankconf", "")
if rankconf != "" {
d, _ := json.Marshal(rankconf)
if err := json.Unmarshal(d, &rankConfig); err == nil {
found = true
}
}
}
if !found {
if rankConfigs, ok := recconf.Config.RankConf[scene]; ok {
rankConfig = rankConfigs
}
}
algoGenerator := rank.CreateAlgoDataGenerator(rankConfig.Processor, rankConfig.ContextFeatures)
var userFeatures map[string]interface{}
if rankConfig.Processor == eas.Eas_Processor_EASYREC {
userFeatures = user.MakeUserFeatures2()
algoGenerator.SetItemFeatures(rankConfig.ItemFeatures)
} else {
userFeatures = user.MakeUserFeatures()
}
for _, item := range rankItems {
features := item.GetFeatures()
algoGenerator.AddFeatures(item, features, userFeatures)
}
var algoData rank.IAlgoData
if algoGenerator.HasFeatures() {
algoData = algoGenerator.GeneratorAlgoDataDebugWithLevel(1)
}
var wg sync.WaitGroup
for _, algoName := range rankConfig.RankAlgoList {
wg.Add(1)
go func(algo string) {
defer wg.Done()
userAlgo := user.StringProperty("_algo_")
// algo name not equal
if userAlgo != "" && userAlgo != algo {
return
}
newAlgoName := algo + "_feature_reply"
found := false
var processor string
for _, config := range context.Config.AlgoConfs {
if config.Name == newAlgoName {
found = true
processor = config.EasConf.Processor
break
}
}
if !found {
var algoConfig recconf.AlgoConfig
for _, config := range context.Config.AlgoConfs {
if config.Name == algo {
algoConfig = config
processor = config.EasConf.Processor
// change algoname and response function name
algoConfig.Name = newAlgoName
if algoConfig.EasConf.ResponseFuncName != "" {
algoConfig.EasConf.ResponseFuncName += "Debug"
}
algorithm.AddAlgoWithSign(algoConfig)
break
}
}
}
// run 返回原始的值,然后处理返回数据// 注册配置
ret, err := algorithm.Run(newAlgoName, algoData.GetFeatures())
if err != nil {
log.Error(fmt.Sprintf("requestId=%s\terror=run algorithm error(%v)", context.RecommendId, err))
algoData.SetError(err)
} else {
if result, ok := ret.([]response.AlgoResponse); ok {
algoData.SetAlgoResult(algo, result)
if processor == eas.Eas_Processor_EASYREC {
itemList := algoData.GetItems()
for j := 0; j < len(result) && j < len(itemList); j++ {
response, _ := (result[j]).(*eas.EasyrecResponse)
itemList[j].AddProperty("raw_features", response.RawFeatures)
itemList[j].AddProperty("generate_features", response.GenerateFeatures.String())
itemList[j].AddProperty("context_features", response.ContextFeatures)
}
}
}
}
}(algoName)
}
wg.Wait()
if algoData.Error() == nil && algoData.GetAlgoResult() != nil {
/**
for name, algoResult := range algoData.GetAlgoResult() {
itemList := algoData.GetItems()
for j := 0; j < len(algoResult) && j < len(itemList); j++ {
if algoResult[j].GetModuleType() {
arr_score := algoResult[j].GetScoreMap()
for k, v := range arr_score {
itemList[j].AddAlgoScore(name+"_"+k, v)
}
} else {
itemList[j].AddAlgoScore(name, algoResult[j].GetScore())
}
}
}
**/
go r.logFeatureReplyResult(user, items, context)
}
log.Info(fmt.Sprintf("requestId=%s\tmodule=rank\tcost=%d", context.RecommendId, utils.CostTime(start)))
}
func (r *FeatureReplyService) logFeatureReplyResult(user *module.User, items []*module.Item, context *context.RecommendContext) {
//datasourceType := context.GetParameter("datasource_type").(string)
r.logReatureReplyResultToPairecConfigServer(user, items, context)
/**
if datasourceType == "datahub" {
r.logReatureReplyResultToDatahub(user, items, context)
} else if datasourceType == "eas" {
r.logReatureReplyResultToPairecConfigServer(user, items, context)
}
**/
}
func (r *FeatureReplyService) logReatureReplyResultToPairecConfigServer(user *module.User, items []*module.Item, context *context.RecommendContext) {
jobId := context.GetParameter("job_id")
scene := context.Param.GetParameter("scene").(string)
for _, item := range items {
replyData := model.FeatureConsistencyReplyData{}
replyData.FeatureConsistencyCheckJobConfigId = utils.ToString(jobId, "")
replyData.LogRequestId = context.RecommendId
replyData.LogRequestTime = time.Now().UnixMilli()
replyData.SceneName = scene
replyData.LogUserId = string(user.Id)
replyData.LogItemId = string(item.Id)
replyData.RawFeatures = item.StringProperty("raw_features")
replyData.ContextFeatures = item.StringProperty("context_features")
replyData.GeneratedFeatures = item.StringProperty("generate_features")
resp, err := abtest.GetExperimentClient().SyncFeatureConsistencyCheckJobReplayLog(&replyData)
if err != nil {
log.Error(fmt.Sprintf("requestId=%s\tevent=logReatureReplyResultToPairecConfigServer\tresponse=%v\terror=%v", context.RecommendId, resp, err))
continue
}
}
}