in module/feature_mysql_dao.go [149:323]
// itemsFeatureFetch loads features for items from d.table and merges them into
// each item through Item.AddProperties.
//
// The lookup key of an item is its Id when d.featureKey is "item:id"; otherwise
// it is the item's string property named by the second ":"-separated component
// of d.featureKey. Items are spread round-robin over ceil(len(items)/1000)
// batches, and every batch is queried in its own goroutine. The call returns
// once all batches have finished.
//
// Nothing is returned: failures (bad feature key, SQL errors, query timeout,
// panics) are logged and the affected items are left unchanged.
func (d *FeatureMysqlDao) itemsFeatureFetch(items []*Item, context *context.RecommendContext) {
	defer func() {
		if err := recover(); err != nil {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureMysqlDao\terror=%v", context.RecommendId, err))
		}
	}()

	fk := d.featureKey
	if fk != "item:id" {
		comms := strings.Split(d.featureKey, ":")
		if len(comms) < 2 {
			log.Error(fmt.Sprintf("requestId=%s\tevent=itemsFeatureFetch\terror=featureKey error(%s)", context.RecommendId, d.featureKey))
			return
		}
		fk = comms[1]
	}

	// Without items there is nothing to enrich; avoid a query with an empty IN list.
	if len(items) == 0 {
		return
	}

	const itemsPerBatch = 1000
	batchCount := utils.MaxInt(int(math.Ceil(float64(len(items))/float64(itemsPerBatch))), 1)
	batches := make([][]*Item, batchCount)
	for i, item := range items {
		batches[i%batchCount] = append(batches[i%batchCount], item)
	}

	var wg sync.WaitGroup
	for _, batch := range batches {
		wg.Add(1)
		go func(itemlist []*Item) {
			defer wg.Done()
			// recover only works in the goroutine that panicked, so every worker
			// needs its own handler; the one above does not cover it.
			defer func() {
				if err := recover(); err != nil {
					log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureMysqlDao\terror=%v", context.RecommendId, err))
				}
			}()
			d.itemsFeatureFetchBatch(itemlist, fk, context)
		}(batch)
	}
	wg.Wait()
}

// itemsFeatureFetchBatch runs one "SELECT ... WHERE key IN (...)" query for
// itemlist and adds the returned columns as properties to the matching items.
//
// fk is the resolved feature key as computed by itemsFeatureFetch. The first
// selected column is treated as the lookup key; every following column whose
// parsed value is non-nil becomes a property named after the column.
func (d *FeatureMysqlDao) itemsFeatureFetchBatch(itemlist []*Item, fk string, context *context.RecommendContext) {
	// Several items may share one lookup key, so keep all of them per key and
	// send each distinct key to the database only once.
	keys := make([]interface{}, 0, len(itemlist))
	key2Items := make(map[string][]*Item, len(itemlist))
	for _, item := range itemlist {
		var key string
		if fk == "item:id" {
			key = string(item.Id)
		} else {
			key = item.StringProperty(fk)
		}
		// Rows with an empty key are ignored when reading results, so querying
		// for an empty key can never contribute anything.
		if key == "" {
			continue
		}
		if _, seen := key2Items[key]; !seen {
			keys = append(keys, key)
		}
		key2Items[key] = append(key2Items[key], item)
	}
	if len(keys) == 0 {
		return
	}

	builder := sqlbuilder.MySQL.NewSelectBuilder()
	builder.Select(d.itemSelectFields)
	builder.From(d.table)
	builder.Where(builder.In(d.itemFeatureKeyName, keys...))
	sqlquery, args := builder.Build()

	// Prepared statements are cached by the number of IN placeholders, because
	// that is the only part of the generated SQL that varies between calls.
	stmtkey := len(keys)
	stmt := d.getItemStmt(stmtkey)
	if stmt == nil {
		d.mu.Lock()
		// Re-check under the lock: another goroutine may have prepared it meanwhile.
		stmt = d.itemStmtMap[stmtkey]
		if stmt == nil {
			prepared, err := d.db.Prepare(sqlquery)
			if err != nil {
				d.mu.Unlock()
				log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureMysqlDao\terror=mysql error(%v)", context.RecommendId, err))
				return
			}
			d.itemStmtMap[stmtkey] = prepared
			stmt = prepared
		}
		d.mu.Unlock()
	}

	// Bound only the query execution by 100ms. A cancel timer is used instead of
	// WithTimeout because the returned rows stay bound to ctx: a deadline on ctx
	// would also abort row scanning that happens after the query has returned.
	// NOTE(review): prompt return on timeout relies on the SQL driver honoring
	// context cancellation - confirm for the driver in use.
	ctx, cancel := gocontext.WithCancel(gocontext.Background())
	defer cancel()
	timer := time.AfterFunc(100*time.Millisecond, cancel)
	rows, err := stmt.QueryContext(ctx, args...)
	if !timer.Stop() {
		// The timer already fired, so the query exceeded its budget.
		if rows != nil {
			rows.Close()
		}
		err = gocontext.DeadlineExceeded
	}
	if err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureMysqlDao\terror=mysql error(%v)", context.RecommendId, err))
		return
	}
	defer rows.Close()

	columns, err := rows.ColumnTypes()
	if err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureMysqlDao\terror=mysql error(%v)", context.RecommendId, err))
		return
	}
	values := sqlutil.ColumnValues(columns)
	if len(values) == 0 {
		return
	}

	for rows.Next() {
		if err := rows.Scan(values...); err != nil {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureMysqlDao\terror=mysql error(%v)", context.RecommendId, err))
			continue
		}

		// The first column carries the lookup key.
		var key string
		switch v := values[0].(type) {
		case *sql.NullString:
			if v.Valid {
				key = v.String
			}
		case *sql.NullInt32:
			if v.Valid {
				key = strconv.Itoa(int(v.Int32))
			}
		case *sql.NullInt64:
			if v.Valid {
				// FormatInt keeps the full 64-bit value on 32-bit platforms.
				key = strconv.FormatInt(v.Int64, 10)
			}
		}
		if key == "" {
			continue
		}
		targets := key2Items[key]
		if len(targets) == 0 {
			continue
		}

		properties := make(map[string]interface{}, len(values))
		for i := 1; i < len(columns); i++ {
			if value := sqlutil.ParseColumnValues(values[i]); value != nil {
				properties[columns[i].Name()] = value
			}
		}
		if len(properties) == 0 {
			continue
		}

		for i, item := range targets {
			props := properties
			if i > 0 {
				// AddProperties is not visible here; hand every additional item
				// its own map in case the argument is retained.
				props = make(map[string]interface{}, len(properties))
				for name, value := range properties {
					props[name] = value
				}
			}
			item.AddProperties(props)
		}
	}
	// rows.Next also returns false on failure; surface that instead of silently
	// treating a truncated result set as complete.
	if err := rows.Err(); err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureMysqlDao\terror=mysql error(%v)", context.RecommendId, err))
	}
}