in dao/feature_view_igraph_dao.go [229:303]
// GetUserBehaviorFeature reads behavior-sequence edges from igraph for every
// user in userIds and returns all rows as one flat slice of field->value maps.
//
// For each user one query is issued. Its edge keys are "<userId>_<event>" for
// every entry in events (URL-query-escaped and joined with ";"), it selects
// selectFields and orders by sequenceConfig.TimestampField with Order.decr.
// Users are fetched concurrently, one goroutine per user, so rows of different
// users may appear in any relative order in the result.
//
// Each returned row is post-processed as follows:
//   - the "label" field, the "<primaryKeyField>_<EventField>" field and, when
//     sequenceConfig.DeduplicationMethodNum is 2, the
//     "<ItemIdField>_<TimestampField>" field are dropped;
//   - values whose type in d.fieldTypeMap is double/float or int32/int64 are
//     converted with utils.ToFloat / utils.ToInt (-1024 is passed as the second
//     argument, presumably the fallback value — confirm against utils);
//     all other values are kept as returned by igraph;
//   - if the row's event has an entry in the play-time map built from
//     sequenceConfig.PlayTimeFilter, the row is discarded when its
//     PlayTimeField value is less than or equal to that entry.
//
// An error is returned only when events is empty. A failed read for a single
// user is logged and that user simply contributes no rows.
func (d *FeatureViewIGraphDao) GetUserBehaviorFeature(userIds []interface{}, events []interface{}, selectFields []string, sequenceConfig api.FeatureViewSeqConfig) ([]map[string]interface{}, error) {
	if len(events) == 0 {
		return []map[string]interface{}{}, errors.New("igraph not support GetBehaviorFeatures with empty events")
	}
	sequencePlayTimeMap := makePlayTimeMap(sequenceConfig.PlayTimeFilter)

	// Everything below depends only on the receiver and the arguments, so it is
	// computed once here instead of once per field / per row / per user.
	pkEventField := fmt.Sprintf("%v_%v", d.primaryKeyField, sequenceConfig.EventField)
	itemTimestampField := fmt.Sprintf("%v_%v", sequenceConfig.ItemIdField, sequenceConfig.TimestampField)
	dropItemTimestampField := sequenceConfig.DeduplicationMethodNum == 2
	selectClause := strings.Join(selectFields, ";")

	// fetchDataFunc queries igraph for a single user and returns that user's
	// filtered, type-converted rows (nil when the read fails).
	fetchDataFunc := func(userId interface{}) []map[string]interface{} {
		pkeys := make([]string, 0, len(events))
		for _, event := range events {
			pkeys = append(pkeys, url.QueryEscape(fmt.Sprintf("%v_%v", userId, event)))
		}
		queryString := fmt.Sprintf("g(\"%s\").E(\"%s\").hasLabel(\"%s\").fields(\"%s\").order().by(\"%s\",Order.decr)",
			d.group, strings.Join(pkeys, ";"), d.edgeName, selectClause, sequenceConfig.TimestampField)
		request := aligraph.ReadRequest{
			QueryString: queryString,
		}
		resp, err := d.igraphClient.Read(&request)
		if err != nil {
			// Best effort: one user's failure must not fail the whole batch.
			log.Println(err)
			return nil
		}

		results := []map[string]interface{}{}
		for _, resultData := range resp.Result {
			for _, data := range resultData.Data {
				properties := make(map[string]interface{}, len(data))
				for field, value := range data {
					// Skip bookkeeping fields that are not part of the feature row.
					if field == "label" ||
						field == pkEventField ||
						(dropItemTimestampField && field == itemTimestampField) {
						continue
					}
					switch d.fieldTypeMap[field] {
					case constants.FS_DOUBLE, constants.FS_FLOAT:
						properties[field] = utils.ToFloat(value, -1024)
					case constants.FS_INT32, constants.FS_INT64:
						properties[field] = utils.ToInt(value, -1024)
					default:
						properties[field] = value
					}
				}
				// Play-time filter: drop rows whose play time does not exceed
				// the threshold configured for this row's event.
				if t, exist := sequencePlayTimeMap[utils.ToString(properties[sequenceConfig.EventField], "")]; exist {
					if utils.ToFloat(properties[sequenceConfig.PlayTimeField], 0.0) <= t {
						continue
					}
				}
				results = append(results, properties)
			}
		}
		return results
	}

	// Capacity is a sizing heuristic (50 rows per user per event, plus slack);
	// it does not limit the number of rows returned.
	results := make([]map[string]interface{}, 0, len(userIds)*(len(events)+1)*50)
	var outmu sync.Mutex // guards results while goroutines append to it
	var wg sync.WaitGroup
	for _, userId := range userIds {
		wg.Add(1)
		// userId is passed as an argument so each goroutine gets its own copy.
		go func(userId interface{}) {
			defer wg.Done()
			innerresult := fetchDataFunc(userId)
			outmu.Lock()
			results = append(results, innerresult...)
			outmu.Unlock()
		}(userId)
	}
	wg.Wait()
	return results, nil
}