in sort/traffic_control_sort.go [538:682]
// FlowControl feeds the latest real-time traffic statistics of the current
// scene into every PID controller in controllerMap and collects the resulting
// control signals.
//
// Each controller is evaluated in its own goroutine; the whole fan-out shares a
// 12ms deadline. Results that are not delivered before the deadline are
// dropped.
//
// It returns a map from traffic-control target id to the controller output
// (only non-zero outputs are kept) and the number of entries in that map. When
// the scene name is missing from ctx, or when none of the collected results
// observed any target traffic, the map is empty and the count is 0.
func FlowControl(controllerMap map[string]*PIDController, ctx *context.RecommendContext) (map[string]float64, int) {
	// Accumulated traffic so far for control targets of granularity="Global".
	targetOutput := make(map[string]float64)
	scene, good := ctx.GetParameter("scene").(string)
	if !good {
		ctx.LogError("failed to get scene name")
		return targetOutput, 0
	}

	// Fetch the real-time traffic statistics.
	runEnv := os.Getenv("PAIREC_ENVIRONMENT")
	expId := ctx.ExperimentResult.GetExpId()
	traffics := experimentClient.GetTrafficControlTargetTraffic(runEnv, scene, expId, "ER_ALL")
	if ctx.Debug {
		// Best effort: a marshal failure only degrades this debug line.
		data, _ := json.Marshal(traffics)
		ctx.LogDebug(fmt.Sprintf("module=TrafficControlSort\tflow control\texpId:%s\ttraffics:%s", expId, string(data)))
	}

	// Split the statistics into records tagged "ER_ALL" and all other records,
	// both keyed by target id.
	allTargetTrafficMap := make(map[string]experiments.TrafficControlTargetTraffic)
	expTargetTrafficMap := make(map[string]experiments.TrafficControlTargetTraffic)
	for _, traffic := range traffics {
		if traffic.ItemOrExpId == "ER_ALL" {
			allTargetTrafficMap[traffic.TrafficControlTargetId] = traffic
		} else {
			expTargetTrafficMap[traffic.TrafficControlTargetId] = traffic
		}
	}

	// controlResult is what one worker reports back. hasTraffic travels with
	// the result so that no variable is shared between goroutines.
	type controlResult struct {
		targetId   string
		output     float64
		hasTraffic bool
	}

	// retCh is intentionally never closed: workers may still be running when
	// this function returns on timeout, and sending on a closed channel panics.
	// Late workers are released by the cancelled context instead.
	retCh := make(chan controlResult, utils.MinInt(len(controllerMap), 64))
	gCtx, cancel := gocontext.WithTimeout(gocontext.Background(), time.Millisecond*12)
	defer cancel()

	for targetId, controller := range controllerMap {
		go func(gCtx gocontext.Context, targetId string, controller *PIDController) {
			defer func() {
				// Keep a panicking controller from taking the process down.
				if err := recover(); err != nil {
					log.Warning(fmt.Sprintf("traffic control panic in background: <taskId:%s/targetId:%s>[targetName:%s]: %v",
						controller.task.TrafficControlTaskId, targetId, controller.target.Name, err))
				}
			}()
			taskId := controller.task.TrafficControlTaskId
			var targetTraffic, taskTraffic, output, setValue float64
			var binId = ""
			var trafficMap map[string]experiments.TrafficControlTargetTraffic
			var measureTime time.Time
			if controller.task.ControlType == constants.TrafficControlTaskControlTypePercent {
				// Percent control: the measurement is targetTraffic/taskTraffic.
				if controller.IsAllocateExpWise() {
					trafficMap = expTargetTrafficMap
					binId = expId
				} else {
					trafficMap = allTargetTrafficMap
				}
				if input, ok := trafficMap[targetId]; ok {
					targetTraffic = input.TargetTraffic
					taskTraffic = input.TaskTraffic
					measureTime = input.RecordTime
				} else {
					targetTraffic = float64(0)
					taskTraffic = float64(1)
					measureTime = time.Now().Truncate(time.Second)
				}
				if controller.IsAllocateExpWise() && targetTraffic < controller.GetMinExpTraffic() {
					// Substitute the global traffic for a cold-start experiment's traffic.
					ctx.LogDebug(fmt.Sprintf("module=TrafficControlSort\t<taskId:%s/targetId:%s>[targetName:%s]"+
						"\texp=%s\ttargetTraffic=%.0f change to global targetTraffic",
						taskId, targetId, controller.target.Name, expId, targetTraffic))
					binId = ""
					if input, ok := allTargetTrafficMap[targetId]; ok {
						targetTraffic = input.TargetTraffic
						taskTraffic = input.TaskTraffic
					} else {
						targetTraffic = float64(0)
						taskTraffic = float64(1)
					}
				}
				// A record with zero task traffic would yield NaN/Inf and be fed
				// into the controller; treat it as a 0 ratio instead.
				var trafficPercentage float64
				if taskTraffic > 0 {
					trafficPercentage = targetTraffic / taskTraffic
				}
				controller.SetMeasurement(binId, trafficPercentage, measureTime)
				output, setValue = controller.Compute(binId, ctx)
				ctx.LogInfo(fmt.Sprintf("module=TrafficControlSort\t<taskId:%s/targetId:%s>[targetName:%s]\t"+
					"traffic=%.0f, percentage=%f, setValue=%f, output=%f, exp=%s", taskId, targetId,
					controller.target.Name, targetTraffic, trafficPercentage, setValue/100, output, binId))
			} else {
				// Any other control type: the measurement is the raw target traffic.
				if input, ok := allTargetTrafficMap[targetId]; ok {
					targetTraffic = input.TargetTraffic
					measureTime = input.RecordTime
				} else {
					targetTraffic = float64(0)
					measureTime = time.Now().Truncate(time.Second)
				}
				controller.SetMeasurement("", targetTraffic, measureTime)
				output, setValue = controller.Compute("", ctx)
				ctx.LogInfo(fmt.Sprintf("module=TrafficControlSort\t<taskId:%s/targetId:%s>[targetName:%s]\t"+
					"traffic=%.0f, setValue=%f, output=%f",
					taskId, targetId, controller.target.Name, targetTraffic, setValue, output))
			}
			select {
			case <-gCtx.Done():
				ctx.LogWarning(fmt.Sprintf("module=TrafficControlSort\ttimeout in goruntine: <taskId:%s/targetId:%s>[targetName:%s]",
					taskId, targetId, controller.target.Name))
				return
			case retCh <- controlResult{targetId: targetId, output: output, hasTraffic: targetTraffic > 0}:
			}
		}(gCtx, targetId, controller)
	}

	// Collect at most one result per controller, giving up at the deadline.
	hasTraffic := false
	cnt := 0
Loop:
	for range controllerMap {
		select {
		case res := <-retCh:
			if res.hasTraffic {
				hasTraffic = true
			}
			if res.output != 0 {
				cnt++
				targetOutput[res.targetId] = res.output
			}
		case <-gCtx.Done():
			if errors.Is(gCtx.Err(), gocontext.DeadlineExceeded) {
				ctx.LogWarning(fmt.Sprintf("module=TrafficControlSort\tflow control\ttraffic controller timeout: %v", gCtx.Err()))
			}
			break Loop
		}
	}

	// Without any observed target traffic the control signals are discarded.
	if !hasTraffic {
		ctx.LogWarning("module=TrafficControlSort\tflow control\tno traffic data detected, maybe flink job is not running")
		for k := range targetOutput {
			delete(targetOutput, k)
		}
		cnt = 0
	}
	return targetOutput, cnt
}