in sort/traffic_control_sort.go [380:535]
func macroControl(controllerMap map[string]*PIDController, items []*module.Item, ctx *context.RecommendContext, wgCtrl *sync.WaitGroup) {
defer wgCtrl.Done()
begin := time.Now()
var targetOutput map[string]float64
var count int
targetOutput, count = FlowControl(controllerMap, ctx)
if len(targetOutput) == 0 || count == 0 {
ctx.LogWarning(fmt.Sprintf("module=TrafficControlSort\tmacro control\ttraffic control task output is zero"))
return
}
ctx.LogDebug(fmt.Sprintf("module=TrafficControlSort\tmacro control\ttarget out put: %v", targetOutput))
begin = time.Now()
itemScores := make([]float64, len(items))
// 计算各个目标的偏好分的全局占比
totalScore := 0.0
maxScore := 0.0 // item 列表中的最大分
targetScore := make(map[string]float64)
for i, item := range items {
score := item.Score
if score == 0.0 {
score = 1e-8
}
if i == 0 {
maxScore = score
itemScores[0] = math.E
} else {
v := score / maxScore // 归一化 rank score
idx := int(v * 1000)
if idx < 0 {
idx = 0
}
if idx >= 1000 {
idx = 999
}
itemScores[i] = expTable[idx]
}
posWeight := 0.006737946999
if i < 500 {
posWeight = positionWeight[i]
}
score *= posWeight
totalScore += score
for targetId, controller := range controllerMap {
if alpha, ok := targetOutput[targetId]; ok && alpha != 0 && controller.IsControlledItem(item) {
targetScore[targetId] += score
}
}
}
for targetId, score := range targetScore {
targetScore[targetId] = score / totalScore
}
params := ctx.ExperimentResult.GetExperimentParams()
maxUpliftTargetCnt := params.GetInt("pid_max_uplift_target_cnt", len(controllerMap))
if maxUpliftTargetCnt < len(controllerMap) {
// 按照偏好分采样 `maxUpliftTargetCnt` 个需要上提的目标,未被选中的上提目标调控力度置为0
SampleControlTargetsByScore(maxUpliftTargetCnt, targetScore, targetOutput, ctx)
}
pidGamma := params.GetFloat("pid_gamma", 1.0)
pidBeta := params.GetFloat("pid_beta", 1.0)
// preprocess, adjust control signal
for targetId, alpha := range targetOutput {
if alpha > 0 { // uplift
scoreWeight := targetScore[targetId]
rho := 1.0 + pidGamma*tanh(pidBeta*scoreWeight) // 给更感兴趣的目标更大的提权,用来区分不同的调控目标
alpha *= rho
targetOutput[targetId] = alpha
}
}
ctx.LogDebug(fmt.Sprintf("module=TrafficControlSort\tmacro control\ttarget output: %v", targetOutput))
// compute delta rank
begin = time.Now()
pageNo := utils.ToInt(ctx.GetParameter("pageNum"), 1)
if pageNo < 1 {
pageNo = 1
}
keepCtrlIdScore := params.GetFloat("pid_keep_id_target_score_weight", 1.0)
if keepCtrlIdScore < 0.3 {
keepCtrlIdScore = 0.3
}
ctrlParams := &controlParams{
targetScore: targetScore,
itemScores: itemScores,
eta: params.GetFloat("pid_eta", 1.6),
pageNo: pageNo,
keepCtrlIdScore: keepCtrlIdScore,
newCtrlIdThreshold: params.GetFloat("pid_new_id_target_threshold", 1.0),
needNewCtrlId: make(map[string]bool),
}
targetControlledNum := make(map[string]int, len(controllerMap))
mu := sync.Mutex{}
for targetId := range controllerMap {
targetControlledNum[targetId] = len(items)
newCtrlId := utils.GetExperimentParamByPath(params, fmt.Sprintf("pid_params.%s.new_ctrl_id", targetId), false)
ctrlParams.needNewCtrlId[targetId] = newCtrlId.(bool)
}
// compute delta rank
parallel := params.GetInt("pid_parallel", 10)
ch := make(chan int, parallel)
defer close(ch)
var wg sync.WaitGroup
batchSize := len(items) / parallel
if len(items)%parallel != 0 {
batchSize++
}
if batchSize < 1 {
batchSize = 1
}
for b, e := 0, batchSize; b < len(items); b, e = e, e+batchSize {
var candidates []*module.Item
if e < len(items) {
candidates = items[b:e]
} else {
candidates = items[b:]
}
ch <- b
wg.Add(1)
go func(b int, items []*module.Item) {
defer wg.Done()
for idx, item := range items {
i := b + idx
finalDeltaRank := 0.0
for targetId, controller := range controllerMap {
if !controller.IsControlledItem(item) {
if ctx.Debug {
mu.Lock()
targetControlledNum[targetId] = targetControlledNum[targetId] - 1
mu.Unlock()
}
continue
}
if alpha, ok := targetOutput[targetId]; ok && alpha != 0 {
deltaRank := computeDeltaRank(controller, item, i, alpha, ctrlParams, ctx)
finalDeltaRank += deltaRank // 形成合力
}
}
if finalDeltaRank != 0.0 {
item.IncrAlgoScore("__delta_rank__", finalDeltaRank)
controlId, _ := item.IntProperty("__traffic_control_id__")
if controlId == 0 && finalDeltaRank < 1.0 {
item.AddProperty("__traffic_control_id__", item.GetProperty("_ORIGIN_POSITION_"))
}
}
}
<-ch
}(b, candidates)
}
wg.Wait()
ctx.LogInfo(fmt.Sprintf("module=TrafficControlSort\tmacro control\tcount=%d\tcost=%d\tcontrolled target=%v",
len(items), utils.CostTime(begin), targetControlledNum))
}