web/callback_controller.go (237 lines of code) (raw):
package web
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"time"
"github.com/alibaba/pairec/v2/abtest"
"github.com/alibaba/pairec/v2/context"
plog "github.com/alibaba/pairec/v2/log"
"github.com/alibaba/pairec/v2/module"
"github.com/alibaba/pairec/v2/recconf"
"github.com/alibaba/pairec/v2/service"
"github.com/alibaba/pairec/v2/utils"
"github.com/aliyun/aliyun-pairec-config-go-sdk/v2/model"
)
type CallBackParam struct {
SceneId string `json:"scene_id"`
RequestId string `json:"request_id"`
Uid string `json:"uid"`
Features map[string]interface{} `json:"features"`
ComplexTypeFeatures ComplexTypeFeatures `json:"complex_type_features"`
ItemList []map[string]interface{} `json:"item_list"`
RequestInfo map[string]interface{} `json:"request_info"`
Debug bool `json:"debug"`
}
func (r *CallBackParam) GetParameter(name string) interface{} {
if name == "uid" {
return r.Uid
} else if name == "scene" {
return r.SceneId + "_callback"
} else if name == "request_id" {
return r.RequestId
} else if name == "features" {
return r.Features
} else if name == "category" {
return "default"
}
return nil
}
type CallBackResponse struct {
Response
}
func (r *CallBackResponse) ToString() string {
j, _ := json.Marshal(r)
return string(j)
}
type CallBackController struct {
Controller
param CallBackParam
context *context.RecommendContext
}
func (c *CallBackController) Process(w http.ResponseWriter, r *http.Request) {
c.Start = time.Now()
var err error
c.RequestBody, err = io.ReadAll(r.Body)
if err != nil {
c.dealResponse(w, r, ERROR_PARAMETER_CODE, "read parammeter error")
return
}
if len(c.RequestBody) == 0 {
c.dealResponse(w, r, ERROR_PARAMETER_CODE, "request body empty")
return
}
if err := c.CheckParameter(); err != nil {
c.dealResponse(w, r, ERROR_PARAMETER_CODE, err.Error())
return
}
if c.param.RequestId != "" {
c.RequestId = c.param.RequestId
} else {
c.RequestId = utils.UUID()
}
c.LogRequestBegin(r)
c.doProcess(w, r)
c.End = time.Now()
c.LogRequestEnd(r)
}
func (r *CallBackController) CheckParameter() error {
if err := json.Unmarshal(r.RequestBody, &r.param); err != nil {
return err
}
if len(r.param.Uid) == 0 {
return errors.New("uid not empty")
}
if len(r.param.RequestId) == 0 {
return errors.New("request_id not empty")
}
if len(r.param.SceneId) == 0 {
return errors.New("scene_id not empty")
}
if len(r.param.ItemList) == 0 {
return errors.New("recommend item list not empty")
}
if len(r.param.ComplexTypeFeatures.FeaturesMap) > 0 {
if r.param.Features == nil {
r.param.Features = make(map[string]interface{})
}
for k, v := range r.param.ComplexTypeFeatures.FeaturesMap {
r.param.Features[k] = v
}
}
return nil
}
func (c *CallBackController) doProcess(w http.ResponseWriter, r *http.Request) {
// write log async
/**
go func() {
c.doCallbackLog()
}()
**/
Send(c)
c.dealResponse(w, r, SUCCESS_CODE, "success")
}
// doCallbackLog log user features and fg features(invoke eas if have)
func (c *CallBackController) doCallbackLog() {
c.makeCallBackContext()
userId := module.UID(c.param.Uid)
user := module.NewUserWithContext(userId, c.context)
callBackService := service.NewCallBackService()
callBackService.User = user
callBackService.LoadUserFeatures(c.context)
var items []*module.Item
for _, info := range c.param.ItemList {
itemId := fmt.Sprintf("%v", info["item_id"])
item := module.NewItem(itemId)
items = append(items, item)
for k, v := range info {
if k == "item_id" {
continue
}
item.AddProperty(k, v)
}
}
// CallBackProcessFunc process
if f, ok := callBackProcessFuncMap[c.param.SceneId]; ok {
f(user, items, c.context)
}
// load characteristics
callBackService.Items = items
// invoke recall
callBackService.Recommend(c.context)
callBackService.LoadFeatures(c.context)
// model rank
callBackService.Rank(c.context)
currTime := time.Now()
log := make(map[string]interface{})
requestInfo := make(map[string]interface{})
log["request_id"] = c.param.RequestId
log["scene"] = c.param.SceneId
log["request_time"] = currTime.Unix()
userFeaturesData, _ := json.Marshal(callBackService.User.MakeUserFeatures())
log["user_features"] = string(userFeaturesData)
log["user_id"] = string(callBackService.User.Id)
for k, v := range c.param.RequestInfo {
requestInfo[k] = v
}
requestInfoData, _ := json.Marshal(requestInfo)
log["request_info"] = string(requestInfoData)
// first write user log
log["module"] = "user"
if c.param.Debug {
msg, _ := json.Marshal(log)
info := fmt.Sprintf("requestId=%s\tmsg=%s", c.RequestId, string(msg))
plog.Info(info)
}
if err := callBackService.RecordLogList(c.context, []map[string]interface{}{log}); err != nil {
plog.Error(fmt.Sprintf("requestId=%s\tevent=RecordLogList\terror=%v", c.RequestId, err))
return
}
// write item feature
var messages []map[string]interface{}
i := 0
for _, item := range callBackService.Items {
log := make(map[string]interface{})
log["request_id"] = c.param.RequestId
log["scene"] = c.param.SceneId
log["request_time"] = currTime.Unix()
log["module"] = "item"
log["item_id"] = string(item.Id)
log["user_id"] = string(callBackService.User.Id)
log["raw_features"] = ""
if str, ok := item.Properties["raw_features"]; ok {
log["raw_features"] = str
}
//log["raw_features"] = item.StringProperty("raw_features")
log["generate_features"] = ""
if buf, ok := item.Properties["generate_features"]; ok && buf != nil {
log["generate_features"] = buf.(*bytes.Buffer).String()
}
//log["generate_features"] = item.StringProperty("generate_features")
//log["context_features"] = item.StringProperty("context_features")
log["context_features"] = ""
if str, ok := item.Properties["context_features"]; ok {
log["context_features"] = str
}
//log["context_features"] = item.StringProperty("context_features")
delete(item.Properties, "raw_features")
delete(item.Properties, "generate_features")
delete(item.Properties, "context_features")
itemFeatures := item.GetFeatures()
itemFeaturesData, _ := json.Marshal(itemFeatures)
log["item_features"] = string(itemFeaturesData)
if c.param.Debug {
msg, _ := json.Marshal(log)
info := fmt.Sprintf("requestId=%s\tmsg=%s", c.RequestId, string(msg))
plog.Info(info)
}
messages = append(messages, log)
i++
if i%10 == 0 {
if err := callBackService.RecordLogList(c.context, messages); err != nil {
plog.Error(fmt.Sprintf("requestId=%s\tevent=RecordLogList\terror=%v", c.RequestId, err))
return
}
messages = messages[:0]
}
}
if len(messages) > 0 {
if err := callBackService.RecordLogList(c.context, messages); err != nil {
plog.Error(fmt.Sprintf("requestId=%s\tevent=RecordLogList\terror=%v", c.RequestId, err))
return
}
}
}
func (c *CallBackController) dealResponse(w http.ResponseWriter, r *http.Request, code int, msg string) {
response := CallBackResponse{
Response: Response{
RequestId: c.RequestId,
Code: code,
Message: msg,
},
}
io.WriteString(w, response.ToString())
}
func (c *CallBackController) makeCallBackContext() {
c.context = context.NewRecommendContext()
c.context.Param = &c.param
c.context.Config = recconf.Config
c.context.RecommendId = c.RequestId
c.context.Debug = c.param.Debug
abcontext := model.ExperimentContext{
Uid: c.param.Uid,
RequestId: c.RequestId,
FilterParams: map[string]interface{}{},
}
sceneId := c.param.SceneId + "_callback"
if abtest.GetExperimentClient() != nil {
c.context.ExperimentResult = abtest.GetExperimentClient().MatchExperiment(sceneId, &abcontext)
plog.Info(c.context.ExperimentResult.Info())
}
}