dao/feature_view_dao.go (120 lines of code) (raw):

package dao

import (
	"fmt"
	"strconv"
	"strings"

	"github.com/aliyun/aliyun-pai-featurestore-go-sdk/v2/api"
	"github.com/aliyun/aliyun-pai-featurestore-go-sdk/v2/constants"
	"github.com/aliyun/aliyun-pai-featurestore-go-sdk/v2/utils"
)

// FeatureViewDao abstracts datasource-specific data access for feature views.
// There is one implementation per datasource type; NewFeatureViewDao selects
// it from DaoConfig.DatasourceType.
type FeatureViewDao interface {
	GetFeatures(keys []interface{}, selectFields []string) ([]map[string]interface{}, error)
	GetUserSequenceFeature(keys []interface{}, userIdField string, sequenceConfig api.FeatureViewSeqConfig, onlineConfig []*api.SeqConfig) ([]map[string]interface{}, error)
	GetUserBehaviorFeature(userIds []interface{}, events []interface{}, selectFields []string, sequenceConfig api.FeatureViewSeqConfig) ([]map[string]interface{}, error)
	RowCount(string) int
	RowCountIds(string) ([]string, int, error)
	ScanAndIterateData(filter string, ch chan<- string) ([]string, error)
}

// UnimplementedFeatureViewDao implements every FeatureViewDao method as a
// no-op that returns zero values and a nil error. Presumably it is meant to be
// embedded by datasource DAOs that support only part of the interface — TODO
// confirm against the concrete implementations.
type UnimplementedFeatureViewDao struct {
}

// Compile-time check that UnimplementedFeatureViewDao satisfies the interface.
var _ FeatureViewDao = (*UnimplementedFeatureViewDao)(nil)

// GetFeatures returns nil, nil.
func (d *UnimplementedFeatureViewDao) GetFeatures(keys []interface{}, selectFields []string) ([]map[string]interface{}, error) {
	return nil, nil
}

// GetUserSequenceFeature returns nil, nil.
func (d *UnimplementedFeatureViewDao) GetUserSequenceFeature(keys []interface{}, userIdField string, sequenceConfig api.FeatureViewSeqConfig, onlineConfig []*api.SeqConfig) ([]map[string]interface{}, error) {
	return nil, nil
}

// GetUserBehaviorFeature returns nil, nil.
func (d *UnimplementedFeatureViewDao) GetUserBehaviorFeature(userIds []interface{}, events []interface{}, selectFields []string, sequenceConfig api.FeatureViewSeqConfig) ([]map[string]interface{}, error) {
	return nil, nil
}

// RowCount returns 0.
func (d *UnimplementedFeatureViewDao) RowCount(string) int {
	return 0
}

// RowCountIds returns nil, 0, nil.
func (d *UnimplementedFeatureViewDao) RowCountIds(string) ([]string, int, error) {
	return nil, 0, nil
}

// ScanAndIterateData returns nil, nil and never touches ch.
func (d *UnimplementedFeatureViewDao) ScanAndIterateData(filter string, ch chan<- string) ([]string, error) {
	return nil, nil
}

// NewFeatureViewDao returns the FeatureViewDao implementation matching
// config.DatasourceType (Hologres, MySQL, IGraph, Redis, TableStore or
// FeatureDB). It panics when the datasource type has no implementation.
func NewFeatureViewDao(config DaoConfig) FeatureViewDao {
	switch config.DatasourceType {
	case constants.Datasource_Type_Hologres:
		return NewFeatureViewHologresDao(config)
	case constants.Datasource_Type_Mysql:
		return NewFeatureViewMysqlDao(config)
	case constants.Datasource_Type_IGraph:
		return NewFeatureViewIGraphDao(config)
	case constants.Datasource_Type_Redis:
		return NewFeatureViewRedisDao(config)
	case constants.Datasource_Type_TableStore:
		return NewFeatureViewTableStoreDao(config)
	case constants.Datasource_Type_FeatureDB:
		return NewFeatureViewFeatureDBDao(config)
	}
	// Include the offending type so misconfiguration is diagnosable from the panic.
	panic(fmt.Sprintf("not found FeatureViewDao implement, datasource type: %v", config.DatasourceType))
}

// makePlayTimeMap parses a filter of the form "event1:v1;event2:v2" into a
// map from event name to float value.
//
// Entries that do not contain exactly one ':' are skipped, as are entries
// whose value does not parse as a float64. A later entry for the same event
// overwrites an earlier one. An empty filter yields an empty, non-nil map.
func makePlayTimeMap(playTimeFilter string) map[string]float64 {
	sequencePlayTimeMap := make(map[string]float64)
	if playTimeFilter == "" {
		return sequencePlayTimeMap
	}
	for _, eventTime := range strings.Split(playTimeFilter, ";") {
		strs := strings.Split(eventTime, ":")
		if len(strs) != 2 {
			continue
		}
		if t, err := strconv.ParseFloat(strs[1], 64); err == nil {
			sequencePlayTimeMap[strs[0]] = t
		}
	}
	return sequencePlayTimeMap
}

// makeSequenceFeatures merges online and offline behavior sequences and
// flattens the result into string-encoded feature columns in the layout the
// EasyRec processor expects.
//
// Merging applies only when offlineSequences is non-empty. The leading online
// entries whose timestamp is >= the first offline entry's timestamp are kept,
// all offline entries are appended after them, and the result is truncated to
// seqConfig.SeqLen. The merge assumes both inputs are ordered newest first —
// TODO confirm with callers. Neither input slice is modified.
//
// Duplicate (itemId, event) pairs are dropped, keeping the first occurrence.
// For each field F, the returned map holds "<OnlineSeqName>__F" mapped to the
// ';'-joined values. The fields are the item id, timestamp and event fields,
// the play-time field when configured, and "ts" (currTime minus the entry's
// timestamp). The map also holds "<OnlineSeqName>" itself, mapped to the
// ';'-joined item ids.
func makeSequenceFeatures(offlineSequences, onlineSequences []*sequenceInfo, seqConfig *api.SeqConfig, sequenceConfig api.FeatureViewSeqConfig, currTime int64) map[string]interface{} {
	if len(offlineSequences) > 0 {
		newestOffline := offlineSequences[0].timestamp
		index := 0
		for index < len(onlineSequences) && onlineSequences[index].timestamp >= newestOffline {
			index++
		}
		// The full slice expression caps capacity at index, so append must
		// allocate instead of overwriting the caller's elements past index.
		onlineSequences = append(onlineSequences[:index:index], offlineSequences...)
		if len(onlineSequences) > seqConfig.SeqLen {
			onlineSequences = onlineSequences[:seqConfig.SeqLen]
		}
	}

	// A struct key cannot collide the way a "item#event" string key can when
	// ids or events themselves contain '#'.
	type seqKey struct{ itemID, event string }
	seen := make(map[seqKey]struct{}, len(onlineSequences))
	sequencesValueMap := make(map[string][]string)
	for _, seq := range onlineSequences {
		key := seqKey{itemID: seq.itemId, event: seq.event}
		if _, dup := seen[key]; dup {
			continue
		}
		seen[key] = struct{}{}

		sequencesValueMap[sequenceConfig.ItemIdField] = append(sequencesValueMap[sequenceConfig.ItemIdField], seq.itemId)
		sequencesValueMap[sequenceConfig.TimestampField] = append(sequencesValueMap[sequenceConfig.TimestampField], strconv.FormatInt(seq.timestamp, 10))
		sequencesValueMap[sequenceConfig.EventField] = append(sequencesValueMap[sequenceConfig.EventField], seq.event)
		if sequenceConfig.PlayTimeField != "" {
			sequencesValueMap[sequenceConfig.PlayTimeField] = append(sequencesValueMap[sequenceConfig.PlayTimeField], fmt.Sprintf("%.2f", seq.playTime))
		}
		sequencesValueMap["ts"] = append(sequencesValueMap["ts"], strconv.FormatInt(currTime-seq.timestamp, 10))
	}

	properties := make(map[string]interface{}, len(sequencesValueMap)+1)
	for field, values := range sequencesValueMap {
		properties[seqConfig.OnlineSeqName+"__"+field] = strings.Join(values, ";")
	}
	properties[seqConfig.OnlineSeqName] = strings.Join(sequencesValueMap[sequenceConfig.ItemIdField], ";")
	return properties
}

// combineBehaviorFeatures merges online and offline behavior rows in the same
// way makeSequenceFeatures merges sequences. Each row's timestampField value
// is read via utils.ToInt64 (default 0).
//
// When offlineBehaviorInfo is empty, onlineBehaviorInfo is returned unchanged.
// Otherwise the leading online rows whose timestamp is >= the first offline
// row's timestamp are kept and all offline rows are appended after them. The
// result is always a newly allocated slice, so the caller's
// onlineBehaviorInfo is never modified.
func combineBehaviorFeatures(offlineBehaviorInfo, onlineBehaviorInfo []map[string]interface{}, timestampField string) []map[string]interface{} {
	if len(offlineBehaviorInfo) == 0 {
		return onlineBehaviorInfo
	}

	newestOffline := utils.ToInt64(offlineBehaviorInfo[0][timestampField], 0)
	index := 0
	for index < len(onlineBehaviorInfo) && utils.ToInt64(onlineBehaviorInfo[index][timestampField], 0) >= newestOffline {
		index++
	}
	// Full slice expression: force append to allocate rather than write into
	// the caller's backing array.
	return append(onlineBehaviorInfo[:index:index], offlineBehaviorInfo...)
}