in dao/feature_view_featuredb_dao.go [76:663]
// GetFeatures fetches the rows identified by keys from FeatureDB and returns
// one map per row that was found.
//
// The keys are split into groups of up to 200; each group is requested
// concurrently with a POST to the table's batch_get_kv2 endpoint and the
// binary response is decoded according to d.fields / d.fieldTypeMap. Every
// returned map holds the non-null fields listed in selectFields plus
// d.primaryKeyField (set to the original key value). Keys whose stored value
// is shorter than two bytes are treated as missing and produce no map.
//
// Because groups finish in arbitrary order, the order of the returned slice is
// not guaranteed to follow the order of keys.
//
// An error is returned when the DAO has no signature or datasource, when a
// request cannot be built or sent (after one retry), when the server answers
// with a non-200 status, or when a record cannot be decoded. On a
// request/decoding error the returned slice is nil.
func (d *FeatureViewFeatureDBDao) GetFeatures(keys []interface{}, selectFields []string) ([]map[string]interface{}, error) {
	result := make([]map[string]interface{}, 0, len(keys))
	selectFieldsSet := make(map[string]struct{})
	for _, selectField := range selectFields {
		selectFieldsSet[selectField] = struct{}{}
	}
	var wg sync.WaitGroup
	var mu sync.Mutex
	const groupSize = 200
	if d.signature == "" {
		return result, errors.New("FeatureStore DB username and password are not entered, please enter them by adding client.LoginFeatureStoreDB(username, password)")
	}
	if d.featureDBClient.GetCurrentAddress(false) == "" || d.featureDBClient.Token == "" {
		return result, errors.New("FeatureDB datasource has not been created")
	}
	// Each goroutine sends at most one error before returning, so one slot per
	// group is enough to keep the sends non-blocking.
	errChan := make(chan error, len(keys)/groupSize+1)
	for i := 0; i < len(keys); i += groupSize {
		end := i + groupSize
		if end > len(keys) {
			end = len(keys)
		}
		ks := keys[i:end]
		wg.Add(1)
		go func(ks []interface{}) {
			defer wg.Done()
			pkeys := make([]string, 0, len(ks))
			for _, k := range ks {
				pkeys = append(pkeys, utils.ToString(k, ""))
			}
			body, err := json.Marshal(map[string]any{"keys": pkeys})
			if err != nil {
				errChan <- err
				return
			}
			// newRequest builds a fresh request for this key group. Every attempt
			// gets its own body reader: a reader already consumed by a failed
			// attempt would make the retry post an empty body, and the transport
			// may still be closing the previous body after Do has returned.
			// The retry flag is forwarded to GetCurrentAddress unchanged.
			newRequest := func(retry bool) (*http.Request, error) {
				url := fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kv2?batch_size=%d&encoder=", d.featureDBClient.GetCurrentAddress(retry), d.database, d.schema, d.table, len(pkeys))
				req, err := http.NewRequest("POST", url, bytes.NewReader(body))
				if err != nil {
					return nil, err
				}
				req.Header.Set("Content-Type", "application/json")
				req.Header.Set("Authorization", d.featureDBClient.Token)
				req.Header.Set("Auth", d.signature)
				return req, nil
			}
			req, err := newRequest(false)
			if err != nil {
				errChan <- err
				return
			}
			response, err := d.featureDBClient.Client.Do(req)
			if err != nil {
				// One retry, asking the client for its address with the flag set.
				req, err = newRequest(true)
				if err != nil {
					errChan <- err
					return
				}
				response, err = d.featureDBClient.Client.Do(req)
				if err != nil {
					errChan <- err
					return
				}
			}
			defer response.Body.Close() // make sure response.Body is closed
			// Check the status code.
			if response.StatusCode != http.StatusOK {
				bodyBytes, err := io.ReadAll(response.Body)
				if err != nil {
					errChan <- err
					return
				}
				// Prefer the server's "message" field; fall back to the raw body.
				var detail interface{} = string(bodyBytes)
				var bodyMap map[string]interface{}
				if err := json.Unmarshal(bodyBytes, &bodyMap); err == nil {
					if msg, found := bodyMap["message"]; found {
						log.Printf("StatusCode: %d, Response message: %s\n", response.StatusCode, msg)
						detail = msg
					}
				}
				// Report the failure instead of silently dropping this key group,
				// which would hand the caller a partial result with a nil error.
				errChan <- fmt.Errorf("FeatureDB batch_get_kv2 request failed, status code: %d, message: %v", response.StatusCode, detail)
				return
			}
			reader := bufio.NewReader(response.Body)
			// keyStartIdx is the offset in ks of the first value of the current
			// record block; value i of a block belongs to ks[keyStartIdx+i].
			keyStartIdx := 0
			innerResult := make([]map[string]interface{}, 0, len(ks))
			innerReader := readerPool.Get().(*bytes.Reader)
			defer readerPool.Put(innerReader)
			for {
				buf, err := deserialize(reader)
				if err == io.EOF {
					break // End of stream
				}
				if err != nil {
					errChan <- err
					return
				}
				recordBlock := fdbserverfb.GetRootAsRecordBlock(buf, 0)
				for i := 0; i < recordBlock.ValuesLength(); i++ {
					value := new(fdbserverfb.UInt8ValueColumn)
					recordBlock.Values(value, i)
					dataBytes := value.ValueBytes()
					// The key does not exist.
					if len(dataBytes) < 2 {
						// fmt.Println("key ", ks[keyStartIdx+i], " not exists")
						continue
					}
					innerReader.Reset(dataBytes)
					// Read the two version bytes; dataBytes holds at least two
					// bytes here, so these reads cannot come up short.
					var protocalVersion, ifNullFlagVersion uint8
					binary.Read(innerReader, binary.LittleEndian, &protocalVersion)
					binary.Read(innerReader, binary.LittleEndian, &ifNullFlagVersion)
					// readFeatureDBFunc_F_1 decodes one record from innerReader.
					// For every field in d.fields the record carries a one-byte
					// null flag followed, when the flag is not 1, by the
					// little-endian encoded value. Selected fields are decoded
					// into the result map; the others are skipped over.
					// NOTE(review): errors from the value reads below are not
					// checked, so a record truncated in the middle of a value
					// yields zero values rather than an error.
					readFeatureDBFunc_F_1 := func() (map[string]interface{}, error) {
						properties := make(map[string]interface{})
						for _, field := range d.fields {
							var isNull uint8
							if err := binary.Read(innerReader, binary.LittleEndian, &isNull); err != nil {
								// Running out of data at a field boundary ends the
								// record; remaining fields are simply absent.
								if err == io.EOF {
									break
								}
								return nil, err
							}
							if isNull == 1 {
								// Skip null values.
								continue
							}
							if _, exists := selectFieldsSet[field]; exists {
								switch d.fieldTypeMap[field] {
								case constants.FS_DOUBLE:
									var float64Value float64
									binary.Read(innerReader, binary.LittleEndian, &float64Value)
									properties[field] = float64Value
								case constants.FS_FLOAT:
									var float32Value float32
									binary.Read(innerReader, binary.LittleEndian, &float32Value)
									properties[field] = float32Value
								case constants.FS_INT64:
									var int64Value int64
									binary.Read(innerReader, binary.LittleEndian, &int64Value)
									properties[field] = int64Value
								case constants.FS_INT32:
									var int32Value int32
									binary.Read(innerReader, binary.LittleEndian, &int32Value)
									properties[field] = int32Value
								case constants.FS_BOOLEAN:
									var booleanValue bool
									binary.Read(innerReader, binary.LittleEndian, &booleanValue)
									properties[field] = booleanValue
								case constants.FS_STRING:
									// uint32 byte length followed by the bytes.
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									strBytes := make([]byte, length)
									binary.Read(innerReader, binary.LittleEndian, &strBytes)
									properties[field] = string(strBytes)
								case constants.FS_ARRAY_INT32:
									// Arrays: uint32 element count followed by the elements.
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									arrayInt32Value := make([]int32, length)
									if length > 0 {
										binary.Read(innerReader, binary.LittleEndian, &arrayInt32Value)
									}
									properties[field] = arrayInt32Value
								case constants.FS_ARRAY_INT64:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									arrayInt64Value := make([]int64, length)
									if length > 0 {
										binary.Read(innerReader, binary.LittleEndian, &arrayInt64Value)
									}
									properties[field] = arrayInt64Value
								case constants.FS_ARRAY_FLOAT:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									arrayFloat32Value := make([]float32, length)
									if length > 0 {
										binary.Read(innerReader, binary.LittleEndian, &arrayFloat32Value)
									}
									properties[field] = arrayFloat32Value
								case constants.FS_ARRAY_DOUBLE:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									arrayFloat64Value := make([]float64, length)
									if length > 0 {
										binary.Read(innerReader, binary.LittleEndian, &arrayFloat64Value)
									}
									properties[field] = arrayFloat64Value
								case constants.FS_ARRAY_STRING:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									arrayStringValue := d.decodeStringArray(innerReader, length)
									properties[field] = arrayStringValue
								case constants.FS_ARRAY_ARRAY_FLOAT:
									// Layout: outer count, total element count, then
									// (when the total is non-zero) one uint32 length per
									// inner array followed by all elements flattened.
									var outerLength uint32
									binary.Read(innerReader, binary.LittleEndian, &outerLength)
									arrayOfArrayFloatValue := make([][]float32, outerLength)
									if outerLength > 0 {
										var totalElements uint32
										binary.Read(innerReader, binary.LittleEndian, &totalElements)
										if totalElements == 0 {
											for outerIdx := range arrayOfArrayFloatValue {
												arrayOfArrayFloatValue[outerIdx] = []float32{}
											}
										} else {
											innerArrayLens := make([]uint32, outerLength)
											binary.Read(innerReader, binary.LittleEndian, &innerArrayLens)
											innerValidElements := make([]float32, totalElements)
											binary.Read(innerReader, binary.LittleEndian, &innerValidElements)
											// Inner arrays are sub-slices of the flat buffer.
											innerIndex := 0
											for outerIdx, innerLength := range innerArrayLens {
												arrayOfArrayFloatValue[outerIdx] = innerValidElements[innerIndex : innerIndex+int(innerLength)]
												innerIndex += int(innerLength)
											}
										}
									}
									properties[field] = arrayOfArrayFloatValue
								case constants.FS_MAP_INT32_INT32:
									// Maps: uint32 entry count, then all keys, then all values.
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									mapInt32Int32Value := make(map[int32]int32, length)
									if length > 0 {
										keys := make([]int32, length)
										binary.Read(innerReader, binary.LittleEndian, &keys)
										values := make([]int32, length)
										binary.Read(innerReader, binary.LittleEndian, &values)
										for idx, key := range keys {
											mapInt32Int32Value[key] = values[idx]
										}
									}
									properties[field] = mapInt32Int32Value
								case constants.FS_MAP_INT32_INT64:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									mapInt32Int64Value := make(map[int32]int64, length)
									if length > 0 {
										keys := make([]int32, length)
										binary.Read(innerReader, binary.LittleEndian, &keys)
										values := make([]int64, length)
										binary.Read(innerReader, binary.LittleEndian, &values)
										for idx, key := range keys {
											mapInt32Int64Value[key] = values[idx]
										}
									}
									properties[field] = mapInt32Int64Value
								case constants.FS_MAP_INT32_FLOAT:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									mapInt32FloatValue := make(map[int32]float32, length)
									if length > 0 {
										keys := make([]int32, length)
										binary.Read(innerReader, binary.LittleEndian, &keys)
										values := make([]float32, length)
										binary.Read(innerReader, binary.LittleEndian, &values)
										for idx, key := range keys {
											mapInt32FloatValue[key] = values[idx]
										}
									}
									properties[field] = mapInt32FloatValue
								case constants.FS_MAP_INT32_DOUBLE:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									mapInt32DoubleValue := make(map[int32]float64, length)
									if length > 0 {
										keys := make([]int32, length)
										binary.Read(innerReader, binary.LittleEndian, &keys)
										values := make([]float64, length)
										binary.Read(innerReader, binary.LittleEndian, &values)
										for idx, key := range keys {
											mapInt32DoubleValue[key] = values[idx]
										}
									}
									properties[field] = mapInt32DoubleValue
								case constants.FS_MAP_INT32_STRING:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									mapInt32StringValue := make(map[int32]string, length)
									if length > 0 {
										keys := make([]int32, length)
										binary.Read(innerReader, binary.LittleEndian, &keys)
										values := d.decodeStringArray(innerReader, length)
										for idx, key := range keys {
											mapInt32StringValue[key] = values[idx]
										}
									}
									properties[field] = mapInt32StringValue
								case constants.FS_MAP_INT64_INT32:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									mapInt64Int32Value := make(map[int64]int32, length)
									if length > 0 {
										keys := make([]int64, length)
										binary.Read(innerReader, binary.LittleEndian, &keys)
										values := make([]int32, length)
										binary.Read(innerReader, binary.LittleEndian, &values)
										for idx, key := range keys {
											mapInt64Int32Value[key] = values[idx]
										}
									}
									properties[field] = mapInt64Int32Value
								case constants.FS_MAP_INT64_INT64:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									mapInt64Int64Value := make(map[int64]int64, length)
									if length > 0 {
										keys := make([]int64, length)
										binary.Read(innerReader, binary.LittleEndian, &keys)
										values := make([]int64, length)
										binary.Read(innerReader, binary.LittleEndian, &values)
										for idx, key := range keys {
											mapInt64Int64Value[key] = values[idx]
										}
									}
									properties[field] = mapInt64Int64Value
								case constants.FS_MAP_INT64_FLOAT:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									mapInt64FloatValue := make(map[int64]float32, length)
									if length > 0 {
										keys := make([]int64, length)
										binary.Read(innerReader, binary.LittleEndian, &keys)
										values := make([]float32, length)
										binary.Read(innerReader, binary.LittleEndian, &values)
										for idx, key := range keys {
											mapInt64FloatValue[key] = values[idx]
										}
									}
									properties[field] = mapInt64FloatValue
								case constants.FS_MAP_INT64_DOUBLE:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									mapInt64DoubleValue := make(map[int64]float64, length)
									if length > 0 {
										keys := make([]int64, length)
										binary.Read(innerReader, binary.LittleEndian, &keys)
										values := make([]float64, length)
										binary.Read(innerReader, binary.LittleEndian, &values)
										for idx, key := range keys {
											mapInt64DoubleValue[key] = values[idx]
										}
									}
									properties[field] = mapInt64DoubleValue
								case constants.FS_MAP_INT64_STRING:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									mapInt64StringValue := make(map[int64]string, length)
									if length > 0 {
										keys := make([]int64, length)
										binary.Read(innerReader, binary.LittleEndian, &keys)
										values := d.decodeStringArray(innerReader, length)
										for idx, key := range keys {
											mapInt64StringValue[key] = values[idx]
										}
									}
									properties[field] = mapInt64StringValue
								case constants.FS_MAP_STRING_INT32:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									mapStringInt32Value := make(map[string]int32, length)
									if length > 0 {
										keys := d.decodeStringArray(innerReader, length)
										values := make([]int32, length)
										binary.Read(innerReader, binary.LittleEndian, &values)
										for idx, key := range keys {
											mapStringInt32Value[key] = values[idx]
										}
									}
									properties[field] = mapStringInt32Value
								case constants.FS_MAP_STRING_INT64:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									mapStringInt64Value := make(map[string]int64, length)
									if length > 0 {
										keys := d.decodeStringArray(innerReader, length)
										values := make([]int64, length)
										binary.Read(innerReader, binary.LittleEndian, &values)
										for idx, key := range keys {
											mapStringInt64Value[key] = values[idx]
										}
									}
									properties[field] = mapStringInt64Value
								case constants.FS_MAP_STRING_FLOAT:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									mapStringFloatValue := make(map[string]float32, length)
									if length > 0 {
										keys := d.decodeStringArray(innerReader, length)
										values := make([]float32, length)
										binary.Read(innerReader, binary.LittleEndian, &values)
										for idx, key := range keys {
											mapStringFloatValue[key] = values[idx]
										}
									}
									properties[field] = mapStringFloatValue
								case constants.FS_MAP_STRING_DOUBLE:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									mapStringDoubleValue := make(map[string]float64, length)
									if length > 0 {
										keys := d.decodeStringArray(innerReader, length)
										values := make([]float64, length)
										binary.Read(innerReader, binary.LittleEndian, &values)
										for idx, key := range keys {
											mapStringDoubleValue[key] = values[idx]
										}
									}
									properties[field] = mapStringDoubleValue
								case constants.FS_MAP_STRING_STRING:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									mapStringStringValue := make(map[string]string, length)
									if length > 0 {
										keys := d.decodeStringArray(innerReader, length)
										values := d.decodeStringArray(innerReader, length)
										for idx, key := range keys {
											mapStringStringValue[key] = values[idx]
										}
									}
									properties[field] = mapStringStringValue
								default:
									// Unknown types are decoded like a string.
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									strBytes := make([]byte, length)
									binary.Read(innerReader, binary.LittleEndian, &strBytes)
									properties[field] = string(strBytes)
								}
							} else {
								// The field was not requested: work out how many bytes
								// its value occupies and seek past them.
								// NOTE(review): presumably getStringArrayCharLen consumes
								// the per-string length prefixes and returns the total
								// number of character bytes still to skip — confirm
								// against the helper.
								var skipBytes int = 0
								switch d.fieldTypeMap[field] {
								case constants.FS_DOUBLE:
									skipBytes = 8
								case constants.FS_FLOAT:
									skipBytes = 4
								case constants.FS_INT64:
									skipBytes = 8
								case constants.FS_INT32:
									skipBytes = 4
								case constants.FS_BOOLEAN:
									skipBytes = 1
								case constants.FS_STRING:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = int(length)
								case constants.FS_ARRAY_INT32:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = int(length) * 4
								case constants.FS_ARRAY_INT64:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = int(length) * 8
								case constants.FS_ARRAY_FLOAT:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = int(length) * 4
								case constants.FS_ARRAY_DOUBLE:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = int(length) * 8
								case constants.FS_ARRAY_STRING:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = d.getStringArrayCharLen(innerReader, length)
								case constants.FS_ARRAY_ARRAY_FLOAT:
									var outerLength uint32
									binary.Read(innerReader, binary.LittleEndian, &outerLength)
									if outerLength > 0 {
										var totalElements uint32
										binary.Read(innerReader, binary.LittleEndian, &totalElements)
										if totalElements > 0 {
											// Inner length table plus the flattened elements.
											skipBytes = int(outerLength)*4 + int(totalElements)*4
										}
									}
								case constants.FS_MAP_INT32_INT32:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = int(length) * (4 + 4)
								case constants.FS_MAP_INT32_INT64:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = int(length) * (4 + 8)
								case constants.FS_MAP_INT32_FLOAT:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = int(length) * (4 + 4)
								case constants.FS_MAP_INT32_DOUBLE:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = int(length) * (4 + 8)
								case constants.FS_MAP_INT32_STRING:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									// Step over the int32 keys before measuring the values.
									innerReader.Seek(int64(length*4), io.SeekCurrent)
									skipBytes = d.getStringArrayCharLen(innerReader, length)
								case constants.FS_MAP_INT64_INT32:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = int(length) * (8 + 4)
								case constants.FS_MAP_INT64_INT64:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = int(length) * (8 + 8)
								case constants.FS_MAP_INT64_FLOAT:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = int(length) * (8 + 4)
								case constants.FS_MAP_INT64_DOUBLE:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = int(length) * (8 + 8)
								case constants.FS_MAP_INT64_STRING:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									// Step over the int64 keys before measuring the values.
									innerReader.Seek(int64(length*8), io.SeekCurrent)
									skipBytes = d.getStringArrayCharLen(innerReader, length)
								case constants.FS_MAP_STRING_INT32:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = d.getStringArrayCharLen(innerReader, length) + int(length)*4
								case constants.FS_MAP_STRING_INT64:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = d.getStringArrayCharLen(innerReader, length) + int(length)*8
								case constants.FS_MAP_STRING_FLOAT:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = d.getStringArrayCharLen(innerReader, length) + int(length)*4
								case constants.FS_MAP_STRING_DOUBLE:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = d.getStringArrayCharLen(innerReader, length) + int(length)*8
								case constants.FS_MAP_STRING_STRING:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									// Skip the key strings first, then measure the values.
									keyLen := d.getStringArrayCharLen(innerReader, length)
									innerReader.Seek(int64(keyLen), io.SeekCurrent)
									skipBytes = d.getStringArrayCharLen(innerReader, length)
								default:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = int(length)
								}
								if skipBytes > 0 {
									if _, err := innerReader.Seek(int64(skipBytes), io.SeekCurrent); err != nil {
										return nil, err
									}
								}
							}
						}
						// The primary key is always reported, using the caller's
						// original key value.
						properties[d.primaryKeyField] = ks[keyStartIdx+i]
						return properties, nil
					}
					if protocalVersion == FeatureDB_Protocal_Version_F && ifNullFlagVersion == FeatureDB_IfNull_Flag_Version_1 {
						readResult, err := readFeatureDBFunc_F_1()
						if err != nil {
							errChan <- err
							return
						}
						innerResult = append(innerResult, readResult)
					} else {
						errChan <- fmt.Errorf("FeatureDB read key %v error: protocalVersion %v or ifNullFlagVersion %d is not supported", ks[keyStartIdx+i], protocalVersion, ifNullFlagVersion)
						fmt.Printf("FeatureDB read key %v error: protocalVersion %v or ifNullFlagVersion %d is not supported", ks[keyStartIdx+i], protocalVersion, ifNullFlagVersion)
						return
					}
				}
				keyStartIdx += recordBlock.ValuesLength()
			}
			// Publish this group's rows only after the whole response decoded.
			mu.Lock()
			result = append(result, innerResult...)
			mu.Unlock()
		}(ks)
	}
	wg.Wait()
	close(errChan)
	// Surface the first reported error, if any; partial results are discarded.
	for err := range errChan {
		if err != nil {
			return nil, err
		}
	}
	return result, nil
}