// module/realtime_user2item_u2i2x2i_hologres_dao.go (347 lines of code) (raw):
package module
import (
gocontext "context"
"database/sql"
"fmt"
gosort "sort"
"strconv"
"strings"
"sync"
"time"
"github.com/Knetic/govaluate"
"github.com/alibaba/pairec/v2/context"
"github.com/alibaba/pairec/v2/log"
"github.com/alibaba/pairec/v2/persist/holo"
"github.com/alibaba/pairec/v2/recconf"
"github.com/huandu/go-sqlbuilder"
)
// RealtimeUser2Item2X2ItemHologresDao recalls items from Hologres through a
// user -> trigger item -> X -> item chain:
//   - the user's recent behavior rows in userTriggerTable become weighted trigger items;
//   - item2XTable maps those items to X values (column xKey);
//   - x2ItemTable maps X values back to candidate item lists.
type RealtimeUser2Item2X2ItemHologresDao struct {
*RealtimeUser2ItemBaseDao
// hasPlayTimeField selects whether the play_time column is read from userTriggerTable.
hasPlayTimeField bool
// itemCount is copied from UserTriggerDaoConf.ItemCount; it is not read by the methods in this file.
itemCount int
db *sql.DB
// userTriggerTable holds the per-user behavior rows (item_id, event, [play_time], timestamp, ...).
userTriggerTable string
// whereClause is an optional raw SQL predicate added to the trigger query's WHERE conditions.
whereClause string
// item2XTable maps item_id to an xKey value; when empty, trigger items are returned directly.
item2XTable string
// x2ItemTable maps an xKey value to a comma-separated "item_id[:score]" list stored in its item_id column.
x2ItemTable string
// xKey is the column name holding the X value in both item2XTable and x2ItemTable.
xKey string
// xDelimiter, when non-empty, splits a multi-valued X column read from item2XTable.
xDelimiter string
// weightEvaluableExpression computes a per-event weight from "currentTime" (unix seconds) and "eventTime" (the row's timestamp).
weightEvaluableExpression *govaluate.EvaluableExpression
// weightMode controls how weights of a repeated trigger item combine: weight_mode_max keeps the max, anything else sums.
weightMode string
// mu guards the lazily prepared statements below.
mu sync.RWMutex
// userStmt is the prepared trigger query, created on first use.
userStmt *sql.Stmt
// item2XStmtMap and x2ItemStmtMap cache prepared statements keyed by the number of IN-list arguments.
item2XStmtMap map[int]*sql.Stmt
x2ItemStmtMap map[int]*sql.Stmt
}
// NewRealtimeUser2Item2X2ItemHologresDao builds the DAO from a recall config.
//
// It resolves the Hologres connection named in UserTriggerDaoConf.HologresName
// and compiles UserTriggerDaoConf.WeightExpression with govaluateFunctions.
// If either step fails, the error is logged and nil is returned. An empty
// WeightMode defaults to weight_mode_sum.
func NewRealtimeUser2Item2X2ItemHologresDao(config recconf.RecallConfig) *RealtimeUser2Item2X2ItemHologresDao {
	baseDao := NewRealtimeUser2ItemBaseDao(&config)
	daoConf := config.RealTimeUser2ItemDaoConf
	triggerConf := daoConf.UserTriggerDaoConf

	dao := &RealtimeUser2Item2X2ItemHologresDao{
		RealtimeUser2ItemBaseDao: baseDao,
		hasPlayTimeField:         !triggerConf.NoUsePlayTimeField,
		itemCount:                triggerConf.ItemCount,
		userTriggerTable:         triggerConf.HologresTableName,
		whereClause:              triggerConf.WhereClause,
		weightMode:               triggerConf.WeightMode,
		item2XTable:              daoConf.Item2XTable,
		x2ItemTable:              daoConf.X2ItemTable,
		xKey:                     daoConf.XKey,
		xDelimiter:               daoConf.XDelimiter,
		item2XStmtMap:            make(map[int]*sql.Stmt),
		x2ItemStmtMap:            make(map[int]*sql.Stmt),
	}

	pg, err := holo.GetPostgres(triggerConf.HologresName)
	if err != nil {
		log.Error(fmt.Sprintf("error=%v", err))
		return nil
	}
	dao.db = pg.DB

	dao.weightEvaluableExpression, err = govaluate.NewEvaluableExpressionWithFunctions(triggerConf.WeightExpression, govaluateFunctions)
	if err != nil {
		log.Error(fmt.Sprintf("error=%v", err))
		return nil
	}

	if len(dao.weightMode) == 0 {
		dao.weightMode = weight_mode_sum
	}
	return dao
}
// getItem2XStmt returns the cached item2X prepared statement for an IN list
// with key elements, or nil if none has been prepared yet. The read is
// performed under d.mu's read lock.
func (d *RealtimeUser2Item2X2ItemHologresDao) getItem2XStmt(key int) *sql.Stmt {
	d.mu.RLock()
	cached := d.item2XStmtMap[key]
	d.mu.RUnlock()
	return cached
}
// getX2ItemStmt returns the cached x2Item prepared statement for an IN list
// with key elements, or nil if none has been prepared yet. The read is
// performed under d.mu's read lock.
func (d *RealtimeUser2Item2X2ItemHologresDao) getX2ItemStmt(key int) *sql.Stmt {
	d.mu.RLock()
	cached := d.x2ItemStmtMap[key]
	d.mu.RUnlock()
	return cached
}
// ListItemsByUser recalls items for user through a two-hop lookup.
//
// The lookup chain is: user triggers -> item2XTable (item_id -> X value) ->
// x2ItemTable (X value -> "item_id[:score],..." list).
//
// Scoring works as follows:
//   - Each X value scores the summed weight of the trigger items that map to it.
//   - Every recalled item inherits that score, multiplied by its optional
//     per-item score.
//   - Trigger items themselves, and empty or "null" item IDs, are never recalled.
//
// When item2XTable is not configured, the trigger items are returned directly
// with their trigger weights. Otherwise the result is sorted in reverse
// ItemScoreSlice order (presumably highest score first), de-duplicated with
// uniqItems, and truncated to recallCount. Query failures are logged and yield
// an empty result.
func (d *RealtimeUser2Item2X2ItemHologresDao) ListItemsByUser(user *User, context *context.RecommendContext) (ret []*Item) {
	itemTriggers := d.GetTriggers(user, context)
	if len(itemTriggers) == 0 {
		return
	}
	if d.item2XTable == "" {
		for itemId, weight := range itemTriggers {
			item := NewItem(itemId)
			item.RetrieveId = d.recallName
			item.Score = weight
			ret = append(ret, item)
		}
		return
	}

	xPreferScores := d.loadXPreferScores(itemTriggers, context)
	if len(xPreferScores) == 0 {
		return
	}
	ret = d.loadXItems(xPreferScores, itemTriggers, context)

	gosort.Sort(gosort.Reverse(ItemScoreSlice(ret)))
	ret = uniqItems(ret)
	if len(ret) > d.recallCount {
		ret = ret[:d.recallCount]
	}
	return
}

// prepareCachedStmt is the slow path of the statement cache. Under the write
// lock it returns cache[key] if another goroutine already prepared it;
// otherwise it prepares query and stores it. cache must be one of the maps
// guarded by d.mu.
func (d *RealtimeUser2Item2X2ItemHologresDao) prepareCachedStmt(cache map[int]*sql.Stmt, key int, query string) (*sql.Stmt, error) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if stmt := cache[key]; stmt != nil {
		return stmt, nil
	}
	stmt, err := d.db.Prepare(query)
	if err != nil {
		return nil, err
	}
	cache[key] = stmt
	return stmt, nil
}

// loadXPreferScores maps the trigger items to X values via item2XTable. For
// each X value it returns the summed weight of the triggers that map to it.
// A multi-valued X column is split on xDelimiter when configured, and empty
// values are ignored. It returns nil if the query cannot be prepared or run.
// Its rows are closed before returning, so the pooled connection is released
// before the next query.
func (d *RealtimeUser2Item2X2ItemHologresDao) loadXPreferScores(itemTriggers map[string]float64, rc *context.RecommendContext) map[string]float64 {
	itemIds := make([]interface{}, 0, len(itemTriggers))
	for id := range itemTriggers {
		itemIds = append(itemIds, id)
	}
	sb := sqlbuilder.PostgreSQL.NewSelectBuilder()
	sb.Select("item_id", d.xKey).
		From(d.item2XTable).
		Where(sb.In("item_id", itemIds...))
	query, args := sb.Build()

	// The generated SQL differs only in the number of IN placeholders, so the
	// prepared statement is cached by argument count.
	stmtKey := len(itemIds)
	stmt := d.getItem2XStmt(stmtKey)
	if stmt == nil {
		var err error
		if stmt, err = d.prepareCachedStmt(d.item2XStmtMap, stmtKey, query); err != nil {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2Item2X2ItemHologresDao\terror=hologres error(%v)", rc.RecommendId, err))
			return nil
		}
	}

	ctx, cancel := gocontext.WithTimeout(gocontext.Background(), 100*time.Millisecond)
	defer cancel()
	rows, err := stmt.QueryContext(ctx, args...)
	if err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2Item2X2ItemHologresDao\tsql=%s\terror=%v", rc.RecommendId, query, err))
		return nil
	}
	defer rows.Close()

	xPreferScores := make(map[string]float64)
	for rows.Next() {
		var triggerId, xVal string
		if err := rows.Scan(&triggerId, &xVal); err != nil {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2Item2X2ItemHologresDao\terror=%v", rc.RecommendId, err))
			continue
		}
		if xVal == "" {
			continue
		}
		preferScore := itemTriggers[triggerId]
		if d.xDelimiter == "" {
			xPreferScores[xVal] += preferScore
			continue
		}
		// An item may have several X values (e.g. categories). Split first,
		// and let a value repeated across rows accumulate its scores.
		for _, v := range strings.Split(xVal, d.xDelimiter) {
			if v != "" {
				xPreferScores[v] += preferScore
			}
		}
	}
	if err := rows.Err(); err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2Item2X2ItemHologresDao\terror=iterate item2x rows(%v)", rc.RecommendId, err))
	}
	return xPreferScores
}

// loadXItems expands each X value in xPreferScores into the items listed for
// it in x2ItemTable. Each entry has the form "item_id" or "item_id:score".
//
// An item's score is the X value's prefer score, multiplied by the entry
// score when it parses as a float. Empty or "null" item IDs and items already
// present in itemTriggers are skipped. It returns nil if the query cannot be
// prepared or run.
func (d *RealtimeUser2Item2X2ItemHologresDao) loadXItems(xPreferScores map[string]float64, itemTriggers map[string]float64, rc *context.RecommendContext) []*Item {
	xValues := make([]interface{}, 0, len(xPreferScores))
	for v := range xPreferScores {
		xValues = append(xValues, v)
	}
	sb := sqlbuilder.PostgreSQL.NewSelectBuilder()
	sb.Select(d.xKey, "item_id").
		From(d.x2ItemTable).
		Where(sb.In(d.xKey, xValues...))
	query, args := sb.Build()

	stmtKey := len(xValues)
	stmt := d.getX2ItemStmt(stmtKey)
	if stmt == nil {
		var err error
		if stmt, err = d.prepareCachedStmt(d.x2ItemStmtMap, stmtKey, query); err != nil {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2Item2X2ItemHologresDao\terror=hologres error(%v)", rc.RecommendId, err))
			return nil
		}
	}

	ctx, cancel := gocontext.WithTimeout(gocontext.Background(), 100*time.Millisecond)
	defer cancel()
	rows, err := stmt.QueryContext(ctx, args...)
	if err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2Item2X2ItemHologresDao\tsql=%s\terror=%v", rc.RecommendId, query, err))
		return nil
	}
	defer rows.Close()

	var items []*Item
	for rows.Next() {
		var xValue, ids string
		if err := rows.Scan(&xValue, &ids); err != nil {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2Item2X2ItemHologresDao\terror=%v", rc.RecommendId, err))
			continue
		}
		preferScore := xPreferScores[xValue]
		for _, entry := range strings.Split(ids, ",") {
			itemId, scoreStr, hasScore := strings.Cut(entry, ":")
			if itemId == "" || itemId == "null" {
				continue
			}
			// Trigger items are already known to the user; do not recall them again.
			if _, isTrigger := itemTriggers[itemId]; isTrigger {
				continue
			}
			item := NewItem(itemId)
			item.RetrieveId = d.recallName
			item.Score = preferScore
			if hasScore {
				// A malformed score (including "a:b:c") leaves the prefer score unscaled.
				if s, err := strconv.ParseFloat(strings.TrimSpace(scoreStr), 64); err == nil {
					item.Score *= s
				}
			}
			items = append(items, item)
		}
	}
	if err := rows.Err(); err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2Item2X2ItemHologresDao\terror=iterate x2item rows(%v)", rc.RecommendId, err))
	}
	return items
}
// GetTriggerInfos loads the user's most recent behavior rows from
// userTriggerTable (ordered by timestamp descending, limited to d.limit) and
// turns them into weighted trigger items.
//
// Event handling:
//   - Rows whose event has a threshold in eventPlayTimeMap are dropped when
//     play_time is at or below that threshold.
//   - Each remaining event weighs eventWeightMap[event] (default 1) times
//     weightEvaluableExpression(currentTime, eventTime). A failed or
//     non-float evaluation yields 0.
//   - Weights of repeated items combine by weightMode: max for
//     weight_mode_max, sum otherwise.
//
// The result is sorted in reverse TriggerInfoSlice order (presumably highest
// weight first), filtered by DiversityTriggers, and truncated to triggerCount.
// Query failures are logged and yield no triggers.
func (d *RealtimeUser2Item2X2ItemHologresDao) GetTriggerInfos(user *User, context *context.RecommendContext) (triggerInfos []*TriggerInfo) {
	itemTriggerMap := make(map[string]*TriggerInfo, d.limit)
	var selectFields []string
	if d.hasPlayTimeField {
		selectFields = []string{"item_id", "event", "play_time", "timestamp"}
	} else {
		selectFields = []string{"item_id", "event", "timestamp"}
	}
	if len(d.propertyFields) > 0 {
		selectFields = append(selectFields, d.propertyFields...)
	}
	builder := sqlbuilder.PostgreSQL.NewSelectBuilder()
	builder.Select(selectFields...)
	builder.From(d.userTriggerTable)
	where := []string{builder.Equal("user_id", string(user.Id))}
	if d.whereClause != "" {
		where = append(where, d.whereClause)
	}
	builder.Where(where...).Limit(d.limit)
	builder.OrderBy("timestamp").Desc()
	sqlquery, args := builder.Build()

	// userStmt is prepared lazily and shared between goroutines. It is
	// written under d.mu, so it must also be read under d.mu (an unlocked read
	// is a data race); the query then runs on the local copy.
	d.mu.RLock()
	stmt := d.userStmt
	d.mu.RUnlock()
	if stmt == nil {
		d.mu.Lock()
		if d.userStmt == nil {
			prepared, err := d.db.Prepare(sqlquery)
			if err != nil {
				d.mu.Unlock()
				log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2Item2X2ItemHologresDao\terror=hologres error(%v)", context.RecommendId, err))
				return
			}
			d.userStmt = prepared
		}
		stmt = d.userStmt
		d.mu.Unlock()
	}

	ctx, cancel := gocontext.WithTimeout(gocontext.Background(), 100*time.Millisecond)
	defer cancel()
	rows, err := stmt.QueryContext(ctx, args...)
	if err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2Item2X2ItemHologresDao\terror=hologres error(%v)", context.RecommendId, err))
		return
	}
	defer rows.Close()

	currentTime := time.Now()
	for rows.Next() {
		trigger := new(TriggerInfo)
		var dst []interface{}
		if d.hasPlayTimeField {
			dst = []interface{}{&trigger.ItemId, &trigger.event, &trigger.playTime, &trigger.timestamp}
		} else {
			dst = []interface{}{&trigger.ItemId, &trigger.event, &trigger.timestamp}
		}
		if len(d.propertyFields) > 0 {
			trigger.propertyFieldValues = make([]sql.NullString, len(d.propertyFields))
			for i := range trigger.propertyFieldValues {
				dst = append(dst, &trigger.propertyFieldValues[i])
			}
		}
		if err := rows.Scan(dst...); err != nil {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2Item2X2ItemHologresDao\terror=hologres error(%v)", context.RecommendId, err))
			continue
		}
		// Drop events that were not consumed long enough to count.
		if t, exist := d.eventPlayTimeMap[trigger.event]; exist && trigger.playTime <= t {
			continue
		}
		weightScore := float64(1)
		if score, ok := d.eventWeightMap[trigger.event]; ok {
			weightScore = score
		}
		eventScore := float64(0)
		properties := map[string]interface{}{
			"currentTime": float64(currentTime.Unix()),
			"eventTime":   float64(trigger.timestamp),
		}
		if result, err := d.weightEvaluableExpression.Evaluate(properties); err == nil {
			if value, ok := result.(float64); ok {
				eventScore = value
			}
		}
		weight := weightScore * eventScore
		if info, exist := itemTriggerMap[trigger.ItemId]; exist {
			switch d.weightMode {
			case weight_mode_max:
				if weight > info.Weight {
					info.Weight = weight
				}
			default:
				info.Weight += weight
			}
		} else {
			trigger.Weight = weight
			itemTriggerMap[trigger.ItemId] = trigger
		}
	}
	if err := rows.Err(); err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2Item2X2ItemHologresDao\terror=hologres error(%v)", context.RecommendId, err))
	}

	for _, triggerInfo := range itemTriggerMap {
		triggerInfos = append(triggerInfos, triggerInfo)
	}
	gosort.Sort(gosort.Reverse(TriggerInfoSlice(triggerInfos)))
	triggerInfos = d.DiversityTriggers(triggerInfos)
	if len(triggerInfos) > d.triggerCount {
		triggerInfos = triggerInfos[:d.triggerCount]
	}
	return
}
// GetTriggers returns the user's trigger items as a map from item ID to
// trigger weight, derived from GetTriggerInfos. The map is always non-nil,
// even when there are no triggers.
func (d *RealtimeUser2Item2X2ItemHologresDao) GetTriggers(user *User, context *context.RecommendContext) (itemTriggers map[string]float64) {
	infos := d.GetTriggerInfos(user, context)
	weights := make(map[string]float64, len(infos))
	for i := range infos {
		weights[infos[i].ItemId] = infos[i].Weight
	}
	return weights
}