in module/item_state_filter_featurestore_dao.go [62:172]
// Filter returns the items that pass the item state filter, preserving the
// order of the input slice.
//
// Item properties are taken from d.itmCache when the id is present there.
// The ids that are not cached are fetched from the feature view d.table in
// concurrent batches of at most requestCount ids each. Fetched properties are
// attached to the item and, when a cache is configured, stored in it. An item
// is kept when d.filterParam is nil, or when it evaluates to true without
// error against the user features and the item properties.
//
// Failure handling, per batch:
//   - GetOnlineFeatures returns an error: the error is logged and every id of
//     the batch is kept (the filter is not applied).
//   - the feature view or the feature entity cannot be resolved: the error is
//     logged and no id of the batch is kept.
//
// Ids that do not appear in the feature store response are not kept.
func (d *ItemStateFilterFeatureStoreDao) Filter(user *User, items []*Item) (ret []*Item) {
	// kept holds the ids of the items that passed the filter.
	kept := make(map[string]bool, len(items))
	// Upper bound on the number of feature store requests; the real number is
	// lower when some items are served from the cache.
	maxBatches := utils.MaxInt(int(math.Ceil(float64(len(items))/float64(requestCount))), 1)
	batches := make([][]interface{}, 0, maxBatches)
	// pending maps the ids that must be fetched to their items.
	pending := make(map[ItemId]*Item, len(items))
	userFeatures := user.MakeUserFeatures2()

	// passes reports whether an item with the given properties is kept.
	// An evaluation error counts as "not kept".
	passes := func(properties map[string]interface{}) bool {
		if d.filterParam == nil {
			return true
		}
		result, err := d.filterParam.EvaluateByDomain(userFeatures, properties)
		return err == nil && result
	}

	// Resolve items from the cache and pack the remaining ids densely into
	// batches. Batches are cut by the number of pending ids, not by the
	// position in items, so cache hits can never make a batch grow beyond
	// requestCount.
	var batch []interface{}
	for _, item := range items {
		itemId := string(item.Id)
		if d.itmCache != nil {
			if attrs, ok := d.itmCache.GetIfPresent(itemId); ok {
				properties := attrs.(map[string]interface{})
				item.AddProperties(properties)
				if passes(properties) {
					kept[itemId] = true
				}
				continue
			}
		}
		pending[item.Id] = item
		batch = append(batch, itemId)
		if len(batch) >= requestCount {
			batches = append(batches, batch)
			batch = nil
		}
	}
	if len(batch) > 0 {
		batches = append(batches, batch)
	}

	var (
		wg sync.WaitGroup
		mu sync.Mutex
	)
	// merge copies the result of one batch into kept; kept is shared between
	// the batch goroutines and therefore guarded by mu.
	merge := func(passed map[string]bool) {
		mu.Lock()
		defer mu.Unlock()
		for id, ok := range passed {
			kept[id] = ok
		}
	}
	// fetch loads one batch of ids from the feature store and evaluates the
	// filter for every returned item.
	fetch := func(idlist []interface{}) {
		defer wg.Done()
		passed := make(map[string]bool, len(idlist))
		featureView := d.fsClient.GetProject().GetFeatureView(d.table)
		if featureView == nil {
			log.Error(fmt.Sprintf("module=ItemStateFilterFeatureStoreDao\terror=featureView not found, table:%s", d.table))
			return
		}
		features, err := featureView.GetOnlineFeatures(idlist, d.selectFields, map[string]string{})
		if err != nil {
			// On a fetch error the items of this batch are not filtered.
			log.Error(fmt.Sprintf("module=ItemStateFilterFeatureStoreDao\terror=%v", err))
			for _, id := range idlist {
				passed[id.(string)] = true
			}
			merge(passed)
			return
		}
		featureEntity := d.fsClient.GetProject().GetFeatureEntity(featureView.GetFeatureEntityName())
		if featureEntity == nil {
			log.Error(fmt.Sprintf("module=ItemStateFilterFeatureStoreDao\terror=featureEntity not found, name:%s", featureView.GetFeatureEntityName()))
			return
		}
		for _, itemFeatures := range features {
			itemId := utils.ToString(itemFeatures[featureEntity.FeatureEntityJoinid], "")
			if itemId == "" {
				continue
			}
			item, ok := pending[ItemId(itemId)]
			if !ok {
				continue
			}
			item.AddProperties(itemFeatures)
			if d.itmCache != nil {
				d.itmCache.Put(itemId, itemFeatures)
			}
			if passes(itemFeatures) {
				passed[itemId] = true
			}
		}
		merge(passed)
	}

	// One goroutine per batch; nothing is started when every item was
	// resolved from the cache.
	for _, idlist := range batches {
		wg.Add(1)
		go fetch(idlist)
	}
	wg.Wait()

	for _, item := range items {
		if kept[string(item.Id)] {
			ret = append(ret, item)
		}
	}
	return
}