func()

in dao/feature_view_hologres_dao.go [130:344]


func (d *FeatureViewHologresDao) GetUserSequenceFeature(keys []interface{}, userIdField string, sequenceConfig api.FeatureViewSeqConfig, onlineConfig []*api.SeqConfig) ([]map[string]interface{}, error) {
	var selectFields []string
	if sequenceConfig.PlayTimeField == "" {
		selectFields = []string{fmt.Sprintf("\"%s\"", sequenceConfig.ItemIdField), fmt.Sprintf("\"%s\"", sequenceConfig.EventField),
			fmt.Sprintf("\"%s\"", sequenceConfig.TimestampField)}
	} else {
		selectFields = []string{fmt.Sprintf("\"%s\"", sequenceConfig.ItemIdField), fmt.Sprintf("\"%s\"", sequenceConfig.EventField),
			fmt.Sprintf("\"%s\"", sequenceConfig.PlayTimeField), fmt.Sprintf("\"%s\"", sequenceConfig.TimestampField)}
	}
	currTime := time.Now().Unix()
	sequencePlayTimeMap := makePlayTimeMap(sequenceConfig.PlayTimeFilter)

	onlineFunc := func(seqEvent string, sequence_events []interface{}, seqLen int, key interface{}) []*sequenceInfo {
		onlineSequences := []*sequenceInfo{}
		builder := sqlbuilder.PostgreSQL.NewSelectBuilder()
		builder.Select(selectFields...)
		builder.From(d.onlineTable)
		where := []string{builder.Equal(fmt.Sprintf("\"%s\"", userIdField), key),
			builder.GreaterThan(fmt.Sprintf("\"%s\"", sequenceConfig.TimestampField), currTime-86400*5)}
		if len(sequence_events) > 1 {
			where = append(where, builder.In(fmt.Sprintf("\"%s\"", sequenceConfig.EventField), sequence_events...))
		} else {
			where = append(where, builder.Equal(fmt.Sprintf("\"%s\"", sequenceConfig.EventField), seqEvent))
		}
		builder.Where(where...)
		builder.Limit(seqLen)
		builder.OrderBy(fmt.Sprintf("\"%s\"", sequenceConfig.TimestampField)).Desc()

		sql, args := builder.Build()
		stmtKey := crc32.ChecksumIEEE([]byte(sql))
		stmt := d.getStmt(stmtKey)
		if stmt == nil {
			d.mu.Lock()
			stmt = d.stmtMap[stmtKey]
			if stmt == nil {
				stmt2, err := d.db.Prepare(sql)
				if err != nil {
					d.mu.Unlock()
					log.Println(err)
					return nil
				}
				d.stmtMap[stmtKey] = stmt2
				stmt = stmt2
				d.mu.Unlock()
			} else {
				d.mu.Unlock()
			}
		}
		rows, err := stmt.Query(args...)
		if err != nil {
			log.Println(err)
			return nil
		}
		defer rows.Close()
		for rows.Next() {
			seq := new(sequenceInfo)
			var dst []interface{}
			if sequenceConfig.PlayTimeField == "" {
				dst = []interface{}{&seq.itemId, &seq.event, &seq.timestamp}
			} else {
				dst = []interface{}{&seq.itemId, &seq.event, &seq.playTime, &seq.timestamp}
			}
			if err := rows.Scan(dst...); err == nil {
				if seq.event == "" || seq.itemId == "" {
					continue
				}
				if t, exist := sequencePlayTimeMap[seq.event]; exist {
					if seq.playTime <= t {
						continue
					}
				}
				onlineSequences = append(onlineSequences, seq)
			} else {
				log.Println(err)
				return nil
			}
		}

		return onlineSequences
	}

	offlineFunc := func(seqEvent string, sequence_events []interface{}, seqLen int, key interface{}) []*sequenceInfo {
		offlineSequences := []*sequenceInfo{}
		builder := sqlbuilder.PostgreSQL.NewSelectBuilder()
		builder.Select(selectFields...)
		builder.From(d.offlineTable)
		where := []string{builder.Equal(fmt.Sprintf("\"%s\"", userIdField), key)}
		if len(sequence_events) > 1 {
			where = append(where, builder.In(fmt.Sprintf("\"%s\"", sequenceConfig.EventField), sequence_events...))
		} else {
			where = append(where, builder.Equal(fmt.Sprintf("\"%s\"", sequenceConfig.EventField), seqEvent))
		}
		builder.Where(where...)
		builder.Limit(seqLen)
		builder.OrderBy(fmt.Sprintf("\"%s\"", sequenceConfig.TimestampField)).Desc()

		sql, args := builder.Build()
		stmtKey := crc32.ChecksumIEEE([]byte(sql))
		stmt := d.getStmt(stmtKey)
		if stmt == nil {
			d.mu.Lock()
			stmt = d.stmtMap[stmtKey]
			if stmt == nil {
				stmt2, err := d.db.Prepare(sql)
				if err != nil {
					d.mu.Unlock()
					log.Println(err)
					return nil
				}
				d.stmtMap[stmtKey] = stmt2
				stmt = stmt2
				d.mu.Unlock()
			} else {
				d.mu.Unlock()
			}
		}

		rows, err := stmt.Query(args...)
		if err != nil {
			log.Println(err)
			return nil
		}
		defer rows.Close()
		for rows.Next() {
			seq := new(sequenceInfo)
			var dst []interface{}
			if sequenceConfig.PlayTimeField == "" {
				dst = []interface{}{&seq.itemId, &seq.event, &seq.timestamp}
			} else {
				dst = []interface{}{&seq.itemId, &seq.event, &seq.playTime, &seq.timestamp}
			}
			if err := rows.Scan(dst...); err == nil {
				if seq.event == "" || seq.itemId == "" {
					continue
				}
				if t, exist := sequencePlayTimeMap[seq.event]; exist {
					if seq.playTime <= t {
						continue
					}
				}
				offlineSequences = append(offlineSequences, seq)
			} else {
				log.Println(err)
				return nil
			}
		}

		return offlineSequences

	}

	results := make([]map[string]interface{}, 0, len(keys))
	var outmu sync.Mutex

	var wg sync.WaitGroup
	for _, key := range keys {
		wg.Add(1)
		go func(key interface{}) {
			defer wg.Done()
			properties := make(map[string]interface{})
			var mu sync.Mutex

			var eventWg sync.WaitGroup
			for _, seqConfig := range onlineConfig {
				eventWg.Add(1)
				go func(seqConfig *api.SeqConfig) {
					defer eventWg.Done()
					var onlineSequences []*sequenceInfo
					var offlineSequences []*sequenceInfo

					origin_sequence_events := strings.Split(seqConfig.SeqEvent, "|")
					sequence_events := make([]interface{}, len(origin_sequence_events))
					for i, v := range origin_sequence_events {
						sequence_events[i] = v
					}
					var innerWg sync.WaitGroup
					//get data from online table
					innerWg.Add(1)
					go func(seqEvent string, sequence_events []interface{}, seqLen int, key interface{}) {
						defer innerWg.Done()
						if onlineresult := onlineFunc(seqEvent, sequence_events, seqLen, key); onlineresult != nil {
							onlineSequences = onlineresult
						}
					}(seqConfig.SeqEvent, sequence_events, seqConfig.SeqLen, key)
					//get data from offline table
					innerWg.Add(1)
					go func(seqEvent string, sequence_events []interface{}, seqLen int, key interface{}) {
						defer innerWg.Done()
						if offlineresult := offlineFunc(seqEvent, sequence_events, seqLen, key); offlineresult != nil {
							offlineSequences = offlineresult
						}
					}(seqConfig.SeqEvent, sequence_events, seqConfig.SeqLen, key)
					innerWg.Wait()

					subproperties := makeSequenceFeatures(offlineSequences, onlineSequences, seqConfig, sequenceConfig, currTime)
					mu.Lock()
					defer mu.Unlock()
					for k, value := range subproperties {
						properties[k] = value
					}
				}(seqConfig)
			}
			eventWg.Wait()
			properties[userIdField] = key
			outmu.Lock()
			results = append(results, properties)
			outmu.Unlock()
		}(key)
	}

	wg.Wait()

	return results, nil

}