func()

in pkg/config/xds/apiclient/grpc_envoy.go [137:251]


// pipeline builds a discovery request for g.resourceNames / g.typeUrl, selects
// a response handler matching g.typeUrl and hands both to g.runDelta.
//
// For resource.ListenerType the handler only checks that every resource
// decodes as a Listener; nothing is forwarded to output.
//
// For resource.ClusterType the handler decodes each Cluster, records its
// endpoint references in a map local to this call, and asks
// g.runEndpointReferences to watch the references that are still pending.
// Each endpoint response is folded into a single PixiuExtensionClusters value
// which is sent on output as one DeltaResources message.
//
// It returns an error when g.typeUrl is neither of the two supported types or
// when g.runDelta fails.
func (g *AggGrpcApiClient) pipeline(output chan *DeltaResources) error {
	// all endpoint refer cluster or listener. map[type]map[resource name]endpoint info
	edsResources := make(map[resource.Type]map[string]refEndpoint)
	req := g.makeDiscoveryRequest(g.resourceNames, g.typeUrl)
	var handler discoveryResponseHandler
	switch g.typeUrl {
	case resource.ListenerType:
		handler = func(any2 []*anypb.Any) {
			// Listeners are only validated here; the decoded value is not used.
			for _, res := range any2 {
				l := listenerpb.Listener{}
				if err := res.UnmarshalTo(&l); err != nil {
					// Log the decode error itself, not the raw resource.
					logger.Warnf("can not decode source %s , %v", res.TypeUrl, err)
					continue
				}
			}
		}
	case resource.ClusterType:
		handler = func(any2 []*anypb.Any) {
			// only one goroutine handle response, no need to lock for local var.
			for _, res := range any2 {
				logger.Infof("new resource found %s", res.TypeUrl)
				c := clusterpb.Cluster{}
				if err := res.UnmarshalTo(&c); err != nil {
					// Log the decode error itself, not the raw resource.
					logger.Warnf("can not decode source %s , %v", res.TypeUrl, err)
					continue
				}
				g.getClusterResourceReference(&c, edsResources)
			}

			// Collect the names of every cluster reference still marked pending.
			pendingResourceNames := make([]string, 0)
			clusterRefEndpoints := edsResources[resource.ClusterType]
			for name, b := range clusterRefEndpoints {
				if b.IsPending {
					pendingResourceNames = append(pendingResourceNames, name)
				}
			}

			// NOTE(review): the callback below reads clusterRefEndpoints and is
			// described as running on another goroutine; confirm that a later
			// cluster response cannot mutate this map concurrently.
			onEndpoints := func(any2 []*anypb.Any) {
				extCluster := xdspb.PixiuExtensionClusters{
					Clusters: []*xdspb.Cluster{
						{
							Name:             "",
							TypeStr:          xds.ClusterType,
							Type:             0,
							EdsClusterConfig: nil,
							LbStr:            "",
							Lb:               0,
							HealthChecks:     nil,
							Endpoints:        make([]*xdspb.Endpoint, 0, len(any2)),
						},
					},
				}
				target := extCluster.Clusters[0]

				for _, one := range any2 {
					l := endpointpb.ClusterLoadAssignment{}
					if err := one.UnmarshalTo(&l); err != nil {
						logger.Warnf("unmarshal error %v", err)
						continue
					}

					// Guard the lookup and the type assertion: an assignment for
					// a cluster that was never recorded must not panic here.
					ref, ok := clusterRefEndpoints[l.ClusterName]
					if !ok {
						logger.Warnf("endpoints received for unknown cluster %s", l.ClusterName)
						continue
					}
					c, ok := ref.RawProto.(*clusterpb.Cluster)
					if !ok {
						logger.Warnf("reference of cluster %s does not hold a cluster", l.ClusterName)
						continue
					}
					target.Name = g.readServiceNameOfCluster(c)

					for _, ep := range l.Endpoints {
						// NOTE(review): only the first lb endpoint of each
						// locality is exported, as before; confirm whether the
						// remaining ones should be exported too.
						if len(ep.LbEndpoints) == 0 {
							continue
						}
						address := ep.LbEndpoints[0].GetEndpoint().GetAddress().GetSocketAddress()
						if address == nil {
							// Not a socket address (or no address at all).
							logger.Warnf("endpoint of cluster %s has no socket address", l.ClusterName)
							continue
						}
						target.Endpoints = append(target.Endpoints, &xdspb.Endpoint{
							Id:   "",
							Name: "",
							Address: &xdspb.SocketAddress{
								Address: address.Address,
								Port:    int64(address.GetPortValue()),
							},
							Metadata: nil,
						})
					}
				}

				// Wrap extCluster into an Any; without it the message carries no
				// usable config, so skip the send instead of emitting nil.
				typedConfig, err := anypb.New(&extCluster)
				if err != nil {
					logger.Warnf("can not make anypb.Any %v", err)
					return
				}

				output <- &DeltaResources{
					NewResources: []*ProtoAny{
						{
							typeConfig: &envoyconfigcorev3.TypedExtensionConfig{
								Name:        "cluster", //todo cluster name
								TypedConfig: typedConfig,
							},
						},
					},
					RemovedResource: nil,
				}
			}

			// do not block, watch new resource at another goroutine
			if err := g.runEndpointReferences(pendingResourceNames, onEndpoints); err != nil { //todo retry
				logger.Errorf("can not run reference request %v", err)
			}
		}

	default:
		return errors.Errorf("need listenerType or clusterType but get %s", g.typeUrl)
	}

	if err := g.runDelta(req, handler); err != nil {
		return errors.WithMessagef(err, "start run %s failed", req.TypeUrl)
	}

	return nil
}