dao/feature_view_tablestore_dao.go (548 lines of code) (raw):

package dao

import (
	"errors"
	"fmt"
	"log"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/aliyun/aliyun-pai-featurestore-go-sdk/v2/api"
	"github.com/aliyun/aliyun-pai-featurestore-go-sdk/v2/constants"
	fstablestore "github.com/aliyun/aliyun-pai-featurestore-go-sdk/v2/datasource/tablestore"
	"github.com/aliyun/aliyun-pai-featurestore-go-sdk/v2/utils"
	"github.com/aliyun/aliyun-tablestore-go-sdk/tablestore"
)

// FeatureViewTableStoreDao reads feature view data from TableStore tables.
type FeatureViewTableStoreDao struct {
	UnimplementedFeatureViewDao
	// tablestoreClient issues every BatchGetRow / GetRange call made by this DAO.
	tablestoreClient *tablestore.TableStoreClient
	// table is the table queried by GetFeatures.
	table string
	// primaryKeyField is the primary-key column name used by GetFeatures and the
	// prefix of the composite key column used by GetUserBehaviorFeature.
	primaryKeyField string
	// eventTimeField and ttl are copied from the config; they are not referenced
	// by the methods in this file.
	eventTimeField string
	ttl            int
	// fieldTypeMap maps a field name to its feature store type; GetFeatures uses
	// it to decide how the primary key value is encoded.
	fieldTypeMap map[string]constants.FSType
	// offlineTable and onlineTable are the two tables merged by the sequence and
	// behavior readers.
	offlineTable string
	onlineTable  string
}

// NewFeatureViewTableStoreDao builds a DAO from config and binds it to the
// TableStore client registered under config.TableStoreName.
//
// It returns nil when that client cannot be obtained; the lookup error is
// logged because the signature has no error result.
func NewFeatureViewTableStoreDao(config DaoConfig) *FeatureViewTableStoreDao {
	dao := FeatureViewTableStoreDao{
		table:           config.TableStoreTableName,
		primaryKeyField: config.PrimaryKeyField,
		eventTimeField:  config.EventTimeField,
		ttl:             config.TTL,
		fieldTypeMap:    config.FieldTypeMap,
		offlineTable:    config.TableStoreOfflineTableName,
		onlineTable:     config.TableStoreOnlineTableName,
	}
	client, err := fstablestore.GetTableStoreClient(config.TableStoreName)
	if err != nil {
		log.Println(err)
		return nil
	}
	dao.tablestoreClient = client.GetClient()
	return &dao
}

// newRangeRequest builds a forward GetRange request on tableName. The first
// primary-key column (pkField) runs from startValue to endValue and the second
// (skField) from its minimum to its maximum value.
//
// When online is true the request carries a TimeRange covering the five days
// that end at currTime (Unix seconds, scaled by 1000 for the request);
// otherwise it asks for MaxVersion 1.
func (d *FeatureViewTableStoreDao) newRangeRequest(tableName, pkField, skField, startValue, endValue string, columns []string, online bool, currTime int64) *tablestore.GetRangeRequest {
	const onlineWindowSeconds = 86400 * 5

	startPK := new(tablestore.PrimaryKey)
	startPK.AddPrimaryKeyColumn(pkField, startValue)
	startPK.AddPrimaryKeyColumnWithMinValue(skField)

	endPK := new(tablestore.PrimaryKey)
	endPK.AddPrimaryKeyColumn(pkField, endValue)
	endPK.AddPrimaryKeyColumnWithMaxValue(skField)

	criteria := &tablestore.RangeRowQueryCriteria{}
	criteria.TableName = tableName
	criteria.StartPrimaryKey = startPK
	criteria.EndPrimaryKey = endPK
	criteria.Direction = tablestore.FORWARD
	criteria.ColumnsToGet = columns
	if online {
		timeRange := new(tablestore.TimeRange)
		timeRange.End = currTime * 1000
		timeRange.Start = (currTime - onlineWindowSeconds) * 1000
		criteria.TimeRange = timeRange
	} else {
		criteria.MaxVersion = 1
	}

	return &tablestore.GetRangeRequest{RangeRowQueryCriteria: criteria}
}

// scanRange executes req and keeps following NextStartPrimaryKey until the
// range is exhausted, passing every page to handle.
//
// It stops at the first GetRange error and returns it; the response is never
// touched in that case. Pages delivered before the error have already been
// handed to handle. req is mutated: its start key is advanced page by page.
func (d *FeatureViewTableStoreDao) scanRange(req *tablestore.GetRangeRequest, handle func(resp *tablestore.GetRangeResponse)) error {
	for {
		resp, err := d.tablestoreClient.GetRange(req)
		if err != nil {
			return err
		}
		handle(resp)
		if resp.NextStartPrimaryKey == nil {
			return nil
		}
		req.RangeRowQueryCriteria.StartPrimaryKey = resp.NextStartPrimaryKey
	}
}

// GetFeatures fetches selectFields for the given primary keys from d.table.
//
// Keys are sent in concurrent BatchGetRow requests of at most 100 keys each.
// Every returned row becomes one map holding its primary-key columns and its
// attribute columns. The order of rows in the result is not deterministic
// across batches.
//
// The lookup is best effort: request errors, per-row errors, keys that cannot
// be converted to an integer for an integer primary key, and an unsupported
// primary key type are logged and the affected rows are omitted. The returned
// error is always nil.
func (d *FeatureViewTableStoreDao) GetFeatures(keys []interface{}, selectFields []string) ([]map[string]interface{}, error) {
	const batchSize = 100

	result := make([]map[string]interface{}, 0, len(keys))
	if len(keys) == 0 {
		return result, nil
	}

	// The primary key type is the same for every key, so resolve it once.
	intKey := false
	switch d.fieldTypeMap[d.primaryKeyField] {
	case constants.FS_INT64, constants.FS_INT32:
		intKey = true
	case constants.FS_STRING:
	default:
		log.Println(errors.New("primary key type is not supported by TableStore"))
		return result, nil
	}

	var wg sync.WaitGroup
	var mu sync.Mutex
	for start := 0; start < len(keys); start += batchSize {
		end := start + batchSize
		if end > len(keys) {
			end = len(keys)
		}
		wg.Add(1)
		go func(batch []interface{}) {
			defer wg.Done()

			criteria := &tablestore.MultiRowQueryCriteria{}
			criteria.TableName = d.table
			criteria.MaxVersion = 1
			criteria.ColumnsToGet = selectFields

			rowCount := 0
			for _, key := range batch {
				pk := new(tablestore.PrimaryKey)
				if intKey {
					v, ok := key.(int64)
					if !ok {
						parsed, err := strconv.ParseInt(fmt.Sprintf("%v", key), 10, 64)
						if err != nil {
							// Looking up key 0 instead would return an unrelated row.
							log.Printf("skip primary key %v: %v", key, err)
							continue
						}
						v = parsed
					}
					pk.AddPrimaryKeyColumn(d.primaryKeyField, v)
				} else {
					pk.AddPrimaryKeyColumn(d.primaryKeyField, key)
				}
				criteria.AddRow(pk)
				rowCount++
			}
			if rowCount == 0 {
				return
			}

			req := &tablestore.BatchGetRowRequest{}
			req.MultiRowQueryCriteria = append(req.MultiRowQueryCriteria, criteria)
			resp, err := d.tablestoreClient.BatchGetRow(req)
			if err != nil {
				log.Println(err)
				return
			}

			rows := make([]map[string]interface{}, 0, rowCount)
			for _, rowResults := range resp.TableToRowsResult {
				for _, rowResult := range rowResults {
					if rowResult.Error.Message != "" {
						// A failed row must not discard the other rows of the batch.
						log.Println(errors.New(rowResult.Error.Message))
						continue
					}
					if rowResult.PrimaryKey.PrimaryKeys == nil {
						continue
					}
					row := make(map[string]interface{}, len(rowResult.PrimaryKey.PrimaryKeys)+len(rowResult.Columns))
					for _, pkValue := range rowResult.PrimaryKey.PrimaryKeys {
						row[pkValue.ColumnName] = pkValue.Value
					}
					for _, column := range rowResult.Columns {
						row[column.ColumnName] = column.Value
					}
					rows = append(rows, row)
				}
			}

			mu.Lock()
			result = append(result, rows...)
			mu.Unlock()
		}(keys[start:end])
	}
	wg.Wait()

	return result, nil
}

// GetUserSequenceFeature builds sequence features for every key in keys.
//
// For each entry of onlineConfig it reads the events listed in SeqEvent
// (separated by "|") from both the online and the offline table, keeps at most
// SeqLen rows per table ordered by descending timestamp, and merges them with
// makeSequenceFeatures. Each result map additionally stores the key under
// userIdField.
//
// A GetRange failure is logged and ends the scan of the affected event; rows
// read before the failure are kept. The returned error is always nil.
func (d *FeatureViewTableStoreDao) GetUserSequenceFeature(keys []interface{}, userIdField string, sequenceConfig api.FeatureViewSeqConfig, onlineConfig []*api.SeqConfig) ([]map[string]interface{}, error) {
	currTime := time.Now().Unix()
	sequencePlayTimeMap := makePlayTimeMap(sequenceConfig.PlayTimeFilter)

	pkField := fmt.Sprintf("%s_%s", userIdField, sequenceConfig.EventField)
	var skField string
	if sequenceConfig.DeduplicationMethodNum == 1 {
		skField = sequenceConfig.ItemIdField
	} else if sequenceConfig.DeduplicationMethodNum == 2 {
		skField = fmt.Sprintf("%s_%s", sequenceConfig.ItemIdField, sequenceConfig.TimestampField)
	}

	columns := []string{sequenceConfig.ItemIdField, sequenceConfig.EventField, sequenceConfig.TimestampField}
	if sequenceConfig.PlayTimeField != "" {
		columns = []string{sequenceConfig.ItemIdField, sequenceConfig.EventField, sequenceConfig.PlayTimeField, sequenceConfig.TimestampField}
	}

	// fetchSequences reads every event of seqEvent for key from tableName and
	// returns at most seqLen entries, newest first. The result is never nil.
	fetchSequences := func(seqEvent string, seqLen int, key interface{}, tableName string, online bool) []*sequenceInfo {
		sequences := []*sequenceInfo{}
		var seqMu sync.Mutex
		var seqWg sync.WaitGroup

		for _, event := range strings.Split(seqEvent, "|") {
			seqWg.Add(1)
			go func(event string) {
				defer seqWg.Done()

				pkValue := fmt.Sprintf("%v_%s", key, event)
				req := d.newRangeRequest(tableName, pkField, skField, pkValue, pkValue, columns, online, currTime)

				var eventSequences []*sequenceInfo
				err := d.scanRange(req, func(resp *tablestore.GetRangeResponse) {
					for _, row := range resp.Rows {
						if row.PrimaryKey.PrimaryKeys == nil {
							continue
						}
						seq := new(sequenceInfo)
						// With deduplication method 1 the item id is the second
						// primary-key column; a same-named attribute column, if
						// present, overrides it below.
						if sequenceConfig.DeduplicationMethodNum == 1 && len(row.PrimaryKey.PrimaryKeys) > 1 {
							seq.itemId = utils.ToString(row.PrimaryKey.PrimaryKeys[1].Value, "")
						}
						for _, column := range row.Columns {
							switch column.ColumnName {
							case sequenceConfig.EventField:
								seq.event = utils.ToString(column.Value, "")
							case sequenceConfig.ItemIdField:
								seq.itemId = utils.ToString(column.Value, "")
							case sequenceConfig.PlayTimeField:
								seq.playTime = utils.ToFloat(column.Value, 0)
							case sequenceConfig.TimestampField:
								seq.timestamp = utils.ToInt64(column.Value, 0)
							}
						}
						if seq.event == "" || seq.itemId == "" {
							continue
						}
						// Drop rows whose play time does not exceed the threshold
						// configured for their event.
						if t, exist := sequencePlayTimeMap[seq.event]; exist {
							if seq.playTime <= t {
								continue
							}
						}
						eventSequences = append(eventSequences, seq)
					}
				})
				if err != nil {
					fmt.Println("get range failed with error:", err)
				}

				seqMu.Lock()
				sequences = append(sequences, eventSequences...)
				seqMu.Unlock()
			}(event)
		}
		seqWg.Wait()

		// add seqLen limit
		sort.Slice(sequences, func(i, j int) bool {
			return sequences[i].timestamp > sequences[j].timestamp
		})
		limit := seqLen
		if limit > len(sequences) {
			limit = len(sequences)
		}
		if limit < 0 {
			limit = 0
		}
		return sequences[:limit]
	}

	results := make([]map[string]interface{}, 0, len(keys))
	var outmu sync.Mutex
	var wg sync.WaitGroup
	for _, key := range keys {
		wg.Add(1)
		go func(key interface{}) {
			defer wg.Done()

			properties := make(map[string]interface{})
			var mu sync.Mutex
			var eventWg sync.WaitGroup
			for _, seqConfig := range onlineConfig {
				eventWg.Add(1)
				go func(seqConfig *api.SeqConfig) {
					defer eventWg.Done()

					var onlineSequences []*sequenceInfo
					var offlineSequences []*sequenceInfo
					var innerWg sync.WaitGroup
					innerWg.Add(2)
					// get data from online table
					go func() {
						defer innerWg.Done()
						onlineSequences = fetchSequences(seqConfig.SeqEvent, seqConfig.SeqLen, key, d.onlineTable, true)
					}()
					// get data from offline table
					go func() {
						defer innerWg.Done()
						offlineSequences = fetchSequences(seqConfig.SeqEvent, seqConfig.SeqLen, key, d.offlineTable, false)
					}()
					innerWg.Wait()

					subproperties := makeSequenceFeatures(offlineSequences, onlineSequences, seqConfig, sequenceConfig, currTime)
					mu.Lock()
					for k, value := range subproperties {
						properties[k] = value
					}
					mu.Unlock()
				}(seqConfig)
			}
			eventWg.Wait()

			properties[userIdField] = key
			outmu.Lock()
			results = append(results, properties)
			outmu.Unlock()
		}(key)
	}
	wg.Wait()

	return results, nil
}

// GetUserBehaviorFeature returns the behavior rows of every user in userIds,
// restricted to selectFields.
//
// For each user and each event in events it scans the offline and the online
// table and merges the two with combineBehaviorFeatures. When events is empty
// a single scan per table covers the key range from "<userId>" to "<userId>a".
// The rows of one user are ordered by descending timestamp field; users are
// appended in completion order.
//
// If either table scan of a user/event pair fails, the failure is logged and
// that pair contributes no rows. The returned error is always nil.
func (d *FeatureViewTableStoreDao) GetUserBehaviorFeature(userIds []interface{}, events []interface{}, selectFields []string, sequenceConfig api.FeatureViewSeqConfig) ([]map[string]interface{}, error) {
	currTime := time.Now().Unix()
	sequencePlayTimeMap := makePlayTimeMap(sequenceConfig.PlayTimeFilter)

	pkField := fmt.Sprintf("%s_%s", d.primaryKeyField, sequenceConfig.EventField)
	var skField string
	if sequenceConfig.DeduplicationMethodNum == 1 {
		skField = sequenceConfig.ItemIdField
	} else if sequenceConfig.DeduplicationMethodNum == 2 {
		skField = fmt.Sprintf("%s_%s", sequenceConfig.ItemIdField, sequenceConfig.TimestampField)
	}

	// fetchBehavior scans tableName for one user/event pair. It returns nil when
	// the scan fails and a non-nil (possibly empty) slice otherwise, which is
	// what the nil checks of the callers below rely on.
	fetchBehavior := func(tableName string, online bool, userID interface{}, event interface{}) []map[string]interface{} {
		var startValue, endValue string
		if event == nil {
			startValue = fmt.Sprintf("%v", userID)
			endValue = fmt.Sprintf("%va", userID)
		} else {
			startValue = fmt.Sprintf("%v_%v", userID, event)
			endValue = startValue
		}
		req := d.newRangeRequest(tableName, pkField, skField, startValue, endValue, selectFields, online, currTime)

		rows := []map[string]interface{}{}
		err := d.scanRange(req, func(resp *tablestore.GetRangeResponse) {
			for _, row := range resp.Rows {
				if row.PrimaryKey.PrimaryKeys == nil {
					continue
				}
				newMap := make(map[string]interface{}, len(selectFields))
				if sequenceConfig.DeduplicationMethodNum == 1 && len(row.PrimaryKey.PrimaryKeys) > 1 {
					newMap[sequenceConfig.ItemIdField] = row.PrimaryKey.PrimaryKeys[1].Value
				}
				for _, column := range row.Columns {
					newMap[column.ColumnName] = column.Value
				}
				// Drop rows whose play time does not exceed the threshold
				// configured for their event.
				if t, exist := sequencePlayTimeMap[utils.ToString(newMap[sequenceConfig.EventField], "")]; exist {
					if utils.ToFloat(newMap[sequenceConfig.PlayTimeField], 0.0) <= t {
						continue
					}
				}
				rows = append(rows, newMap)
			}
		})
		if err != nil {
			fmt.Println("get range failed with error:", err)
			return nil
		}
		return rows
	}

	// Without explicit events a single nil event selects the whole-user scan.
	queryEvents := events
	if len(queryEvents) == 0 {
		queryEvents = []interface{}{nil}
	}

	results := make([]map[string]interface{}, 0, len(userIds)*(len(events)+1)*50)
	var outmu sync.Mutex
	var wg sync.WaitGroup
	for _, userId := range userIds {
		wg.Add(1)
		go func(userID interface{}) {
			defer wg.Done()

			var mu sync.Mutex
			var eventWg sync.WaitGroup
			userResults := make([]map[string]interface{}, 0, len(queryEvents)*50)
			for _, event := range queryEvents {
				eventWg.Add(1)
				go func(event interface{}) {
					defer eventWg.Done()

					var offlineResult []map[string]interface{}
					var onlineResult []map[string]interface{}
					var innerWg sync.WaitGroup
					innerWg.Add(2)
					// offline table
					go func() {
						defer innerWg.Done()
						offlineResult = fetchBehavior(d.offlineTable, false, userID, event)
					}()
					// online table
					go func() {
						defer innerWg.Done()
						onlineResult = fetchBehavior(d.onlineTable, true, userID, event)
					}()
					innerWg.Wait()

					if offlineResult == nil || onlineResult == nil {
						fmt.Println("get user behavior feature failed")
						return
					}
					combinedResult := combineBehaviorFeatures(offlineResult, onlineResult, sequenceConfig.TimestampField)
					mu.Lock()
					userResults = append(userResults, combinedResult...)
					mu.Unlock()
				}(event)
			}
			eventWg.Wait()

			sort.Slice(userResults, func(i, j int) bool {
				return utils.ToInt64(userResults[i][sequenceConfig.TimestampField], 0) > utils.ToInt64(userResults[j][sequenceConfig.TimestampField], 0)
			})
			outmu.Lock()
			results = append(results, userResults...)
			outmu.Unlock()
		}(userId)
	}
	wg.Wait()

	return results, nil
}