module/feature_featurestore_dao.go (271 lines of code) (raw):

package module

import (
	"fmt"
	"strings"

	"github.com/alibaba/pairec/v2/context"
	"github.com/alibaba/pairec/v2/log"
	"github.com/alibaba/pairec/v2/persist/fs"
	"github.com/alibaba/pairec/v2/recconf"
	"github.com/aliyun/aliyun-pai-featurestore-go-sdk/v2/domain"
)

// FeatureFeatureStoreDao loads user and item features through a FeatureStore
// client. Features are read either from a feature view (when fsViewName is
// set) or from a model scoped to a feature entity.
type FeatureFeatureStoreDao struct {
	*FeatureBaseDao
	client           *fs.FSClient
	fsModel          string // model name used by the entity-based lookups
	fsEntity         string // feature entity name used by the entity-based lookups
	fsViewName       string // feature view name; when non-empty it takes precedence over model/entity
	userSelectFields string // comma-separated feature names for user lookups ("" or "*" selects all)
	itemSelectFields string // comma-separated feature names for item lookups ("" or "*" selects all)
}

// NewFeatureFeatureStoreDao builds a DAO from config and binds it to the
// FeatureStore client registered under config.FeatureStoreName.
// It logs the error and returns nil when that client cannot be obtained.
func NewFeatureFeatureStoreDao(config recconf.FeatureDaoConfig) *FeatureFeatureStoreDao {
	dao := &FeatureFeatureStoreDao{
		FeatureBaseDao:   NewFeatureBaseDao(&config),
		fsModel:          config.FeatureStoreModelName,
		fsEntity:         config.FeatureStoreEntityName,
		fsViewName:       config.FeatureStoreViewName,
		userSelectFields: config.UserSelectFields,
		itemSelectFields: config.ItemSelectFields,
	}

	client, err := fs.GetFeatureStoreClient(config.FeatureStoreName)
	if err != nil {
		log.Error(fmt.Sprintf("error=%v", err))
		return nil
	}
	dao.client = client

	return dao
}

// FeatureFetch dispatches to the user or item loader depending on the DAO's
// configured feature store and feature type.
func (d *FeatureFeatureStoreDao) FeatureFetch(user *User, items []*Item, context *context.RecommendContext) {
	switch {
	case d.featureStore == Feature_Store_User && d.featureType == Feature_Type_Sequence:
		// User sequence features are intentionally not fetched here; the call
		// below is disabled.
		//d.userSequenceFeatureFetch(user, context)
	case d.featureStore == Feature_Store_User:
		d.userFeatureFetch(user, context)
	default:
		d.itemsFeatureFetch(items, context)
	}
}

// userFeatureFetch resolves the lookup key from the user property named by
// the second segment of featureKey ("<prefix>:<property>"), serves the
// features from the local cache when possible and otherwise fetches them
// from the feature view or the model/entity. Panics are recovered and logged.
func (d *FeatureFeatureStoreDao) userFeatureFetch(user *User, context *context.RecommendContext) {
	defer func() {
		if err := recover(); err != nil {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureFeatureStoreDao\terror=%v", context.RecommendId, err))
		}
	}()

	comms := strings.Split(d.featureKey, ":")
	if len(comms) < 2 {
		log.Error(fmt.Sprintf("requestId=%s\tuid=%s\terror=featureKey error(%s)", context.RecommendId, user.Id, d.featureKey))
		return
	}

	key := user.StringProperty(comms[1])
	if key == "" {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureFeatureStoreDao\terror=property not found(%s)", context.RecommendId, comms[1]))
		return
	}

	// hit user cache
	if d.cache != nil {
		if cacheValue, ok := d.cache.GetIfPresent(key); ok {
			d.addUserFeatures(user, cacheValue.(map[string]any))
			if context.Debug {
				// The module name previously read "FeatureHologresDao", a
				// copy-paste slip that made these log lines misleading.
				log.Info(fmt.Sprintf("requestId=%s\tmodule=FeatureFeatureStoreDao\tmsg=hit cache(%s)", context.RecommendId, key))
			}
			return
		}
	}

	if d.fsViewName != "" {
		d.doUserFeatureFetchWithFeatureView(user, context, key)
	} else {
		d.doUserFeatureFetchWithEntity(user, context, key)
	}
}

// addUserFeatures attaches features to user: as a named cache-feature group
// when cacheFeaturesName is configured, otherwise as plain properties.
func (d *FeatureFeatureStoreDao) addUserFeatures(user *User, features map[string]any) {
	if d.cacheFeaturesName != "" {
		user.AddCacheFeatures(d.cacheFeaturesName, features)
		return
	}
	user.AddProperties(features)
}

// storeUserFeatures attaches freshly fetched features to user and, when a
// cache is configured, stores them under key.
func (d *FeatureFeatureStoreDao) storeUserFeatures(user *User, key string, features map[string]any) {
	d.addUserFeatures(user, features)
	if d.cache != nil {
		d.cache.Put(key, features)
	}
}

// doUserFeatureFetchWithEntity fetches the user's features through the
// configured model, restricted to the configured feature entity. When the
// model's label priority level is 1, entries of the request's "features"
// parameter overwrite the fetched values.
func (d *FeatureFeatureStoreDao) doUserFeatureFetchWithEntity(user *User, context *context.RecommendContext, key string) {
	model := d.client.GetProject().GetModel(d.fsModel)
	if model == nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureFeatureStoreDao\terror=model not found(%s)", context.RecommendId, d.fsModel))
		return
	}

	entity := d.client.GetProject().GetFeatureEntity(d.fsEntity)
	if entity == nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureFeatureStoreDao\terror=feature entity not found(%s)", context.RecommendId, d.fsEntity))
		return
	}

	features, err := model.GetOnlineFeaturesWithEntity(map[string][]any{entity.FeatureEntityJoinid: {key}}, d.fsEntity)
	if err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureFeatureStoreDao\terror=get features error(%s)", context.RecommendId, err))
		return
	}
	if len(features) == 0 {
		log.Warning(fmt.Sprintf("requestId=%s\tmodule=FeatureFeatureStoreDao\terror=get features empty", context.RecommendId))
		return
	}

	userFeatures := features[0]
	if model.GetLabelPriorityLevel() == 1 {
		// NOTE(review): the merged map (including request-scoped context
		// features) is what gets cached below — confirm this is intended.
		if ctxFeatures, ok := context.GetParameter("features").(map[string]any); ok {
			for k, v := range ctxFeatures {
				userFeatures[k] = v
			}
		}
	}

	d.storeUserFeatures(user, key, userFeatures)
}

// doUserFeatureFetchWithFeatureView fetches the user's features from the
// configured feature view, limited to userSelectFields.
func (d *FeatureFeatureStoreDao) doUserFeatureFetchWithFeatureView(user *User, context *context.RecommendContext, key string) {
	featureView := d.client.GetProject().GetFeatureView(d.fsViewName)
	if featureView == nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureFeatureStoreDao\terror=feature view not found(%s)", context.RecommendId, d.fsViewName))
		return
	}

	features, err := featureView.GetOnlineFeatures([]any{key}, fsSelectFeatureNames(d.userSelectFields), nil)
	if err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureFeatureStoreDao\terror=get features error(%s)", context.RecommendId, err))
		return
	}
	if len(features) == 0 {
		log.Warning(fmt.Sprintf("requestId=%s\tmodule=FeatureFeatureStoreDao\terror=get features empty", context.RecommendId))
		return
	}

	d.storeUserFeatures(user, key, features[0])
}

// itemsFeatureFetch loads features for items in one batched lookup.
//
// The lookup key of an item is its Id when featureKey is "item:id", otherwise
// the item property named by the second segment of featureKey. Items served
// from the local cache are not queried again; items sharing a key are fetched
// once. Panics are recovered and logged.
func (d *FeatureFeatureStoreDao) itemsFeatureFetch(items []*Item, context *context.RecommendContext) {
	defer func() {
		if err := recover(); err != nil {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureFeatureStoreDao\terror=%v", context.RecommendId, err))
		}
	}()

	if len(items) == 0 {
		return
	}

	useItemID := d.featureKey == "item:id"
	var property string
	if !useItemID {
		comms := strings.Split(d.featureKey, ":")
		if len(comms) < 2 {
			log.Error(fmt.Sprintf("requestId=%s\tevent=itemsFeatureFetch\terror=featureKey error(%s)", context.RecommendId, d.featureKey))
			return
		}
		property = comms[1]
	}

	// Group the items that still need a remote lookup by their key.
	key2Item := make(map[string][]*Item, len(items))
	for _, item := range items {
		var key string
		if useItemID {
			key = string(item.Id)
		} else {
			key = item.StringProperty(property)
		}
		if key == "" {
			// Nothing to look up for this item; mirrors the empty-key guard
			// of the user path and avoids querying the store with "".
			continue
		}

		if d.cache != nil {
			if cacheValue, ok := d.cache.GetIfPresent(key); ok {
				item.AddProperties(cacheValue.(map[string]any))
				if context.Debug {
					item.AddProperty("__debug_cache_hit__", true)
				}
				continue
			}
		}

		key2Item[key] = append(key2Item[key], item)
	}
	if len(key2Item) == 0 {
		return
	}

	keys := make([]any, 0, len(key2Item))
	for k := range key2Item {
		keys = append(keys, k)
	}

	var (
		entityJoinId string
		features     []map[string]any
		err          error
	)
	if d.fsViewName != "" {
		featureView := d.client.GetProject().GetFeatureView(d.fsViewName)
		if featureView == nil {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureFeatureStoreDao\terror=feature view not found(%s)", context.RecommendId, d.fsViewName))
			return
		}

		features, err = d.doItemsFeatureFetchWithFeatureView(featureView, keys)
		if err != nil {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureFeatureStoreDao\terror=%v", context.RecommendId, err))
			return
		}

		entityName := featureView.GetFeatureEntityName()
		entity := d.client.GetProject().GetFeatureEntity(entityName)
		if entity == nil {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureFeatureStoreDao\terror=feature entity not found(%s)", context.RecommendId, entityName))
			return
		}
		entityJoinId = entity.FeatureEntityJoinid
	} else {
		entity := d.client.GetProject().GetFeatureEntity(d.fsEntity)
		if entity == nil {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureFeatureStoreDao\terror=feature entity not found(%s)", context.RecommendId, d.fsEntity))
			return
		}

		features, err = d.doItemsFeatureFetchWithEntity(entity, keys)
		if err != nil {
			log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureFeatureStoreDao\terror=%v", context.RecommendId, err))
			return
		}
		entityJoinId = entity.FeatureEntityJoinid
	}

	// Join the returned rows back to the items in a single pass. The rows are
	// never mutated while being iterated, and the join id is normalized to a
	// string so that rows whose join id comes back as a non-string value
	// (e.g. an integer id) still match the string keys built above.
	for _, featureMap := range features {
		key, ok := fsJoinKeyString(featureMap[entityJoinId])
		if !ok {
			continue
		}
		matched := key2Item[key]
		if len(matched) == 0 {
			continue
		}
		for _, item := range matched {
			item.AddProperties(featureMap)
		}
		if d.cache != nil {
			d.cache.Put(key, featureMap)
		}
	}
}

// doItemsFeatureFetchWithEntity fetches the features of keys through the
// configured model, restricted to the configured feature entity.
// It returns an error when the model is unknown or the lookup fails.
func (d *FeatureFeatureStoreDao) doItemsFeatureFetchWithEntity(entity *domain.FeatureEntity, keys []any) ([]map[string]any, error) {
	model := d.client.GetProject().GetModel(d.fsModel)
	if model == nil {
		return nil, fmt.Errorf("model not found(%s)", d.fsModel)
	}

	features, err := model.GetOnlineFeaturesWithEntity(map[string][]any{entity.FeatureEntityJoinid: keys}, d.fsEntity)
	if err != nil {
		return nil, err
	}
	return features, nil
}

// doItemsFeatureFetchWithFeatureView fetches the features of keys from
// featureView, limited to itemSelectFields.
func (d *FeatureFeatureStoreDao) doItemsFeatureFetchWithFeatureView(featureView domain.FeatureView, keys []any) ([]map[string]any, error) {
	features, err := featureView.GetOnlineFeatures(keys, fsSelectFeatureNames(d.itemSelectFields), nil)
	if err != nil {
		return nil, err
	}
	return features, nil
}

// fsSelectFeatureNames converts a comma-separated select-fields setting into
// the feature-name list passed to GetOnlineFeatures. An empty setting or "*"
// selects every feature. Surrounding whitespace is trimmed and empty segments
// are dropped, so "a, b" yields ["a", "b"].
func fsSelectFeatureNames(fields string) []string {
	fields = strings.TrimSpace(fields)
	if fields == "" || fields == "*" {
		return []string{"*"}
	}

	parts := strings.Split(fields, ",")
	names := make([]string, 0, len(parts))
	for _, part := range parts {
		if name := strings.TrimSpace(part); name != "" {
			names = append(names, name)
		}
	}
	if len(names) == 0 {
		return []string{"*"}
	}
	return names
}

// fsJoinKeyString renders a join-id value returned by the feature store as
// the string form used for item keys. It reports false when the value is
// missing (nil).
func fsJoinKeyString(v any) (string, bool) {
	switch id := v.(type) {
	case nil:
		return "", false
	case string:
		return id, true
	case []byte:
		return string(id), true
	default:
		return fmt.Sprint(id), true
	}
}