module/feature_hologres_dao.go (623 lines of code) (raw):
package module
import (
gocontext "context"
"database/sql"
"errors"
"fmt"
"math"
"runtime/debug"
"strings"
"sync"
"time"
"github.com/alibaba/pairec/v2/context"
"github.com/alibaba/pairec/v2/log"
"github.com/alibaba/pairec/v2/persist/holo"
"github.com/alibaba/pairec/v2/recconf"
"github.com/alibaba/pairec/v2/utils"
"github.com/alibaba/pairec/v2/utils/sqlutil"
"github.com/huandu/go-sqlbuilder"
)
// FeatureHologresDao fetches user features, item features and user behaviour
// sequences from Hologres tables through a database/sql connection.
//
// Settings that are not declared here (featureKey, featureStore, featureType,
// cache, sequence options, ...) are promoted from the embedded FeatureBaseDao.
type FeatureHologresDao struct {
	*FeatureBaseDao

	// Connection and tables.
	db                   *sql.DB
	table                string // table read by the user, item and online sequence queries
	sequenceOfflineTable string // optional second table for sequence queries; empty disables it

	// Column names used when building queries.
	userFeatureKeyName      string
	itemFeatureKeyName      string
	eventFeatureKeyName     string
	playTimeFeatureKeyName  string
	timestampFeatureKeyName string
	// tsFeatureKeyName names the generated sequence feature that holds
	// "now - timestamp"; it is not selected from the table.
	tsFeatureKeyName string

	// Select lists and query options.
	userSelectFields string
	itemSelectFields string
	hasPlayTimeField bool // when false the play-time column is left out of sequence queries

	// mu is taken while prepared statements are created and, for
	// itemStmtMap, while they are looked up.
	mu                  sync.RWMutex
	userStmt            *sql.Stmt
	onlineSequenceStmt  *sql.Stmt
	offlineSequenceStmt *sql.Stmt
	itemStmtMap         map[int]*sql.Stmt // keyed by the number of IN-list parameters
}
// NewFeatureHologresDao creates a FeatureHologresDao from config.
//
// It returns nil, after logging the error, when the Hologres connection named
// config.HologresName cannot be obtained. When ItemSelectFields is non-empty
// and does not already start with the item key column, the key column is
// prepended (and empty entries are dropped) so that the key is always the
// first selected column.
func NewFeatureHologresDao(config recconf.FeatureDaoConfig) *FeatureHologresDao {
	dao := &FeatureHologresDao{
		FeatureBaseDao:          NewFeatureBaseDao(&config),
		itemStmtMap:             make(map[int]*sql.Stmt),
		table:                   config.HologresTableName,
		sequenceOfflineTable:    config.SequenceOfflineTableName,
		userFeatureKeyName:      config.UserFeatureKeyName,
		itemFeatureKeyName:      config.ItemFeatureKeyName,
		eventFeatureKeyName:     config.EventFeatureKeyName,
		playTimeFeatureKeyName:  config.PlayTimeFeatureKeyName,
		timestampFeatureKeyName: config.TimestampFeatureKeyName,
		tsFeatureKeyName:        config.TsFeatureKeyName,
		userSelectFields:        config.UserSelectFields,
		itemSelectFields:        config.ItemSelectFields,
	}

	hologres, err := holo.GetPostgres(config.HologresName)
	if err != nil {
		log.Error(fmt.Sprintf("error=%v", err))
		return nil
	}
	dao.db = hologres.DB
	dao.hasPlayTimeField = !config.NoUsePlayTimeField

	if dao.itemSelectFields == "" {
		return dao
	}
	fields := strings.Split(dao.itemSelectFields, ",")
	if len(fields) == 0 || fields[0] == dao.itemFeatureKeyName {
		return dao
	}

	// The item query reads the key from column 0, so put it first.
	normalized := make([]string, 1, 1+len(fields))
	normalized[0] = dao.itemFeatureKeyName
	for _, name := range fields {
		if name == "" {
			continue
		}
		normalized = append(normalized, name)
	}
	dao.itemSelectFields = strings.Join(normalized, ",")
	return dao
}
// getItemStmt returns the prepared item statement registered under key, or
// nil when none has been prepared yet. The lookup holds the read lock.
func (d *FeatureHologresDao) getItemStmt(key int) *sql.Stmt {
	d.mu.RLock()
	stmt := d.itemStmtMap[key]
	d.mu.RUnlock()
	return stmt
}
// FeatureFetch dispatches to the fetcher matching the configured feature
// store and type: user sequence features, plain user features, or item
// features for every other store.
func (d *FeatureHologresDao) FeatureFetch(user *User, items []*Item, context *context.RecommendContext) {
	if d.featureStore != Feature_Store_User {
		d.itemsFeatureFetch(items, context)
		return
	}
	if d.featureType == Feature_Type_Sequence {
		d.userSequenceFeatureFetch(user, context)
		return
	}
	d.userFeatureFetch(user, context)
}
// userFeatureFetch loads the feature row(s) of user from d.table and attaches
// every non-null column to the user, either as cache features (when
// cacheFeaturesName is set) or as plain properties.
//
// The lookup key is the user property named by the second component of
// featureKey ("<prefix>:<property>"). A cache hit short-circuits the query.
// The query runs with a 100ms deadline. Failures are logged and leave the
// user untouched; panics are recovered and logged.
func (d *FeatureHologresDao) userFeatureFetch(user *User, context *context.RecommendContext) {
	defer func() {
		if err := recover(); err != nil {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureHologresDao\terror=%v", context.RecommendId, err))
		}
	}()

	comms := strings.Split(d.featureKey, ":")
	if len(comms) < 2 {
		log.Error(fmt.Sprintf("requestId=%s\tuid=%s\terror=featureKey error(%s)", context.RecommendId, user.Id, d.featureKey))
		return
	}
	key := user.StringProperty(comms[1])
	if key == "" {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureHologresDao\terror=property not found(%s)", context.RecommendId, comms[1]))
		return
	}

	// hit user cache
	if d.cache != nil {
		if cacheValue, ok := d.cache.GetIfPresent(key); ok {
			if d.cacheFeaturesName != "" {
				user.AddCacheFeatures(d.cacheFeaturesName, cacheValue.(map[string]interface{}))
			} else {
				user.AddProperties(cacheValue.(map[string]interface{}))
			}
			if context.Debug {
				log.Info(fmt.Sprintf("requestId=%s\tmodule=FeatureHologresDao\tmsg=hit cache(%s)", context.RecommendId, key))
			}
			return
		}
	}

	builder := sqlbuilder.PostgreSQL.NewSelectBuilder()
	builder.Select(d.userSelectFields)
	builder.From(d.table)
	builder.Where(builder.Equal(d.userFeatureKeyName, key))
	sqlquery, args := builder.Build()

	// The statement is prepared once and shared by all requests. Every read
	// and write of d.userStmt happens under d.mu so concurrent requests do
	// not race, and the deferred unlock releases the lock even if Prepare
	// panics.
	stmt, err := func() (*sql.Stmt, error) {
		d.mu.RLock()
		cached := d.userStmt
		d.mu.RUnlock()
		if cached != nil {
			return cached, nil
		}

		d.mu.Lock()
		defer d.mu.Unlock()
		if d.userStmt == nil {
			prepared, err := d.db.Prepare(sqlquery)
			if err != nil {
				return nil, err
			}
			d.userStmt = prepared
		}
		return d.userStmt, nil
	}()
	if err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureHologresDao\terror=hologres error(%v)", context.RecommendId, err))
		return
	}

	ctx, cancel := gocontext.WithTimeout(gocontext.Background(), 100*time.Millisecond)
	defer cancel()
	rows, err := stmt.QueryContext(ctx, args...)
	if err != nil {
		if errors.Is(err, gocontext.DeadlineExceeded) {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureHologresDao\tevent=userFeatureFetch\ttable=%s\ttimeout=100", context.RecommendId, d.table))
			return
		}
		log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureHologresDao\terror=hologres error(%v)", context.RecommendId, err))
		return
	}
	defer rows.Close()

	columns, err := rows.ColumnTypes()
	if err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureHologresDao\terror=hologres error(%v)", context.RecommendId, err))
		return
	}
	values := sqlutil.ColumnValues(columns)
	for rows.Next() {
		if err := rows.Scan(values...); err != nil {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureHologresDao\terror=hologres error(%v)", context.RecommendId, err))
			continue
		}
		properties := make(map[string]interface{}, len(values))
		for i, column := range columns {
			name := column.Name()
			if value := sqlutil.ParseColumnValues(values[i]); value != nil {
				properties[name] = value
			} else {
				// A column that parses to nil removes any property the user
				// already carries under the same name.
				user.DeleteProperty(name)
			}
		}
		if d.cacheFeaturesName != "" {
			user.AddCacheFeatures(d.cacheFeaturesName, properties)
		} else {
			user.AddProperties(properties)
		}
		if d.cache != nil {
			d.cache.Put(key, properties)
		}
	}
	// Next returns false both at end of data and on failure (for example when
	// the deadline fires mid-iteration); report the latter.
	if err := rows.Err(); err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureHologresDao\terror=hologres error(%v)", context.RecommendId, err))
	}
}
// sequenceInfo is one scanned row of a user behaviour sequence query.
type sequenceInfo struct {
	// itemId and event come from the item-key and event columns.
	itemId string
	event  string

	// timestamp is the value of the timestamp column; playTime stays zero
	// when the play-time column is not selected.
	timestamp int64
	playTime  float64

	// dimensionFields holds one nullable value per configured sequence
	// dimension field, in the order they were selected.
	dimensionFields []sql.NullString
}
// userSequenceFeatureFetch builds the behaviour-sequence features of user.
//
// It queries d.table for the user's events of the last five days and, when
// sequenceOfflineTable is configured, queries that table concurrently. Both
// queries are ordered by timestamp descending, limited to sequenceLength and
// bounded by a 100ms deadline. The two results are merged, de-duplicated by
// (item, event) and joined with sequenceDelim (default ";") into
// "<sequenceName>__<field>" features plus "<sequenceName>" itself (the item
// ids). The features are attached as cache features when cacheFeaturesName is
// set, otherwise as plain properties.
//
// The lookup key is the user property named by the second component of
// featureKey. Errors are logged; panics, including those raised inside the
// query goroutines, are recovered and logged.
func (d *FeatureHologresDao) userSequenceFeatureFetch(user *User, context *context.RecommendContext) {
	defer func() {
		if err := recover(); err != nil {
			stack := string(debug.Stack())
			log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureHologresDao\terror=%v\tstack=%v", context.RecommendId, err, strings.ReplaceAll(stack, "\n", "\t")))
		}
	}()

	comms := strings.Split(d.featureKey, ":")
	if len(comms) < 2 {
		log.Error(fmt.Sprintf("requestId=%s\tuid=%s\terror=featureKey error(%s)", context.RecommendId, user.Id, d.featureKey))
		return
	}
	key := user.StringProperty(comms[1])
	if key == "" {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureHologresDao\terror=property not found(%s)", context.RecommendId, comms[1]))
		return
	}

	currTime := time.Now().Unix()

	// Column / feature names fall back to fixed defaults when not configured.
	orDefault := func(configured, fallback string) string {
		if configured != "" {
			return configured
		}
		return fallback
	}
	itemName := orDefault(d.itemFeatureKeyName, "item_id")
	eventName := orDefault(d.eventFeatureKeyName, "event")
	playTimeName := orDefault(d.playTimeFeatureKeyName, "play_time")
	timestampName := orDefault(d.timestampFeatureKeyName, "timestamp")
	tsName := orDefault(d.tsFeatureKeyName, "ts")

	rawEvents := strings.Split(d.sequenceEvent, ",")
	eventSelections := make([]interface{}, len(rawEvents))
	for i, v := range rawEvents {
		eventSelections[i] = v
	}

	// Column order must match the Scan destinations built in fetch below.
	selectFields := []string{itemName, eventName}
	if d.hasPlayTimeField {
		selectFields = append(selectFields, playTimeName)
	}
	selectFields = append(selectFields, timestampName)
	selectFields = append(selectFields, d.sequenceDimFields...)

	// fetch runs one sequence query against table using the prepared
	// statement cached in *slot. recentOnly adds the "last five days"
	// timestamp filter used for the online table. It runs in its own
	// goroutine, so it carries its own recover: the deferred recover of the
	// enclosing function cannot catch panics raised in another goroutine.
	fetch := func(table string, slot **sql.Stmt, recentOnly bool) (sequences []*sequenceInfo) {
		defer func() {
			if err := recover(); err != nil {
				stack := string(debug.Stack())
				log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureHologresDao\terror=%v\tstack=%v", context.RecommendId, err, strings.ReplaceAll(stack, "\n", "\t")))
			}
		}()

		builder := sqlbuilder.PostgreSQL.NewSelectBuilder()
		builder.Select(selectFields...)
		builder.From(table)
		where := []string{builder.Equal(d.userFeatureKeyName, key)}
		if recentOnly {
			where = append(where, builder.GreaterThan(timestampName, currTime-86400*5))
		}
		if d.sequenceEvent != "" {
			if len(eventSelections) > 1 {
				where = append(where, builder.In(eventName, eventSelections...))
			} else {
				where = append(where, builder.Equal(eventName, d.sequenceEvent))
			}
		}
		builder.Where(where...).Limit(d.sequenceLength)
		builder.OrderBy(timestampName).Desc()
		sqlquery, args := builder.Build()

		stmt, err := d.sequenceStmt(slot, sqlquery)
		if err != nil {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureHologresDao\terror=hologres error(%v)", context.RecommendId, err))
			return sequences
		}

		ctx, cancel := gocontext.WithTimeout(gocontext.Background(), 100*time.Millisecond)
		defer cancel()
		rows, err := stmt.QueryContext(ctx, args...)
		if err != nil {
			if errors.Is(err, gocontext.DeadlineExceeded) {
				log.Warning("module=FeatureHologresDao\tevent=userSequenceFeatureFetch\ttimeout=100")
				return sequences
			}
			log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureHologresDao\terror=hologres error(%v)", context.RecommendId, err))
			return sequences
		}
		defer rows.Close()

		for rows.Next() {
			seq := new(sequenceInfo)
			dst := []interface{}{&seq.itemId, &seq.event}
			if d.hasPlayTimeField {
				dst = append(dst, &seq.playTime)
			}
			dst = append(dst, &seq.timestamp)
			if n := len(d.sequenceDimFields); n > 0 {
				seq.dimensionFields = make([]sql.NullString, n)
				for i := range seq.dimensionFields {
					dst = append(dst, &seq.dimensionFields[i])
				}
			}
			if err := rows.Scan(dst...); err != nil {
				log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureHologresDao\terror=hologres error(%v)", context.RecommendId, err))
				continue
			}
			// Skip events whose play time does not exceed the threshold
			// configured for that event.
			if t, exist := d.sequencePlayTimeMap[seq.event]; exist && seq.playTime <= t {
				continue
			}
			sequences = append(sequences, seq)
		}
		// Next returns false on failure as well as at end of data.
		if err := rows.Err(); err != nil {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureHologresDao\terror=hologres error(%v)", context.RecommendId, err))
		}
		return sequences
	}

	var onlineSequences, offlineSequences []*sequenceInfo
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		onlineSequences = fetch(d.table, &d.onlineSequenceStmt, true)
	}()
	if d.sequenceOfflineTable != "" {
		wg.Add(1)
		go func() {
			defer wg.Done()
			offlineSequences = fetch(d.sequenceOfflineTable, &d.offlineSequenceStmt, false)
		}()
	}
	wg.Wait()

	if len(offlineSequences) > 0 {
		// Both lists are newest-first. Keep only the online entries that are
		// not older than the newest offline entry, then append the offline
		// list and cap the total at sequenceLength.
		index := 0
		for index < len(onlineSequences) {
			if onlineSequences[index].timestamp < offlineSequences[0].timestamp {
				break
			}
			index++
		}
		onlineSequences = onlineSequences[:index]
		onlineSequences = append(onlineSequences, offlineSequences...)
		if len(onlineSequences) > d.sequenceLength {
			onlineSequences = onlineSequences[:d.sequenceLength]
		}
	}

	// sequence feature correspond to easyrec processor
	sequencesValueMap := make(map[string][]string)
	seen := make(map[string]struct{}, len(onlineSequences))
	for _, seq := range onlineSequences {
		dedupKey := fmt.Sprintf("%s#%s", seq.itemId, seq.event)
		if _, exist := seen[dedupKey]; exist {
			continue
		}
		seen[dedupKey] = struct{}{}

		sequencesValueMap[itemName] = append(sequencesValueMap[itemName], seq.itemId)
		sequencesValueMap[timestampName] = append(sequencesValueMap[timestampName], fmt.Sprintf("%d", seq.timestamp))
		sequencesValueMap[eventName] = append(sequencesValueMap[eventName], seq.event)
		if d.hasPlayTimeField {
			sequencesValueMap[playTimeName] = append(sequencesValueMap[playTimeName], fmt.Sprintf("%.2f", seq.playTime))
		}
		// Age of the event in seconds relative to this request.
		sequencesValueMap[tsName] = append(sequencesValueMap[tsName], fmt.Sprintf("%d", currTime-seq.timestamp))
		for index, field := range seq.dimensionFields {
			if field.Valid {
				dimName := d.sequenceDimFields[index]
				sequencesValueMap[dimName] = append(sequencesValueMap[dimName], field.String)
			}
		}
	}

	delim := d.sequenceDelim
	if delim == "" {
		delim = ";"
	}
	properties := make(map[string]interface{}, len(sequencesValueMap)+1)
	for field, value := range sequencesValueMap {
		properties[d.sequenceName+"__"+field] = strings.Join(value, delim)
	}
	properties[d.sequenceName] = strings.Join(sequencesValueMap[itemName], delim)

	if d.cacheFeaturesName != "" {
		user.AddCacheFeatures(d.cacheFeaturesName, properties)
	} else {
		user.AddProperties(properties)
	}
}

// sequenceStmt returns the prepared statement stored in *slot, preparing it
// from sqlquery on first use. slot must point at one of d's statement fields;
// every read and write of it happens under d.mu so concurrent requests do not
// race, and the deferred unlock releases the lock even if Prepare panics.
func (d *FeatureHologresDao) sequenceStmt(slot **sql.Stmt, sqlquery string) (*sql.Stmt, error) {
	d.mu.RLock()
	stmt := *slot
	d.mu.RUnlock()
	if stmt != nil {
		return stmt, nil
	}

	d.mu.Lock()
	defer d.mu.Unlock()
	if *slot == nil {
		prepared, err := d.db.Prepare(sqlquery)
		if err != nil {
			return nil, err
		}
		*slot = prepared
	}
	return *slot, nil
}
// itemsFeatureFetch loads features for items from d.table and adds them to
// the matching items as properties.
//
// The items are split into contiguous batches of at most 600, and each batch
// is fetched in its own goroutine; the function returns once all batches are
// done. The lookup key of an item is its Id when featureKey is "item:id",
// otherwise the item property named by the second component of featureKey.
// Failures are logged and leave the affected items unchanged.
func (d *FeatureHologresDao) itemsFeatureFetch(items []*Item, context *context.RecommendContext) {
	defer func() {
		if err := recover(); err != nil {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureHologresDao\terror=%v", context.RecommendId, err))
		}
	}()

	if len(items) == 0 {
		return
	}

	fk := d.featureKey
	if fk != "item:id" {
		comms := strings.Split(d.featureKey, ":")
		if len(comms) < 2 {
			log.Error(fmt.Sprintf("requestId=%s\tevent=itemsFeatureFetch\terror=featureKey error(%s)", context.RecommendId, d.featureKey))
			return
		}
		fk = comms[1]
	}

	const requestCount = 600
	batchCount := utils.MaxInt(int(math.Ceil(float64(len(items))/float64(requestCount))), 1)

	var wg sync.WaitGroup
	for i := 0; i < batchCount; i++ {
		start := i * requestCount
		end := start + requestCount
		if end > len(items) {
			end = len(items)
		}
		batch := items[start:end]
		wg.Add(1)
		go func() {
			defer wg.Done()
			d.itemsBatchFeatureFetch(batch, fk, requestCount, context)
		}()
	}
	wg.Wait()
}

// itemsBatchFeatureFetch fetches the features of one batch of items with a
// single "key IN (...)" query bounded by a 100ms deadline.
//
// Items found in the cache are served from it and left out of the query. The
// key list is padded with "-1" up to batchSize so the query always has the
// same number of parameters and one prepared statement per parameter count
// can be reused. The first selected column is taken as the item key; the
// remaining non-null columns become item properties and are written to the
// cache.
//
// It runs in its own goroutine, so it recovers and logs its own panics: the
// deferred recover of the caller cannot catch panics raised in another
// goroutine.
func (d *FeatureHologresDao) itemsBatchFeatureFetch(batch []*Item, fk string, batchSize int, context *context.RecommendContext) {
	defer func() {
		if err := recover(); err != nil {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureHologresDao\terror=%v", context.RecommendId, err))
		}
	}()

	keys := make([]interface{}, 0, batchSize)
	key2Item := make(map[string]*Item, len(batch))
	for _, item := range batch {
		var key string
		if fk == "item:id" {
			key = string(item.Id)
		} else {
			key = item.StringProperty(fk)
		}
		if d.cache != nil {
			if cacheValue, ok := d.cache.GetIfPresent(key); ok {
				item.AddProperties(cacheValue.(map[string]interface{}))
				if context.Debug {
					item.AddProperty("__debug_cache_hit__", true)
				}
				continue
			}
		}
		keys = append(keys, key)
		key2Item[key] = item
	}
	if len(keys) == 0 {
		return
	}

	// Pad to a fixed parameter count so the prepared statement is reusable.
	for len(keys) < batchSize {
		keys = append(keys, "-1")
	}

	builder := sqlbuilder.PostgreSQL.NewSelectBuilder()
	builder.Select(d.itemSelectFields)
	builder.From(d.table)
	builder.Where(builder.In(d.itemFeatureKeyName, keys...))
	sqlquery, args := builder.Build()

	stmtkey := len(keys)
	stmt := d.getItemStmt(stmtkey)
	if stmt == nil {
		prepared, err := func() (*sql.Stmt, error) {
			d.mu.Lock()
			defer d.mu.Unlock()
			// Another goroutine may have prepared it while we waited.
			if existing := d.itemStmtMap[stmtkey]; existing != nil {
				return existing, nil
			}
			fresh, err := d.db.Prepare(sqlquery)
			if err != nil {
				return nil, err
			}
			d.itemStmtMap[stmtkey] = fresh
			return fresh, nil
		}()
		if err != nil {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureHologresDao\terror=hologres error(%v)", context.RecommendId, err))
			return
		}
		stmt = prepared
	}

	// QueryContext enforces the deadline inside database/sql: on timeout the
	// query is abandoned and its rows are closed, so no result set can be
	// left open after this function returns. The deadline also covers row
	// iteration below.
	ctx, cancel := gocontext.WithTimeout(gocontext.Background(), 100*time.Millisecond)
	defer cancel()
	rows, err := stmt.QueryContext(ctx, args...)
	if err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureHologresDao\terror=hologres error(%v)", context.RecommendId, err))
		return
	}
	defer rows.Close()

	columns, err := rows.ColumnTypes()
	if err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureHologresDao\terror=hologres error(%v)", context.RecommendId, err))
		return
	}
	values := sqlutil.ColumnValues(columns)
	for rows.Next() {
		if err := rows.Scan(values...); err != nil {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureHologresDao\terror=hologres error(%v)", context.RecommendId, err))
			continue
		}
		var (
			item *Item
			key  string
		)
		properties := make(map[string]interface{}, len(values))
		for i, column := range columns {
			if i == 0 {
				// Column 0 is the item key; a row without one is skipped.
				if value := sqlutil.ParseColumnValues(values[i]); value != nil {
					key = utils.ToString(value, "")
				}
				if key == "" {
					break
				}
				item = key2Item[key]
				continue
			}
			if value := sqlutil.ParseColumnValues(values[i]); value != nil {
				properties[column.Name()] = value
			}
		}
		if item != nil && len(properties) > 0 {
			item.AddProperties(properties)
			if d.cache != nil {
				d.cache.Put(key, properties)
			}
		}
	}
	// Next returns false on failure (including a deadline hit while
	// iterating) as well as at end of data.
	if err := rows.Err(); err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureHologresDao\terror=hologres error(%v)", context.RecommendId, err))
	}
}