func()

in module/realtime_user2item_hologres_dao.go [213:333]


func (d *RealtimeUser2ItemHologresDao) GetTriggerInfos(user *User, context *context.RecommendContext) (triggerInfos []*TriggerInfo) {
	itemTriggerMap := make(map[string]*TriggerInfo, d.limit)
	var selectFields []string
	if d.hasPlayTimeField {
		selectFields = []string{"item_id", "event", "play_time", "timestamp"}
	} else {
		selectFields = []string{"item_id", "event", "timestamp"}
	}
	if len(d.propertyFields) > 0 {
		selectFields = append(selectFields, d.propertyFields...)
	}
	builder := sqlbuilder.PostgreSQL.NewSelectBuilder()
	builder.Select(selectFields...)
	builder.From(d.userTriggerTable)
	where := []string{builder.Equal("user_id", string(user.Id))}
	if d.whereClause != "" {
		where = append(where, d.whereClause)
	}
	builder.Where(where...).Limit(d.limit)
	builder.OrderBy("timestamp").Desc()

	sqlquery, args := builder.Build()
	if d.userStmt == nil {
		d.mu.Lock()
		if d.userStmt == nil {
			stmt, err := d.db.Prepare(sqlquery)
			if err != nil {
				log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2ItemHologresDao\terror=hologres error(%v)", context.RecommendId, err))
				d.mu.Unlock()
				return
			}
			d.userStmt = stmt
			d.mu.Unlock()
		} else {
			d.mu.Unlock()
		}
	}
	ctx, cancel := gocontext.WithTimeout(gocontext.Background(), 100*time.Millisecond)
	defer cancel()
	rows, err := d.userStmt.QueryContext(ctx, args...)
	if err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2ItemHologresDao\terror=hologres error(%v)", context.RecommendId, err))
		return
	}

	defer rows.Close()
	currentTime := time.Now()
	for rows.Next() {
		trigger := new(TriggerInfo)
		var dst []interface{}
		if d.hasPlayTimeField {
			dst = []interface{}{&trigger.ItemId, &trigger.event, &trigger.playTime, &trigger.timestamp}
		} else {
			dst = []interface{}{&trigger.ItemId, &trigger.event, &trigger.timestamp}
		}
		if len(d.propertyFields) > 0 {
			trigger.propertyFieldValues = make([]sql.NullString, len(d.propertyFields))
			for i := range trigger.propertyFieldValues {
				dst = append(dst, &trigger.propertyFieldValues[i])
			}
		}
		if err := rows.Scan(dst...); err == nil {
			if t, exist := d.eventPlayTimeMap[trigger.event]; exist {
				if trigger.playTime <= t {
					continue
				}
			}

			weightScore := float64(1)
			if score, ok := d.eventWeightMap[trigger.event]; ok {
				weightScore = score
			}

			eventScore := float64(0)
			properties := map[string]interface{}{
				"currentTime": float64(currentTime.Unix()),
				"eventTime":   float64(trigger.timestamp),
			}

			if result, err := d.weightEvaluableExpression.Evaluate(properties); err == nil {
				if value, ok := result.(float64); ok {
					eventScore = value
				}
			}

			weight := weightScore * eventScore

			if info, exist := itemTriggerMap[trigger.ItemId]; exist {
				switch d.weightMode {
				case weight_mode_max:
					if weight > info.Weight {
						info.Weight = weight
					}
				default:
					info.Weight += weight
				}
			} else {
				trigger.Weight = weight
				itemTriggerMap[trigger.ItemId] = trigger
			}

			//fmt.Println(trigger.itemId, itemTriggerMap[trigger.itemId])
			//itemTriggers[trigger.itemId] += weight
		} else {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2ItemHologresDao\terror=hologres error(%v)", context.RecommendId, err))
		}
	}

	for _, triggerInfo := range itemTriggerMap {
		triggerInfos = append(triggerInfos, triggerInfo)
	}
	gosort.Sort(gosort.Reverse(TriggerInfoSlice(triggerInfos)))

	triggerInfos = d.DiversityTriggers(triggerInfos)

	if len(triggerInfos) > d.triggerCount {
		triggerInfos = triggerInfos[:d.triggerCount]
	}

	return
}