func()

in dao/feature_view_tablestore_dao.go [399:622]


// GetUserBehaviorFeature reads behavior-sequence rows for every user in
// userIds from both the offline and the online Tablestore tables, merges the
// two result sets with combineBehaviorFeatures, and returns the merged rows of
// all users in one slice.
//
// Parameters:
//   - userIds: users to look up; each user is fetched in its own goroutine.
//   - events: event names to restrict the lookup to. When empty (or when an
//     element is nil) all events of the user are scanned with a prefix range.
//   - selectFields: attribute columns requested from Tablestore.
//   - sequenceConfig: names of the event / item-id / timestamp / play-time
//     fields, the deduplication method and the play-time filter.
//
// Rows of one user are sorted by sequenceConfig.TimestampField in descending
// order; the relative order of different users in the returned slice depends
// on goroutine scheduling. The lookup is best-effort: when a table read fails
// for a (user, event) pair the failure is printed, that pair contributes no
// rows, and the returned error is still nil.
func (d *FeatureViewTableStoreDao) GetUserBehaviorFeature(userIds []interface{}, events []interface{}, selectFields []string, sequenceConfig api.FeatureViewSeqConfig) ([]map[string]interface{}, error) {
	// The online table is only queried for cells written in the last 5 days.
	const onlineLookbackSeconds int64 = 86400 * 5

	currTime := time.Now().Unix()
	sequencePlayTimeMap := makePlayTimeMap(sequenceConfig.PlayTimeFilter)

	pkField := fmt.Sprintf("%s_%s", d.primaryKeyField, sequenceConfig.EventField)
	var skField string
	switch sequenceConfig.DeduplicationMethodNum {
	case 1:
		skField = sequenceConfig.ItemIdField
	case 2:
		skField = fmt.Sprintf("%s_%s", sequenceConfig.ItemIdField, sequenceConfig.TimestampField)
	}

	// newRangeRequest builds the GetRange request shared by both tables: a
	// forward scan over every sort-key value of the selected partition key(s).
	newRangeRequest := func(tableName string, userID interface{}, event interface{}) *tablestore.GetRangeRequest {
		startPK := new(tablestore.PrimaryKey)
		endPK := new(tablestore.PrimaryKey)
		if event == nil {
			// Partition keys have the form "<userID>_<event>" (see the branch
			// below), so all events of one user are exactly the keys with the
			// prefix "<userID>_". The end bound replaces '_' with the next
			// byte value so that keys of other users sharing the same leading
			// characters (e.g. "123_x" when userID is 12) are not matched.
			startPK.AddPrimaryKeyColumn(pkField, fmt.Sprintf("%v_", userID))
			endPK.AddPrimaryKeyColumn(pkField, fmt.Sprintf("%v%c", userID, '_'+1))
		} else {
			key := fmt.Sprintf("%v_%v", userID, event)
			startPK.AddPrimaryKeyColumn(pkField, key)
			endPK.AddPrimaryKeyColumn(pkField, key)
		}
		startPK.AddPrimaryKeyColumnWithMinValue(skField)
		endPK.AddPrimaryKeyColumnWithMaxValue(skField)

		return &tablestore.GetRangeRequest{
			RangeRowQueryCriteria: &tablestore.RangeRowQueryCriteria{
				TableName:       tableName,
				StartPrimaryKey: startPK,
				EndPrimaryKey:   endPK,
				Direction:       tablestore.FORWARD,
				ColumnsToGet:    selectFields,
			},
		}
	}

	// fetchRows executes the request page by page and converts every row into
	// a column-name -> value map. It returns nil when any page fails, so the
	// caller can tell a failed read from an empty (non-nil) result.
	fetchRows := func(request *tablestore.GetRangeRequest) []map[string]interface{} {
		rows := []map[string]interface{}{}
		for {
			resp, err := d.tablestoreClient.GetRange(request)
			if err != nil || resp == nil {
				fmt.Println("get range failed with error:", err)
				return nil
			}
			for _, row := range resp.Rows {
				if row.PrimaryKey.PrimaryKeys == nil {
					continue
				}
				rowMap := make(map[string]interface{}, len(selectFields))
				// With deduplication method 1 the sort key is the item id
				// itself, so it is taken from the second primary-key column.
				if sequenceConfig.DeduplicationMethodNum == 1 && len(row.PrimaryKey.PrimaryKeys) > 1 {
					rowMap[sequenceConfig.ItemIdField] = row.PrimaryKey.PrimaryKeys[1].Value
				}
				for _, column := range row.Columns {
					rowMap[column.ColumnName] = column.Value
				}
				// Drop rows whose play time does not exceed the threshold
				// configured for their event.
				if threshold, exist := sequencePlayTimeMap[utils.ToString(rowMap[sequenceConfig.EventField], "")]; exist {
					if utils.ToFloat(rowMap[sequenceConfig.PlayTimeField], 0.0) <= threshold {
						continue
					}
				}
				rows = append(rows, rowMap)
			}
			if resp.NextStartPrimaryKey == nil {
				return rows
			}
			request.RangeRowQueryCriteria.StartPrimaryKey = resp.NextStartPrimaryKey
		}
	}

	// onlineFunc reads the online table, restricted to the lookback window.
	onlineFunc := func(userID interface{}, event interface{}) []map[string]interface{} {
		request := newRangeRequest(d.onlineTable, userID, event)
		request.RangeRowQueryCriteria.TimeRange = &tablestore.TimeRange{
			Start: (currTime - onlineLookbackSeconds) * 1000,
			End:   currTime * 1000,
		}
		return fetchRows(request)
	}

	// offlineFunc reads the offline table, latest version of each cell only.
	offlineFunc := func(userID interface{}, event interface{}) []map[string]interface{} {
		request := newRangeRequest(d.offlineTable, userID, event)
		request.RangeRowQueryCriteria.MaxVersion = 1
		return fetchRows(request)
	}

	// fetchCombined queries both tables concurrently and merges the results.
	// The boolean is false when either read failed.
	fetchCombined := func(userID interface{}, event interface{}) ([]map[string]interface{}, bool) {
		var offlineResult []map[string]interface{}
		var onlineResult []map[string]interface{}
		var tableWg sync.WaitGroup
		tableWg.Add(2)
		go func() {
			defer tableWg.Done()
			offlineResult = offlineFunc(userID, event)
		}()
		go func() {
			defer tableWg.Done()
			onlineResult = onlineFunc(userID, event)
		}()
		tableWg.Wait()
		if offlineResult == nil || onlineResult == nil {
			fmt.Println("get user behavior feature failed")
			return nil, false
		}
		return combineBehaviorFeatures(offlineResult, onlineResult, sequenceConfig.TimestampField), true
	}

	// sortNewestFirst orders rows by timestamp, descending, in place.
	sortNewestFirst := func(rows []map[string]interface{}) {
		sort.Slice(rows, func(i, j int) bool {
			return utils.ToInt64(rows[i][sequenceConfig.TimestampField], 0) > utils.ToInt64(rows[j][sequenceConfig.TimestampField], 0)
		})
	}

	results := make([]map[string]interface{}, 0, len(userIds)*(len(events)+1)*50)
	var outmu sync.Mutex
	var wg sync.WaitGroup
	for _, userId := range userIds {
		wg.Add(1)
		go func(userID interface{}) {
			defer wg.Done()

			var userRows []map[string]interface{}
			if len(events) == 0 {
				combined, ok := fetchCombined(userID, nil)
				if !ok {
					return
				}
				userRows = combined
			} else {
				var mu sync.Mutex
				var eventWg sync.WaitGroup
				userRows = make([]map[string]interface{}, 0, len(events)*50)
				for _, event := range events {
					eventWg.Add(1)
					go func(event interface{}) {
						defer eventWg.Done()
						combined, ok := fetchCombined(userID, event)
						if !ok {
							return
						}
						mu.Lock()
						userRows = append(userRows, combined...)
						mu.Unlock()
					}(event)
				}
				eventWg.Wait()
			}

			sortNewestFirst(userRows)
			outmu.Lock()
			results = append(results, userRows...)
			outmu.Unlock()
		}(userId)
	}
	wg.Wait()

	return results, nil
}