in xds/client/resource/unmarshal_cds.go [96:190]
func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (ClusterUpdate, error) {
var lbPolicy *ClusterLBPolicyRingHash
// todo @(laurence) this direct set
cluster.LbPolicy = v3clusterpb.Cluster_ROUND_ROBIN
switch cluster.GetLbPolicy() {
case v3clusterpb.Cluster_ROUND_ROBIN:
lbPolicy = nil // The default is round_robin, and there's no config to set.
case v3clusterpb.Cluster_RING_HASH:
if !envconfig.XDSRingHash {
return ClusterUpdate{}, fmt.Errorf("unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster)
}
rhc := cluster.GetRingHashLbConfig()
if rhc.GetHashFunction() != v3clusterpb.Cluster_RingHashLbConfig_XX_HASH {
return ClusterUpdate{}, fmt.Errorf("unsupported ring_hash hash function %v in response: %+v", rhc.GetHashFunction(), cluster)
}
// Minimum defaults to 1024 entries, and limited to 8M entries Maximum
// defaults to 8M entries, and limited to 8M entries
var minSize, maxSize uint64 = defaultRingHashMinSize, defaultRingHashMaxSize
if min := rhc.GetMinimumRingSize(); min != nil {
if min.GetValue() > ringHashSizeUpperBound {
return ClusterUpdate{}, fmt.Errorf("unexpected ring_hash mininum ring size %v in response: %+v", min.GetValue(), cluster)
}
minSize = min.GetValue()
}
if max := rhc.GetMaximumRingSize(); max != nil {
if max.GetValue() > ringHashSizeUpperBound {
return ClusterUpdate{}, fmt.Errorf("unexpected ring_hash maxinum ring size %v in response: %+v", max.GetValue(), cluster)
}
maxSize = max.GetValue()
}
if minSize > maxSize {
return ClusterUpdate{}, fmt.Errorf("ring_hash config min size %v is greater than max %v", minSize, maxSize)
}
lbPolicy = &ClusterLBPolicyRingHash{MinimumRingSize: minSize, MaximumRingSize: maxSize}
default:
return ClusterUpdate{}, fmt.Errorf("unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster)
}
// Process security configuration received from the control plane iff the
// corresponding environment variable is set.
var sc *SecurityConfig
if envconfig.XDSClientSideSecurity {
var err error
if sc, err = securityConfigFromCluster(cluster); err != nil {
return ClusterUpdate{}, err
}
}
ret := ClusterUpdate{
ClusterName: cluster.GetName(),
EnableLRS: cluster.GetLrsServer().GetSelf() != nil,
SecurityCfg: sc,
MaxRequests: circuitBreakersFromCluster(cluster),
LBPolicy: lbPolicy,
}
// Validate and set cluster type from the response.
// todo @laurence this set cluster
if x, ok := cluster.GetClusterDiscoveryType().(*v3clusterpb.Cluster_Type); ok {
x.Type = v3clusterpb.Cluster_EDS
}
switch {
case cluster.GetType() == v3clusterpb.Cluster_EDS:
if cluster.GetEdsClusterConfig().GetEdsConfig().GetAds() == nil {
return ClusterUpdate{}, fmt.Errorf("unexpected edsConfig in response: %+v", cluster)
}
ret.ClusterType = ClusterTypeEDS
ret.EDSServiceName = cluster.GetEdsClusterConfig().GetServiceName()
return ret, nil
case cluster.GetType() == v3clusterpb.Cluster_LOGICAL_DNS:
if !envconfig.XDSAggregateAndDNS {
return ClusterUpdate{}, fmt.Errorf("unsupported cluster type (%v, %v) in response: %+v", cluster.GetType(), cluster.GetClusterType(), cluster)
}
ret.ClusterType = ClusterTypeLogicalDNS
dnsHN, err := dnsHostNameFromCluster(cluster)
if err != nil {
return ClusterUpdate{}, err
}
ret.DNSHostName = dnsHN
return ret, nil
case cluster.GetClusterType() != nil && cluster.GetClusterType().Name == "envoy.clusters.aggregate":
if !envconfig.XDSAggregateAndDNS {
return ClusterUpdate{}, fmt.Errorf("unsupported cluster type (%v, %v) in response: %+v", cluster.GetType(), cluster.GetClusterType(), cluster)
}
clusters := &v3aggregateclusterpb.ClusterConfig{}
if err := proto.Unmarshal(cluster.GetClusterType().GetTypedConfig().GetValue(), clusters); err != nil {
return ClusterUpdate{}, fmt.Errorf("failed to unmarshal resource: %v", err)
}
ret.ClusterType = ClusterTypeAggregate
ret.PrioritizedClusterNames = clusters.Clusters
return ret, nil
default:
return ClusterUpdate{}, fmt.Errorf("unsupported cluster type (%v, %v) in response: %+v", cluster.GetType(), cluster.GetClusterType(), cluster)
}
}