module/user_collaborative_tablestore_dao.go (219 lines of code) (raw):

package module import ( "fmt" "strconv" "strings" "time" "github.com/aliyun/aliyun-tablestore-go-sdk/tablestore" "github.com/alibaba/pairec/v2/context" "github.com/alibaba/pairec/v2/log" "github.com/alibaba/pairec/v2/persist/tablestoredb" "github.com/alibaba/pairec/v2/recconf" "github.com/alibaba/pairec/v2/utils" ) type UserCollaborativeTableStoreDao struct { tablestore *tablestoredb.TableStore itemType string userTable string itemTable string recallName string normalization bool } func NewUserCollaborativeTableStoreDao(config recconf.RecallConfig) *UserCollaborativeTableStoreDao { dao := &UserCollaborativeTableStoreDao{} tablestore, err := tablestoredb.GetTableStore(config.UserCollaborativeDaoConf.TableStoreName) if err != nil { log.Error(fmt.Sprintf("%v", err)) return nil } dao.tablestore = tablestore dao.userTable = config.UserCollaborativeDaoConf.User2ItemTable dao.itemTable = config.UserCollaborativeDaoConf.Item2ItemTable dao.itemType = config.ItemType dao.recallName = config.Name if config.UserCollaborativeDaoConf.Normalization == "on" || config.UserCollaborativeDaoConf.Normalization == "" { dao.normalization = true } return dao } func (d *UserCollaborativeTableStoreDao) ListItemsByUser(user *User, context *context.RecommendContext) (ret []*Item) { getRowRequest := new(tablestore.GetRowRequest) criteria := new(tablestore.SingleRowQueryCriteria) putPk := new(tablestore.PrimaryKey) putPk.AddPrimaryKeyColumn("user_id", string(user.Id)) criteria.PrimaryKey = putPk criteria.ColumnsToGet = []string{"item_ids"} getRowRequest.SingleRowQueryCriteria = criteria getRowRequest.SingleRowQueryCriteria.TableName = d.userTable getRowRequest.SingleRowQueryCriteria.MaxVersion = 1 start := time.Now() getResp, err := d.tablestore.Client.GetRow(getRowRequest) log.Debug(fmt.Sprintf("UserCollaborativeTableStoreDao resp, cost=%d", utils.CostTime(start))) if err != nil { log.Error(fmt.Sprintf("requestId=%s\tmodule=UserCollaborativeTableStoreDao\terror=%v", context.RecommendId, err)) return } if len(getResp.Columns) == 0 { log.Info(fmt.Sprintf("requestId=%s\tmodule=UserCollaborativeTableStoreDao\tuid=%s\terr=item ids empty", context.RecommendId, user.Id)) return } var ids string if str, ok := getResp.Columns[0].Value.(string); ok { ids = str } itemIds := strings.Split(ids, ",") if len(itemIds) == 0 { log.Info(fmt.Sprintf("module=UserCollaborativeTableStoreDao\tuid=%s\terr=item ids empty", user.Id)) return } preferScoreMap := make(map[string]float64) cpuCount := 4 maps := make(map[int][]string) for i, id := range itemIds { ss := strings.Split(id, ":") if ss[0] == "" { continue } preferScoreMap[ss[0]] = 1 if len(ss) > 1 { if score, err := strconv.ParseFloat(ss[1], 64); err == nil { preferScoreMap[ss[0]] = score } else { log.Error(fmt.Sprintf("requestId=%s\tmodule=UserCollaborativeTableStoreDao\tevent=ParsePreferScore\tuid=%s\terr=%v", context.RecommendId, user.Id, err)) } } maps[i%cpuCount] = append(maps[i%cpuCount], ss[0]) } itemIdCh := make(chan []string, cpuCount) for _, ids := range maps { itemIdCh <- ids } itemCh := make(chan []*Item, cpuCount) for i := 0; i < cpuCount; i++ { go func() { result := make([]*Item, 0) LOOP: for { select { case ids := <-itemIdCh: batchGetRowRequest := new(tablestore.BatchGetRowRequest) multiRowQueryCriteria := new(tablestore.MultiRowQueryCriteria) multiRowQueryCriteria.ColumnsToGet = []string{"similar_item_ids"} multiRowQueryCriteria.TableName = d.itemTable multiRowQueryCriteria.MaxVersion = 1 for _, id := range ids { putPk := new(tablestore.PrimaryKey) putPk.AddPrimaryKeyColumn("item_id", id) multiRowQueryCriteria.AddRow(putPk) } batchGetRowRequest.MultiRowQueryCriteria = []*tablestore.MultiRowQueryCriteria{multiRowQueryCriteria} batchGetResp, err := d.tablestore.Client.BatchGetRow(batchGetRowRequest) if err != nil { log.Error(fmt.Sprintf("requestId=%s\tmodule=UserCollaborativeTableStoreDao\terror=%v", context.RecommendId, err)) goto LOOP } for _, rows := range batchGetResp.TableToRowsResult { for _, row := range rows { if row.IsSucceed && len(row.Columns) > 0 { var preferScore float64 = 1 pks := row.PrimaryKey.PrimaryKeys for _, pk := range pks { if pk.ColumnName == "item_id" { triggerId, _ := pk.Value.(string) preferScore = preferScoreMap[triggerId] } } if str, ok := row.Columns[0].Value.(string); ok { list := strings.Split(str, ",") for _, id := range list { strs := strings.Split(id, ":") if len(strs) == 1 && strs[0] != "" && strs[0] != "null" { item := NewItem(strs[0]) item.RetrieveId = d.recallName item.ItemType = d.itemType result = append(result, item) } else if len(strs) == 2 && len(strs[0]) > 0 && strs[0] != "null" { item := NewItem(strs[0]) item.RetrieveId = d.recallName item.ItemType = d.itemType if tmpScore, err := strconv.ParseFloat(strs[1], 64); err == nil { item.Score = tmpScore * preferScore } else { item.Score = preferScore } result = append(result, item) } } } } } } default: goto DONE } } DONE: itemCh <- result }() } ret = mergeUserCollaborativeItemsResult(itemCh, cpuCount, d.normalization) close(itemCh) close(itemIdCh) return } func (d *UserCollaborativeTableStoreDao) GetTriggers(user *User, context *context.RecommendContext) (itemTriggers map[string]float64) { itemTriggers = make(map[string]float64) triggerInfos := d.GetTriggerInfos(user, context) for _, trigger := range triggerInfos { itemTriggers[trigger.ItemId] = trigger.Weight } return } func (d *UserCollaborativeTableStoreDao) GetTriggerInfos(user *User, context *context.RecommendContext) (triggerInfos []*TriggerInfo) { getRowRequest := new(tablestore.GetRowRequest) criteria := new(tablestore.SingleRowQueryCriteria) putPk := new(tablestore.PrimaryKey) putPk.AddPrimaryKeyColumn("user_id", string(user.Id)) criteria.PrimaryKey = putPk criteria.ColumnsToGet = []string{"item_ids"} getRowRequest.SingleRowQueryCriteria = criteria getRowRequest.SingleRowQueryCriteria.TableName = d.userTable getRowRequest.SingleRowQueryCriteria.MaxVersion = 1 getResp, err := d.tablestore.Client.GetRow(getRowRequest) if err != nil { log.Error(fmt.Sprintf("requestId=%s\tmodule=UserCollaborativeTableStoreDao\terror=%v", context.RecommendId, err)) return } if len(getResp.Columns) == 0 { log.Info(fmt.Sprintf("requestId=%s\tmodule=UserCollaborativeTableStoreDao\tuid=%s\terr=item ids empty", context.RecommendId, user.Id)) return } var ids string if str, ok := getResp.Columns[0].Value.(string); ok { ids = str } itemIds := strings.Split(ids, ",") if len(itemIds) == 0 { log.Info(fmt.Sprintf("module=UserCollaborativeTableStoreDao\tuid=%s\terr=item ids empty", user.Id)) return } for _, id := range itemIds { ss := strings.Split(id, ":") if ss[0] == "" { continue } trigger := &TriggerInfo{ ItemId: ss[0], Weight: 1, } if len(ss) > 1 { if score, err := strconv.ParseFloat(ss[1], 64); err == nil { trigger.Weight = score } else { log.Error(fmt.Sprintf("requestId=%s\tmodule=UserCollaborativeTableStoreDao\tevent=ParsePreferScore\tuid=%s\terr=%v", context.RecommendId, user.Id, err)) } } triggerInfos = append(triggerInfos, trigger) } return }