func()

in dao/feature_view_igraph_dao.go [111:227]


func (d *FeatureViewIGraphDao) GetUserSequenceFeature(keys []interface{}, userIdField string, sequenceConfig api.FeatureViewSeqConfig, onlineConfig []*api.SeqConfig) ([]map[string]interface{}, error) {
	var selectFields []string
	if sequenceConfig.PlayTimeField == "" {
		selectFields = []string{sequenceConfig.ItemIdField, sequenceConfig.EventField, sequenceConfig.TimestampField}
	} else {
		selectFields = []string{sequenceConfig.ItemIdField, sequenceConfig.EventField, sequenceConfig.PlayTimeField, sequenceConfig.TimestampField}
	}

	currTime := time.Now().Unix()
	sequencePlayTimeMap := makePlayTimeMap(sequenceConfig.PlayTimeFilter)

	fetchDataFunc := func(seqEvent string, seqLen int, key interface{}) []*sequenceInfo {
		sequences := []*sequenceInfo{}
		events := strings.Split(seqEvent, "|")
		var pk string
		if len(events) > 1 {
			pks := make([]string, len(events))
			for i, event := range events {
				pks[i] = url.QueryEscape(fmt.Sprintf("%v_%s", key, event))
			}
			pk = strings.Join(pks, ";")
		} else {
			pk = url.QueryEscape(fmt.Sprintf("%v_%s", key, seqEvent))
		}
		queryString := fmt.Sprintf("g(\"%s\").E(\"%s\").hasLabel(\"%s\").fields(\"%s\").order().by(\"%s\",Order.decr).limit(%d)",
			d.group, pk, d.edgeName, strings.Join(selectFields, ";"), sequenceConfig.TimestampField, seqLen)
		request := aligraph.ReadRequest{
			QueryString: queryString,
		}
		resp, err := d.igraphClient.Read(&request)
		if err != nil {
			log.Println(err)
			return nil
		}

		for _, resultData := range resp.Result {
			for _, data := range resultData.Data {
				seq := new(sequenceInfo)
				for field, value := range data {
					if field == "label" {
						continue
					}
					switch field {
					case sequenceConfig.EventField:
						seq.event = utils.ToString(value, "")
					case sequenceConfig.ItemIdField:
						seq.itemId = utils.ToString(value, "")
					case sequenceConfig.PlayTimeField:
						seq.playTime = utils.ToFloat(value, 0)
					case sequenceConfig.TimestampField:
						seq.timestamp = utils.ToInt64(value, 0)
					default:
					}
				}

				if seq.event == "" || seq.itemId == "" {
					continue
				}
				if t, exist := sequencePlayTimeMap[seq.event]; exist {
					if seq.playTime <= t {
						continue
					}
				}

				sequences = append(sequences, seq)
			}
		}

		return sequences
	}

	results := make([]map[string]interface{}, 0, len(keys))
	var outmu sync.Mutex

	var wg sync.WaitGroup
	for _, key := range keys {
		wg.Add(1)
		go func(key interface{}) {
			defer wg.Done()
			properties := make(map[string]interface{})
			var mu sync.Mutex

			var eventWg sync.WaitGroup
			for _, seqConfig := range onlineConfig {
				eventWg.Add(1)
				go func(seqConfig *api.SeqConfig) {
					defer eventWg.Done()
					var onlineSequences []*sequenceInfo
					var offlineSequences []*sequenceInfo

					//get data from edge

					if onlineresult := fetchDataFunc(seqConfig.SeqEvent, seqConfig.SeqLen, key); onlineresult != nil {
						onlineSequences = onlineresult
					}

					subproperties := makeSequenceFeatures(offlineSequences, onlineSequences, seqConfig, sequenceConfig, currTime)
					mu.Lock()
					defer mu.Unlock()
					for k, value := range subproperties {
						properties[k] = value
					}
				}(seqConfig)
			}
			eventWg.Wait()

			properties[userIdField] = key
			outmu.Lock()
			results = append(results, properties)
			outmu.Unlock()
		}(key)
	}

	wg.Wait()

	return results, nil
}