func()

in module/feature_tablestore_dao.go [340:461]


func (d *FeatureTablestoreDao) itemsFeatureFetch(items []*Item, context *context.RecommendContext) {
	fk := d.featureKey
	if fk != "item:id" {
		comms := strings.Split(d.featureKey, ":")
		if len(comms) < 2 {
			log.Error(fmt.Sprintf("requestId=%s\tevent=itemsFeatureFetch\terror=featureKey error(%s)", context.RecommendId, d.featureKey))
			return
		}

		fk = comms[1]
	}

	cpuCount := utils.MaxInt(int(math.Ceil(float64(len(items))/float64(100))), 1)
	maps := make(map[int][]*Item)
	for i, item := range items {
		maps[i%cpuCount] = append(maps[i%cpuCount], item)
	}

	requestCh := make(chan []*Item, cpuCount)
	defer close(requestCh)

	for _, itemlist := range maps {
		requestCh <- itemlist
	}

	var wg sync.WaitGroup
	for i := 0; i < cpuCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			select {
			case itemlist := <-requestCh:
				// var keys []interface{}
				key2Item := make(map[string]*Item, len(itemlist))
				batchGetReq := &tablestore.BatchGetRowRequest{}
				mqCriteria := &tablestore.MultiRowQueryCriteria{}
				for _, item := range itemlist {
					var key string
					if fk == "item:id" {
						key = string(item.Id)
					} else {
						key = item.StringProperty(fk)
					}
					if d.cache != nil {
						if cacheValue, ok := d.cache.GetIfPresent(key); ok {
							item.AddProperties(cacheValue.(map[string]interface{}))
							if context.Debug {
								item.AddProperty("__debug_cache_hit__", true)
							}
							continue
						}
					}
					key2Item[key] = item

					pkToGet := new(tablestore.PrimaryKey)
					pkToGet.AddPrimaryKeyColumn(d.itemFeatureKeyName, key)
					mqCriteria.AddRow(pkToGet)
				}
				if len(key2Item) == 0 {
					return
				}
				mqCriteria.MaxVersion = 1
				if d.itemSelectFields != "" {
					mqCriteria.ColumnsToGet = strings.Split(d.itemSelectFields, ",")
				}
				mqCriteria.TableName = d.table
				batchGetReq.MultiRowQueryCriteria = append(batchGetReq.MultiRowQueryCriteria, mqCriteria)
				batchGetResponse, err := d.tablestore.Client.BatchGetRow(batchGetReq)

				if err != nil {
					log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureTablestoreDao\terror=%v", context.RecommendId, err))
					return
				}

				rowsResult, ok := batchGetResponse.TableToRowsResult[d.table]
				if !ok {
					log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureTablestoreDao\ttable row result empty", context.RecommendId))
					return
				}

				for _, row := range rowsResult {
					if !row.IsSucceed {
						continue
					}
					var key string
					if len(row.PrimaryKey.PrimaryKeys) > 0 {
						pkColumn := row.PrimaryKey.PrimaryKeys[0]
						key = pkColumn.Value.(string)
					}
					if key == "" {
						continue
					}
					item := key2Item[key]
					properties := make(map[string]interface{}, len(row.Columns))

					for _, column := range row.Columns {
						name := column.ColumnName
						switch val := column.Value.(type) {
						case string:
							properties[name] = val
						case int:
							properties[name] = val
						case float64:
							properties[name] = val
						default:
							properties[name] = val
						}

					}

					item.AddProperties(properties)
					if d.cache != nil {
						d.cache.Put(key, properties)
					}
				}

			default:
			}
		}()
	}
	wg.Wait()
}