in pkg/datasource/k8s/controllers/flowrules_controller.go [116:172]
// assembleFlowRules translates every entry of rs.Spec.Rules into a *flow.Rule.
//
// The three string-valued strategy fields are mapped onto their flow
// counterparts; an empty string selects flow.Direct, flow.Reject and
// flow.CurrentResource respectively. An entry carrying an unrecognized value
// is reported through r.Logger and omitted from the result, so the returned
// slice may be shorter than rs.Spec.Rules. The returned slice is never nil.
func (r *FlowRulesReconciler) assembleFlowRules(rs *datasourcev1.FlowRules) []*flow.Rule {
	specs := rs.Spec.Rules
	assembled := make([]*flow.Rule, 0, len(specs))
	logger := r.Logger

	for _, spec := range specs {
		// The strategy fields start at their zero value and are resolved by
		// the switches below.
		converted := &flow.Rule{
			ID:                spec.Id,
			Resource:          spec.Resource,
			Threshold:         float64(spec.Threshold),
			RefResource:       spec.RefResource,
			MaxQueueingTimeMs: uint32(spec.MaxQueueingTimeMs),
			WarmUpPeriodSec:   uint32(spec.WarmUpPeriodSec),
			WarmUpColdFactor:  uint32(spec.WarmUpColdFactor),
		}

		// Validation stops at the first unsupported field, so at most one
		// error is logged per skipped entry.
		switch strategy := spec.TokenCalculateStrategy; strategy {
		case "", DirectTokenCalculateStrategy:
			converted.TokenCalculateStrategy = flow.Direct
		case WarmUpTokenCalculateStrategy:
			converted.TokenCalculateStrategy = flow.WarmUp
		default:
			logger.Error(errors.New("unsupported TokenCalculateStrategy for flow.Rule"), "", "TokenCalculateStrategy", strategy)
			continue
		}

		switch behavior := spec.ControlBehavior; behavior {
		case "", RejectControlBehavior:
			converted.ControlBehavior = flow.Reject
		case ThrottlingControlBehavior:
			converted.ControlBehavior = flow.Throttling
		default:
			logger.Error(errors.New("unsupported ControlBehavior for flow.Rule"), "", "controlBehavior", behavior)
			continue
		}

		switch relation := spec.RelationStrategy; relation {
		case "", CurrentResourceRelationStrategy:
			converted.RelationStrategy = flow.CurrentResource
		case AssociatedResourceRelationStrategy:
			converted.RelationStrategy = flow.AssociatedResource
		default:
			logger.Error(errors.New("unsupported RelationStrategy for flow.Rule"), "", "relationStrategy", relation)
			continue
		}

		assembled = append(assembled, converted)
	}
	return assembled
}