func()

in internal/uniqueness/leaderelection.go [108:166]


func (m *LeaderelectionManager) buildConfig(ctx context.Context) (le.LeaderElectionConfig, error) {
	podId, err := m.currentPodID()
	if err != nil {
		return le.LeaderElectionConfig{}, err
	}

	id := fmt.Sprintf("%s_%s", LeaderLeaseName, podId)
	ns, err := kubernetes.InClusterNamespace()
	if err != nil {
		ns = v1.NamespaceDefault
	}

	lease := metav1.ObjectMeta{
		Name:      LeaderLeaseName,
		Namespace: ns,
	}

	return le.LeaderElectionConfig{
		Lock: &rl.LeaseLock{
			LeaseMeta: lease,
			Client:    m.kubeClient.CoordinationV1(),
			LockConfig: rl.ResourceLockConfig{
				Identity: id,
			},
		},
		ReleaseOnCancel: true,
		LeaseDuration:   LeaseDuration,
		RenewDeadline:   RenewDeadline,
		RetryPeriod:     RetryPeriod,
		Callbacks: le.LeaderCallbacks{
			OnStartedLeading: func(_ context.Context) {
				m.log.Infof("Leader election lock GAINED, id: %v", id)
			},
			OnStoppedLeading: func() {
				// OnStoppedLeading gets called even if cloudbeat wasn't the leader, for example, if the context is canceled due to reconfiguration from fleet.
				// We re-run the manager to keep following leader status except for context cancellation events.
				m.log.Infof("Leader election lock LOST, id: %v", id)
				defer m.wg.Done()

				select {
				case <-ctx.Done():
					m.log.Info("Leader election is canceled")
					return
				default:
					go m.leader.Run(ctx)
					m.wg.Add(1)
					m.log.Infof("Re-running leader elector")
				}
			},
			OnNewLeader: func(identity string) {
				if identity == id {
					m.log.Infof("Leader election lock has been acquired by this pod, id: %v", identity)
				} else {
					m.log.Infof("Leader election lock has been acquired by another pod, id: %v", identity)
				}
			},
		},
	}, nil
}