func()

in pkg/datasource/k8s/controllers/flowrules_controller.go [116:172]


// assembleFlowRules converts every entry of rs.Spec.Rules into a *flow.Rule.
//
// The three string-valued strategy fields of the CRD rule are mapped onto
// their flow package counterparts. An empty string selects the default
// (flow.Direct, flow.Reject and flow.CurrentResource respectively). A rule
// carrying any other unrecognised value is reported through r.Logger and
// left out of the result, so the returned slice may be shorter than
// rs.Spec.Rules.
func (r *FlowRulesReconciler) assembleFlowRules(rs *datasourcev1.FlowRules) []*flow.Rule {
	logger := r.Logger
	assembled := make([]*flow.Rule, 0, len(rs.Spec.Rules))

	for _, spec := range rs.Spec.Rules {
		// Fields that are copied over directly (with numeric conversion where
		// needed). The strategy fields are filled in by the switches below.
		converted := &flow.Rule{
			ID:                spec.Id,
			Resource:          spec.Resource,
			RefResource:       spec.RefResource,
			Threshold:         float64(spec.Threshold),
			MaxQueueingTimeMs: uint32(spec.MaxQueueingTimeMs),
			WarmUpPeriodSec:   uint32(spec.WarmUpPeriodSec),
			WarmUpColdFactor:  uint32(spec.WarmUpColdFactor),
		}

		// Token calculate strategy: unset means Direct.
		switch spec.TokenCalculateStrategy {
		case "", DirectTokenCalculateStrategy:
			converted.TokenCalculateStrategy = flow.Direct
		case WarmUpTokenCalculateStrategy:
			converted.TokenCalculateStrategy = flow.WarmUp
		default:
			err := errors.New("unsupported TokenCalculateStrategy for flow.Rule")
			logger.Error(err, "", "TokenCalculateStrategy", spec.TokenCalculateStrategy)
			continue
		}

		// Control behavior: unset means Reject.
		switch spec.ControlBehavior {
		case "", RejectControlBehavior:
			converted.ControlBehavior = flow.Reject
		case ThrottlingControlBehavior:
			converted.ControlBehavior = flow.Throttling
		default:
			err := errors.New("unsupported ControlBehavior for flow.Rule")
			logger.Error(err, "", "controlBehavior", spec.ControlBehavior)
			continue
		}

		// Relation strategy: unset means CurrentResource.
		switch spec.RelationStrategy {
		case "", CurrentResourceRelationStrategy:
			converted.RelationStrategy = flow.CurrentResource
		case AssociatedResourceRelationStrategy:
			converted.RelationStrategy = flow.AssociatedResource
		default:
			err := errors.New("unsupported RelationStrategy for flow.Rule")
			logger.Error(err, "", "relationStrategy", spec.RelationStrategy)
			continue
		}

		assembled = append(assembled, converted)
	}

	return assembled
}