module/user_collaborative_featurestore_dao.go (166 lines of code) (raw):

package module import ( "fmt" "math/rand" "strconv" "strings" "github.com/alibaba/pairec/v2/context" "github.com/alibaba/pairec/v2/log" "github.com/alibaba/pairec/v2/persist/fs" "github.com/alibaba/pairec/v2/recconf" "github.com/alibaba/pairec/v2/utils" ) type UserCollaborativeFeatureStoreDao struct { fsClient *fs.FSClient userTable string itemTable string itemType string recallName string normalization bool } func NewUserCollaborativeFeatureStoreDao(config recconf.RecallConfig) *UserCollaborativeFeatureStoreDao { fsclient, err := fs.GetFeatureStoreClient(config.UserCollaborativeDaoConf.FeatureStoreName) if err != nil { log.Error(fmt.Sprintf("error=%v", err)) return nil } dao := &UserCollaborativeFeatureStoreDao{ fsClient: fsclient, userTable: config.UserCollaborativeDaoConf.User2ItemFeatureViewName, itemTable: config.UserCollaborativeDaoConf.Item2ItemFeatureViewName, itemType: config.ItemType, recallName: config.Name, } if config.UserCollaborativeDaoConf.Normalization == "on" || config.UserCollaborativeDaoConf.Normalization == "" { dao.normalization = true } return dao } func (d *UserCollaborativeFeatureStoreDao) ListItemsByUser(user *User, context *context.RecommendContext) (ret []*Item) { uid := string(user.Id) featureView := d.fsClient.GetProject().GetFeatureView(d.userTable) if featureView == nil { log.Error(fmt.Sprintf("requestId=%s\tmodule=UserCollaborativeFeatureStoreDao\terror=featureView not found, table:%s", context.RecommendId, d.userTable)) return } features, err := featureView.GetOnlineFeatures([]any{uid}, []string{"item_ids"}, map[string]string{}) if err != nil { log.Error(fmt.Sprintf("requestId=%s\tmodule=UserCollaborativeFeatureStoreDao\terror=%v", context.RecommendId, err)) return } if len(features) == 0 { return } itemIds := make([]string, 0) preferScoreMap := make(map[string]float64) itemIdsStr := features[0]["item_ids"] if ids := utils.ToString(itemIdsStr, ""); ids != "" { idList := strings.Split(ids, ",") for _, id := range idList { strs := strings.Split(id, ":") if strs[0] == "" { continue } itemIds = append(itemIds, strs[0]) preferScoreMap[strs[0]] = 1 if len(strs) > 1 { if score, err := strconv.ParseFloat(strs[1], 64); err == nil { preferScoreMap[strs[0]] = score } else { log.Error(fmt.Sprintf("requestId=%s\tmodule=UserCollaborativeFeatureStoreDao\tevent=ParsePreferScore\tuid=%s\terr=%v", context.RecommendId, uid, err)) } } } } if len(itemIds) == 0 { log.Info(fmt.Sprintf("module=UserCollaborativeFeatureStoreDao\tuid=%s\terr=item ids empty", uid)) return } if len(itemIds) > 200 { rand.Shuffle(len(itemIds)/2, func(i, j int) { itemIds[i], itemIds[j] = itemIds[j], itemIds[i] }) itemIds = itemIds[:200] } cpuCount := 4 maps := make(map[int][]interface{}) for i, id := range itemIds { maps[i%cpuCount] = append(maps[i%cpuCount], id) } itemIdCh := make(chan []interface{}, cpuCount) for _, ids := range maps { itemIdCh <- ids } itemCh := make(chan []*Item, cpuCount) for i := 0; i < cpuCount; i++ { go func() { result := make([]*Item, 0) LOOP: for { select { case ids := <-itemIdCh: featureView := d.fsClient.GetProject().GetFeatureView(d.itemTable) if featureView == nil { log.Error(fmt.Sprintf("requestId=%s\tmodule=UserCollaborativeFeatureStoreDao\terror=featureView not found, table:%s", context.RecommendId, d.userTable)) goto LOOP } featureEntity := d.fsClient.GetProject().GetFeatureEntity(featureView.GetFeatureEntityName()) if featureEntity == nil { log.Error(fmt.Sprintf("requestId=%s\tmodule=UserCollaborativeFeatureStoreDao\terror=featureEntity not found, name:%s", context.RecommendId, featureView.GetFeatureEntityName())) goto LOOP } features, err := featureView.GetOnlineFeatures(ids, []string{"similar_item_ids"}, map[string]string{}) if err != nil { log.Error(fmt.Sprintf("requestId=%s\tmodule=UserCollaborativeFeatureStoreDao\terror=%v", context.RecommendId, err)) goto LOOP } if len(features) == 0 { goto LOOP } for _, feature := range features { triggerId := utils.ToString(feature[featureEntity.FeatureEntityJoinid], "") ids := utils.ToString(feature["similar_item_ids"], "") if triggerId == "" || ids == "" { continue } preferScore := preferScoreMap[triggerId] list := strings.Split(ids, ",") for _, str := range list { strs := strings.Split(str, ":") if len(strs) == 2 && len(strs[0]) > 0 && strs[0] != "null" { item := NewItem(strs[0]) item.RetrieveId = d.recallName item.ItemType = d.itemType if tmpScore, err := strconv.ParseFloat(strings.TrimSpace(strs[1]), 64); err == nil { item.Score = tmpScore * preferScore } else { item.Score = preferScore } result = append(result, item) } } } default: goto DONE } } DONE: itemCh <- result }() } ret = mergeUserCollaborativeItemsResult(itemCh, cpuCount, d.normalization) close(itemCh) close(itemIdCh) return } func (d *UserCollaborativeFeatureStoreDao) GetTriggers(user *User, context *context.RecommendContext) (itemTriggers map[string]float64) { itemTriggers = make(map[string]float64) triggerInfos := d.GetTriggerInfos(user, context) for _, trigger := range triggerInfos { itemTriggers[trigger.ItemId] = trigger.Weight } return } func (d *UserCollaborativeFeatureStoreDao) GetTriggerInfos(user *User, context *context.RecommendContext) (triggerInfos []*TriggerInfo) { return }