func()

in dao/feature_view_featuredb_dao.go [1154:1275]


func (d *FeatureViewFeatureDBDao) RowCountIds(filterExpr string) ([]string, int, error) {
	snapshotId, _, err := d.createSnapshot()
	if err != nil {
		return nil, 0, err
	}
	var program *vm.Program
	if filterExpr != "" {
		program, err = expr.Compile(filterExpr)
		if err != nil {
			return nil, 0, err
		}
	}

	alloc := memory.NewGoAllocator()
	req, err := http.NewRequest("GET", fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/snapshots/%s/scan",
		d.featureDBClient.GetCurrentAddress(false), d.database, d.schema, d.table, snapshotId), bytes.NewReader(nil))
	if err != nil {
		return nil, 0, err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", d.featureDBClient.Token)
	req.Header.Set("Auth", d.signature)
	response, err := d.featureDBClient.Client.Do(req)
	if err != nil {
		return nil, 0, err
	}
	defer response.Body.Close()
	if response.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(response.Body)
		return nil, 0, fmt.Errorf("status code: %d, response body: %s", response.StatusCode, string(body))
	}

	// Arrow IPC reader
	reader, _ := ipc.NewReader(response.Body, ipc.WithAllocator(alloc))

	readFeatureDBFunc_F_1 := func(innerReader *bytes.Reader) (map[string]interface{}, error) {
		properties := make(map[string]interface{})

		for _, field := range d.fields {
			var isNull uint8
			if err := binary.Read(innerReader, binary.LittleEndian, &isNull); err != nil {
				if err == io.EOF {
					break
				}
				return nil, err
			}
			if isNull == 1 {
				// 跳过空值
				continue
			}

			switch d.fieldTypeMap[field] {
			case constants.FS_INT32:
				var int32Value int32
				binary.Read(innerReader, binary.LittleEndian, &int32Value)
				properties[field] = int32Value
			case constants.FS_INT64:
				var int64Value int64
				binary.Read(innerReader, binary.LittleEndian, &int64Value)
				properties[field] = int64Value
			case constants.FS_FLOAT:
				var float32Value float32
				binary.Read(innerReader, binary.LittleEndian, &float32Value)
				properties[field] = float32Value
			case constants.FS_DOUBLE:
				var float64Value float64
				binary.Read(innerReader, binary.LittleEndian, &float64Value)
				properties[field] = float64Value
			case constants.FS_STRING:
				var length uint32
				binary.Read(innerReader, binary.LittleEndian, &length)
				strBytes := make([]byte, length)
				binary.Read(innerReader, binary.LittleEndian, &strBytes)
				properties[field] = string(strBytes)
			case constants.FS_BOOLEAN:
				var boolValue bool
				binary.Read(innerReader, binary.LittleEndian, &boolValue)
				properties[field] = boolValue
			default:
				var length uint32
				binary.Read(innerReader, binary.LittleEndian, &length)
				strBytes := make([]byte, length)
				binary.Read(innerReader, binary.LittleEndian, &strBytes)
				properties[field] = string(strBytes)
			}
		}
		return properties, nil
	}
	ids := make([]string, 0, 1024)
	innerReader := readerPool.Get().(*bytes.Reader)
	defer readerPool.Put(innerReader)
	for reader.Next() {
		record := reader.Record()
		for i := 0; i < int(record.NumRows()); i++ {
			if filterExpr == "" {
				ids = append(ids, record.Column(0).(*array.String).Value(i))
			} else {
				dataBytes := record.Column(1).(*array.Binary).Value(i)
				if len(dataBytes) < 2 {
					continue
				}
				innerReader.Reset(dataBytes)

				// 读取版本号
				var protocalVersion, ifNullFlagVersion uint8
				binary.Read(innerReader, binary.LittleEndian, &protocalVersion)
				binary.Read(innerReader, binary.LittleEndian, &ifNullFlagVersion)
				properties, err := readFeatureDBFunc_F_1(innerReader)
				if err != nil {
					return nil, 0, err
				}
				if ret, err := expr.Run(program, properties); err != nil {
					return nil, 0, err
				} else if r, ok := ret.(bool); ok && r {
					ids = append(ids, record.Column(0).(*array.String).Value(i))
				}
			}
		}
		record.Release()
	}
	return ids, len(ids), nil
}