domain/model.go (150 lines of code) (raw):

package domain import ( "fmt" "sync" "github.com/aliyun/aliyun-pai-featurestore-go-sdk/v2/api" "github.com/aliyun/aliyun-pai-featurestore-go-sdk/v2/utils" ) type Model struct { *api.Model project *Project featureViewMap map[string]FeatureView featureEntityMap map[string]*FeatureEntity featureNamesMap map[string][]string // featureview : feature names aliasNamesMap map[string]map[string]string // featureview : alias names featureEntityJoinIdMap map[string]map[string]FeatureView // feature entity joinid : featureviews featureEntityJoinIdList []string } func NewModel(model *api.Model, p *Project) *Model { m := &Model{ Model: model, project: p, featureViewMap: make(map[string]FeatureView), featureEntityMap: make(map[string]*FeatureEntity), featureNamesMap: make(map[string][]string), aliasNamesMap: make(map[string]map[string]string), featureEntityJoinIdMap: make(map[string]map[string]FeatureView), } for _, feature := range m.Features { featureView := m.project.GetFeatureView(feature.FeatureViewName) featureEntity := m.project.GetFeatureEntity(featureView.GetFeatureEntityName()) m.featureViewMap[feature.FeatureViewName] = featureView m.featureEntityMap[featureView.GetFeatureEntityName()] = featureEntity m.featureNamesMap[feature.FeatureViewName] = append(m.featureNamesMap[feature.FeatureViewName], featureView.Offline2Online(feature.Name)) if feature.AliasName != "" { aliasMap, ok := m.aliasNamesMap[feature.FeatureViewName] if !ok { aliasMap = make(map[string]string) } aliasMap[feature.Name] = feature.AliasName m.aliasNamesMap[feature.FeatureViewName] = aliasMap } featureViewMap, ok := m.featureEntityJoinIdMap[featureEntity.FeatureEntityJoinid] if !ok { featureViewMap = make(map[string]FeatureView) } featureViewMap[feature.FeatureViewName] = featureView m.featureEntityJoinIdMap[featureEntity.FeatureEntityJoinid] = featureViewMap } for joinid := range m.featureEntityJoinIdMap { m.featureEntityJoinIdList = append(m.featureEntityJoinIdList, joinid) } //fmt.Println(m) return m } func (m *Model) GetOnlineFeatures(joinIds map[string][]interface{}) ([]map[string]interface{}, error) { size := -1 for _, joinid := range m.featureEntityJoinIdList { if _, ok := joinIds[joinid]; !ok { return nil, fmt.Errorf("join id:%s not found", joinid) } if size == -1 { size = len(joinIds[joinid]) } else { if size != len(joinIds[joinid]) { return nil, fmt.Errorf("join id:%s length not equal", joinid) } } } var mu sync.Mutex var wg sync.WaitGroup joinIdFeaturesMap := make(map[string][]map[string]interface{}) for joinId, keys := range joinIds { featureViewMap := m.featureEntityJoinIdMap[joinId] for _, featureView := range featureViewMap { wg.Add(1) go func(featureView FeatureView, joinId string, keys []interface{}) { defer wg.Done() features, err := featureView.GetOnlineFeatures(keys, m.featureNamesMap[featureView.GetName()], m.aliasNamesMap[featureView.GetName()]) if err != nil { fmt.Println(err) } mu.Lock() joinIdFeaturesMap[joinId] = append(joinIdFeaturesMap[joinId], features...) mu.Unlock() }(featureView, joinId, keys) } } wg.Wait() featuresResult := make([]map[string]interface{}, 0, size) for i := 0; i < size; i++ { features := make(map[string]interface{}, len(m.Features)+len(m.featureEntityJoinIdMap)) for _, joinid := range m.featureEntityJoinIdList { joinIdValue := joinIds[joinid][i] for _, joinIdFeatures := range joinIdFeaturesMap[joinid] { if utils.ToString(joinIdFeatures[joinid], "") == utils.ToString(joinIdValue, " ") { for k, v := range joinIdFeatures { features[k] = v } } } } featuresResult = append(featuresResult, features) } return featuresResult, nil } func (m *Model) GetOnlineFeaturesWithEntity(joinIds map[string][]interface{}, featureEntityName string) ([]map[string]interface{}, error) { featureEntity, ok := m.featureEntityMap[featureEntityName] if !ok { return nil, fmt.Errorf("feature entity name:%s not found", featureEntityName) } size := -1 if _, ok := joinIds[featureEntity.FeatureEntityJoinid]; !ok { return nil, fmt.Errorf("join id:%s not found", featureEntity.FeatureEntityJoinid) } size = len(joinIds[featureEntity.FeatureEntityJoinid]) var wg sync.WaitGroup joinIdFeaturesMap := make(map[string][]map[string]interface{}) featureViewMap := m.featureEntityJoinIdMap[featureEntity.FeatureEntityJoinid] var mu sync.Mutex for _, featureView := range featureViewMap { wg.Add(1) go func(featureView FeatureView, joinId string, keys []interface{}) { defer wg.Done() features, err := featureView.GetOnlineFeatures(keys, m.featureNamesMap[featureView.GetName()], m.aliasNamesMap[featureView.GetName()]) if err != nil { fmt.Println(err) } mu.Lock() joinIdFeaturesMap[joinId] = append(joinIdFeaturesMap[joinId], features...) mu.Unlock() }(featureView, featureEntity.FeatureEntityJoinid, joinIds[featureEntity.FeatureEntityJoinid]) } wg.Wait() featuresResult := make([]map[string]interface{}, 0, size) for i := 0; i < size; i++ { features := make(map[string]interface{}, len(m.Features)) joinIdValue := joinIds[featureEntity.FeatureEntityJoinid][i] for _, joinIdFeatures := range joinIdFeaturesMap[featureEntity.FeatureEntityJoinid] { if utils.ToString(joinIdFeatures[featureEntity.FeatureEntityJoinid], "") == utils.ToString(joinIdValue, " ") { for k, v := range joinIdFeatures { features[k] = v } } } featuresResult = append(featuresResult, features) } return featuresResult, nil } func (m *Model) GetLabelPriorityLevel() int { return m.LabelPriorityLevel }