in dao/feature_view_igraph_dao.go [111:227]
func (d *FeatureViewIGraphDao) GetUserSequenceFeature(keys []interface{}, userIdField string, sequenceConfig api.FeatureViewSeqConfig, onlineConfig []*api.SeqConfig) ([]map[string]interface{}, error) {
var selectFields []string
if sequenceConfig.PlayTimeField == "" {
selectFields = []string{sequenceConfig.ItemIdField, sequenceConfig.EventField, sequenceConfig.TimestampField}
} else {
selectFields = []string{sequenceConfig.ItemIdField, sequenceConfig.EventField, sequenceConfig.PlayTimeField, sequenceConfig.TimestampField}
}
currTime := time.Now().Unix()
sequencePlayTimeMap := makePlayTimeMap(sequenceConfig.PlayTimeFilter)
fetchDataFunc := func(seqEvent string, seqLen int, key interface{}) []*sequenceInfo {
sequences := []*sequenceInfo{}
events := strings.Split(seqEvent, "|")
var pk string
if len(events) > 1 {
pks := make([]string, len(events))
for i, event := range events {
pks[i] = url.QueryEscape(fmt.Sprintf("%v_%s", key, event))
}
pk = strings.Join(pks, ";")
} else {
pk = url.QueryEscape(fmt.Sprintf("%v_%s", key, seqEvent))
}
queryString := fmt.Sprintf("g(\"%s\").E(\"%s\").hasLabel(\"%s\").fields(\"%s\").order().by(\"%s\",Order.decr).limit(%d)",
d.group, pk, d.edgeName, strings.Join(selectFields, ";"), sequenceConfig.TimestampField, seqLen)
request := aligraph.ReadRequest{
QueryString: queryString,
}
resp, err := d.igraphClient.Read(&request)
if err != nil {
log.Println(err)
return nil
}
for _, resultData := range resp.Result {
for _, data := range resultData.Data {
seq := new(sequenceInfo)
for field, value := range data {
if field == "label" {
continue
}
switch field {
case sequenceConfig.EventField:
seq.event = utils.ToString(value, "")
case sequenceConfig.ItemIdField:
seq.itemId = utils.ToString(value, "")
case sequenceConfig.PlayTimeField:
seq.playTime = utils.ToFloat(value, 0)
case sequenceConfig.TimestampField:
seq.timestamp = utils.ToInt64(value, 0)
default:
}
}
if seq.event == "" || seq.itemId == "" {
continue
}
if t, exist := sequencePlayTimeMap[seq.event]; exist {
if seq.playTime <= t {
continue
}
}
sequences = append(sequences, seq)
}
}
return sequences
}
results := make([]map[string]interface{}, 0, len(keys))
var outmu sync.Mutex
var wg sync.WaitGroup
for _, key := range keys {
wg.Add(1)
go func(key interface{}) {
defer wg.Done()
properties := make(map[string]interface{})
var mu sync.Mutex
var eventWg sync.WaitGroup
for _, seqConfig := range onlineConfig {
eventWg.Add(1)
go func(seqConfig *api.SeqConfig) {
defer eventWg.Done()
var onlineSequences []*sequenceInfo
var offlineSequences []*sequenceInfo
//get data from edge
if onlineresult := fetchDataFunc(seqConfig.SeqEvent, seqConfig.SeqLen, key); onlineresult != nil {
onlineSequences = onlineresult
}
subproperties := makeSequenceFeatures(offlineSequences, onlineSequences, seqConfig, sequenceConfig, currTime)
mu.Lock()
defer mu.Unlock()
for k, value := range subproperties {
properties[k] = value
}
}(seqConfig)
}
eventWg.Wait()
properties[userIdField] = key
outmu.Lock()
results = append(results, properties)
outmu.Unlock()
}(key)
}
wg.Wait()
return results, nil
}