func()

in dao/feature_view_tablestore_dao.go [128:397]


// GetUserSequenceFeature builds behaviour-sequence features for every key in keys.
//
// For each key and each entry of onlineConfig it reads the key's event rows from
// both d.onlineTable and d.offlineTable. It runs one GetRange scan per
// "|"-separated event in SeqEvent, keeps the SeqLen rows with the largest
// timestamps from each table, and merges the two lists with makeSequenceFeatures.
// Every resulting map also carries the key itself under userIdField.
//
// Rows are addressed by a two-column primary key:
//   - "<userIdField>_<EventField>" holding "<key>_<event>";
//   - a sort column named ItemIdField (DeduplicationMethodNum == 1) or
//     "<ItemIdField>_<TimestampField>" (DeduplicationMethodNum == 2).
//
// Tablestore read errors are logged and the affected event scan is abandoned;
// rows already read for that event are still used. The returned error is
// currently always nil. The order of the returned maps follows goroutine
// completion, not the order of keys.
func (d *FeatureViewTableStoreDao) GetUserSequenceFeature(keys []interface{}, userIdField string, sequenceConfig api.FeatureViewSeqConfig, onlineConfig []*api.SeqConfig) ([]map[string]interface{}, error) {
	// onlineLookbackSeconds bounds how far back (by cell version timestamp) the
	// online table is read.
	const onlineLookbackSeconds = 86400 * 5

	currTime := time.Now().Unix()
	sequencePlayTimeMap := makePlayTimeMap(sequenceConfig.PlayTimeFilter)

	pkField := fmt.Sprintf("%s_%s", userIdField, sequenceConfig.EventField)
	var skField string
	switch sequenceConfig.DeduplicationMethodNum {
	case 1:
		skField = sequenceConfig.ItemIdField
	case 2:
		skField = fmt.Sprintf("%s_%s", sequenceConfig.ItemIdField, sequenceConfig.TimestampField)
	}
	// NOTE(review): any other DeduplicationMethodNum leaves skField empty, which
	// presumably makes every GetRange request fail — confirm valid values upstream.

	// fetchSequences scans tableName once per "|"-separated event in seqEvent
	// (concurrently), drops rows that are incomplete or below the configured
	// play-time threshold, and returns at most seqLen entries ordered by
	// timestamp, newest first. configure applies table-specific version
	// filtering to each scan's query criteria.
	fetchSequences := func(seqEvent string, seqLen int, key interface{}, tableName string, configure func(*tablestore.RangeRowQueryCriteria)) []*sequenceInfo {
		sequences := []*sequenceInfo{}
		var mu sync.Mutex
		var wg sync.WaitGroup

		for _, event := range strings.Split(seqEvent, "|") {
			wg.Add(1)
			go func(event string) {
				defer wg.Done()

				// Scan the whole sort-key range for this "<key>_<event>" partition value.
				pkValue := fmt.Sprintf("%v_%s", key, event)
				startPK := new(tablestore.PrimaryKey)
				startPK.AddPrimaryKeyColumn(pkField, pkValue)
				startPK.AddPrimaryKeyColumnWithMinValue(skField)
				endPK := new(tablestore.PrimaryKey)
				endPK.AddPrimaryKeyColumn(pkField, pkValue)
				endPK.AddPrimaryKeyColumnWithMaxValue(skField)

				criteria := &tablestore.RangeRowQueryCriteria{
					TableName:       tableName,
					StartPrimaryKey: startPK,
					EndPrimaryKey:   endPK,
					Direction:       tablestore.FORWARD,
				}
				if sequenceConfig.PlayTimeField == "" {
					criteria.ColumnsToGet = []string{sequenceConfig.ItemIdField, sequenceConfig.EventField, sequenceConfig.TimestampField}
				} else {
					criteria.ColumnsToGet = []string{sequenceConfig.ItemIdField, sequenceConfig.EventField, sequenceConfig.PlayTimeField, sequenceConfig.TimestampField}
				}
				configure(criteria)
				request := &tablestore.GetRangeRequest{RangeRowQueryCriteria: criteria}

				// Collect locally and publish once, so the shared mutex is taken
				// once per event scan rather than once per row.
				var found []*sequenceInfo
				defer func() {
					mu.Lock()
					sequences = append(sequences, found...)
					mu.Unlock()
				}()

				for {
					resp, err := d.tablestoreClient.GetRange(request)
					if err != nil {
						// The response cannot be used when err != nil; stop this
						// event's scan and keep whatever earlier pages produced.
						fmt.Println("get range failed with error:", err)
						return
					}

					for _, row := range resp.Rows {
						if row.PrimaryKey.PrimaryKeys == nil {
							continue
						}
						seq := new(sequenceInfo)
						// With method 1 the sort-key column (index 1) is the item id;
						// guard the index so a short key cannot panic.
						if sequenceConfig.DeduplicationMethodNum == 1 && len(row.PrimaryKey.PrimaryKeys) > 1 {
							seq.itemId = utils.ToString(row.PrimaryKey.PrimaryKeys[1].Value, "")
						}
						for _, column := range row.Columns {
							switch column.ColumnName {
							case sequenceConfig.EventField:
								seq.event = utils.ToString(column.Value, "")
							case sequenceConfig.ItemIdField:
								seq.itemId = utils.ToString(column.Value, "")
							case sequenceConfig.PlayTimeField:
								seq.playTime = utils.ToFloat(column.Value, 0)
							case sequenceConfig.TimestampField:
								seq.timestamp = utils.ToInt64(column.Value, 0)
							}
						}

						if seq.event == "" || seq.itemId == "" {
							continue
						}
						// Events with a play-time filter must strictly exceed the threshold.
						if threshold, ok := sequencePlayTimeMap[seq.event]; ok && seq.playTime <= threshold {
							continue
						}
						found = append(found, seq)
					}

					if resp.NextStartPrimaryKey == nil {
						return
					}
					criteria.StartPrimaryKey = resp.NextStartPrimaryKey
				}
			}(event)
		}
		wg.Wait()

		// Keep only the seqLen most recent entries.
		sort.Slice(sequences, func(i, j int) bool {
			return sequences[i].timestamp > sequences[j].timestamp
		})
		limit := seqLen
		if limit > len(sequences) {
			limit = len(sequences)
		}
		if limit < 0 {
			limit = 0
		}
		return sequences[:limit]
	}

	// Online table: read cell versions whose timestamp (ms) falls within the
	// lookback window ending now.
	onlineVersions := func(c *tablestore.RangeRowQueryCriteria) {
		c.TimeRange = &tablestore.TimeRange{
			Start: (currTime - onlineLookbackSeconds) * 1000,
			End:   currTime * 1000,
		}
	}
	// Offline table: read only a single version per column.
	offlineVersions := func(c *tablestore.RangeRowQueryCriteria) {
		c.MaxVersion = 1
	}

	results := make([]map[string]interface{}, 0, len(keys))
	var resultsMu sync.Mutex
	var keysWg sync.WaitGroup

	for _, key := range keys {
		keysWg.Add(1)
		go func(key interface{}) {
			defer keysWg.Done()

			properties := make(map[string]interface{})
			var propertiesMu sync.Mutex
			var seqWg sync.WaitGroup

			for _, seqConfig := range onlineConfig {
				seqWg.Add(1)
				go func(seqConfig *api.SeqConfig) {
					defer seqWg.Done()

					// Read the online and offline tables concurrently; each
					// goroutine writes only its own variable, read after Wait.
					var onlineSequences, offlineSequences []*sequenceInfo
					var tableWg sync.WaitGroup
					tableWg.Add(2)
					go func() {
						defer tableWg.Done()
						onlineSequences = fetchSequences(seqConfig.SeqEvent, seqConfig.SeqLen, key, d.onlineTable, onlineVersions)
					}()
					go func() {
						defer tableWg.Done()
						offlineSequences = fetchSequences(seqConfig.SeqEvent, seqConfig.SeqLen, key, d.offlineTable, offlineVersions)
					}()
					tableWg.Wait()

					subproperties := makeSequenceFeatures(offlineSequences, onlineSequences, seqConfig, sequenceConfig, currTime)
					propertiesMu.Lock()
					defer propertiesMu.Unlock()
					for k, value := range subproperties {
						properties[k] = value
					}
				}(seqConfig)
			}
			seqWg.Wait()

			properties[userIdField] = key
			resultsMu.Lock()
			results = append(results, properties)
			resultsMu.Unlock()
		}(key)
	}

	keysWg.Wait()

	return results, nil
}