func()

in module/feature_be_dao.go [266:500]


func (d *FeatureBeDao) userRTCntFeatureFetch(user *User, context *context.RecommendContext) {
	defer func() {
		if err := recover(); err != nil {
			stack := string(debug.Stack())
			log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureBeDao\terror=%v\tstack=%v", context.RecommendId, err, strings.ReplaceAll(stack, "\n", "\t")))
			return
		}
	}()

	comms := strings.Split(d.featureKey, ":")
	if len(comms) < 2 {
		log.Error(fmt.Sprintf("requestId=%s\tuid=%s\terror=featureKey error(%s)", context.RecommendId, user.Id, d.featureKey))
		return
	}

	// uid
	key := user.StringProperty(comms[1])
	if key == "" {
		log.Error(fmt.Sprintf("requestId=%s\tmodule=FeatureBeDao\terror=property not found(%s)", context.RecommendId, comms[1]))
		return
	}

	currTime := time.Now().Unix()

	readRequest := be.NewReadRequest(d.bizName, d.sequenceLength)
	readRequest.IsRawRequest = true

	params := make(map[string]string)
	params[fmt.Sprintf("%s_list", d.beRecallName)] = fmt.Sprintf("%s_%s:1", key, d.sequenceEvent)
	params[fmt.Sprintf("%s_return_count", d.beRecallName)] = fmt.Sprintf("%d", d.sequenceLength)
	params[fmt.Sprintf("%s_table", d.beRecallName)] = d.beRTTable

	readRequest.SetQueryParams(params)

	if context.Debug {
		uri := readRequest.BuildUri()
		log.Info(fmt.Sprintf("module=FeatureBeDao\tfunc=userRTCntFeatureFetch\trequestId=%s\tbizName=%s\turl=%s", context.RecommendId, d.bizName, uri.RequestURI()))
	}

	readResponse, err := d.beClient.Read(*readRequest)
	if err != nil {
		log.Error(fmt.Sprintf("module=FeatureBeDao\tfunc=userRTCntFeatureFetch\trequestId=%s\terror=be error(%v)", context.RecommendId, err.Error()))
		return
	}

	matchItems := readResponse.Result.MatchItems
	if matchItems == nil || len(matchItems.FieldValues) == 0 {
		if context.Debug {
			log.Warning(fmt.Sprintf("requestId=%s\tmodule=FeatureBeDao\tfunc=userRTCntFeatureFetch\tkey=%s\tevent=%s\twarning=NoMatchedItems",
				context.RecommendId, key, d.sequenceEvent))
		}
		return
	}

	var cntMaps []map[string]userRtCnt
	var homeCntMaps []map[string]userRtCnt
	for i := 0; i < len(d.beRTCntFields); i++ {
		for j := 0; j < len(d.rtCntWins); j++ {
			cntMaps = append(cntMaps, make(map[string]userRtCnt))
			homeCntMaps = append(homeCntMaps, make(map[string]userRtCnt))
		}
	}

	if context.Debug {
		log.Info(fmt.Sprintf("requestId=%sfunc=userRTCntFeatureFetch\tkey=%s\tevent=%s\tcurrTime=%d\tlen(matchItems.FieldValues)=%drtCntWins=%s",
			context.RecommendId, key, d.sequenceEvent, currTime, len(matchItems.FieldValues),
			debugArr[int](d.rtCntWins)))
	}
	for _, values := range matchItems.FieldValues {
		valMap := make(map[string]string)
		var itemId string
		ts := int64(-1)
		if context.Debug {
			log.Info(fmt.Sprintf("requestId=%s\tfunc=userRTCntFeatureFetch\tkey=%s\tevent=%s\tmatchItems.FieldValues: len(values)=%d",
				context.RecommendId, key, d.sequenceEvent, len(values)))
		}
		for i, value := range values {
			valMap[matchItems.FieldNames[i]] = utils.ToString(value, "")
			if matchItems.FieldNames[i] == d.beItemFeatureKeyName {
				itemId = utils.ToString(value, "")
			} else if matchItems.FieldNames[i] == d.beTimestampFeatureKeyName {
				ts = utils.ToInt64(value, -1)
			}
		}

		if context.Debug {
			log.Info(fmt.Sprintf("requestId=%s\tfunc=userRTCntFeatureFetch\tkey=%s\tevent=%s\tvalMap=%s",
				context.RecommendId, key, d.sequenceEvent, debugMap[string](valMap, ";", ":")))
		}

		if len(itemId) == 0 {
			log.Error(fmt.Sprintf("requestId=%s\tfunc=userRTCntFeatureFetch\tkey=%s\tevent=%s\tinvalid(itemId): %s",
				context.RecommendId, key, d.sequenceEvent, itemId))
			continue
		}

		if ts <= 0 {
			log.Error(fmt.Sprintf("requestId=%s\tfunc=userRTCntFeatureFetch\tkey=%s\tevent=%s\tinvalid(ts): %d",
				context.RecommendId, key, d.sequenceEvent, ts))
			continue
		}

		for i := 0; i < len(d.beRTCntFields); i++ {
			tmpCntField := d.beRTCntFields[i]
			var vals []string
			// for fields concatenated by multiple sub fields
			// such as rcc from root_category_id and child_category_id
			if len(tmpCntField.fieldNames) > 1 {
				for tId, fName := range tmpCntField.fieldNames {
					tVal, exist := valMap[fName]
					if !exist {
						log.Error(fmt.Sprintf("field key(%s) does not exist, available keys: %s",
							fName, debugMapKeys[string](valMap, ",")))
						continue
					} else {
						var currToks []string
						tmpDelim := tmpCntField.fieldDelims[tId]
						if len(tmpDelim) > 0 && len(tVal) > 0 {
							currToks = strings.Split(tVal, tmpDelim)
						} else {
							currToks = append(currToks, tVal)
						}
						if tId != 0 { // append more fields
							var tmpVals []string
							for _, prev := range vals {
								for _, curr := range currToks {
									tmpVals = append(tmpVals, prev+"_"+curr)
								}
							}
							vals = tmpVals
						} else { // the first one
							vals = currToks
						}
					}
				}
			} else { // for single field
				tVal, exist := valMap[tmpCntField.fieldNames[0]]
				if !exist {
					log.Error(fmt.Sprintf("field key(%s) does not exist, available keys: %s",
						tmpCntField.fieldNames[0], debugMapKeys[string](valMap, ",")))
					continue
				} else {
					tmpDelim := tmpCntField.fieldDelims[0]
					if len(tmpDelim) > 0 {
						vals = strings.Split(tVal, tmpDelim)
					} else {
						vals = append(vals, tVal)
					}
				}
			}

			isHome := utils.ToInt(valMap[d.beIsHomeField], 0)
			mapId := i * len(d.rtCntWins)
			for j := 0; j < len(d.rtCntWins); j++ {
				if ts >= (currTime-d.rtCntWinDelay) || ts < (currTime-d.rtCntWinDelay-int64(d.rtCntWins[j])) {
					continue
				}

				for _, val := range vals {
					if len(val) == 0 {
						// ignore empty value
						continue
					}
					tmpCnt, exist := cntMaps[mapId+j][val]
					if !exist {
						tmpCnt = userRtCnt{val, 1, ts}
					} else {
						tmpCnt.cnt += 1
						if ts > tmpCnt.timestamp {
							tmpCnt.timestamp = ts
						}
					}
					cntMaps[mapId+j][val] = tmpCnt

					if isHome > 0 {
						tmpCnt, exist := homeCntMaps[mapId+j][val]
						if !exist {
							tmpCnt = userRtCnt{val, 1, ts}
						} else {
							tmpCnt.cnt += 1
							if ts > tmpCnt.timestamp {
								tmpCnt.timestamp = ts
							}
						}
						homeCntMaps[mapId+j][val] = tmpCnt
					}
				}
			}
		}
	}

	properties := make(map[string]interface{})
	for i := 0; i < len(d.beRTCntFields); i++ {
		mapId := i * len(d.rtCntWins)
		for j := 0; j < len(d.rtCntWins); j++ {
			// build kv string
			tmpMap := cntMaps[mapId+j]
			if len(tmpMap) > 0 {
				tmpRTFeaStr := buildRTCntOutput(tmpMap, d.rtCntMaxKey)
				// user__style_id_rt_clk_30m
				feaOutName := fmt.Sprintf(d.outRTCntFeaPattern,
					d.outRTCntFieldAlias[i],
					d.outEventName, d.outRTCntWinNames[j])
				//user.AddProperty(feaOutName, tmpRTFeaStr)
				properties[feaOutName] = tmpRTFeaStr
				if context.Debug {
					log.Info(fmt.Sprintf("requestId=%sfunc=userRTCntFeatureFetch\tkey=%s\tevent=%s\tadd_user_fea:%s=%s",
						context.RecommendId, key, d.sequenceEvent, feaOutName, tmpRTFeaStr))
				}
			}

			// build home kv string
			tmpHomeMap := homeCntMaps[mapId+j]
			if len(tmpHomeMap) > 0 {
				tmpRTHomeFeaStr := buildRTCntOutput(tmpHomeMap, d.rtCntMaxKey)
				// user__style_id_rt_home_clk_30m
				homeFeaOutName := fmt.Sprintf(d.outHomeRTCntFeaPattern,
					d.outRTCntFieldAlias[i], d.outEventName, d.outRTCntWinNames[j])
				//user.AddProperty(homeFeaOutName, tmpRTHomeFeaStr)
				properties[homeFeaOutName] = tmpRTHomeFeaStr
				if context.Debug {
					log.Info(fmt.Sprintf("requestId=%sfunc=userRTCntFeatureFetch\tkey=%s\tevent=%s\tadd_user_home_fea:%s=%s",
						context.RecommendId, key, d.sequenceEvent, homeFeaOutName, tmpRTHomeFeaStr))
				}
			}
		}
	}

	if d.cacheFeaturesName != "" {
		user.AddCacheFeatures(d.cacheFeaturesName, properties)
	} else {
		user.AddProperties(properties)
	}

}