domain/base_feature_view.go (214 lines of code) (raw):

package domain import ( "encoding/json" "errors" "fmt" "github.com/aliyun/aliyun-pai-featurestore-go-sdk/v2/api" "github.com/aliyun/aliyun-pai-featurestore-go-sdk/v2/constants" "github.com/aliyun/aliyun-pai-featurestore-go-sdk/v2/dao" ) type BaseFeatureView struct { *api.FeatureView Project *Project FeatureEntity *FeatureEntity featureFields []string primaryKeyField api.FeatureViewFields eventTimeField api.FeatureViewFields featureViewDao dao.FeatureViewDao } func NewBaseFeatureView(view *api.FeatureView, p *Project, entity *FeatureEntity) *BaseFeatureView { featureView := &BaseFeatureView{ FeatureView: view, Project: p, FeatureEntity: entity, } for _, field := range view.Fields { if field.IsEventTime { featureView.eventTimeField = *field featureView.featureFields = append(featureView.featureFields, field.Name) } else if field.IsPartition { continue } else if field.IsPrimaryKey { featureView.primaryKeyField = *field } else { featureView.featureFields = append(featureView.featureFields, field.Name) } } daoConfig := dao.DaoConfig{ DatasourceType: p.OnlineDatasourceType, PrimaryKeyField: featureView.primaryKeyField.Name, EventTimeField: featureView.eventTimeField.Name, TTL: int(featureView.Ttl), SaveOriginalField: false, } if view.WriteToFeatureDB || p.OnlineDatasourceType == constants.Datasource_Type_FeatureDB { daoConfig.DatasourceType = constants.Datasource_Type_FeatureDB daoConfig.FeatureDBDatabaseName = p.InstanceId daoConfig.FeatureDBSchemaName = p.ProjectName daoConfig.FeatureDBTableName = featureView.Name daoConfig.FeatureDBSignature = p.Signature fieldTypeMap := make(map[string]constants.FSType, len(view.Fields)) for _, field := range view.Fields { if field.IsPartition { continue } else { fieldTypeMap[field.Name] = field.Type } } daoConfig.FieldTypeMap = fieldTypeMap daoConfig.Fields = featureView.featureFields } else { switch p.OnlineDatasourceType { case constants.Datasource_Type_Hologres: daoConfig.HologresTableName = p.OnlineStore.GetTableName(featureView) daoConfig.HologresName = p.OnlineStore.GetDatasourceName() case constants.Datasource_Type_IGraph: if view.Config != "" { configM := make(map[string]interface{}) if err := json.Unmarshal([]byte(view.Config), &configM); err == nil { if save_original_field, exist := configM["save_original_field"]; exist { if val, ok := save_original_field.(bool); ok { daoConfig.SaveOriginalField = val } } } } daoConfig.IGraphName = p.OnlineStore.GetDatasourceName() daoConfig.GroupName = p.ProjectName daoConfig.LabelName = p.OnlineStore.GetTableName(featureView) fieldMap := make(map[string]string, len(view.Fields)) fieldTypeMap := make(map[string]constants.FSType, len(view.Fields)) for _, field := range view.Fields { if field.IsPrimaryKey { fieldMap[field.Name] = field.Name fieldTypeMap[field.Name] = field.Type } else if field.IsPartition { continue } else { var name string if daoConfig.SaveOriginalField { name = field.Name } else { name = fmt.Sprintf("f%d", field.Position) } fieldMap[name] = field.Name fieldTypeMap[name] = field.Type } } daoConfig.FieldMap = fieldMap daoConfig.FieldTypeMap = fieldTypeMap case constants.Datasource_Type_TableStore: daoConfig.TableStoreTableName = p.OnlineStore.GetTableName(featureView) daoConfig.TableStoreName = p.OnlineStore.GetDatasourceName() fieldTypeMap := make(map[string]constants.FSType, len(view.Fields)) for _, field := range view.Fields { if field.IsPrimaryKey { fieldTypeMap[field.Name] = field.Type } else if field.IsPartition { continue } else { fieldTypeMap[field.Name] = field.Type } } daoConfig.FieldTypeMap = fieldTypeMap default: } } featureViewDao := dao.NewFeatureViewDao(daoConfig) featureView.featureViewDao = featureViewDao return featureView } func (f *BaseFeatureView) GetOnlineFeatures(joinIds []interface{}, features []string, alias map[string]string) ([]map[string]interface{}, error) { var selectFields []string selectFields = append(selectFields, f.primaryKeyField.Name) seenFields := make(map[string]bool) seenFields[f.primaryKeyField.Name] = true for _, featureName := range features { if featureName == "*" { selectFields = append(selectFields, f.featureFields...) } else { if seenFields[featureName] { continue } found := false for _, field := range f.featureFields { if field == featureName { found = true break } } if !found { return nil, fmt.Errorf("feature name :%s not found in the featureview fields", featureName) } selectFields = append(selectFields, featureName) seenFields[featureName] = true } } for featureName := range alias { found := false for _, field := range f.featureFields { if field == featureName { found = true break } } if !found { return nil, fmt.Errorf("feature name :%s not found in the featureview fields", featureName) } } featureResult, err := f.featureViewDao.GetFeatures(joinIds, selectFields) if f.primaryKeyField.Name != f.FeatureEntity.FeatureEntityJoinid { for _, featureMap := range featureResult { featureMap[f.FeatureEntity.FeatureEntityJoinid] = featureMap[f.primaryKeyField.Name] delete(featureMap, f.primaryKeyField.Name) } } for featureName, aliasName := range alias { for _, featureMap := range featureResult { if _, ok := featureMap[featureName]; ok { featureMap[aliasName] = featureMap[featureName] delete(featureMap, featureName) } } } return featureResult, err } func (f *BaseFeatureView) GetBehaviorFeatures(userIds []interface{}, events []interface{}, features []string) ([]map[string]interface{}, error) { return nil, errors.New("only sequence feature view supports GetBehaviorFeatures") } func (f *BaseFeatureView) GetName() string { return f.Name } func (f *BaseFeatureView) GetFeatureEntityName() string { return f.FeatureEntityName } func (f *BaseFeatureView) GetType() string { return f.Type } func (f *BaseFeatureView) Offline2Online(input string) string { return input } func (f *BaseFeatureView) GetFields() []api.FeatureViewFields { fields := make([]api.FeatureViewFields, len(f.Fields)) for i, field := range f.Fields { if field != nil { fields[i] = *field } } return fields } func (f *BaseFeatureView) GetIsWriteToFeatureDB() bool { return f.WriteToFeatureDB || f.Project.OnlineDatasourceType == constants.Datasource_Type_FeatureDB } func (f *BaseFeatureView) GetTTL() int { return f.Ttl } func (f *BaseFeatureView) RowCount(expr string) int { return f.featureViewDao.RowCount(expr) } // RowCountIds implements FeatureView. func (f *BaseFeatureView) RowCountIds(expr string) ([]string, int, error) { return f.featureViewDao.RowCountIds(expr) } // ScanAndIterateData implements FeatureView. func (f *BaseFeatureView) ScanAndIterateData(filter string, ch chan<- string) ([]string, error) { return f.featureViewDao.ScanAndIterateData(filter, ch) }