dao/feature_view_featuredb_dao.go (1,334 lines of code) (raw):
package dao
import (
"bufio"
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"strings"
"sync"
"time"
"github.com/aliyun/aliyun-odps-go-sdk/arrow/array"
"github.com/aliyun/aliyun-odps-go-sdk/arrow/ipc"
"github.com/aliyun/aliyun-odps-go-sdk/arrow/memory"
"github.com/aliyun/aliyun-pai-featurestore-go-sdk/v2/api"
"github.com/aliyun/aliyun-pai-featurestore-go-sdk/v2/constants"
"github.com/aliyun/aliyun-pai-featurestore-go-sdk/v2/datasource/featuredb"
"github.com/aliyun/aliyun-pai-featurestore-go-sdk/v2/datasource/featuredb/fdbserverfb"
"github.com/aliyun/aliyun-pai-featurestore-go-sdk/v2/utils"
"github.com/expr-lang/expr"
"github.com/expr-lang/expr/vm"
)
// Version markers compared against the first two bytes of a FeatureDB record
// payload before it is decoded.
const (
	// FeatureDB_Protocal_Version_F is the expected first header byte ('F').
	FeatureDB_Protocal_Version_F byte = 'F'
	// FeatureDB_IfNull_Flag_Version_1 is the expected second header byte ('1').
	FeatureDB_IfNull_Flag_Version_1 byte = '1'
)
// readerPool hands out reusable *bytes.Reader values; callers Reset them onto
// their own byte slice after Get and Put them back when finished.
var readerPool = sync.Pool{
	New: func() interface{} {
		return bytes.NewReader(nil)
	},
}
// FeatureViewFeatureDBDao reads feature-view data from FeatureDB over HTTP.
type FeatureViewFeatureDBDao struct {
	UnimplementedFeatureViewDao
	// featureDBClient supplies the server address, the token and the HTTP
	// client used for every request.
	featureDBClient *featuredb.FeatureDBClient
	// database, schema and table make up the table path in request URLs.
	database string
	schema   string
	table    string
	// fieldTypeMap maps a field name to the type that selects how its value
	// is decoded (or skipped) in a record payload.
	fieldTypeMap map[string]constants.FSType
	// fields is the ordered field list walked when decoding a record payload.
	fields []string
	// signature is sent in the "Auth" request header.
	signature string
	// primaryKeyField is the key under which GetFeatures stores the request
	// key in each returned row.
	primaryKeyField string
}
// NewFeatureViewFeatureDBDao builds a FeatureDB-backed DAO from config.
//
// It returns nil when featuredb.GetFeatureDBClient fails. The signature has no
// error result, so the failure is logged instead of being dropped silently.
func NewFeatureViewFeatureDBDao(config DaoConfig) *FeatureViewFeatureDBDao {
	client, err := featuredb.GetFeatureDBClient()
	if err != nil {
		log.Printf("get FeatureDB client: %v\n", err)
		return nil
	}
	return &FeatureViewFeatureDBDao{
		featureDBClient: client,
		database:        config.FeatureDBDatabaseName,
		schema:          config.FeatureDBSchemaName,
		table:           config.FeatureDBTableName,
		fieldTypeMap:    config.FieldTypeMap,
		signature:       config.FeatureDBSignature,
		primaryKeyField: config.PrimaryKeyField,
		fields:          config.Fields,
	}
}
// GetFeatures fetches the records stored under keys and decodes the fields
// named in selectFields.
//
// keys are split into groups of 200 and each group is requested concurrently
// through the batch_get_kv2 endpoint. If a request fails at the transport
// level it is retried once against GetCurrentAddress(true). A non-200 response
// is logged (when its JSON body carries a "message") and that group simply
// contributes no rows; it is not reported as an error.
//
// Every record payload starts with two version bytes. Then, for each field in
// d.fields, comes a one-byte null flag (1 means null, no value follows) and
// the value encoded according to d.fieldTypeMap. Fields that were not
// requested are skipped without being decoded. Each returned row also carries
// the request key under d.primaryKeyField.
//
// Keys without data are omitted from the result, and row order follows
// goroutine completion, so it is not guaranteed to match the order of keys.
// The first error collected from any group is returned with a nil result.
func (d *FeatureViewFeatureDBDao) GetFeatures(keys []interface{}, selectFields []string) ([]map[string]interface{}, error) {
	result := make([]map[string]interface{}, 0, len(keys))
	selectFieldsSet := make(map[string]struct{})
	for _, selectField := range selectFields {
		selectFieldsSet[selectField] = struct{}{}
	}
	var wg sync.WaitGroup
	var mu sync.Mutex
	const groupSize = 200
	if d.signature == "" {
		return result, errors.New("FeatureStore DB username and password are not entered, please enter them by adding client.LoginFeatureStoreDB(username, password)")
	}
	if d.featureDBClient.GetCurrentAddress(false) == "" || d.featureDBClient.Token == "" {
		return result, errors.New("FeatureDB datasource has not been created")
	}
	// Each worker sends at most one error before returning, so one slot per
	// group is enough to keep the sends from blocking.
	errChan := make(chan error, len(keys)/groupSize+1)
	for i := 0; i < len(keys); i += groupSize {
		end := i + groupSize
		if end > len(keys) {
			end = len(keys)
		}
		ks := keys[i:end]
		wg.Add(1)
		go func(ks []interface{}) {
			defer wg.Done()
			pkeys := make([]string, 0, len(ks))
			for _, k := range ks {
				pkeys = append(pkeys, utils.ToString(k, ""))
			}
			// Marshalling a map holding a []string cannot fail.
			body, _ := json.Marshal(map[string]any{"keys": pkeys})
			url := fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kv2?batch_size=%d&encoder=", d.featureDBClient.GetCurrentAddress(false), d.database, d.schema, d.table, len(pkeys))
			requestBody := readerPool.Get().(*bytes.Reader)
			defer readerPool.Put(requestBody)
			requestBody.Reset(body)
			req, err := http.NewRequest("POST", url, requestBody)
			if err != nil {
				errChan <- err
				return
			}
			req.Header.Set("Content-Type", "application/json")
			req.Header.Set("Authorization", d.featureDBClient.Token)
			req.Header.Set("Auth", d.signature)
			response, err := d.featureDBClient.Client.Do(req)
			if err != nil {
				url = fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kv2?batch_size=%d&encoder=", d.featureDBClient.GetCurrentAddress(true), d.database, d.schema, d.table, len(pkeys))
				// The failed attempt may already have consumed requestBody.
				// Reusing it would make NewRequest see zero unread bytes and
				// send an empty body, so the retry gets a fresh reader.
				req, err = http.NewRequest("POST", url, bytes.NewReader(body))
				if err != nil {
					errChan <- err
					return
				}
				req.Header.Set("Content-Type", "application/json")
				req.Header.Set("Authorization", d.featureDBClient.Token)
				req.Header.Set("Auth", d.signature)
				response, err = d.featureDBClient.Client.Do(req)
				if err != nil {
					errChan <- err
					return
				}
			}
			defer response.Body.Close() // make sure response.Body is closed
			// Check the status code.
			if response.StatusCode != http.StatusOK {
				bodyBytes, err := io.ReadAll(response.Body)
				if err != nil {
					errChan <- err
					return
				}
				var bodyMap map[string]interface{}
				if err := json.Unmarshal(bodyBytes, &bodyMap); err == nil {
					if msg, found := bodyMap["message"]; found {
						log.Printf("StatusCode: %d, Response message: %s\n", response.StatusCode, msg)
					}
				}
				return
			}
			reader := bufio.NewReader(response.Body)
			// keyStartIdx is the index in ks of the first value of the
			// current record block.
			keyStartIdx := 0
			innerResult := make([]map[string]interface{}, 0, len(ks))
			innerReader := readerPool.Get().(*bytes.Reader)
			defer readerPool.Put(innerReader)
			for {
				buf, err := deserialize(reader)
				if err == io.EOF {
					break // End of stream
				}
				if err != nil {
					errChan <- err
					return
				}
				recordBlock := fdbserverfb.GetRootAsRecordBlock(buf, 0)
				for i := 0; i < recordBlock.ValuesLength(); i++ {
					value := new(fdbserverfb.UInt8ValueColumn)
					recordBlock.Values(value, i)
					dataBytes := value.ValueBytes()
					// The key does not exist.
					if len(dataBytes) < 2 {
						// fmt.Println("key ", ks[keyStartIdx+i], " not exists")
						continue
					}
					innerReader.Reset(dataBytes)
					// Read the two version bytes.
					var protocalVersion, ifNullFlagVersion uint8
					binary.Read(innerReader, binary.LittleEndian, &protocalVersion)
					binary.Read(innerReader, binary.LittleEndian, &ifNullFlagVersion)
					// readFeatureDBFunc_F_1 decodes one record in the 'F'/'1' layout.
					readFeatureDBFunc_F_1 := func() (map[string]interface{}, error) {
						properties := make(map[string]interface{})
						for _, field := range d.fields {
							var isNull uint8
							if err := binary.Read(innerReader, binary.LittleEndian, &isNull); err != nil {
								if err == io.EOF {
									break
								}
								return nil, err
							}
							if isNull == 1 {
								// Skip null values; no payload follows the flag.
								continue
							}
							if _, exists := selectFieldsSet[field]; exists {
								switch d.fieldTypeMap[field] {
								case constants.FS_DOUBLE:
									var float64Value float64
									binary.Read(innerReader, binary.LittleEndian, &float64Value)
									properties[field] = float64Value
								case constants.FS_FLOAT:
									var float32Value float32
									binary.Read(innerReader, binary.LittleEndian, &float32Value)
									properties[field] = float32Value
								case constants.FS_INT64:
									var int64Value int64
									binary.Read(innerReader, binary.LittleEndian, &int64Value)
									properties[field] = int64Value
								case constants.FS_INT32:
									var int32Value int32
									binary.Read(innerReader, binary.LittleEndian, &int32Value)
									properties[field] = int32Value
								case constants.FS_BOOLEAN:
									var booleanValue bool
									binary.Read(innerReader, binary.LittleEndian, &booleanValue)
									properties[field] = booleanValue
								case constants.FS_STRING:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									strBytes := make([]byte, length)
									binary.Read(innerReader, binary.LittleEndian, &strBytes)
									properties[field] = string(strBytes)
								case constants.FS_ARRAY_INT32:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									arrayInt32Value := make([]int32, length)
									if length > 0 {
										binary.Read(innerReader, binary.LittleEndian, &arrayInt32Value)
									}
									properties[field] = arrayInt32Value
								case constants.FS_ARRAY_INT64:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									arrayInt64Value := make([]int64, length)
									if length > 0 {
										binary.Read(innerReader, binary.LittleEndian, &arrayInt64Value)
									}
									properties[field] = arrayInt64Value
								case constants.FS_ARRAY_FLOAT:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									arrayFloat32Value := make([]float32, length)
									if length > 0 {
										binary.Read(innerReader, binary.LittleEndian, &arrayFloat32Value)
									}
									properties[field] = arrayFloat32Value
								case constants.FS_ARRAY_DOUBLE:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									arrayFloat64Value := make([]float64, length)
									if length > 0 {
										binary.Read(innerReader, binary.LittleEndian, &arrayFloat64Value)
									}
									properties[field] = arrayFloat64Value
								case constants.FS_ARRAY_STRING:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									arrayStringValue := d.decodeStringArray(innerReader, length)
									properties[field] = arrayStringValue
								case constants.FS_ARRAY_ARRAY_FLOAT:
									// Layout: outer length, total element count, then
									// (only when the total is non-zero) the per-row
									// lengths followed by all elements flattened.
									var outerLength uint32
									binary.Read(innerReader, binary.LittleEndian, &outerLength)
									arrayOfArrayFloatValue := make([][]float32, outerLength)
									if outerLength > 0 {
										var totalElements uint32
										binary.Read(innerReader, binary.LittleEndian, &totalElements)
										if totalElements == 0 {
											for outerIdx := range arrayOfArrayFloatValue {
												arrayOfArrayFloatValue[outerIdx] = []float32{}
											}
										} else {
											innerArrayLens := make([]uint32, outerLength)
											binary.Read(innerReader, binary.LittleEndian, &innerArrayLens)
											innerValidElements := make([]float32, totalElements)
											binary.Read(innerReader, binary.LittleEndian, &innerValidElements)
											innerIndex := 0
											for outerIdx, innerLength := range innerArrayLens {
												arrayOfArrayFloatValue[outerIdx] = innerValidElements[innerIndex : innerIndex+int(innerLength)]
												innerIndex += int(innerLength)
											}
										}
									}
									properties[field] = arrayOfArrayFloatValue
								case constants.FS_MAP_INT32_INT32:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									mapInt32Int32Value := make(map[int32]int32, length)
									if length > 0 {
										keys := make([]int32, length)
										binary.Read(innerReader, binary.LittleEndian, &keys)
										values := make([]int32, length)
										binary.Read(innerReader, binary.LittleEndian, &values)
										for idx, key := range keys {
											mapInt32Int32Value[key] = values[idx]
										}
									}
									properties[field] = mapInt32Int32Value
								case constants.FS_MAP_INT32_INT64:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									mapInt32Int64Value := make(map[int32]int64, length)
									if length > 0 {
										keys := make([]int32, length)
										binary.Read(innerReader, binary.LittleEndian, &keys)
										values := make([]int64, length)
										binary.Read(innerReader, binary.LittleEndian, &values)
										for idx, key := range keys {
											mapInt32Int64Value[key] = values[idx]
										}
									}
									properties[field] = mapInt32Int64Value
								case constants.FS_MAP_INT32_FLOAT:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									mapInt32FloatValue := make(map[int32]float32, length)
									if length > 0 {
										keys := make([]int32, length)
										binary.Read(innerReader, binary.LittleEndian, &keys)
										values := make([]float32, length)
										binary.Read(innerReader, binary.LittleEndian, &values)
										for idx, key := range keys {
											mapInt32FloatValue[key] = values[idx]
										}
									}
									properties[field] = mapInt32FloatValue
								case constants.FS_MAP_INT32_DOUBLE:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									mapInt32DoubleValue := make(map[int32]float64, length)
									if length > 0 {
										keys := make([]int32, length)
										binary.Read(innerReader, binary.LittleEndian, &keys)
										values := make([]float64, length)
										binary.Read(innerReader, binary.LittleEndian, &values)
										for idx, key := range keys {
											mapInt32DoubleValue[key] = values[idx]
										}
									}
									properties[field] = mapInt32DoubleValue
								case constants.FS_MAP_INT32_STRING:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									mapInt32StringValue := make(map[int32]string, length)
									if length > 0 {
										keys := make([]int32, length)
										binary.Read(innerReader, binary.LittleEndian, &keys)
										values := d.decodeStringArray(innerReader, length)
										for idx, key := range keys {
											mapInt32StringValue[key] = values[idx]
										}
									}
									properties[field] = mapInt32StringValue
								case constants.FS_MAP_INT64_INT32:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									mapInt64Int32Value := make(map[int64]int32, length)
									if length > 0 {
										keys := make([]int64, length)
										binary.Read(innerReader, binary.LittleEndian, &keys)
										values := make([]int32, length)
										binary.Read(innerReader, binary.LittleEndian, &values)
										for idx, key := range keys {
											mapInt64Int32Value[key] = values[idx]
										}
									}
									properties[field] = mapInt64Int32Value
								case constants.FS_MAP_INT64_INT64:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									mapInt64Int64Value := make(map[int64]int64, length)
									if length > 0 {
										keys := make([]int64, length)
										binary.Read(innerReader, binary.LittleEndian, &keys)
										values := make([]int64, length)
										binary.Read(innerReader, binary.LittleEndian, &values)
										for idx, key := range keys {
											mapInt64Int64Value[key] = values[idx]
										}
									}
									properties[field] = mapInt64Int64Value
								case constants.FS_MAP_INT64_FLOAT:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									mapInt64FloatValue := make(map[int64]float32, length)
									if length > 0 {
										keys := make([]int64, length)
										binary.Read(innerReader, binary.LittleEndian, &keys)
										values := make([]float32, length)
										binary.Read(innerReader, binary.LittleEndian, &values)
										for idx, key := range keys {
											mapInt64FloatValue[key] = values[idx]
										}
									}
									properties[field] = mapInt64FloatValue
								case constants.FS_MAP_INT64_DOUBLE:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									mapInt64DoubleValue := make(map[int64]float64, length)
									if length > 0 {
										keys := make([]int64, length)
										binary.Read(innerReader, binary.LittleEndian, &keys)
										values := make([]float64, length)
										binary.Read(innerReader, binary.LittleEndian, &values)
										for idx, key := range keys {
											mapInt64DoubleValue[key] = values[idx]
										}
									}
									properties[field] = mapInt64DoubleValue
								case constants.FS_MAP_INT64_STRING:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									mapInt64StringValue := make(map[int64]string, length)
									if length > 0 {
										keys := make([]int64, length)
										binary.Read(innerReader, binary.LittleEndian, &keys)
										values := d.decodeStringArray(innerReader, length)
										for idx, key := range keys {
											mapInt64StringValue[key] = values[idx]
										}
									}
									properties[field] = mapInt64StringValue
								case constants.FS_MAP_STRING_INT32:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									mapStringInt32Value := make(map[string]int32, length)
									if length > 0 {
										keys := d.decodeStringArray(innerReader, length)
										values := make([]int32, length)
										binary.Read(innerReader, binary.LittleEndian, &values)
										for idx, key := range keys {
											mapStringInt32Value[key] = values[idx]
										}
									}
									properties[field] = mapStringInt32Value
								case constants.FS_MAP_STRING_INT64:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									mapStringInt64Value := make(map[string]int64, length)
									if length > 0 {
										keys := d.decodeStringArray(innerReader, length)
										values := make([]int64, length)
										binary.Read(innerReader, binary.LittleEndian, &values)
										for idx, key := range keys {
											mapStringInt64Value[key] = values[idx]
										}
									}
									properties[field] = mapStringInt64Value
								case constants.FS_MAP_STRING_FLOAT:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									mapStringFloatValue := make(map[string]float32, length)
									if length > 0 {
										keys := d.decodeStringArray(innerReader, length)
										values := make([]float32, length)
										binary.Read(innerReader, binary.LittleEndian, &values)
										for idx, key := range keys {
											mapStringFloatValue[key] = values[idx]
										}
									}
									properties[field] = mapStringFloatValue
								case constants.FS_MAP_STRING_DOUBLE:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									mapStringDoubleValue := make(map[string]float64, length)
									if length > 0 {
										keys := d.decodeStringArray(innerReader, length)
										values := make([]float64, length)
										binary.Read(innerReader, binary.LittleEndian, &values)
										for idx, key := range keys {
											mapStringDoubleValue[key] = values[idx]
										}
									}
									properties[field] = mapStringDoubleValue
								case constants.FS_MAP_STRING_STRING:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									mapStringStringValue := make(map[string]string, length)
									if length > 0 {
										keys := d.decodeStringArray(innerReader, length)
										values := d.decodeStringArray(innerReader, length)
										for idx, key := range keys {
											mapStringStringValue[key] = values[idx]
										}
									}
									properties[field] = mapStringStringValue
								default:
									// Any other type is read as a length-prefixed string.
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									strBytes := make([]byte, length)
									binary.Read(innerReader, binary.LittleEndian, &strBytes)
									properties[field] = string(strBytes)
								}
							} else {
								// The field was not requested: work out how many
								// bytes its value occupies and seek past them.
								var skipBytes int = 0
								switch d.fieldTypeMap[field] {
								case constants.FS_DOUBLE:
									skipBytes = 8
								case constants.FS_FLOAT:
									skipBytes = 4
								case constants.FS_INT64:
									skipBytes = 8
								case constants.FS_INT32:
									skipBytes = 4
								case constants.FS_BOOLEAN:
									skipBytes = 1
								case constants.FS_STRING:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = int(length)
								case constants.FS_ARRAY_INT32:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = int(length) * 4
								case constants.FS_ARRAY_INT64:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = int(length) * 8
								case constants.FS_ARRAY_FLOAT:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = int(length) * 4
								case constants.FS_ARRAY_DOUBLE:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = int(length) * 8
								case constants.FS_ARRAY_STRING:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = d.getStringArrayCharLen(innerReader, length)
								case constants.FS_ARRAY_ARRAY_FLOAT:
									var outerLength uint32
									binary.Read(innerReader, binary.LittleEndian, &outerLength)
									if outerLength > 0 {
										var totalElements uint32
										binary.Read(innerReader, binary.LittleEndian, &totalElements)
										if totalElements > 0 {
											skipBytes = int(outerLength)*4 + int(totalElements)*4
										}
									}
								case constants.FS_MAP_INT32_INT32:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = int(length) * (4 + 4)
								case constants.FS_MAP_INT32_INT64:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = int(length) * (4 + 8)
								case constants.FS_MAP_INT32_FLOAT:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = int(length) * (4 + 4)
								case constants.FS_MAP_INT32_DOUBLE:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = int(length) * (4 + 8)
								case constants.FS_MAP_INT32_STRING:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									// Widen before multiplying so the product cannot wrap in uint32.
									innerReader.Seek(int64(length)*4, io.SeekCurrent)
									skipBytes = d.getStringArrayCharLen(innerReader, length)
								case constants.FS_MAP_INT64_INT32:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = int(length) * (8 + 4)
								case constants.FS_MAP_INT64_INT64:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = int(length) * (8 + 8)
								case constants.FS_MAP_INT64_FLOAT:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = int(length) * (8 + 4)
								case constants.FS_MAP_INT64_DOUBLE:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = int(length) * (8 + 8)
								case constants.FS_MAP_INT64_STRING:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									// Widen before multiplying so the product cannot wrap in uint32.
									innerReader.Seek(int64(length)*8, io.SeekCurrent)
									skipBytes = d.getStringArrayCharLen(innerReader, length)
								case constants.FS_MAP_STRING_INT32:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = d.getStringArrayCharLen(innerReader, length) + int(length)*4
								case constants.FS_MAP_STRING_INT64:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = d.getStringArrayCharLen(innerReader, length) + int(length)*8
								case constants.FS_MAP_STRING_FLOAT:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = d.getStringArrayCharLen(innerReader, length) + int(length)*4
								case constants.FS_MAP_STRING_DOUBLE:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = d.getStringArrayCharLen(innerReader, length) + int(length)*8
								case constants.FS_MAP_STRING_STRING:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									// Skip the key strings here; the value strings are
									// skipped by the common seek below.
									keyLen := d.getStringArrayCharLen(innerReader, length)
									innerReader.Seek(int64(keyLen), io.SeekCurrent)
									skipBytes = d.getStringArrayCharLen(innerReader, length)
								default:
									var length uint32
									binary.Read(innerReader, binary.LittleEndian, &length)
									skipBytes = int(length)
								}
								if skipBytes > 0 {
									if _, err := innerReader.Seek(int64(skipBytes), io.SeekCurrent); err != nil {
										return nil, err
									}
								}
							}
						}
						properties[d.primaryKeyField] = ks[keyStartIdx+i]
						return properties, nil
					}
					if protocalVersion == FeatureDB_Protocal_Version_F && ifNullFlagVersion == FeatureDB_IfNull_Flag_Version_1 {
						readResult, err := readFeatureDBFunc_F_1()
						if err != nil {
							errChan <- err
							return
						}
						innerResult = append(innerResult, readResult)
					} else {
						errChan <- fmt.Errorf("FeatureDB read key %v error: protocalVersion %v or ifNullFlagVersion %d is not supported", ks[keyStartIdx+i], protocalVersion, ifNullFlagVersion)
						fmt.Printf("FeatureDB read key %v error: protocalVersion %v or ifNullFlagVersion %d is not supported", ks[keyStartIdx+i], protocalVersion, ifNullFlagVersion)
						return
					}
				}
				keyStartIdx += recordBlock.ValuesLength()
			}
			mu.Lock()
			result = append(result, innerResult...)
			mu.Unlock()
		}(ks)
	}
	wg.Wait()
	close(errChan)
	for err := range errChan {
		if err != nil {
			return nil, err
		}
	}
	return result, nil
}
// decodeStringArray reads a packed string array of length elements from
// innerReader and returns the decoded strings.
//
// The encoding is length+1 little-endian uint32 offsets followed by the
// concatenated string bytes. Element i spans offsets[i]..offsets[i+1] and the
// last offset is the total byte count. Read errors are ignored, so missing
// data decodes as empty strings.
//
// The returned slice always has exactly length elements, because callers index
// it in parallel with a key slice. An entry whose offsets are inconsistent
// (start > end, or end beyond the data) is left empty instead of triggering a
// slice-bounds panic.
func (d *FeatureViewFeatureDBDao) decodeStringArray(innerReader *bytes.Reader, length uint32) []string {
	arrayStringValue := make([]string, length)
	if length == 0 {
		return arrayStringValue
	}
	offsets := make([]uint32, length+1)
	binary.Read(innerReader, binary.LittleEndian, offsets)
	totalLength := offsets[length]
	stringData := make([]byte, totalLength)
	binary.Read(innerReader, binary.LittleEndian, stringData)
	for strIdx := uint32(0); strIdx < length; strIdx++ {
		start, end := offsets[strIdx], offsets[strIdx+1]
		if start > end || end > totalLength {
			// Corrupt offset table: keep the zero value for this entry.
			continue
		}
		arrayStringValue[strIdx] = string(stringData[start:end])
	}
	return arrayStringValue
}
// getStringArrayCharLen returns the byte length of the string data of a packed
// string array with length elements, without decoding it.
//
// It seeks past the first length uint32 offsets and reads the final offset,
// which holds the total size of the string bytes. The reader is left at the
// start of those bytes, so the caller can seek past them. For length 0 nothing
// is read and 0 is returned. Seek and read errors are ignored; a failed read
// yields 0.
func (d *FeatureViewFeatureDBDao) getStringArrayCharLen(innerReader *bytes.Reader, length uint32) int {
	if length == 0 {
		return 0
	}
	// Widen before multiplying: length*4 in uint32 wraps for large lengths.
	innerReader.Seek(int64(length)*4, io.SeekCurrent)
	var totalLength uint32
	binary.Read(innerReader, binary.LittleEndian, &totalLength)
	return int(totalLength)
}
// FeatureDBBatchGetKKVRequest is marshalled to JSON as the body of POST
// requests to the batch_get_kkv endpoint.
type FeatureDBBatchGetKKVRequest struct {
	// PKs are the primary keys to look up; callers in this file build them as
	// "<id>\u001D<event>".
	PKs []string `json:"pks"`
	// Length is set from the sequence length by GetUserSequenceFeature and
	// left at zero by GetUserBehaviorFeature.
	Length int `json:"length"`
	// WithValue is set when the caller decodes the value payload of each
	// returned record (presumably it asks the server to include it — TODO confirm).
	WithValue bool `json:"with_value"`
}
// GetUserSequenceFeature builds one property map per key from the behaviour
// sequences stored in FeatureDB.
//
// For each key and each entry of onlineConfig, the events listed in SeqEvent
// (separated by '|') are requested through batch_get_kkv and the returned
// records are filtered:
//   - records whose primary key is not "<user>\u001D<event>" are dropped;
//   - the item id is the sort key itself (DeduplicationMethodNum 1) or its
//     first "\u001D"-separated part (DeduplicationMethodNum 2); any other
//     method drops the record;
//   - records with an empty event or item id, or whose play time does not
//     exceed the threshold configured for the event, are dropped.
//
// The surviving sequences are passed to makeSequenceFeatures as the online
// sequences; the offline sequences are always nil. The resulting properties
// are merged per key, and the key itself is stored under userIdField.
//
// A transport failure is retried once against GetCurrentAddress(true). A
// non-200 response is only logged and produces no sequences. The first
// collected error is returned with a nil result. Result order follows
// goroutine completion.
func (d *FeatureViewFeatureDBDao) GetUserSequenceFeature(keys []interface{}, userIdField string, sequenceConfig api.FeatureViewSeqConfig, onlineConfig []*api.SeqConfig) ([]map[string]interface{}, error) {
	currTime := time.Now().Unix()
	sequencePlayTimeMap := makePlayTimeMap(sequenceConfig.PlayTimeFilter)
	// One slot per fetch: every fetch reports at most one error.
	errChan := make(chan error, len(keys)*len(onlineConfig))

	fetchDataFunc := func(seqEvent string, seqLen int, key interface{}) []*sequenceInfo {
		collected := []*sequenceInfo{}
		eventNames := strings.Split(seqEvent, "|")
		pks := make([]string, 0, len(eventNames))
		for _, eventName := range eventNames {
			pks = append(pks, fmt.Sprintf("%v\u001D%s", key, eventName))
		}
		payload, _ := json.Marshal(FeatureDBBatchGetKKVRequest{PKs: pks, Length: seqLen})

		// Attempt 0 uses GetCurrentAddress(false); attempt 1 runs only after
		// a transport error and uses GetCurrentAddress(true).
		var response *http.Response
		for attempt := 0; attempt < 2; attempt++ {
			endpoint := fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kkv", d.featureDBClient.GetCurrentAddress(attempt > 0), d.database, d.schema, d.table)
			req, err := http.NewRequest("POST", endpoint, bytes.NewReader(payload))
			if err != nil {
				errChan <- err
				return nil
			}
			req.Header.Set("Content-Type", "application/json")
			req.Header.Set("Authorization", d.featureDBClient.Token)
			req.Header.Set("Auth", d.signature)
			response, err = d.featureDBClient.Client.Do(req)
			if err == nil {
				break
			}
			if attempt > 0 {
				errChan <- err
				return nil
			}
		}
		defer response.Body.Close() // make sure response.Body is closed

		// Check the status code.
		if response.StatusCode != http.StatusOK {
			raw, readErr := io.ReadAll(response.Body)
			if readErr != nil {
				errChan <- readErr
				return nil
			}
			var decoded map[string]interface{}
			if json.Unmarshal(raw, &decoded) != nil {
				return nil
			}
			if msg, found := decoded["message"]; found {
				log.Printf("StatusCode: %d, Response message: %s\n", response.StatusCode, msg)
			}
			return nil
		}

		stream := bufio.NewReader(response.Body)
		for {
			buf, err := deserialize(stream)
			if err != nil {
				if err == io.EOF {
					// End of stream.
					return collected
				}
				errChan <- err
				return nil
			}
			block := fdbserverfb.GetRootAsKKVRecordBlock(buf, 0)
			for idx, total := 0, block.ValuesLength(); idx < total; idx++ {
				kkv := new(fdbserverfb.KKVData)
				block.Values(kkv, idx)
				pkParts := strings.Split(string(kkv.Pk()), "\u001D")
				if len(pkParts) != 2 {
					continue
				}
				var itemId string
				switch sequenceConfig.DeduplicationMethodNum {
				case 1:
					itemId = string(kkv.Sk())
				case 2:
					skParts := strings.Split(string(kkv.Sk()), "\u001D")
					if len(skParts) != 2 {
						continue
					}
					itemId = skParts[0]
				default:
					continue
				}
				seq := new(sequenceInfo)
				seq.event, seq.itemId = pkParts[1], itemId
				seq.timestamp = kkv.EventTimestamp()
				seq.playTime = kkv.PlayTime()
				if seq.event == "" || seq.itemId == "" {
					continue
				}
				if threshold, exist := sequencePlayTimeMap[seq.event]; exist && seq.playTime <= threshold {
					continue
				}
				collected = append(collected, seq)
			}
		}
	}

	results := make([]map[string]interface{}, 0, len(keys))
	var (
		resultsMu sync.Mutex
		keysWg    sync.WaitGroup
	)
	for _, key := range keys {
		keysWg.Add(1)
		go func(key interface{}) {
			defer keysWg.Done()
			properties := make(map[string]interface{})
			var (
				propsMu  sync.Mutex
				configWg sync.WaitGroup
			)
			for _, seqConfig := range onlineConfig {
				configWg.Add(1)
				go func(seqConfig *api.SeqConfig) {
					defer configWg.Done()
					// FeatureDB has already merged online and offline sequence
					// features, so everything it returns goes into the online
					// slot and the offline slot stays nil.
					var offlineSequences []*sequenceInfo
					onlineSequences := fetchDataFunc(seqConfig.SeqEvent, seqConfig.SeqLen, key)
					subproperties := makeSequenceFeatures(offlineSequences, onlineSequences, seqConfig, sequenceConfig, currTime)
					propsMu.Lock()
					for name, value := range subproperties {
						properties[name] = value
					}
					propsMu.Unlock()
				}(seqConfig)
			}
			configWg.Wait()
			properties[userIdField] = key
			resultsMu.Lock()
			results = append(results, properties)
			resultsMu.Unlock()
		}(key)
	}
	keysWg.Wait()
	close(errChan)
	for err := range errChan {
		if err == nil {
			continue
		}
		return nil, err
	}
	return results, nil
}
// FeatureDBScanKKVRequest is marshalled to JSON as the body of POST requests
// to the scan_kkv endpoint.
type FeatureDBScanKKVRequest struct {
	// Prefixs are the primary-key prefixes to scan; GetUserBehaviorFeature
	// sends a single "<user id>\u001D" prefix.
	Prefixs []string `json:"prefixs"`
	// Length is not set by the caller visible in this file.
	Length int `json:"length"`
	// WithValue is set when the caller decodes the value payload of each
	// returned record (presumably it asks the server to include it — TODO confirm).
	WithValue bool `json:"with_value"`
}
// GetUserBehaviorFeature returns the behaviour records of the given users,
// decoding only the fields named in selectFields.
//
// With no events, every record under the prefix "<user id>\u001D" is read
// through scan_kkv. Otherwise the primary keys "<user id>\u001D<event>" are
// requested through batch_get_kkv. A transport failure is retried once against
// GetCurrentAddress(true). A non-200 response is only logged and yields no
// rows for that user.
//
// Record payloads start with two version bytes, then for each field in
// d.fields a one-byte null flag (1 means null) followed by the value. Only the
// INT32, INT64, FLOAT, DOUBLE, STRING and BOOLEAN types are decoded natively;
// any other type is read as a length-prefixed string. Rows whose play time
// does not exceed the threshold configured for their event are dropped.
//
// Users are fetched concurrently, so row order follows goroutine completion.
// The first error reported by any fetch is returned with a nil result, in the
// same way as GetFeatures and GetUserSequenceFeature.
func (d *FeatureViewFeatureDBDao) GetUserBehaviorFeature(userIds []interface{}, events []interface{}, selectFields []string, sequenceConfig api.FeatureViewSeqConfig) ([]map[string]interface{}, error) {
	selectFieldsSet := make(map[string]struct{})
	for _, selectField := range selectFields {
		selectFieldsSet[selectField] = struct{}{}
	}
	sequencePlayTimeMap := makePlayTimeMap(sequenceConfig.PlayTimeFilter)
	// One slot per user: every fetch reports at most one error.
	errChan := make(chan error, len(userIds))
	fetchDataFunc := func(user_id interface{}) []map[string]interface{} {
		results := []map[string]interface{}{}

		// Pick the endpoint and request body; both cases share the
		// request/retry logic below.
		var (
			endpoint string
			body     []byte
		)
		if len(events) == 0 {
			endpoint = "scan_kkv"
			body, _ = json.Marshal(FeatureDBScanKKVRequest{
				Prefixs:   []string{fmt.Sprintf("%v\u001D", user_id)},
				WithValue: true,
			})
		} else {
			endpoint = "batch_get_kkv"
			pks := make([]string, 0, len(events))
			for _, event := range events {
				pks = append(pks, fmt.Sprintf("%v\u001D%v", user_id, event))
			}
			body, _ = json.Marshal(FeatureDBBatchGetKKVRequest{
				PKs:       pks,
				WithValue: true,
			})
		}

		// Attempt 0 uses GetCurrentAddress(false); attempt 1 runs only after
		// a transport error and uses GetCurrentAddress(true).
		var response *http.Response
		for attempt := 0; attempt < 2; attempt++ {
			url := fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/%s", d.featureDBClient.GetCurrentAddress(attempt > 0), d.database, d.schema, d.table, endpoint)
			req, err := http.NewRequest("POST", url, bytes.NewReader(body))
			if err != nil {
				errChan <- err
				return nil
			}
			req.Header.Set("Content-Type", "application/json")
			req.Header.Set("Authorization", d.featureDBClient.Token)
			req.Header.Set("Auth", d.signature)
			response, err = d.featureDBClient.Client.Do(req)
			if err == nil {
				break
			}
			if attempt > 0 {
				errChan <- err
				return nil
			}
		}
		defer response.Body.Close() // make sure response.Body is closed
		// Check the status code.
		if response.StatusCode != http.StatusOK {
			bodyBytes, err := io.ReadAll(response.Body)
			if err != nil {
				errChan <- err
				return nil
			}
			var bodyMap map[string]interface{}
			if err := json.Unmarshal(bodyBytes, &bodyMap); err == nil {
				if msg, found := bodyMap["message"]; found {
					log.Printf("StatusCode: %d, Response message: %s\n", response.StatusCode, msg)
				}
			}
			return nil
		}
		reader := bufio.NewReader(response.Body)
		innerReader := readerPool.Get().(*bytes.Reader)
		defer readerPool.Put(innerReader)
		for {
			buf, err := deserialize(reader)
			if err == io.EOF {
				break // End of stream
			}
			if err != nil {
				errChan <- err
				return nil
			}
			kkvRecordBlock := fdbserverfb.GetRootAsKKVRecordBlock(buf, 0)
			for i := 0; i < kkvRecordBlock.ValuesLength(); i++ {
				kkv := new(fdbserverfb.KKVData)
				kkvRecordBlock.Values(kkv, i)
				dataBytes := kkv.ValueBytes()
				if len(dataBytes) < 2 {
					//fmt.Println("userid ", user_id, " not exists")
					continue
				}
				innerReader.Reset(dataBytes)
				// Read the two version bytes.
				var protocalVersion, ifNullFlagVersion uint8
				binary.Read(innerReader, binary.LittleEndian, &protocalVersion)
				binary.Read(innerReader, binary.LittleEndian, &ifNullFlagVersion)
				// readFeatureDBFunc_F_1 decodes one record in the 'F'/'1' layout.
				readFeatureDBFunc_F_1 := func() (map[string]interface{}, error) {
					properties := make(map[string]interface{})
					for _, field := range d.fields {
						var isNull uint8
						if err := binary.Read(innerReader, binary.LittleEndian, &isNull); err != nil {
							if err == io.EOF {
								break
							}
							return nil, err
						}
						if isNull == 1 {
							// Skip null values; no payload follows the flag.
							continue
						}
						if _, exists := selectFieldsSet[field]; exists {
							switch d.fieldTypeMap[field] {
							case constants.FS_INT32:
								var int32Value int32
								binary.Read(innerReader, binary.LittleEndian, &int32Value)
								properties[field] = int32Value
							case constants.FS_INT64:
								var int64Value int64
								binary.Read(innerReader, binary.LittleEndian, &int64Value)
								properties[field] = int64Value
							case constants.FS_FLOAT:
								var float32Value float32
								binary.Read(innerReader, binary.LittleEndian, &float32Value)
								properties[field] = float32Value
							case constants.FS_DOUBLE:
								var float64Value float64
								binary.Read(innerReader, binary.LittleEndian, &float64Value)
								properties[field] = float64Value
							case constants.FS_STRING:
								var length uint32
								binary.Read(innerReader, binary.LittleEndian, &length)
								strBytes := make([]byte, length)
								binary.Read(innerReader, binary.LittleEndian, &strBytes)
								properties[field] = string(strBytes)
							case constants.FS_BOOLEAN:
								var boolValue bool
								binary.Read(innerReader, binary.LittleEndian, &boolValue)
								properties[field] = boolValue
							default:
								// Any other type is read as a length-prefixed string.
								var length uint32
								binary.Read(innerReader, binary.LittleEndian, &length)
								strBytes := make([]byte, length)
								binary.Read(innerReader, binary.LittleEndian, &strBytes)
								properties[field] = string(strBytes)
							}
						} else {
							// The field was not requested: seek past its value.
							var skipBytes int = 0
							switch d.fieldTypeMap[field] {
							case constants.FS_INT32:
								skipBytes = 4
							case constants.FS_INT64:
								skipBytes = 8
							case constants.FS_FLOAT:
								skipBytes = 4
							case constants.FS_DOUBLE:
								skipBytes = 8
							case constants.FS_STRING:
								var length uint32
								binary.Read(innerReader, binary.LittleEndian, &length)
								skipBytes = int(length)
							case constants.FS_BOOLEAN:
								skipBytes = 1
							default:
								var length uint32
								binary.Read(innerReader, binary.LittleEndian, &length)
								skipBytes = int(length)
							}
							if skipBytes > 0 {
								if _, err := innerReader.Seek(int64(skipBytes), io.SeekCurrent); err != nil {
									return nil, err
								}
							}
						}
					}
					return properties, nil
				}
				if protocalVersion == FeatureDB_Protocal_Version_F && ifNullFlagVersion == FeatureDB_IfNull_Flag_Version_1 {
					readResult, err := readFeatureDBFunc_F_1()
					if err != nil {
						errChan <- err
						return nil
					}
					if t, exist := sequencePlayTimeMap[utils.ToString(readResult[sequenceConfig.EventField], "")]; exist {
						if utils.ToFloat(readResult[sequenceConfig.PlayTimeField], 0.0) <= t {
							continue
						}
					}
					results = append(results, readResult)
				} else {
					errChan <- fmt.Errorf("unsupported protocal version: %d, ifNullFlagVersion: %d", protocalVersion, ifNullFlagVersion)
					return nil
				}
			}
		}
		return results
	}
	results := make([]map[string]interface{}, 0, len(userIds)*(len(events)+1)*50)
	var outmu sync.Mutex
	var wg sync.WaitGroup
	for _, userId := range userIds {
		wg.Add(1)
		go func(userId interface{}) {
			defer wg.Done()
			innerresult := fetchDataFunc(userId)
			outmu.Lock()
			results = append(results, innerresult...)
			outmu.Unlock()
		}(userId)
	}
	wg.Wait()
	// Surface the errors collected by the workers; without this drain every
	// failure would be dropped and a partial result returned as success.
	close(errChan)
	for err := range errChan {
		if err != nil {
			return nil, err
		}
	}
	return results, nil
}
// deserialize reads one length-prefixed frame from r: a 4-byte little-endian
// unsigned length followed by that many payload bytes. The payload is
// returned on success; on any read failure (including a short header or a
// short payload) the underlying error is returned unchanged with a nil slice.
func deserialize(r io.Reader) ([]byte, error) {
	var header [4]byte
	if _, err := io.ReadFull(r, header[:]); err != nil {
		return nil, err
	}

	payload := make([]byte, binary.LittleEndian.Uint32(header[:]))
	if _, err := io.ReadFull(r, payload); err != nil {
		return nil, err
	}
	return payload, nil
}
// RowCountIds creates a snapshot of the table, scans it and returns the values
// of column 0 (the row ids) for every row that satisfies filterExpr, together
// with the number of ids returned.
//
// When filterExpr is empty every id in the scan is returned. Otherwise the
// expression is compiled once, each row payload (column 1) is decoded with the
// F/1 binary layout into a field->value map, and the id is kept only when the
// expression evaluates to boolean true on that map. Payloads shorter than the
// two version bytes are skipped.
//
// An error is returned when the snapshot cannot be created, the expression
// does not compile, the scan request fails or answers with a non-200 status,
// the Arrow stream cannot be opened or read, a column has an unexpected type,
// a payload carries an unsupported version or is truncated, or the expression
// fails at run time.
func (d *FeatureViewFeatureDBDao) RowCountIds(filterExpr string) ([]string, int, error) {
	snapshotId, _, err := d.createSnapshot()
	if err != nil {
		return nil, 0, err
	}

	var program *vm.Program
	if filterExpr != "" {
		program, err = expr.Compile(filterExpr)
		if err != nil {
			return nil, 0, err
		}
	}

	req, err := http.NewRequest("GET", fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/snapshots/%s/scan",
		d.featureDBClient.GetCurrentAddress(false), d.database, d.schema, d.table, snapshotId), bytes.NewReader(nil))
	if err != nil {
		return nil, 0, err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", d.featureDBClient.Token)
	req.Header.Set("Auth", d.signature)

	response, err := d.featureDBClient.Client.Do(req)
	if err != nil {
		return nil, 0, err
	}
	defer response.Body.Close()

	if response.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(response.Body)
		return nil, 0, fmt.Errorf("status code: %d, response body: %s", response.StatusCode, string(body))
	}

	// Arrow IPC reader. A failure here must be reported: continuing with a nil
	// reader would panic on the first call to Next.
	reader, err := ipc.NewReader(response.Body, ipc.WithAllocator(memory.NewGoAllocator()))
	if err != nil {
		return nil, 0, fmt.Errorf("open arrow stream: %w", err)
	}

	// readString decodes a uint32 little-endian length followed by that many
	// bytes. The length is validated against the remaining payload so that a
	// corrupt length cannot trigger a huge allocation.
	readString := func(rd *bytes.Reader) (string, error) {
		var length uint32
		if err := binary.Read(rd, binary.LittleEndian, &length); err != nil {
			return "", err
		}
		if int64(length) > int64(rd.Len()) {
			return "", io.ErrUnexpectedEOF
		}
		buf := make([]byte, length)
		if _, err := io.ReadFull(rd, buf); err != nil {
			return "", err
		}
		return string(buf), nil
	}

	// decodeRow decodes one F/1 payload (version bytes already consumed).
	// Fields are laid out in d.fields order, each prefixed by a one-byte null
	// flag; null fields are left out of the map. Running out of bytes exactly
	// at a field boundary ends the row; running out in the middle of a value
	// is an error.
	decodeRow := func(rd *bytes.Reader) (map[string]interface{}, error) {
		properties := make(map[string]interface{})
		for _, field := range d.fields {
			isNull, err := rd.ReadByte()
			if err == io.EOF {
				break
			}
			if err != nil {
				return nil, err
			}
			if isNull == 1 {
				// Null value: nothing follows the flag.
				continue
			}

			var value interface{}
			switch d.fieldTypeMap[field] {
			case constants.FS_INT32:
				var v int32
				err = binary.Read(rd, binary.LittleEndian, &v)
				value = v
			case constants.FS_INT64:
				var v int64
				err = binary.Read(rd, binary.LittleEndian, &v)
				value = v
			case constants.FS_FLOAT:
				var v float32
				err = binary.Read(rd, binary.LittleEndian, &v)
				value = v
			case constants.FS_DOUBLE:
				var v float64
				err = binary.Read(rd, binary.LittleEndian, &v)
				value = v
			case constants.FS_BOOLEAN:
				var v bool
				err = binary.Read(rd, binary.LittleEndian, &v)
				value = v
			case constants.FS_STRING:
				value, err = readString(rd)
			default:
				// Every other type is carried as a length-prefixed string.
				value, err = readString(rd)
			}
			if err != nil {
				return nil, fmt.Errorf("decode field %q: %w", field, err)
			}
			properties[field] = value
		}
		return properties, nil
	}

	ids := make([]string, 0, 1024)
	innerReader := readerPool.Get().(*bytes.Reader)
	defer func() {
		// Drop the reference to the last payload before pooling the reader.
		innerReader.Reset(nil)
		readerPool.Put(innerReader)
	}()

	for reader.Next() {
		record := reader.Record()
		rows := int(record.NumRows())
		if rows == 0 {
			record.Release()
			continue
		}

		keys, ok := record.Column(0).(*array.String)
		if !ok {
			return nil, 0, fmt.Errorf("scan snapshot: column 0 has type %T, want *array.String", record.Column(0))
		}
		var payloads *array.Binary
		if filterExpr != "" {
			payloads, ok = record.Column(1).(*array.Binary)
			if !ok {
				return nil, 0, fmt.Errorf("scan snapshot: column 1 has type %T, want *array.Binary", record.Column(1))
			}
		}

		for i := 0; i < rows; i++ {
			if payloads == nil {
				ids = append(ids, keys.Value(i))
				continue
			}

			dataBytes := payloads.Value(i)
			if len(dataBytes) < 2 {
				continue
			}
			// The first two bytes are the protocol and null-flag versions; only
			// the F/1 layout is understood by decodeRow.
			protocalVersion, ifNullFlagVersion := dataBytes[0], dataBytes[1]
			if protocalVersion != FeatureDB_Protocal_Version_F || ifNullFlagVersion != FeatureDB_IfNull_Flag_Version_1 {
				return nil, 0, fmt.Errorf("unsupported protocal version: %d, ifNullFlagVersion: %d", protocalVersion, ifNullFlagVersion)
			}

			innerReader.Reset(dataBytes[2:])
			properties, err := decodeRow(innerReader)
			if err != nil {
				return nil, 0, err
			}
			ret, err := expr.Run(program, properties)
			if err != nil {
				return nil, 0, err
			}
			if matched, ok := ret.(bool); ok && matched {
				ids = append(ids, keys.Value(i))
			}
		}
		record.Release()
	}
	// Next returning false can mean a broken stream, not only a clean end.
	if err := reader.Err(); err != nil && !errors.Is(err, io.EOF) {
		return nil, 0, fmt.Errorf("read arrow stream: %w", err)
	}
	return ids, len(ids), nil
}
// createSnapshot asks FeatureDB to create a snapshot of the DAO's table by
// POSTing to /api/v1/tables/{database}/{schema}/{table}/snapshots.
//
// It returns the "snapshot_id" string from the response's data object and the
// "ts" entry converted with utils.ToInt64 (0 is passed as the fallback value).
//
// An error is returned when the request cannot be built or sent, the server
// answers with a non-200 status, the body is not valid JSON, or the response
// carries no string "snapshot_id".
func (d *FeatureViewFeatureDBDao) createSnapshot() (string, int64, error) {
	req, err := http.NewRequest("POST", fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/snapshots",
		d.featureDBClient.GetCurrentAddress(false), d.database, d.schema, d.table), bytes.NewReader(nil))
	if err != nil {
		return "", 0, err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", d.featureDBClient.Token)
	req.Header.Set("Auth", d.signature)

	response, err := d.featureDBClient.Client.Do(req)
	if err != nil {
		return "", 0, err
	}
	defer response.Body.Close()

	if response.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(response.Body)
		return "", 0, fmt.Errorf("status code: %d, response body: %s", response.StatusCode, string(body))
	}

	responseBody := struct {
		RequestId string         `json:"request_id,omitempty"`
		Code      string         `json:"code"`
		Message   string         `json:"message,omitempty"`
		Data      map[string]any `json:"data,omitempty"`
	}{}
	decoder := json.NewDecoder(response.Body)
	// Keep numbers as json.Number so large integers are not rounded through float64.
	decoder.UseNumber()
	if err := decoder.Decode(&responseBody); err != nil {
		return "", 0, err
	}

	// A 200 response without a usable snapshot id must surface as an error
	// rather than a panic from an unchecked type assertion.
	snapshotID, ok := responseBody.Data["snapshot_id"].(string)
	if !ok {
		return "", 0, fmt.Errorf("create snapshot: response has no snapshot_id, request id: %s, code: %s, message: %s",
			responseBody.RequestId, responseBody.Code, responseBody.Message)
	}
	return snapshotID, utils.ToInt64(responseBody.Data["ts"], 0), nil
}
// ScanAndIterateData returns the ids that currently satisfy filter (via
// RowCountIds) and, when ch is non-nil, starts a background goroutine that
// polls FeatureDB every five seconds for rows changed since the snapshot
// timestamp and sends the ids of matching rows to ch.
//
// An empty filter matches every row. With a non-empty filter each changed row
// is decoded with the F/1 binary layout and kept only when the compiled
// expression evaluates to boolean true.
//
// The returned error covers only the initial phase (snapshot creation, filter
// compilation, RowCountIds). The poller is best effort: a failed request, a
// non-200 status, a missing Next-Ts header, an unreadable stream or an
// undecodable row is skipped silently and polling simply continues.
//
// NOTE: the goroutine has no stop signal and never closes ch; it runs for the
// life of the process and blocks on ch when nobody is receiving.
func (d *FeatureViewFeatureDBDao) ScanAndIterateData(filter string, ch chan<- string) ([]string, error) {
	_, ts, err := d.createSnapshot()
	if err != nil {
		return nil, err
	}
	var program *vm.Program
	if filter != "" {
		program, err = expr.Compile(filter)
		if err != nil {
			return nil, err
		}
	}
	ids, _, err := d.RowCountIds(filter)
	if err != nil {
		return nil, err
	}
	if ch == nil {
		return ids, nil
	}

	// readString decodes a uint32 little-endian length followed by that many
	// bytes, rejecting lengths larger than what is left in the payload.
	readString := func(rd *bytes.Reader) (string, error) {
		var length uint32
		if err := binary.Read(rd, binary.LittleEndian, &length); err != nil {
			return "", err
		}
		if int64(length) > int64(rd.Len()) {
			return "", io.ErrUnexpectedEOF
		}
		buf := make([]byte, length)
		if _, err := io.ReadFull(rd, buf); err != nil {
			return "", err
		}
		return string(buf), nil
	}

	// decodeRow decodes one F/1 payload (version bytes already consumed).
	// Fields follow d.fields order, each prefixed by a one-byte null flag;
	// null fields are left out of the map. Running out of bytes at a field
	// boundary ends the row; running out inside a value is an error.
	decodeRow := func(rd *bytes.Reader) (map[string]interface{}, error) {
		properties := make(map[string]interface{})
		for _, field := range d.fields {
			isNull, err := rd.ReadByte()
			if err == io.EOF {
				break
			}
			if err != nil {
				return nil, err
			}
			if isNull == 1 {
				// Null value: nothing follows the flag.
				continue
			}

			var value interface{}
			switch d.fieldTypeMap[field] {
			case constants.FS_INT32:
				var v int32
				err = binary.Read(rd, binary.LittleEndian, &v)
				value = v
			case constants.FS_INT64:
				var v int64
				err = binary.Read(rd, binary.LittleEndian, &v)
				value = v
			case constants.FS_FLOAT:
				var v float32
				err = binary.Read(rd, binary.LittleEndian, &v)
				value = v
			case constants.FS_DOUBLE:
				var v float64
				err = binary.Read(rd, binary.LittleEndian, &v)
				value = v
			case constants.FS_BOOLEAN:
				var v bool
				err = binary.Read(rd, binary.LittleEndian, &v)
				value = v
			case constants.FS_STRING:
				value, err = readString(rd)
			default:
				// Every other type is carried as a length-prefixed string.
				value, err = readString(rd)
			}
			if err != nil {
				return nil, fmt.Errorf("decode field %q: %w", field, err)
			}
			properties[field] = value
		}
		return properties, nil
	}

	// rowMatches reports whether a row payload satisfies the compiled filter.
	// Rows that are too short, use an unsupported version, fail to decode or
	// make the expression fail are treated as non-matching.
	rowMatches := func(rd *bytes.Reader, payload []byte) bool {
		if len(payload) < 2 {
			return false
		}
		// The first two bytes are the protocol and null-flag versions.
		if payload[0] != FeatureDB_Protocal_Version_F || payload[1] != FeatureDB_IfNull_Flag_Version_1 {
			return false
		}
		rd.Reset(payload[2:])
		properties, err := decodeRow(rd)
		if err != nil {
			return false
		}
		ret, err := expr.Run(program, properties)
		if err != nil {
			return false
		}
		matched, ok := ret.(bool)
		return ok && matched
	}

	alloc := memory.NewGoAllocator()

	// poll performs a single iterate_get_kv round trip. It is a function of
	// its own so the deferred Close runs on every exit path of every round.
	poll := func() {
		req, err := http.NewRequest("GET", fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/iterate_get_kv?ts=%d",
			d.featureDBClient.GetCurrentAddress(false), d.database, d.schema, d.table, ts), bytes.NewReader(nil))
		if err != nil {
			return
		}
		req.Header.Set("Content-Type", "application/json")
		req.Header.Set("Authorization", d.featureDBClient.Token)
		req.Header.Set("Auth", d.signature)

		response, err := d.featureDBClient.Client.Do(req)
		if err != nil {
			return
		}
		defer response.Body.Close()

		if response.StatusCode != http.StatusOK {
			return
		}
		nextTs := utils.ToInt64(response.Header.Get("Next-Ts"), 0)
		if nextTs == 0 {
			return
		}
		// The next round resumes from the timestamp the server handed back.
		ts = nextTs

		reader, err := ipc.NewReader(response.Body, ipc.WithAllocator(alloc))
		if err != nil {
			// Without this check a nil reader would panic the whole process.
			return
		}

		innerReader := readerPool.Get().(*bytes.Reader)
		defer func() {
			// Drop the reference to the last payload before pooling the reader.
			innerReader.Reset(nil)
			readerPool.Put(innerReader)
		}()

		for reader.Next() {
			record := reader.Record()
			if rows := int(record.NumRows()); rows > 0 {
				keys, ok := record.Column(0).(*array.String)
				var payloads *array.Binary
				if ok && filter != "" {
					payloads, ok = record.Column(1).(*array.Binary)
				}
				// ok is false when a column has an unexpected type; such a
				// record is skipped instead of panicking in the background.
				for i := 0; ok && i < rows; i++ {
					if filter == "" || rowMatches(innerReader, payloads.Value(i)) {
						ch <- keys.Value(i)
					}
				}
			}
			record.Release()
		}
	}

	go func() {
		for {
			time.Sleep(time.Second * 5)
			poll()
		}
	}()
	return ids, nil
}