func FlowControl()

in sort/traffic_control_sort.go [538:682]


// FlowControl runs every controller in controllerMap against the latest
// traffic statistics and collects the controller outputs.
//
// The scene name is read from the "scene" request parameter and the run
// environment from the PAIREC_ENVIRONMENT environment variable; both are
// passed, together with the current experiment id, to the experiment client
// to fetch the traffic records. Each controller is then evaluated in its own
// goroutine, and the caller waits for the results for at most 12ms.
//
// It returns the non-zero outputs keyed by target id, and the number of
// entries in that map. The map is empty and the count is 0 when the scene
// parameter is missing or not a string, or when none of the received results
// reported a target traffic greater than zero.
func FlowControl(controllerMap map[string]*PIDController, ctx *context.RecommendContext) (map[string]float64, int) {
	// Outputs of the controllers, keyed by target id. The statistics used
	// below are the traffic accumulated so far for the control targets.
	targetOutput := make(map[string]float64)

	scene, good := ctx.GetParameter("scene").(string)
	if !good {
		ctx.LogError("failed to get scene name")
		return targetOutput, 0
	}

	// Fetch the real-time traffic statistics.
	runEnv := os.Getenv("PAIREC_ENVIRONMENT")
	expId := ctx.ExperimentResult.GetExpId()
	traffics := experimentClient.GetTrafficControlTargetTraffic(runEnv, scene, expId, "ER_ALL")
	if ctx.Debug {
		// Marshal error deliberately ignored: the payload is only used for a debug log line.
		data, _ := json.Marshal(traffics)
		ctx.LogDebug(fmt.Sprintf("module=TrafficControlSort\tflow control\texpId:%s\ttraffics:%s", expId, string(data)))
	}

	// Split the records into the "ER_ALL" aggregate and everything else,
	// both indexed by target id.
	allTargetTrafficMap := make(map[string]experiments.TrafficControlTargetTraffic)
	expTargetTrafficMap := make(map[string]experiments.TrafficControlTargetTraffic)
	for _, traffic := range traffics {
		if traffic.ItemOrExpId == "ER_ALL" {
			allTargetTrafficMap[traffic.TrafficControlTargetId] = traffic
		} else {
			expTargetTrafficMap[traffic.TrafficControlTargetId] = traffic
		}
	}

	// flowResult is what one worker goroutine reports back. hasTraffic travels
	// with the result so that only the receiving goroutine ever writes the
	// aggregated flag (a shared variable written by the workers is a data race).
	type flowResult struct {
		targetId   string
		output     float64
		hasTraffic bool
	}
	// The channel is intentionally never closed: workers may still be sending
	// when this function returns, and a send on a closed channel panics.
	// Workers blocked on a full buffer are released by gCtx being cancelled.
	retCh := make(chan flowResult, utils.MinInt(len(controllerMap), 64))

	gCtx, cancel := gocontext.WithTimeout(gocontext.Background(), time.Millisecond*12)
	defer cancel()
	for targetId, controller := range controllerMap {
		go func(gCtx gocontext.Context, targetId string, controller *PIDController) {
			defer func() {
				// A panic in one controller must not take the process down;
				// report it together with the panic value.
				if err := recover(); err != nil {
					log.Warning(fmt.Sprintf("traffic control panic in background: <taskId:%s/targetId:%s>[targetName:%s]: %v",
						controller.task.TrafficControlTaskId, targetId, controller.target.Name, err))
				}
			}()
			taskId := controller.task.TrafficControlTaskId
			var targetTraffic, taskTraffic, output, setValue float64
			var measureTime time.Time
			if controller.task.ControlType == constants.TrafficControlTaskControlTypePercent {
				binId := ""
				trafficMap := allTargetTrafficMap
				if controller.IsAllocateExpWise() {
					trafficMap = expTargetTrafficMap
					binId = expId
				}
				if input, ok := trafficMap[targetId]; ok {
					targetTraffic = input.TargetTraffic
					taskTraffic = input.TaskTraffic
					measureTime = input.RecordTime
				} else {
					// No record for this target: behave as 0 out of 1.
					targetTraffic = 0
					taskTraffic = 1
					measureTime = time.Now().Truncate(time.Second)
				}
				if controller.IsAllocateExpWise() && targetTraffic < controller.GetMinExpTraffic() {
					// Use the global traffic in place of the experiment's cold-start traffic.
					ctx.LogDebug(fmt.Sprintf("module=TrafficControlSort\t<taskId:%s/targetId:%s>[targetName:%s]"+
						"\texp=%s\ttargetTraffic=%.0f change to global targetTraffic",
						taskId, targetId, controller.target.Name, expId, targetTraffic))
					binId = ""
					if input, ok := allTargetTrafficMap[targetId]; ok {
						targetTraffic = input.TargetTraffic
						taskTraffic = input.TaskTraffic
					} else {
						targetTraffic = 0
						taskTraffic = 1
					}
				}

				// Guard the division: a record with TaskTraffic == 0 would
				// otherwise feed NaN or Inf into the controller.
				trafficPercentage := float64(0)
				if taskTraffic != 0 {
					trafficPercentage = targetTraffic / taskTraffic
				}
				controller.SetMeasurement(binId, trafficPercentage, measureTime)
				output, setValue = controller.Compute(binId, ctx)
				ctx.LogInfo(fmt.Sprintf("module=TrafficControlSort\t<taskId:%s/targetId:%s>[targetName:%s]\t"+
					"traffic=%.0f, percentage=%f, setValue=%f, output=%f, exp=%s", taskId, targetId,
					controller.target.Name, targetTraffic, trafficPercentage, setValue/100, output, binId))
			} else {
				if input, ok := allTargetTrafficMap[targetId]; ok {
					targetTraffic = input.TargetTraffic
					measureTime = input.RecordTime
				} else {
					targetTraffic = 0
					measureTime = time.Now().Truncate(time.Second)
				}
				controller.SetMeasurement("", targetTraffic, measureTime)
				output, setValue = controller.Compute("", ctx)
				ctx.LogInfo(fmt.Sprintf("module=TrafficControlSort\t<taskId:%s/targetId:%s>[targetName:%s]\t"+
					"traffic=%.0f, setValue=%f, output=%f",
					taskId, targetId, controller.target.Name, targetTraffic, setValue, output))
			}

			select {
			case <-gCtx.Done():
				ctx.LogWarning(fmt.Sprintf("module=TrafficControlSort\ttimeout in goruntine: <taskId:%s/targetId:%s>[targetName:%s]",
					taskId, targetId, controller.target.Name))
				return
			case retCh <- flowResult{targetId: targetId, output: output, hasTraffic: targetTraffic > 0}:
			}
		}(gCtx, targetId, controller)
	}

	// Collect at most one result per controller, giving up once the deadline
	// passes. Only results received here contribute to the returned values.
	cnt := 0
	hasTraffic := false
Loop:
	for range controllerMap {
		select {
		case res := <-retCh:
			if res.hasTraffic {
				hasTraffic = true
			}
			if res.output != 0 {
				cnt++
				targetOutput[res.targetId] = res.output
			}
		case <-gCtx.Done():
			if errors.Is(gCtx.Err(), gocontext.DeadlineExceeded) {
				ctx.LogWarning(fmt.Sprintf("module=TrafficControlSort\tflow control\ttraffic controller timeout: %v", gCtx.Err()))
			}
			break Loop
		}
	}

	if !hasTraffic {
		// Without any observed traffic the outputs are discarded.
		ctx.LogWarning("module=TrafficControlSort\tflow control\tno traffic data detected, maybe flink job is not running")
		for k := range targetOutput {
			delete(targetOutput, k)
		}
		cnt = 0
	}
	return targetOutput, cnt
}