in dp_check/dp_check.go [791:813]
func processAggregateClusterResponse(cdsReply *v3discoverypb.DiscoveryResponse, clusterName string) ([]string, error) {
aggregateCluster, err := getCluster(cdsReply, clusterName)
if err != nil {
return []string{}, fmt.Errorf("failed to get aggregate cluster from CDS response: %v", err)
}
if aggregateCluster.GetClusterType() == nil || aggregateCluster.GetClusterType().GetName() != "envoy.clusters.aggregate" {
return []string{}, fmt.Errorf("failed to receive an aggregate cluster from Traffic Director, set --xds_expect_fallback_configured=false if CFE fallback has been intentionally disabled for this service")
}
clusterConfig := &v3clusterextpb.ClusterConfig{}
if err := proto.Unmarshal(aggregateCluster.GetClusterType().GetTypedConfig().GetValue(), clusterConfig); err != nil {
return []string{}, fmt.Errorf("failed to unmarshal cluster_config resource from CDS response: %v", err)
}
if len(clusterConfig.GetClusters()) == 0 {
return []string{}, fmt.Errorf("no clusters received in aggregate cluster")
}
if len(clusterConfig.GetClusters()) != 2 {
for _, c := range clusterConfig.GetClusters() {
infoLog.Printf("Found cluster in aggregate cluster: |%v|", c)
}
return []string{}, fmt.Errorf("expected to receive 2 clusters in aggregate cluster, but received %v", len(clusterConfig.GetClusters()))
}
return clusterConfig.GetClusters(), nil
}