func CreateClusterLoadAssignment()

in grpc-xds/control-plane-go/pkg/xds/eds/cluster_load_assignment.go [28:96]


// CreateClusterLoadAssignment builds the EDS ClusterLoadAssignment resource named edsServiceName.
//
// The endpoints are grouped by zone, and each zone becomes one LocalityLbEndpoints entry. The
// priority of each entry is looked up in the result of localityPriorityMapper.BuildPriorityMap,
// which is called with nodeHash and the list of zones. Every address of every endpoint becomes
// one LbEndpoint with a TCP socket address on servingPort.
//
// Zones are processed in the order in which they first appear in endpoints, so this function
// does not add any ordering of its own: the zone list handed to the priority mapper and the
// order of the localities in the result follow the input, instead of Go's randomized map
// iteration order.
func CreateClusterLoadAssignment(edsServiceName string, servingPort uint32, nodeHash string, localityPriorityMapper LocalityPriorityMapper, endpoints []applications.ApplicationEndpoints) *endpointv3.ClusterLoadAssignment {
	endpointsByZone := make(map[string][]applications.ApplicationEndpoints)
	// Kept non-nil even when there are no endpoints, so the priority mapper always receives
	// an allocated (possibly empty) slice.
	zones := []string{}
	for _, endpoint := range endpoints {
		if _, seen := endpointsByZone[endpoint.Zone]; !seen {
			// Remember first-seen order; ranging over the map later would be random.
			zones = append(zones, endpoint.Zone)
		}
		endpointsByZone[endpoint.Zone] = append(endpointsByZone[endpoint.Zone], endpoint)
	}
	zonePriorities := localityPriorityMapper.BuildPriorityMap(nodeHash, zones)
	cla := &endpointv3.ClusterLoadAssignment{
		ClusterName: edsServiceName,
		Endpoints:   make([]*endpointv3.LocalityLbEndpoints, 0, len(zones)),
		// gRPC doesn't use the overprovisioning factor (effectively treats it as 100%), while the Envoy
		// default is 140%. See
		// https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/upstream/load_balancing/priority
		// and
		// https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/upstream/load_balancing/locality_weight
		Policy: &endpointv3.ClusterLoadAssignment_Policy{
			OverprovisioningFactor: wrapperspb.UInt32(100),
		},
	}
	for _, zone := range zones {
		zoneEndpoints := endpointsByZone[zone]
		localityLbEndpoints := &endpointv3.LocalityLbEndpoints{
			// LbEndpoints is mandatory.
			LbEndpoints: make([]*endpointv3.LbEndpoint, 0, len(zoneEndpoints)),
			// Weight is effectively mandatory. Use the number of endpoints in the locality as
			// the weight, i.e., assume that all endpoints can handle the same load.
			// NOTE(review): this counts ApplicationEndpoints entries, not addresses, so it
			// differs from len(LbEndpoints) when an entry does not have exactly one address.
			LoadBalancingWeight: wrapperspb.UInt32(uint32(len(zoneEndpoints))),
			// Locality must be unique for a given priority.
			Locality: &corev3.Locality{
				Zone: zone,
			},
			// Priority is optional and defaults to 0. If provided, must start from 0 and have no gaps.
			// Priority 0 is the highest priority.
			Priority: zonePriorities[zone],
		}
		for _, endpoint := range zoneEndpoints {
			healthStatus := endpoint.EndpointStatus.HealthStatus()
			for _, address := range endpoint.Addresses {
				localityLbEndpoints.LbEndpoints = append(localityLbEndpoints.LbEndpoints,
					&endpointv3.LbEndpoint{
						HealthStatus: healthStatus,
						HostIdentifier: &endpointv3.LbEndpoint_Endpoint{
							// Endpoint is mandatory.
							Endpoint: &endpointv3.Endpoint{
								// Address is mandatory, must be unique within the cluster.
								Address: &corev3.Address{
									Address: &corev3.Address_SocketAddress{
										SocketAddress: &corev3.SocketAddress{
											Protocol: corev3.SocketAddress_TCP,
											Address:  address, // mandatory, IPv4 or IPv6
											PortSpecifier: &corev3.SocketAddress_PortValue{
												PortValue: servingPort, // mandatory
											},
										},
									},
								},
							},
						},
					})
			}
		}
		cla.Endpoints = append(cla.Endpoints, localityLbEndpoints)
	}
	return cla
}