module/realtime_user2item_featurestore_dao.go (240 lines of code) (raw):
package module
import (
"database/sql"
"fmt"
gosort "sort"
"strconv"
"strings"
"time"
"github.com/Knetic/govaluate"
"github.com/alibaba/pairec/v2/context"
"github.com/alibaba/pairec/v2/log"
"github.com/alibaba/pairec/v2/persist/fs"
"github.com/alibaba/pairec/v2/recconf"
"github.com/alibaba/pairec/v2/utils"
)
type RealtimeUser2ItemFeatureStoreDao struct {
*RealtimeUser2ItemBaseDao
hasPlayTimeField bool
itemCount int
fsClient *fs.FSClient
userTriggerTable string
itemTable string
weightEvaluableExpression *govaluate.EvaluableExpression
weightMode string
itemIdFieldName string
eventFieldName string
timestampFieldName string
playtimeFieldName string
events []any
similarItemIdField string
}
func NewRealtimeUser2ItemFeatureStoreDao(config recconf.RecallConfig) *RealtimeUser2ItemFeatureStoreDao {
dao := &RealtimeUser2ItemFeatureStoreDao{
itemCount: config.RealTimeUser2ItemDaoConf.UserTriggerDaoConf.ItemCount,
userTriggerTable: config.RealTimeUser2ItemDaoConf.UserTriggerDaoConf.FeatureStoreViewName,
hasPlayTimeField: true,
itemTable: config.RealTimeUser2ItemDaoConf.Item2ItemFeatureViewName,
weightMode: config.RealTimeUser2ItemDaoConf.UserTriggerDaoConf.WeightMode,
RealtimeUser2ItemBaseDao: NewRealtimeUser2ItemBaseDao(&config),
itemIdFieldName: "item_id",
eventFieldName: "event",
playtimeFieldName: "play_time",
timestampFieldName: "timestamp",
similarItemIdField: "similar_item_ids",
}
if config.RealTimeUser2ItemDaoConf.UserTriggerDaoConf.NoUsePlayTimeField {
dao.hasPlayTimeField = false
}
fsclient, err := fs.GetFeatureStoreClient(config.RealTimeUser2ItemDaoConf.UserTriggerDaoConf.FeatureStoreName)
if err != nil {
panic(fmt.Sprintf("error=%v", err))
}
dao.fsClient = fsclient
expression, err := govaluate.NewEvaluableExpressionWithFunctions(config.RealTimeUser2ItemDaoConf.UserTriggerDaoConf.WeightExpression,
govaluateFunctions)
if err != nil {
log.Error(fmt.Sprintf("error=%v", err))
return nil
}
dao.weightEvaluableExpression = expression
if dao.weightMode == "" {
dao.weightMode = weight_mode_sum
}
if config.RealTimeUser2ItemDaoConf.UserTriggerDaoConf.ItemIdFieldName != "" {
dao.itemIdFieldName = config.RealTimeUser2ItemDaoConf.UserTriggerDaoConf.ItemIdFieldName
}
if config.RealTimeUser2ItemDaoConf.UserTriggerDaoConf.EventFieldName != "" {
dao.eventFieldName = config.RealTimeUser2ItemDaoConf.UserTriggerDaoConf.EventFieldName
}
if config.RealTimeUser2ItemDaoConf.UserTriggerDaoConf.TimestampFieldName != "" {
dao.timestampFieldName = config.RealTimeUser2ItemDaoConf.UserTriggerDaoConf.TimestampFieldName
}
if config.RealTimeUser2ItemDaoConf.UserTriggerDaoConf.PlayTimeFieldName != "" {
dao.playtimeFieldName = config.RealTimeUser2ItemDaoConf.UserTriggerDaoConf.PlayTimeFieldName
}
if config.RealTimeUser2ItemDaoConf.SimilarItemIdField != "" {
dao.similarItemIdField = config.RealTimeUser2ItemDaoConf.SimilarItemIdField
}
for k := range dao.eventWeightMap {
dao.events = append(dao.events, k)
}
return dao
}
func (d *RealtimeUser2ItemFeatureStoreDao) ListItemsByUser(user *User, context *context.RecommendContext) (ret []*Item) {
itemTriggers := d.GetTriggers(user, context)
if len(itemTriggers) == 0 {
return
}
if d.itemTable == "" {
for itemId, weight := range itemTriggers {
item := NewItem(itemId)
item.RetrieveId = d.recallName
item.Score = weight
ret = append(ret, item)
}
return
}
var itemIds []interface{}
for id := range itemTriggers {
itemIds = append(itemIds, id)
}
featureView := d.fsClient.GetProject().GetFeatureView(d.itemTable)
if featureView == nil {
log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2ItemFeatureStoreDao\trecallName=%s\terror=featureView not found, featureview:%s", context.RecommendId, d.recallName, d.itemTable))
return
}
features, err := featureView.GetOnlineFeatures(itemIds, []string{d.similarItemIdField}, nil)
if err != nil {
log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2ItemFeatureStoreDao\terror=%v", context.RecommendId, err))
return
}
featureView.GetFeatureEntityName()
featureEntity := d.fsClient.GetProject().GetFeatureEntity(featureView.GetFeatureEntityName())
if featureEntity == nil {
log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2ItemFeatureStoreDao\terror=featureEntity not found, featureEntity:%s", context.RecommendId, featureView.GetFeatureEntityName()))
return
}
for _, featureMap := range features {
triggerId := utils.ToString(featureMap[featureEntity.FeatureEntityJoinid], "")
ids := utils.ToString(featureMap[d.similarItemIdField], "")
preferScore := itemTriggers[triggerId]
list := strings.Split(ids, ",")
for _, str := range list {
strs := strings.Split(str, ":")
if strs[0] == "" || strs[0] == "null" {
continue
}
if len(strs) == 2 {
item := NewItem(strs[0])
item.RetrieveId = d.recallName
if tmpScore, err := strconv.ParseFloat(strings.TrimSpace(strs[1]), 64); err == nil {
item.Score = tmpScore * preferScore
} else {
item.Score = preferScore
}
ret = append(ret, item)
} else if len(strs) == 3 { // compatible format itemid1:recall1:score1
item := NewItem(strs[0])
item.RetrieveId = d.recallName
if tmpScore, err := strconv.ParseFloat(strings.TrimSpace(strs[2]), 64); err == nil {
item.Score = tmpScore * preferScore
} else {
item.Score = preferScore
}
ret = append(ret, item)
}
}
}
gosort.Sort(gosort.Reverse(ItemScoreSlice(ret)))
ret = uniqItems(ret)
if len(ret) > d.recallCount {
ret = ret[:d.recallCount]
}
return
}
func (d *RealtimeUser2ItemFeatureStoreDao) GetTriggerInfos(user *User, context *context.RecommendContext) (triggerInfos []*TriggerInfo) {
featureView := d.fsClient.GetProject().GetFeatureView(d.userTriggerTable)
if featureView == nil {
log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2ItemFeatureStoreDao\trecallName=%s\terror=featureView not found, featureview:%s", context.RecommendId, d.recallName, d.userTriggerTable))
return
}
itemTriggerMap := make(map[string]*TriggerInfo, 50)
var selectFields []string
if d.hasPlayTimeField {
selectFields = []string{d.itemIdFieldName, d.eventFieldName, d.playtimeFieldName, d.timestampFieldName}
} else {
selectFields = []string{d.itemIdFieldName, d.eventFieldName, d.timestampFieldName}
}
if len(d.propertyFields) > 0 {
selectFields = append(selectFields, d.propertyFields...)
}
features, err := featureView.GetBehaviorFeatures([]any{user.Id}, d.events, selectFields)
if err != nil {
log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2ItemFeatureStoreDao\terror=featurestore error(%v)", context.RecommendId, err))
return
}
currentTime := time.Now()
for _, seqData := range features {
trigger := new(TriggerInfo)
trigger.ItemId = utils.ToString(seqData[d.itemIdFieldName], "")
trigger.event = utils.ToString(seqData[d.eventFieldName], "")
trigger.timestamp = utils.ToInt64(seqData[d.timestampFieldName], 0)
if d.hasPlayTimeField {
trigger.playTime = utils.ToFloat(seqData[d.playtimeFieldName], 0)
}
if t, exist := d.eventPlayTimeMap[trigger.event]; exist {
if trigger.playTime <= t {
continue
}
}
for _, propertyField := range d.propertyFields {
trigger.propertyFieldValues = append(trigger.propertyFieldValues, sql.NullString{String: utils.ToString(seqData[propertyField], ""), Valid: true})
}
weightScore := float64(1)
if score, ok := d.eventWeightMap[trigger.event]; ok {
weightScore = score
}
eventScore := float64(0)
properties := map[string]interface{}{
"currentTime": float64(currentTime.Unix()),
"eventTime": float64(trigger.timestamp),
}
if result, err := d.weightEvaluableExpression.Evaluate(properties); err == nil {
if value, ok := result.(float64); ok {
eventScore = value
}
}
weight := weightScore * eventScore
if info, exist := itemTriggerMap[trigger.ItemId]; exist {
switch d.weightMode {
case weight_mode_max:
if weight > info.Weight {
info.Weight = weight
}
default:
info.Weight += weight
}
} else {
trigger.Weight = weight
itemTriggerMap[trigger.ItemId] = trigger
}
}
for _, triggerInfo := range itemTriggerMap {
triggerInfos = append(triggerInfos, triggerInfo)
}
gosort.Sort(gosort.Reverse(TriggerInfoSlice(triggerInfos)))
triggerInfos = d.DiversityTriggers(triggerInfos)
if len(triggerInfos) > d.triggerCount {
triggerInfos = triggerInfos[:d.triggerCount]
}
return
}
func (d *RealtimeUser2ItemFeatureStoreDao) GetTriggers(user *User, context *context.RecommendContext) (itemTriggers map[string]float64) {
triggerInfos := d.GetTriggerInfos(user, context)
itemTriggers = make(map[string]float64, len(triggerInfos))
for _, trigger := range triggerInfos {
itemTriggers[trigger.ItemId] = trigger.Weight
}
return
}