in dao/feature_view_tablestore_dao.go [53:126]
// GetFeatures looks up the rows identified by keys in the TableStore table
// backing this feature view and returns one map per row found. Each map holds
// the row's primary key column(s) plus the returned attribute columns;
// selectFields is passed to TableStore as the list of columns to fetch.
//
// Keys are split into batches of at most 100 (presumably TableStore's
// per-request row limit for BatchGetRow; confirm against the service docs).
// Every batch is fetched concurrently in its own goroutine, so the order of
// the returned rows is not guaranteed to follow the order of keys.
//
// The lookup is best effort: an unsupported primary key type, a key that
// cannot be converted to an integer, a failed batch request, or a failed row
// is logged and skipped. The returned error is always nil.
func (d *FeatureViewTableStoreDao) GetFeatures(keys []interface{}, selectFields []string) ([]map[string]interface{}, error) {
	// batchSize is the maximum number of keys sent in one BatchGetRow request.
	const batchSize = 100

	result := make([]map[string]interface{}, 0, len(keys))
	if len(keys) == 0 {
		return result, nil
	}

	// The primary key type is the same for every key, so resolve and validate
	// it once instead of per key inside every goroutine.
	pkType := d.fieldTypeMap[d.primaryKeyField]
	isIntKey := pkType == constants.FS_INT64 || pkType == constants.FS_INT32
	if !isIntKey && pkType != constants.FS_STRING {
		log.Println(errors.New("primary key type is not supported by TableStore"))
		return result, nil
	}

	var (
		wg sync.WaitGroup
		mu sync.Mutex // guards result
	)
	for start := 0; start < len(keys); start += batchSize {
		end := start + batchSize
		if end > len(keys) {
			end = len(keys)
		}

		wg.Add(1)
		go func(batch []interface{}) {
			defer wg.Done()

			criteria := &tablestore.MultiRowQueryCriteria{}
			criteria.TableName = d.table
			criteria.MaxVersion = 1
			criteria.ColumnsToGet = selectFields

			queued := 0
			for _, key := range batch {
				pk := new(tablestore.PrimaryKey)
				if isIntKey {
					id, ok := key.(int64)
					if !ok {
						// Other representations (int, int32, numeric strings,
						// ...) are converted through their textual form.
						parsed, err := strconv.ParseInt(fmt.Sprintf("%v", key), 10, 64)
						if err != nil {
							// Skip the key rather than silently querying
							// primary key 0, which could return an unrelated row.
							log.Printf("skipping primary key %v: %v", key, err)
							continue
						}
						id = parsed
					}
					pk.AddPrimaryKeyColumn(d.primaryKeyField, id)
				} else {
					pk.AddPrimaryKeyColumn(d.primaryKeyField, key)
				}
				criteria.AddRow(pk)
				queued++
			}
			if queued == 0 {
				// Every key in this batch was rejected; nothing to request.
				return
			}

			request := &tablestore.BatchGetRowRequest{}
			request.MultiRowQueryCriteria = append(request.MultiRowQueryCriteria, criteria)
			response, err := d.tablestoreClient.BatchGetRow(request)
			if err != nil {
				log.Println(err)
				return
			}

			// Collect this batch's rows locally so the shared slice is locked
			// once per batch instead of once per row.
			rows := make([]map[string]interface{}, 0, queued)
			for _, tableRows := range response.TableToRowsResult {
				for _, row := range tableRows {
					if row.Error.Message != "" {
						// A failed row must not discard the other rows of the
						// batch, so log it and keep going.
						log.Println(errors.New(row.Error.Message))
						continue
					}
					if row.PrimaryKey.PrimaryKeys == nil {
						// No primary key in the result: treated as "row not found".
						continue
					}

					features := make(map[string]interface{}, len(row.PrimaryKey.PrimaryKeys)+len(row.Columns))
					for _, pkColumn := range row.PrimaryKey.PrimaryKeys {
						features[pkColumn.ColumnName] = pkColumn.Value
					}
					for _, column := range row.Columns {
						features[column.ColumnName] = column.Value
					}
					rows = append(rows, features)
				}
			}
			if len(rows) == 0 {
				return
			}

			mu.Lock()
			result = append(result, rows...)
			mu.Unlock()
		}(keys[start:end])
	}
	wg.Wait()

	return result, nil
}