in module/feature_be_dao.go [502:631]
// userSequenceFeatureFetch reads the user's behavior sequence from BE and
// stores it on the user as delimiter-joined string features.
//
// The lookup key is the user property named by the second ":"-separated
// component of d.featureKey. Each returned row is mapped onto a sequenceInfo
// through the BE field names configured on the DAO; rows lacking an item id
// or an event are dropped, and repeated (item id, event) pairs keep only their
// first occurrence. For every collected sub-feature a property named
// "<sequenceName>__<subFeatureName>" is produced, plus "<sequenceName>" itself
// holding the joined item ids. The properties go to the user's cache features
// when d.cacheFeaturesName is set, otherwise to the user's properties.
//
// The function returns nothing: configuration errors, a missing user property
// and BE failures are logged and leave the user unchanged. Panics are
// recovered and logged together with the stack.
func (d *FeatureBeDao) userSequenceFeatureFetch(user *User, context *context.RecommendContext) {
	// Feature loading is best effort: a panic in here is logged, not propagated.
	defer func() {
		if err := recover(); err != nil {
			stack := string(debug.Stack())
			log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureBeDao\terror=%v\tstack=%v", context.RecommendId, err, strings.ReplaceAll(stack, "\n", "\t")))
		}
	}()

	comms := strings.Split(d.featureKey, ":")
	if len(comms) < 2 {
		log.Error(fmt.Sprintf("requestId=%s\tuid=%s\terror=featureKey error(%s)", context.RecommendId, user.Id, d.featureKey))
		return
	}
	userKey := user.StringProperty(comms[1])
	if userKey == "" {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureBeDao\terror=property not found(%s)", context.RecommendId, comms[1]))
		return
	}

	// Captured before the BE call; used below to turn each timestamp into an
	// offset from "now".
	currTime := time.Now().Unix()

	// Names of the emitted sub-features; the DAO configuration overrides the
	// defaults when set.
	var (
		itemKeyName      = "item_id"
		eventKeyName     = "event"
		playTimeKeyName  = "play_time"
		timestampKeyName = "timestamp"
		tsKeyName        = "ts"
	)
	if d.itemFeatureKeyName != "" {
		itemKeyName = d.itemFeatureKeyName
	}
	if d.eventFeatureKeyName != "" {
		eventKeyName = d.eventFeatureKeyName
	}
	if d.playTimeFeatureKeyName != "" {
		playTimeKeyName = d.playTimeFeatureKeyName
	}
	if d.timestampFeatureKeyName != "" {
		timestampKeyName = d.timestampFeatureKeyName
	}
	if d.tsFeatureKeyName != "" {
		tsKeyName = d.tsFeatureKeyName
	}

	// Build the raw BE read request for this user's sequence.
	readRequest := be.NewReadRequest(d.bizName, d.sequenceLength)
	readRequest.IsRawRequest = true
	params := make(map[string]string)
	params[fmt.Sprintf("%s_list", d.beRecallName)] = fmt.Sprintf("%s_%s:1", userKey, d.sequenceEvent)
	params[fmt.Sprintf("%s_return_count", d.beRecallName)] = fmt.Sprintf("%d", d.sequenceLength)
	readRequest.SetQueryParams(params)
	if context.Debug {
		uri := readRequest.BuildUri()
		log.Info(fmt.Sprintf("requestId=%s\tevent=userSequenceFeatureFetch\tbizName=%s\turl=%s", context.RecommendId, d.bizName, uri.RequestURI()))
	}

	readResponse, err := d.beClient.Read(*readRequest)
	if err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureBeDao\terror=be error(%v)", context.RecommendId, err))
		return
	}
	matchItems := readResponse.Result.MatchItems
	if matchItems == nil || len(matchItems.FieldValues) == 0 {
		return
	}

	// Map each returned row onto a sequenceInfo by BE field name.
	fieldNames := matchItems.FieldNames
	var onlineSequences []*sequenceInfo
	for _, values := range matchItems.FieldValues {
		seq := new(sequenceInfo)
		for i, value := range values {
			if i >= len(fieldNames) {
				// A value without a matching field name cannot be mapped;
				// stop here instead of panicking on the index.
				break
			}
			switch fieldNames[i] {
			case d.beItemFeatureKeyName:
				seq.itemId = utils.ToString(value, "")
			case d.beEventFeatureKeyName:
				seq.event = utils.ToString(value, "")
			case d.beTimestampFeatureKeyName:
				seq.timestamp = utils.ToInt64(value, 0)
			case d.bePlayTimeFeatureKeyName:
				seq.playTime = utils.ToFloat(value, 0)
			}
		}
		if seq.itemId != "" && seq.event != "" {
			onlineSequences = append(onlineSequences, seq)
		}
	}

	// Collect the per-field value lists of the sequence feature, keeping only
	// the first occurrence of each (item id, event) pair.
	sequencesValueMap := make(map[string][]string)
	seen := make(map[string]struct{}, len(onlineSequences))
	for _, seq := range onlineSequences {
		dedupKey := seq.itemId + "#" + seq.event
		if _, exist := seen[dedupKey]; exist {
			continue
		}
		seen[dedupKey] = struct{}{}

		sequencesValueMap[itemKeyName] = append(sequencesValueMap[itemKeyName], seq.itemId)
		sequencesValueMap[timestampKeyName] = append(sequencesValueMap[timestampKeyName], fmt.Sprintf("%d", seq.timestamp))
		sequencesValueMap[eventKeyName] = append(sequencesValueMap[eventKeyName], seq.event)
		if d.hasPlayTimeField {
			sequencesValueMap[playTimeKeyName] = append(sequencesValueMap[playTimeKeyName], fmt.Sprintf("%.2f", seq.playTime))
		}
		// Offset of the behavior from the current Unix time; presumably both
		// are in seconds — confirm against the BE table.
		sequencesValueMap[tsKeyName] = append(sequencesValueMap[tsKeyName], fmt.Sprintf("%d", currTime-seq.timestamp))
		for index, field := range seq.dimensionFields {
			// Skip dimension values that have no configured name rather than
			// indexing past the end of d.sequenceDimFields.
			if field.Valid && index < len(d.sequenceDimFields) {
				dimName := d.sequenceDimFields[index]
				sequencesValueMap[dimName] = append(sequencesValueMap[dimName], field.String)
			}
		}
	}

	delim := d.sequenceDelim
	if delim == "" {
		delim = ";"
	}

	properties := make(map[string]interface{})
	for subName, subValues := range sequencesValueMap {
		properties[d.sequenceName+"__"+subName] = strings.Join(subValues, delim)
	}
	// The bare sequence name carries the item id list; it is set even when no
	// behavior was collected (as an empty string).
	properties[d.sequenceName] = strings.Join(sequencesValueMap[itemKeyName], delim)

	if d.cacheFeaturesName != "" {
		user.AddCacheFeatures(d.cacheFeaturesName, properties)
	} else {
		user.AddProperties(properties)
	}
}