in module/feature_be_dao.go [266:500]
func (d *FeatureBeDao) userRTCntFeatureFetch(user *User, context *context.RecommendContext) {
defer func() {
if err := recover(); err != nil {
stack := string(debug.Stack())
log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureBeDao\terror=%v\tstack=%v", context.RecommendId, err, strings.ReplaceAll(stack, "\n", "\t")))
return
}
}()
comms := strings.Split(d.featureKey, ":")
if len(comms) < 2 {
log.Error(fmt.Sprintf("requestId=%s\tuid=%s\terror=featureKey error(%s)", context.RecommendId, user.Id, d.featureKey))
return
}
// uid
key := user.StringProperty(comms[1])
if key == "" {
log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureBeDao\terror=property not found(%s)", context.RecommendId, comms[1]))
return
}
currTime := time.Now().Unix()
readRequest := be.NewReadRequest(d.bizName, d.sequenceLength)
readRequest.IsRawRequest = true
params := make(map[string]string)
params[fmt.Sprintf("%s_list", d.beRecallName)] = fmt.Sprintf("%s_%s:1", key, d.sequenceEvent)
params[fmt.Sprintf("%s_return_count", d.beRecallName)] = fmt.Sprintf("%d", d.sequenceLength)
params[fmt.Sprintf("%s_table", d.beRecallName)] = d.beRTTable
readRequest.SetQueryParams(params)
if context.Debug {
uri := readRequest.BuildUri()
log.Info(fmt.Sprintf("module=FeatureBeDao\tfunc=userRTCntFeatureFetch\trequestId=%s\tbizName=%s\turl=%s", context.RecommendId, d.bizName, uri.RequestURI()))
}
readResponse, err := d.beClient.Read(*readRequest)
if err != nil {
log.Error(fmt.Sprintf("module=FeatureBeDao\tfunc=userRTCntFeatureFetch\trequestId=%s\terror=be error(%v)", context.RecommendId, err.Error()))
return
}
matchItems := readResponse.Result.MatchItems
if matchItems == nil || len(matchItems.FieldValues) == 0 {
if context.Debug {
log.Warning(fmt.Sprintf("requestId=%s\tmodule=FeatureBeDao\tfunc=userRTCntFeatureFetch\tkey=%s\tevent=%s\twarning=NoMatchedItems",
context.RecommendId, key, d.sequenceEvent))
}
return
}
var cntMaps []map[string]userRtCnt
var homeCntMaps []map[string]userRtCnt
for i := 0; i < len(d.beRTCntFields); i++ {
for j := 0; j < len(d.rtCntWins); j++ {
cntMaps = append(cntMaps, make(map[string]userRtCnt))
homeCntMaps = append(homeCntMaps, make(map[string]userRtCnt))
}
}
if context.Debug {
log.Info(fmt.Sprintf("requestId=%sfunc=userRTCntFeatureFetch\tkey=%s\tevent=%s\tcurrTime=%d\tlen(matchItems.FieldValues)=%drtCntWins=%s",
context.RecommendId, key, d.sequenceEvent, currTime, len(matchItems.FieldValues),
debugArr[int](d.rtCntWins)))
}
for _, values := range matchItems.FieldValues {
valMap := make(map[string]string)
var itemId string
ts := int64(-1)
if context.Debug {
log.Info(fmt.Sprintf("requestId=%s\tfunc=userRTCntFeatureFetch\tkey=%s\tevent=%s\tmatchItems.FieldValues: len(values)=%d",
context.RecommendId, key, d.sequenceEvent, len(values)))
}
for i, value := range values {
valMap[matchItems.FieldNames[i]] = utils.ToString(value, "")
if matchItems.FieldNames[i] == d.beItemFeatureKeyName {
itemId = utils.ToString(value, "")
} else if matchItems.FieldNames[i] == d.beTimestampFeatureKeyName {
ts = utils.ToInt64(value, -1)
}
}
if context.Debug {
log.Info(fmt.Sprintf("requestId=%s\tfunc=userRTCntFeatureFetch\tkey=%s\tevent=%s\tvalMap=%s",
context.RecommendId, key, d.sequenceEvent, debugMap[string](valMap, ";", ":")))
}
if len(itemId) == 0 {
log.Error(fmt.Sprintf("requestId=%s\tfunc=userRTCntFeatureFetch\tkey=%s\tevent=%s\tinvalid(itemId): %s",
context.RecommendId, key, d.sequenceEvent, itemId))
continue
}
if ts <= 0 {
log.Error(fmt.Sprintf("requestId=%s\tfunc=userRTCntFeatureFetch\tkey=%s\tevent=%s\tinvalid(ts): %d",
context.RecommendId, key, d.sequenceEvent, ts))
continue
}
for i := 0; i < len(d.beRTCntFields); i++ {
tmpCntField := d.beRTCntFields[i]
var vals []string
// for fields concatenated by multiple sub fields
// such as rcc from root_category_id and child_category_id
if len(tmpCntField.fieldNames) > 1 {
for tId, fName := range tmpCntField.fieldNames {
tVal, exist := valMap[fName]
if !exist {
log.Error(fmt.Sprintf("field key(%s) does not exist, available keys: %s",
fName, debugMapKeys[string](valMap, ",")))
continue
} else {
var currToks []string
tmpDelim := tmpCntField.fieldDelims[tId]
if len(tmpDelim) > 0 && len(tVal) > 0 {
currToks = strings.Split(tVal, tmpDelim)
} else {
currToks = append(currToks, tVal)
}
if tId != 0 { // append more fields
var tmpVals []string
for _, prev := range vals {
for _, curr := range currToks {
tmpVals = append(tmpVals, prev+"_"+curr)
}
}
vals = tmpVals
} else { // the first one
vals = currToks
}
}
}
} else { // for single field
tVal, exist := valMap[tmpCntField.fieldNames[0]]
if !exist {
log.Error(fmt.Sprintf("field key(%s) does not exist, available keys: %s",
tmpCntField.fieldNames[0], debugMapKeys[string](valMap, ",")))
continue
} else {
tmpDelim := tmpCntField.fieldDelims[0]
if len(tmpDelim) > 0 {
vals = strings.Split(tVal, tmpDelim)
} else {
vals = append(vals, tVal)
}
}
}
isHome := utils.ToInt(valMap[d.beIsHomeField], 0)
mapId := i * len(d.rtCntWins)
for j := 0; j < len(d.rtCntWins); j++ {
if ts >= (currTime-d.rtCntWinDelay) || ts < (currTime-d.rtCntWinDelay-int64(d.rtCntWins[j])) {
continue
}
for _, val := range vals {
if len(val) == 0 {
// ignore empty value
continue
}
tmpCnt, exist := cntMaps[mapId+j][val]
if !exist {
tmpCnt = userRtCnt{val, 1, ts}
} else {
tmpCnt.cnt += 1
if ts > tmpCnt.timestamp {
tmpCnt.timestamp = ts
}
}
cntMaps[mapId+j][val] = tmpCnt
if isHome > 0 {
tmpCnt, exist := homeCntMaps[mapId+j][val]
if !exist {
tmpCnt = userRtCnt{val, 1, ts}
} else {
tmpCnt.cnt += 1
if ts > tmpCnt.timestamp {
tmpCnt.timestamp = ts
}
}
homeCntMaps[mapId+j][val] = tmpCnt
}
}
}
}
}
properties := make(map[string]interface{})
for i := 0; i < len(d.beRTCntFields); i++ {
mapId := i * len(d.rtCntWins)
for j := 0; j < len(d.rtCntWins); j++ {
// build kv string
tmpMap := cntMaps[mapId+j]
if len(tmpMap) > 0 {
tmpRTFeaStr := buildRTCntOutput(tmpMap, d.rtCntMaxKey)
// user__style_id_rt_clk_30m
feaOutName := fmt.Sprintf(d.outRTCntFeaPattern,
d.outRTCntFieldAlias[i],
d.outEventName, d.outRTCntWinNames[j])
//user.AddProperty(feaOutName, tmpRTFeaStr)
properties[feaOutName] = tmpRTFeaStr
if context.Debug {
log.Info(fmt.Sprintf("requestId=%sfunc=userRTCntFeatureFetch\tkey=%s\tevent=%s\tadd_user_fea:%s=%s",
context.RecommendId, key, d.sequenceEvent, feaOutName, tmpRTFeaStr))
}
}
// build home kv string
tmpHomeMap := homeCntMaps[mapId+j]
if len(tmpHomeMap) > 0 {
tmpRTHomeFeaStr := buildRTCntOutput(tmpHomeMap, d.rtCntMaxKey)
// user__style_id_rt_home_clk_30m
homeFeaOutName := fmt.Sprintf(d.outHomeRTCntFeaPattern,
d.outRTCntFieldAlias[i], d.outEventName, d.outRTCntWinNames[j])
//user.AddProperty(homeFeaOutName, tmpRTHomeFeaStr)
properties[homeFeaOutName] = tmpRTHomeFeaStr
if context.Debug {
log.Info(fmt.Sprintf("requestId=%sfunc=userRTCntFeatureFetch\tkey=%s\tevent=%s\tadd_user_home_fea:%s=%s",
context.RecommendId, key, d.sequenceEvent, homeFeaOutName, tmpRTHomeFeaStr))
}
}
}
}
if d.cacheFeaturesName != "" {
user.AddCacheFeatures(d.cacheFeaturesName, properties)
} else {
user.AddProperties(properties)
}
}