in pkg/providers/controller.go [340:531]
// run drives one controller session: it registers the default APISIX cluster,
// waits for it to report synced, builds every provider, initializes them and
// then runs them through a utils.ParallelExecutor until ctx is done.
//
// ctx is wrapped in a cancellable child context. The child is cancelled when
// run returns (deferred cancelFunc), and cancelFunc is also handed to
// checkClusterHealth. c.leaderContextCancelFunc is deferred as well, so
// returning from run for any reason gives up the leader role.
//
// run returns nothing; every failure is logged before returning.
func (c *Controller) run(ctx context.Context) {
	log.Infow("controller tries to leading ...",
		zap.String("namespace", c.namespace),
		zap.String("pod", c.name),
	)

	var cancelFunc context.CancelFunc
	ctx, cancelFunc = context.WithCancel(ctx)
	defer cancelFunc()

	// give up leader
	defer c.leaderContextCancelFunc()

	clusterOpts := &apisix.ClusterOptions{
		AdminAPIVersion:  c.cfg.APISIX.AdminAPIVersion,
		Name:             c.cfg.APISIX.DefaultClusterName,
		AdminKey:         c.cfg.APISIX.DefaultClusterAdminKey,
		BaseURL:          c.cfg.APISIX.DefaultClusterBaseURL,
		MetricsCollector: c.MetricsCollector,
		SyncComparison:   c.cfg.ApisixResourceSyncComparison,
	}
	// A duplicated cluster is tolerated: it is not treated as a failure here.
	err := c.apisix.AddCluster(ctx, clusterOpts)
	if err != nil && err != apisix.ErrDuplicatedCluster {
		// TODO give up the leader role
		log.Errorf("failed to add default cluster: %s", err)
		return
	}

	if syncErr := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).HasSynced(ctx); syncErr != nil {
		// TODO give up the leader role
		log.Errorf("failed to wait the default cluster to be ready: %s", syncErr)

		// re-create apisix cluster, used in next c.run
		if updateErr := c.apisix.UpdateCluster(ctx, clusterOpts); updateErr != nil {
			log.Errorf("failed to update default cluster: %s", updateErr)
		}
		return
	}

	// Creation Phase
	//
	// NOTE: on every failure below the error is logged and run simply returns.
	// Calling ctx.Done() would only fetch the done channel and cancel nothing;
	// the deferred cancelFunc is what cancels the context.
	log.Info("creating controller")
	c.informers = c.initSharedInformers()
	common := &providertypes.Common{
		ControllerNamespace: c.namespace,
		ListerInformer:      c.informers,
		Config:              c.cfg,
		APISIX:              c.apisix,
		KubeClient:          c.kubeClient,
		MetricsCollector:    c.MetricsCollector,
		Recorder:            c.recorder,
	}

	c.namespaceProvider, err = namespace.NewWatchingNamespaceProvider(ctx, c.kubeClient, c.cfg)
	if err != nil {
		log.Errorf("failed to create namespace provider: %s", err)
		return
	}

	c.podProvider, err = pod.NewProvider(common, c.namespaceProvider)
	if err != nil {
		log.Errorf("failed to create pod provider: %s", err)
		return
	}

	c.translator = translation.NewTranslator(&translation.TranslatorOptions{
		APIVersion:           c.cfg.Kubernetes.APIVersion,
		EndpointLister:       c.informers.EpLister,
		ServiceLister:        c.informers.SvcLister,
		SecretLister:         c.informers.SecretLister,
		PodLister:            c.informers.PodLister,
		ApisixUpstreamLister: c.informers.ApisixUpstreamLister,
		PodProvider:          c.podProvider,
		IngressClassName:     c.cfg.Kubernetes.IngressClass,
	})

	c.apisixProvider, c.apisixTranslator, err = apisixprovider.NewProvider(common, c.namespaceProvider, c.translator)
	if err != nil {
		log.Errorf("failed to create apisix provider: %s", err)
		return
	}

	c.ingressProvider, err = ingressprovider.NewProvider(common, c.namespaceProvider, c.translator, c.apisixTranslator)
	if err != nil {
		log.Errorf("failed to create ingress provider: %s", err)
		return
	}

	c.kubeProvider, err = k8s.NewProvider(common, c.translator, c.namespaceProvider, c.apisixProvider, c.ingressProvider)
	if err != nil {
		log.Errorf("failed to create kube provider: %s", err)
		return
	}

	// The gateway provider is only built when the Gateway API is enabled.
	if c.cfg.Kubernetes.EnableGatewayAPI {
		c.gatewayProvider, err = gateway.NewGatewayProvider(&gateway.ProviderOptions{
			Cfg:               c.cfg,
			APISIX:            c.apisix,
			APISIXClusterName: c.cfg.APISIX.DefaultClusterName,
			KubeTranslator:    c.translator,
			RestConfig:        nil,
			KubeClient:        c.kubeClient.Client,
			MetricsCollector:  c.MetricsCollector,
			NamespaceProvider: c.namespaceProvider,
			ListerInformer:    common.ListerInformer,
		})
		if err != nil {
			log.Errorf("failed to create gateway provider: %s", err)
			return
		}
	}

	// Init Phase
	log.Info("init namespaces")
	if err = c.namespaceProvider.Init(ctx); err != nil {
		log.Errorf("failed to init namespace provider: %s", err)
		return
	}

	log.Info("wait for resource sync")
	// Wait for resource sync
	if ok := c.informers.StartAndWaitForCacheSync(ctx); !ok {
		log.Error("failed to start informers and wait for cache sync")
		return
	}

	log.Info("init providers")
	// Compare resource
	if err = c.apisixProvider.Init(ctx); err != nil {
		log.Errorf("failed to init apisix provider: %s", err)
		return
	}

	// Run Phase
	log.Info("try to run providers")
	// NOTE(review): presumably each e.Add schedules its function concurrently
	// and e.Wait blocks until all of them return — confirm against
	// utils.ParallelExecutor.
	e := utils.ParallelExecutor{}

	e.Add(func() {
		// The health check receives cancelFunc so it is able to cancel ctx.
		c.checkClusterHealth(ctx, cancelFunc)
	})
	e.Add(func() {
		c.namespaceProvider.Run(ctx)
	})
	e.Add(func() {
		c.kubeProvider.Run(ctx)
	})
	e.Add(func() {
		c.apisixProvider.Run(ctx)
	})
	e.Add(func() {
		c.ingressProvider.Run(ctx)
	})
	if c.cfg.Kubernetes.EnableGatewayAPI {
		e.Add(func() {
			c.gatewayProvider.Run(ctx)
		})
	}
	e.Add(func() {
		c.resourceSyncLoop(ctx, c.cfg.ApisixResourceSyncInterval.Duration)
	})

	c.MetricsCollector.ResetLeader(true)

	log.Infow("controller now is running as leader",
		zap.String("namespace", c.namespace),
		zap.String("pod", c.name),
	)

	// Block until the context is cancelled, then collect the executor results.
	<-ctx.Done()
	e.Wait()

	execErrs := e.Errors()
	for _, execErr := range execErrs {
		log.Error(execErr.Error())
	}
	if len(execErrs) > 0 {
		// ctx is already done at this point and cancelFunc is deferred, so no
		// explicit cancellation is needed here.
		log.Error("Start failed, abort...")
	}
}