in xds/client/resource/unmarshal_rds.go [234:423]
// routesProtoToSlice converts the xDS route protos in routes into the internal
// Route representation.
//
// Routes that use query-parameter matchers or a cluster_header cluster
// specifier are skipped (not returned and not treated as an error). Any other
// problem with a route (missing match or path specifier, invalid regex,
// unrecognized matcher or cluster specifier, inconsistent cluster weights,
// invalid filter overrides or retry policy) aborts the whole conversion.
//
// csps holds the cluster specifier plugins known for the enclosing route
// configuration; a route that names a plugin absent from csps is an error.
// When v2 is true, typed per-filter config overrides are not processed.
//
// It returns the converted routes, the set of cluster specifier plugin names
// referenced by the processed routes, and a non-nil error on failure (in which
// case the other two results are nil).
func routesProtoToSlice(routes []*v3routepb.Route, csps map[string]clusterspecifier.BalancerConfig, logger dubbogoLogger.Logger, v2 bool) ([]*Route, map[string]bool, error) {
	var routesRet []*Route
	cspNames := make(map[string]bool)
	for _, r := range routes {
		match := r.GetMatch()
		if match == nil {
			return nil, nil, fmt.Errorf("route %+v doesn't have a match", r)
		}

		if len(match.GetQueryParameters()) != 0 {
			// Ignore route with query parameters.
			logger.Warnf("route %+v has query parameter matchers, the route will be ignored", r)
			continue
		}

		pathSp := match.GetPathSpecifier()
		if pathSp == nil {
			return nil, nil, fmt.Errorf("route %+v doesn't have a path specifier", r)
		}

		var route Route

		// Path matcher: exactly one of prefix, exact path or regex.
		switch pt := pathSp.(type) {
		case *v3routepb.RouteMatch_Prefix:
			route.Prefix = &pt.Prefix
		case *v3routepb.RouteMatch_Path:
			route.Path = &pt.Path
		case *v3routepb.RouteMatch_SafeRegex:
			regex := pt.SafeRegex.GetRegex()
			re, err := regexp.Compile(regex)
			if err != nil {
				return nil, nil, fmt.Errorf("route %+v contains an invalid regex %q", r, regex)
			}
			route.Regex = re
		default:
			return nil, nil, fmt.Errorf("route %+v has an unrecognized path specifier: %+v", r, pt)
		}

		// CaseInsensitive stays at its zero value when case_sensitive is unset.
		if caseSensitive := match.GetCaseSensitive(); caseSensitive != nil {
			route.CaseInsensitive = !caseSensitive.Value
		}

		// Header matchers.
		for _, h := range match.GetHeaders() {
			var header HeaderMatcher
			switch ht := h.GetHeaderMatchSpecifier().(type) {
			case *v3routepb.HeaderMatcher_ExactMatch:
				header.ExactMatch = &ht.ExactMatch
			case *v3routepb.HeaderMatcher_SafeRegexMatch:
				regex := ht.SafeRegexMatch.GetRegex()
				re, err := regexp.Compile(regex)
				if err != nil {
					return nil, nil, fmt.Errorf("route %+v contains an invalid regex %q", r, regex)
				}
				header.RegexMatch = re
			case *v3routepb.HeaderMatcher_RangeMatch:
				// Use the nil-safe getters: the oneof wrapper may carry a nil
				// range message, and a direct field read would panic.
				header.RangeMatch = &Int64Range{
					Start: ht.RangeMatch.GetStart(),
					End:   ht.RangeMatch.GetEnd(),
				}
			case *v3routepb.HeaderMatcher_PresentMatch:
				header.PresentMatch = &ht.PresentMatch
			case *v3routepb.HeaderMatcher_PrefixMatch:
				header.PrefixMatch = &ht.PrefixMatch
			case *v3routepb.HeaderMatcher_SuffixMatch:
				header.SuffixMatch = &ht.SuffixMatch
			default:
				return nil, nil, fmt.Errorf("route %+v has an unrecognized header matcher: %+v", r, ht)
			}
			header.Name = h.GetName()
			invert := h.GetInvertMatch()
			header.InvertMatch = &invert
			route.Headers = append(route.Headers, &header)
		}

		// Runtime fraction, normalized to parts-per-million.
		if fr := match.GetRuntimeFraction(); fr != nil {
			d := fr.GetDefaultValue()
			// Scale in 64 bits and saturate, so that an oversized numerator
			// cannot wrap around uint32 and turn into a small fraction.
			scaled := uint64(d.GetNumerator())
			switch d.GetDenominator() {
			case v3typepb.FractionalPercent_HUNDRED:
				scaled *= 10000
			case v3typepb.FractionalPercent_TEN_THOUSAND:
				scaled *= 100
			case v3typepb.FractionalPercent_MILLION:
				// Already expressed in parts-per-million.
			}
			const maxUint32 = uint64(^uint32(0))
			if scaled > maxUint32 {
				scaled = maxUint32
			}
			n := uint32(scaled)
			route.Fraction = &n
		}

		switch r.GetAction().(type) {
		case *v3routepb.Route_Route:
			route.WeightedClusters = make(map[string]WeightedCluster)
			action := r.GetRoute()

			// Hash Policies are only applicable for a Ring Hash LB.
			if envconfig.XDSRingHash {
				// GetHashPolicy is nil-safe; action may be nil if the oneof
				// wrapper was set without a RouteAction message.
				hp, err := hashPoliciesProtoToSlice(action.GetHashPolicy(), logger)
				if err != nil {
					return nil, nil, err
				}
				route.HashPolicies = hp
			}

			switch a := action.GetClusterSpecifier().(type) {
			case *v3routepb.RouteAction_Cluster:
				route.WeightedClusters[a.Cluster] = WeightedCluster{Weight: 1}
			case *v3routepb.RouteAction_WeightedClusters:
				wcs := a.WeightedClusters
				// Accumulate in 64 bits so that large weights cannot wrap
				// around uint32 and accidentally equal the expected total.
				var totalWeight uint64
				for _, c := range wcs.GetClusters() {
					w := c.GetWeight().GetValue()
					if w == 0 {
						// Zero-weight clusters never receive traffic; drop them.
						continue
					}
					wc := WeightedCluster{Weight: w}
					if !v2 {
						cfgs, err := processHTTPFilterOverrides(c.GetTypedPerFilterConfig())
						if err != nil {
							return nil, nil, fmt.Errorf("route %+v, action %+v: %w", r, a, err)
						}
						wc.HTTPFilterConfigOverride = cfgs
					}
					route.WeightedClusters[c.GetName()] = wc
					totalWeight += uint64(w)
				}
				// envoy xds doc
				// default TotalWeight https://www.envoyproxy.io/docs/envoy/latest/api-v3/config/route/v3/route_components.proto.html#envoy-v3-api-field-config-route-v3-weightedcluster-total-weight
				wantTotalWeight := uint32(100)
				if tw := wcs.GetTotalWeight(); tw != nil {
					wantTotalWeight = tw.GetValue()
				}
				if totalWeight != uint64(wantTotalWeight) {
					return nil, nil, fmt.Errorf("route %+v, action %+v, weights of clusters do not add up to total total weight, got: %v, expected total weight from response: %v", r, a, totalWeight, wantTotalWeight)
				}
				// Reached only when the response explicitly sets total_weight
				// to 0 and no cluster carries a non-zero weight.
				if totalWeight == 0 {
					return nil, nil, fmt.Errorf("route %+v, action %+v, has no valid cluster in WeightedCluster action", r, a)
				}
			case *v3routepb.RouteAction_ClusterHeader:
				// Routes selecting the cluster via a header are skipped
				// entirely; nothing is appended for them.
				continue
			case *v3routepb.RouteAction_ClusterSpecifierPlugin:
				if !envconfig.XDSRLS {
					return nil, nil, fmt.Errorf("route %+v, has an unknown ClusterSpecifier: %+v", r, a)
				}
				if _, ok := csps[a.ClusterSpecifierPlugin]; !ok {
					// "When processing RouteActions, if any action includes a
					// cluster_specifier_plugin value that is not in
					// RouteConfiguration.cluster_specifier_plugins, the
					// resource will be NACKed." - RLS in xDS design
					return nil, nil, fmt.Errorf("route %+v, action %+v, specifies a cluster specifier plugin %+v that is not in Route Configuration", r, a, a.ClusterSpecifierPlugin)
				}
				cspNames[a.ClusterSpecifierPlugin] = true
				route.ClusterSpecifierPlugin = a.ClusterSpecifierPlugin
			default:
				return nil, nil, fmt.Errorf("route %+v, has an unknown ClusterSpecifier: %+v", r, a)
			}

			msd := action.GetMaxStreamDuration()
			// Prefer grpc_timeout_header_max, if set.
			dur := msd.GetGrpcTimeoutHeaderMax()
			if dur == nil {
				dur = msd.GetMaxStreamDuration()
			}
			if dur != nil {
				d := dur.AsDuration()
				route.MaxStreamDuration = &d
			}

			var err error
			route.RetryConfig, err = generateRetryConfig(action.GetRetryPolicy())
			if err != nil {
				return nil, nil, fmt.Errorf("route %+v, action %+v: %w", r, action, err)
			}

			route.ActionType = RouteActionRoute
		case *v3routepb.Route_NonForwardingAction:
			// Expected to be used on server side.
			route.ActionType = RouteActionNonForwardingAction
		default:
			// Any other action is kept but flagged as unsupported.
			route.ActionType = RouteActionUnsupported
		}

		if !v2 {
			cfgs, err := processHTTPFilterOverrides(r.GetTypedPerFilterConfig())
			if err != nil {
				return nil, nil, fmt.Errorf("route %+v: %w", r, err)
			}
			route.HTTPFilterConfigOverride = cfgs
		}
		routesRet = append(routesRet, &route)
	}
	return routesRet, cspNames, nil
}