in pkg/config/xds/apiclient/grpc_envoy.go [137:251]
// pipeline builds the discovery request for g.typeUrl, picks the response
// handler that matches the resource type and starts the delta stream through
// g.runDelta.
//
// For resource.ListenerType the handler only decodes every listener. For
// resource.ClusterType the handler records each cluster in edsResources,
// collects the names still marked pending and asks g.runEndpointReferences to
// watch them; every endpoint response is converted into one
// xdspb.PixiuExtensionClusters message and sent on output.
//
// It returns an error when g.typeUrl is neither a listener nor a cluster type,
// or when g.runDelta fails (wrapped with the request type URL).
func (g *AggGrpcApiClient) pipeline(output chan *DeltaResources) error {
	// all endpoint refer cluster or listener. map[type]map[resource name]endpoint info
	edsResources := make(map[resource.Type]map[string]refEndpoint)
	req := g.makeDiscoveryRequest(g.resourceNames, g.typeUrl)
	var handler discoveryResponseHandler
	switch g.typeUrl {
	case resource.ListenerType:
		handler = func(any2 []*anypb.Any) {
			for _, res := range any2 {
				// NOTE(review): the decoded listener is only validated here and
				// then dropped; nothing is forwarded to output.
				l := listenerpb.Listener{}
				if err := res.UnmarshalTo(&l); err != nil {
					// Log the decode error itself, not the raw Any payload.
					logger.Warnf("can not decode source %s , %v", res.TypeUrl, err)
					continue
				}
			}
		}
	case resource.ClusterType:
		handler = func(any2 []*anypb.Any) {
			// only one goroutine handle response, no need to lock for local var.
			for _, res := range any2 {
				logger.Infof("new resource found %s", res.TypeUrl)
				c := clusterpb.Cluster{}
				if err := res.UnmarshalTo(&c); err != nil {
					logger.Warnf("can not decode source %s , %v", res.TypeUrl, err)
					continue
				}
				//needn't lock edsResources
				g.getClusterResourceReference(&c, edsResources)
			}
			clusterRefEndpoints := edsResources[resource.ClusterType]
			// list all pending pendingResourceNames
			pendingResourceNames := make([]string, 0, len(clusterRefEndpoints))
			for name, b := range clusterRefEndpoints {
				if b.IsPending {
					pendingResourceNames = append(pendingResourceNames, name)
				}
			}
			// do not block, watch new resource at another goroutine
			err := g.runEndpointReferences(pendingResourceNames, func(any2 []*anypb.Any) {
				// run on another goroutine
				// NOTE(review): clusterRefEndpoints is read here without a lock
				// while the cluster handler may update it; confirm this is safe.
				extCluster := xdspb.PixiuExtensionClusters{
					Clusters: []*xdspb.Cluster{
						{
							Name:             "",
							TypeStr:          xds.ClusterType,
							Type:             0,
							EdsClusterConfig: nil,
							LbStr:            "",
							Lb:               0,
							HealthChecks:     nil,
							Endpoints:        make([]*xdspb.Endpoint, 0, len(any2)),
						},
					},
				}
				// NOTE(review): every load assignment in this response is merged
				// into the single cluster above and the last one decides its
				// name; verify that one response never spans several clusters.
				target := extCluster.Clusters[0]
				for _, one := range any2 {
					l := endpointpb.ClusterLoadAssignment{}
					if err := one.UnmarshalTo(&l); err != nil {
						logger.Warnf("unmarshal error %v", err)
						continue
					}
					// A missing map entry yields a zero refEndpoint whose RawProto
					// is nil, so the assertion must be checked to avoid a panic.
					c, ok := clusterRefEndpoints[l.ClusterName].RawProto.(*clusterpb.Cluster)
					if !ok {
						logger.Warnf("no cluster found for endpoint assignment %s", l.ClusterName)
						continue
					}
					target.Name = g.readServiceNameOfCluster(c)
					for _, ep := range l.Endpoints {
						lbEndpoints := ep.GetLbEndpoints()
						if len(lbEndpoints) == 0 {
							// A locality without endpoints carries no address.
							continue
						}
						// NOTE(review): only the first LbEndpoint of each locality
						// is used, as before; confirm the rest can be ignored.
						address := lbEndpoints[0].GetEndpoint().GetAddress().GetSocketAddress()
						if address == nil {
							// Not a socket address (or no address at all); skip it.
							continue
						}
						target.Endpoints = append(target.Endpoints, &xdspb.Endpoint{
							Id:   "",
							Name: "",
							Address: &xdspb.SocketAddress{
								Address: address.GetAddress(),
								Port:    int64(address.GetPortValue()),
							},
							Metadata: nil,
						})
					}
				}
				// make any.Any from extCluster; on failure the resource is still
				// emitted with a nil TypedConfig, matching the previous behavior.
				typedConfig, err := anypb.New(&extCluster)
				if err != nil {
					logger.Warnf("can not make anypb.Any %v", err)
					typedConfig = nil
				}
				//make output
				output <- &DeltaResources{
					NewResources: []*ProtoAny{
						{
							typeConfig: &envoyconfigcorev3.TypedExtensionConfig{
								Name:        "cluster", //todo cluster name
								TypedConfig: typedConfig,
							},
						},
					},
					RemovedResource: nil,
				}
			})
			if err != nil { //todo retry
				logger.Errorf("can not run reference request %v", err)
			}
		}
	default:
		return errors.Errorf("need listenerType or clusterType but get %s", g.typeUrl)
	}
	if err := g.runDelta(req, handler); err != nil {
		return errors.WithMessagef(err, "start run %s failed", req.TypeUrl)
	}
	return nil
}