in module/realtime_user2item_hologres_dao.go [213:333]
func (d *RealtimeUser2ItemHologresDao) GetTriggerInfos(user *User, context *context.RecommendContext) (triggerInfos []*TriggerInfo) {
itemTriggerMap := make(map[string]*TriggerInfo, d.limit)
var selectFields []string
if d.hasPlayTimeField {
selectFields = []string{"item_id", "event", "play_time", "timestamp"}
} else {
selectFields = []string{"item_id", "event", "timestamp"}
}
if len(d.propertyFields) > 0 {
selectFields = append(selectFields, d.propertyFields...)
}
builder := sqlbuilder.PostgreSQL.NewSelectBuilder()
builder.Select(selectFields...)
builder.From(d.userTriggerTable)
where := []string{builder.Equal("user_id", string(user.Id))}
if d.whereClause != "" {
where = append(where, d.whereClause)
}
builder.Where(where...).Limit(d.limit)
builder.OrderBy("timestamp").Desc()
sqlquery, args := builder.Build()
if d.userStmt == nil {
d.mu.Lock()
if d.userStmt == nil {
stmt, err := d.db.Prepare(sqlquery)
if err != nil {
log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2ItemHologresDao\terror=hologres error(%v)", context.RecommendId, err))
d.mu.Unlock()
return
}
d.userStmt = stmt
d.mu.Unlock()
} else {
d.mu.Unlock()
}
}
ctx, cancel := gocontext.WithTimeout(gocontext.Background(), 100*time.Millisecond)
defer cancel()
rows, err := d.userStmt.QueryContext(ctx, args...)
if err != nil {
log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2ItemHologresDao\terror=hologres error(%v)", context.RecommendId, err))
return
}
defer rows.Close()
currentTime := time.Now()
for rows.Next() {
trigger := new(TriggerInfo)
var dst []interface{}
if d.hasPlayTimeField {
dst = []interface{}{&trigger.ItemId, &trigger.event, &trigger.playTime, &trigger.timestamp}
} else {
dst = []interface{}{&trigger.ItemId, &trigger.event, &trigger.timestamp}
}
if len(d.propertyFields) > 0 {
trigger.propertyFieldValues = make([]sql.NullString, len(d.propertyFields))
for i := range trigger.propertyFieldValues {
dst = append(dst, &trigger.propertyFieldValues[i])
}
}
if err := rows.Scan(dst...); err == nil {
if t, exist := d.eventPlayTimeMap[trigger.event]; exist {
if trigger.playTime <= t {
continue
}
}
weightScore := float64(1)
if score, ok := d.eventWeightMap[trigger.event]; ok {
weightScore = score
}
eventScore := float64(0)
properties := map[string]interface{}{
"currentTime": float64(currentTime.Unix()),
"eventTime": float64(trigger.timestamp),
}
if result, err := d.weightEvaluableExpression.Evaluate(properties); err == nil {
if value, ok := result.(float64); ok {
eventScore = value
}
}
weight := weightScore * eventScore
if info, exist := itemTriggerMap[trigger.ItemId]; exist {
switch d.weightMode {
case weight_mode_max:
if weight > info.Weight {
info.Weight = weight
}
default:
info.Weight += weight
}
} else {
trigger.Weight = weight
itemTriggerMap[trigger.ItemId] = trigger
}
//fmt.Println(trigger.itemId, itemTriggerMap[trigger.itemId])
//itemTriggers[trigger.itemId] += weight
} else {
log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2ItemHologresDao\terror=hologres error(%v)", context.RecommendId, err))
}
}
for _, triggerInfo := range itemTriggerMap {
triggerInfos = append(triggerInfos, triggerInfo)
}
gosort.Sort(gosort.Reverse(TriggerInfoSlice(triggerInfos)))
triggerInfos = d.DiversityTriggers(triggerInfos)
if len(triggerInfos) > d.triggerCount {
triggerInfos = triggerInfos[:d.triggerCount]
}
return
}