func()

in pkg/providers/controller.go [340:531]


// run performs the work the controller does while it holds the leader role.
//
// It proceeds in phases:
//  1. register the default APISIX cluster and wait for it to report synced;
//  2. creation phase: build the shared informers, the translator and every
//     provider (namespace, pod, apisix, ingress, kube and, when enabled,
//     gateway);
//  3. init phase: initialise the namespace provider, start the informers and
//     wait for their caches, then initialise the apisix provider;
//  4. run phase: hand every provider's Run loop, the cluster health check and
//     the resource sync loop to a utils.ParallelExecutor, then block until
//     ctx is cancelled and the executor has finished.
//
// run returns nothing; every failure is logged and causes an early return.
// On any return the derived context is cancelled and
// c.leaderContextCancelFunc is invoked (both via defer), so a failed start-up
// also gives up leadership.
func (c *Controller) run(ctx context.Context) {
	log.Infow("controller tries to leading ...",
		zap.String("namespace", c.namespace),
		zap.String("pod", c.name),
	)

	var cancelFunc context.CancelFunc
	ctx, cancelFunc = context.WithCancel(ctx)
	// Cancels the derived context on every return path, so the failure
	// branches below only need to log and return.
	defer cancelFunc()

	// give up leader
	defer c.leaderContextCancelFunc()

	clusterOpts := &apisix.ClusterOptions{
		AdminAPIVersion:  c.cfg.APISIX.AdminAPIVersion,
		Name:             c.cfg.APISIX.DefaultClusterName,
		AdminKey:         c.cfg.APISIX.DefaultClusterAdminKey,
		BaseURL:          c.cfg.APISIX.DefaultClusterBaseURL,
		MetricsCollector: c.MetricsCollector,
		SyncComparison:   c.cfg.ApisixResourceSyncComparison,
	}
	// An already-registered cluster is not treated as a failure.
	err := c.apisix.AddCluster(ctx, clusterOpts)
	if err != nil && err != apisix.ErrDuplicatedCluster {
		// TODO give up the leader role
		log.Errorf("failed to add default cluster: %s", err)
		return
	}

	if err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).HasSynced(ctx); err != nil {
		// TODO give up the leader role
		log.Errorf("failed to wait the default cluster to be ready: %s", err)

		// re-create apisix cluster, used in next c.run
		if err = c.apisix.UpdateCluster(ctx, clusterOpts); err != nil {
			log.Errorf("failed to update default cluster: %s", err)
		}
		return
	}

	// Creation Phase

	log.Info("creating controller")

	c.informers = c.initSharedInformers()
	common := &providertypes.Common{
		ControllerNamespace: c.namespace,
		ListerInformer:      c.informers,
		Config:              c.cfg,
		APISIX:              c.apisix,
		KubeClient:          c.kubeClient,
		MetricsCollector:    c.MetricsCollector,
		Recorder:            c.recorder,
	}

	c.namespaceProvider, err = namespace.NewWatchingNamespaceProvider(ctx, c.kubeClient, c.cfg)
	if err != nil {
		log.Errorf("failed to create namespace provider: %s", err)
		return
	}

	c.podProvider, err = pod.NewProvider(common, c.namespaceProvider)
	if err != nil {
		log.Errorf("failed to create pod provider: %s", err)
		return
	}

	c.translator = translation.NewTranslator(&translation.TranslatorOptions{
		APIVersion:           c.cfg.Kubernetes.APIVersion,
		EndpointLister:       c.informers.EpLister,
		ServiceLister:        c.informers.SvcLister,
		SecretLister:         c.informers.SecretLister,
		PodLister:            c.informers.PodLister,
		ApisixUpstreamLister: c.informers.ApisixUpstreamLister,
		PodProvider:          c.podProvider,
		IngressClassName:     c.cfg.Kubernetes.IngressClass,
	})

	c.apisixProvider, c.apisixTranslator, err = apisixprovider.NewProvider(common, c.namespaceProvider, c.translator)
	if err != nil {
		log.Errorf("failed to create apisix provider: %s", err)
		return
	}

	c.ingressProvider, err = ingressprovider.NewProvider(common, c.namespaceProvider, c.translator, c.apisixTranslator)
	if err != nil {
		log.Errorf("failed to create ingress provider: %s", err)
		return
	}

	c.kubeProvider, err = k8s.NewProvider(common, c.translator, c.namespaceProvider, c.apisixProvider, c.ingressProvider)
	if err != nil {
		log.Errorf("failed to create kube provider: %s", err)
		return
	}

	if c.cfg.Kubernetes.EnableGatewayAPI {
		c.gatewayProvider, err = gateway.NewGatewayProvider(&gateway.ProviderOptions{
			Cfg:               c.cfg,
			APISIX:            c.apisix,
			APISIXClusterName: c.cfg.APISIX.DefaultClusterName,
			KubeTranslator:    c.translator,
			RestConfig:        nil,
			KubeClient:        c.kubeClient.Client,
			MetricsCollector:  c.MetricsCollector,
			NamespaceProvider: c.namespaceProvider,
			ListerInformer:    common.ListerInformer,
		})
		if err != nil {
			log.Errorf("failed to create gateway provider: %s", err)
			return
		}
	}

	// Init Phase

	log.Info("init namespaces")

	if err = c.namespaceProvider.Init(ctx); err != nil {
		log.Errorf("failed to init namespace provider: %s", err)
		return
	}

	log.Info("wait for resource sync")

	// Wait for resource sync
	if ok := c.informers.StartAndWaitForCacheSync(ctx); !ok {
		log.Error("failed to start informers and wait for cache sync")
		return
	}

	log.Info("init providers")

	// Compare resource
	if err = c.apisixProvider.Init(ctx); err != nil {
		log.Errorf("failed to init apisix provider: %s", err)
		return
	}

	// Run Phase

	log.Info("try to run providers")

	e := utils.ParallelExecutor{}

	// The health check also receives cancelFunc; presumably it uses it to
	// stop the whole run when the cluster is unhealthy (verify against
	// checkClusterHealth).
	e.Add(func() {
		c.checkClusterHealth(ctx, cancelFunc)
	})

	e.Add(func() {
		c.namespaceProvider.Run(ctx)
	})

	e.Add(func() {
		c.kubeProvider.Run(ctx)
	})

	e.Add(func() {
		c.apisixProvider.Run(ctx)
	})

	e.Add(func() {
		c.ingressProvider.Run(ctx)
	})

	// gatewayProvider is only constructed when the Gateway API is enabled,
	// so it must only be run under the same condition.
	if c.cfg.Kubernetes.EnableGatewayAPI {
		e.Add(func() {
			c.gatewayProvider.Run(ctx)
		})
	}

	e.Add(func() {
		c.resourceSyncLoop(ctx, c.cfg.ApisixResourceSyncInterval.Duration)
	})
	c.MetricsCollector.ResetLeader(true)

	log.Infow("controller now is running as leader",
		zap.String("namespace", c.namespace),
		zap.String("pod", c.name),
	)

	// Block until the context is cancelled, then let the executor finish
	// before reporting whatever errors it collected.
	<-ctx.Done()
	e.Wait()

	execErrs := e.Errors()
	for _, execErr := range execErrs {
		log.Error(execErr.Error())
	}
	if len(execErrs) > 0 {
		log.Error("Start failed, abort...")
		cancelFunc()
	}
}