func()

in dao/feature_view_featuredb_dao.go [1312:1450]


// ScanAndIterateData takes a snapshot of the FeatureDB table, returns the ids
// reported by RowCountIds(filter) and, when ch is non-nil, starts a background
// goroutine that polls the iterate_get_kv endpoint every 5 seconds and sends
// the keys of the rows it receives to ch.
//
// When filter is empty every received key is forwarded. Otherwise filter is
// compiled once with expr.Compile, each row payload is decoded into a
// field-name -> value map, and the key is forwarded only when the expression
// evaluates to the boolean true. Rows that cannot be decoded or evaluated are
// skipped.
//
// An error is returned if the snapshot cannot be created, the filter does not
// compile, or RowCountIds fails; in those cases no goroutine is started.
//
// NOTE: the polling goroutine has no stop condition and never closes ch. It
// blocks on ch whenever the consumer stops receiving, so callers must keep
// draining ch for the lifetime of the process.
func (d *FeatureViewFeatureDBDao) ScanAndIterateData(filter string, ch chan<- string) ([]string, error) {
	_, ts, err := d.createSnapshot()
	if err != nil {
		return nil, err
	}

	var program *vm.Program
	if filter != "" {
		program, err = expr.Compile(filter)
		if err != nil {
			return nil, err
		}
	}

	ids, _, err := d.RowCountIds(filter)
	if err != nil {
		return nil, err
	}
	if ch == nil {
		return ids, nil
	}

	// readString decodes a uint32 length prefix followed by that many bytes.
	// The length is validated against the remaining payload so that a corrupt
	// prefix cannot trigger a huge allocation.
	readString := func(r *bytes.Reader) (string, error) {
		var length uint32
		if err := binary.Read(r, binary.LittleEndian, &length); err != nil {
			return "", err
		}
		if int64(length) > int64(r.Len()) {
			return "", io.ErrUnexpectedEOF
		}
		buf := make([]byte, length)
		if _, err := io.ReadFull(r, buf); err != nil {
			return "", err
		}
		return string(buf), nil
	}

	// decodeRow reads one value per entry of d.fields, in order. Each value is
	// preceded by a one-byte null flag; a flag of 1 means the field is absent
	// from the result. Running out of data at a flag boundary ends decoding
	// and the fields read so far are returned; any other read failure is
	// reported so the caller can skip the row instead of filtering on
	// fabricated zero values.
	decodeRow := func(r *bytes.Reader) (map[string]interface{}, error) {
		properties := make(map[string]interface{}, len(d.fields))
		for _, field := range d.fields {
			var isNull uint8
			if err := binary.Read(r, binary.LittleEndian, &isNull); err != nil {
				if err == io.EOF {
					break
				}
				return nil, err
			}
			if isNull == 1 {
				// Null value: nothing follows the flag.
				continue
			}

			var (
				value   interface{}
				readErr error
			)
			switch d.fieldTypeMap[field] {
			case constants.FS_INT32:
				var v int32
				readErr = binary.Read(r, binary.LittleEndian, &v)
				value = v
			case constants.FS_INT64:
				var v int64
				readErr = binary.Read(r, binary.LittleEndian, &v)
				value = v
			case constants.FS_FLOAT:
				var v float32
				readErr = binary.Read(r, binary.LittleEndian, &v)
				value = v
			case constants.FS_DOUBLE:
				var v float64
				readErr = binary.Read(r, binary.LittleEndian, &v)
				value = v
			case constants.FS_BOOLEAN:
				var v bool
				readErr = binary.Read(r, binary.LittleEndian, &v)
				value = v
			default:
				// FS_STRING and every unrecognised type share the
				// length-prefixed string encoding.
				value, readErr = readString(r)
			}
			if readErr != nil {
				return nil, fmt.Errorf("decoding field %q: %w", field, readErr)
			}
			properties[field] = value
		}
		return properties, nil
	}

	alloc := memory.NewGoAllocator()

	// poll performs a single iterate_get_kv round trip. It runs as its own
	// function so the deferred cleanups fire after every attempt, including
	// the early-exit paths.
	poll := func() {
		url := fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/iterate_get_kv?ts=%d",
			d.featureDBClient.GetCurrentAddress(false), d.database, d.schema, d.table, ts)
		req, err := http.NewRequest(http.MethodGet, url, bytes.NewReader(nil))
		if err != nil {
			return
		}
		req.Header.Set("Content-Type", "application/json")
		req.Header.Set("Authorization", d.featureDBClient.Token)
		req.Header.Set("Auth", d.signature)

		response, err := d.featureDBClient.Client.Do(req)
		if err != nil {
			return
		}
		// Close the body on every path; skipping this on non-200 or missing
		// Next-Ts responses leaks the underlying connection.
		defer response.Body.Close()

		if response.StatusCode != http.StatusOK {
			return
		}
		nextTs := utils.ToInt64(response.Header.Get("Next-Ts"), 0)
		if nextTs == 0 {
			return
		}
		ts = nextTs

		reader, err := ipc.NewReader(response.Body, ipc.WithAllocator(alloc))
		if err != nil {
			// Without this check a nil reader would panic in Next below.
			return
		}
		defer reader.Release()

		innerReader := readerPool.Get().(*bytes.Reader)
		defer func() {
			// Drop the reference to the last payload before pooling.
			innerReader.Reset(nil)
			readerPool.Put(innerReader)
		}()

		for reader.Next() {
			// The record is owned by the reader and released by the next call
			// to Next (or by reader.Release), so it is not released here.
			record := reader.Record()
			if record.NumCols() < 1 {
				continue
			}
			keys, ok := record.Column(0).(*array.String)
			if !ok {
				continue
			}

			// The payload column is only needed when a filter is set.
			var payloads *array.Binary
			if filter != "" {
				if record.NumCols() < 2 {
					continue
				}
				if payloads, ok = record.Column(1).(*array.Binary); !ok {
					continue
				}
			}

			rows := int(record.NumRows())
			for i := 0; i < rows; i++ {
				if payloads == nil {
					ch <- keys.Value(i)
					continue
				}

				data := payloads.Value(i)
				if len(data) < 2 {
					continue
				}
				// The first two bytes hold the protocol version and the
				// null-flag version; neither is used, so start after them.
				innerReader.Reset(data[2:])

				properties, err := decodeRow(innerReader)
				if err != nil {
					continue
				}
				ret, err := expr.Run(program, properties)
				if err != nil {
					continue
				}
				if matched, ok := ret.(bool); ok && matched {
					ch <- keys.Value(i)
				}
			}
		}
	}

	go func() {
		for {
			time.Sleep(5 * time.Second)
			poll()
		}
	}()

	return ids, nil
}