in pkg/providers/controller.go [139:218]
func (c *Controller) Run(stop chan struct{}) error {
rootCtx, rootCancel := context.WithCancel(context.Background())
defer rootCancel()
go func() {
<-stop
rootCancel()
}()
c.MetricsCollector.ResetLeader(false)
go func() {
log.Info("start api server")
if err := c.apiServer.Run(rootCtx.Done()); err != nil {
log.Errorf("failed to launch API Server: %s", err)
}
}()
lock := &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Namespace: c.namespace,
Name: c.cfg.Kubernetes.ElectionID,
},
Client: c.kubeClient.Client.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: c.name,
EventRecorder: c,
},
}
cfg := leaderelection.LeaderElectionConfig{
Lock: lock,
LeaseDuration: 15 * time.Second,
RenewDeadline: 5 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: c.run,
OnNewLeader: func(identity string) {
log.Warnf("found a new leader %s", identity)
if identity != c.name {
log.Infow("controller now is running as a candidate",
zap.String("namespace", c.namespace),
zap.String("pod", c.name),
)
c.MetricsCollector.ResetLeader(false)
// delete the old APISIX cluster, so that the cached state
// like synchronization won't be used next time the candidate
// becomes the leader again.
c.apisix.DeleteCluster(c.cfg.APISIX.DefaultClusterName)
}
},
OnStoppedLeading: func() {
log.Infow("controller now is running as a candidate",
zap.String("namespace", c.namespace),
zap.String("pod", c.name),
)
c.MetricsCollector.ResetLeader(false)
// delete the old APISIX cluster, so that the cached state
// like synchronization won't be used next time the candidate
// becomes the leader again.
c.apisix.DeleteCluster(c.cfg.APISIX.DefaultClusterName)
},
},
ReleaseOnCancel: true,
Name: "ingress-apisix",
}
elector, err := leaderelection.NewLeaderElector(cfg)
if err != nil {
log.Errorf("failed to create leader elector: %s", err.Error())
return err
}
election:
curCtx, cancel := context.WithCancel(rootCtx)
c.leaderContextCancelFunc = cancel
elector.Run(curCtx)
select {
case <-rootCtx.Done():
return nil
default:
goto election
}
}