service/call_back_service.go (219 lines of code) (raw):
package service
import (
"encoding/json"
"fmt"
"math/rand"
"strings"
"sync"
"time"
"github.com/alibaba/pairec/v2/algorithm"
"github.com/alibaba/pairec/v2/algorithm/eas"
"github.com/alibaba/pairec/v2/algorithm/response"
"github.com/alibaba/pairec/v2/context"
"github.com/alibaba/pairec/v2/datasource/datahub"
"github.com/alibaba/pairec/v2/datasource/kafka"
"github.com/alibaba/pairec/v2/log"
"github.com/alibaba/pairec/v2/module"
"github.com/alibaba/pairec/v2/recconf"
"github.com/alibaba/pairec/v2/service/debug"
"github.com/alibaba/pairec/v2/service/feature"
"github.com/alibaba/pairec/v2/service/rank"
"github.com/alibaba/pairec/v2/utils"
)
type CallBackService struct {
recallService *RecallService
featureService *feature.FeatureService
userFeatureService *feature.UserFeatureService
User *module.User
Items []*module.Item
}
func NewCallBackService() *CallBackService {
services := CallBackService{
recallService: &RecallService{},
featureService: feature.DefaultFeatureService(),
userFeatureService: feature.DefaultUserFeatureService(),
}
return &services
}
func (r *CallBackService) LoadUserFeatures(context *context.RecommendContext) {
//user feature prefetch
r.userFeatureService.LoadUserFeaturesForCallback(r.User, context)
}
func (r *CallBackService) Recommend(context *context.RecommendContext) {
scene_name := context.GetParameter("scene").(string)
if _, ok := context.Config.SceneConfs[scene_name]; ok {
items := r.recallService.GetItems(r.User, context)
itemMap := make(map[module.ItemId]*module.Item, len(items))
for _, item := range items {
itemMap[item.Id] = item
}
for _, item := range r.Items {
if itemM, ok := itemMap[item.Id]; ok {
item.RetrieveId = itemM.RetrieveId
}
}
go func() {
debugService := debug.NewDebugService(r.User, context)
debugService.WriteRecallLog(r.User, items, context)
}()
}
}
func (r *CallBackService) LoadFeatures(context *context.RecommendContext) {
// load user features
r.featureService.LoadFeatures(r.User, r.Items, context)
}
func (r *CallBackService) RecordLog(context *context.RecommendContext, msg string) error {
scene_name := context.GetParameter("scene").(string)
scene_name = strings.ReplaceAll(scene_name, "_callback", "")
if callBackConfig, ok := context.Config.CallBackConfs[scene_name]; ok {
data_source := callBackConfig.DataSource
if data_source.Type == recconf.DataSource_Type_Kafka {
return r.RecordToKafka(data_source.Name, msg)
} else if data_source.Type == recconf.DataSource_Type_Datahub {
return r.RecordToDatahub(data_source.Name, msg)
}
}
return nil
}
func (r *CallBackService) RecordToKafka(kafka_name string, msg string) error {
p, error := kafka.GetKafkaProducer(kafka_name)
if error != nil {
return error
}
p.SendMessage([]byte(msg))
return nil
}
func (r *CallBackService) RecordToDatahub(name string, msg string) error {
p, error := datahub.GetDatahub(name)
if error != nil {
return error
}
p.SendMessage([]map[string]interface{}{{"callback_log": msg}})
return nil
}
func (r *CallBackService) RecordLogList(context *context.RecommendContext, messages []map[string]interface{}) error {
scene_name := context.GetParameter("scene").(string)
scene_name = strings.ReplaceAll(scene_name, "_callback", "")
if callBackConfig, ok := context.Config.CallBackConfs[scene_name]; ok {
data_source := callBackConfig.DataSource
if data_source.Type == recconf.DataSource_Type_Kafka {
return r.RecordToKafkaList(data_source.Name, messages)
} else if data_source.Type == recconf.DataSource_Type_Datahub {
return r.RecordToDatahubList(data_source.Name, messages)
}
}
return nil
}
func (r *CallBackService) RecordToKafkaList(kafka_name string, messages []map[string]interface{}) error {
p, error := kafka.GetKafkaProducer(kafka_name)
if error != nil {
return error
}
for _, msg := range messages {
j, _ := json.Marshal(msg)
p.SendMessage(j)
}
return nil
}
func (r *CallBackService) RecordToDatahubList(name string, messages []map[string]interface{}) error {
p, error := datahub.GetDatahub(name)
if error != nil {
return error
}
p.SendMessage(messages)
return nil
}
func (r *CallBackService) Rank(context *context.RecommendContext) {
var rankConfig recconf.RankConfig
scene_name := context.GetParameter("scene").(string)
scene_name = strings.ReplaceAll(scene_name, "_callback", "")
callBackConfig, ok := context.Config.CallBackConfs[scene_name]
if ok {
rankConfig = callBackConfig.RankConf
}
if len(rankConfig.RankAlgoList) == 0 {
return
}
start := time.Now()
rankItems := r.Items
algoGenerator := rank.CreateAlgoDataGenerator(rankConfig.Processor, rankConfig.ContextFeatures)
var userFeatures map[string]interface{}
if rankConfig.Processor == eas.Eas_Processor_EASYREC {
userFeatures = r.User.MakeUserFeatures2()
algoGenerator.SetItemFeatures(rankConfig.ItemFeatures)
} else {
userFeatures = r.User.MakeUserFeatures()
}
for _, item := range rankItems {
features := item.GetFeatures()
algoGenerator.AddFeatures(item, features, userFeatures)
}
var algoData rank.IAlgoData
debugLevel := 3
writeRawFeatrues := false
if callBackConfig.RawFeatures && callBackConfig.RawFeaturesRate > 0 {
if rand.Intn(100) < callBackConfig.RawFeaturesRate {
debugLevel = 1
writeRawFeatrues = true
}
}
if algoGenerator.HasFeatures() {
if context.Debug {
algoData = algoGenerator.GeneratorAlgoDataDebugWithLevel(2)
} else {
algoData = algoGenerator.GeneratorAlgoDataDebugWithLevel(debugLevel)
}
}
var wg sync.WaitGroup
for _, algoName := range rankConfig.RankAlgoList {
wg.Add(1)
go func(algo string) {
defer wg.Done()
newAlgoName := algo + "_callback"
found := false
var processor string
for _, config := range context.Config.AlgoConfs {
if config.Name == newAlgoName {
found = true
processor = config.EasConf.Processor
break
}
}
if !found {
var algoConfig recconf.AlgoConfig
for _, config := range context.Config.AlgoConfs {
if config.Name == algo {
algoConfig = config
processor = config.EasConf.Processor
// change algoname and response function name
algoConfig.Name = newAlgoName
if algoConfig.EasConf.ResponseFuncName != "" {
algoConfig.EasConf.ResponseFuncName += "Debug"
}
algorithm.AddAlgoWithSign(algoConfig)
break
}
}
}
// run 返回原始的值,然后处理返回数据// 注册配置
ret, err := algorithm.Run(newAlgoName, algoData.GetFeatures())
if err != nil {
log.Error(fmt.Sprintf("requestId=%s\terror=run algorithm error(%v)", context.RecommendId, err))
algoData.SetError(err)
} else {
if result, ok := ret.([]response.AlgoResponse); ok {
algoData.SetAlgoResult(algo, result)
if processor == eas.Eas_Processor_EASYREC {
itemList := algoData.GetItems()
for j := 0; j < len(result) && j < len(itemList); j++ {
response, _ := (result[j]).(*eas.EasyrecResponse)
if writeRawFeatrues {
itemList[j].AddProperty("raw_features", response.RawFeatures)
}
//itemList[j].AddProperty("generate_features", response.GenerateFeatures)
itemList[j].Properties["generate_features"] = response.GenerateFeatures
itemList[j].AddProperty("context_features", response.ContextFeatures)
}
}
}
}
}(algoName)
}
wg.Wait()
log.Info(fmt.Sprintf("requestId=%s\tmodule=CallBackRank\tcost=%d", context.RecommendId, utils.CostTime(start)))
}