in dao/feature_view_featuredb_dao.go [882:1138]
func (d *FeatureViewFeatureDBDao) GetUserBehaviorFeature(userIds []interface{}, events []interface{}, selectFields []string, sequenceConfig api.FeatureViewSeqConfig) ([]map[string]interface{}, error) {
selectFieldsSet := make(map[string]struct{})
for _, selectField := range selectFields {
selectFieldsSet[selectField] = struct{}{}
}
sequencePlayTimeMap := makePlayTimeMap(sequenceConfig.PlayTimeFilter)
errChan := make(chan error, len(userIds))
fetchDataFunc := func(user_id interface{}) []map[string]interface{} {
results := []map[string]interface{}{}
var response *http.Response
if len(events) == 0 {
prefixs := []string{fmt.Sprintf("%v\u001D", user_id)}
request := FeatureDBScanKKVRequest{
Prefixs: prefixs,
WithValue: true,
}
body, _ := json.Marshal(request)
url := fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/scan_kkv", d.featureDBClient.GetCurrentAddress(false), d.database, d.schema, d.table)
req, err := http.NewRequest("POST", url, bytes.NewReader(body))
if err != nil {
errChan <- err
return nil
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", d.featureDBClient.Token)
req.Header.Set("Auth", d.signature)
response, err = d.featureDBClient.Client.Do(req)
if err != nil {
url = fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/scan_kkv", d.featureDBClient.GetCurrentAddress(true), d.database, d.schema, d.table)
req, err = http.NewRequest("POST", url, bytes.NewReader(body))
if err != nil {
errChan <- err
return nil
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", d.featureDBClient.Token)
req.Header.Set("Auth", d.signature)
response, err = d.featureDBClient.Client.Do(req)
if err != nil {
errChan <- err
return nil
}
}
} else {
pks := make([]string, 0, len(events))
for _, event := range events {
pks = append(pks, fmt.Sprintf("%v\u001D%v", user_id, event))
}
request := FeatureDBBatchGetKKVRequest{
PKs: pks,
WithValue: true,
}
body, _ := json.Marshal(request)
url := fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kkv", d.featureDBClient.GetCurrentAddress(false), d.database, d.schema, d.table)
req, err := http.NewRequest("POST", url, bytes.NewReader(body))
if err != nil {
errChan <- err
return nil
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", d.featureDBClient.Token)
req.Header.Set("Auth", d.signature)
response, err = d.featureDBClient.Client.Do(req)
if err != nil {
url = fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kkv", d.featureDBClient.GetCurrentAddress(true), d.database, d.schema, d.table)
req, err = http.NewRequest("POST", url, bytes.NewReader(body))
if err != nil {
errChan <- err
return nil
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", d.featureDBClient.Token)
req.Header.Set("Auth", d.signature)
response, err = d.featureDBClient.Client.Do(req)
if err != nil {
errChan <- err
return nil
}
}
}
defer response.Body.Close() // 确保关闭response.Body
// 检查状态码
if response.StatusCode != http.StatusOK {
bodyBytes, err := io.ReadAll(response.Body)
if err != nil {
errChan <- err
return nil
}
var bodyMap map[string]interface{}
if err := json.Unmarshal(bodyBytes, &bodyMap); err == nil {
if msg, found := bodyMap["message"]; found {
log.Printf("StatusCode: %d, Response message: %s\n", response.StatusCode, msg)
}
}
return nil
}
reader := bufio.NewReader(response.Body)
innerReader := readerPool.Get().(*bytes.Reader)
defer readerPool.Put(innerReader)
for {
buf, err := deserialize(reader)
if err == io.EOF {
break // End of stream
}
if err != nil {
errChan <- err
return nil
}
kkvRecordBlock := fdbserverfb.GetRootAsKKVRecordBlock(buf, 0)
for i := 0; i < kkvRecordBlock.ValuesLength(); i++ {
kkv := new(fdbserverfb.KKVData)
kkvRecordBlock.Values(kkv, i)
dataBytes := kkv.ValueBytes()
if len(dataBytes) < 2 {
//fmt.Println("userid ", user_id, " not exists")
continue
}
innerReader.Reset(dataBytes)
// 读取版本号
var protocalVersion, ifNullFlagVersion uint8
binary.Read(innerReader, binary.LittleEndian, &protocalVersion)
binary.Read(innerReader, binary.LittleEndian, &ifNullFlagVersion)
readFeatureDBFunc_F_1 := func() (map[string]interface{}, error) {
properties := make(map[string]interface{})
for _, field := range d.fields {
var isNull uint8
if err := binary.Read(innerReader, binary.LittleEndian, &isNull); err != nil {
if err == io.EOF {
break
}
return nil, err
}
if isNull == 1 {
// 跳过空值
continue
}
if _, exists := selectFieldsSet[field]; exists {
switch d.fieldTypeMap[field] {
case constants.FS_INT32:
var int32Value int32
binary.Read(innerReader, binary.LittleEndian, &int32Value)
properties[field] = int32Value
case constants.FS_INT64:
var int64Value int64
binary.Read(innerReader, binary.LittleEndian, &int64Value)
properties[field] = int64Value
case constants.FS_FLOAT:
var float32Value float32
binary.Read(innerReader, binary.LittleEndian, &float32Value)
properties[field] = float32Value
case constants.FS_DOUBLE:
var float64Value float64
binary.Read(innerReader, binary.LittleEndian, &float64Value)
properties[field] = float64Value
case constants.FS_STRING:
var length uint32
binary.Read(innerReader, binary.LittleEndian, &length)
strBytes := make([]byte, length)
binary.Read(innerReader, binary.LittleEndian, &strBytes)
properties[field] = string(strBytes)
case constants.FS_BOOLEAN:
var boolValue bool
binary.Read(innerReader, binary.LittleEndian, &boolValue)
properties[field] = boolValue
default:
var length uint32
binary.Read(innerReader, binary.LittleEndian, &length)
strBytes := make([]byte, length)
binary.Read(innerReader, binary.LittleEndian, &strBytes)
properties[field] = string(strBytes)
}
} else {
var skipBytes int = 0
switch d.fieldTypeMap[field] {
case constants.FS_INT32:
skipBytes = 4
case constants.FS_INT64:
skipBytes = 8
case constants.FS_FLOAT:
skipBytes = 4
case constants.FS_DOUBLE:
skipBytes = 8
case constants.FS_STRING:
var length uint32
binary.Read(innerReader, binary.LittleEndian, &length)
skipBytes = int(length)
case constants.FS_BOOLEAN:
skipBytes = 1
default:
var length uint32
binary.Read(innerReader, binary.LittleEndian, &length)
skipBytes = int(length)
}
if skipBytes > 0 {
if _, err := innerReader.Seek(int64(skipBytes), io.SeekCurrent); err != nil {
return nil, err
}
}
}
}
return properties, nil
}
if protocalVersion == FeatureDB_Protocal_Version_F && ifNullFlagVersion == FeatureDB_IfNull_Flag_Version_1 {
readResult, err := readFeatureDBFunc_F_1()
if err != nil {
errChan <- err
return nil
}
if t, exist := sequencePlayTimeMap[utils.ToString(readResult[sequenceConfig.EventField], "")]; exist {
if utils.ToFloat(readResult[sequenceConfig.PlayTimeField], 0.0) <= t {
continue
}
}
results = append(results, readResult)
} else {
errChan <- fmt.Errorf("unsupported protocal version: %d, ifNullFlagVersion: %d", protocalVersion, ifNullFlagVersion)
return nil
}
}
}
return results
}
results := make([]map[string]interface{}, 0, len(userIds)*(len(events)+1)*50)
var outmu sync.Mutex
var wg sync.WaitGroup
for _, userId := range userIds {
wg.Add(1)
go func(userId interface{}) {
defer wg.Done()
innerresult := fetchDataFunc(userId)
outmu.Lock()
results = append(results, innerresult...)
outmu.Unlock()
}(userId)
}
wg.Wait()
return results, nil
}