module/user_item_exposure_hologres_dao.go (228 lines of code) (raw):

package module import ( gocontext "context" "database/sql" "fmt" "strings" "sync" "time" "github.com/alibaba/pairec/v2/context" "github.com/alibaba/pairec/v2/log" "github.com/alibaba/pairec/v2/persist/holo" "github.com/alibaba/pairec/v2/recconf" "github.com/huandu/go-sqlbuilder" ) /** create table ddl BEGIN; CREATE TABLE "public"."t_exposure_history" ( "uid" text NOT NULL, "item" text NOT NULL, "create_time" int4 NOT NULL ); CALL SET_TABLE_PROPERTY('"public"."t_exposure_history"', 'clustering_key', '"uid","create_time"'); CALL SET_TABLE_PROPERTY('"public"."t_exposure_history"', 'segment_key', '"create_time"'); CALL SET_TABLE_PROPERTY('"public"."t_exposure_history"', 'bitmap_columns', '"uid","item"'); CALL SET_TABLE_PROPERTY('"public"."t_exposure_history"', 'dictionary_encoding_columns', '"uid","item"'); CALL SET_TABLE_PROPERTY('"public"."t_exposure_history"', 'time_to_live_in_seconds', '86400'); comment on table "public"."t_exposure_history" is '曝光记录表'; COMMIT; **/ var ( holo_exposure_insert_sql = "INSERT INTO %s (uid, item, create_time) VALUES($1, $2, $3)" ) type User2ItemExposureHologresDao struct { db *sql.DB table string maxItems int timeInterval int // second mu sync.Mutex insertStmt *sql.Stmt selectStmt *sql.Stmt generateItemDataFuncName string writeLogExcludeScenes map[string]bool clearLogScene string onlyLogUserExposeFlag bool } func NewUser2ItemExposureHologresDao(config recconf.FilterConfig) *User2ItemExposureHologresDao { dao := &User2ItemExposureHologresDao{ maxItems: -1, timeInterval: -1, generateItemDataFuncName: config.GenerateItemDataFuncName, writeLogExcludeScenes: make(map[string]bool), clearLogScene: config.ClearLogIfNotEnoughScene, onlyLogUserExposeFlag: config.OnlyLogUserExposeFlag, } hologres, err := holo.GetPostgres(config.DaoConf.HologresName) if err != nil { log.Error(fmt.Sprintf("%v", err)) return nil } dao.db = hologres.DB dao.table = config.DaoConf.HologresTableName if config.MaxItems > 0 { dao.maxItems = config.MaxItems } if config.TimeInterval > 0 { dao.timeInterval = config.TimeInterval } for _, scene := range config.WriteLogExcludeScenes { dao.writeLogExcludeScenes[scene] = true } return dao } func (d *User2ItemExposureHologresDao) LogHistory(user *User, items []*Item, context *context.RecommendContext) { scene := context.GetParameter("scene").(string) if _, exist := d.writeLogExcludeScenes[scene]; exist { return } if len(items) == 0 { log.Warning(fmt.Sprintf("requestId=%s\tmodule=User2ItemExposureHologresDao\terr=items empty", context.RecommendId)) return } uid := string(user.Id) if d.insertStmt == nil { d.mu.Lock() if d.insertStmt == nil { stmt, err := d.db.Prepare(fmt.Sprintf(holo_exposure_insert_sql, d.table)) if err != nil { log.Error(fmt.Sprintf("requestId=%s\tmodule=User2ItemExposureHologresDao\tuid=%s\terr=%v", context.RecommendId, uid, err)) d.mu.Unlock() return } d.insertStmt = stmt d.mu.Unlock() } else { d.mu.Unlock() } } createTime := time.Now().Unix() var ret string for _, item := range items { itemData := getGenerateItemDataFunc(d.generateItemDataFuncName)(user.Id, item) ret = ret + "," + itemData } ret = ret[1:] _, err := d.insertStmt.Exec(uid, ret, createTime) if err != nil { log.Error(fmt.Sprintf("requestId=%s\tmodule=User2ItemExposureHologresDao\tuid=%s\terr=%v", context.RecommendId, user.Id, err)) return } log.Info(fmt.Sprintf("requestId=%s\tscene=%s\tuid=%s\tmsg=log history success", context.RecommendId, scene, user.Id)) } func (d *User2ItemExposureHologresDao) FilterByHistory(uid UID, items []*Item) (ret []*Item) { builder := sqlbuilder.PostgreSQL.NewSelectBuilder() builder.Select("item") builder.From(d.table) builder.Where(builder.Equal("uid", string(uid))) if d.timeInterval > 0 { t := time.Now().Unix() - int64(d.timeInterval) builder.Where(builder.GreaterEqualThan("create_time", t)) } builder.OrderBy("create_time desc") if d.maxItems > 0 { builder.Limit(d.maxItems) } sql, args := builder.Build() if d.selectStmt == nil { d.mu.Lock() if d.selectStmt == nil { stmt, err := d.db.Prepare(sql) if err != nil { log.Error(fmt.Sprintf("module=User2ItemExposureHologresDao\tuid=%s\terr=%v", uid, err)) ret = items d.mu.Unlock() return } d.selectStmt = stmt d.mu.Unlock() } else { d.mu.Unlock() } } ctx, cancel := gocontext.WithTimeout(gocontext.Background(), 100*time.Millisecond) defer cancel() rows, err := d.selectStmt.QueryContext(ctx, args...) if err != nil { log.Error(fmt.Sprintf("module=User2ItemExposureHologresDao\tuid=%s\terr=%v", uid, err)) ret = items return } defer rows.Close() fiterIds := make(map[string]bool) for rows.Next() { var itemDatas string if err := rows.Scan(&itemDatas); err == nil { ids := strings.Split(itemDatas, ",") for _, id := range ids { fiterIds[id] = true } } } if d.onlyLogUserExposeFlag { for _, item := range items { itemData := getGenerateItemDataFunc(d.generateItemDataFuncName)(uid, item) if _, ok := fiterIds[itemData]; ok { item.AddProperty("_is_exposure_", 1) } } ret = items } else { for _, item := range items { itemData := getGenerateItemDataFunc(d.generateItemDataFuncName)(uid, item) if _, ok := fiterIds[itemData]; !ok { ret = append(ret, item) } } } return } func (d *User2ItemExposureHologresDao) ClearHistory(user *User, context *context.RecommendContext) { scene := context.GetParameter("scene").(string) if scene != d.clearLogScene { return } db := sqlbuilder.PostgreSQL.NewDeleteBuilder() db.DeleteFrom(d.table) db.Where(db.Equal("uid", string(user.Id))) deleteSql, args := db.Build() _, err := d.db.Exec(deleteSql, args...) if err != nil { context.LogError(fmt.Sprintf("delete user [%s] exposure items from holo failed, err=%v", user.Id, err)) } } func (d *User2ItemExposureHologresDao) GetExposureItemIds(user *User, context *context.RecommendContext) (ret string) { uid := string(user.Id) builder := sqlbuilder.PostgreSQL.NewSelectBuilder() builder.Select("item") builder.From(d.table) builder.Where(builder.Equal("uid", string(uid))) if d.timeInterval > 0 { t := time.Now().Unix() - int64(d.timeInterval) builder.Where(builder.GreaterEqualThan("create_time", t)) } builder.OrderBy("create_time desc") if d.maxItems > 0 { builder.Limit(d.maxItems) } sql, args := builder.Build() if d.selectStmt == nil { d.mu.Lock() if d.selectStmt == nil { stmt, err := d.db.Prepare(sql) if err != nil { log.Error(fmt.Sprintf("module=User2ItemExposureHologresDao\tuid=%s\terr=%v", uid, err)) d.mu.Unlock() return } d.selectStmt = stmt d.mu.Unlock() } else { d.mu.Unlock() } } ctx, cancel := gocontext.WithTimeout(gocontext.Background(), 100*time.Millisecond) defer cancel() rows, err := d.selectStmt.QueryContext(ctx, args...) if err != nil { log.Error(fmt.Sprintf("module=User2ItemExposureHologresDao\tuid=%s\terr=%v", uid, err)) return } defer rows.Close() fiterIds := make([]string, 0, 10) for rows.Next() { var itemDatas string if err := rows.Scan(&itemDatas); err == nil { ids := strings.Split(itemDatas, ",") for _, id := range ids { fiterIds = append(fiterIds, id) } } } ret = strings.Join(fiterIds, ",") return }