func()

in module/item_state_filter_featurestore_dao.go [62:172]


// Filter returns the items that pass the item-state filter, in their original
// order.
//
// Properties of an item are taken from d.itmCache when they are present there.
// All remaining items are looked up in the feature store view d.table, in
// batches of at most requestCount ids with one goroutine per batch. Properties
// that are found are added to the item (and written to the cache when one is
// configured); the item is kept when d.filterParam is nil, or when it
// evaluates to true without error against the user's features.
//
// Failure handling per batch:
//   - GetOnlineFeatures returns an error: every item of the batch is kept.
//   - The feature view or feature entity cannot be resolved: the items of the
//     batch are dropped.
//   - The feature store returns no row for an id: that item is dropped.
func (d *ItemStateFilterFeatureStoreDao) Filter(user *User, items []*Item) (ret []*Item) {
	// Set of item ids that passed the filter.
	fields := make(map[string]bool, len(items))

	// Upper bound on the number of feature store requests; used only to
	// pre-size the batch list.
	maxBatches := utils.MaxInt(int(math.Ceil(float64(len(items))/float64(requestCount))), 1)
	batches := make([][]interface{}, 0, maxBatches)
	var current []interface{}

	itemMap := make(map[ItemId]*Item, len(items))
	userFeatures := user.MakeUserFeatures2()

	// accept reports whether an item with the given properties passes the
	// filter. An evaluation error counts as "does not pass".
	accept := func(properties map[string]interface{}) bool {
		if d.filterParam == nil {
			return true
		}
		result, err := d.filterParam.EvaluateByDomain(userFeatures, properties)
		return err == nil && result
	}

	for _, item := range items {
		itemId := string(item.Id)
		if d.itmCache != nil {
			if attrs, ok := d.itmCache.GetIfPresent(itemId); ok {
				properties := attrs.(map[string]interface{})
				item.AddProperties(properties)
				if accept(properties) {
					fields[itemId] = true
				}
				continue
			}
		}

		itemMap[item.Id] = item
		// Batches are cut on the number of uncached ids, so a cache hit can
		// never let a batch grow past requestCount.
		current = append(current, itemId)
		if len(current) >= requestCount {
			batches = append(batches, current)
			current = nil
		}
	}
	if len(current) > 0 {
		batches = append(batches, current)
	}

	var wg sync.WaitGroup
	var mu sync.Mutex

	// merge folds the verdicts of one batch into the shared result set.
	merge := func(batchFields map[string]bool) {
		mu.Lock()
		defer mu.Unlock()
		for k, v := range batchFields {
			fields[k] = v
		}
	}

	// fetch loads the features of one batch and records which ids pass.
	fetch := func(idlist []interface{}) {
		defer wg.Done()

		fieldMap := make(map[string]bool, len(idlist))

		// NOTE(review): a missing feature view/entity drops the whole batch,
		// while a request error below keeps it. Confirm the asymmetry is
		// intended.
		featureView := d.fsClient.GetProject().GetFeatureView(d.table)
		if featureView == nil {
			log.Error(fmt.Sprintf("module=ItemStateFilterFeatureStoreDao\terror=featureView not found, table:%s", d.table))
			return
		}

		features, err := featureView.GetOnlineFeatures(idlist, d.selectFields, map[string]string{})
		if err != nil {
			// On a request error the items are not filtered out.
			log.Error(fmt.Sprintf("module=ItemStateFilterFeatureStoreDao\terror=%v", err))
			for _, id := range idlist {
				fieldMap[id.(string)] = true
			}
			merge(fieldMap)
			return
		}

		featureEntity := d.fsClient.GetProject().GetFeatureEntity(featureView.GetFeatureEntityName())
		if featureEntity == nil {
			log.Error(fmt.Sprintf("module=ItemStateFilterFeatureStoreDao\terror=featureEntity not found, name:%s", featureView.GetFeatureEntityName()))
			return
		}

		for _, itemFeatures := range features {
			itemId := utils.ToString(itemFeatures[featureEntity.FeatureEntityJoinid], "")
			if itemId == "" {
				continue
			}
			item, ok := itemMap[ItemId(itemId)]
			if !ok {
				continue
			}
			item.AddProperties(itemFeatures)
			if d.itmCache != nil {
				d.itmCache.Put(itemId, itemFeatures)
			}
			if accept(itemFeatures) {
				fieldMap[itemId] = true
			}
		}
		merge(fieldMap)
	}

	// One goroutine per batch; the batch is passed as an argument so each
	// goroutine works on its own slice.
	for _, idlist := range batches {
		wg.Add(1)
		go fetch(idlist)
	}
	wg.Wait()

	for _, item := range items {
		if fields[string(item.Id)] {
			ret = append(ret, item)
		}
	}
	return ret
}