module/cold_start_recall_featurestore_dao.go (140 lines of code) (raw):

package module

import (
	"fmt"
	"math/rand"
	"strings"
	"sync"
	"time"

	"github.com/alibaba/pairec/v2/context"
	"github.com/alibaba/pairec/v2/log"
	"github.com/alibaba/pairec/v2/persist/fs"
	"github.com/alibaba/pairec/v2/recconf"
	"github.com/alibaba/pairec/v2/utils"
)

const (
	// coldStartFSRescanInterval is the minimum time between two full rescans
	// triggered from ListItemsByUser.
	coldStartFSRescanInterval = 30 * time.Minute

	// coldStartFSFlushInterval is how often ids buffered by loopIterateData
	// are merged into the cached id list even if the batch is not full.
	coldStartFSFlushInterval = time.Minute

	// coldStartFSFlushBatchSize is the number of buffered ids above which
	// loopIterateData merges them immediately.
	coldStartFSFlushBatchSize = 1000
)

// ColdStartRecallFeatureStoreDao serves cold-start recall candidates from an
// in-memory list of item ids that is loaded from a FeatureStore feature view
// and refreshed in the background.
type ColdStartRecallFeatureStoreDao struct {
	fsClient     *fs.FSClient
	recallCount  int
	timeInterval int
	recallName   string
	table        string
	whereClause  string
	ch           chan string

	// mu guards itemIds and lastScanTime, which are read on the request path
	// and written by background goroutines. itemIds is never modified in
	// place; writers always install a fresh slice, so a reader may keep using
	// the slice header it copied under the lock.
	mu           sync.RWMutex
	itemIds      []string
	lastScanTime time.Time // last scan data time
}

// NewColdStartRecallFeatureStoreDao builds the DAO from the recall config and
// starts the initial load in a goroutine. When the feature view type is
// "Stream" a second goroutine is started to consume ids delivered on the
// internal channel.
//
// It returns nil when the FeatureStore client cannot be obtained and panics
// when the configured feature view does not exist.
func NewColdStartRecallFeatureStoreDao(config recconf.RecallConfig) *ColdStartRecallFeatureStoreDao {
	fsclient, err := fs.GetFeatureStoreClient(config.ColdStartDaoConf.FeatureStoreName)
	if err != nil {
		log.Error(fmt.Sprintf("error=%v", err))
		return nil
	}

	dao := &ColdStartRecallFeatureStoreDao{
		fsClient:     fsclient,
		recallCount:  config.RecallCount,
		table:        config.ColdStartDaoConf.FeatureStoreViewName,
		recallName:   config.Name,
		timeInterval: config.ColdStartDaoConf.TimeInterval,
		whereClause:  config.ColdStartDaoConf.WhereClause,
		ch:           make(chan string, 1000),
		itemIds:      make([]string, 0, 1024),
	}

	featureView := dao.fsClient.GetProject().GetFeatureView(dao.table)
	if featureView == nil {
		panic(fmt.Sprintf("featureView not found, table:%s", dao.table))
	}

	go dao.initItemData()
	if featureView.GetType() == "Stream" {
		go dao.loopIterateData()
	}

	return dao
}

// scanWhereClause returns the configured where clause with every "${time}"
// placeholder replaced by the Unix timestamp of now minus timeInterval seconds.
func (d *ColdStartRecallFeatureStoreDao) scanWhereClause() string {
	createTime := time.Now().Add(time.Duration(-1*d.timeInterval) * time.Second)
	return strings.ReplaceAll(d.whereClause, "${time}", utils.ToString(createTime.Unix(), "0"))
}

// initItemData performs the initial scan of the feature view and installs the
// returned ids as the cached list. For a "Batch" view no channel is passed to
// the scan; for any other type the internal channel is passed along.
// lastScanTime is stamped once the scan returns, whether or not it succeeded.
func (d *ColdStartRecallFeatureStoreDao) initItemData() {
	featureView := d.fsClient.GetProject().GetFeatureView(d.table)
	if featureView == nil {
		log.Error(fmt.Sprintf("module=ColdStartRecallFeatureStoreDao\trecallName=%s\terror=featureView not found, table:%s", d.recallName, d.table))
		return
	}

	where := d.scanWhereClause()

	var (
		ids []string
		err error
	)
	if featureView.GetType() == "Batch" {
		ids, err = featureView.ScanAndIterateData(where, nil)
	} else {
		ids, err = featureView.ScanAndIterateData(where, d.ch)
	}

	d.mu.Lock()
	d.lastScanTime = time.Now()
	if err == nil {
		d.itemIds = ids
	}
	d.mu.Unlock()

	if err != nil {
		log.Error(fmt.Sprintf("module=ColdStartRecallFeatureStoreDao\terror=%v", err))
	}
}

// loopIterateData consumes ids from the internal channel and merges them into
// the cached list, either when more than coldStartFSFlushBatchSize ids are
// buffered or when the flush ticker fires. It returns when the channel is
// closed, after merging whatever is still buffered.
func (d *ColdStartRecallFeatureStoreDao) loopIterateData() {
	ticker := time.NewTicker(coldStartFSFlushInterval)
	defer ticker.Stop()

	pending := make([]string, 0, coldStartFSFlushBatchSize+1)
	flush := func() {
		if len(pending) == 0 {
			return
		}
		d.appendItemIds(pending)
		pending = pending[:0]
	}

	// Block on both sources so that a partially filled batch is still merged
	// on the next tick when no further ids arrive.
	for {
		select {
		case id, ok := <-d.ch:
			if !ok {
				flush()
				return
			}
			pending = append(pending, id)
			if len(pending) > coldStartFSFlushBatchSize {
				flush()
			}
		case <-ticker.C:
			flush()
		}
	}
}

// appendItemIds installs a new cached list made of the current ids followed by
// those entries of ids that are not already present. Duplicates inside ids
// itself are appended only once. The caller keeps ownership of ids.
func (d *ColdStartRecallFeatureStoreDao) appendItemIds(ids []string) {
	d.mu.Lock()
	defer d.mu.Unlock()

	seen := make(map[string]struct{}, len(d.itemIds)+len(ids))
	for _, id := range d.itemIds {
		seen[id] = struct{}{}
	}

	// Copy instead of appending in place: readers may still hold the old slice.
	merged := make([]string, len(d.itemIds), len(d.itemIds)+len(ids))
	copy(merged, d.itemIds)
	for _, id := range ids {
		if _, ok := seen[id]; ok {
			continue
		}
		seen[id] = struct{}{}
		merged = append(merged, id)
	}

	d.itemIds = merged
}

// claimRescan reports whether the caller should run a full rescan now. When it
// returns true lastScanTime has already been advanced, so concurrent callers
// get false until the rescan interval elapses again.
func (d *ColdStartRecallFeatureStoreDao) claimRescan() bool {
	d.mu.Lock()
	defer d.mu.Unlock()

	if time.Since(d.lastScanTime) <= coldStartFSRescanInterval {
		return false
	}
	d.lastScanTime = time.Now()
	return true
}

// ListItemsByUser returns up to recallCount items chosen uniformly at random
// from the cached id list, in random order, each tagged with the recall name
// as its RetrieveId. It returns nil when the feature view cannot be found or
// when there is nothing to return.
//
// If the last scan is older than coldStartFSRescanInterval, one background
// rescan is started; the current call still answers from the existing list.
func (d *ColdStartRecallFeatureStoreDao) ListItemsByUser(user *User, context *context.RecommendContext) (ret []*Item) {
	featureView := d.fsClient.GetProject().GetFeatureView(d.table)
	if featureView == nil {
		log.Error(fmt.Sprintf("module=ColdStartRecallFeatureStoreDao\trecallName=%s\terror=featureView not found, table:%s", d.recallName, d.table))
		return nil
	}

	d.mu.RLock()
	ids := d.itemIds
	stale := time.Since(d.lastScanTime) > coldStartFSRescanInterval
	d.mu.RUnlock()

	// The read-locked check keeps the common path cheap; claimRescan re-checks
	// under the write lock so that only one request starts the rescan.
	if stale && d.claimRescan() {
		go func() {
			rescanned, err := featureView.ScanAndIterateData(d.scanWhereClause(), nil)
			if err != nil {
				log.Error(fmt.Sprintf("module=ColdStartRecallFeatureStoreDao\terror=%v", err))
				return
			}
			d.mu.Lock()
			d.itemIds = rescanned
			d.mu.Unlock()
		}()
	}

	count := len(ids)
	if count > d.recallCount {
		count = d.recallCount
	}
	if count <= 0 {
		return nil
	}

	// Taking the first count entries of a random permutation of the indices
	// yields the same distribution as shuffling every item and truncating,
	// without building an Item for ids that would be thrown away.
	ret = make([]*Item, 0, count)
	for _, idx := range rand.Perm(len(ids))[:count] {
		item := NewItem(ids[idx])
		item.RetrieveId = d.recallName
		ret = append(ret, item)
	}

	return ret
}