func NewFakeDiscoveryServer()

in pilot/pkg/xds/fake.go [122:358]


// NewFakeDiscoveryServer builds a DiscoveryServer for tests. It wires the
// server to fake Kubernetes clients and controllers (one per cluster returned
// by getKubernetesObjects), an ingress and a gateway controller, a
// ConfigGenTest environment and a gRPC listener, starts everything, and waits
// until the initial updates have been committed before returning.
//
// t is used to report failures and to register cleanups (closing the shared
// stop channel, closing the JWT key resolver, shutting down the push queue and
// stopping the gRPC server/listener).
//
// opts customizes the setup. An empty opts.DefaultClusterName is replaced by
// "Kubernetes"; a nil opts.MeshConfig is replaced by mesh.DefaultMeshConfig()
// for the ingress controller; when opts.ListenerBuilder is nil an in-memory
// bufconn listener is used.
//
// The returned FakeDiscoveryServer carries the server, the listener, the
// ConfigGenTest, the selected default kube client/controller and the
// XDSUpdater handed to the kube controllers. Its BufListener field is nil when
// the listener is not a *bufconn.Listener.
func NewFakeDiscoveryServer(t test.Failer, opts FakeOptions) *FakeDiscoveryServer {
	// stop is passed to every controller and informer started below; it is
	// closed when the test finishes.
	stop := make(chan struct{})
	t.Cleanup(func() {
		close(stop)
	})

	m := opts.MeshConfig
	if m == nil {
		m = mesh.DefaultMeshConfig()
	}

	// Init with a dummy environment, since we have a circular dependency with the env creation.
	// The real environment is swapped in under s.updateMutex once the ConfigGenTest exists.
	s := NewDiscoveryServer(&model.Environment{PushContext: model.NewPushContext()}, "pilot-123", map[string]string{})
	s.InitGenerators(s.Env, "dubbo-system")
	t.Cleanup(func() {
		s.JwtKeyResolver.Close()
		s.pushQueue.ShutDown()
	})

	// serviceHandler requests a full push for a changed service, reporting it
	// as an updated ServiceEntry config keyed by hostname and namespace.
	serviceHandler := func(svc *model.Service, _ model.Event) {
		pushReq := &model.PushRequest{
			Full: true,
			ConfigsUpdated: map[model.ConfigKey]struct{}{{
				Kind:      gvk.ServiceEntry,
				Name:      string(svc.Hostname),
				Namespace: svc.Attributes.Namespace,
			}: {}},
			Reason: []model.TriggerReason{model.ServiceUpdate},
		}
		s.ConfigUpdate(pushReq)
	}

	if opts.DefaultClusterName == "" {
		opts.DefaultClusterName = "Kubernetes"
	}
	k8sObjects := getKubernetesObjects(t, opts)
	var defaultKubeClient kubelib.Client
	var defaultKubeController *kube.FakeController
	var registries []serviceregistry.Instance
	if opts.NetworksWatcher != nil {
		// Any networks change triggers a full push.
		opts.NetworksWatcher.AddNetworksHandler(func() {
			s.ConfigUpdate(&model.PushRequest{
				Full:   true,
				Reason: []model.TriggerReason{model.NetworksTrigger},
			})
		})
	}
	// By default the discovery server itself receives XDS updates from the
	// kube controllers; optionally wrap it so events are also recorded on a
	// buffered channel.
	var xdsUpdater model.XDSUpdater = s
	if opts.EnableFakeXDSUpdater {
		evChan := make(chan FakeXdsEvent, 1000)
		xdsUpdater = &FakeXdsUpdater{
			Events:   evChan,
			Delegate: s,
		}
	}
	// Replace the secret generator and point the ECDS generator at a
	// multicluster credentials controller that learns about each fake cluster
	// in the loop below.
	creds := kubesecrets.NewMulticluster(opts.DefaultClusterName)
	s.Generators[v3.SecretType] = NewSecretGen(creds, s.Cache, opts.DefaultClusterName, nil)
	s.Generators[v3.ExtensionConfigurationType].(*EcdsGenerator).SetCredController(creds)
	for k8sCluster, objs := range k8sObjects {
		client := kubelib.NewFakeClientWithVersion(opts.KubernetesVersion, objs...)
		if opts.KubeClientModifier != nil {
			opts.KubeClientModifier(client)
		}
		k8s, _ := kube.NewFakeControllerWithOptions(kube.FakeControllerOptions{
			ServiceHandler:  serviceHandler,
			Client:          client,
			ClusterID:       k8sCluster,
			DomainSuffix:    "cluster.local",
			XDSUpdater:      xdsUpdater,
			NetworksWatcher: opts.NetworksWatcher,
			Mode:            opts.KubernetesEndpointMode,
			Stop:            stop,
		})
		// start default client informers after creating ingress/secret controllers
		// NOTE(review): map iteration order is random. If a non-default cluster
		// is visited first it becomes the provisional default and is skipped
		// here; should the real default cluster be visited later, that first
		// client is never passed to RunAndWait in this function — confirm
		// whether it is started elsewhere.
		if defaultKubeClient == nil || k8sCluster == opts.DefaultClusterName {
			defaultKubeClient = client
			defaultKubeController = k8s
		} else {
			client.RunAndWait(stop)
		}
		registries = append(registries, k8s)
		if err := creds.ClusterAdded(&multicluster.Cluster{ID: k8sCluster, Client: client}, nil); err != nil {
			t.Fatal(err)
		}
	}

	if opts.DisableSecretAuthorization {
		kubesecrets.DisableAuthorizationForTest(defaultKubeClient.Kube().(*fake.Clientset))
	}
	ingr := ingress.NewController(defaultKubeClient, mesh.NewFixedWatcher(m), kube.Options{
		DomainSuffix: "cluster.local",
	})
	// The default client is only run now, after the ingress controller has
	// been created against it.
	defaultKubeClient.RunAndWait(stop)

	// gwc is captured from the CreateConfigStore callback so it can be
	// installed on the environment afterwards.
	var gwc *gateway.Controller
	cg := v1alpha3.NewConfigGenTest(t, v1alpha3.TestOptions{
		Configs:             opts.Configs,
		ConfigString:        opts.ConfigString,
		ConfigTemplateInput: opts.ConfigTemplateInput,
		// NOTE(review): this passes opts.MeshConfig (possibly nil) rather than
		// the defaulted m used for the ingress controller above; presumably
		// NewConfigGenTest applies its own default — confirm.
		MeshConfig:        opts.MeshConfig,
		NetworksWatcher:   opts.NetworksWatcher,
		ServiceRegistries: registries,
		PushContextLock:   &s.updateMutex,
		ConfigStoreCaches: []model.ConfigStoreController{ingr},
		CreateConfigStore: func(c model.ConfigStoreController) model.ConfigStoreController {
			g := gateway.NewController(defaultKubeClient, c, kube.Options{
				DomainSuffix: "cluster.local",
			})
			gwc = g
			return gwc
		},
		// Run is deferred until all handlers are registered; see cg.Run() below.
		SkipRun:   true,
		ClusterID: defaultKubeController.Cluster(),
		Services:  opts.Services,
		Gateways:  opts.Gateways,
	})
	cg.ServiceEntryRegistry.AppendServiceHandler(serviceHandler)
	// Swap the dummy environment for the real one while holding the same lock
	// that was handed to the ConfigGenTest as PushContextLock.
	s.updateMutex.Lock()
	s.Env = cg.Env()
	s.Env.GatewayAPIController = gwc
	if err := s.Env.InitNetworksManager(s); err != nil {
		t.Fatal(err)
	}
	// Override the debounce delay with opts.DebounceTime to reduce test times
	s.debounceOptions.debounceAfter = opts.DebounceTime
	s.MemRegistry = cg.MemRegistry
	s.MemRegistry.EDSUpdater = s
	s.updateMutex.Unlock()

	// Setup config handlers
	// TODO code re-use from server.go
	// configHandler requests a full push for the changed config.
	configHandler := func(_, curr config.Config, event model.Event) {
		pushReq := &model.PushRequest{
			Full: true,
			ConfigsUpdated: map[model.ConfigKey]struct{}{{
				Kind:      curr.GroupVersionKind,
				Name:      curr.Name,
				Namespace: curr.Namespace,
			}: {}},
			Reason: []model.TriggerReason{model.ConfigUpdate},
		}
		s.ConfigUpdate(pushReq)
	}
	schemas := collections.Pilot.All()
	if features.EnableGatewayAPI {
		schemas = collections.PilotGatewayAPI.All()
	}
	for _, schema := range schemas {
		// This resource type was handled in external/servicediscovery.go, no need to rehandle here.
		if schema.Resource().GroupVersionKind() == collections.IstioNetworkingV1Alpha3Serviceentries.
			Resource().GroupVersionKind() {
			continue
		}
		if schema.Resource().GroupVersionKind() == collections.IstioNetworkingV1Alpha3Workloadentries.
			Resource().GroupVersionKind() {
			continue
		}

		cg.Store().RegisterEventHandler(schema.Resource().GroupVersionKind(), configHandler)
	}
	// Cross-wire workload handlers between the service entry registry and the
	// kube controller that shares its cluster.
	for _, registry := range registries {
		k8s, ok := registry.(*kube.FakeController)
		// this closely matches what we do in serviceregistry/kube/controller/multicluster.go
		if !ok || k8s.Cluster() != cg.ServiceEntryRegistry.Cluster() {
			continue
		}
		cg.ServiceEntryRegistry.AppendWorkloadHandler(k8s.WorkloadInstanceHandler)
		k8s.AppendWorkloadHandler(cg.ServiceEntryRegistry.WorkloadInstanceHandler)
	}
	s.WorkloadEntryController = workloadentry.NewController(cg.Store(), "test", keepalive.Infinity)

	// Give the caller a last chance to tweak the server before it is started.
	if opts.DiscoveryServerModifier != nil {
		opts.DiscoveryServerModifier(s)
	}

	var listener net.Listener
	if opts.ListenerBuilder != nil {
		var err error
		if listener, err = opts.ListenerBuilder(); err != nil {
			t.Fatal(err)
		}
	} else {
		// Start in memory gRPC listener
		buffer := 1024 * 1024
		listener = bufconn.Listen(buffer)
	}

	grpcServer := grpc.NewServer()
	s.Register(grpcServer)
	go func() {
		// ErrServerStopped and the "closed" error are treated as a normal
		// shutdown and are not reported.
		if err := grpcServer.Serve(listener); err != nil && !(err == grpc.ErrServerStopped || err.Error() == "closed") {
			// Fatal/FailNow must only be called from the goroutine running
			// the test; called here it would merely exit this goroutine.
			// Log the error and mark the test as failed instead.
			t.Log(err)
			t.Fail()
		}
	}()
	t.Cleanup(func() {
		grpcServer.Stop()
		// Best effort: the listener may already have been closed by Stop.
		_ = listener.Close()
	})
	// Start the discovery server
	s.Start(stop)
	cg.ServiceEntryRegistry.XdsUpdater = s
	// Now that handlers are added, get everything started
	cg.Run()
	cache.WaitForCacheSync(stop,
		cg.Registry.HasSynced,
		cg.Store().HasSynced)
	cg.ServiceEntryRegistry.ResyncEDS()

	// Send an update. This ensures that even if there are no configs provided, the push context is
	// initialized.
	s.ConfigUpdate(&model.PushRequest{Full: true})

	processStartTime = time.Now()

	// Wait until initial updates are committed: snapshot the number of inbound
	// updates seen so far and poll until at least that many were committed.
	c := s.InboundUpdates.Load()
	retry.UntilOrFail(t, func() bool {
		return s.CommittedUpdates.Load() >= c
	}, retry.Delay(time.Millisecond))

	// Mark ourselves ready
	s.CachesSynced()

	// bufListener stays nil when a custom, non-bufconn listener is in use.
	bufListener, _ := listener.(*bufconn.Listener)
	// Return the literal directly rather than via a local named "fake", which
	// would shadow the fake package used above.
	return &FakeDiscoveryServer{
		t:             t,
		Discovery:     s,
		Listener:      listener,
		BufListener:   bufListener,
		ConfigGenTest: cg,
		kubeClient:    defaultKubeClient,
		KubeRegistry:  defaultKubeController,
		XdsUpdater:    xdsUpdater,
	}
}