module/user_item_exposure_tablestore_dao.go (219 lines of code) (raw):
package module
import (
"fmt"
"strings"
"time"
"github.com/aliyun/aliyun-tablestore-go-sdk/tablestore"
"github.com/alibaba/pairec/v2/context"
"github.com/alibaba/pairec/v2/log"
"github.com/alibaba/pairec/v2/persist/tablestoredb"
"github.com/alibaba/pairec/v2/recconf"
)
type User2ItemExposureTableStoreDao struct {
tablestore *tablestoredb.TableStore
table string
maxItems int32
timeInterval int // second
generateItemDataFuncName string
writeLogExcludeScenes map[string]bool
clearLogScene string
}
func NewUser2ItemExposureTableStoreDao(config recconf.FilterConfig) *User2ItemExposureTableStoreDao {
dao := &User2ItemExposureTableStoreDao{
maxItems: -1,
timeInterval: -1,
generateItemDataFuncName: config.GenerateItemDataFuncName,
writeLogExcludeScenes: make(map[string]bool),
clearLogScene: config.ClearLogIfNotEnoughScene,
}
tablestore, err := tablestoredb.GetTableStore(config.DaoConf.TableStoreName)
if err != nil {
log.Error(fmt.Sprintf("%v", err))
return nil
}
dao.tablestore = tablestore
dao.table = config.DaoConf.TableStoreTableName
if config.MaxItems > 0 {
dao.maxItems = int32(config.MaxItems)
}
if config.TimeInterval > 0 {
dao.timeInterval = config.TimeInterval
}
for _, scene := range config.WriteLogExcludeScenes {
dao.writeLogExcludeScenes[scene] = true
}
return dao
}
func (d *User2ItemExposureTableStoreDao) LogHistory(user *User, items []*Item, context *context.RecommendContext) {
scene := context.GetParameter("scene").(string)
if _, exist := d.writeLogExcludeScenes[scene]; exist {
return
}
uid := string(user.Id)
idList := make([]string, 0)
for _, item := range items {
itemData := getGenerateItemDataFunc(d.generateItemDataFuncName)(user.Id, item)
idList = append(idList, itemData)
}
putRowRequest := new(tablestore.PutRowRequest)
putRowChange := new(tablestore.PutRowChange)
putRowChange.TableName = d.table
putPk := new(tablestore.PrimaryKey)
putPk.AddPrimaryKeyColumn("user_id", uid)
putPk.AddPrimaryKeyColumnWithAutoIncrement("auto_id")
putRowChange.PrimaryKey = putPk
putRowChange.AddColumn("item_ids", strings.Join(idList, ","))
putRowChange.SetCondition(tablestore.RowExistenceExpectation_IGNORE)
putRowRequest.PutRowChange = putRowChange
_, err := d.tablestore.Client.PutRow(putRowRequest)
if err != nil {
log.Error(fmt.Sprintf("requestId=%s\tmodule=User2ItemExposureTableStoreDao\tuid=%s\terr=%v", context.RecommendId, user.Id, err))
}
log.Info(fmt.Sprintf("requestId=%s\tuid=%s\tmsg=log history success", context.RecommendId, user.Id))
}
func (d *User2ItemExposureTableStoreDao) FilterByHistory(uid UID, items []*Item) (ret []*Item) {
getRangeRequest := &tablestore.GetRangeRequest{}
rangeRowQueryCriteria := &tablestore.RangeRowQueryCriteria{}
rangeRowQueryCriteria.TableName = d.table
startPK := new(tablestore.PrimaryKey)
startPK.AddPrimaryKeyColumn("user_id", string(uid))
startPK.AddPrimaryKeyColumnWithMaxValue("auto_id")
endPK := new(tablestore.PrimaryKey)
endPK.AddPrimaryKeyColumn("user_id", string(uid))
endPK.AddPrimaryKeyColumnWithMinValue("auto_id")
rangeRowQueryCriteria.StartPrimaryKey = startPK
rangeRowQueryCriteria.EndPrimaryKey = endPK
rangeRowQueryCriteria.ColumnsToGet = []string{"item_ids"}
rangeRowQueryCriteria.Direction = tablestore.BACKWARD
rangeRowQueryCriteria.MaxVersion = 1
if d.maxItems > 0 {
rangeRowQueryCriteria.Limit = d.maxItems
}
if d.timeInterval > 0 {
t := time.Now().Unix() - int64(d.timeInterval)
trange := tablestore.TimeRange{
Start: t * 1000,
End: (time.Now().Unix() + 60) * 1000,
}
rangeRowQueryCriteria.TimeRange = &trange
}
getRangeRequest.RangeRowQueryCriteria = rangeRowQueryCriteria
getRangeResp, err := d.tablestore.Client.GetRange(getRangeRequest)
if err != nil {
log.Error(fmt.Sprintf("module=User2ItemExposureTableStoreDao\tuid=%s\terr=%v", uid, err))
ret = items
return
}
fiterIds := make(map[string]bool)
for _, row := range getRangeResp.Rows {
if len(row.Columns) > 0 {
if id, ok := row.Columns[0].Value.(string); ok && id != "" {
idList := strings.Split(id, ",")
for _, id := range idList {
fiterIds[id] = true
}
}
}
}
for _, item := range items {
itemData := getGenerateItemDataFunc(d.generateItemDataFuncName)(uid, item)
if _, ok := fiterIds[itemData]; !ok {
ret = append(ret, item)
}
}
return
}
func (d *User2ItemExposureTableStoreDao) ClearHistory(user *User, context *context.RecommendContext) {
scene := context.GetParameter("scene").(string)
if scene != d.clearLogScene {
return
}
getRangeRequest := &tablestore.GetRangeRequest{}
rangeRowQueryCriteria := &tablestore.RangeRowQueryCriteria{}
rangeRowQueryCriteria.TableName = d.table
startPK := new(tablestore.PrimaryKey)
startPK.AddPrimaryKeyColumn("user_id", string(user.Id))
startPK.AddPrimaryKeyColumnWithMinValue("auto_id")
endPK := new(tablestore.PrimaryKey)
endPK.AddPrimaryKeyColumn("user_id", string(user.Id))
endPK.AddPrimaryKeyColumnWithMaxValue("auto_id")
rangeRowQueryCriteria.StartPrimaryKey = startPK
rangeRowQueryCriteria.EndPrimaryKey = endPK
rangeRowQueryCriteria.Direction = tablestore.FORWARD
rangeRowQueryCriteria.MaxVersion = 1
getRangeRequest.RangeRowQueryCriteria = rangeRowQueryCriteria
getRangeResp, err := d.tablestore.Client.GetRange(getRangeRequest)
batchWriteRowRequest := &tablestore.BatchWriteRowRequest{}
for {
if err != nil {
context.LogError(fmt.Sprintf("module=User2ItemExposureTableStoreDao\tuid=%s\terr=%v", user.Id, err))
break
}
for _, row := range getRangeResp.Rows {
deleteRowChange := new(tablestore.DeleteRowChange)
deleteRowChange.TableName = d.table
deleteRowChange.PrimaryKey = row.PrimaryKey
deleteRowChange.SetCondition(tablestore.RowExistenceExpectation_EXPECT_EXIST)
batchWriteRowRequest.AddRowChange(deleteRowChange)
}
if getRangeResp.NextStartPrimaryKey == nil {
break
} else {
getRangeRequest.RangeRowQueryCriteria.StartPrimaryKey = getRangeResp.NextStartPrimaryKey
getRangeResp, err = d.tablestore.Client.GetRange(getRangeRequest)
}
}
_, err = d.tablestore.Client.BatchWriteRow(batchWriteRowRequest)
if err != nil {
context.LogError(fmt.Sprintf("delete user [%s] exposure items failed with error: %v", user.Id, err))
} else {
context.LogInfo(fmt.Sprintf("delete user [%s] exposure items", user.Id))
}
}
func (d *User2ItemExposureTableStoreDao) GetExposureItemIds(user *User, context *context.RecommendContext) (ret string) {
uid := string(user.Id)
getRangeRequest := &tablestore.GetRangeRequest{}
rangeRowQueryCriteria := &tablestore.RangeRowQueryCriteria{}
rangeRowQueryCriteria.TableName = d.table
startPK := new(tablestore.PrimaryKey)
startPK.AddPrimaryKeyColumn("user_id", string(uid))
startPK.AddPrimaryKeyColumnWithMaxValue("auto_id")
endPK := new(tablestore.PrimaryKey)
endPK.AddPrimaryKeyColumn("user_id", string(uid))
endPK.AddPrimaryKeyColumnWithMinValue("auto_id")
rangeRowQueryCriteria.StartPrimaryKey = startPK
rangeRowQueryCriteria.EndPrimaryKey = endPK
rangeRowQueryCriteria.ColumnsToGet = []string{"item_ids"}
rangeRowQueryCriteria.Direction = tablestore.BACKWARD
rangeRowQueryCriteria.MaxVersion = 1
if d.maxItems > 0 {
rangeRowQueryCriteria.Limit = d.maxItems
}
if d.timeInterval > 0 {
t := time.Now().Unix() - int64(d.timeInterval)
trange := tablestore.TimeRange{
Start: t * 1000,
End: (time.Now().Unix() + 60) * 1000,
}
rangeRowQueryCriteria.TimeRange = &trange
}
getRangeRequest.RangeRowQueryCriteria = rangeRowQueryCriteria
getRangeResp, err := d.tablestore.Client.GetRange(getRangeRequest)
if err != nil {
log.Error(fmt.Sprintf("module=User2ItemExposureTableStoreDao\tuid=%s\terr=%v", uid, err))
return
}
fiterIds := make([]string, 0, 10)
for _, row := range getRangeResp.Rows {
if len(row.Columns) > 0 {
if id, ok := row.Columns[0].Value.(string); ok && id != "" {
idList := strings.Split(id, ",")
for _, id := range idList {
fiterIds = append(fiterIds, id)
}
}
}
}
ret = strings.Join(fiterIds, ",")
return
}