func()

in module/user_topic_tablestore_dao.go [50:216]


// ListItemsByUser recalls items for the topics stored on the user's row.
//
// It reads the "favorite_topic" column of the row keyed by user.Id in
// d.userTopicTable. The column is parsed as a comma-separated list of "topic"
// or "topic:weight" entries; a weight that is missing, unparsable or zero
// falls back to 0.5, and entries containing more than one ':' are ignored.
// When more than d.topicNum topics are present, a random subset of d.topicNum
// is kept. Each remaining topic receives a share of d.recallCount proportional
// to its weight and is queried concurrently against d.topicItemTable: with a
// primary-key range scan when d.indexName is empty, otherwise through the
// search index d.indexName sorted by "value" descending.
//
// Errors are logged rather than returned; a failed lookup simply contributes
// no items. The result is nil when nothing was collected. Topics are fetched
// in parallel, so the relative order of items from different topics is
// unspecified.
func (d *UserTopicTableStoreDao) ListItemsByUser(user *User, context *context.RecommendContext) (ret []*Item) {
	putPk := new(tablestore.PrimaryKey)
	putPk.AddPrimaryKeyColumn("user_id", string(user.Id))

	criteria := new(tablestore.SingleRowQueryCriteria)
	criteria.PrimaryKey = putPk
	criteria.ColumnsToGet = []string{"favorite_topic"}
	criteria.TableName = d.userTopicTable
	criteria.MaxVersion = 1

	getRowRequest := new(tablestore.GetRowRequest)
	getRowRequest.SingleRowQueryCriteria = criteria

	getResp, err := d.tablestore.Client.GetRow(getRowRequest)
	if err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=UserTopicTableStoreDao\terror=%v", context.RecommendId, err))
		return nil
	}

	if len(getResp.Columns) == 0 {
		log.Info(fmt.Sprintf("requestId=%s\tmodule=UserTopicTableStoreDao\tuid=%s\terr=topic empty", context.RecommendId, user.Id))
		return nil
	}

	// Parse "topic[:weight],topic[:weight],..." into weighted topics.
	topics := make([]*topicInfo, 0, d.topicNum)
	if str, ok := getResp.Columns[0].Value.(string); ok {
		for _, entry := range strings.Split(str, ",") {
			tv := strings.Split(entry, ":")
			weight := float64(0.5)
			switch len(tv) {
			case 1:
				// No explicit weight: keep the default.
			case 2:
				if f, err := strconv.ParseFloat(tv[1], 64); err == nil && f != float64(0) {
					weight = f
				}
			default:
				// More than one ':' is not a recognised entry format.
				continue
			}
			topics = append(topics, &topicInfo{topic: tv[0], value: weight})
		}
	}

	if len(topics) == 0 {
		log.Info(fmt.Sprintf("module=UserTopicTableStoreDao\tuid=%s\terr=topic empty", user.Id))
		return nil
	}

	// Too many topics: keep a random sample of d.topicNum of them.
	if len(topics) > d.topicNum {
		rand.Shuffle(len(topics), func(i, j int) {
			topics[i], topics[j] = topics[j], topics[i]
		})

		topics = topics[:d.topicNum]
	}

	totalValue := float64(0)
	for _, topic := range topics {
		totalValue += topic.value
	}

	var (
		wg sync.WaitGroup
		mu sync.Mutex // guards ret, which every fetch goroutine appends to
	)

	// collect converts rows into items and appends them to ret under mu.
	// Rows whose primary key does not carry a string in its second column are
	// logged and skipped instead of panicking inside a goroutine, which would
	// bring down the whole process.
	collect := func(rows []*tablestore.Row) {
		items := make([]*Item, 0, len(rows))
		for _, row := range rows {
			if row == nil || row.PrimaryKey == nil || len(row.PrimaryKey.PrimaryKeys) < 2 || row.PrimaryKey.PrimaryKeys[1] == nil {
				log.Error(fmt.Sprintf("requestId=%s\tmodule=UserTopicTableStoreDao\terror=unexpected primary key in topic item row", context.RecommendId))
				continue
			}
			itemID, ok := row.PrimaryKey.PrimaryKeys[1].Value.(string)
			if !ok {
				log.Error(fmt.Sprintf("requestId=%s\tmodule=UserTopicTableStoreDao\terror=unexpected primary key in topic item row", context.RecommendId))
				continue
			}

			item := NewItem(itemID)
			item.ItemType = d.itemType
			item.RetrieveId = d.recallName
			if len(row.Columns) == 1 {
				item.Score = utils.ToFloat(row.Columns[0].Value, 0)
			}
			items = append(items, item)
		}
		if len(items) == 0 {
			return
		}

		mu.Lock()
		ret = append(ret, items...)
		mu.Unlock()
	}

	// fetchByRange scans every item_id under the topic's primary-key prefix.
	fetchByRange := func(info *topicInfo) {
		startPK := new(tablestore.PrimaryKey)
		startPK.AddPrimaryKeyColumn("topic", info.topic)
		startPK.AddPrimaryKeyColumnWithMinValue("item_id")
		endPK := new(tablestore.PrimaryKey)
		endPK.AddPrimaryKeyColumn("topic", info.topic)
		endPK.AddPrimaryKeyColumnWithMaxValue("item_id")

		rangeRowQueryCriteria := &tablestore.RangeRowQueryCriteria{}
		rangeRowQueryCriteria.TableName = d.topicItemTable
		rangeRowQueryCriteria.StartPrimaryKey = startPK
		rangeRowQueryCriteria.EndPrimaryKey = endPK
		rangeRowQueryCriteria.ColumnsToGet = []string{"value"}
		rangeRowQueryCriteria.Direction = tablestore.FORWARD
		rangeRowQueryCriteria.MaxVersion = 1
		rangeRowQueryCriteria.Limit = int32(info.count)

		getRangeRequest := &tablestore.GetRangeRequest{}
		getRangeRequest.RangeRowQueryCriteria = rangeRowQueryCriteria

		getRangeResp, err := d.tablestore.Client.GetRange(getRangeRequest)
		if err != nil {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=UserTopicTableStoreDao\terror=%v", context.RecommendId, err))
			return
		}
		collect(getRangeResp.Rows)
	}

	// fetchByIndex queries the search index, routed by topic and ordered by
	// "value" descending.
	fetchByIndex := func(info *topicInfo) {
		searchRequest := &tablestore.SearchRequest{}
		searchRequest.TableName = d.topicItemTable
		searchRequest.IndexName = d.indexName

		pk := new(tablestore.PrimaryKey)
		pk.AddPrimaryKeyColumn("topic", info.topic)
		searchRequest.RoutingValues = append(searchRequest.RoutingValues, pk)

		searchQuery := search.NewSearchQuery()
		searchQuery.SetSort(&search.Sort{
			Sorters: []search.Sorter{
				&search.FieldSort{
					FieldName: "value",
					Order:     search.SortOrder_DESC.Enum(),
				},
			},
		})
		searchQuery.SetLimit(int32(info.count))
		searchRequest.SetSearchQuery(searchQuery)
		searchRequest.ColumnsToGet = &tablestore.ColumnsToGet{Columns: []string{"value"}}

		searchResp, err := d.tablestore.Client.Search(searchRequest)
		if err != nil {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=UserTopicTableStoreDao\terror=%v", context.RecommendId, err))
			return
		}
		collect(searchResp.Rows)
	}

	// The lookup strategy depends only on the DAO configuration, so pick it once.
	fetch := fetchByIndex
	if d.indexName == "" {
		fetch = fetchByRange
	}

	for _, topic := range topics {
		// Split the recall budget across topics in proportion to their weight.
		// NOTE(review): a share that truncates to 0 is still sent as limit 0;
		// confirm how the Tablestore client treats a zero limit.
		topic.count = int((topic.value / totalValue) * float64(d.recallCount))

		wg.Add(1)
		go func(info *topicInfo) {
			defer wg.Done()
			fetch(info)
		}(topic)
	}

	wg.Wait()
	return ret
}