func()

in module/feature_be_dao.go [502:631]


// userSequenceFeatureFetch reads the user's behavior sequence from BE and
// attaches it to user as delimiter-joined string properties.
//
// The lookup key is the string value of the user property named by the second
// ":"-separated component of d.featureKey. The BE request carries two query
// params, "<beRecallName>_list" and "<beRecallName>_return_count". Each row of
// the response's MatchItems is decoded into a sequenceInfo using the configured
// be*FeatureKeyName column names; rows lacking an item id or an event are
// skipped, and rows are de-duplicated on "itemId#event" keeping the first
// occurrence.
//
// The resulting properties are "<sequenceName>__<subFeature>" for every
// sub-feature collected and "<sequenceName>" holding the joined item ids. They
// are stored through AddCacheFeatures when d.cacheFeaturesName is set, and
// through AddProperties otherwise.
//
// The method returns nothing: configuration errors, BE errors and recovered
// panics are logged, and user is left unchanged in those cases. An empty BE
// result also leaves user unchanged.
func (d *FeatureBeDao) userSequenceFeatureFetch(user *User, context *context.RecommendContext) {
	// Best-effort fetch: a panic anywhere below is logged and must not
	// propagate to the caller.
	defer func() {
		if err := recover(); err != nil {
			stack := string(debug.Stack())
			log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureBeDao\terror=%v\tstack=%v", context.RecommendId, err, strings.ReplaceAll(stack, "\n", "\t")))
		}
	}()

	comms := strings.Split(d.featureKey, ":")
	if len(comms) < 2 {
		log.Error(fmt.Sprintf("requestId=%s\tuid=%s\terror=featureKey error(%s)", context.RecommendId, user.Id, d.featureKey))
		return
	}

	key := user.StringProperty(comms[1])
	if key == "" {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureBeDao\terror=property not found(%s)", context.RecommendId, comms[1]))
		return
	}

	// Captured before the BE round trip; used below to compute the "ts"
	// sub-feature as currTime - seq.timestamp.
	currTime := time.Now().Unix()

	// Output sub-feature names: defaults, each overridable from the DAO config.
	itemFeatureKeyName := "item_id"
	if d.itemFeatureKeyName != "" {
		itemFeatureKeyName = d.itemFeatureKeyName
	}
	eventFeatureKeyName := "event"
	if d.eventFeatureKeyName != "" {
		eventFeatureKeyName = d.eventFeatureKeyName
	}
	playTimeFeatureKeyName := "play_time"
	if d.playTimeFeatureKeyName != "" {
		playTimeFeatureKeyName = d.playTimeFeatureKeyName
	}
	timestampFeatureKeyName := "timestamp"
	if d.timestampFeatureKeyName != "" {
		timestampFeatureKeyName = d.timestampFeatureKeyName
	}
	tsFeatureKeyName := "ts"
	if d.tsFeatureKeyName != "" {
		tsFeatureKeyName = d.tsFeatureKeyName
	}

	readRequest := be.NewReadRequest(d.bizName, d.sequenceLength)
	readRequest.IsRawRequest = true

	params := make(map[string]string, 2)
	params[fmt.Sprintf("%s_list", d.beRecallName)] = fmt.Sprintf("%s_%s:1", key, d.sequenceEvent)
	params[fmt.Sprintf("%s_return_count", d.beRecallName)] = fmt.Sprintf("%d", d.sequenceLength)
	readRequest.SetQueryParams(params)

	if context.Debug {
		uri := readRequest.BuildUri()
		log.Info(fmt.Sprintf("requestId=%s\tevent=userSequenceFeatureFetch\tbizName=%s\turl=%s", context.RecommendId, d.bizName, uri.RequestURI()))
	}

	readResponse, err := d.beClient.Read(*readRequest)
	if err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureBeDao\terror=be error(%v)", context.RecommendId, err))
		return
	}

	matchItems := readResponse.Result.MatchItems
	if matchItems == nil || len(matchItems.FieldValues) == 0 {
		return
	}

	// Decode each value row into a sequenceInfo by matching the column name at
	// the same position in FieldNames.
	fieldNames := matchItems.FieldNames
	var onlineSequences []*sequenceInfo
	for _, values := range matchItems.FieldValues {
		seq := new(sequenceInfo)
		for i, value := range values {
			if i >= len(fieldNames) {
				// Row is wider than the header: the remaining values have no
				// column name. Stop here instead of panicking, which would
				// discard every row already parsed.
				break
			}
			switch fieldNames[i] {
			case d.beItemFeatureKeyName:
				seq.itemId = utils.ToString(value, "")
			case d.beEventFeatureKeyName:
				seq.event = utils.ToString(value, "")
			case d.beTimestampFeatureKeyName:
				seq.timestamp = utils.ToInt64(value, 0)
			case d.bePlayTimeFeatureKeyName:
				seq.playTime = utils.ToFloat(value, 0)
			}
		}
		if seq.itemId != "" && seq.event != "" {
			onlineSequences = append(onlineSequences, seq)
		}
	}

	// sequence feature correspond to easyrec processor
	sequencesValueMap := make(map[string][]string)
	seen := make(map[string]struct{}, len(onlineSequences))
	for _, seq := range onlineSequences {
		// Only the first occurrence of an (item, event) pair is kept.
		dedupKey := fmt.Sprintf("%s#%s", seq.itemId, seq.event)
		if _, dup := seen[dedupKey]; dup {
			continue
		}
		seen[dedupKey] = struct{}{}

		sequencesValueMap[itemFeatureKeyName] = append(sequencesValueMap[itemFeatureKeyName], seq.itemId)
		sequencesValueMap[timestampFeatureKeyName] = append(sequencesValueMap[timestampFeatureKeyName], fmt.Sprintf("%d", seq.timestamp))
		sequencesValueMap[eventFeatureKeyName] = append(sequencesValueMap[eventFeatureKeyName], seq.event)
		if d.hasPlayTimeField {
			sequencesValueMap[playTimeFeatureKeyName] = append(sequencesValueMap[playTimeFeatureKeyName], fmt.Sprintf("%.2f", seq.playTime))
		}
		sequencesValueMap[tsFeatureKeyName] = append(sequencesValueMap[tsFeatureKeyName], fmt.Sprintf("%d", currTime-seq.timestamp))

		// NOTE(review): nothing in this function assigns seq.dimensionFields,
		// so this loop only does work if its zero value is non-empty — confirm
		// whether it is still needed.
		for index, field := range seq.dimensionFields {
			if index >= len(d.sequenceDimFields) {
				// No configured name for this position; avoid an
				// out-of-range panic.
				break
			}
			if field.Valid {
				dimName := d.sequenceDimFields[index]
				sequencesValueMap[dimName] = append(sequencesValueMap[dimName], field.String)
			}
		}
	}

	delim := d.sequenceDelim
	if delim == "" {
		delim = ";"
	}

	properties := make(map[string]interface{}, len(sequencesValueMap)+1)
	for subName, values := range sequencesValueMap {
		properties[d.sequenceName+"__"+subName] = strings.Join(values, delim)
	}
	// The bare sequence name carries the joined item ids.
	properties[d.sequenceName] = strings.Join(sequencesValueMap[itemFeatureKeyName], delim)

	if d.cacheFeaturesName != "" {
		user.AddCacheFeatures(d.cacheFeaturesName, properties)
	} else {
		user.AddProperties(properties)
	}
}