func()

in module/realtime_user2item_u2i2x2i_hologres_dao.go [94:270]


// ListItemsByUser recalls items for user through a two-hop lookup:
// user -> trigger items -> X values -> candidate items.
//
// Trigger items and their weights come from d.GetTriggers. When d.item2XTable
// is empty the triggers themselves are returned, scored by their weights.
// Otherwise:
//
//  1. d.item2XTable is queried for the d.xKey column of every trigger item.
//     A non-empty value is split by d.xDelimiter when that is set, and every
//     resulting X value accumulates the weights of all triggers carrying it.
//  2. d.x2ItemTable is queried for the item list of every distinct X value.
//     The list is a comma separated sequence of "id" or "id:score" entries.
//     Each item is scored with the accumulated weight of its X value,
//     multiplied by the entry score when one is present and parses as a float.
//
// Entries whose id is empty or "null", and items that are already triggers,
// are skipped. The collected items are sorted by descending score, passed
// through uniqItems and truncated to d.recallCount.
//
// Errors are logged, never returned: failing to prepare or run either query
// ends the recall with a nil result, and a row that fails to scan is skipped.
func (d *RealtimeUser2Item2X2ItemHologresDao) ListItemsByUser(user *User, context *context.RecommendContext) (ret []*Item) {
	// Upper bound for each of the two database round trips.
	const queryTimeout = 100 * time.Millisecond

	itemTriggers := d.GetTriggers(user, context)
	if len(itemTriggers) == 0 {
		return
	}
	if d.item2XTable == "" {
		// No item->X table configured: the triggers are the recall result.
		ret = make([]*Item, 0, len(itemTriggers))
		for itemID, weight := range itemTriggers {
			item := NewItem(itemID)
			item.RetrieveId = d.recallName
			item.Score = weight
			ret = append(ret, item)
		}
		return
	}

	// ---- hop 1: trigger item -> X values -------------------------------

	itemIDs := make([]any, 0, len(itemTriggers))
	for id := range itemTriggers {
		itemIDs = append(itemIDs, id)
	}

	sb := sqlbuilder.PostgreSQL.NewSelectBuilder()
	sb.Select("item_id", d.xKey).
		From(d.item2XTable).
		Where(
			sb.In("item_id", itemIDs...),
		)
	query, args := sb.Build()

	// The statement text only varies with the number of IN placeholders, so
	// prepared statements are cached per argument count.
	stmtKey := len(itemIDs)
	stmt := d.getItem2XStmt(stmtKey)
	if stmt == nil {
		d.mu.Lock()
		// Re-check under the lock: another goroutine may have prepared the
		// statement since the lookup above.
		stmt = d.item2XStmtMap[stmtKey]
		if stmt == nil {
			prepared, err := d.db.Prepare(query)
			if err != nil {
				d.mu.Unlock()
				log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2Item2X2ItemHologresDao\terror=hologres error(%v)", context.RecommendId, err))
				return
			}
			d.item2XStmtMap[stmtKey] = prepared
			stmt = prepared
		}
		d.mu.Unlock()
	}

	item2XCtx, cancelItem2X := gocontext.WithTimeout(gocontext.Background(), queryTimeout)
	defer cancelItem2X()

	item2XRows, err := stmt.QueryContext(item2XCtx, args...)
	if err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2Item2X2ItemHologresDao\tsql=%s\terror=%v", context.RecommendId, query, err))
		return
	}
	// Safety net only; the rows are closed explicitly once consumed so they
	// do not stay open while the second query runs.
	defer item2XRows.Close()

	// X value -> summed weight of every trigger item that carries it.
	xPreferScoreMap := make(map[string]float64)
	for item2XRows.Next() {
		var triggerID, xVal string
		if err := item2XRows.Scan(&triggerID, &xVal); err != nil {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2Item2X2ItemHologresDao\terror=%v", context.RecommendId, err))
			continue
		}
		if xVal == "" {
			continue
		}

		preferScore := itemTriggers[triggerID]
		if d.xDelimiter == "" {
			xPreferScoreMap[xVal] += preferScore
			continue
		}
		// An item may carry several X values joined by the delimiter; a
		// value seen on several triggers adds up their weights.
		for _, v := range strings.Split(xVal, d.xDelimiter) {
			xPreferScoreMap[v] += preferScore
		}
	}
	// Next() returning false may mean the iteration was aborted (timeout,
	// connection error). Stay best-effort with the rows read so far, but do
	// not hide the failure.
	if err := item2XRows.Err(); err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2Item2X2ItemHologresDao\terror=%v", context.RecommendId, err))
	}
	item2XRows.Close()

	if len(xPreferScoreMap) == 0 {
		return
	}

	// ---- hop 2: X values -> candidate items ----------------------------

	// The keys of xPreferScoreMap are exactly the distinct X values.
	xValues := make([]any, 0, len(xPreferScoreMap))
	for v := range xPreferScoreMap {
		xValues = append(xValues, v)
	}

	sb = sqlbuilder.PostgreSQL.NewSelectBuilder()
	sb.Select(d.xKey, "item_id").
		From(d.x2ItemTable).
		Where(
			sb.In(d.xKey, xValues...),
		)
	query, args = sb.Build()

	stmtKey = len(xValues)
	stmt = d.getX2ItemStmt(stmtKey)
	if stmt == nil {
		d.mu.Lock()
		stmt = d.x2ItemStmtMap[stmtKey]
		if stmt == nil {
			prepared, err := d.db.Prepare(query)
			if err != nil {
				d.mu.Unlock()
				log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2Item2X2ItemHologresDao\terror=hologres error(%v)", context.RecommendId, err))
				return
			}
			d.x2ItemStmtMap[stmtKey] = prepared
			stmt = prepared
		}
		d.mu.Unlock()
	}

	x2ItemCtx, cancelX2Item := gocontext.WithTimeout(gocontext.Background(), queryTimeout)
	defer cancelX2Item()

	x2ItemRows, err := stmt.QueryContext(x2ItemCtx, args...)
	if err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2Item2X2ItemHologresDao\tsql=%s\terror=%v", context.RecommendId, query, err))
		return
	}
	defer x2ItemRows.Close()

	for x2ItemRows.Next() {
		var xValue, ids string
		if err := x2ItemRows.Scan(&xValue, &ids); err != nil {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2Item2X2ItemHologresDao\terror=%v", context.RecommendId, err))
			continue
		}

		preferScore := xPreferScoreMap[xValue]

		for _, entry := range strings.Split(ids, ",") {
			// An entry is "id" or "id:score".
			itemID, rawScore, hasScore := strings.Cut(entry, ":")

			// Empty and "null" ids are placeholders, not real items.
			if itemID == "" || itemID == "null" {
				continue
			}
			// The user already interacted with trigger items; do not
			// recall them again.
			if _, isTrigger := itemTriggers[itemID]; isTrigger {
				continue
			}

			item := NewItem(itemID)
			item.RetrieveId = d.recallName
			item.Score = preferScore

			// A malformed score (including one with extra ':' separators)
			// fails to parse and leaves the X weight as the item score.
			if hasScore {
				if entryScore, err := strconv.ParseFloat(strings.TrimSpace(rawScore), 64); err == nil {
					item.Score *= entryScore
				}
			}

			ret = append(ret, item)
		}
	}
	if err := x2ItemRows.Err(); err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2Item2X2ItemHologresDao\terror=%v", context.RecommendId, err))
	}

	// Highest score first, so the later steps keep the best candidates.
	gosort.Sort(gosort.Reverse(ItemScoreSlice(ret)))
	ret = uniqItems(ret)

	if len(ret) > d.recallCount {
		ret = ret[:d.recallCount]
	}

	return
}