in module/user2item_hologres_dao.go [45:199]
func (d *User2ItemHologresDao) ListItemsByUser(user *User, context *context.RecommendContext) (ret []*Item) {
uid := string(user.Id)
builder := sqlbuilder.PostgreSQL.NewSelectBuilder()
builder.Select("item_ids")
builder.From(d.userTable)
builder.Where(builder.Equal("uid", uid))
sqlquery, args := builder.Build()
if d.userStmt == nil {
d.mu.Lock()
if d.userStmt == nil {
stmt, err := d.db.Prepare(sqlquery)
if err != nil {
log.Error(fmt.Sprintf("requestId=%s\tmodule=User2ItemHologresDao\terror=hologres error(%v)", context.RecommendId, err))
d.mu.Unlock()
return
}
d.userStmt = stmt
d.mu.Unlock()
} else {
d.mu.Unlock()
}
}
rows, err := d.userStmt.Query(args...)
if err != nil {
d.mu.Lock()
if d.userStmt != nil {
d.userStmt.Close()
}
d.userStmt = nil
d.mu.Unlock()
log.Error(fmt.Sprintf("requestId=%s\tmodule=User2ItemHologresDao\terror=hologres error(%v)", context.RecommendId, err))
return
}
itemIds := make([]string, 0)
for rows.Next() {
var ids string
if err := rows.Scan(&ids); err == nil {
idList := strings.Split(ids, ",")
for _, id := range idList {
if len(id) > 0 {
itemIds = append(itemIds, id)
}
}
}
}
rows.Close()
if len(itemIds) == 0 {
log.Info(fmt.Sprintf("module=User2ItemHologresDao\tuid=%s\terr=item ids empty", uid))
return
}
if len(itemIds) > 100 {
rand.Shuffle(len(itemIds)/2, func(i, j int) {
itemIds[i], itemIds[j] = itemIds[j], itemIds[i]
})
itemIds = itemIds[:100]
}
cpuCount := 4
maps := make(map[int][]interface{})
for i, id := range itemIds {
//maps[i%cpuCount] = append(maps[i%cpuCount], "'"+id+"'")
maps[i%cpuCount] = append(maps[i%cpuCount], id)
}
itemIdCh := make(chan []interface{}, cpuCount)
for _, ids := range maps {
itemIdCh <- ids
}
itemCh := make(chan []*Item, cpuCount)
for i := 0; i < cpuCount; i++ {
go func() {
result := make([]*Item, 0)
LOOP:
for {
select {
case ids := <-itemIdCh:
builder := sqlbuilder.PostgreSQL.NewSelectBuilder()
builder.Select("similar_item_ids")
builder.From(d.itemTable)
builder.Where(builder.In("item_id", ids...))
sqlquery, args := builder.Build()
stmtkey := len(ids)
stmt := d.getItemStmt(stmtkey)
if stmt == nil {
d.mu.Lock()
stmt = d.itemStmtMap[stmtkey]
if stmt == nil {
stmt2, err := d.db.Prepare(sqlquery)
if err != nil {
log.Error(fmt.Sprintf("requestId=%s\tmodule=User2ItemHologresDao\terror=hologres error(%v)", context.RecommendId, err))
d.mu.Unlock()
goto LOOP
}
d.itemStmtMap[stmtkey] = stmt2
stmt = stmt2
d.mu.Unlock()
} else {
d.mu.Unlock()
}
}
rows, err := stmt.Query(args...)
if err != nil {
log.Error(fmt.Sprintf("requestId=%s\tmodule=User2ItemHologresDao\tsql=%s\terror=%v", context.RecommendId, sqlquery, err))
goto LOOP
}
for rows.Next() {
var ids string
if err := rows.Scan(&ids); err != nil {
log.Error(fmt.Sprintf("requestId=%s\tmodule=User2ItemHologresDao\terror=%v", context.RecommendId, err))
goto LOOP
}
list := strings.Split(ids, ",")
for _, str := range list {
strs := strings.Split(str, ":")
if len(strs) == 2 && len(strs[0]) > 0 && strs[0] != "null" {
item := NewItem(strs[0])
item.RetrieveId = d.recallName
item.ItemType = d.itemType
if tmpScore, err := strconv.ParseFloat(strs[1], 64); err == nil {
item.Score = tmpScore
}
result = append(result, item)
}
}
}
rows.Close()
default:
goto DONE
}
}
DONE:
itemCh <- result
}()
}
for i := 0; i < cpuCount; i++ {
items := <-itemCh
ret = append(ret, items...)
}
close(itemCh)
close(itemIdCh)
return
}