in dao/feature_view_featuredb_dao.go [1154:1275]
func (d *FeatureViewFeatureDBDao) RowCountIds(filterExpr string) ([]string, int, error) {
snapshotId, _, err := d.createSnapshot()
if err != nil {
return nil, 0, err
}
var program *vm.Program
if filterExpr != "" {
program, err = expr.Compile(filterExpr)
if err != nil {
return nil, 0, err
}
}
alloc := memory.NewGoAllocator()
req, err := http.NewRequest("GET", fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/snapshots/%s/scan",
d.featureDBClient.GetCurrentAddress(false), d.database, d.schema, d.table, snapshotId), bytes.NewReader(nil))
if err != nil {
return nil, 0, err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", d.featureDBClient.Token)
req.Header.Set("Auth", d.signature)
response, err := d.featureDBClient.Client.Do(req)
if err != nil {
return nil, 0, err
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
body, _ := io.ReadAll(response.Body)
return nil, 0, fmt.Errorf("status code: %d, response body: %s", response.StatusCode, string(body))
}
// Arrow IPC reader
reader, _ := ipc.NewReader(response.Body, ipc.WithAllocator(alloc))
readFeatureDBFunc_F_1 := func(innerReader *bytes.Reader) (map[string]interface{}, error) {
properties := make(map[string]interface{})
for _, field := range d.fields {
var isNull uint8
if err := binary.Read(innerReader, binary.LittleEndian, &isNull); err != nil {
if err == io.EOF {
break
}
return nil, err
}
if isNull == 1 {
// 跳过空值
continue
}
switch d.fieldTypeMap[field] {
case constants.FS_INT32:
var int32Value int32
binary.Read(innerReader, binary.LittleEndian, &int32Value)
properties[field] = int32Value
case constants.FS_INT64:
var int64Value int64
binary.Read(innerReader, binary.LittleEndian, &int64Value)
properties[field] = int64Value
case constants.FS_FLOAT:
var float32Value float32
binary.Read(innerReader, binary.LittleEndian, &float32Value)
properties[field] = float32Value
case constants.FS_DOUBLE:
var float64Value float64
binary.Read(innerReader, binary.LittleEndian, &float64Value)
properties[field] = float64Value
case constants.FS_STRING:
var length uint32
binary.Read(innerReader, binary.LittleEndian, &length)
strBytes := make([]byte, length)
binary.Read(innerReader, binary.LittleEndian, &strBytes)
properties[field] = string(strBytes)
case constants.FS_BOOLEAN:
var boolValue bool
binary.Read(innerReader, binary.LittleEndian, &boolValue)
properties[field] = boolValue
default:
var length uint32
binary.Read(innerReader, binary.LittleEndian, &length)
strBytes := make([]byte, length)
binary.Read(innerReader, binary.LittleEndian, &strBytes)
properties[field] = string(strBytes)
}
}
return properties, nil
}
ids := make([]string, 0, 1024)
innerReader := readerPool.Get().(*bytes.Reader)
defer readerPool.Put(innerReader)
for reader.Next() {
record := reader.Record()
for i := 0; i < int(record.NumRows()); i++ {
if filterExpr == "" {
ids = append(ids, record.Column(0).(*array.String).Value(i))
} else {
dataBytes := record.Column(1).(*array.Binary).Value(i)
if len(dataBytes) < 2 {
continue
}
innerReader.Reset(dataBytes)
// 读取版本号
var protocalVersion, ifNullFlagVersion uint8
binary.Read(innerReader, binary.LittleEndian, &protocalVersion)
binary.Read(innerReader, binary.LittleEndian, &ifNullFlagVersion)
properties, err := readFeatureDBFunc_F_1(innerReader)
if err != nil {
return nil, 0, err
}
if ret, err := expr.Run(program, properties); err != nil {
return nil, 0, err
} else if r, ok := ret.(bool); ok && r {
ids = append(ids, record.Column(0).(*array.String).Value(i))
}
}
}
record.Release()
}
return ids, len(ids), nil
}