func()

in dao/feature_view_igraph_dao.go [229:303]


// GetUserBehaviorFeature reads behavior-sequence rows from igraph for every
// user in userIds and returns all rows concatenated into a single slice.
//
// One read request is issued per user, each in its own goroutine. The query
// addresses one edge key per event, built as the URL-escaped string
// "<userId>_<event>", restricts the result to selectFields and requests
// ordering by sequenceConfig.TimestampField descending. Because per-user
// results are appended as the goroutines finish, the relative order of rows
// belonging to different users is not deterministic.
//
// For every returned row:
//   - the fields "label" and "<primaryKeyField>_<EventField>" are dropped, as
//     is "<ItemIdField>_<TimestampField>" when DeduplicationMethodNum is 2;
//   - values are converted according to d.fieldTypeMap (utils.ToFloat for
//     FS_DOUBLE/FS_FLOAT, utils.ToInt for FS_INT32/FS_INT64, both called with
//     -1024 as their second argument); other values are kept unchanged;
//   - if the row's event has an entry in the play-time map built from
//     sequenceConfig.PlayTimeFilter, the row is skipped when its
//     PlayTimeField value is less than or equal to that entry.
//
// A failed read for a user is logged and that user contributes no rows. The
// returned error is non-nil only when events is empty.
func (d *FeatureViewIGraphDao) GetUserBehaviorFeature(userIds []interface{}, events []interface{}, selectFields []string, sequenceConfig api.FeatureViewSeqConfig) ([]map[string]interface{}, error) {
	if len(events) == 0 {
		return []map[string]interface{}{}, errors.New("igraph not support GetBehaviorFeatures with empty events")
	}
	sequencePlayTimeMap := makePlayTimeMap(sequenceConfig.PlayTimeFilter)

	// These values do not depend on the user or on the row being processed, so
	// they are built once here instead of once per field of every row. They
	// are only read by the goroutines below.
	joinedSelectFields := strings.Join(selectFields, ";")
	pkEventField := fmt.Sprintf("%v_%v", d.primaryKeyField, sequenceConfig.EventField)
	skipItemTimestampField := sequenceConfig.DeduplicationMethodNum == 2
	var itemTimestampField string
	if skipItemTimestampField {
		itemTimestampField = fmt.Sprintf("%v_%v", sequenceConfig.ItemIdField, sequenceConfig.TimestampField)
	}

	fetchDataFunc := func(userId interface{}) []map[string]interface{} {
		// One edge key per (user, event) pair.
		pkeys := make([]string, 0, len(events))
		for _, event := range events {
			pkeys = append(pkeys, url.QueryEscape(fmt.Sprintf("%v_%v", userId, event)))
		}
		queryString := fmt.Sprintf("g(\"%s\").E(\"%s\").hasLabel(\"%s\").fields(\"%s\").order().by(\"%s\",Order.decr)",
			d.group, strings.Join(pkeys, ";"), d.edgeName, joinedSelectFields, sequenceConfig.TimestampField)

		request := aligraph.ReadRequest{
			QueryString: queryString,
		}
		resp, err := d.igraphClient.Read(&request)
		if err != nil {
			// Best effort: a failing user is logged and skipped so the other
			// users' rows are still returned.
			log.Println(err)
			return nil
		}

		results := []map[string]interface{}{}
		for _, resultData := range resp.Result {
			for _, data := range resultData.Data {
				properties := make(map[string]interface{}, len(data))

				for field, value := range data {
					if field == "label" ||
						field == pkEventField ||
						(skipItemTimestampField && field == itemTimestampField) {
						continue
					}

					switch d.fieldTypeMap[field] {
					case constants.FS_DOUBLE, constants.FS_FLOAT:
						properties[field] = utils.ToFloat(value, -1024)
					case constants.FS_INT32, constants.FS_INT64:
						properties[field] = utils.ToInt(value, -1024)
					default:
						properties[field] = value
					}
				}

				// Skip rows whose play time does not exceed the threshold
				// configured for their event.
				if t, exist := sequencePlayTimeMap[utils.ToString(properties[sequenceConfig.EventField], "")]; exist {
					if utils.ToFloat(properties[sequenceConfig.PlayTimeField], 0.0) <= t {
						continue
					}
				}
				results = append(results, properties)
			}
		}

		return results
	}

	results := make([]map[string]interface{}, 0, len(userIds)*(len(events)+1)*50)
	var outmu sync.Mutex
	var wg sync.WaitGroup

	for _, userId := range userIds {
		wg.Add(1)
		go func(userId interface{}) {
			defer wg.Done()
			innerresult := fetchDataFunc(userId)
			// results is shared by all goroutines; guard the append.
			outmu.Lock()
			results = append(results, innerresult...)
			outmu.Unlock()
		}(userId)
	}
	wg.Wait()

	return results, nil
}