func macroControl()

in sort/traffic_control_sort.go [380:535]


// macroControl applies PID-driven "macro" traffic control to a ranked item list.
//
// Steps:
//  1. Ask FlowControl for each target's control signal (alpha); give up with a
//     warning when there is no output.
//  2. Derive per-item score factors (itemScores) and each target's global share
//     of the position-weighted preference score (targetScore).
//  3. Optionally keep only pid_max_uplift_target_cnt uplift targets, sampled by
//     that share (SampleControlTargetsByScore).
//  4. Boost every positive (uplift) alpha by rho = 1 + pid_gamma*tanh(pid_beta*share).
//  5. Compute each item's delta rank in parallel (at most pid_parallel batches).
//     A non-zero delta is added to the item's "__delta_rank__" algo score. Items
//     whose delta is below 1.0 and that carry no "__traffic_control_id__" yet are
//     tagged with their "_ORIGIN_POSITION_".
//
// wgCtrl.Done is always called on return, so the caller can wait on it.
func macroControl(controllerMap map[string]*PIDController, items []*module.Item, ctx *context.RecommendContext, wgCtrl *sync.WaitGroup) {
	defer wgCtrl.Done()

	targetOutput, count := FlowControl(controllerMap, ctx)
	if len(targetOutput) == 0 || count == 0 {
		ctx.LogWarning("module=TrafficControlSort\tmacro control\ttraffic control task output is zero")
		return
	}
	ctx.LogDebug(fmt.Sprintf("module=TrafficControlSort\tmacro control\ttarget out put: %v", targetOutput))

	// Compute each target's global share of the (position-weighted) preference score.
	itemScores := make([]float64, len(items))
	targetScore := make(map[string]float64)
	totalScore := 0.0
	// Score of items[0], used as the normalization base.
	// NOTE(review): named "max", so items are presumably sorted by score descending — confirm.
	maxScore := 0.0
	for i, item := range items {
		score := item.Score
		if score == 0.0 {
			score = 1e-8 // keep the normalization base and weights non-zero
		}
		if i == 0 {
			maxScore = score
			itemScores[0] = math.E
		} else {
			v := score / maxScore // normalized rank score
			// Bucket the ratio into the 1000-slot expTable.
			idx := int(v * 1000)
			if idx < 0 {
				idx = 0
			}
			if idx >= 1000 {
				idx = 999
			}
			itemScores[i] = expTable[idx]
		}
		// Fallback weight ≈ e^-5 for positions beyond the first 500; assumes
		// positionWeight has at least 500 entries.
		posWeight := 0.006737946999
		if i < 500 {
			posWeight = positionWeight[i]
		}
		score *= posWeight
		totalScore += score
		for targetID, controller := range controllerMap {
			if alpha, ok := targetOutput[targetID]; ok && alpha != 0 && controller.IsControlledItem(item) {
				targetScore[targetID] += score
			}
		}
	}
	for targetID, score := range targetScore {
		if totalScore != 0 {
			targetScore[targetID] = score / totalScore
		} else {
			// Scores cancelled out (possible with negative scores); use a neutral
			// share instead of propagating NaN/Inf into the control signal.
			targetScore[targetID] = 0
		}
	}

	params := ctx.ExperimentResult.GetExperimentParams()
	maxUpliftTargetCnt := params.GetInt("pid_max_uplift_target_cnt", len(controllerMap))
	if maxUpliftTargetCnt < len(controllerMap) {
		// Sample `maxUpliftTargetCnt` uplift targets by preference score; the
		// control strength of unselected uplift targets is set to 0.
		SampleControlTargetsByScore(maxUpliftTargetCnt, targetScore, targetOutput, ctx)
	}

	// Preprocess: adjust the control signal. Targets with a larger preference
	// share get a larger uplift, which differentiates between control targets.
	pidGamma := params.GetFloat("pid_gamma", 1.0)
	pidBeta := params.GetFloat("pid_beta", 1.0)
	for targetID, alpha := range targetOutput {
		if alpha > 0 { // uplift
			rho := 1.0 + pidGamma*tanh(pidBeta*targetScore[targetID])
			targetOutput[targetID] = alpha * rho
		}
	}
	ctx.LogDebug(fmt.Sprintf("module=TrafficControlSort\tmacro control\ttarget output: %v", targetOutput))

	// Compute delta rank; the final log reports the cost from this point on.
	begin := time.Now()
	pageNo := utils.ToInt(ctx.GetParameter("pageNum"), 1)
	if pageNo < 1 {
		pageNo = 1
	}
	keepCtrlIdScore := params.GetFloat("pid_keep_id_target_score_weight", 1.0)
	if keepCtrlIdScore < 0.3 {
		keepCtrlIdScore = 0.3
	}
	// NOTE(review): ctrlParams is shared by all worker goroutines below;
	// assumed read-only inside computeDeltaRank — confirm.
	ctrlParams := &controlParams{
		targetScore:        targetScore,
		itemScores:         itemScores,
		eta:                params.GetFloat("pid_eta", 1.6),
		pageNo:             pageNo,
		keepCtrlIdScore:    keepCtrlIdScore,
		newCtrlIdThreshold: params.GetFloat("pid_new_id_target_threshold", 1.0),
		needNewCtrlId:      make(map[string]bool),
	}

	// targetControlledNum starts at len(items) per target and is decremented
	// for every item the target does not control.
	targetControlledNum := make(map[string]int, len(controllerMap))
	for targetID := range controllerMap {
		targetControlledNum[targetID] = len(items)
		path := fmt.Sprintf("pid_params.%s.new_ctrl_id", targetID)
		// Comma-ok assertion: a non-bool (or missing) value must not panic.
		needNew, ok := utils.GetExperimentParamByPath(params, path, false).(bool)
		if !ok {
			ctx.LogWarning(fmt.Sprintf("module=TrafficControlSort\tmacro control\tparam %s is not a bool, treated as false", path))
		}
		ctrlParams.needNewCtrlId[targetID] = needNew
	}

	parallel := params.GetInt("pid_parallel", 10)
	if parallel < 1 {
		// A non-positive setting would panic in make() / the division below.
		parallel = 1
	}
	batchSize := (len(items) + parallel - 1) / parallel // ceil(len/parallel)
	if batchSize < 1 {
		batchSize = 1
	}

	var (
		wg sync.WaitGroup
		mu sync.Mutex // guards targetControlledNum
	)
	sem := make(chan struct{}, parallel) // bounds the number of in-flight batches
	for start := 0; start < len(items); start += batchSize {
		end := start + batchSize
		if end > len(items) {
			end = len(items)
		}
		sem <- struct{}{}
		wg.Add(1)
		go func(offset int, batch []*module.Item) {
			defer wg.Done()
			defer func() { <-sem }()

			// Count locally and merge once per batch to keep lock traffic low.
			uncontrolled := make(map[string]int, len(controllerMap))
			for j, item := range batch {
				rank := offset + j // position in the full items list
				finalDeltaRank := 0.0
				for targetID, controller := range controllerMap {
					if !controller.IsControlledItem(item) {
						uncontrolled[targetID]++
						continue
					}
					if alpha, ok := targetOutput[targetID]; ok && alpha != 0 {
						// Contributions of all targets combine into one resultant shift.
						finalDeltaRank += computeDeltaRank(controller, item, rank, alpha, ctrlParams, ctx)
					}
				}
				if finalDeltaRank == 0.0 {
					continue
				}
				item.IncrAlgoScore("__delta_rank__", finalDeltaRank)
				controlID, _ := item.IntProperty("__traffic_control_id__") // absent/invalid -> 0
				if controlID == 0 && finalDeltaRank < 1.0 {
					item.AddProperty("__traffic_control_id__", item.GetProperty("_ORIGIN_POSITION_"))
				}
			}

			mu.Lock()
			for targetID, n := range uncontrolled {
				targetControlledNum[targetID] -= n
			}
			mu.Unlock()
		}(start, items[start:end])
	}
	wg.Wait()
	ctx.LogInfo(fmt.Sprintf("module=TrafficControlSort\tmacro control\tcount=%d\tcost=%d\tcontrolled target=%v",
		len(items), utils.CostTime(begin), targetControlledNum))
}