in grpc-xds/control-plane-go/pkg/xds/snapshot_builder.go [64:138]
// AddGRPCApplications registers xDS resources on the builder for each of the
// provided gRPC applications.
//
// For every application, an LDS API listener, an RDS route configuration, and
// a CDS cluster are created under the application name, unless the respective
// builder map already holds a non-nil entry for that name. When
// b.features.EnableFederation is set, a second copy of each resource is
// created alongside it, using names derived from b.authority and the
// application name.
//
// Endpoints are accumulated in b.endpointsByCluster under a key made of the
// application name and serving port, and the EDS cluster load assignment(s)
// are rebuilt from the accumulated endpoints for every entry in apps.
//
// It returns the builder so calls can be chained. If a listener or cluster
// cannot be created, it returns a nil builder and a wrapped error; resources
// stored on the receiver before the failure are left in place.
func (b *SnapshotBuilder) AddGRPCApplications(apps []applications.Application) (*SnapshotBuilder, error) {
	federated := b.features.EnableFederation

	for i := range apps {
		app := apps[i]
		name := app.Name

		// LDS: listener(s) are only created the first time a name is seen.
		if b.listeners[name] == nil {
			plainListener, err := lds.CreateAPIListener(name, name)
			if err != nil {
				return nil, fmt.Errorf("could not create LDS API listener for gRPC application %+v: %w", app, err)
			}
			b.listeners[plainListener.Name] = plainListener

			if federated {
				fedListener, err := lds.CreateAPIListener(
					xdstpListener(b.authority, name),
					xdstpRouteConfiguration(b.authority, name),
				)
				if err != nil {
					return nil, fmt.Errorf("could not create federation LDS API listener for authority=%s and gRPC application %+v: %w", b.authority, app, err)
				}
				b.listeners[fedListener.Name] = fedListener
			}
		}

		// RDS: route configuration(s), likewise created once per name.
		if b.routeConfigurations[name] == nil {
			plainRoutes := rds.CreateRouteConfigurationForAPIListener(name, name, app.PathPrefix, name)
			b.routeConfigurations[plainRoutes.Name] = plainRoutes

			if federated {
				fedRoutes := rds.CreateRouteConfigurationForAPIListener(
					xdstpRouteConfiguration(b.authority, name),
					name,
					app.PathPrefix,
					xdstpCluster(b.authority, name),
				)
				b.routeConfigurations[fedRoutes.Name] = fedRoutes
			}
		}

		// CDS: cluster(s), likewise created once per name.
		if b.clusters[name] == nil {
			plainCluster, err := cds.CreateCluster(
				name,
				name,
				app.Namespace,
				app.ServiceAccountName,
				app.HealthCheckPort,
				app.HealthCheckProtocol,
				"",
				b.features.EnableDataPlaneTLS,
				b.features.RequireDataPlaneClientCerts,
			)
			if err != nil {
				return nil, fmt.Errorf("could not create CDS Cluster for gRPC application %+v: %w", app, err)
			}
			b.clusters[plainCluster.Name] = plainCluster

			if federated {
				fedCluster, err := cds.CreateCluster(
					xdstpCluster(b.authority, name),
					xdstpEdsService(b.authority, name),
					app.Namespace,
					app.ServiceAccountName,
					app.HealthCheckPort,
					app.HealthCheckProtocol,
					"",
					b.features.EnableDataPlaneTLS,
					b.features.RequireDataPlaneClientCerts,
				)
				if err != nil {
					return nil, fmt.Errorf("could not create federation CDS Cluster for authority=%s and gRPC application %+v: %w", b.authority, app, err)
				}
				b.clusters[fedCluster.Name] = fedCluster
			}
		}

		// EDS: the same app may show up more than once (e.g. from multiple
		// informers), so endpoints are appended to whatever was collected
		// before and the load assignment is rebuilt from the merged list.
		key := fmt.Sprintf("%s-%d", name, app.ServingPort)
		b.endpointsByCluster[key] = append(b.endpointsByCluster[key], app.Endpoints...)
		merged := b.endpointsByCluster[key]

		plainCLA := eds.CreateClusterLoadAssignment(name, app.ServingPort, b.nodeHash, b.localityPriorityMapper, merged)
		b.clusterLoadAssignments[plainCLA.ClusterName] = plainCLA

		if federated {
			fedCLA := eds.CreateClusterLoadAssignment(
				xdstpEdsService(b.authority, name),
				app.ServingPort,
				b.nodeHash,
				b.localityPriorityMapper,
				merged,
			)
			b.clusterLoadAssignments[fedCLA.ClusterName] = fedCLA
		}
	}

	return b, nil
}