func validateClusterAndConstructClusterUpdate()

in xds/client/resource/unmarshal_cds.go [96:190]


// validateClusterAndConstructClusterUpdate validates the given CDS Cluster
// resource and converts it into a ClusterUpdate.
//
// NOTE: the resource is rewritten in place before it is validated, so the
// caller's message is mutated:
//   - cluster.LbPolicy is unconditionally set to ROUND_ROBIN. While that
//     override is present, only the ROUND_ROBIN arm of the LB policy switch
//     can be taken.
//   - When the cluster discovery type is a plain Cluster_Type, its value is
//     set to EDS. While that override is present, the LOGICAL_DNS arm of the
//     cluster type switch cannot be taken.
//
// It returns a zero ClusterUpdate and a non-nil error when cluster is nil,
// when the security configuration cannot be built, or when the LB policy or
// cluster type is not supported.
func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (ClusterUpdate, error) {
	// The direct field write below is not nil-safe (unlike the generated
	// getters), so reject a nil resource up front instead of panicking.
	if cluster == nil {
		return ClusterUpdate{}, fmt.Errorf("unexpected nil cluster resource")
	}

	var lbPolicy *ClusterLBPolicyRingHash
	// TODO(laurence): the LB policy received from the control plane is
	// discarded and forced to round_robin here. Revisit this direct set.
	cluster.LbPolicy = v3clusterpb.Cluster_ROUND_ROBIN
	switch cluster.GetLbPolicy() {
	case v3clusterpb.Cluster_ROUND_ROBIN:
		lbPolicy = nil // The default is round_robin, and there's no config to set.
	case v3clusterpb.Cluster_RING_HASH:
		if !envconfig.XDSRingHash {
			return ClusterUpdate{}, fmt.Errorf("unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster)
		}
		rhc := cluster.GetRingHashLbConfig()
		if rhc.GetHashFunction() != v3clusterpb.Cluster_RingHashLbConfig_XX_HASH {
			return ClusterUpdate{}, fmt.Errorf("unsupported ring_hash hash function %v in response: %+v", rhc.GetHashFunction(), cluster)
		}
		// Both sizes start from their package defaults and, when set
		// explicitly, must not exceed ringHashSizeUpperBound.
		var minSize, maxSize uint64 = defaultRingHashMinSize, defaultRingHashMaxSize
		if minRing := rhc.GetMinimumRingSize(); minRing != nil {
			if minRing.GetValue() > ringHashSizeUpperBound {
				return ClusterUpdate{}, fmt.Errorf("unexpected ring_hash minimum ring size %v in response: %+v", minRing.GetValue(), cluster)
			}
			minSize = minRing.GetValue()
		}
		if maxRing := rhc.GetMaximumRingSize(); maxRing != nil {
			if maxRing.GetValue() > ringHashSizeUpperBound {
				return ClusterUpdate{}, fmt.Errorf("unexpected ring_hash maximum ring size %v in response: %+v", maxRing.GetValue(), cluster)
			}
			maxSize = maxRing.GetValue()
		}
		if minSize > maxSize {
			return ClusterUpdate{}, fmt.Errorf("ring_hash config min size %v is greater than max %v", minSize, maxSize)
		}
		lbPolicy = &ClusterLBPolicyRingHash{MinimumRingSize: minSize, MaximumRingSize: maxSize}
	default:
		return ClusterUpdate{}, fmt.Errorf("unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster)
	}

	// Process security configuration received from the control plane iff the
	// corresponding environment variable is set.
	var sc *SecurityConfig
	if envconfig.XDSClientSideSecurity {
		var err error
		if sc, err = securityConfigFromCluster(cluster); err != nil {
			return ClusterUpdate{}, err
		}
	}

	ret := ClusterUpdate{
		ClusterName: cluster.GetName(),
		EnableLRS:   cluster.GetLrsServer().GetSelf() != nil,
		SecurityCfg: sc,
		MaxRequests: circuitBreakersFromCluster(cluster),
		LBPolicy:    lbPolicy,
	}

	// Validate and set cluster type from the response.
	// TODO(laurence): when the discovery type is a plain Cluster_Type, the
	// value sent by the control plane is discarded and forced to EDS here.
	// Custom cluster types (e.g. aggregate) are left untouched. Revisit this
	// direct set.
	if x, ok := cluster.GetClusterDiscoveryType().(*v3clusterpb.Cluster_Type); ok {
		x.Type = v3clusterpb.Cluster_EDS
	}
	switch {
	case cluster.GetType() == v3clusterpb.Cluster_EDS:
		// Only EDS configs that point at ADS are accepted.
		if cluster.GetEdsClusterConfig().GetEdsConfig().GetAds() == nil {
			return ClusterUpdate{}, fmt.Errorf("unexpected edsConfig in response: %+v", cluster)
		}
		ret.ClusterType = ClusterTypeEDS
		ret.EDSServiceName = cluster.GetEdsClusterConfig().GetServiceName()
		return ret, nil
	case cluster.GetType() == v3clusterpb.Cluster_LOGICAL_DNS:
		if !envconfig.XDSAggregateAndDNS {
			return ClusterUpdate{}, fmt.Errorf("unsupported cluster type (%v, %v) in response: %+v", cluster.GetType(), cluster.GetClusterType(), cluster)
		}
		ret.ClusterType = ClusterTypeLogicalDNS
		dnsHN, err := dnsHostNameFromCluster(cluster)
		if err != nil {
			return ClusterUpdate{}, err
		}
		ret.DNSHostName = dnsHN
		return ret, nil
	case cluster.GetClusterType() != nil && cluster.GetClusterType().Name == "envoy.clusters.aggregate":
		if !envconfig.XDSAggregateAndDNS {
			return ClusterUpdate{}, fmt.Errorf("unsupported cluster type (%v, %v) in response: %+v", cluster.GetType(), cluster.GetClusterType(), cluster)
		}
		clusters := &v3aggregateclusterpb.ClusterConfig{}
		// Wrap with %w so callers can inspect the underlying proto error;
		// the rendered message is the same as with %v.
		if err := proto.Unmarshal(cluster.GetClusterType().GetTypedConfig().GetValue(), clusters); err != nil {
			return ClusterUpdate{}, fmt.Errorf("failed to unmarshal resource: %w", err)
		}
		ret.ClusterType = ClusterTypeAggregate
		ret.PrioritizedClusterNames = clusters.Clusters
		return ret, nil
	default:
		return ClusterUpdate{}, fmt.Errorf("unsupported cluster type (%v, %v) in response: %+v", cluster.GetType(), cluster.GetClusterType(), cluster)
	}
}