in module/item_state_filter_hologres_dao.go [71:292]
func (d *ItemStateFilterHologresDao) Filter(user *User, items []*Item) (ret []*Item) {
fields := make(map[string]bool, len(items))
cpuCount := utils.MaxInt(int(math.Ceil(float64(len(items))/float64(requestCount))), 1)
requestCh := make(chan []interface{}, cpuCount)
maps := make(map[int][]interface{}, cpuCount)
itemMap := make(map[ItemId]*Item, len(items))
index := 0
userFeatures := user.MakeUserFeatures2()
for i, item := range items {
itemId := string(item.Id)
if d.itmCache != nil {
if attrs, ok := d.itmCache.GetIfPresent(itemId); ok {
properties := attrs.(map[string]interface{})
item.AddProperties(properties)
if d.filterParam != nil {
result, err := d.filterParam.EvaluateByDomain(userFeatures, properties)
if err == nil && result {
fields[itemId] = true
}
} else {
fields[itemId] = true
}
continue
}
}
itemMap[item.Id] = item
maps[index%cpuCount] = append(maps[index%cpuCount], itemId)
if (i+1)%requestCount == 0 {
index++
}
}
defer close(requestCh)
for _, idlist := range maps {
requestCh <- idlist
}
var wg sync.WaitGroup
var mu sync.Mutex
mergeFunc := func(maps map[string]bool) {
mu.Lock()
for k, v := range maps {
fields[k] = v
}
mu.Unlock()
}
for i := 0; i < cpuCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
select {
case idlist := <-requestCh:
fieldMap := make(map[string]bool, len(idlist))
builder := sqlbuilder.PostgreSQL.NewSelectBuilder()
builder.Select(d.itemFieldName)
if d.selectFields != "" {
builder.Select(d.itemFieldName + "," + d.selectFields)
}
builder.From(d.table)
if d.whereClause != "" {
builder.Where(d.whereClause)
}
// for use stmt, adjust idlist length
// if len(idlist) < 1000 && (len(idlist)%100 != 0) {
// c := (len(idlist)/100+1)*100 - len(idlist)
// for i := 0; i < c; i++ {
// idlist = append(idlist, "-1")
// }
// }
if len(idlist) < requestCount {
c := requestCount - len(idlist)
for i := 0; i < c; i++ {
idlist = append(idlist, "-1")
}
}
builder.Where(builder.In(d.itemFieldName, idlist...))
sqlquery, args := builder.Build()
stmtkey := len(idlist)
stmt := d.getStmt(stmtkey)
if stmt == nil {
d.mu.Lock()
stmt = d.stmtMap[stmtkey]
if stmt == nil {
stmt2, err := d.db.Prepare(sqlquery)
if err != nil {
log.Error(fmt.Sprintf("module=ItemStateFilterHologresDao\terror=hologres error(%v)", err))
// if error , not filter item
for _, id := range idlist {
fieldMap[id.(string)] = true
}
mergeFunc(fieldMap)
d.mu.Unlock()
return
}
d.stmtMap[stmtkey] = stmt2
stmt = stmt2
d.mu.Unlock()
} else {
d.mu.Unlock()
}
}
rowsChannel := make(chan *sql.Rows, 1)
ctx, cancel := gocontext.WithTimeout(gocontext.Background(), 200*time.Millisecond)
defer cancel()
// async invoke sql query
go func() {
rows, err := stmt.Query(args...)
if err != nil {
rowsChannel <- nil
log.Error(fmt.Sprintf("module=ItemStateFilterHologresDao\terror=hologres error(%v)", err))
return
}
// check query is timeout
select {
case <-ctx.Done():
if rows != nil {
rows.Close()
}
return
default:
}
rowsChannel <- rows
return
}()
var rows *sql.Rows
select {
case <-ctx.Done():
log.Error(fmt.Sprintf("module=ItemStateFilterHologresDao\terror=hologres error(%v)", ctx.Err()))
for _, id := range idlist {
fieldMap[id.(string)] = true
}
mergeFunc(fieldMap)
return
case rows = <-rowsChannel:
if rows == nil {
for _, id := range idlist {
fieldMap[id.(string)] = true
}
mergeFunc(fieldMap)
return
}
}
defer rows.Close()
columns, err := rows.ColumnTypes()
if err != nil {
log.Error(fmt.Sprintf("module=ItemStateFilterHologresDao\terror=hologres error(%v)", err))
// if error , not filter item
for _, id := range idlist {
fieldMap[id.(string)] = true
}
mergeFunc(fieldMap)
return
}
values := sqlutil.ColumnValues(columns)
for rows.Next() {
if err := rows.Scan(values...); err == nil {
properties := make(map[string]interface{}, len(values))
var id string
for i, column := range columns {
name := column.Name()
val := values[i]
if i == 0 {
switch v := val.(type) {
case *sql.NullString:
if v.Valid {
id = v.String
}
case *sql.NullInt32:
if v.Valid {
id = strconv.Itoa(int(v.Int32))
}
case *sql.NullInt64:
id = utils.ToString(v.Int64, "")
}
if id == "" {
break
}
continue
}
if value := sqlutil.ParseColumnValues(val); value != nil {
properties[name] = value
}
}
if d.itmCache != nil {
d.itmCache.Put(id, properties)
}
if item, ok := itemMap[ItemId(id)]; ok {
item.AddProperties(properties)
}
if d.filterParam != nil {
result, err := d.filterParam.EvaluateByDomain(userFeatures, properties)
if err == nil && result {
fieldMap[id] = true
}
} else {
fieldMap[id] = true
}
}
}
mergeFunc(fieldMap)
default:
}
}()
}
wg.Wait()
for _, item := range items {
if _, ok := fields[string(item.Id)]; ok {
ret = append(ret, item)
}
}
return
}