module/user_item_exposure_featurestore_dao.go (134 lines of code) (raw):

package module

import (
	"fmt"
	"time"

	"github.com/alibaba/pairec/v2/context"
	"github.com/alibaba/pairec/v2/log"
	"github.com/alibaba/pairec/v2/persist/fs"
	"github.com/alibaba/pairec/v2/recconf"
	"github.com/alibaba/pairec/v2/utils"
	"github.com/aliyun/aliyun-pai-featurestore-go-sdk/v2/datasource/featuredb/fdbserverpb"
)

// User2ItemExposureFeatureStoreDao records and queries per-user item exposure
// history through the bloom key/value API of a FeatureStore feature view.
type User2ItemExposureFeatureStoreDao struct {
	// fsClient gives access to the FeatureStore project.
	fsClient *fs.FSClient
	// table is the name of the feature view holding the exposure data.
	table string
	// timeInterval is an offset in seconds added to the write timestamp.
	timeInterval int64 // second
	// generateItemDataFuncName selects the function that serializes an item.
	generateItemDataFuncName string
	// writeLogExcludeScenes lists the scenes for which LogHistory is a no-op.
	writeLogExcludeScenes map[string]bool
	// clearLogScene is the only scene for which ClearHistory deletes data.
	clearLogScene string
	// onlyLogUserExposeFlag makes FilterByHistory tag exposed items instead of
	// removing them.
	onlyLogUserExposeFlag bool
}

// NewUser2ItemExposureFeatureStoreDao builds a DAO from the filter
// configuration. It logs the error and returns nil when the FeatureStore
// client named in config.DaoConf.FeatureStoreName cannot be obtained.
func NewUser2ItemExposureFeatureStoreDao(config recconf.FilterConfig) *User2ItemExposureFeatureStoreDao {
	fsclient, err := fs.GetFeatureStoreClient(config.DaoConf.FeatureStoreName)
	if err != nil {
		log.Error(fmt.Sprintf("error=%v", err))
		return nil
	}

	dao := &User2ItemExposureFeatureStoreDao{
		fsClient:                 fsclient,
		table:                    config.DaoConf.FeatureStoreViewName,
		generateItemDataFuncName: config.GenerateItemDataFuncName,
		writeLogExcludeScenes:    make(map[string]bool, len(config.WriteLogExcludeScenes)),
		clearLogScene:            config.ClearLogIfNotEnoughScene,
		onlyLogUserExposeFlag:    config.OnlyLogUserExposeFlag,
	}

	// Non-positive intervals leave timeInterval at its zero value.
	if config.TimeInterval > 0 {
		dao.timeInterval = int64(config.TimeInterval)
	}
	for _, scene := range config.WriteLogExcludeScenes {
		dao.writeLogExcludeScenes[scene] = true
	}
	return dao
}

// LogHistory writes one bloom KV entry per item under the user's id.
//
// Nothing is written when the request scene is listed in
// writeLogExcludeScenes, when items is empty, or when the feature view cannot
// be found. Failures are logged; nothing is returned to the caller.
func (d *User2ItemExposureFeatureStoreDao) LogHistory(user *User, items []*Item, context *context.RecommendContext) {
	start := time.Now()

	// A missing or non-string "scene" parameter is treated as the empty scene
	// rather than panicking on the type assertion; the ok flag is deliberately
	// ignored because the exposure should still be recorded in that case.
	scene, _ := context.GetParameter("scene").(string)
	if _, exist := d.writeLogExcludeScenes[scene]; exist {
		return
	}
	if len(items) == 0 {
		log.Warning(fmt.Sprintf("requestId=%s\tmodule=User2ItemExposureFeatureStoreDao\terr=items empty", context.RecommendId))
		return
	}

	project := d.fsClient.GetProject()
	featureView := project.GetFeatureView(d.table)
	if featureView == nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=User2ItemExposureFeatureStoreDao\terror=table not found, name:%s", context.RecommendId, d.table))
		return
	}

	uid := string(user.Id)
	// The write timestamp is back-dated by the view's TTL and then advanced by
	// timeInterval.
	// NOTE(review): presumably this makes entries expire timeInterval seconds
	// from now — confirm against the FeatureDB TTL semantics.
	ttl := int64(featureView.GetTTL())
	tsMillis := (start.Unix() - ttl + d.timeInterval) * 1000 // ms

	// The generator lookup does not depend on the item, so resolve it once.
	generate := getGenerateItemDataFunc(d.generateItemDataFuncName)

	request := new(fdbserverpb.BatchWriteKVReqeust)
	request.Kvs = make([]*fdbserverpb.KVData, 0, len(items))
	for _, item := range items {
		request.Kvs = append(request.Kvs, &fdbserverpb.KVData{
			Key:   uid,
			Value: []byte(generate(user.Id, item)),
			Ts:    tsMillis,
		})
	}

	if err := fdbserverpb.BatchWriteBloomKV(project, featureView, request); err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=User2ItemExposureFeatureStoreDao\tuid=%s\terr=%v", context.RecommendId, user.Id, err))
		return
	}

	log.Info(fmt.Sprintf("requestId=%s\tscene=%s\tuid=%s\tmsg=log history success\tcost=%d", context.RecommendId, scene, user.Id, utils.CostTime(start)))
}

// FilterByHistory tests every item against the user's bloom data.
//
// When onlyLogUserExposeFlag is set, items reported as present get the
// "_is_exposure_" property set to 1 and the full input slice is returned.
// Otherwise a new slice holding only the items not reported as present is
// returned.
//
// The method fails open: items is returned unchanged when the feature view is
// missing, when items is empty, when the bloom test returns an error, or when
// the number of results does not match the number of items.
func (d *User2ItemExposureFeatureStoreDao) FilterByHistory(uid UID, items []*Item) (ret []*Item) {
	project := d.fsClient.GetProject()
	featureView := project.GetFeatureView(d.table)
	if featureView == nil {
		log.Error(fmt.Sprintf("module=User2ItemExposureFeatureStoreDao\terror=table not found, name:%s", d.table))
		return items
	}
	// Nothing to test, so skip the remote call.
	if len(items) == 0 {
		return items
	}

	// The generator lookup does not depend on the item, so resolve it once.
	generate := getGenerateItemDataFunc(d.generateItemDataFuncName)

	request := new(fdbserverpb.TestBloomItemsRequest)
	request.Key = string(uid)
	for _, item := range items {
		request.Items = append(request.Items, generate(uid, item))
	}

	tests, err := fdbserverpb.TestBloomItems(project, featureView, request)
	if err != nil {
		log.Error(fmt.Sprintf("module=User2ItemExposureFeatureStoreDao\terr=%v", err))
		return items
	}
	// Results are matched to items by position. A longer result would index
	// past the end of items, and a shorter one would drop untested items, so a
	// size mismatch is handled like any other failed lookup.
	if len(tests) != len(items) {
		log.Error(fmt.Sprintf("module=User2ItemExposureFeatureStoreDao\terr=bloom test result size mismatch, items=%d, results=%d", len(items), len(tests)))
		return items
	}

	// only log flag, not filter item
	if d.onlyLogUserExposeFlag {
		for i, exposed := range tests {
			if exposed {
				items[i].AddProperty("_is_exposure_", 1)
			}
		}
		return items
	}

	ret = make([]*Item, 0, len(items))
	for i, exposed := range tests {
		if !exposed {
			ret = append(ret, items[i])
		}
	}
	return ret
}

// ClearHistory deletes the user's bloom data when the request scene equals
// clearLogScene. A missing or non-string "scene" parameter never triggers a
// delete. Failures are logged; nothing is returned to the caller.
func (d *User2ItemExposureFeatureStoreDao) ClearHistory(user *User, context *context.RecommendContext) {
	// Comma-ok form: the single-value assertion would panic on a missing or
	// non-string parameter.
	scene, ok := context.GetParameter("scene").(string)
	if !ok || scene != d.clearLogScene {
		return
	}

	project := d.fsClient.GetProject()
	featureView := project.GetFeatureView(d.table)
	if featureView == nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=User2ItemExposureFeatureStoreDao\terror=table not found, name:%s", context.RecommendId, d.table))
		return
	}

	if err := fdbserverpb.DeleteBloomByKey(project, featureView, string(user.Id)); err != nil {
		context.LogError(fmt.Sprintf("delete user [%s] exposure items failed, err=%v", user.Id, err))
	}
}

// GetExposureItemIds always returns the empty string in this implementation.
func (d *User2ItemExposureFeatureStoreDao) GetExposureItemIds(user *User, context *context.RecommendContext) (ret string) {
	return ""
}