in module/feature_tablestore_dao.go [340:461]
// itemsFeatureFetch loads item features from Tablestore and merges them into
// the given items in place.
//
// The lookup key of an item is its Id when d.featureKey is "item:id";
// otherwise it is the item's string property named by the second
// ":"-separated component of d.featureKey. Items are distributed round-robin
// over ceil(len(items)/100) batches, so no batch holds more than 100 items
// (presumably to stay within the BatchGetRow per-request row limit — confirm
// against the Tablestore documentation). Each batch is fetched by its own
// goroutine with a single BatchGetRow call, and the function returns once all
// batches have finished.
//
// When d.cache is set it is consulted before querying and is filled from the
// rows that come back. Failures are logged and leave the affected batch
// without fetched features; nothing is returned to the caller.
func (d *FeatureTablestoreDao) itemsFeatureFetch(items []*Item, context *context.RecommendContext) {
	useItemID := d.featureKey == "item:id"
	var propName string
	if !useItemID {
		comms := strings.Split(d.featureKey, ":")
		if len(comms) < 2 {
			log.Error(fmt.Sprintf("requestId=%s\tevent=itemsFeatureFetch\terror=featureKey error(%s)", context.RecommendId, d.featureKey))
			return
		}
		propName = comms[1]
	}
	if len(items) == 0 {
		return
	}

	// Round-robin partitioning: with batchCount = ceil(n/100) every batch is
	// non-empty and holds at most 100 items.
	batchCount := utils.MaxInt(int(math.Ceil(float64(len(items))/float64(100))), 1)
	batches := make([][]*Item, batchCount)
	for i, item := range items {
		batches[i%batchCount] = append(batches[i%batchCount], item)
	}

	// fetchBatch resolves one batch: cache hits are applied directly, the
	// remaining keys are fetched with one BatchGetRow request.
	fetchBatch := func(batch []*Item) {
		// Several items may share one key (e.g. a common property value), so
		// every key maps to all items waiting for it and is requested once.
		pending := make(map[string][]*Item, len(batch))
		criteria := &tablestore.MultiRowQueryCriteria{}
		for _, item := range batch {
			var key string
			if useItemID {
				key = string(item.Id)
			} else {
				key = item.StringProperty(propName)
			}
			if d.cache != nil {
				if cached, hit := d.cache.GetIfPresent(key); hit {
					// A cache entry of an unexpected type is ignored and the
					// key is fetched again instead of panicking.
					if props, ok := cached.(map[string]interface{}); ok {
						item.AddProperties(props)
						if context.Debug {
							item.AddProperty("__debug_cache_hit__", true)
						}
						continue
					}
				}
			}
			if _, requested := pending[key]; !requested {
				pk := new(tablestore.PrimaryKey)
				pk.AddPrimaryKeyColumn(d.itemFeatureKeyName, key)
				criteria.AddRow(pk)
			}
			pending[key] = append(pending[key], item)
		}
		if len(pending) == 0 {
			return
		}

		criteria.MaxVersion = 1
		if d.itemSelectFields != "" {
			criteria.ColumnsToGet = strings.Split(d.itemSelectFields, ",")
		}
		criteria.TableName = d.table
		request := &tablestore.BatchGetRowRequest{}
		request.MultiRowQueryCriteria = append(request.MultiRowQueryCriteria, criteria)

		response, err := d.tablestore.Client.BatchGetRow(request)
		if err != nil {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureTablestoreDao\terror=%v", context.RecommendId, err))
			return
		}
		rowsResult, ok := response.TableToRowsResult[d.table]
		if !ok {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureTablestoreDao\ttable row result empty", context.RecommendId))
			return
		}

		for _, row := range rowsResult {
			if !row.IsSucceed || len(row.PrimaryKey.PrimaryKeys) == 0 {
				continue
			}
			// Checked assertion: a non-string primary key must not panic
			// inside a worker goroutine.
			key, isString := row.PrimaryKey.PrimaryKeys[0].Value.(string)
			if !isString || key == "" {
				continue
			}
			waiting := pending[key]
			if len(waiting) == 0 {
				// Row for a key this batch did not ask for; nothing to update.
				continue
			}
			properties := make(map[string]interface{}, len(row.Columns))
			for _, column := range row.Columns {
				properties[column.ColumnName] = column.Value
			}
			for _, item := range waiting {
				item.AddProperties(properties)
			}
			if d.cache != nil {
				d.cache.Put(key, properties)
			}
		}
	}

	var wg sync.WaitGroup
	for _, batch := range batches {
		wg.Add(1)
		go func(batch []*Item) {
			defer wg.Done()
			fetchBatch(batch)
		}(batch)
	}
	wg.Wait()
}