func()

in module/item_state_filter_hologres_dao.go [71:292]


func (d *ItemStateFilterHologresDao) Filter(user *User, items []*Item) (ret []*Item) {
	fields := make(map[string]bool, len(items))
	cpuCount := utils.MaxInt(int(math.Ceil(float64(len(items))/float64(requestCount))), 1)
	requestCh := make(chan []interface{}, cpuCount)
	maps := make(map[int][]interface{}, cpuCount)
	itemMap := make(map[ItemId]*Item, len(items))
	index := 0
	userFeatures := user.MakeUserFeatures2()
	for i, item := range items {
		itemId := string(item.Id)
		if d.itmCache != nil {
			if attrs, ok := d.itmCache.GetIfPresent(itemId); ok {
				properties := attrs.(map[string]interface{})
				item.AddProperties(properties)
				if d.filterParam != nil {
					result, err := d.filterParam.EvaluateByDomain(userFeatures, properties)
					if err == nil && result {
						fields[itemId] = true
					}
				} else {
					fields[itemId] = true
				}
				continue
			}
		}
		itemMap[item.Id] = item
		maps[index%cpuCount] = append(maps[index%cpuCount], itemId)
		if (i+1)%requestCount == 0 {
			index++
		}
	}

	defer close(requestCh)
	for _, idlist := range maps {
		requestCh <- idlist
	}

	var wg sync.WaitGroup
	var mu sync.Mutex

	mergeFunc := func(maps map[string]bool) {
		mu.Lock()
		for k, v := range maps {
			fields[k] = v
		}
		mu.Unlock()
	}
	for i := 0; i < cpuCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			select {
			case idlist := <-requestCh:
				fieldMap := make(map[string]bool, len(idlist))
				builder := sqlbuilder.PostgreSQL.NewSelectBuilder()
				builder.Select(d.itemFieldName)
				if d.selectFields != "" {
					builder.Select(d.itemFieldName + "," + d.selectFields)
				}
				builder.From(d.table)
				if d.whereClause != "" {
					builder.Where(d.whereClause)
				}
				// for use stmt, adjust idlist length
				// if len(idlist) < 1000 && (len(idlist)%100 != 0) {
				// c := (len(idlist)/100+1)*100 - len(idlist)
				// for i := 0; i < c; i++ {
				// idlist = append(idlist, "-1")
				// }
				// }
				if len(idlist) < requestCount {
					c := requestCount - len(idlist)
					for i := 0; i < c; i++ {
						idlist = append(idlist, "-1")
					}
				}
				builder.Where(builder.In(d.itemFieldName, idlist...))

				sqlquery, args := builder.Build()
				stmtkey := len(idlist)
				stmt := d.getStmt(stmtkey)
				if stmt == nil {
					d.mu.Lock()
					stmt = d.stmtMap[stmtkey]
					if stmt == nil {
						stmt2, err := d.db.Prepare(sqlquery)
						if err != nil {
							log.Error(fmt.Sprintf("module=ItemStateFilterHologresDao\terror=hologres error(%v)", err))
							// if error , not filter item
							for _, id := range idlist {
								fieldMap[id.(string)] = true
							}
							mergeFunc(fieldMap)
							d.mu.Unlock()
							return
						}
						d.stmtMap[stmtkey] = stmt2
						stmt = stmt2
						d.mu.Unlock()
					} else {
						d.mu.Unlock()
					}
				}

				rowsChannel := make(chan *sql.Rows, 1)
				ctx, cancel := gocontext.WithTimeout(gocontext.Background(), 200*time.Millisecond)
				defer cancel()
				// async invoke sql query
				go func() {
					rows, err := stmt.Query(args...)
					if err != nil {
						rowsChannel <- nil
						log.Error(fmt.Sprintf("module=ItemStateFilterHologresDao\terror=hologres error(%v)", err))
						return
					}

					// check query is timeout
					select {
					case <-ctx.Done():
						if rows != nil {
							rows.Close()
						}
						return
					default:
					}

					rowsChannel <- rows
					return
				}()

				var rows *sql.Rows
				select {
				case <-ctx.Done():
					log.Error(fmt.Sprintf("module=ItemStateFilterHologresDao\terror=hologres error(%v)", ctx.Err()))
					for _, id := range idlist {
						fieldMap[id.(string)] = true
					}
					mergeFunc(fieldMap)
					return
				case rows = <-rowsChannel:
					if rows == nil {
						for _, id := range idlist {
							fieldMap[id.(string)] = true
						}
						mergeFunc(fieldMap)
						return
					}
				}

				defer rows.Close()
				columns, err := rows.ColumnTypes()
				if err != nil {
					log.Error(fmt.Sprintf("module=ItemStateFilterHologresDao\terror=hologres error(%v)", err))
					// if error , not filter item
					for _, id := range idlist {
						fieldMap[id.(string)] = true
					}
					mergeFunc(fieldMap)
					return
				}
				values := sqlutil.ColumnValues(columns)
				for rows.Next() {
					if err := rows.Scan(values...); err == nil {
						properties := make(map[string]interface{}, len(values))
						var id string
						for i, column := range columns {
							name := column.Name()
							val := values[i]
							if i == 0 {
								switch v := val.(type) {
								case *sql.NullString:
									if v.Valid {
										id = v.String
									}
								case *sql.NullInt32:
									if v.Valid {
										id = strconv.Itoa(int(v.Int32))
									}
								case *sql.NullInt64:
									id = utils.ToString(v.Int64, "")
								}
								if id == "" {
									break
								}
								continue
							}

							if value := sqlutil.ParseColumnValues(val); value != nil {
								properties[name] = value
							}
						}
						if d.itmCache != nil {
							d.itmCache.Put(id, properties)
						}
						if item, ok := itemMap[ItemId(id)]; ok {
							item.AddProperties(properties)
						}
						if d.filterParam != nil {
							result, err := d.filterParam.EvaluateByDomain(userFeatures, properties)
							if err == nil && result {
								fieldMap[id] = true
							}
						} else {
							fieldMap[id] = true
						}
					}
				}
				mergeFunc(fieldMap)
			default:
			}
		}()
	}

	wg.Wait()

	for _, item := range items {
		if _, ok := fields[string(item.Id)]; ok {
			ret = append(ret, item)
		}
	}
	return
}