in grpc-xds/control-plane-go/pkg/xds/eds/cluster_load_assignment.go [28:96]
func CreateClusterLoadAssignment(edsServiceName string, servingPort uint32, nodeHash string, localityPriorityMapper LocalityPriorityMapper, endpoints []applications.ApplicationEndpoints) *endpointv3.ClusterLoadAssignment {
endpointsByZone := map[string][]applications.ApplicationEndpoints{}
for _, endpoint := range endpoints {
endpointsByZone[endpoint.Zone] = append(endpointsByZone[endpoint.Zone], endpoint)
}
zones := make([]string, len(endpointsByZone))
i := 0
for zone := range endpointsByZone {
zones[i] = zone
i++
}
zonePriorities := localityPriorityMapper.BuildPriorityMap(nodeHash, zones)
cla := &endpointv3.ClusterLoadAssignment{
ClusterName: edsServiceName,
Endpoints: []*endpointv3.LocalityLbEndpoints{},
// gRPC doesn't use the overprovisioning factor (effectively treats it as 100%), while the Envoy
// default is 140%. See
// https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/upstream/load_balancing/priority
// and
// https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/upstream/load_balancing/locality_weight
Policy: &endpointv3.ClusterLoadAssignment_Policy{
OverprovisioningFactor: wrapperspb.UInt32(100),
},
}
for zone, endpoints := range endpointsByZone {
localityLbEndpoints := &endpointv3.LocalityLbEndpoints{
// LbEndpoints is mandatory.
LbEndpoints: []*endpointv3.LbEndpoint{},
// Weight is effectively mandatory, read the javadoc carefully :-)
// Use number of endpoints in locality as weight, so assume all endpoints can handle
// the same load.
LoadBalancingWeight: wrapperspb.UInt32(uint32(len(endpoints))),
// Locality must be unique for a given priority.
Locality: &corev3.Locality{
Zone: zone,
},
// Priority is optional and defaults to 0. If provided, must start from 0 and have no gaps.
// Priority 0 is the highest priority.
Priority: zonePriorities[zone],
}
for _, endpoint := range endpoints {
for _, address := range endpoint.Addresses {
localityLbEndpoints.LbEndpoints = append(localityLbEndpoints.LbEndpoints,
&endpointv3.LbEndpoint{
HealthStatus: endpoint.EndpointStatus.HealthStatus(),
HostIdentifier: &endpointv3.LbEndpoint_Endpoint{
// Endpoint is mandatory.
Endpoint: &endpointv3.Endpoint{
// Address is mandatory, must be unique within the cluster.
Address: &corev3.Address{
Address: &corev3.Address_SocketAddress{
SocketAddress: &corev3.SocketAddress{
Protocol: corev3.SocketAddress_TCP,
Address: address, // mandatory, IPv4 or IPv6
PortSpecifier: &corev3.SocketAddress_PortValue{
PortValue: servingPort, // mandatory
},
},
},
},
},
},
})
}
}
cla.Endpoints = append(cla.Endpoints, localityLbEndpoints)
}
return cla
}