func()

in module/feature_hologres_dao.go [212:487]


// userSequenceFeatureFetch loads the user's behavior sequence from Hologres and
// stores it on user as delimiter-joined sequence features.
//
// The lookup key is the user property named by the second ":"-separated part
// of d.featureKey. The online table (d.table) is queried for rows whose
// timestamp is within the last five days; when d.sequenceOfflineTable is set,
// that table is queried concurrently. If the offline query returned rows, the
// online rows are cut at the first one whose timestamp is lower than the first
// offline row's, the offline rows are appended, and the result is capped at
// d.sequenceLength. Rows are then de-duplicated on (item id, event) and written
// as "<sequenceName>__<field>" entries plus "<sequenceName>" (the item id
// list), either via user.AddCacheFeatures when d.cacheFeaturesName is set or
// via user.AddProperties otherwise.
//
// Errors are logged, never returned. Panics are recovered and logged, both in
// the calling goroutine and in the query goroutines started here.
func (d *FeatureHologresDao) userSequenceFeatureFetch(user *User, context *context.RecommendContext) {
	// logPanic reports a recovered panic value together with the current stack.
	logPanic := func(r interface{}) {
		stack := string(debug.Stack())
		log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureHologresDao\terror=%v\tstack=%v", context.RecommendId, r, strings.ReplaceAll(stack, "\n", "\t")))
	}
	defer func() {
		if r := recover(); r != nil {
			logPanic(r)
		}
	}()

	comms := strings.Split(d.featureKey, ":")
	if len(comms) < 2 {
		log.Error(fmt.Sprintf("requestId=%s\tuid=%s\terror=featureKey error(%s)", context.RecommendId, user.Id, d.featureKey))
		return
	}

	key := user.StringProperty(comms[1])
	if key == "" {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureHologresDao\terror=property not found(%s)", context.RecommendId, comms[1]))
		return
	}

	currTime := time.Now().Unix()

	// orDefault returns override when it is configured, otherwise fallback.
	orDefault := func(override, fallback string) string {
		if override != "" {
			return override
		}
		return fallback
	}
	itemKeyName := orDefault(d.itemFeatureKeyName, "item_id")
	eventKeyName := orDefault(d.eventFeatureKeyName, "event")
	playTimeKeyName := orDefault(d.playTimeFeatureKeyName, "play_time")
	timestampKeyName := orDefault(d.timestampFeatureKeyName, "timestamp")
	tsKeyName := orDefault(d.tsFeatureKeyName, "ts")

	// Event filter values, boxed for the variadic builder.In call.
	var eventSelections []interface{}
	if d.sequenceEvent != "" {
		for _, ev := range strings.Split(d.sequenceEvent, ",") {
			eventSelections = append(eventSelections, ev)
		}
	}

	// Column order here must match the Scan destination order in fetch.
	selectFields := []string{itemKeyName, eventKeyName}
	if d.hasPlayTimeField {
		selectFields = append(selectFields, playTimeKeyName)
	}
	selectFields = append(selectFields, timestampKeyName)
	selectFields = append(selectFields, d.sequenceDimFields...)

	// fetch queries table for the user's sequence rows. stmtRef points at the
	// DAO field caching the prepared statement for that table; recentOnly adds
	// the five-day timestamp lower bound used for the online table.
	fetch := func(table string, stmtRef **sql.Stmt, recentOnly bool) []*sequenceInfo {
		builder := sqlbuilder.PostgreSQL.NewSelectBuilder()
		builder.Select(selectFields...)
		builder.From(table)
		where := []string{builder.Equal(d.userFeatureKeyName, key)}
		if recentOnly {
			where = append(where, builder.GreaterThan(timestampKeyName, currTime-86400*5))
		}
		switch {
		case len(eventSelections) > 1:
			where = append(where, builder.In(eventKeyName, eventSelections...))
		case len(eventSelections) == 1:
			where = append(where, builder.Equal(eventKeyName, d.sequenceEvent))
		}
		builder.Where(where...).Limit(d.sequenceLength)
		builder.OrderBy(timestampKeyName).Desc()

		sqlquery, args := builder.Build()

		// Read and lazily initialise the cached statement under the lock so
		// the field is never accessed concurrently with a write; the local
		// copy is used for the query itself.
		d.mu.Lock()
		stmt := *stmtRef
		if stmt == nil {
			prepared, err := d.db.Prepare(sqlquery)
			if err != nil {
				d.mu.Unlock()
				log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureHologresDao\terror=hologres error(%v)", context.RecommendId, err))
				return nil
			}
			*stmtRef = prepared
			stmt = prepared
		}
		d.mu.Unlock()

		ctx, cancel := gocontext.WithTimeout(gocontext.Background(), 100*time.Millisecond)
		defer cancel()
		rows, err := stmt.QueryContext(ctx, args...)
		if err != nil {
			if errors.Is(err, gocontext.DeadlineExceeded) {
				log.Warning("module=FeatureHologresDao\tevent=userSequenceFeatureFetch\ttimeout=100")
				return nil
			}
			log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureHologresDao\terror=hologres error(%v)", context.RecommendId, err))
			return nil
		}
		defer rows.Close()

		var sequences []*sequenceInfo
		for rows.Next() {
			seq := new(sequenceInfo)
			dst := []interface{}{&seq.itemId, &seq.event}
			if d.hasPlayTimeField {
				dst = append(dst, &seq.playTime)
			}
			dst = append(dst, &seq.timestamp)
			if len(d.sequenceDimFields) > 0 {
				seq.dimensionFields = make([]sql.NullString, len(d.sequenceDimFields))
				for i := range seq.dimensionFields {
					dst = append(dst, &seq.dimensionFields[i])
				}
			}
			if err := rows.Scan(dst...); err != nil {
				log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureHologresDao\terror=hologres error(%v)", context.RecommendId, err))
				continue
			}
			// Drop rows whose play time does not exceed the threshold
			// configured for their event.
			if threshold, exist := d.sequencePlayTimeMap[seq.event]; exist && seq.playTime <= threshold {
				continue
			}
			sequences = append(sequences, seq)
		}
		// rows.Next returning false can also mean the iteration failed (for
		// example the deadline fired mid-stream); keep what was read but
		// report the error instead of silently truncating.
		if err := rows.Err(); err != nil {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureHologresDao\terror=hologres error(%v)", context.RecommendId, err))
		}
		return sequences
	}

	var onlineSequences, offlineSequences []*sequenceInfo
	var wg sync.WaitGroup
	// spawn runs task in its own goroutine tracked by wg. recover only works
	// in the goroutine that panicked, so each goroutine needs its own guard.
	spawn := func(task func()) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() {
				if r := recover(); r != nil {
					logPanic(r)
				}
			}()
			task()
		}()
	}

	spawn(func() { onlineSequences = fetch(d.table, &d.onlineSequenceStmt, true) })
	if d.sequenceOfflineTable != "" {
		spawn(func() { offlineSequences = fetch(d.sequenceOfflineTable, &d.offlineSequenceStmt, false) })
	}
	wg.Wait()

	if len(offlineSequences) > 0 {
		// Keep the leading online rows that are not older than the first
		// offline row, then continue with the offline rows.
		cut := 0
		for cut < len(onlineSequences) && onlineSequences[cut].timestamp >= offlineSequences[0].timestamp {
			cut++
		}

		onlineSequences = append(onlineSequences[:cut], offlineSequences...)
		if len(onlineSequences) > d.sequenceLength {
			onlineSequences = onlineSequences[:d.sequenceLength]
		}
	}

	// sequence feature corresponds to the easyrec processor
	sequencesValueMap := make(map[string][]string)
	seen := make(map[string]struct{})
	for _, seq := range onlineSequences {
		dedupKey := fmt.Sprintf("%s#%s", seq.itemId, seq.event)
		if _, exist := seen[dedupKey]; exist {
			continue
		}
		seen[dedupKey] = struct{}{}

		sequencesValueMap[itemKeyName] = append(sequencesValueMap[itemKeyName], seq.itemId)
		sequencesValueMap[timestampKeyName] = append(sequencesValueMap[timestampKeyName], fmt.Sprintf("%d", seq.timestamp))
		sequencesValueMap[eventKeyName] = append(sequencesValueMap[eventKeyName], seq.event)
		if d.hasPlayTimeField {
			sequencesValueMap[playTimeKeyName] = append(sequencesValueMap[playTimeKeyName], fmt.Sprintf("%.2f", seq.playTime))
		}
		// Age of the event in seconds relative to this request.
		sequencesValueMap[tsKeyName] = append(sequencesValueMap[tsKeyName], fmt.Sprintf("%d", currTime-seq.timestamp))
		// NOTE(review): NULL dimension values are skipped, so a dimension
		// sequence can end up shorter than the item sequence — confirm the
		// consumer tolerates that.
		for i, field := range seq.dimensionFields {
			if field.Valid {
				sequencesValueMap[d.sequenceDimFields[i]] = append(sequencesValueMap[d.sequenceDimFields[i]], field.String)
			}
		}
	}

	delim := d.sequenceDelim
	if delim == "" {
		delim = ";"
	}
	properties := make(map[string]interface{}, len(sequencesValueMap)+1)
	for name, values := range sequencesValueMap {
		properties[d.sequenceName+"__"+name] = strings.Join(values, delim)
	}
	properties[d.sequenceName] = strings.Join(sequencesValueMap[itemKeyName], delim)

	if d.cacheFeaturesName != "" {
		user.AddCacheFeatures(d.cacheFeaturesName, properties)
	} else {
		user.AddProperties(properties)
	}
}