controller/tasks/etcd/leader_elector.go (73 lines of code) (raw):

// Copyright (c) 2017-2018 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package etcd import ( "github.com/m3db/m3/src/cluster/services" "github.com/m3db/m3/src/cluster/services/leader/campaign" xwatch "github.com/m3db/m3/src/x/watch" ) // ElectionStatus represents the leader election status type ElectionStatus int const ( // Unknown status Unknown ElectionStatus = iota // Leader status Leader // Follower status Follower ) // LeaderElector is the interface for start leader election type LeaderElector interface { // Start starts leader election Start() error // nofity channel for change in status C() <-chan struct{} // Status check current leader election status Status() ElectionStatus // Resign leader status if leader Resign() error // Close election Close() error } type leaderElector struct { leaderService services.LeaderService statusWatchable xwatch.Watchable watch xwatch.Watch } func (l *leaderElector) C() <-chan struct{} { return l.watch.C() } func (l *leaderElector) Start() error { campaignOpts, err := services.NewCampaignOptions() if err != nil { return err } statusCh, err := l.leaderService.Campaign("", campaignOpts) if err != nil { return err } _, l.watch, err = l.statusWatchable.Watch() if err != nil { return err } go func() { for status := range statusCh { _ = l.statusWatchable.Update(status) } }() return nil } func (l *leaderElector) Status() ElectionStatus { state := l.statusWatchable.Get() if state == nil { return Unknown } switch state.(campaign.Status).State { case campaign.Follower: return Follower case campaign.Leader: return Leader default: return Unknown } } func (l *leaderElector) Close() error { return l.leaderService.Close() } func (l *leaderElector) Resign() error { return l.leaderService.Resign("") } // NewLeaderElector creates a leader elector func NewLeaderElector(service services.LeaderService) LeaderElector { return &leaderElector{ leaderService: service, statusWatchable: xwatch.NewWatchable(), } }