in module/feature_tablestore_dao.go [139:338]
func (d *FeatureTablestoreDao) userSequenceFeatureFetch(user *User, context *context.RecommendContext) {
defer func() {
if err := recover(); err != nil {
stack := string(debug.Stack())
log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureTablestoreDao\terror=%v\tstack=%v", context.RecommendId, err, strings.ReplaceAll(stack, "\n", "\t")))
return
}
}()
comms := strings.Split(d.featureKey, ":")
if len(comms) < 2 {
log.Error(fmt.Sprintf("requestId=%s\tuid=%s\terror=featureKey error(%s)", context.RecommendId, user.Id, d.featureKey))
return
}
key := user.StringProperty(comms[1])
if key == "" {
log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureTablestoreDao\terror=property not found(%s)", context.RecommendId, comms[1]))
return
}
currTime := time.Now().Unix()
item_feature_key_name := "item_id"
if d.itemFeatureKeyName != "" {
item_feature_key_name = d.itemFeatureKeyName
}
event_feature_key_name := "event"
if d.eventFeatureKeyName != "" {
event_feature_key_name = d.eventFeatureKeyName
}
play_time_feature_key_name := "play_time"
if d.playTimeFeatureKeyName != "" {
play_time_feature_key_name = d.playTimeFeatureKeyName
}
timestamp_feature_key_name := "timestamp"
if d.timestampFeatureKeyName != "" {
timestamp_feature_key_name = d.timestampFeatureKeyName
}
ts_feature_key_name := "ts"
if d.tsFeatureKeyName != "" {
ts_feature_key_name = d.tsFeatureKeyName
}
sequence_event_selections := strings.Split(d.sequenceEvent, ",")
selectFields := []string{item_feature_key_name, play_time_feature_key_name, timestamp_feature_key_name}
if len(d.sequenceDimFields) > 0 {
selectFields = append(selectFields, d.sequenceDimFields...)
}
fetchDataFunc := func(table string) (sequences []*sequenceInfo) {
batchGetReq := &tablestore.BatchGetRowRequest{}
mqCriteria := &tablestore.MultiRowQueryCriteria{}
for _, event := range sequence_event_selections {
pkToGet := new(tablestore.PrimaryKey)
pkToGet.AddPrimaryKeyColumn(d.userFeatureKeyName, key)
pkToGet.AddPrimaryKeyColumn(event_feature_key_name, event)
mqCriteria.AddRow(pkToGet)
}
mqCriteria.MaxVersion = d.sequenceLength
mqCriteria.TableName = table
mqCriteria.ColumnsToGet = selectFields
timeRange := new(tablestore.TimeRange)
timeRange.End = currTime * 1000
timeRange.Start = (currTime - 86400*5) * 1000
mqCriteria.TimeRange = timeRange
batchGetReq.MultiRowQueryCriteria = append(batchGetReq.MultiRowQueryCriteria, mqCriteria)
batchGetResponse, err := d.tablestore.Client.BatchGetRow(batchGetReq)
if err != nil {
log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureTablestoreDao\terror=%v", context.RecommendId, err))
return
}
for _, row := range batchGetResponse.TableToRowsResult[table] {
if row.IsSucceed {
if row.PrimaryKey.PrimaryKeys == nil {
continue
}
// get all versions
versions := make([]int64, 0, 1)
versionSeqmap := make(map[int64]*sequenceInfo, 0)
for _, column := range row.Columns {
if column.ColumnName == item_feature_key_name {
versions = append(versions, column.Timestamp)
seq := new(sequenceInfo)
seq.event = utils.ToString(row.PrimaryKey.PrimaryKeys[1].Value, "")
versionSeqmap[column.Timestamp] = seq
}
}
for _, column := range row.Columns {
seq := versionSeqmap[column.Timestamp]
switch column.ColumnName {
case item_feature_key_name:
seq.itemId = utils.ToString(column.Value, "")
case play_time_feature_key_name:
seq.playTime = utils.ToFloat(column.Value, 0)
case timestamp_feature_key_name:
seq.timestamp = utils.ToInt64(column.Value, 0)
default:
sqlValue := sql.NullString{String: utils.ToString(column.Value, ""), Valid: true}
seq.dimensionFields = append(seq.dimensionFields, sqlValue)
}
}
for _, version := range versions {
seq := versionSeqmap[version]
if seq.event == "" || seq.itemId == "" {
continue
}
if t, exist := d.sequencePlayTimeMap[seq.event]; exist {
if seq.playTime <= t {
continue
}
}
sequences = append(sequences, seq)
}
}
}
return
}
var wg sync.WaitGroup
var onlineSequences []*sequenceInfo
var offlineSequences []*sequenceInfo
wg.Add(1)
go func() {
defer wg.Done()
onlineSequences = fetchDataFunc(d.table)
}()
if d.sequenceOfflineTable != "" {
wg.Add(1)
go func() {
defer wg.Done()
offlineSequences = fetchDataFunc(d.sequenceOfflineTable)
}()
}
wg.Wait()
if len(offlineSequences) > 0 {
index := 0
for index < len(onlineSequences) {
if onlineSequences[index].timestamp < offlineSequences[0].timestamp {
break
}
index++
}
onlineSequences = onlineSequences[:index]
onlineSequences = append(onlineSequences, offlineSequences...)
if len(onlineSequences) > d.sequenceLength {
onlineSequences = onlineSequences[:d.sequenceLength]
}
}
// seqeunce feature correspond to easyrec processor
sequencesValueMap := make(map[string][]string)
sequenceMap := make(map[string]bool, 0)
for _, seq := range onlineSequences {
key := fmt.Sprintf("%s#%s", seq.itemId, seq.event)
if _, exist := sequenceMap[key]; !exist {
sequenceMap[key] = true
sequencesValueMap[item_feature_key_name] = append(sequencesValueMap[item_feature_key_name], seq.itemId)
sequencesValueMap[timestamp_feature_key_name] = append(sequencesValueMap[timestamp_feature_key_name], fmt.Sprintf("%d", seq.timestamp))
sequencesValueMap[event_feature_key_name] = append(sequencesValueMap[event_feature_key_name], seq.event)
sequencesValueMap[play_time_feature_key_name] = append(sequencesValueMap[play_time_feature_key_name], fmt.Sprintf("%.2f", seq.playTime))
sequencesValueMap[ts_feature_key_name] = append(sequencesValueMap[ts_feature_key_name], fmt.Sprintf("%d", currTime-seq.timestamp))
for index, field := range seq.dimensionFields {
if field.Valid {
sequencesValueMap[d.sequenceDimFields[index]] = append(sequencesValueMap[d.sequenceDimFields[index]], field.String)
}
}
}
}
delim := d.sequenceDelim
if delim == "" {
delim = ";"
}
properties := make(map[string]interface{})
for key, value := range sequencesValueMap {
curSequenceSubName := (d.sequenceName + "__" + key)
properties[curSequenceSubName] = strings.Join(value, delim)
}
properties[d.sequenceName] = strings.Join(sequencesValueMap[item_feature_key_name], delim)
if d.cacheFeaturesName != "" {
user.AddCacheFeatures(d.cacheFeaturesName, properties)
} else {
user.AddProperties(properties)
}
}