in dao/feature_view_hologres_dao.go [346:510]
// GetUserBehaviorFeature reads behavior-sequence rows for every user in userIds
// from both the offline and the online table and returns the combined rows of
// all users as one flat slice.
//
// Parameters:
//   - userIds: primary-key values to look up; each user is queried concurrently.
//   - events: optional filter on sequenceConfig.EventField; when empty no event
//     predicate is added to the queries.
//   - selectFields: column names to select; each is wrapped in double quotes.
//   - sequenceConfig: names of the event, timestamp and play-time columns plus
//     the play-time filter definition.
//
// For each user the offline table is queried without a time bound, while the
// online table is restricted to rows whose timestamp is greater than
// "now - 5 days". Both queries are ordered by the timestamp column descending.
// Rows whose event has an entry in the play-time map and whose play time is
// less than or equal to that entry are dropped. The two row sets are then
// merged by combineBehaviorFeatures.
//
// Failure handling is best-effort: if either query of a user fails (prepare,
// query, column metadata or row iteration error) the error is logged and that
// user contributes no rows. The returned error is always nil. The order in
// which users appear in the result depends on goroutine scheduling.
func (d *FeatureViewHologresDao) GetUserBehaviorFeature(userIds []interface{}, events []interface{}, selectFields []string, sequenceConfig api.FeatureViewSeqConfig) ([]map[string]interface{}, error) {
	const (
		// quotedIdent wraps a column name in double quotes.
		quotedIdent = "\"%s\""
		// onlineWindowSeconds bounds how far back the online table is read.
		onlineWindowSeconds = 5 * 24 * 60 * 60
		// rowsPerEventHint is only an initial-capacity guess for result slices.
		rowsPerEventHint = 50
	)

	selector := make([]string, 0, len(selectFields))
	for _, field := range selectFields {
		selector = append(selector, fmt.Sprintf(quotedIdent, field))
	}

	// Quoted identifiers are identical for every query, so format them once.
	quotedKey := fmt.Sprintf(quotedIdent, d.primaryKeyField)
	quotedTimestamp := fmt.Sprintf(quotedIdent, sequenceConfig.TimestampField)
	quotedEvent := fmt.Sprintf(quotedIdent, sequenceConfig.EventField)

	currTime := time.Now().Unix()
	sequencePlayTimeMap := makePlayTimeMap(sequenceConfig.PlayTimeFilter)

	// runQuery executes query through the prepared-statement cache and converts
	// the rows into property maps. It returns nil on failure and a non-nil
	// (possibly empty) slice on success, which is how callers tell them apart.
	runQuery := func(query string, args []interface{}) []map[string]interface{} {
		// NOTE(review): the cache key is a CRC32 of the SQL text, so two
		// different statements could collide; widening the key would require
		// changing stmtMap, which is declared outside this function.
		stmtKey := crc32.ChecksumIEEE([]byte(query))
		stmt := d.getStmt(stmtKey)
		if stmt == nil {
			var prepareErr error
			func() {
				d.mu.Lock()
				defer d.mu.Unlock()
				// Re-check under the lock: another goroutine may have
				// prepared the same statement in the meantime.
				if stmt = d.stmtMap[stmtKey]; stmt != nil {
					return
				}
				if stmt, prepareErr = d.db.Prepare(query); prepareErr == nil {
					d.stmtMap[stmtKey] = stmt
				}
			}()
			if prepareErr != nil {
				log.Println(prepareErr)
				return nil
			}
		}

		rows, err := stmt.Query(args...)
		if err != nil {
			log.Println(err)
			return nil
		}
		defer rows.Close()

		columns, err := rows.ColumnTypes()
		if err != nil {
			log.Println(err)
			return nil
		}
		values := ColumnValues(columns)

		// Sized for a single user; make always yields a non-nil slice, so an
		// empty result is still distinguishable from the nil failure value.
		result := make([]map[string]interface{}, 0, (len(events)+1)*rowsPerEventHint)
		var scanErr error
		for rows.Next() {
			if err := rows.Scan(values...); err != nil {
				// Skip the unreadable row but remember the first error so it
				// can be reported once instead of once per row.
				if scanErr == nil {
					scanErr = err
				}
				continue
			}
			properties := make(map[string]interface{}, len(values))
			for i, column := range columns {
				if value := ParseColumnValues(values[i]); value != nil {
					properties[column.Name()] = value
				}
			}
			// Drop rows that do not exceed the play-time threshold configured
			// for their event.
			if t, exist := sequencePlayTimeMap[utils.ToString(properties[sequenceConfig.EventField], "")]; exist &&
				utils.ToFloat(properties[sequenceConfig.PlayTimeField], 0.0) <= t {
				continue
			}
			result = append(result, properties)
		}
		if scanErr != nil {
			log.Println(scanErr)
		}
		// Next returning false may mean the stream broke rather than ended;
		// treat that like a failed query instead of returning truncated data.
		if err := rows.Err(); err != nil {
			log.Println(err)
			return nil
		}
		return result
	}

	// fetch builds the per-user SELECT against table and runs it. recentOnly
	// adds the timestamp lower bound used for the online table.
	fetch := func(table string, recentOnly bool, userId interface{}) []map[string]interface{} {
		builder := sqlbuilder.PostgreSQL.NewSelectBuilder()
		builder.Select(selector...)
		builder.From(table)
		where := []string{builder.Equal(quotedKey, userId)}
		if recentOnly {
			where = append(where, builder.GreaterThan(quotedTimestamp, currTime-onlineWindowSeconds))
		}
		if len(events) > 0 {
			where = append(where, builder.In(quotedEvent, events...))
		}
		builder.Where(where...)
		builder.OrderBy(quotedTimestamp).Desc()
		query, args := builder.Build()
		return runQuery(query, args)
	}

	results := make([]map[string]interface{}, 0, len(userIds)*(len(events)+1)*rowsPerEventHint)
	var outmu sync.Mutex
	var wg sync.WaitGroup
	for _, userId := range userIds {
		wg.Add(1)
		go func(userId interface{}) {
			defer wg.Done()

			var offlineResult, onlineResult []map[string]interface{}
			var innerWg sync.WaitGroup
			innerWg.Add(2)
			// offline table
			go func() {
				defer innerWg.Done()
				offlineResult = fetch(d.offlineTable, false, userId)
			}()
			// online table
			go func() {
				defer innerWg.Done()
				onlineResult = fetch(d.onlineTable, true, userId)
			}()
			innerWg.Wait()

			// A nil slice marks a failed query; skip the user entirely rather
			// than merging a partial view.
			if offlineResult == nil || onlineResult == nil {
				log.Println("get user behavior feature failed")
				return
			}

			combinedResult := combineBehaviorFeatures(offlineResult, onlineResult, sequenceConfig.TimestampField)
			outmu.Lock()
			results = append(results, combinedResult...)
			outmu.Unlock()
		}(userId)
	}
	wg.Wait()
	return results, nil
}