module/feature_redis_dao.go (343 lines of code) (raw):
package module
import (
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"github.com/alibaba/pairec/v2/context"
"github.com/alibaba/pairec/v2/log"
"github.com/alibaba/pairec/v2/persist/redisdb"
"github.com/alibaba/pairec/v2/recconf"
"github.com/alibaba/pairec/v2/utils"
"github.com/gomodule/redigo/redis"
)
const (
REDIS_DATA_TYPE_STRING = "string"
REDIS_DATA_TYPE_HASH = "hash"
REDIS_FIELD_TYPE_CSV = "csv"
REDIS_FIELD_TYPE_JSON = "json"
)
type FeatureRedisDao struct {
*FeatureBaseDao
redis *redisdb.Redis
redisPrefix string
redisDelimeter string
redisDataType string
redisFieldType string
userSelectFields []interface{}
itemSelectFields []interface{}
}
func NewFeatureRedisDao(config recconf.FeatureDaoConfig) *FeatureRedisDao {
dao := &FeatureRedisDao{
FeatureBaseDao: NewFeatureBaseDao(&config),
redisPrefix: config.RedisPrefix,
redisDelimeter: config.RedisValueDelimeter,
redisDataType: REDIS_DATA_TYPE_STRING,
redisFieldType: REDIS_FIELD_TYPE_CSV,
}
redis, err := redisdb.GetRedis(config.RedisName)
if err != nil {
log.Error(fmt.Sprintf("error=%v", err))
return nil
}
dao.redis = redis
if dao.redisDelimeter == "" {
dao.redisDelimeter = ","
}
if config.RedisDataType != "" {
dao.redisDataType = config.RedisDataType
}
if config.RedisFieldType != "" {
dao.redisFieldType = config.RedisFieldType
}
if config.UserSelectFields != "" && config.UserSelectFields != "*" {
fields := strings.Split(config.UserSelectFields, ",")
for _, f := range fields {
dao.userSelectFields = append(dao.userSelectFields, f)
}
}
if config.ItemSelectFields != "" && config.ItemSelectFields != "*" {
fields := strings.Split(config.ItemSelectFields, ",")
for _, f := range fields {
dao.itemSelectFields = append(dao.itemSelectFields, f)
}
}
return dao
}
func (d *FeatureRedisDao) FeatureFetch(user *User, items []*Item, context *context.RecommendContext) {
if d.featureStore == Feature_Store_User {
d.userFeatureFetch(user, context)
} else {
d.itemsFeatureFetch(items, context)
}
}
func (d *FeatureRedisDao) userFeatureFetch(user *User, context *context.RecommendContext) {
comms := strings.Split(d.featureKey, ":")
if len(comms) < 2 {
log.Error(fmt.Sprintf("requestId=%s\tuid=%s\terror=featureKey error(%s)", context.RecommendId, user.Id, d.featureKey))
return
}
key := user.StringProperty(comms[1])
if key == "" {
log.Error(fmt.Sprintf("requestId=%s\tuid=%s\terror=property not found(%s)", context.RecommendId, user.Id, comms[1]))
return
}
key = d.redisPrefix + key
// hit user cache
if d.cache != nil {
if cacheValue, ok := d.cache.GetIfPresent(key); ok {
if d.cacheFeaturesName != "" {
user.AddCacheFeatures(d.cacheFeaturesName, cacheValue.(map[string]interface{}))
} else {
user.AddProperties(cacheValue.(map[string]interface{}))
}
if context.Debug {
log.Info(fmt.Sprintf("requestId=%s\tmodule=FeatureHologresDao\tmsg=hit cache(%s)", context.RecommendId, key))
}
return
}
}
conn := d.redis.Get()
defer conn.Close()
if d.redisDataType == REDIS_DATA_TYPE_STRING {
d.userFeatureFetchByString(user, context, conn, key)
} else if d.redisDataType == REDIS_DATA_TYPE_HASH {
d.userFeatureFetchByHash(user, context, conn, key)
}
}
func (d *FeatureRedisDao) userFeatureFetchByString(user *User, context *context.RecommendContext, conn redis.Conn, key string) error {
str, err := redis.String(conn.Do("GET", key))
if err != nil {
if errors.Is(err, redis.ErrNil) {
log.Info(fmt.Sprintf("requestId=%s\tuid=%s\tmsg=user feature empty", context.RecommendId, user.Id))
} else {
log.Error(fmt.Sprintf("requestId=%s\tuid=%s\terror=get user feature error(%v)", context.RecommendId, user.Id, err))
}
return err
}
properties := make(map[string]interface{})
if d.redisFieldType == REDIS_FIELD_TYPE_CSV {
keyParis := strings.Split(str, d.redisDelimeter)
if len(keyParis) == 0 {
return nil
}
for _, pair := range keyParis {
idx := strings.Index(pair, ":")
if idx > 0 {
name := pair[:idx]
value := pair[idx+1:]
properties[name] = value
}
}
} else if d.redisFieldType == REDIS_FIELD_TYPE_JSON {
err := json.Unmarshal([]byte(str), &properties)
if err != nil {
log.Error(fmt.Sprintf("requestId=%s\tuid=%s\terror=get user feature error(%v)", context.RecommendId, user.Id, err))
return err
}
}
if d.cacheFeaturesName != "" {
user.AddCacheFeatures(d.cacheFeaturesName, properties)
} else {
user.AddProperties(properties)
}
if d.cache != nil {
d.cache.Put(key, properties)
}
return nil
}
func (d *FeatureRedisDao) userFeatureFetchByHash(user *User, context *context.RecommendContext, conn redis.Conn, key string) error {
properties := make(map[string]interface{})
if len(d.userSelectFields) == 0 {
// get all fields
strs, err := redis.Strings(conn.Do("HGETALL", key))
if err != nil {
log.Error(fmt.Sprintf("requestId=%s\tuid=%s\terror=get user feature error(%v)", context.RecommendId, user.Id, err))
return err
}
for i := 0; i < len(strs); i += 2 {
properties[strs[i]] = strs[i+1]
}
} else {
var params []interface{}
params = append(params, key)
params = append(params, d.userSelectFields...)
strs, err := redis.Strings(conn.Do("HMGET", params...))
if err != nil {
log.Error(fmt.Sprintf("requestId=%s\tuid=%s\terror=get user feature error(%v)", context.RecommendId, user.Id, err))
return err
}
for i, val := range strs {
if val == "" {
continue
}
properties[d.userSelectFields[i].(string)] = val
}
}
if d.cacheFeaturesName != "" {
user.AddCacheFeatures(d.cacheFeaturesName, properties)
} else {
user.AddProperties(properties)
}
if d.cache != nil {
d.cache.Put(key, properties)
}
return nil
}
func (d *FeatureRedisDao) itemsFeatureFetch(items []*Item, context *context.RecommendContext) {
fk := d.featureKey
if fk != "item:id" {
comms := strings.Split(d.featureKey, ":")
if len(comms) < 2 {
log.Error(fmt.Sprintf("requestId=%s\tevent=itemsFeatureFetch\terror=featureKey error(%s)", context.RecommendId, d.featureKey))
return
}
fk = comms[1]
}
cpuCount := utils.MaxInt(int(len(items)/100), 1)
maps := make(map[int][]*Item)
for i, item := range items {
maps[i%cpuCount] = append(maps[i%cpuCount], item)
}
requestCh := make(chan []*Item, cpuCount)
defer close(requestCh)
for _, itemlist := range maps {
requestCh <- itemlist
}
var wg sync.WaitGroup
for i := 0; i < cpuCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
select {
case itemlist := <-requestCh:
var keys []interface{}
for _, item := range itemlist {
var key string
if fk == "item:id" {
key = string(item.Id)
} else {
key = item.StringProperty(fk)
}
key = d.redisPrefix + key
if d.cache != nil {
if cacheValue, ok := d.cache.GetIfPresent(key); ok {
item.AddProperties(cacheValue.(map[string]interface{}))
if context.Debug {
item.AddProperty("__debug_cache_hit__", true)
}
continue
}
}
keys = append(keys, key)
}
if len(keys) == 0 {
return
}
conn := d.redis.Get()
defer conn.Close()
if d.redisDataType == REDIS_DATA_TYPE_STRING {
d.itemFeatureFetchByString(itemlist, context, conn, keys)
} else if d.redisDataType == REDIS_DATA_TYPE_HASH {
d.itemFeatureFetchByHash(itemlist, context, conn, keys)
}
default:
}
}()
}
wg.Wait()
}
func (d *FeatureRedisDao) itemFeatureFetchByString(items []*Item, context *context.RecommendContext, conn redis.Conn, keys []interface{}) error {
values, err := redis.Strings(conn.Do("MGET", keys...))
if err != nil {
log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureRedisDao\terror=%v", context.RecommendId, err))
return err
}
for i, str := range values {
if str == "" {
continue
}
item := items[i]
properties := make(map[string]interface{})
if d.redisFieldType == REDIS_FIELD_TYPE_CSV {
keyParis := strings.Split(str, d.redisDelimeter)
if len(keyParis) == 0 {
continue
}
for _, pair := range keyParis {
keyValues := strings.Split(pair, ":")
if len(keyValues) == 2 {
name := keyValues[0]
val := keyValues[1]
f, err := strconv.ParseFloat(val, 64)
if err == nil {
properties[name] = f
} else {
properties[name] = val
}
}
}
} else if d.redisFieldType == REDIS_FIELD_TYPE_JSON {
if err := json.Unmarshal([]byte(str), &properties); err != nil {
continue
}
}
item.AddProperties(properties)
if d.cache != nil {
d.cache.Put(keys[i], properties)
}
}
return nil
}
func (d *FeatureRedisDao) itemFeatureFetchByHash(items []*Item, context *context.RecommendContext, conn redis.Conn, keys []interface{}) error {
if len(d.itemSelectFields) == 0 {
// get all fields
for _, key := range keys {
conn.Send("HGETALL", key)
}
conn.Flush()
for i, key := range keys {
strs, err := redis.Strings(conn.Receive())
if err != nil {
log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureRedisDao\terror=%v", context.RecommendId, err))
continue
}
item := items[i]
properties := make(map[string]interface{})
for i := 0; i < len(strs); i += 2 {
properties[strs[i]] = strs[i+1]
}
item.AddProperties(properties)
if d.cache != nil {
d.cache.Put(key, properties)
}
}
} else {
for _, key := range keys {
var params []interface{}
params = append(params, key)
params = append(params, d.itemSelectFields...)
conn.Send("HMGET", params...)
}
conn.Flush()
for i, key := range keys {
strs, err := redis.Strings(conn.Receive())
if err != nil {
log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureRedisDao\terror=%v", context.RecommendId, err))
continue
}
item := items[i]
properties := make(map[string]interface{})
for k, val := range strs {
if val == "" {
continue
}
properties[d.itemSelectFields[k].(string)] = val
}
item.AddProperties(properties)
if d.cache != nil {
d.cache.Put(key, properties)
}
}
}
return nil
}