in module/realtime_user2item_u2i2x2i_hologres_dao.go [94:270]
// ListItemsByUser recalls items for user through a trigger item -> X -> item chain.
//
// The trigger items and their weights come from d.GetTriggers. When
// d.item2XTable is empty the triggers themselves are returned as items, each
// scored with its trigger weight. Otherwise the method:
//
//  1. reads the d.xKey column of d.item2XTable for every trigger item and adds
//     the trigger weight onto each X value found (the column is split by
//     d.xDelimiter when that is set, so one item may feed several X values);
//  2. reads the "item_id" column of d.x2ItemTable for those X values; the
//     column is parsed as a comma-separated list of "itemId" or "itemId:score"
//     entries;
//  3. emits one candidate per entry, scored with the accumulated X preference
//     score, multiplied by the entry score when it is present and parses as a
//     float. Entries whose id is empty, "null", or one of the triggers are
//     skipped.
//
// The candidates are sorted with gosort.Reverse(ItemScoreSlice), passed through
// uniqItems and truncated to d.recallCount.
//
// Prepared statements are cached per placeholder count in d.item2XStmtMap and
// d.x2ItemStmtMap. Each query runs with its own 100ms timeout. Prepare and
// query failures are logged and end the call early; scan and row-iteration
// errors are logged and the rows read so far are still used.
func (d *RealtimeUser2Item2X2ItemHologresDao) ListItemsByUser(user *User, context *context.RecommendContext) (ret []*Item) {
	itemTriggers := d.GetTriggers(user, context)
	if len(itemTriggers) == 0 {
		return
	}

	// Without an item -> X table there is nothing to expand: return the triggers.
	if d.item2XTable == "" {
		ret = make([]*Item, 0, len(itemTriggers))
		for itemId, weight := range itemTriggers {
			item := NewItem(itemId)
			item.RetrieveId = d.recallName
			item.Score = weight
			ret = append(ret, item)
		}
		return
	}

	// ---- Step 1: trigger items -> X values -------------------------------
	itemIds := make([]any, 0, len(itemTriggers))
	for id := range itemTriggers {
		itemIds = append(itemIds, id)
	}

	sb := sqlbuilder.PostgreSQL.NewSelectBuilder()
	sb.Select("item_id", d.xKey).
		From(d.item2XTable).
		Where(
			sb.In("item_id", itemIds...),
		)
	query, args := sb.Build()

	// The statement text only depends on the number of placeholders, so the
	// placeholder count is used as the cache key.
	stmtKey := len(itemIds)
	stmt := d.getItem2XStmt(stmtKey)
	if stmt == nil {
		d.mu.Lock()
		// Re-check under the lock: another goroutine may have prepared it meanwhile.
		stmt = d.item2XStmtMap[stmtKey]
		if stmt == nil {
			prepared, err := d.db.Prepare(query)
			if err != nil {
				d.mu.Unlock()
				log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2Item2X2ItemHologresDao\terror=hologres error(%v)", context.RecommendId, err))
				return
			}
			d.item2XStmtMap[stmtKey] = prepared
			stmt = prepared
		}
		d.mu.Unlock()
	}

	item2XCtx, cancelItem2X := gocontext.WithTimeout(gocontext.Background(), 100*time.Millisecond)
	defer cancelItem2X()
	item2XRows, err := stmt.QueryContext(item2XCtx, args...)
	if err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2Item2X2ItemHologresDao\tsql=%s\terror=%v", context.RecommendId, query, err))
		return
	}
	defer item2XRows.Close()

	// X value -> summed weight of all trigger items that map to it.
	xPreferScoreMap := make(map[string]float64, len(itemTriggers))
	for item2XRows.Next() {
		var triggerId, xVal string
		if err := item2XRows.Scan(&triggerId, &xVal); err != nil {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2Item2X2ItemHologresDao\terror=%v", context.RecommendId, err))
			continue
		}
		if xVal == "" {
			continue
		}
		preferScore := itemTriggers[triggerId]
		// An item may have many categories, split with the delimiter first.
		// If a category appears multiple times, the scores add up.
		if d.xDelimiter != "" {
			for _, v := range strings.Split(xVal, d.xDelimiter) {
				xPreferScoreMap[v] += preferScore
			}
		} else {
			xPreferScoreMap[xVal] += preferScore
		}
	}
	// Next() returning false may mean an error (e.g. the timeout fired), not
	// just end of data; surface it instead of silently using truncated rows.
	if err := item2XRows.Err(); err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2Item2X2ItemHologresDao\terror=%v", context.RecommendId, err))
	}

	if len(xPreferScoreMap) == 0 {
		return
	}

	// ---- Step 2: X values -> candidate items -----------------------------
	// The keys of xPreferScoreMap are exactly the distinct X values seen above.
	mergedXValues := make([]any, 0, len(xPreferScoreMap))
	for val := range xPreferScoreMap {
		mergedXValues = append(mergedXValues, val)
	}

	sb = sqlbuilder.PostgreSQL.NewSelectBuilder()
	sb.Select(d.xKey, "item_id").
		From(d.x2ItemTable).
		Where(
			sb.In(d.xKey, mergedXValues...),
		)
	query, args = sb.Build()

	stmtKey = len(mergedXValues)
	stmt = d.getX2ItemStmt(stmtKey)
	if stmt == nil {
		d.mu.Lock()
		stmt = d.x2ItemStmtMap[stmtKey]
		if stmt == nil {
			prepared, err := d.db.Prepare(query)
			if err != nil {
				d.mu.Unlock()
				log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2Item2X2ItemHologresDao\terror=hologres error(%v)", context.RecommendId, err))
				return
			}
			d.x2ItemStmtMap[stmtKey] = prepared
			stmt = prepared
		}
		d.mu.Unlock()
	}

	x2ItemCtx, cancelX2Item := gocontext.WithTimeout(gocontext.Background(), 100*time.Millisecond)
	defer cancelX2Item()
	x2ItemRows, err := stmt.QueryContext(x2ItemCtx, args...)
	if err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2Item2X2ItemHologresDao\tsql=%s\terror=%v", context.RecommendId, query, err))
		return
	}
	defer x2ItemRows.Close()

	for x2ItemRows.Next() {
		var xValue, ids string
		if err := x2ItemRows.Scan(&xValue, &ids); err != nil {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2Item2X2ItemHologresDao\terror=%v", context.RecommendId, err))
			continue
		}
		preferScore := xPreferScoreMap[xValue]
		for _, entry := range strings.Split(ids, ",") {
			parts := strings.Split(entry, ":")
			candidateId := parts[0]
			// Entries with an empty or "null" id (e.g. from an empty list or a
			// trailing comma) carry no item and are skipped.
			if candidateId == "" || candidateId == "null" {
				continue
			}
			// If the item id is already a trigger, ignore it.
			if _, isTrigger := itemTriggers[candidateId]; isTrigger {
				continue
			}
			item := NewItem(candidateId)
			item.RetrieveId = d.recallName
			item.Score = preferScore
			// "itemId:score" entries scale the preference score; an unparseable
			// score leaves the preference score unchanged.
			if len(parts) == 2 {
				if entryScore, err := strconv.ParseFloat(strings.TrimSpace(parts[1]), 64); err == nil {
					item.Score = item.Score * entryScore
				}
			}
			ret = append(ret, item)
		}
	}
	if err := x2ItemRows.Err(); err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=RealtimeUser2Item2X2ItemHologresDao\terror=%v", context.RecommendId, err))
	}

	gosort.Sort(gosort.Reverse(ItemScoreSlice(ret)))
	ret = uniqItems(ret)
	if len(ret) > d.recallCount {
		ret = ret[:d.recallCount]
	}
	return
}