func()

in dao/feature_view_tablestore_dao.go [53:126]


func (d *FeatureViewTableStoreDao) GetFeatures(keys []interface{}, selectFields []string) ([]map[string]interface{}, error) {
	result := make([]map[string]interface{}, 0, len(keys))
	var wg sync.WaitGroup
	var mu sync.Mutex

	for i := 0; i < len(keys); i += 100 {
		end := i + 100
		if end > len(keys) {
			end = len(keys)
		}
		ks := keys[i:end]
		wg.Add(1)
		go func(ks []interface{}) {
			defer wg.Done()
			batchGetReq := &tablestore.BatchGetRowRequest{}
			mqCriteria := &tablestore.MultiRowQueryCriteria{}

			for _, key := range ks {
				pkToGet := new(tablestore.PrimaryKey)
				if d.fieldTypeMap[d.primaryKeyField] == constants.FS_INT64 || d.fieldTypeMap[d.primaryKeyField] == constants.FS_INT32 {
					if v, ok := key.(int64); ok {
						pkToGet.AddPrimaryKeyColumn(d.primaryKeyField, v)
					} else {
						s := fmt.Sprintf("%v", key)
						i, _ := strconv.ParseInt(s, 10, 64)
						pkToGet.AddPrimaryKeyColumn(d.primaryKeyField, i)
					}
				} else if d.fieldTypeMap[d.primaryKeyField] == constants.FS_STRING {
					pkToGet.AddPrimaryKeyColumn(d.primaryKeyField, key)
				} else {
					log.Println(errors.New("primary key type is not supported by TableStore"))
					return
				}
				mqCriteria.AddRow(pkToGet)
				mqCriteria.MaxVersion = 1
				mqCriteria.ColumnsToGet = selectFields
			}

			mqCriteria.TableName = d.table
			batchGetReq.MultiRowQueryCriteria = append(batchGetReq.MultiRowQueryCriteria, mqCriteria)
			batchGetResponse, err := d.tablestoreClient.BatchGetRow(batchGetReq)

			if err != nil {
				log.Println(err)
				return
			}

			for _, rowResults := range batchGetResponse.TableToRowsResult {
				for _, rowResult := range rowResults {
					if rowResult.Error.Message != "" {
						log.Println(errors.New(rowResult.Error.Message))
						return
					}
					if rowResult.PrimaryKey.PrimaryKeys == nil {
						continue
					}
					newMap := make(map[string]interface{})
					for _, pkValue := range rowResult.PrimaryKey.PrimaryKeys {
						newMap[pkValue.ColumnName] = pkValue.Value
					}
					for _, rowValue := range rowResult.Columns {
						newMap[rowValue.ColumnName] = rowValue.Value
					}
					mu.Lock()
					result = append(result, newMap)
					mu.Unlock()
				}
			}
		}(ks)
	}
	wg.Wait()

	return result, nil
}