dao/feature_view_igraph_dao.go (263 lines of code) (raw):
package dao
import (
"errors"
"fmt"
"log"
"net/url"
"strings"
"sync"
"time"
aligraph "github.com/aliyun/aliyun-igraph-go-sdk"
"github.com/aliyun/aliyun-pai-featurestore-go-sdk/v2/api"
"github.com/aliyun/aliyun-pai-featurestore-go-sdk/v2/constants"
"github.com/aliyun/aliyun-pai-featurestore-go-sdk/v2/datasource/igraph"
"github.com/aliyun/aliyun-pai-featurestore-go-sdk/v2/utils"
)
// FeatureViewIGraphDao reads feature view data from an iGraph instance.
// GetFeatures looks up vertices filtered by label; the sequence and behavior
// lookups read edges filtered by edgeName.
type FeatureViewIGraphDao struct {
// NOTE(review): presumably supplies default implementations of the DAO
// methods not defined in this file — confirm against its definition.
UnimplementedFeatureViewDao
// igraphClient issues the read queries; set from igraph.GetGraphClient in the constructor.
igraphClient *aligraph.Client
// group is the iGraph group name, used as g("<group>") in every query.
group string
// label is the vertex label matched via hasLabel in GetFeatures.
label string
// primaryKeyField is used by GetUserBehaviorFeature to drop the
// "<primaryKeyField>_<EventField>" field from returned rows.
primaryKeyField string
// eventTimeField and ttl are copied from the config; the methods in this
// file do not read them.
eventTimeField string
ttl int
// fieldMap maps iGraph field names to feature view schema field names.
fieldMap map[string]string
// fieldTypeMap selects the numeric coercion for a value; it is looked up
// with the field names returned by iGraph.
fieldTypeMap map[string]constants.FSType
// reverseFieldMap is the inverse of fieldMap: schema field name => iGraph field name.
reverseFieldMap map[string]string
// edgeName is the edge label matched via hasLabel in the sequence/behavior queries.
edgeName string
}
// NewFeatureViewIGraphDao builds a FeatureViewIGraphDao from config.
//
// The iGraph client is looked up by config.IGraphName. If that lookup fails,
// the error is logged and nil is returned, so callers must check for a nil
// result before using the DAO.
//
// config.FieldMap maps iGraph field names to feature view schema names. Its
// inverse is precomputed into reverseFieldMap so that requested schema names
// can be translated back to iGraph names when building queries.
func NewFeatureViewIGraphDao(config DaoConfig) *FeatureViewIGraphDao {
	client, err := igraph.GetGraphClient(config.IGraphName)
	if err != nil {
		// The signature has no error return; at least record why the DAO is nil
		// instead of failing silently.
		log.Printf("NewFeatureViewIGraphDao: get igraph client %q: %v", config.IGraphName, err)
		return nil
	}

	dao := FeatureViewIGraphDao{
		igraphClient:    client.GraphClient,
		group:           config.GroupName,
		label:           config.LabelName,
		primaryKeyField: config.PrimaryKeyField,
		eventTimeField:  config.EventTimeField,
		ttl:             config.TTL,
		fieldMap:        config.FieldMap, // igraph name => feature view schema name
		fieldTypeMap:    config.FieldTypeMap,
		// reverse of fieldMap: feature view schema name => igraph name
		reverseFieldMap: make(map[string]string, len(config.FieldMap)),
		edgeName:        config.IgraphEdgeName,
	}
	for igraphName, schemaName := range dao.fieldMap {
		dao.reverseFieldMap[schemaName] = igraphName
	}
	return &dao
}
// GetFeatures looks up the vertices whose primary keys are in keys and returns
// one property map per vertex that iGraph returns.
//
// Each key is stringified with utils.ToString. Keys that stringify to "" are
// dropped; the rest are URL-query-escaped and joined with ";" into a single
// V(...) lookup. If no usable key remains, an empty result is returned without
// querying iGraph.
//
// selectFields are feature view schema names, translated to iGraph names via
// reverseFieldMap. When len(selectFields) equals the number of entries in
// fieldMap, every field is assumed to be requested and the fields(...)
// projection is omitted.
//
// In each returned map, keys are feature view schema names (via fieldMap) and
// the iGraph "label" field is skipped. Double/float and int32/int64 fields
// (per fieldTypeMap) are converted with utils.ToFloat / utils.ToInt, passing
// -1024 as the fallback value. Rows come back in iGraph's order, which need not
// match the order of keys.
func (d *FeatureViewIGraphDao) GetFeatures(keys []interface{}, selectFields []string) ([]map[string]interface{}, error) {
	pkeys := make([]string, 0, len(keys))
	for _, key := range keys {
		if pkey := utils.ToString(key, ""); pkey != "" {
			pkeys = append(pkeys, url.QueryEscape(pkey))
		}
	}
	if len(pkeys) == 0 {
		// Nothing to look up: skip the round trip with an empty V("") query.
		return make([]map[string]interface{}, 0), nil
	}
	vertexKeys := strings.Join(pkeys, ";")

	var queryString string
	if len(d.fieldMap) == len(selectFields) {
		queryString = fmt.Sprintf("g(\"%s\").V(\"%s\").hasLabel(\"%s\")", d.group, vertexKeys, d.label)
	} else {
		// NOTE(review): a field missing from reverseFieldMap is sent as "".
		selector := make([]string, 0, len(selectFields))
		for _, field := range selectFields {
			selector = append(selector, fmt.Sprintf("\"%s\"", d.reverseFieldMap[field]))
		}
		queryString = fmt.Sprintf("g(\"%s\").V(\"%s\").hasLabel(\"%s\").fields(%s)", d.group, vertexKeys, d.label, strings.Join(selector, ","))
	}

	resp, err := d.igraphClient.Read(&aligraph.ReadRequest{QueryString: queryString})
	if err != nil {
		return nil, fmt.Errorf("igraph read group %q label %q: %w", d.group, d.label, err)
	}

	result := make([]map[string]interface{}, 0, len(keys))
	for _, resultData := range resp.Result {
		for _, data := range resultData.Data {
			properties := make(map[string]interface{}, len(data))
			for field, value := range data {
				if field == "label" {
					continue
				}
				switch d.fieldTypeMap[field] {
				case constants.FS_DOUBLE, constants.FS_FLOAT:
					properties[d.fieldMap[field]] = utils.ToFloat(value, -1024)
				case constants.FS_INT32, constants.FS_INT64:
					properties[d.fieldMap[field]] = utils.ToInt(value, -1024)
				default:
					properties[d.fieldMap[field]] = value
				}
			}
			result = append(result, properties)
		}
	}
	return result, nil
}
// GetUserSequenceFeature builds behavior-sequence features for each key from
// iGraph edges.
//
// For every key and every entry of onlineConfig, the edges keyed
// "<key>_<event>" are read, with one edge key per "|"-separated event in
// SeqEvent. The query orders them by TimestampField descending, limits them to
// SeqLen, and the result is passed to makeSequenceFeatures as the online
// sequence; the offline sequence is always empty.
//
// Edges with an empty event or item id are skipped. Edges whose event has a
// PlayTimeFilter entry are skipped when their play time does not exceed it.
//
// Keys and configs are processed concurrently. The result holds one map per
// key, with userIdField set to the key, in no guaranteed order. A failed read
// is logged and treated as an empty online sequence, so the returned error is
// always nil.
func (d *FeatureViewIGraphDao) GetUserSequenceFeature(keys []interface{}, userIdField string, sequenceConfig api.FeatureViewSeqConfig, onlineConfig []*api.SeqConfig) ([]map[string]interface{}, error) {
	// Edge columns to project; play time is requested only when configured.
	columns := []string{sequenceConfig.ItemIdField, sequenceConfig.EventField}
	if sequenceConfig.PlayTimeField != "" {
		columns = append(columns, sequenceConfig.PlayTimeField)
	}
	columns = append(columns, sequenceConfig.TimestampField)
	projection := strings.Join(columns, ";")

	now := time.Now().Unix()
	minPlayTime := makePlayTimeMap(sequenceConfig.PlayTimeFilter)

	// readSequence fetches the newest edges for one key/config pair and returns
	// the decoded entries that pass filtering, or nil when the read fails.
	readSequence := func(seqEvent string, seqLen int, key interface{}) []*sequenceInfo {
		eventNames := strings.Split(seqEvent, "|")
		edgeKeys := make([]string, 0, len(eventNames))
		for _, ev := range eventNames {
			edgeKeys = append(edgeKeys, url.QueryEscape(fmt.Sprintf("%v_%s", key, ev)))
		}
		query := fmt.Sprintf("g(\"%s\").E(\"%s\").hasLabel(\"%s\").fields(\"%s\").order().by(\"%s\",Order.decr).limit(%d)",
			d.group, strings.Join(edgeKeys, ";"), d.edgeName, projection, sequenceConfig.TimestampField, seqLen)

		resp, err := d.igraphClient.Read(&aligraph.ReadRequest{QueryString: query})
		if err != nil {
			log.Println(err)
			return nil
		}

		collected := []*sequenceInfo{}
		for _, block := range resp.Result {
			for _, row := range block.Data {
				entry := new(sequenceInfo)
				for name, raw := range row {
					switch name {
					case "label":
						// iGraph bookkeeping field; not part of the sequence.
					case sequenceConfig.EventField:
						entry.event = utils.ToString(raw, "")
					case sequenceConfig.ItemIdField:
						entry.itemId = utils.ToString(raw, "")
					case sequenceConfig.PlayTimeField:
						entry.playTime = utils.ToFloat(raw, 0)
					case sequenceConfig.TimestampField:
						entry.timestamp = utils.ToInt64(raw, 0)
					}
				}
				if entry.event == "" || entry.itemId == "" {
					continue
				}
				if threshold, ok := minPlayTime[entry.event]; ok && entry.playTime <= threshold {
					continue
				}
				collected = append(collected, entry)
			}
		}
		return collected
	}

	results := make([]map[string]interface{}, 0, len(keys))
	var resultsMu sync.Mutex
	var perKey sync.WaitGroup
	for _, key := range keys {
		perKey.Add(1)
		go func(key interface{}) {
			defer perKey.Done()
			features := make(map[string]interface{})
			var featuresMu sync.Mutex
			var perConfig sync.WaitGroup
			for _, cfg := range onlineConfig {
				perConfig.Add(1)
				go func(cfg *api.SeqConfig) {
					defer perConfig.Done()
					// iGraph only supplies the online (edge) sequence here.
					var offline []*sequenceInfo
					online := readSequence(cfg.SeqEvent, cfg.SeqLen, key)
					partial := makeSequenceFeatures(offline, online, cfg, sequenceConfig, now)

					featuresMu.Lock()
					for name, value := range partial {
						features[name] = value
					}
					featuresMu.Unlock()
				}(cfg)
			}
			perConfig.Wait()

			features[userIdField] = key
			resultsMu.Lock()
			results = append(results, features)
			resultsMu.Unlock()
		}(key)
	}
	perKey.Wait()
	return results, nil
}
// GetUserBehaviorFeature returns the behavior edges of every user in userIds
// across all of events.
//
// For each user, a single query reads the edges keyed "<userId>_<event>" for
// every event (URL-query-escaped, joined with ";"). It projects selectFields
// and orders the rows by TimestampField descending. Users are queried
// concurrently, so rows from different users appear in no guaranteed order.
//
// Each returned map is keyed by the iGraph field names. The following fields
// are dropped:
//   - "label"
//   - "<primaryKeyField>_<EventField>"
//   - "<ItemIdField>_<TimestampField>", only when DeduplicationMethodNum is 2
//
// Double/float and int32/int64 fields (per fieldTypeMap) are converted with
// utils.ToFloat / utils.ToInt, passing -1024 as the fallback value. A row whose
// event has a PlayTimeFilter entry is discarded when its play time does not
// exceed that threshold.
//
// An error is returned only when events is empty. A failed read for a user is
// logged and that user contributes no rows.
func (d *FeatureViewIGraphDao) GetUserBehaviorFeature(userIds []interface{}, events []interface{}, selectFields []string, sequenceConfig api.FeatureViewSeqConfig) ([]map[string]interface{}, error) {
	if len(events) == 0 {
		return []map[string]interface{}{}, errors.New("igraph not support GetBehaviorFeatures with empty events")
	}
	sequencePlayTimeMap := makePlayTimeMap(sequenceConfig.PlayTimeFilter)

	// These depend only on configuration, so compute them once per call rather
	// than once per field of every returned row.
	pkEventField := fmt.Sprintf("%v_%v", d.primaryKeyField, sequenceConfig.EventField)
	itemTimestampField := fmt.Sprintf("%v_%v", sequenceConfig.ItemIdField, sequenceConfig.TimestampField)
	dropItemTimestamp := sequenceConfig.DeduplicationMethodNum == 2
	projection := strings.Join(selectFields, ";")

	// fetchDataFunc reads and decodes all behavior rows of one user; it returns
	// nil when the iGraph read fails.
	fetchDataFunc := func(userId interface{}) []map[string]interface{} {
		pkeys := make([]string, 0, len(events))
		for _, event := range events {
			pkeys = append(pkeys, url.QueryEscape(fmt.Sprintf("%v_%v", userId, event)))
		}
		queryString := fmt.Sprintf("g(\"%s\").E(\"%s\").hasLabel(\"%s\").fields(\"%s\").order().by(\"%s\",Order.decr)",
			d.group, strings.Join(pkeys, ";"), d.edgeName, projection, sequenceConfig.TimestampField)

		resp, err := d.igraphClient.Read(&aligraph.ReadRequest{QueryString: queryString})
		if err != nil {
			// Best effort: one failing user should not fail the whole batch.
			log.Println(err)
			return nil
		}

		results := []map[string]interface{}{}
		for _, resultData := range resp.Result {
			for _, data := range resultData.Data {
				properties := make(map[string]interface{}, len(data))
				for field, value := range data {
					if field == "label" || field == pkEventField ||
						(dropItemTimestamp && field == itemTimestampField) {
						continue
					}
					switch d.fieldTypeMap[field] {
					case constants.FS_DOUBLE, constants.FS_FLOAT:
						properties[field] = utils.ToFloat(value, -1024)
					case constants.FS_INT32, constants.FS_INT64:
						properties[field] = utils.ToInt(value, -1024)
					default:
						properties[field] = value
					}
				}
				if t, exist := sequencePlayTimeMap[utils.ToString(properties[sequenceConfig.EventField], "")]; exist {
					if utils.ToFloat(properties[sequenceConfig.PlayTimeField], 0.0) <= t {
						continue
					}
				}
				results = append(results, properties)
			}
		}
		return results
	}

	// NOTE(review): capacity is a heuristic (~50 rows per user per event slot).
	results := make([]map[string]interface{}, 0, len(userIds)*(len(events)+1)*50)
	var outmu sync.Mutex
	var wg sync.WaitGroup
	for _, userId := range userIds {
		wg.Add(1)
		go func(userId interface{}) {
			defer wg.Done()
			innerresult := fetchDataFunc(userId)
			outmu.Lock()
			results = append(results, innerresult...)
			outmu.Unlock()
		}(userId)
	}
	wg.Wait()
	return results, nil
}