in dao/feature_view_featuredb_dao.go [1312:1450]
// ScanAndIterateData creates a snapshot, returns the IDs reported by
// RowCountIds(filter) and, when ch is non-nil, starts a background goroutine
// that polls the FeatureDB iterate_get_kv endpoint every 5 seconds and sends
// the first-column string value of every matching row to ch.
//
// filter is an expr expression evaluated against the decoded fields of each
// row; an empty filter matches every row without decoding it. A filter that
// fails to compile is returned as an error before any request is made.
//
// Rows whose payload cannot be decoded, or for which the filter fails to
// evaluate or does not yield true, are skipped. Failed polls (request error,
// non-200 status, missing Next-Ts header, unreadable Arrow stream) are skipped
// and retried on the next tick.
//
// NOTE(review): the polling goroutine has no stop condition and blocks on ch
// when nobody receives; callers must keep draining ch for the lifetime of the
// process. Adding cancellation would require a signature change.
func (d *FeatureViewFeatureDBDao) ScanAndIterateData(filter string, ch chan<- string) ([]string, error) {
	_, ts, err := d.createSnapshot()
	if err != nil {
		return nil, err
	}

	var program *vm.Program
	if filter != "" {
		program, err = expr.Compile(filter)
		if err != nil {
			return nil, err
		}
	}

	ids, _, err := d.RowCountIds(filter)
	if err != nil {
		return nil, err
	}
	if ch == nil {
		return ids, nil
	}

	// decodeRow decodes the per-field section of a row payload (everything
	// after the two leading version bytes) into a field-name -> value map.
	// Each field is a one-byte null flag followed, when the flag is not 1, by
	// a little-endian value. Hitting EOF where a null flag is expected ends
	// the row early; a truncated value is reported as an error.
	decodeRow := func(innerReader *bytes.Reader) (map[string]interface{}, error) {
		// readString reads a uint32 length prefix followed by that many bytes.
		readString := func() (string, error) {
			var length uint32
			if err := binary.Read(innerReader, binary.LittleEndian, &length); err != nil {
				return "", err
			}
			// Reject lengths larger than the remaining payload before
			// allocating, so a corrupt prefix cannot trigger a huge allocation.
			if int64(length) > int64(innerReader.Len()) {
				return "", fmt.Errorf("string length %d exceeds remaining %d bytes", length, innerReader.Len())
			}
			strBytes := make([]byte, length)
			if _, err := io.ReadFull(innerReader, strBytes); err != nil {
				return "", err
			}
			return string(strBytes), nil
		}

		properties := make(map[string]interface{}, len(d.fields))
		for _, field := range d.fields {
			var isNull uint8
			if err := binary.Read(innerReader, binary.LittleEndian, &isNull); err != nil {
				if err == io.EOF {
					break
				}
				return nil, err
			}
			if isNull == 1 {
				// Null value: nothing follows the flag, leave the field unset.
				continue
			}

			var (
				value   interface{}
				readErr error
			)
			switch d.fieldTypeMap[field] {
			case constants.FS_INT32:
				var v int32
				readErr = binary.Read(innerReader, binary.LittleEndian, &v)
				value = v
			case constants.FS_INT64:
				var v int64
				readErr = binary.Read(innerReader, binary.LittleEndian, &v)
				value = v
			case constants.FS_FLOAT:
				var v float32
				readErr = binary.Read(innerReader, binary.LittleEndian, &v)
				value = v
			case constants.FS_DOUBLE:
				var v float64
				readErr = binary.Read(innerReader, binary.LittleEndian, &v)
				value = v
			case constants.FS_BOOLEAN:
				var v bool
				readErr = binary.Read(innerReader, binary.LittleEndian, &v)
				value = v
			default:
				// FS_STRING and every other type are length-prefixed bytes.
				value, readErr = readString()
			}
			if readErr != nil {
				return nil, fmt.Errorf("decoding field %q: %w", field, readErr)
			}
			properties[field] = value
		}
		return properties, nil
	}

	go func() {
		alloc := memory.NewGoAllocator()

		// pollOnce performs a single iterate_get_kv request and forwards the
		// matching rows. It is a separate closure so the deferred cleanups run
		// at the end of every poll rather than never (the outer loop is endless).
		pollOnce := func() {
			req, err := http.NewRequest("GET", fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/iterate_get_kv?ts=%d",
				d.featureDBClient.GetCurrentAddress(false), d.database, d.schema, d.table, ts), bytes.NewReader(nil))
			if err != nil {
				return
			}
			req.Header.Set("Content-Type", "application/json")
			req.Header.Set("Authorization", d.featureDBClient.Token)
			req.Header.Set("Auth", d.signature)

			response, err := d.featureDBClient.Client.Do(req)
			if err != nil {
				return
			}
			// Close the body on every exit path, including the early returns below.
			defer response.Body.Close()

			if response.StatusCode != http.StatusOK {
				return
			}
			nextTs := utils.ToInt64(response.Header.Get("Next-Ts"), 0)
			if nextTs == 0 {
				return
			}
			// The next poll resumes from the timestamp the server handed back.
			ts = nextTs

			reader, err := ipc.NewReader(response.Body, ipc.WithAllocator(alloc))
			if err != nil {
				// A nil reader would panic on Next(); skip this poll instead.
				return
			}
			defer reader.Release()

			innerReader := readerPool.Get().(*bytes.Reader)
			defer readerPool.Put(innerReader)

			for reader.Next() {
				// The record is owned by the reader and released by the next
				// call to Next (or by reader.Release), so it is not released here.
				record := reader.Record()

				keys, ok := record.Column(0).(*array.String)
				if !ok {
					continue
				}
				rowCount := int(record.NumRows())

				if filter == "" {
					for i := 0; i < rowCount; i++ {
						ch <- keys.Value(i)
					}
					continue
				}

				payloads, ok := record.Column(1).(*array.Binary)
				if !ok {
					continue
				}
				for i := 0; i < rowCount; i++ {
					dataBytes := payloads.Value(i)
					if len(dataBytes) < 2 {
						continue
					}
					// Skip the two leading bytes: protocol version and
					// null-flag version. Neither value is used.
					innerReader.Reset(dataBytes[2:])

					properties, err := decodeRow(innerReader)
					if err != nil {
						continue
					}
					ret, err := expr.Run(program, properties)
					if err != nil {
						continue
					}
					if matched, ok := ret.(bool); ok && matched {
						ch <- keys.Value(i)
					}
				}
			}
		}

		for {
			time.Sleep(time.Second * 5)
			pollOnce()
		}
	}()

	return ids, nil
}