in pilot/pkg/xds/fake.go [122:358]
func NewFakeDiscoveryServer(t test.Failer, opts FakeOptions) *FakeDiscoveryServer {
	stop := make(chan struct{})
	t.Cleanup(func() {
		close(stop)
	})
	m := opts.MeshConfig
	if m == nil {
		m = mesh.DefaultMeshConfig()
	}
	// Init with a dummy environment, since we have a circular dependency with the env creation.
	s := NewDiscoveryServer(&model.Environment{PushContext: model.NewPushContext()}, "pilot-123", map[string]string{})
	s.InitGenerators(s.Env, "dubbo-system")
	t.Cleanup(func() {
		s.JwtKeyResolver.Close()
		s.pushQueue.ShutDown()
	})
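	// serviceHandler turns service registry events into full pushes keyed by the
	// service's ServiceEntry config.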
	serviceHandler := func(svc *model.Service, _ model.Event) {
		pushReq := &model.PushRequest{
			Full: true,
			ConfigsUpdated: map[model.ConfigKey]struct{}{{
				Kind:      gvk.ServiceEntry,
				Name:      string(svc.Hostname),
				Namespace: svc.Attributes.Namespace,
			}: {}},
			Reason: []model.TriggerReason{model.ServiceUpdate},
		}
		s.ConfigUpdate(pushReq)
	}
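	// Default the cluster name and build the initial Kubernetes objects for each fake cluster.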
	if opts.DefaultClusterName == "" {
		opts.DefaultClusterName = "Kubernetes"
	}
	k8sObjects := getKubernetesObjects(t, opts)
	var defaultKubeClient kubelib.Client
	var defaultKubeController *kube.FakeController
	var registries []serviceregistry.Instance
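	// Trigger a full push whenever the mesh networks configuration changes.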
	if opts.NetworksWatcher != nil {
		opts.NetworksWatcher.AddNetworksHandler(func() {
			s.ConfigUpdate(&model.PushRequest{
				Full:   true,
				Reason: []model.TriggerReason{model.NetworksTrigger},
			})
		})
	}
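	// Optionally wrap the server in a FakeXdsUpdater so tests can observe XDS update events on a channel.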
	var xdsUpdater model.XDSUpdater = s
	if opts.EnableFakeXDSUpdater {
		evChan := make(chan FakeXdsEvent, 1000)
		xdsUpdater = &FakeXdsUpdater{
			Events:   evChan,
			Delegate: s,
		}
	}
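	// Wire a multicluster credentials controller into the SDS and ECDS generators.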
	creds := kubesecrets.NewMulticluster(opts.DefaultClusterName)
	s.Generators[v3.SecretType] = NewSecretGen(creds, s.Cache, opts.DefaultClusterName, nil)
	s.Generators[v3.ExtensionConfigurationType].(*EcdsGenerator).SetCredController(creds)
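	// Create a fake Kubernetes registry per cluster and register each cluster with the credentials
	// controller. Only non-default clusters start their informers here; the default cluster starts
	// below, after the ingress controller is created.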
	for k8sCluster, objs := range k8sObjects {
		client := kubelib.NewFakeClientWithVersion(opts.KubernetesVersion, objs...)
		if opts.KubeClientModifier != nil {
			opts.KubeClientModifier(client)
		}
		k8s, _ := kube.NewFakeControllerWithOptions(kube.FakeControllerOptions{
			ServiceHandler:  serviceHandler,
			Client:          client,
			ClusterID:       k8sCluster,
			DomainSuffix:    "cluster.local",
			XDSUpdater:      xdsUpdater,
			NetworksWatcher: opts.NetworksWatcher,
			Mode:            opts.KubernetesEndpointMode,
			Stop:            stop,
		})
		// start default client informers after creating ingress/secret controllers
		if defaultKubeClient == nil || k8sCluster == opts.DefaultClusterName {
			defaultKubeClient = client
			defaultKubeController = k8s
		} else {
			client.RunAndWait(stop)
		}
		registries = append(registries, k8s)
		if err := creds.ClusterAdded(&multicluster.Cluster{ID: k8sCluster, Client: client}, nil); err != nil {
			t.Fatal(err)
		}
	}
	if opts.DisableSecretAuthorization {
		kubesecrets.DisableAuthorizationForTest(defaultKubeClient.Kube().(*fake.Clientset))
	}
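	// Create the ingress controller against the default cluster, then start the default client's informers.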
	ingr := ingress.NewController(defaultKubeClient, mesh.NewFixedWatcher(m), kube.Options{
		DomainSuffix: "cluster.local",
	})
	defaultKubeClient.RunAndWait(stop)
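	// Build the config generation test harness, layering the ingress controller and a Gateway API
	// controller on top of the provided configs and the service registries created above.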
	var gwc *gateway.Controller
	cg := v1alpha3.NewConfigGenTest(t, v1alpha3.TestOptions{
		Configs:             opts.Configs,
		ConfigString:        opts.ConfigString,
		ConfigTemplateInput: opts.ConfigTemplateInput,
		MeshConfig:          opts.MeshConfig,
		NetworksWatcher:     opts.NetworksWatcher,
		ServiceRegistries:   registries,
		PushContextLock:     &s.updateMutex,
		ConfigStoreCaches:   []model.ConfigStoreController{ingr},
		CreateConfigStore: func(c model.ConfigStoreController) model.ConfigStoreController {
			g := gateway.NewController(defaultKubeClient, c, kube.Options{
				DomainSuffix: "cluster.local",
			})
			gwc = g
			return gwc
		},
		SkipRun:   true,
		ClusterID: defaultKubeController.Cluster(),
		Services:  opts.Services,
		Gateways:  opts.Gateways,
	})
	cg.ServiceEntryRegistry.AppendServiceHandler(serviceHandler)
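	// Swap the dummy environment for the real one built by the config generator; hold the update
	// mutex while the server state is replaced.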
	s.updateMutex.Lock()
	s.Env = cg.Env()
	s.Env.GatewayAPIController = gwc
	if err := s.Env.InitNetworksManager(s); err != nil {
		t.Fatal(err)
	}
	// Disable debounce to reduce test times
	s.debounceOptions.debounceAfter = opts.DebounceTime
	s.MemRegistry = cg.MemRegistry
	s.MemRegistry.EDSUpdater = s
	s.updateMutex.Unlock()
	// Setup config handlers
	// TODO code re-use from server.go
	configHandler := func(_, curr config.Config, event model.Event) {
		pushReq := &model.PushRequest{
			Full: true,
			ConfigsUpdated: map[model.ConfigKey]struct{}{{
				Kind:      curr.GroupVersionKind,
				Name:      curr.Name,
				Namespace: curr.Namespace,
			}: {}},
			Reason: []model.TriggerReason{model.ConfigUpdate},
		}
		s.ConfigUpdate(pushReq)
	}
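	// Register the config handler for all supported schemas; ServiceEntry and WorkloadEntry are
	// skipped because the service registry already handles them.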
	schemas := collections.Pilot.All()
	if features.EnableGatewayAPI {
		schemas = collections.PilotGatewayAPI.All()
	}
	for _, schema := range schemas {
		// This resource type was handled in external/servicediscovery.go, no need to rehandle here.
		if schema.Resource().GroupVersionKind() == collections.IstioNetworkingV1Alpha3Serviceentries.
			Resource().GroupVersionKind() {
			continue
		}
		if schema.Resource().GroupVersionKind() == collections.IstioNetworkingV1Alpha3Workloadentries.
			Resource().GroupVersionKind() {
			continue
		}
		cg.Store().RegisterEventHandler(schema.Resource().GroupVersionKind(), configHandler)
	}
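	// Exchange workload instance handlers between the ServiceEntry registry and the Kubernetes
	// registry for the same cluster.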
	for _, registry := range registries {
		k8s, ok := registry.(*kube.FakeController)
		// this closely matches what we do in serviceregistry/kube/controller/multicluster.go
		if !ok || k8s.Cluster() != cg.ServiceEntryRegistry.Cluster() {
			continue
		}
		cg.ServiceEntryRegistry.AppendWorkloadHandler(k8s.WorkloadInstanceHandler)
		k8s.AppendWorkloadHandler(cg.ServiceEntryRegistry.WorkloadInstanceHandler)
	}
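	// Create the WorkloadEntry controller and let the test customize the server before it starts.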
	s.WorkloadEntryController = workloadentry.NewController(cg.Store(), "test", keepalive.Infinity)
	if opts.DiscoveryServerModifier != nil {
		opts.DiscoveryServerModifier(s)
	}
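	// Build the xDS listener: use the caller-supplied builder if set, otherwise an in-memory bufconn.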
	var listener net.Listener
	if opts.ListenerBuilder != nil {
		var err error
		if listener, err = opts.ListenerBuilder(); err != nil {
			t.Fatal(err)
		}
	} else {
		// Start in memory gRPC listener
		buffer := 1024 * 1024
		listener = bufconn.Listen(buffer)
	}
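	// Register the discovery service on a gRPC server, serve it in the background, and tear it
	// down when the test ends.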
	grpcServer := grpc.NewServer()
	s.Register(grpcServer)
	go func() {
		if err := grpcServer.Serve(listener); err != nil && !(err == grpc.ErrServerStopped || err.Error() == "closed") {
			t.Fatal(err)
		}
	}()
	t.Cleanup(func() {
		grpcServer.Stop()
		_ = listener.Close()
	})
	// Start the discovery server
	s.Start(stop)
	cg.ServiceEntryRegistry.XdsUpdater = s
	// Now that handlers are added, get everything started
	cg.Run()
	cache.WaitForCacheSync(stop,
		cg.Registry.HasSynced,
		cg.Store().HasSynced)
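	// Re-push ServiceEntry endpoints now that the stores have synced.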
	cg.ServiceEntryRegistry.ResyncEDS()
	// Send an update. This ensures that even if there are no configs provided, the push context is
	// initialized.
	s.ConfigUpdate(&model.PushRequest{Full: true})
	processStartTime = time.Now()
	// Wait until initial updates are committed
	c := s.InboundUpdates.Load()
	retry.UntilOrFail(t, func() bool {
		return s.CommittedUpdates.Load() >= c
	}, retry.Delay(time.Millisecond))
	// Mark ourselves ready
	s.CachesSynced()
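	// Assemble the handle returned to the test; BufListener is only non-nil when the in-memory
	// listener is in use.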
	bufListener, _ := listener.(*bufconn.Listener)
	fake := &FakeDiscoveryServer{
		t:             t,
		Discovery:     s,
		Listener:      listener,
		BufListener:   bufListener,
		ConfigGenTest: cg,
		kubeClient:    defaultKubeClient,
		KubeRegistry:  defaultKubeController,
		XdsUpdater:    xdsUpdater,
	}
	return fake
}