sort/traffic_control_sort.go (970 lines of code) (raw):
package sort
import (
gocontext "context"
"encoding/json"
"errors"
"fmt"
"github.com/Knetic/govaluate"
"github.com/alibaba/pairec/v2/abtest"
"github.com/alibaba/pairec/v2/constants"
"math"
"os"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/alibaba/pairec/v2/context"
"github.com/alibaba/pairec/v2/log"
"github.com/alibaba/pairec/v2/module"
"github.com/alibaba/pairec/v2/recconf"
"github.com/alibaba/pairec/v2/utils"
"github.com/aliyun/aliyun-pairec-config-go-sdk/v2/experiments"
"golang.org/x/exp/rand"
"gonum.org/v1/gonum/stat/sampleuv"
)
// TrafficControlSort is a sort stage that re-ranks items according to the
// output of PID controllers built from traffic control task metadata.
type TrafficControlSort struct {
// name is copied from SortConfig.Name by NewTrafficControlSort.
name string
// config points at a copy of SortConfig.PIDConf.
config *recconf.PIDControllerConfig
exp2controllers map[string]map[string]*PIDController // key1: expId; key2: targetId
// controllerLock guards exp2controllers.
controllerLock sync.RWMutex
// cloneInstances is allocated by NewTrafficControlSort; it is not otherwise
// referenced in the code visible in this file.
cloneInstances map[string]*TrafficControlSort
// boostScoreSort is only created when PIDConf.BoostScoreConditions is non-empty.
boostScoreSort *BoostScoreSort
// context is set by Sort to the request context of the first call that
// finds it nil.
context *context.RecommendContext
}
// Package-level lookup tables (filled by init) and the shared experiment
// client (assigned by NewTrafficControlSort).
var (
	// positionWeight[i] = exp(-0.01*i) for i in [0, 500).
	positionWeight []float64
	// expTable[i] = exp(i/1000) for i in [0, 1000).
	expTable []float64
	// tanhTable[i] = tanh(i/1000) for i in [0, 3000).
	tanhTable []float64
	// sigmoidTable[i] = 1/(1+exp(10-(i/1000+5))) for i in [0, 10000).
	sigmoidTable []float64
	// experimentClient is the A/B experiment SDK client; it may be nil when
	// abtest.GetExperimentClient returns nil.
	experimentClient *experiments.ExperimentClient
)
// init precomputes the lookup tables used by the traffic control math so that
// the per-item hot path can replace exp/tanh/sigmoid calls with indexing.
func init() {
	// Position decay weight for the first 500 ranks.
	positionWeight = make([]float64, 500)
	for i := range positionWeight {
		positionWeight[i] = math.Exp(-0.01 * float64(i))
	}

	// exp(x) sampled at 0.001 steps for x in [0, 1).
	expTable = make([]float64, 1000)
	for i := range expTable {
		expTable[i] = math.Exp(float64(i) / 1000.0)
	}

	// tanh(x) sampled at 0.001 steps; inputs cover [0, 3).
	tanhTable = make([]float64, 3000)
	for i := range tanhTable {
		tanhTable[i] = math.Tanh(float64(i) / 1000.0)
	}

	// Logistic curve centred at x = 10, sampled at 0.001 steps for x in [5, 15).
	sigmoidTable = make([]float64, 10000)
	for i := range sigmoidTable {
		x := float64(i)/1000.0 + 5.0
		sigmoidTable[i] = 1.0 / (1.0 + math.Exp(10-x))
	}
}
// NewTrafficControlSort builds a TrafficControlSort from config and starts a
// background goroutine that reloads the traffic control metadata of every
// known experiment once per minute. The goroutine runs for the lifetime of the
// process; there is no way to stop it.
func NewTrafficControlSort(config recconf.SortConfig) *TrafficControlSort {
	if experimentClient = abtest.GetExperimentClient(); experimentClient == nil {
		log.Warning("module=TrafficControlSort\tget experiment client failed.")
	}

	pidConf := config.PIDConf
	sorter := &TrafficControlSort{
		name:            config.Name,
		config:          &pidConf,
		exp2controllers: make(map[string]map[string]*PIDController),
		cloneInstances:  make(map[string]*TrafficControlSort),
	}
	if len(pidConf.BoostScoreConditions) > 0 {
		sorter.boostScoreSort = NewBoostScoreSort(recconf.SortConfig{
			Debug:                config.Debug,
			BoostScoreConditions: pidConf.BoostScoreConditions,
		})
	}

	go func() {
		for {
			// Snapshot the experiment ids under the read lock, then reload
			// outside of it because reloading takes the write lock.
			sorter.controllerLock.RLock()
			expIds := make([]string, 0, len(sorter.exp2controllers))
			for expId := range sorter.exp2controllers {
				expIds = append(expIds, expId)
			}
			sorter.controllerLock.RUnlock()

			for _, expId := range expIds {
				sorter.loadTrafficControlTaskMetaData(expId)
			}
			// Refresh fairly often, otherwise metadata edits made on the web
			// page are not reflected promptly.
			time.Sleep(time.Minute)
		}
	}()
	return sorter
}
// Sort re-ranks sortData.Data (which must be a []*module.Item) using the
// traffic control targets that apply to the current request.
//
// Steps:
//  1. optionally run the boost-score sort (experiment param "pid_boost_score",
//     default true);
//  2. sort items by score descending and record each item's original position;
//  3. select the controllers for this user/request, run micro (per item) and
//     macro (per target) control concurrently, which accumulate a
//     "__delta_rank__" algo score on the items;
//  4. turn the delta rank into "_NEW_POSITION_" and sort by it.
//
// It returns an error only when sortData.Data has an unexpected type.
func (p *TrafficControlSort) Sort(sortData *SortData) error {
	items, good := sortData.Data.([]*module.Item)
	if !good {
		msg := "sort data type error"
		log.Error(fmt.Sprintf("module=TrafficControlSort\terror=%s", msg))
		return errors.New(msg)
	}
	if len(items) == 0 {
		return nil
	}

	user := sortData.User
	start := time.Now()
	ctx := sortData.Context
	// NOTE(review): p.context is written without synchronization; if Sort is
	// invoked concurrently on the same instance this is a data race — confirm.
	if p.context == nil {
		p.context = ctx
	}

	params := ctx.ExperimentResult.GetExperimentParams()
	if p.boostScoreSort != nil {
		// A mis-typed experiment parameter must not panic the request; fall
		// back to the default (enabled) instead.
		boostEnabled, isBool := params.Get("pid_boost_score", true).(bool)
		if !isBool {
			ctx.LogWarning("module=TrafficControlSort\tpid_boost_score is not a bool, fall back to true")
			boostEnabled = true
		}
		if boostEnabled {
			if err := p.boostScoreSort.Sort(sortData); err != nil {
				ctx.LogError(fmt.Sprintf("module=TrafficControlSort\tBoostScore\terror=%v", err))
			}
			ctx.LogInfo(fmt.Sprintf("module=TrafficControlSort\tBoostScoreSort\tcount=%d\tcost=%d", len(items), utils.CostTime(start)))
		}
	}

	sort.Sort(sort.Reverse(ItemScoreSlice(items)))
	for i, item := range items {
		item.AddProperty("__traffic_control_id__", i+1)
		item.AddProperty("_ORIGIN_POSITION_", i+1)
	}

	controllerInfo := p.getPidControllers(ctx)
	controllerMap := isControlUser(user, controllerInfo)
	globalControls, singleControls := splitController(controllerMap, ctx)
	if len(globalControls) == 0 && len(singleControls) == 0 {
		ctx.LogInfo(fmt.Sprintf("module=TrafficControlSort\tcount=%d\tboth global traffic control and single traffic control are zero", len(items)))
		sortData.Data = items
		return nil
	}
	if enable := setHyperParams(controllerMap, ctx); !enable {
		ctx.LogInfo(fmt.Sprintf("module=TrafficControlSort\tcount=%d\ttraffic control hyper params turn off", len(items)))
		sortData.Data = items
		return nil
	}
	ctx.LogDebug(fmt.Sprintf("module=TrafficControlSort\tglobal control num %d, single control num %d", len(globalControls), len(singleControls)))

	// Micro and macro control run concurrently over the same items.
	wgCtrl := sync.WaitGroup{}
	if len(singleControls) > 0 {
		wgCtrl.Add(1)
		go microControl(singleControls, items, ctx, &wgCtrl)
	}
	if len(globalControls) > 0 {
		wgCtrl.Add(1)
		go macroControl(globalControls, items, ctx, &wgCtrl)
	}
	wgCtrl.Wait()

	pageNo := utils.ToInt(ctx.GetParameter("pageNum"), 1)
	pageSize := ctx.Size // utils.ToInt(ctx.GetParameter("pageSize"), 10)
	if pageNo < 1 {
		pageNo = 1
	}
	limitFirstPage := params.GetInt("limit_uplift_at_first_page", 0)
	ctx.LogDebug(fmt.Sprintf("module=TrafficControlSort\tlimit_uplift_at_first_page=%d", limitFirstPage))

	firstPageLimited := pageNo <= 1 && limitFirstPage != 0
	for i, item := range items {
		finalDeltaRank := item.GetAlgoScore("__delta_rank__")
		if finalDeltaRank == 0.0 {
			// Untouched items keep their original position.
			item.AddProperty("_NEW_POSITION_", i+1)
			continue
		}
		rank := float64(i+1) - finalDeltaRank
		if !firstPageLimited {
			item.AddProperty("_NEW_POSITION_", rank)
			continue
		}
		if i < pageSize {
			// Items already on the first page are pinned in place.
			item.AddProperty("_NEW_POSITION_", i+1)
			continue
		}
		if rank <= float64(pageSize) {
			// Ensure first-page traffic control results only show up as
			// fallback candidates for diversification: push them just past
			// the first page (rank > pageSize).
			rank = float64(pageSize) + 1 + tanh(0.001*rank)
		}
		item.AddProperty("_NEW_POSITION_", rank)
	}
	sort.Sort(ItemRankSlice(items))

	ctx.LogInfo(fmt.Sprintf("module=TrafficControlSort\tcount=%d\tcost=%d", len(items), utils.CostTime(start)))
	sortData.Data = items
	return nil
}
// loadTrafficControlTaskMetaData fetches the traffic control task metadata
// from the experiment SDK and (re)builds the targetId -> PIDController map for
// expId, then stores it in p.exp2controllers under the write lock.
//
// Controllers that already exist for expId are reused and only have their
// metadata refreshed. Targets with an empty id or in the closed status are
// skipped.
//
// It returns the new controller map, or nil when the experiment client is
// unavailable or no task was found (in which case exp2controllers is left
// untouched).
func (p *TrafficControlSort) loadTrafficControlTaskMetaData(expId string) map[string]*PIDController {
	// NewTrafficControlSort tolerates a nil client (it only logs a warning),
	// so guard here instead of dereferencing it.
	if experimentClient == nil {
		log.Warning("module=TrafficControlSort\texperiment client is nil, skip loading traffic control task.")
		return nil
	}

	// Call the SDK to fetch the traffic control plan metadata and create the
	// controllers.
	runEnv := os.Getenv("PAIREC_ENVIRONMENT")
	timestamp := p.config.Timestamp
	tasks := experimentClient.GetTrafficControlTaskMetaData(runEnv, timestamp)
	if len(tasks) == 0 {
		log.Info(fmt.Sprintf("module=TrafficControlSort\tcurrent timestamp=%d\tnot find traffic control task.", timestamp))
		return nil
	}

	// A missing entry yields a nil map, which is safe to index below.
	p.controllerLock.RLock()
	oldControllerMap := p.exp2controllers[expId]
	p.controllerLock.RUnlock()

	controllerMap := make(map[string]*PIDController)
	for i, task := range tasks {
		taskUserExpress, err := ParseExpression(task.UserConditionArray, task.UserConditionExpress)
		if err != nil {
			log.Error(fmt.Sprintf("module=TrafficControlSort\tparse user condition field, please check %s or %s\terr=%v",
				task.UserConditionArray, task.UserConditionExpress, err))
		}
		for _, value := range task.TrafficControlTargets {
			target := value // copy: the controller keeps a pointer to it
			if target.TrafficControlTargetId == "" {
				continue
			}
			if target.Status == constants.TrafficControlTargetStatusClosed {
				continue
			}
			params := experimentClient.GetSceneParams(task.SceneName)
			freeze := params.GetInt(fmt.Sprintf("pid_freeze_target_%s_minutes", target.TrafficControlTargetId), 0)
			run := params.GetString(fmt.Sprintf("pid_run_with_zero_input_%s", task.TrafficControlTaskId), "true")
			runWithZeroInput := strings.ToLower(run) == "true"

			if pidController, ok := oldControllerMap[target.TrafficControlTargetId]; ok {
				// Reuse the existing controller and only update its meta info.
				// NOTE(review): an expression that was removed upstream (now
				// empty) does not clear the previously set one — confirm
				// whether that is intended.
				pidController.task = &tasks[i]
				pidController.target = &target
				controllerMap[target.TrafficControlTargetId] = pidController
				if taskUserExpress != "" {
					pidController.SetUserExpress(taskUserExpress)
				}
				pidController.GenerateItemExpress()
				pidController.SetFreezeMinutes(freeze)
				pidController.SetRunWithZeroInput(runWithZeroInput)
				continue
			}

			controller := NewPIDController(&tasks[i], &target, p.config, expId)
			if controller == nil {
				continue
			}
			if taskUserExpress != "" {
				controller.SetUserExpress(taskUserExpress)
			}
			controller.SetFreezeMinutes(freeze)
			controller.SetRunWithZeroInput(runWithZeroInput)
			controllerMap[target.TrafficControlTargetId] = controller
		}
	}

	p.controllerLock.Lock()
	p.exp2controllers[expId] = controllerMap
	p.controllerLock.Unlock()

	log.Info(fmt.Sprintf("module=TrafficControlSort\tcurrent timestamp=%d\tload %d Traffic Control Task for exp=%s.",
		timestamp, len(controllerMap), expId))
	return controllerMap
}
// loadTargetItemTraffic queries the experiment SDK for the per-item traffic of
// the given items in the current scene and feeds every record that belongs to
// a known controller into that controller via SetMeasurement.
//
// It returns targetId -> itemOrExpId -> traffic for records with positive
// traffic, or nil when the scene parameter is not a string or no positive
// traffic was found.
func loadTargetItemTraffic(ctx *context.RecommendContext, items []*module.Item, controllerMap map[string]*PIDController) map[string]map[string]float64 {
	scene, isString := ctx.GetParameter("scene").(string)
	if !isString {
		ctx.LogError("module=TrafficControlSort\tfailed to get scene name")
		return nil
	}

	itemIds := make([]string, 0, len(items))
	for _, item := range items {
		itemIds = append(itemIds, string(item.Id))
	}

	runEnv := os.Getenv("PAIREC_ENVIRONMENT")
	traffics := experimentClient.GetTrafficControlTargetTraffic(runEnv, scene, itemIds...)

	// The SDK may return historical traffic of items under expired targets
	// (taking a max over them would be wrong), so only records whose target
	// has a controller are considered.
	result := make(map[string]map[string]float64) // key1: targetId, key2: expId, value: traffic
	for _, traffic := range traffics {
		ctrl, known := controllerMap[traffic.TrafficControlTargetId]
		if !known {
			continue
		}
		ctrl.SetMeasurement(traffic.ItemOrExpId, traffic.TargetTraffic, traffic.RecordTime)
		if traffic.TargetTraffic <= 0 {
			continue
		}
		perItem, exists := result[traffic.TrafficControlTargetId]
		if !exists {
			perItem = make(map[string]float64)
			result[traffic.TrafficControlTargetId] = perItem
		}
		perItem[traffic.ItemOrExpId] = traffic.TargetTraffic
	}

	if len(result) == 0 {
		return nil
	}
	return result
}
// getPidControllers resolves which experiment's controllers apply to the
// request and returns them, loading them on first use.
//
// The experiment is taken from the experiment param "pid_experiment_id" if
// set; otherwise, if "pid_experiment_layer" is set, it is the first segment
// (skipping the first one) of the "_"-separated request experiment id that
// starts with that layer. When neither resolves, the empty experiment key is
// used. Parameters that are set but not strings are reported and ignored
// instead of panicking.
func (p *TrafficControlSort) getPidControllers(ctx *context.RecommendContext) map[string]*PIDController {
	var experiment string
	params := ctx.ExperimentResult.GetExperimentParams()
	expId := params.Get("pid_experiment_id", nil)
	expLayer := params.Get("pid_experiment_layer", nil)

	switch {
	case expId != nil:
		id, isString := expId.(string)
		if !isString {
			ctx.LogError(fmt.Sprintf("module=TrafficControlSort\tpid_experiment_id must be a string, got %T", expId))
		}
		experiment = id
	case expLayer != nil:
		layer, isString := expLayer.(string)
		if !isString {
			ctx.LogError(fmt.Sprintf("module=TrafficControlSort\tpid_experiment_layer must be a string, got %T", expLayer))
			break
		}
		if !strings.Contains(layer, "#") {
			ctx.LogWarning(fmt.Sprintf("pid experiment layer `%s` maybe a prefix of another layer", layer))
		}
		recExpId := ctx.ExperimentResult.GetExpId()
		for i, id := range strings.Split(recExpId, "_") {
			// The first segment is never considered a layer experiment.
			if i > 0 && strings.HasPrefix(id, layer) {
				experiment = id
				break
			}
		}
		if experiment == "" && recExpId != "" {
			ctx.LogError(fmt.Sprintf("parse pid experiment layer failed: `%s`", layer))
		}
	}

	p.controllerLock.RLock()
	controllers, loaded := p.exp2controllers[experiment]
	p.controllerLock.RUnlock()
	if loaded {
		return controllers
	}
	return p.loadTrafficControlTaskMetaData(experiment)
}
// splitController partitions the controllers that control the current request
// (per IsControlledTraffic) by task granularity. The first result holds the
// non-single (whole-target) controllers, the second the single-item ones.
// Both maps are always non-nil.
func splitController(controllers map[string]*PIDController, ctx *context.RecommendContext) (map[string]*PIDController, map[string]*PIDController) {
	global := make(map[string]*PIDController)
	single := make(map[string]*PIDController)
	// Ranging over a nil or empty map is a no-op, so no explicit guard is needed.
	for targetId, controller := range controllers {
		switch {
		case !controller.IsControlledTraffic(ctx):
			// This controller does not apply to the request.
		case controller.task.ControlGranularity == constants.TrafficControlTaskControlGranularitySingle:
			single[targetId] = controller
		default:
			global[targetId] = controller
		}
	}
	return global, single
}
// macroControl performs macro-level traffic control, i.e. control that
// targets each traffic control target as a whole.
//
// It obtains one control signal per target from FlowControl, weighs it by the
// share of (position-weighted) score the target's items hold in the list, and
// accumulates the resulting rank offset on every controlled item as the
// "__delta_rank__" algo score. Items are processed in up to "pid_parallel"
// (default 10) concurrent batches. wgCtrl.Done is called on return.
func macroControl(controllerMap map[string]*PIDController, items []*module.Item, ctx *context.RecommendContext, wgCtrl *sync.WaitGroup) {
	defer wgCtrl.Done()

	targetOutput, count := FlowControl(controllerMap, ctx)
	if len(targetOutput) == 0 || count == 0 {
		ctx.LogWarning("module=TrafficControlSort\tmacro control\ttraffic control task output is zero")
		return
	}
	ctx.LogDebug(fmt.Sprintf("module=TrafficControlSort\tmacro control\ttarget out put: %v", targetOutput))

	itemScores := make([]float64, len(items))
	// Compute each target's global share of the preference score.
	totalScore := 0.0
	maxScore := 0.0 // score of the first item, used as the normalizer
	targetScore := make(map[string]float64)
	for i, item := range items {
		score := item.Score
		if score == 0.0 {
			score = 1e-8
		}
		if i == 0 {
			maxScore = score
			itemScores[0] = math.E
		} else {
			v := score / maxScore // normalized rank score
			idx := int(v * 1000)
			if idx < 0 {
				idx = 0
			}
			if idx >= 1000 {
				idx = 999
			}
			itemScores[i] = expTable[idx]
		}
		// Beyond the table the weight is fixed at roughly exp(-5).
		posWeight := 0.006737946999
		if i < 500 {
			posWeight = positionWeight[i]
		}
		score *= posWeight
		totalScore += score
		for targetId, controller := range controllerMap {
			if alpha, ok := targetOutput[targetId]; ok && alpha != 0 && controller.IsControlledItem(item) {
				targetScore[targetId] += score
			}
		}
	}
	for targetId, score := range targetScore {
		targetScore[targetId] = score / totalScore
	}

	params := ctx.ExperimentResult.GetExperimentParams()
	maxUpliftTargetCnt := params.GetInt("pid_max_uplift_target_cnt", len(controllerMap))
	if maxUpliftTargetCnt < len(controllerMap) {
		// Sample `maxUpliftTargetCnt` uplift targets by preference score; the
		// control signal of uplift targets that are not selected is set to 0.
		SampleControlTargetsByScore(maxUpliftTargetCnt, targetScore, targetOutput, ctx)
	}

	pidGamma := params.GetFloat("pid_gamma", 1.0)
	pidBeta := params.GetFloat("pid_beta", 1.0)
	// Preprocess: adjust the control signal.
	for targetId, alpha := range targetOutput {
		if alpha > 0 { // uplift
			scoreWeight := targetScore[targetId]
			// Give targets with a larger score share a larger boost, which
			// differentiates the control targets from each other.
			rho := 1.0 + pidGamma*tanh(pidBeta*scoreWeight)
			alpha *= rho
			targetOutput[targetId] = alpha
		}
	}
	ctx.LogDebug(fmt.Sprintf("module=TrafficControlSort\tmacro control\ttarget output: %v", targetOutput))

	// Compute delta rank; the reported cost covers this phase only.
	begin := time.Now()
	pageNo := utils.ToInt(ctx.GetParameter("pageNum"), 1)
	if pageNo < 1 {
		pageNo = 1
	}
	keepCtrlIdScore := params.GetFloat("pid_keep_id_target_score_weight", 1.0)
	if keepCtrlIdScore < 0.3 {
		keepCtrlIdScore = 0.3
	}
	ctrlParams := &controlParams{
		targetScore:        targetScore,
		itemScores:         itemScores,
		eta:                params.GetFloat("pid_eta", 1.6),
		pageNo:             pageNo,
		keepCtrlIdScore:    keepCtrlIdScore,
		newCtrlIdThreshold: params.GetFloat("pid_new_id_target_threshold", 1.0),
		needNewCtrlId:      make(map[string]bool),
	}
	targetControlledNum := make(map[string]int, len(controllerMap))
	mu := sync.Mutex{}
	for targetId := range controllerMap {
		targetControlledNum[targetId] = len(items)
		// A mis-typed flag is treated as false: a panic here would not be
		// recovered by this function.
		newCtrlId, _ := utils.GetExperimentParamByPath(params, fmt.Sprintf("pid_params.%s.new_ctrl_id", targetId), false).(bool)
		ctrlParams.needNewCtrlId[targetId] = newCtrlId
	}

	parallel := params.GetInt("pid_parallel", 10)
	if parallel < 1 {
		// Guards against a division by zero / negative channel size below
		// when the experiment parameter is misconfigured.
		parallel = 1
	}
	ch := make(chan int, parallel)
	defer close(ch)
	var wg sync.WaitGroup
	batchSize := len(items) / parallel
	if len(items)%parallel != 0 {
		batchSize++
	}
	if batchSize < 1 {
		batchSize = 1
	}
	for b, e := 0, batchSize; b < len(items); b, e = e, e+batchSize {
		var candidates []*module.Item
		if e < len(items) {
			candidates = items[b:e]
		} else {
			candidates = items[b:]
		}
		ch <- b
		wg.Add(1)
		go func(b int, batch []*module.Item) {
			defer wg.Done()
			for idx, item := range batch {
				i := b + idx
				finalDeltaRank := 0.0
				for targetId, controller := range controllerMap {
					if !controller.IsControlledItem(item) {
						if ctx.Debug {
							mu.Lock()
							targetControlledNum[targetId] = targetControlledNum[targetId] - 1
							mu.Unlock()
						}
						continue
					}
					if alpha, ok := targetOutput[targetId]; ok && alpha != 0 {
						deltaRank := computeDeltaRank(controller, item, i, alpha, ctrlParams, ctx)
						finalDeltaRank += deltaRank // the targets' offsets add up
					}
				}
				if finalDeltaRank != 0.0 {
					item.IncrAlgoScore("__delta_rank__", finalDeltaRank)
					controlId, _ := item.IntProperty("__traffic_control_id__")
					if controlId == 0 && finalDeltaRank < 1.0 {
						item.AddProperty("__traffic_control_id__", item.GetProperty("_ORIGIN_POSITION_"))
					}
				}
			}
			<-ch
		}(b, candidates)
	}
	wg.Wait()
	ctx.LogInfo(fmt.Sprintf("module=TrafficControlSort\tmacro control\tcount=%d\tcost=%d\tcontrolled target=%v",
		len(items), utils.CostTime(begin), targetControlledNum))
}
// FlowControl runs traffic control for whole (non single-item) targets and
// returns the control signal of every target whose output is non-zero,
// together with the number of such targets.
//
// Each controller is evaluated in its own goroutine; results that do not
// arrive within 12ms are dropped. When no target reports positive traffic the
// result is emptied and the count is 0 (the realtime statistics are probably
// not being produced). An empty map and 0 are also returned when the "scene"
// request parameter is not a string.
func FlowControl(controllerMap map[string]*PIDController, ctx *context.RecommendContext) (map[string]float64, int) {
	targetOutput := make(map[string]float64)
	scene, good := ctx.GetParameter("scene").(string)
	if !good {
		ctx.LogError("failed to get scene name")
		return targetOutput, 0
	}

	// Fetch the realtime traffic statistics: the current experiment's and the
	// global ("ER_ALL") accumulated traffic of each target.
	runEnv := os.Getenv("PAIREC_ENVIRONMENT")
	expId := ctx.ExperimentResult.GetExpId()
	traffics := experimentClient.GetTrafficControlTargetTraffic(runEnv, scene, expId, "ER_ALL")
	if ctx.Debug {
		data, _ := json.Marshal(traffics)
		ctx.LogDebug(fmt.Sprintf("module=TrafficControlSort\tflow control\texpId:%s\ttraffics:%s", expId, string(data)))
	}
	allTargetTrafficMap := make(map[string]experiments.TrafficControlTargetTraffic)
	expTargetTrafficMap := make(map[string]experiments.TrafficControlTargetTraffic)
	for _, traffic := range traffics {
		if traffic.ItemOrExpId == "ER_ALL" {
			allTargetTrafficMap[traffic.TrafficControlTargetId] = traffic
		} else {
			expTargetTrafficMap[traffic.TrafficControlTargetId] = traffic
		}
	}

	// hasTraffic is set by the worker goroutines and read by this goroutine,
	// so it is guarded by a mutex.
	var (
		trafficMu  sync.Mutex
		hasTraffic bool
	)
	markTraffic := func() {
		trafficMu.Lock()
		hasTraffic = true
		trafficMu.Unlock()
	}

	type targetResult struct {
		targetId string
		output   float64
	}
	// The channel is intentionally never closed: workers may still be sending
	// after the deadline, and only a sender may safely close a channel. Workers
	// blocked on a full buffer are released by gCtx being cancelled.
	retCh := make(chan targetResult, utils.MinInt(len(controllerMap), 64))
	gCtx, cancel := gocontext.WithTimeout(gocontext.Background(), time.Millisecond*12)
	defer cancel()

	for targetId, controller := range controllerMap {
		go func(gCtx gocontext.Context, targetId string, controller *PIDController) {
			defer func() {
				// A panic in a worker must not take the process down.
				if err := recover(); err != nil {
					log.Warning(fmt.Sprintf("traffic control panic in background: <taskId:%s/targetId:%s>[targetName:%s], err=%v",
						controller.task.TrafficControlTaskId, targetId, controller.target.Name, err))
				}
			}()
			taskId := controller.task.TrafficControlTaskId
			var targetTraffic, taskTraffic, output, setValue float64
			var measureTime time.Time
			if controller.task.ControlType == constants.TrafficControlTaskControlTypePercent {
				binId := ""
				trafficMap := allTargetTrafficMap
				if controller.IsAllocateExpWise() {
					trafficMap = expTargetTrafficMap
					binId = expId
				}
				if input, ok := trafficMap[targetId]; ok {
					targetTraffic = input.TargetTraffic
					taskTraffic = input.TaskTraffic
					measureTime = input.RecordTime
				} else {
					targetTraffic = float64(0)
					taskTraffic = float64(1)
					measureTime = time.Now().Truncate(time.Second)
				}
				if controller.IsAllocateExpWise() && targetTraffic < controller.GetMinExpTraffic() {
					// Use the global traffic in place of the experiment's
					// traffic while the experiment is still cold.
					ctx.LogDebug(fmt.Sprintf("module=TrafficControlSort\t<taskId:%s/targetId:%s>[targetName:%s]"+
						"\texp=%s\ttargetTraffic=%.0f change to global targetTraffic",
						taskId, targetId, controller.target.Name, expId, targetTraffic))
					binId = ""
					if input, ok := allTargetTrafficMap[targetId]; ok {
						targetTraffic = input.TargetTraffic
						taskTraffic = input.TaskTraffic
					} else {
						targetTraffic = float64(0)
						taskTraffic = float64(1)
					}
				}
				trafficPercentage := targetTraffic / taskTraffic
				controller.SetMeasurement(binId, trafficPercentage, measureTime)
				output, setValue = controller.Compute(binId, ctx)
				ctx.LogInfo(fmt.Sprintf("module=TrafficControlSort\t<taskId:%s/targetId:%s>[targetName:%s]\t"+
					"traffic=%.0f, percentage=%f, setValue=%f, output=%f, exp=%s", taskId, targetId,
					controller.target.Name, targetTraffic, trafficPercentage, setValue/100, output, binId))
			} else {
				if input, ok := allTargetTrafficMap[targetId]; ok {
					targetTraffic = input.TargetTraffic
					measureTime = input.RecordTime
				} else {
					targetTraffic = float64(0)
					measureTime = time.Now().Truncate(time.Second)
				}
				controller.SetMeasurement("", targetTraffic, measureTime)
				output, setValue = controller.Compute("", ctx)
				ctx.LogInfo(fmt.Sprintf("module=TrafficControlSort\t<taskId:%s/targetId:%s>[targetName:%s]\t"+
					"traffic=%.0f, setValue=%f, output=%f",
					taskId, targetId, controller.target.Name, targetTraffic, setValue, output))
			}
			if targetTraffic > 0 {
				markTraffic()
			}
			select {
			case <-gCtx.Done():
				ctx.LogWarning(fmt.Sprintf("module=TrafficControlSort\ttimeout in goruntine: <taskId:%s/targetId:%s>[targetName:%s]",
					taskId, targetId, controller.target.Name))
				return
			case retCh <- targetResult{targetId, output}:
			}
		}(gCtx, targetId, controller)
	}

	cnt := 0
Loop:
	for range controllerMap {
		select {
		case pair := <-retCh:
			if pair.output != 0 {
				cnt++
				targetOutput[pair.targetId] = pair.output
			}
		case <-gCtx.Done():
			if errors.Is(gCtx.Err(), gocontext.DeadlineExceeded) {
				ctx.LogWarning(fmt.Sprintf("module=TrafficControlSort\tflow control\ttraffic controller timeout: %v", gCtx.Err()))
			}
			break Loop
		}
	}

	trafficMu.Lock()
	sawTraffic := hasTraffic
	trafficMu.Unlock()
	if !sawTraffic {
		ctx.LogWarning("module=TrafficControlSort\tflow control\tno traffic data detected, maybe flink job is not running")
		for k := range targetOutput {
			delete(targetOutput, k)
		}
		cnt = 0
	}
	return targetOutput, cnt
}
// SampleControlTargetsByScore picks maxUpliftTargetCnt uplift targets (those
// with alpha > 0) by weighted sampling without replacement, using each
// target's preference score as its weight, and sets alpha to 0 for the uplift
// targets that were not picked. alpha is modified in place.
//
// Nothing happens when maxUpliftTargetCnt is not positive, or is not smaller
// than both the number of scored targets and the number of uplift targets.
func SampleControlTargetsByScore(maxUpliftTargetCnt int, targetScore, alpha map[string]float64, ctx *context.RecommendContext) {
	if maxUpliftTargetCnt <= 0 || maxUpliftTargetCnt >= len(targetScore) {
		return
	}

	var (
		candidates = make([]string, 0, len(targetScore))
		weights    = make([]float64, 0, len(targetScore))
		total      = 0.0
	)
	for targetId, score := range targetScore {
		if alpha[targetId] <= 0 {
			continue // only targets to be uplifted take part
		}
		candidates = append(candidates, targetId)
		weights = append(weights, score)
		total += score
	}
	if len(weights) == 0 || maxUpliftTargetCnt >= len(weights) {
		return
	}

	// Normalize the weights so that they sum to one.
	for j := range weights {
		weights[j] /= total
	}

	sampler := sampleuv.NewWeighted(
		weights,
		rand.New(rand.NewSource(uint64(time.Now().UnixNano()))))
	selected := make(map[string]bool)
	for j := 0; j < maxUpliftTargetCnt; j++ {
		if i, ok := sampler.Take(); ok {
			selected[candidates[i]] = true
		}
	}
	ctx.LogDebug(fmt.Sprintf("module=TrafficControlSort\tsample control target\tselected uplift target ids %v score", selected))

	for _, targetId := range candidates {
		if alpha[targetId] > 0 && !selected[targetId] {
			alpha[targetId] = 0
		}
	}
}
// microControl performs micro-level traffic control, i.e. control that
// targets individual items.
//
// For every item it sums the control signals of all controllers that control
// the item (uplift signals are scaled by the item's normalized score) and adds
// the sum to the item's "__delta_rank__" algo score. Pull-downs are always
// applied; uplifts are applied to at most "pid_max_uplift_item_cnt" (default
// 5) items. wgCtrl.Done is called on return.
//
// NOTE(review): Sort starts this concurrently with macroControl over the same
// items; whether the item mutators are safe for that is not visible here.
func microControl(controllerMap map[string]*PIDController, items []*module.Item, ctx *context.RecommendContext, wgCtrl *sync.WaitGroup) {
	defer wgCtrl.Done()

	itemTargetTraffic := loadTargetItemTraffic(ctx, items, controllerMap) // key1: targetId, key2: itemId, value: traffic
	if ctx.Debug {
		data, _ := json.Marshal(itemTargetTraffic)
		ctx.LogDebug(fmt.Sprintf("module=TrafficControlSort\tmicro control\titem target traffic:%s", string(data)))
	}

	params := ctx.ExperimentResult.GetExperimentParams()
	maxUpliftCnt := params.GetInt("pid_max_uplift_item_cnt", 5)
	ctx.LogDebug(fmt.Sprintf("module=TrafficControlSort\tmicro control\tpid_max_uplift_item_cnt=%d", maxUpliftCnt))

	upliftCnt := 0
	maxScore := 0.0
	for i, item := range items {
		score := item.Score
		if score == 0 {
			score = 1e-8
		}
		if i == 0 {
			maxScore = score
		}

		deltaRank := 0.0
		for targetId, controller := range controllerMap {
			if !controller.IsControlledItem(item) {
				continue
			}
			// traffic is only used for logging; a missing entry (or a nil
			// itemTargetTraffic) reads as zero.
			traffic := float64(0)
			if dict, ok := itemTargetTraffic[targetId]; ok {
				if value, okay := dict[string(item.Id)]; okay {
					traffic = value
				}
			}
			alpha, setValue := controller.Compute(string(item.Id), ctx)
			delta := alpha
			pos, _ := item.IntProperty("_ORIGIN_POSITION_")
			if alpha > 0 { // uplift
				if i == 0 {
					delta *= math.E
				} else {
					v := score / maxScore // normalized rank score
					idx := int(v * 1000)
					if idx < 0 {
						idx = 0
					}
					if idx >= 1000 {
						idx = 999
					}
					delta *= expTable[idx]
				}
			}
			// When targets push in opposite directions the signed sum decides
			// which one wins.
			deltaRank += delta
			ctx.LogDebug(fmt.Sprintf("module=TrafficControlSort\t[targetId:%s/targetName:%s], itemId:%s, "+
				"origin pos=%d, traffic=%.0f, setValue=%f, percentage=%f, alpha=%f, delta rank=%f",
				targetId, controller.target.Name, item.Id, pos, traffic, setValue, traffic/setValue, alpha, delta))
		}

		if deltaRank == 0.0 {
			continue
		}
		ctx.LogDebug(fmt.Sprintf("module=TrafficControlSort\titem:%v\tdelta rank:%v", item.Id, deltaRank))
		if deltaRank < 0 {
			item.IncrAlgoScore("__delta_rank__", deltaRank)
		} else if upliftCnt < maxUpliftCnt { // uplift
			item.IncrAlgoScore("__delta_rank__", deltaRank)
			upliftCnt++
			pos, _ := item.IntProperty("_ORIGIN_POSITION_")
			if pos > ctx.Size && deltaRank >= 1.0 {
				item.AddProperty("__traffic_control_id__", 0)
			}
		}
	}
}
// controlParams bundles the per-request inputs that macroControl passes to
// computeDeltaRank.
type controlParams struct {
// targetScore maps targetId to that target's share of the position-weighted
// item score.
targetScore map[string]float64
// itemScores holds one exp-scaled normalized score per item, indexed by the
// item's position in the list.
itemScores []float64
// eta comes from the experiment param "pid_eta" (default 1.6) and scales the
// pull-down strength.
eta float64
// pageNo is the request page number, at least 1.
pageNo int
// newCtrlIdThreshold comes from "pid_new_id_target_threshold" (default 1.0).
newCtrlIdThreshold float64
// keepCtrlIdScore comes from "pid_keep_id_target_score_weight" (default 1.0)
// and is clamped to be at least 0.3.
keepCtrlIdScore float64
// needNewCtrlId maps targetId to the "pid_params.<targetId>.new_ctrl_id" flag.
needNewCtrlId map[string]bool
}
// computeDeltaRank computes the rank offset that controller c contributes to
// item, which currently sits at zero-based position rank, for control signal
// alpha.
//
// A negative alpha (pull down) is damped by a sigmoid of the position. A
// non-negative alpha (uplift) is scaled by the item's normalized score; in
// addition, when the item sits beyond the distinct start position and is
// lifted by at least one rank, its "__traffic_control_id__" property is
// rewritten (to the negated target id, or to 0).
func computeDeltaRank(c *PIDController, item *module.Item, rank int, alpha float64, args *controlParams, ctx *context.RecommendContext) float64 {
	scoreWeight := args.targetScore[c.target.TrafficControlTargetId]
	itemScore := args.itemScores[rank]
	deltaRank := alpha
	if alpha < 0.0 { // pull down
		rho := args.eta * (1.0 - tanh(scoreWeight))
		deltaRank *= sigmoid(float64(rank), rho)
		ctx.LogDebug(fmt.Sprintf("module=TrafficControlSort\tcompute delta rank\titem %s [%s/%s], "+
			"score proportion=%.3f, rho=%.3f, alpha=%.6f, origin pos=%d, delta rank=%.6f [pull down]",
			item.Id, c.target.TrafficControlTargetId, c.target.Name, scoreWeight, rho, alpha, rank+1, deltaRank))
		return deltaRank
	}

	// Uplift: the larger the item's score, the larger the boost, which lets
	// different uplift targets compete with each other.
	deltaRank *= itemScore
	distinctStartPos := ctx.Size
	if scoreWeight > args.keepCtrlIdScore && args.pageNo > 1 {
		multiple := (scoreWeight - 0.3) * 10
		distinctStartPos += int(multiple * float64(ctx.Size))
	}
	if rank > distinctStartPos && deltaRank >= 1.0 {
		needNewCtrlId := args.needNewCtrlId[c.target.TrafficControlTargetId]
		if !needNewCtrlId {
			// Guard the index: this runs in goroutines without a recover, so a
			// target without set values must not panic.
			if setValues := c.target.SplitParts.SetValues; len(setValues) > 0 {
				needNewCtrlId = setValues[0]/100 > int64(args.newCtrlIdThreshold)
			}
		}
		if c.task.ControlType == constants.TrafficControlTaskControlTypePercent && needNewCtrlId {
			numericTargetId, _ := strconv.Atoi(c.target.TrafficControlTargetId)
			item.AddProperty("__traffic_control_id__", -numericTargetId) // stored negated
		} else {
			controlId, _ := item.IntProperty("__traffic_control_id__")
			if controlId > 0 { // once another controller made it negative, do not reset it to 0
				item.AddProperty("__traffic_control_id__", 0)
			}
		}
	}
	controlId, _ := item.IntProperty("__traffic_control_id__")
	ctx.LogDebug(fmt.Sprintf("module=TrafficControlSort\tcompute delta rank\titem:%s\t[targetId:%s/targetName:%s],"+
		"score proportion=%.3f,norm_score=%.3f, alpha=%.6f, origin pos=%d, delta rank=%.6f, traffic_control_id=%d [uplift]",
		item.Id, c.target.TrafficControlTargetId, c.target.Name, scoreWeight, itemScore, alpha, rank+1, deltaRank, controlId))
	return deltaRank
}
// isControlUser returns the subset of controllerMap whose user expression
// accepts user. Controllers without a user expression always apply.
//
// A controller whose expression cannot be compiled or evaluated is logged and
// skipped; the remaining controllers are still evaluated, so the result does
// not depend on map iteration order. The returned map is always non-nil.
func isControlUser(user *module.User, controllerMap map[string]*PIDController) map[string]*PIDController {
	controllerNewMap := make(map[string]*PIDController)
	// A nil user is evaluated against an empty property set.
	var properties map[string]interface{}
	if user != nil {
		properties = user.Properties
	}
	for targetId, controller := range controllerMap {
		userExpression := controller.userExpression
		if userExpression == "" {
			controllerNewMap[targetId] = controller
			continue
		}
		expression, err := govaluate.NewEvaluableExpression(userExpression)
		if err != nil {
			log.Error(fmt.Sprintf("module=PIDController\tgenerate user expression field, expression:%s, err:%v", userExpression, err))
			continue
		}
		result, err := expression.Evaluate(properties)
		if err != nil {
			log.Error(fmt.Sprintf("module=PIDController\tcompute user expression field, err:%v", err))
			continue
		}
		if ToBool(result, false) {
			controllerNewMap[targetId] = controller
		}
	}
	return controllerNewMap
}
// ItemRankSlice implements sort.Interface, ordering items by their
// "_NEW_POSITION_" property ascending and breaking ties by
// "_ORIGIN_POSITION_" ascending.
type ItemRankSlice []*module.Item

// Len returns the number of items.
func (us ItemRankSlice) Len() int { return len(us) }

// Less reports whether item i must be placed before item j.
func (us ItemRankSlice) Less(i, j int) bool {
	left, right := us[i], us[j]
	leftRank, _ := left.FloatProperty("_NEW_POSITION_")
	rightRank, _ := right.FloatProperty("_NEW_POSITION_")
	if leftRank == rightRank {
		// Same new position: keep the original relative order.
		leftOrigin, _ := left.IntProperty("_ORIGIN_POSITION_")
		rightOrigin, _ := right.IntProperty("_ORIGIN_POSITION_")
		return leftOrigin < rightOrigin
	}
	return leftRank < rightRank
}

// Swap exchanges items i and j.
func (us ItemRankSlice) Swap(i, j int) {
	us[i], us[j] = us[j], us[i]
}
// tanh approximates math.Tanh(x) through tanhTable (step 0.001). Inputs below
// 0 are clamped to the first table entry and inputs of 3 or more to the last.
func tanh(x float64) float64 {
	idx := int(x * 1000)
	switch {
	case idx < 0:
		idx = 0
	case idx >= 3000:
		idx = 2999
	}
	return tanhTable[idx]
}
// sigmoid looks up sigmoidTable at rho*x (step 0.001, table origin at 5).
// Products below 5 are clamped to the first table entry; products of 15 or
// more return exactly 1.
func sigmoid(x, rho float64) float64 {
	idx := int(rho*x*1000.0) - 5000
	if idx >= 10000 {
		return 1
	}
	if idx < 0 {
		idx = 0
	}
	return sigmoidTable[idx]
}
// setHyperParams applies the experiment parameters of the current request to
// the given controllers and reports whether traffic control is enabled.
//
// Parameters read:
//   - "pid_control_enable" (int, default 1): 0 disables traffic control, in
//     which case false is returned and nothing is changed;
//   - "pid_off_target_name_prefix" / "pid_on_target_name_prefix": switch
//     controllers off / on by target name prefix;
//   - "pid_plan_params": map of taskId -> {"online": bool};
//   - "pid_params": map of targetId (or "default") -> controller settings.
//
// Values of an unexpected type are ignored (treated as absent) instead of
// panicking the request.
func setHyperParams(controllers map[string]*PIDController, ctx *context.RecommendContext) bool {
	params := ctx.ExperimentResult.GetExperimentParams()
	if params.GetInt("pid_control_enable", 1) == 0 {
		return false
	}

	// floatOf / boolOf read a typed value from a decoded JSON object; ok is
	// false when the key is missing or holds another type.
	floatOf := func(dict map[string]interface{}, key string) (float64, bool) {
		v, ok := dict[key].(float64)
		return v, ok
	}
	boolOf := func(dict map[string]interface{}, key string) (bool, bool) {
		v, ok := dict[key].(bool)
		return v, ok
	}

	if offPrefix := params.GetString("pid_off_target_name_prefix", ""); offPrefix != "" {
		for _, controller := range controllers {
			if strings.HasPrefix(controller.target.Name, offPrefix) {
				controller.SetOnline(false)
				ctx.LogDebug(fmt.Sprintf("module=TrafficControlSort\tset target turn off, targetName:%s", controller.target.Name))
			}
		}
	}
	if onPrefix := params.GetString("pid_on_target_name_prefix", ""); onPrefix != "" {
		for _, controller := range controllers {
			if strings.HasPrefix(controller.target.Name, onPrefix) {
				controller.SetOnline(true)
				ctx.LogDebug(fmt.Sprintf("module=TrafficControlSort\tset target turn on, targetName:%s", controller.target.Name))
			}
		}
	}

	// Per-task online switch.
	if plans, ok := params.Get("pid_plan_params", nil).(map[string]interface{}); ok {
		for taskId, args := range plans {
			dict, good := args.(map[string]interface{})
			if !good {
				continue
			}
			online, valid := boolOf(dict, "online")
			if !valid {
				continue
			}
			for _, c := range controllers {
				if c.task.TrafficControlTaskId == taskId {
					c.SetOnline(online)
				}
			}
		}
	}

	values, ok := params.Get("pid_params", nil).(map[string]interface{})
	if !ok {
		return true
	}

	// Defaults apply to every controller without its own "pid_params" entry.
	hasDefaultValue := false
	var defaultKp, defaultKi, defaultKd, defaultErrDiscount float64
	defaultStartPageNo := 0
	if dict, good := values["default"].(map[string]interface{}); good {
		hasDefaultValue = true
		defaultKp, _ = floatOf(dict, "kp")
		defaultKi, _ = floatOf(dict, "ki")
		defaultKd, _ = floatOf(dict, "kd")
		defaultErrDiscount, _ = floatOf(dict, "err_discount")
		if s, valid := floatOf(dict, "start_page_num"); valid {
			defaultStartPageNo = int(s)
		}
	}
	if hasDefaultValue {
		for _, c := range controllers {
			if _, okay := values[c.target.TrafficControlTargetId]; okay {
				continue
			}
			if defaultKp != 0 {
				c.SetParameters(defaultKp, defaultKi, defaultKd)
			}
			c.SetStartPageNum(defaultStartPageNo)
			c.SetErrDiscount(defaultErrDiscount)
		}
	}

	// Per-target settings.
	for targetId, args := range values {
		if targetId == "default" {
			continue
		}
		c, okay := controllers[targetId]
		if !okay {
			continue
		}
		dict, good := args.(map[string]interface{})
		if !good {
			if hasDefaultValue {
				c.SetParameters(defaultKp, defaultKi, defaultKd)
			}
			continue
		}
		// Missing gains are passed as 0, as before.
		kp, _ := floatOf(dict, "kp")
		ki, _ := floatOf(dict, "ki")
		kd, _ := floatOf(dict, "kd")
		c.SetParameters(kp, ki, kd)
		if v, valid := floatOf(dict, "integral_threshold"); valid {
			c.SetIntegralThreshold(v)
		}
		if v, valid := floatOf(dict, "err_threshold"); valid {
			c.SetErrorThreshold(v)
		}
		if v, valid := floatOf(dict, "err_discount"); valid {
			c.SetErrDiscount(v)
		}
		if v, valid := boolOf(dict, "allocate_exp_wise"); valid {
			c.SetAllocateExpWise(v)
		}
		if v, valid := floatOf(dict, "start_page_num"); valid {
			c.SetStartPageNum(int(v))
		}
		if v, valid := floatOf(dict, "min_exp_traffic"); valid {
			c.SetMinExpTraffic(v)
		}
		if v, valid := boolOf(dict, "online"); valid {
			c.SetOnline(v)
		}
	}
	return true
}