module/feature_be_dao.go (556 lines of code) (raw):

package module

import (
	"fmt"
	"runtime/debug"
	"sort"
	"strings"
	"time"

	"github.com/alibaba/pairec/v2/context"
	"github.com/alibaba/pairec/v2/datasource/beengine"
	"github.com/alibaba/pairec/v2/log"
	"github.com/alibaba/pairec/v2/recconf"
	"github.com/alibaba/pairec/v2/utils"
	be "github.com/aliyun/aliyun-be-go-sdk"
)

// RTCntField describes one real-time count key. The key is built from one or
// more BE result fields; fieldDelims[i] is the delimiter used to split the
// value of fieldNames[i] into several tokens ("" means "do not split").
// Both slices always have the same length.
type RTCntField struct {
	fieldNames  []string
	fieldDelims []string
}

// userRtCnt is the running count for a single key inside one time window.
type userRtCnt struct {
	key       string
	cnt       int32
	timestamp int64 // largest event timestamp seen for this key
}

// FeatureBeDao loads user features from a BE engine. It serves two feature
// types: user behaviour sequences and real-time (RT) per-window event counts.
type FeatureBeDao struct {
	*FeatureBaseDao
	hasPlayTimeField bool
	beClient         *be.Client
	bizName          string
	beRecallName     string

	// Names of the generated sequence sub-features.
	userFeatureKeyName      string
	itemFeatureKeyName      string
	timestampFeatureKeyName string
	eventFeatureKeyName     string
	playTimeFeatureKeyName  string
	tsFeatureKeyName        string

	// Names of the fields in the BE response.
	beItemFeatureKeyName      string
	beTimestampFeatureKeyName string
	beEventFeatureKeyName     string
	bePlayTimeFeatureKeyName  string

	// such as "is_home", indicates whether it is
	// the current recommendation scene
	beIsHomeField string
	beRTCntFields []RTCntField
	beRTTable     string
	rtCntMaxKey   int
	rtCntWins     []int

	// rtCnt feature window delay
	rtCntWinDelay int64

	// user__style_id_rt_clk_30m
	// user__%s_rt_%s_%s
	outRTCntFeaPattern string

	// user__style_id_rt_home_clk_30m
	// user__%s_rt_home_%s_%s
	outHomeRTCntFeaPattern string
	outRTCntFieldAlias     []string
	outRTCntWinNames       []string
	outEventName           string
}

// NewFeatureBeDao builds a FeatureBeDao from config. It returns nil (after
// logging the reason) when the RT count configuration is inconsistent or the
// BE client named by config.BeName cannot be obtained.
func NewFeatureBeDao(config recconf.FeatureDaoConfig) *FeatureBeDao {
	var (
		rtCntWins          []int
		beRTCntFields      []RTCntField
		outRTCntFieldAlias []string
	)

	if config.FeatureType == Feature_Type_RT_Cnt {
		for _, winTok := range strings.Split(config.RTCntWins, ",") {
			// Trim like every other comma-separated config value here, so
			// that "30, 60" is accepted.
			tWin := utils.ToInt(strings.TrimSpace(winTok), -1)
			if tWin <= 0 {
				log.Error(fmt.Sprintf("invalid rtCntWins: %s", winTok))
				return nil
			}
			rtCntWins = append(rtCntWins, tWin)
		}

		if len(config.BeRTCntFieldInfo) > 0 {
			for fieldId, fieldInfo := range config.BeRTCntFieldInfo {
				// Work on a copy so the caller's config is not modified.
				tmpToks := make([]string, len(fieldInfo.FieldNames))
				for tid, name := range fieldInfo.FieldNames {
					tmpToks[tid] = strings.TrimSpace(name)
				}
				// An empty name list would panic below (tmpToks[0]) or on
				// every request (fieldNames[0]); reject it up front.
				if len(tmpToks) == 0 {
					log.Error(fmt.Sprintf("empty fieldNames in BeRTCntFieldInfo, fieldId=%d", fieldId))
					return nil
				}

				delimToks := fieldInfo.Delims
				if len(delimToks) == 0 {
					// No delimiters configured: one empty delimiter per field.
					delimToks = make([]string, len(tmpToks))
				}
				if len(delimToks) != len(tmpToks) {
					log.Error(fmt.Sprintf("len(fieldToks) != len(delimToks): %d vs %d, fieldId=%d, fieldTok=%s, delimStr=%s",
						len(tmpToks), len(delimToks), fieldId,
						strings.Join(tmpToks, ":"), strings.Join(fieldInfo.Delims, ":")))
					return nil
				}
				beRTCntFields = append(beRTCntFields, RTCntField{fieldNames: tmpToks, fieldDelims: delimToks})

				// The alias defaults to the first field name.
				tmpAlias := strings.TrimSpace(fieldInfo.Alias)
				if len(tmpAlias) == 0 {
					tmpAlias = tmpToks[0]
				}
				outRTCntFieldAlias = append(outRTCntFieldAlias, tmpAlias)
			}
		} else {
			// Deprecated: kept to stay compatible with existing configs that
			// use "a:b,c" style BeRTCntFields plus OutRTCntFieldAlias.
			for _, fieldTok := range strings.Split(config.BeRTCntFields, ",") {
				tmpToks := strings.Split(fieldTok, ":")
				for tid := range tmpToks {
					tmpToks[tid] = strings.TrimSpace(tmpToks[tid])
				}
				beRTCntFields = append(beRTCntFields, RTCntField{
					fieldNames:  tmpToks,
					fieldDelims: make([]string, len(tmpToks)),
				})
			}
			for _, tmpAlias := range strings.Split(config.OutRTCntFieldAlias, ",") {
				outRTCntFieldAlias = append(outRTCntFieldAlias, strings.TrimSpace(tmpAlias))
			}
			if len(beRTCntFields) != len(outRTCntFieldAlias) {
				log.Error(fmt.Sprintf("len(beRTCntFields) != len(outRTCntFieldAlias): %d vs %d",
					len(beRTCntFields), len(outRTCntFieldAlias)))
				return nil
			}
		}
	}

	dao := &FeatureBeDao{
		FeatureBaseDao:            NewFeatureBaseDao(&config),
		bizName:                   config.BizName,
		beRecallName:              "sequence_feature",
		userFeatureKeyName:        config.UserFeatureKeyName,
		itemFeatureKeyName:        config.ItemFeatureKeyName,
		timestampFeatureKeyName:   config.TimestampFeatureKeyName,
		eventFeatureKeyName:       config.EventFeatureKeyName,
		playTimeFeatureKeyName:    config.PlayTimeFeatureKeyName,
		tsFeatureKeyName:          config.TsFeatureKeyName,
		hasPlayTimeField:          !config.NoUsePlayTimeField,
		beItemFeatureKeyName:      config.BeItemFeatureKeyName,
		beTimestampFeatureKeyName: config.BeTimestampFeatureKeyName,
		beEventFeatureKeyName:     config.BeEventFeatureKeyName,
		bePlayTimeFeatureKeyName:  config.BePlayTimeFeatureKeyName,
		beIsHomeField:             strings.TrimSpace(config.BeIsHomeField),
		beRTCntFields:             beRTCntFields,
		beRTTable:                 strings.TrimSpace(config.BeRTTable),
		rtCntWins:                 rtCntWins,
		rtCntMaxKey:               utils.ToInt(config.RTCntMaxKey, 100),
		rtCntWinDelay:             utils.ToInt64(config.RTCntWinDelay, 5),
		outRTCntFeaPattern:        strings.TrimSpace(config.OutRTCntFeaPattern),
		outHomeRTCntFeaPattern:    strings.TrimSpace(config.OutHomeRTCntFeaPattern),
		outRTCntWinNames:          strings.Split(config.OutRTCntWinNames, ","),
		outRTCntFieldAlias:        outRTCntFieldAlias,
		outEventName:              strings.TrimSpace(config.OutEventName),
	}
	if config.BeRecallName != "" {
		dao.beRecallName = config.BeRecallName
	}

	if dao.featureType == Feature_Type_RT_Cnt {
		for tid := range dao.outRTCntWinNames {
			dao.outRTCntWinNames[tid] = strings.TrimSpace(dao.outRTCntWinNames[tid])
		}
		// The output loops index window names and aliases by window/field
		// position, so the lengths must match.
		if len(dao.rtCntWins) != len(dao.outRTCntWinNames) {
			log.Error(fmt.Sprintf("len(rtCntWins)[%d] != len(outRTCntWinNames)[%d]",
				len(dao.rtCntWins), len(dao.outRTCntWinNames)))
			return nil
		}
		if len(dao.beRTCntFields) != len(dao.outRTCntFieldAlias) {
			log.Error(fmt.Sprintf("len(beRTCntFields)[%d] != len(outRTCntFieldAlias)[%d]",
				len(dao.beRTCntFields), len(dao.outRTCntFieldAlias)))
			return nil
		}
		log.Info(fmt.Sprintf("RTCntConfig:beIsHomeField=%s", dao.beIsHomeField))
		log.Info(fmt.Sprintf("RTCntConfig:beRTTable=%s", dao.beRTTable))
		log.Info(fmt.Sprintf("RTCntConfig:rtCntWinDelay=%d", dao.rtCntWinDelay))
		log.Info(fmt.Sprintf("RTCntConfig:rtCntMaxKey=%d", dao.rtCntMaxKey))
	}

	client, err := beengine.GetBeClient(config.BeName)
	if err != nil {
		log.Error(fmt.Sprintf("error=%v", err))
		return nil
	}
	dao.beClient = client.BeClient

	return dao
}

// FeatureFetch dispatches to the fetch routine matching the configured
// feature store and feature type. Only the user sequence and user RT count
// paths do any work; the plain user and item paths are empty.
func (d *FeatureBeDao) FeatureFetch(user *User, items []*Item, context *context.RecommendContext) {
	switch {
	case d.featureStore == Feature_Store_User && d.featureType == Feature_Type_Sequence:
		d.userSequenceFeatureFetch(user, context)
	case d.featureStore == Feature_Store_User && d.featureType == Feature_Type_RT_Cnt:
		d.userRTCntFeatureFetch(user, context)
	case d.featureStore == Feature_Store_User:
		d.userFeatureFetch(user, context) // empty
	default:
		d.itemsFeatureFetch(items, context) // empty
	}
}

// userFeatureFetch is intentionally empty.
func (d *FeatureBeDao) userFeatureFetch(user *User, context *context.RecommendContext) {
}

// debugMapKeys joins the keys of tmpMap with sep, in map iteration order
// (i.e. unordered). Intended for log messages only.
func debugMapKeys[T any](tmpMap map[string]T, sep string) string {
	keys := make([]string, 0, len(tmpMap))
	for key := range tmpMap {
		keys = append(keys, key)
	}
	return strings.Join(keys, sep)
}

// debugMap renders tmpMap as "k<kv_sep>v" pairs joined by sep, sorted so the
// output is stable. Intended for log messages only.
func debugMap[T any](tmpMap map[string]T, sep string, kv_sep string) string {
	kvs := make([]string, 0, len(tmpMap))
	for k, v := range tmpMap {
		kvs = append(kvs, fmt.Sprintf("%s%s%s", k, kv_sep, utils.ToString(v, "")))
	}
	sort.Strings(kvs)
	return strings.Join(kvs, sep)
}

// debugArr renders tmpArr as a comma-separated string for log messages.
func debugArr[T any](tmpArr []T) string {
	valStr := make([]string, 0, len(tmpArr))
	for _, v := range tmpArr {
		valStr = append(valStr, utils.ToString(v, ""))
	}
	return strings.Join(valStr, ",")
}

// buildRTCntOutput serialises the counters in tmpMap as "key:cnt" pairs.
// When the map holds more than rtCntMaxKey entries, only the rtCntMaxKey
// entries with the most recent timestamps are kept; otherwise all entries are
// emitted in map iteration order.
func buildRTCntOutput(tmpMap map[string]userRtCnt, rtCntMaxKey int) string {
	var eventCntKVs []string
	if len(tmpMap) > rtCntMaxKey {
		// Too many keys: keep the most recently seen ones.
		eventCnts := make([]userRtCnt, 0, len(tmpMap))
		for _, val := range tmpMap {
			eventCnts = append(eventCnts, val)
		}
		sort.Slice(eventCnts, func(x, y int) bool {
			return eventCnts[x].timestamp > eventCnts[y].timestamp
		})
		for k := 0; k < rtCntMaxKey; k++ {
			eventCntKVs = append(eventCntKVs, fmt.Sprintf("%s:%d", eventCnts[k].key, eventCnts[k].cnt))
		}
	} else {
		eventCntKVs = make([]string, 0, len(tmpMap))
		for _, val := range tmpMap {
			eventCntKVs = append(eventCntKVs, fmt.Sprintf("%s:%d", val.key, val.cnt))
		}
	}
	// NOTE(review): the separator literal reads as empty here, which makes
	// adjacent pairs run together ("a:1b:2"). If the original file contains a
	// non-printable delimiter inside the quotes, it must be preserved - verify
	// against the repository before applying.
	return strings.Join(eventCntKVs, "")
}

// expandRTCntFieldValues builds the count keys of one RTCntField for a single
// BE row. Each sub-field value is optionally split by its delimiter, and the
// tokens of successive sub-fields are combined as a cross product joined by
// "_". It returns false when any sub-field is absent from valMap, so that the
// caller skips the field instead of counting a partial key.
func expandRTCntFieldValues(field RTCntField, valMap map[string]string) ([]string, bool) {
	var vals []string
	for tId, fName := range field.fieldNames {
		tVal, exist := valMap[fName]
		if !exist {
			log.Error(fmt.Sprintf("field key(%s) does not exist, available keys: %s", fName, debugMapKeys(valMap, ",")))
			return nil, false
		}

		var currToks []string
		if tmpDelim := field.fieldDelims[tId]; len(tmpDelim) > 0 && len(tVal) > 0 {
			currToks = strings.Split(tVal, tmpDelim)
		} else {
			currToks = []string{tVal}
		}

		if tId == 0 {
			// the first one
			vals = currToks
			continue
		}
		// append more fields, such as rcc from root_category_id and
		// child_category_id
		combined := make([]string, 0, len(vals)*len(currToks))
		for _, prev := range vals {
			for _, curr := range currToks {
				combined = append(combined, prev+"_"+curr)
			}
		}
		vals = combined
	}
	return vals, true
}

// addRTCntEvent records one event with timestamp ts for key val in cntMap,
// creating the counter on first sight and tracking the latest timestamp.
func addRTCntEvent(cntMap map[string]userRtCnt, val string, ts int64) {
	tmpCnt, exist := cntMap[val]
	if !exist {
		cntMap[val] = userRtCnt{key: val, cnt: 1, timestamp: ts}
		return
	}
	tmpCnt.cnt++
	if ts > tmpCnt.timestamp {
		tmpCnt.timestamp = ts
	}
	cntMap[val] = tmpCnt
}

// userRTCntFeatureFetch reads the user's recent events from BE and turns them
// into per-field, per-window count features, plus a second set restricted to
// rows whose beIsHomeField value is > 0. The features are written to the user
// as cache features when cacheFeaturesName is set, otherwise as properties.
// Any panic is recovered and logged; in that case no feature is written.
func (d *FeatureBeDao) userRTCntFeatureFetch(user *User, context *context.RecommendContext) {
	defer func() {
		if err := recover(); err != nil {
			stack := string(debug.Stack())
			log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureBeDao\terror=%v\tstack=%v", context.RecommendId, err, strings.ReplaceAll(stack, "\n", "\t")))
		}
	}()

	comms := strings.Split(d.featureKey, ":")
	if len(comms) < 2 {
		log.Error(fmt.Sprintf("requestId=%s\tuid=%s\terror=featureKey error(%s)", context.RecommendId, user.Id, d.featureKey))
		return
	}

	// uid
	key := user.StringProperty(comms[1])
	if key == "" {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureBeDao\terror=property not found(%s)", context.RecommendId, comms[1]))
		return
	}

	currTime := time.Now().Unix()

	readRequest := be.NewReadRequest(d.bizName, d.sequenceLength)
	readRequest.IsRawRequest = true
	params := make(map[string]string)
	params[fmt.Sprintf("%s_list", d.beRecallName)] = fmt.Sprintf("%s_%s:1", key, d.sequenceEvent)
	params[fmt.Sprintf("%s_return_count", d.beRecallName)] = fmt.Sprintf("%d", d.sequenceLength)
	params[fmt.Sprintf("%s_table", d.beRecallName)] = d.beRTTable
	readRequest.SetQueryParams(params)

	if context.Debug {
		uri := readRequest.BuildUri()
		log.Info(fmt.Sprintf("module=FeatureBeDao\tfunc=userRTCntFeatureFetch\trequestId=%s\tbizName=%s\turl=%s", context.RecommendId, d.bizName, uri.RequestURI()))
	}

	readResponse, err := d.beClient.Read(*readRequest)
	if err != nil {
		log.Error(fmt.Sprintf("module=FeatureBeDao\tfunc=userRTCntFeatureFetch\trequestId=%s\terror=be error(%v)", context.RecommendId, err.Error()))
		return
	}

	matchItems := readResponse.Result.MatchItems
	if matchItems == nil || len(matchItems.FieldValues) == 0 {
		if context.Debug {
			log.Warning(fmt.Sprintf("requestId=%s\tmodule=FeatureBeDao\tfunc=userRTCntFeatureFetch\tkey=%s\tevent=%s\twarning=NoMatchedItems", context.RecommendId, key, d.sequenceEvent))
		}
		return
	}

	// One counter map per (field, window) pair, stored flat at index
	// fieldIdx*len(rtCntWins)+winIdx.
	numMaps := len(d.beRTCntFields) * len(d.rtCntWins)
	cntMaps := make([]map[string]userRtCnt, numMaps)
	homeCntMaps := make([]map[string]userRtCnt, numMaps)
	for m := 0; m < numMaps; m++ {
		cntMaps[m] = make(map[string]userRtCnt)
		homeCntMaps[m] = make(map[string]userRtCnt)
	}

	if context.Debug {
		log.Info(fmt.Sprintf("requestId=%s\tfunc=userRTCntFeatureFetch\tkey=%s\tevent=%s\tcurrTime=%d\tlen(matchItems.FieldValues)=%d\trtCntWins=%s", context.RecommendId, key, d.sequenceEvent, currTime, len(matchItems.FieldValues), debugArr(d.rtCntWins)))
	}

	// Events newer than winEnd are ignored for every window.
	winEnd := currTime - d.rtCntWinDelay

	for _, values := range matchItems.FieldValues {
		valMap := make(map[string]string, len(values))
		var itemId string
		ts := int64(-1)

		if context.Debug {
			log.Info(fmt.Sprintf("requestId=%s\tfunc=userRTCntFeatureFetch\tkey=%s\tevent=%s\tmatchItems.FieldValues: len(values)=%d", context.RecommendId, key, d.sequenceEvent, len(values)))
		}

		for i, value := range values {
			fieldName := matchItems.FieldNames[i]
			valMap[fieldName] = utils.ToString(value, "")
			switch fieldName {
			case d.beItemFeatureKeyName:
				itemId = utils.ToString(value, "")
			case d.beTimestampFeatureKeyName:
				ts = utils.ToInt64(value, -1)
			}
		}

		if context.Debug {
			log.Info(fmt.Sprintf("requestId=%s\tfunc=userRTCntFeatureFetch\tkey=%s\tevent=%s\tvalMap=%s", context.RecommendId, key, d.sequenceEvent, debugMap(valMap, ";", ":")))
		}

		if len(itemId) == 0 {
			log.Error(fmt.Sprintf("requestId=%s\tfunc=userRTCntFeatureFetch\tkey=%s\tevent=%s\tinvalid(itemId): %s", context.RecommendId, key, d.sequenceEvent, itemId))
			continue
		}
		if ts <= 0 {
			log.Error(fmt.Sprintf("requestId=%s\tfunc=userRTCntFeatureFetch\tkey=%s\tevent=%s\tinvalid(ts): %d", context.RecommendId, key, d.sequenceEvent, ts))
			continue
		}

		isHome := utils.ToInt(valMap[d.beIsHomeField], 0)

		for i, tmpCntField := range d.beRTCntFields {
			vals, ok := expandRTCntFieldValues(tmpCntField, valMap)
			if !ok {
				continue
			}

			mapId := i * len(d.rtCntWins)
			for j, win := range d.rtCntWins {
				// Keep only events inside [winEnd-win, winEnd).
				if ts >= winEnd || ts < winEnd-int64(win) {
					continue
				}
				for _, val := range vals {
					if len(val) == 0 {
						// ignore empty value
						continue
					}
					addRTCntEvent(cntMaps[mapId+j], val, ts)
					if isHome > 0 {
						addRTCntEvent(homeCntMaps[mapId+j], val, ts)
					}
				}
			}
		}
	}

	properties := make(map[string]any)
	for i := range d.beRTCntFields {
		mapId := i * len(d.rtCntWins)
		for j := range d.rtCntWins {
			// build kv string
			if tmpMap := cntMaps[mapId+j]; len(tmpMap) > 0 {
				tmpRTFeaStr := buildRTCntOutput(tmpMap, d.rtCntMaxKey)
				// user__style_id_rt_clk_30m
				feaOutName := fmt.Sprintf(d.outRTCntFeaPattern, d.outRTCntFieldAlias[i], d.outEventName, d.outRTCntWinNames[j])
				properties[feaOutName] = tmpRTFeaStr
				if context.Debug {
					log.Info(fmt.Sprintf("requestId=%s\tfunc=userRTCntFeatureFetch\tkey=%s\tevent=%s\tadd_user_fea:%s=%s", context.RecommendId, key, d.sequenceEvent, feaOutName, tmpRTFeaStr))
				}
			}

			// build home kv string
			if tmpHomeMap := homeCntMaps[mapId+j]; len(tmpHomeMap) > 0 {
				tmpRTHomeFeaStr := buildRTCntOutput(tmpHomeMap, d.rtCntMaxKey)
				// user__style_id_rt_home_clk_30m
				homeFeaOutName := fmt.Sprintf(d.outHomeRTCntFeaPattern, d.outRTCntFieldAlias[i], d.outEventName, d.outRTCntWinNames[j])
				properties[homeFeaOutName] = tmpRTHomeFeaStr
				if context.Debug {
					log.Info(fmt.Sprintf("requestId=%s\tfunc=userRTCntFeatureFetch\tkey=%s\tevent=%s\tadd_user_home_fea:%s=%s", context.RecommendId, key, d.sequenceEvent, homeFeaOutName, tmpRTHomeFeaStr))
				}
			}
		}
	}

	if d.cacheFeaturesName != "" {
		user.AddCacheFeatures(d.cacheFeaturesName, properties)
	} else {
		user.AddProperties(properties)
	}
}

// userSequenceFeatureFetch reads the user's behaviour sequence from BE and
// writes it to the user as delimiter-joined string features named
// "<sequenceName>__<subFeature>", plus "<sequenceName>" itself holding the
// item id list. Rows are de-duplicated on (itemId, event), keeping the first
// occurrence. Any panic is recovered and logged.
func (d *FeatureBeDao) userSequenceFeatureFetch(user *User, context *context.RecommendContext) {
	defer func() {
		if err := recover(); err != nil {
			stack := string(debug.Stack())
			log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureBeDao\terror=%v\tstack=%v", context.RecommendId, err, strings.ReplaceAll(stack, "\n", "\t")))
		}
	}()

	comms := strings.Split(d.featureKey, ":")
	if len(comms) < 2 {
		log.Error(fmt.Sprintf("requestId=%s\tuid=%s\terror=featureKey error(%s)", context.RecommendId, user.Id, d.featureKey))
		return
	}

	key := user.StringProperty(comms[1])
	if key == "" {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureBeDao\terror=property not found(%s)", context.RecommendId, comms[1]))
		return
	}

	currTime := time.Now().Unix()

	// Output sub-feature names: defaults, overridden by configuration.
	var (
		itemFeatureKeyName      = "item_id"
		eventFeatureKeyName     = "event"
		playTimeFeatureKeyName  = "play_time"
		timestampFeatureKeyName = "timestamp"
		tsFeatureKeyName        = "ts"
		onlineSequences         []*sequenceInfo
	)
	if d.itemFeatureKeyName != "" {
		itemFeatureKeyName = d.itemFeatureKeyName
	}
	if d.eventFeatureKeyName != "" {
		eventFeatureKeyName = d.eventFeatureKeyName
	}
	if d.playTimeFeatureKeyName != "" {
		playTimeFeatureKeyName = d.playTimeFeatureKeyName
	}
	if d.timestampFeatureKeyName != "" {
		timestampFeatureKeyName = d.timestampFeatureKeyName
	}
	if d.tsFeatureKeyName != "" {
		tsFeatureKeyName = d.tsFeatureKeyName
	}

	readRequest := be.NewReadRequest(d.bizName, d.sequenceLength)
	readRequest.IsRawRequest = true
	params := make(map[string]string)
	params[fmt.Sprintf("%s_list", d.beRecallName)] = fmt.Sprintf("%s_%s:1", key, d.sequenceEvent)
	params[fmt.Sprintf("%s_return_count", d.beRecallName)] = fmt.Sprintf("%d", d.sequenceLength)
	readRequest.SetQueryParams(params)

	if context.Debug {
		uri := readRequest.BuildUri()
		log.Info(fmt.Sprintf("requestId=%s\tevent=userSequenceFeatureFetch\tbizName=%s\turl=%s", context.RecommendId, d.bizName, uri.RequestURI()))
	}

	readResponse, err := d.beClient.Read(*readRequest)
	if err != nil {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureBeDao\terror=be error(%v)", context.RecommendId, err))
		return
	}

	matchItems := readResponse.Result.MatchItems
	if matchItems == nil || len(matchItems.FieldValues) == 0 {
		return
	}

	for _, values := range matchItems.FieldValues {
		seq := new(sequenceInfo)
		for i, value := range values {
			switch matchItems.FieldNames[i] {
			case d.beItemFeatureKeyName:
				seq.itemId = utils.ToString(value, "")
			case d.beEventFeatureKeyName:
				seq.event = utils.ToString(value, "")
			case d.beTimestampFeatureKeyName:
				seq.timestamp = utils.ToInt64(value, 0)
			case d.bePlayTimeFeatureKeyName:
				seq.playTime = utils.ToFloat(value, 0)
			}
		}
		// Rows without an item id or event are dropped.
		if seq.itemId != "" && seq.event != "" {
			onlineSequences = append(onlineSequences, seq)
		}
	}

	// sequence feature corresponding to the easyrec processor
	sequencesValueMap := make(map[string][]string)
	sequenceMap := make(map[string]bool, len(onlineSequences))
	for _, seq := range onlineSequences {
		dedupKey := fmt.Sprintf("%s#%s", seq.itemId, seq.event)
		if sequenceMap[dedupKey] {
			continue
		}
		sequenceMap[dedupKey] = true

		sequencesValueMap[itemFeatureKeyName] = append(sequencesValueMap[itemFeatureKeyName], seq.itemId)
		sequencesValueMap[timestampFeatureKeyName] = append(sequencesValueMap[timestampFeatureKeyName], fmt.Sprintf("%d", seq.timestamp))
		sequencesValueMap[eventFeatureKeyName] = append(sequencesValueMap[eventFeatureKeyName], seq.event)
		if d.hasPlayTimeField {
			sequencesValueMap[playTimeFeatureKeyName] = append(sequencesValueMap[playTimeFeatureKeyName], fmt.Sprintf("%.2f", seq.playTime))
		}
		// "ts" is the age of the event relative to now, in the timestamp's unit.
		sequencesValueMap[tsFeatureKeyName] = append(sequencesValueMap[tsFeatureKeyName], fmt.Sprintf("%d", currTime-seq.timestamp))

		for index, field := range seq.dimensionFields {
			if field.Valid {
				dimName := d.sequenceDimFields[index]
				sequencesValueMap[dimName] = append(sequencesValueMap[dimName], field.String)
			}
		}
	}

	delim := d.sequenceDelim
	if delim == "" {
		delim = ";"
	}

	properties := make(map[string]any, len(sequencesValueMap)+1)
	for subName, value := range sequencesValueMap {
		properties[d.sequenceName+"__"+subName] = strings.Join(value, delim)
	}
	properties[d.sequenceName] = strings.Join(sequencesValueMap[itemFeatureKeyName], delim)

	if d.cacheFeaturesName != "" {
		user.AddCacheFeatures(d.cacheFeaturesName, properties)
	} else {
		user.AddProperties(properties)
	}
}

// itemsFeatureFetch is intentionally empty.
func (d *FeatureBeDao) itemsFeatureFetch(items []*Item, context *context.RecommendContext) {
}