domain/sequence_feature_view.go (280 lines of code) (raw):
package domain
import (
"encoding/json"
"errors"
"fmt"
"github.com/aliyun/aliyun-pai-featurestore-go-sdk/v2/api"
"github.com/aliyun/aliyun-pai-featurestore-go-sdk/v2/constants"
"github.com/aliyun/aliyun-pai-featurestore-go-sdk/v2/dao"
)
type SequenceFeatureView struct {
*api.FeatureView
Project *Project
FeatureEntity *FeatureEntity
userIdField string
behaviorFields []string
sequenceConfig api.FeatureViewSeqConfig
featureViewDao dao.FeatureViewDao
offline_2_online_seq_map map[string]string
}
func NewSequenceFeatureView(view *api.FeatureView, p *Project, entity *FeatureEntity) *SequenceFeatureView {
sequenceFeatureView := &SequenceFeatureView{
FeatureView: view,
Project: p,
FeatureEntity: entity,
}
for _, field := range view.Fields {
if field.IsPrimaryKey {
sequenceFeatureView.userIdField = field.Name
break
}
}
err := json.Unmarshal([]byte(view.Config), &sequenceFeatureView.sequenceConfig)
if err != nil {
panic("sequence featureview config unmarshal failed")
}
if sequenceFeatureView.sequenceConfig.RegistrationMode == "" {
sequenceFeatureView.sequenceConfig.RegistrationMode = constants.Seq_Registration_Mode_Full_Sequence
}
sequenceFeatureView.offline_2_online_seq_map = make(map[string]string, len(sequenceFeatureView.sequenceConfig.SeqConfig))
for _, field := range view.Fields {
if field.IsPartition {
continue
} else {
sequenceFeatureView.behaviorFields = append(sequenceFeatureView.behaviorFields, field.Name)
}
}
if sequenceFeatureView.sequenceConfig.RegistrationMode == constants.Seq_Registration_Mode_Full_Sequence {
for _, seqConfig := range sequenceFeatureView.sequenceConfig.SeqConfig {
sequenceFeatureView.offline_2_online_seq_map[seqConfig.OfflineSeqName] = seqConfig.OnlineSeqName
}
seen := make(map[string]bool)
var uniqueSeqConfigs []*api.SeqConfig
for _, seqConfig := range sequenceFeatureView.sequenceConfig.SeqConfig {
if !seen[seqConfig.OnlineSeqName] {
uniqueSeqConfigs = append(uniqueSeqConfigs, seqConfig)
seen[seqConfig.OnlineSeqName] = true
}
}
sequenceFeatureView.sequenceConfig.SeqConfig = uniqueSeqConfigs
}
requiredElements1 := []string{"user_id", "item_id", "event"}
requiredElements2 := []string{"user_id", "item_id", "event", "timestamp"}
if len(sequenceFeatureView.sequenceConfig.DeduplicationMethod) == len(requiredElements1) {
for i, v := range sequenceFeatureView.sequenceConfig.DeduplicationMethod {
if v != requiredElements1[i] {
panic("deduplication_method invalid")
}
}
sequenceFeatureView.sequenceConfig.DeduplicationMethodNum = 1
} else if len(sequenceFeatureView.sequenceConfig.DeduplicationMethod) == len(requiredElements2) {
for i, v := range sequenceFeatureView.sequenceConfig.DeduplicationMethod {
if v != requiredElements2[i] {
panic("deduplication_method invalid")
}
}
sequenceFeatureView.sequenceConfig.DeduplicationMethodNum = 2
} else {
panic("deduplication_method invalid")
}
daoConfig := dao.DaoConfig{
DatasourceType: p.OnlineDatasourceType,
PrimaryKeyField: sequenceFeatureView.userIdField,
}
if view.WriteToFeatureDB || p.OnlineDatasourceType == constants.Datasource_Type_FeatureDB {
daoConfig.DatasourceType = constants.Datasource_Type_FeatureDB
daoConfig.FeatureDBDatabaseName = p.InstanceId
daoConfig.FeatureDBSchemaName = p.ProjectName
if sequenceFeatureView.sequenceConfig.ReferencedFeatureViewId == 0 {
daoConfig.FeatureDBTableName = sequenceFeatureView.Name
} else {
daoConfig.FeatureDBTableName = sequenceFeatureView.sequenceConfig.ReferencedFeatureViewName
}
daoConfig.FeatureDBSignature = p.Signature
fieldTypeMap := make(map[string]constants.FSType, len(view.Fields))
for _, field := range view.Fields {
if field.IsPartition {
continue
} else {
fieldTypeMap[field.Name] = field.Type
}
}
daoConfig.FieldTypeMap = fieldTypeMap
daoConfig.Fields = sequenceFeatureView.behaviorFields
} else {
if sequenceFeatureView.sequenceConfig.ReferencedFeatureViewId == 0 {
switch p.OnlineDatasourceType {
case constants.Datasource_Type_Hologres:
daoConfig.HologresName = p.OnlineStore.GetDatasourceName()
daoConfig.HologresOfflineTableName = p.OnlineStore.GetSeqOfflineTableName(sequenceFeatureView)
daoConfig.HologresOnlineTableName = p.OnlineStore.GetSeqOnlineTableName(sequenceFeatureView)
case constants.Datasource_Type_TableStore:
daoConfig.TableStoreName = p.OnlineStore.GetDatasourceName()
daoConfig.TableStoreOfflineTableName = p.OnlineStore.GetSeqOfflineTableName(sequenceFeatureView)
daoConfig.TableStoreOnlineTableName = p.OnlineStore.GetSeqOnlineTableName(sequenceFeatureView)
case constants.Datasource_Type_IGraph:
daoConfig.SaveOriginalField = true
daoConfig.IGraphName = p.OnlineStore.GetDatasourceName()
daoConfig.GroupName = p.ProjectName
daoConfig.IgraphEdgeName = p.OnlineStore.GetSeqOnlineTableName(sequenceFeatureView)
fieldTypeMap := make(map[string]constants.FSType, len(view.Fields))
for _, field := range view.Fields {
if field.IsPartition {
continue
} else {
fieldTypeMap[field.Name] = field.Type
}
}
daoConfig.FieldTypeMap = fieldTypeMap
default:
}
} else {
referencedFeatureView := p.GetFeatureView(sequenceFeatureView.sequenceConfig.ReferencedFeatureViewName)
if referencedFeatureView == nil {
panic(fmt.Sprintf("referenced feature view :%s not found", sequenceFeatureView.sequenceConfig.ReferencedFeatureViewName))
}
if referencedFeatureView.GetType() != constants.Feature_View_Type_Sequence {
panic(fmt.Sprintf("referenced feature view :%s is not sequence feature view", sequenceFeatureView.sequenceConfig.ReferencedFeatureViewName))
}
referencedSeqFeatureView := referencedFeatureView.(*SequenceFeatureView)
switch p.OnlineDatasourceType {
case constants.Datasource_Type_Hologres:
daoConfig.HologresName = p.OnlineStore.GetDatasourceName()
daoConfig.HologresOfflineTableName = p.OnlineStore.GetSeqOfflineTableName(referencedSeqFeatureView)
daoConfig.HologresOnlineTableName = p.OnlineStore.GetSeqOnlineTableName(referencedSeqFeatureView)
case constants.Datasource_Type_TableStore:
daoConfig.TableStoreName = p.OnlineStore.GetDatasourceName()
daoConfig.TableStoreOfflineTableName = p.OnlineStore.GetSeqOfflineTableName(referencedSeqFeatureView)
daoConfig.TableStoreOnlineTableName = p.OnlineStore.GetSeqOnlineTableName(referencedSeqFeatureView)
case constants.Datasource_Type_IGraph:
daoConfig.SaveOriginalField = true
daoConfig.IGraphName = p.OnlineStore.GetDatasourceName()
daoConfig.GroupName = p.ProjectName
daoConfig.IgraphEdgeName = p.OnlineStore.GetSeqOnlineTableName(referencedSeqFeatureView)
fieldTypeMap := make(map[string]constants.FSType, len(view.Fields))
for _, field := range view.Fields {
if field.IsPartition {
continue
} else {
fieldTypeMap[field.Name] = field.Type
}
}
daoConfig.FieldTypeMap = fieldTypeMap
default:
}
}
}
featureViewDao := dao.NewFeatureViewDao(daoConfig)
sequenceFeatureView.featureViewDao = featureViewDao
return sequenceFeatureView
}
func (f *SequenceFeatureView) GetOnlineFeatures(joinIds []interface{}, features []string, alias map[string]string) ([]map[string]interface{}, error) {
if f.sequenceConfig.RegistrationMode == constants.Seq_Registration_Mode_Only_Behavior {
return nil, errors.New("only full_sequence registration mode supports GetOnlineFeatures, please use GetBehaviorFeatures")
}
sequenceConfig := f.sequenceConfig
onlineConfig := []*api.SeqConfig{}
seenFields := make(map[string]bool)
for _, feature := range features {
if feature == "*" {
onlineConfig = sequenceConfig.SeqConfig
break
} else {
found := false
for _, seqConfig := range sequenceConfig.SeqConfig {
if seqConfig.OnlineSeqName == feature {
found = true
if !seenFields[feature] {
onlineConfig = append(onlineConfig, seqConfig)
seenFields[feature] = true
}
break
}
}
if !found {
return nil, fmt.Errorf("sequence feature name :%s not found in feature view config", feature)
}
}
}
sequenceFeatureResults, err := f.featureViewDao.GetUserSequenceFeature(joinIds, f.userIdField, sequenceConfig, onlineConfig)
if f.userIdField != f.FeatureEntity.FeatureEntityJoinid {
for _, sequencefeatureMap := range sequenceFeatureResults {
sequencefeatureMap[f.FeatureEntity.FeatureEntityJoinid] = sequencefeatureMap[f.userIdField]
delete(sequencefeatureMap, f.userIdField)
}
}
return sequenceFeatureResults, err
}
func (f *SequenceFeatureView) GetBehaviorFeatures(userIds []interface{}, events []interface{}, features []string) ([]map[string]interface{}, error) {
var selectFields []string
seenFields := make(map[string]bool)
for _, feature := range features {
if feature == "*" {
selectFields = append(selectFields, f.behaviorFields...)
break
} else {
if seenFields[feature] {
continue
}
found := false
for _, field := range f.behaviorFields {
if field == feature {
found = true
break
}
}
if !found {
return nil, fmt.Errorf("behavior feature name :%s not found in feature view config", feature)
}
selectFields = append(selectFields, feature)
seenFields[feature] = true
}
}
behaviorFeatureResult, err := f.featureViewDao.GetUserBehaviorFeature(userIds, events, selectFields, f.sequenceConfig)
if f.userIdField != f.FeatureEntity.FeatureEntityJoinid {
for _, behaviorFeatureMap := range behaviorFeatureResult {
behaviorFeatureMap[f.FeatureEntity.FeatureEntityJoinid] = behaviorFeatureMap[f.userIdField]
delete(behaviorFeatureMap, f.userIdField)
}
}
return behaviorFeatureResult, err
}
func (f *SequenceFeatureView) GetName() string {
return f.Name
}
func (f *SequenceFeatureView) GetFeatureEntityName() string {
return f.FeatureEntityName
}
func (f *SequenceFeatureView) GetType() string {
return f.Type
}
func (f *SequenceFeatureView) Offline2Online(input string) string {
if f.sequenceConfig.RegistrationMode == constants.Seq_Registration_Mode_Only_Behavior {
return input
}
return f.offline_2_online_seq_map[input]
}
func (f *SequenceFeatureView) GetFields() []api.FeatureViewFields {
fields := make([]api.FeatureViewFields, len(f.Fields))
for i, field := range f.Fields {
if field != nil {
fields[i] = *field
}
}
return fields
}
func (f *SequenceFeatureView) GetIsWriteToFeatureDB() bool {
return f.WriteToFeatureDB || f.Project.OnlineDatasourceType == constants.Datasource_Type_FeatureDB
}
func (f *SequenceFeatureView) GetTTL() int {
return f.Ttl
}
func (f *SequenceFeatureView) RowCount(string) int {
return 0
}
func (f *SequenceFeatureView) RowCountIds(string) ([]string, int, error) {
return nil, 0, nil
}
// ScanAndIterateData implements FeatureView.
func (f *SequenceFeatureView) ScanAndIterateData(filter string, ch chan<- string) ([]string, error) {
return nil, errors.New("unimplemented")
}