in dao/feature_view_hologres_dao.go [130:344]
// GetUserSequenceFeature builds the sequence features for every entry of keys.
//
// For each key and each entry of onlineConfig it queries d.onlineTable and
// d.offlineTable concurrently, hands both row sets to makeSequenceFeatures and
// merges the resulting properties into one map per key. Each map additionally
// stores the key itself under userIdField.
//
// Parameters:
//   - keys: user identifiers; one result map is produced per key, in the same
//     order as keys.
//   - userIdField: name of the user id column, also used as the map key under
//     which the user id is stored in each result.
//   - sequenceConfig: column names (item id, event, timestamp and optional
//     play time) plus the play-time filter definition.
//   - onlineConfig: the sequences to build; SeqEvent may hold several event
//     names separated by "|", SeqLen is used as the SQL LIMIT.
//
// Query, prepare and scan failures are logged and the affected table is treated
// as having returned no rows, so the returned error is always nil.
func (d *FeatureViewHologresDao) GetUserSequenceFeature(keys []interface{}, userIdField string, sequenceConfig api.FeatureViewSeqConfig, onlineConfig []*api.SeqConfig) ([]map[string]interface{}, error) {
	// quote wraps a column name in double quotes for use as an SQL identifier.
	quote := func(name string) string { return fmt.Sprintf("\"%s\"", name) }
	hasPlayTime := sequenceConfig.PlayTimeField != ""

	// Column order must match the scan destinations built in fetchSequences:
	// item id, event, [play time], timestamp.
	selectFields := []string{quote(sequenceConfig.ItemIdField), quote(sequenceConfig.EventField)}
	if hasPlayTime {
		selectFields = append(selectFields, quote(sequenceConfig.PlayTimeField))
	}
	selectFields = append(selectFields, quote(sequenceConfig.TimestampField))

	currTime := time.Now().Unix()
	sequencePlayTimeMap := makePlayTimeMap(sequenceConfig.PlayTimeFilter)

	// fetchSequences runs one sequence query against table and returns the
	// accepted rows. It returns nil when the statement cannot be prepared or
	// executed, or when reading the rows fails; a successful query with no
	// accepted rows returns an empty, non-nil slice.
	// When recentOnly is set, only rows whose timestamp column is greater than
	// currTime-86400*5 are selected (presumably a five-day window in seconds —
	// confirm the unit of the timestamp column).
	fetchSequences := func(table string, recentOnly bool, seqEvent string, sequenceEvents []interface{}, seqLen int, key interface{}) []*sequenceInfo {
		builder := sqlbuilder.PostgreSQL.NewSelectBuilder()
		builder.Select(selectFields...)
		builder.From(table)
		where := []string{builder.Equal(quote(userIdField), key)}
		if recentOnly {
			where = append(where, builder.GreaterThan(quote(sequenceConfig.TimestampField), currTime-86400*5))
		}
		if len(sequenceEvents) > 1 {
			where = append(where, builder.In(quote(sequenceConfig.EventField), sequenceEvents...))
		} else {
			where = append(where, builder.Equal(quote(sequenceConfig.EventField), seqEvent))
		}
		builder.Where(where...)
		builder.Limit(seqLen)
		builder.OrderBy(quote(sequenceConfig.TimestampField)).Desc()
		query, args := builder.Build()

		// Prepared statements are cached in d.stmtMap, keyed by the CRC32 of
		// the SQL text. The map is re-read under d.mu before preparing so that
		// concurrent callers do not prepare the same statement twice.
		stmtKey := crc32.ChecksumIEEE([]byte(query))
		stmt := d.getStmt(stmtKey)
		if stmt == nil {
			d.mu.Lock()
			stmt = d.stmtMap[stmtKey]
			if stmt == nil {
				prepared, err := d.db.Prepare(query)
				if err != nil {
					d.mu.Unlock()
					log.Println(err)
					return nil
				}
				d.stmtMap[stmtKey] = prepared
				stmt = prepared
			}
			d.mu.Unlock()
		}

		rows, err := stmt.Query(args...)
		if err != nil {
			log.Println(err)
			return nil
		}
		defer rows.Close()

		sequences := []*sequenceInfo{}
		for rows.Next() {
			seq := new(sequenceInfo)
			dst := []interface{}{&seq.itemId, &seq.event}
			if hasPlayTime {
				dst = append(dst, &seq.playTime)
			}
			dst = append(dst, &seq.timestamp)
			if err := rows.Scan(dst...); err != nil {
				log.Println(err)
				return nil
			}
			// Rows without an event or item id are skipped.
			if seq.event == "" || seq.itemId == "" {
				continue
			}
			// Drop events whose play time does not exceed the threshold
			// configured for that event.
			// NOTE(review): when no play-time column is selected, playTime
			// keeps its zero value and this filter still applies — confirm
			// that is intended.
			if t, exist := sequencePlayTimeMap[seq.event]; exist && seq.playTime <= t {
				continue
			}
			sequences = append(sequences, seq)
		}
		// rows.Next also returns false when iteration fails part-way; without
		// this check a truncated sequence would be returned as if complete.
		if err := rows.Err(); err != nil {
			log.Println(err)
			return nil
		}
		return sequences
	}

	// Every key owns exactly one slot, so the goroutines can write their
	// result without a lock and the output follows the order of keys.
	results := make([]map[string]interface{}, len(keys))
	var wg sync.WaitGroup
	for i, key := range keys {
		wg.Add(1)
		go func(idx int, key interface{}) {
			defer wg.Done()
			properties := make(map[string]interface{})
			var mu sync.Mutex // guards properties
			var eventWg sync.WaitGroup
			for _, seqConfig := range onlineConfig {
				eventWg.Add(1)
				go func(seqConfig *api.SeqConfig) {
					defer eventWg.Done()
					eventNames := strings.Split(seqConfig.SeqEvent, "|")
					sequenceEvents := make([]interface{}, len(eventNames))
					for j, name := range eventNames {
						sequenceEvents[j] = name
					}

					// Each goroutine writes only its own variable, and both
					// are read only after innerWg.Wait().
					var onlineSequences []*sequenceInfo
					var offlineSequences []*sequenceInfo
					var innerWg sync.WaitGroup
					innerWg.Add(2)
					// Get data from the online table.
					go func() {
						defer innerWg.Done()
						if fetched := fetchSequences(d.onlineTable, true, seqConfig.SeqEvent, sequenceEvents, seqConfig.SeqLen, key); fetched != nil {
							onlineSequences = fetched
						}
					}()
					// Get data from the offline table.
					go func() {
						defer innerWg.Done()
						if fetched := fetchSequences(d.offlineTable, false, seqConfig.SeqEvent, sequenceEvents, seqConfig.SeqLen, key); fetched != nil {
							offlineSequences = fetched
						}
					}()
					innerWg.Wait()

					subproperties := makeSequenceFeatures(offlineSequences, onlineSequences, seqConfig, sequenceConfig, currTime)
					mu.Lock()
					defer mu.Unlock()
					for k, value := range subproperties {
						properties[k] = value
					}
				}(seqConfig)
			}
			eventWg.Wait()
			properties[userIdField] = key
			results[idx] = properties
		}(i, key)
	}
	wg.Wait()
	return results, nil
}