in internal/uniqueness/leaderelection.go [108:166]
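// buildConfig assembles the client-go LeaderElectionConfig used by this manager:
// a Lease lock named LeaderLeaseName in the pod's namespace, an identity derived
// from the lease name and the current pod ID, and callbacks that log leadership
// changes and re-run the elector when the lock is lost for any reason other than
// context cancellation.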
func (m *LeaderelectionManager) buildConfig(ctx context.Context) (le.LeaderElectionConfig, error) {
	podID, err := m.currentPodID()
	if err != nil {
		return le.LeaderElectionConfig{}, err
	}
	id := fmt.Sprintf("%s_%s", LeaderLeaseName, podID)
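	// Resolve the namespace to create the Lease in; when not running inside a
	// cluster (e.g. local runs or tests), fall back to the "default" namespace.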
ns, err := kubernetes.InClusterNamespace()
if err != nil {
ns = v1.NamespaceDefault
}
lease := metav1.ObjectMeta{
Name: LeaderLeaseName,
Namespace: ns,
}
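	// Leadership is coordinated through a coordination.k8s.io/v1 Lease object;
	// Identity is recorded on the Lease as the current holder.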
return le.LeaderElectionConfig{
Lock: &rl.LeaseLock{
LeaseMeta: lease,
Client: m.kubeClient.CoordinationV1(),
LockConfig: rl.ResourceLockConfig{
Identity: id,
},
},
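		// ReleaseOnCancel releases the lease on context cancellation so another pod
		// can take over without waiting out LeaseDuration. LeaseDuration, RenewDeadline
		// and RetryPeriod are package-level constants: the wait before non-leaders try
		// to acquire the lease, the leader's renewal deadline, and the interval between
		// retries, respectively.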
ReleaseOnCancel: true,
LeaseDuration: LeaseDuration,
RenewDeadline: RenewDeadline,
RetryPeriod: RetryPeriod,
Callbacks: le.LeaderCallbacks{
OnStartedLeading: func(_ context.Context) {
m.log.Infof("Leader election lock GAINED, id: %v", id)
},
OnStoppedLeading: func() {
				// OnStoppedLeading is called even if cloudbeat was not the leader, for example when
				// the context is canceled due to a reconfiguration from Fleet.
				// We re-run the leader elector so we keep tracking leadership, except when the context
				// has been canceled.
m.log.Infof("Leader election lock LOST, id: %v", id)
defer m.wg.Done()
select {
case <-ctx.Done():
m.log.Info("Leader election is canceled")
return
default:
				// Increment the wait group before launching the new run so that run's
				// eventual OnStoppedLeading (and its deferred Done) cannot race the Add.
				m.wg.Add(1)
				go m.leader.Run(ctx)
m.log.Infof("Re-running leader elector")
}
},
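			// OnNewLeader fires whenever the observed holder identity changes, including
			// when this pod itself becomes the holder.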
OnNewLeader: func(identity string) {
if identity == id {
m.log.Infof("Leader election lock has been acquired by this pod, id: %v", identity)
} else {
m.log.Infof("Leader election lock has been acquired by another pod, id: %v", identity)
}
},
},
}, nil
}
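// Usage sketch (an assumption for illustration, not taken from this file): the
// returned config is the standard input to client-go's leader elector, roughly:
//
//	cfg, err := m.buildConfig(ctx)
//	if err != nil {
//		return err
//	}
//	elector, err := le.NewLeaderElector(cfg)
//	if err != nil {
//		return err
//	}
//	m.wg.Add(1)
//	go elector.Run(ctx) // blocks until leadership is lost or ctx is canceled
//
// le.NewLeaderElector and (*le.LeaderElector).Run are the standard
// k8s.io/client-go/tools/leaderelection entry points; how this package wires the
// elector into m.leader and m.wg elsewhere is assumed here.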