in dao/feature_view_featuredb_dao.go [699:874]
func (d *FeatureViewFeatureDBDao) GetUserSequenceFeature(keys []interface{}, userIdField string, sequenceConfig api.FeatureViewSeqConfig, onlineConfig []*api.SeqConfig) ([]map[string]interface{}, error) {
currTime := time.Now().Unix()
sequencePlayTimeMap := makePlayTimeMap(sequenceConfig.PlayTimeFilter)
errChan := make(chan error, len(keys)*len(onlineConfig))
fetchDataFunc := func(seqEvent string, seqLen int, key interface{}) []*sequenceInfo {
sequences := []*sequenceInfo{}
events := strings.Split(seqEvent, "|")
pks := []string{}
for _, event := range events {
pks = append(pks, fmt.Sprintf("%v\u001D%s", key, event))
}
request := FeatureDBBatchGetKKVRequest{
PKs: pks,
Length: seqLen,
}
body, _ := json.Marshal(request)
url := fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kkv", d.featureDBClient.GetCurrentAddress(false), d.database, d.schema, d.table)
req, err := http.NewRequest("POST", url, bytes.NewReader(body))
if err != nil {
errChan <- err
return nil
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", d.featureDBClient.Token)
req.Header.Set("Auth", d.signature)
response, err := d.featureDBClient.Client.Do(req)
if err != nil {
url = fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kkv", d.featureDBClient.GetCurrentAddress(true), d.database, d.schema, d.table)
req, err = http.NewRequest("POST", url, bytes.NewReader(body))
if err != nil {
errChan <- err
return nil
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", d.featureDBClient.Token)
req.Header.Set("Auth", d.signature)
response, err = d.featureDBClient.Client.Do(req)
if err != nil {
errChan <- err
return nil
}
}
defer response.Body.Close() // 确保关闭response.Body
// 检查状态码
if response.StatusCode != http.StatusOK {
bodyBytes, err := io.ReadAll(response.Body)
if err != nil {
errChan <- err
return nil
}
var bodyMap map[string]interface{}
if err := json.Unmarshal(bodyBytes, &bodyMap); err == nil {
if msg, found := bodyMap["message"]; found {
log.Printf("StatusCode: %d, Response message: %s\n", response.StatusCode, msg)
}
}
return nil
}
reader := bufio.NewReader(response.Body)
for {
buf, err := deserialize(reader)
if err == io.EOF {
break // End of stream
}
if err != nil {
errChan <- err
return nil
}
kkvRecordBlock := fdbserverfb.GetRootAsKKVRecordBlock(buf, 0)
for i := 0; i < kkvRecordBlock.ValuesLength(); i++ {
kkv := new(fdbserverfb.KKVData)
kkvRecordBlock.Values(kkv, i)
pk := string(kkv.Pk())
userIdEvent := strings.Split(pk, "\u001D")
if len(userIdEvent) != 2 {
continue
}
var itemId string
if sequenceConfig.DeduplicationMethodNum == 1 {
itemId = string(kkv.Sk())
} else if sequenceConfig.DeduplicationMethodNum == 2 {
sk := string(kkv.Sk())
itemIdTimestamp := strings.Split(sk, "\u001D")
if len(itemIdTimestamp) != 2 {
continue
}
itemId = itemIdTimestamp[0]
} else {
continue
}
seq := new(sequenceInfo)
seq.event = userIdEvent[1]
seq.itemId = itemId
seq.timestamp = kkv.EventTimestamp()
seq.playTime = kkv.PlayTime()
if seq.event == "" || seq.itemId == "" {
continue
}
if t, exist := sequencePlayTimeMap[seq.event]; exist {
if seq.playTime <= t {
continue
}
}
sequences = append(sequences, seq)
}
}
return sequences
}
results := make([]map[string]interface{}, 0, len(keys))
var outmu sync.Mutex
var wg sync.WaitGroup
for _, key := range keys {
wg.Add(1)
go func(key interface{}) {
defer wg.Done()
properties := make(map[string]interface{})
var mu sync.Mutex
var eventWg sync.WaitGroup
for _, seqConfig := range onlineConfig {
eventWg.Add(1)
go func(seqConfig *api.SeqConfig) {
defer eventWg.Done()
var onlineSequences []*sequenceInfo
var offlineSequences []*sequenceInfo
// FeatureDB has processed the integration of online sequence features and offline sequence features
// Here we put the results into onlineSequences
if onlineresult := fetchDataFunc(seqConfig.SeqEvent, seqConfig.SeqLen, key); onlineresult != nil {
onlineSequences = onlineresult
}
subproperties := makeSequenceFeatures(offlineSequences, onlineSequences, seqConfig, sequenceConfig, currTime)
mu.Lock()
defer mu.Unlock()
for k, value := range subproperties {
properties[k] = value
}
}(seqConfig)
}
eventWg.Wait()
properties[userIdField] = key
outmu.Lock()
results = append(results, properties)
outmu.Unlock()
}(key)
}
wg.Wait()
close(errChan)
for err := range errChan {
if err != nil {
return nil, err
}
}
return results, nil
}