quorum/quorum.go (175 lines of code) (raw):

package quorum import ( "context" "fmt" conf "github.com/alibaba/MongoShake/v2/collector/configure" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "math/rand" "net" "os" "time" utils "github.com/alibaba/MongoShake/v2/common" LOG "github.com/vinllen/log4go" ) const ( HeartBeatPeriodInSeconds = 5 HeartBeatTimeoutInSeconds = HeartBeatPeriodInSeconds * 3 HeartBeatPeriod = time.Second * HeartBeatPeriodInSeconds ) const ( PromoteMaster = 0 DescendMaster = 1 ) // become or lost master status notifier var MasterPromotionNotifier chan bool var electionObjectId primitive.ObjectID var master bool func init() { MasterPromotionNotifier = make(chan bool, 1) } func masterChanged(status int) { if status == PromoteMaster { if len(MasterPromotionNotifier) == 0 { MasterPromotionNotifier <- true LOG.Info("become the master and notify waiter") } master = true } else { master = false } } func IsMaster() bool { return master } func AlwaysMaster() { master = true } func UseElectionObjectId(electionId primitive.ObjectID) { electionObjectId = electionId } type ElectionEntry struct { ObjectId primitive.ObjectID `bson:"_id"` PID int `bson:"pid"` Host string `bson:"host"` Heartbeat int64 `bson:"heartbeat"` } const ( QUORUM_COLLECTION string = "election" ) const ( STATUS_LOOKASIDE int = iota STATUS_COMPETE_MASTER STATUS_MASTER STATUS_FOLLOW STATUS_SESSION_CLOSE ) func BecomeMaster(uri string, db string) error { retry := 30 for retry != 0 { if conn, err := makeSession(uri); err == nil { masterCollection := conn.Client.Database(db).Collection(QUORUM_COLLECTION) status := STATUS_LOOKASIDE entry := new(ElectionEntry) // keep quorum Keep: for { if status == STATUS_LOOKASIDE || status == STATUS_MASTER { wait(HeartBeatPeriod) } switch status { case STATUS_LOOKASIDE: // take from database firstly err = masterCollection.FindOne(context.Background(), bson.M{"_id": electionObjectId}).Decode(entry) switch err { case nil: if entry.Host == getNetAddr() && entry.PID == os.Getpid() { // master is me. just update heartbeat status = STATUS_MASTER } else { status = STATUS_FOLLOW } case mongo.ErrNoDocuments: LOG.Debug("No master node found. we elect myself") status = STATUS_COMPETE_MASTER default: LOG.Warn("Fetch master election info %s failed. %v", electionObjectId, err) status = STATUS_SESSION_CLOSE } case STATUS_MASTER: if _, err := masterCollection.UpdateOne(context.Background(), bson.D{{"_id", electionObjectId}, {"pid", os.Getpid()}}, bson.M{"$set": promotion()}); err == nil { masterChanged(PromoteMaster) } else { LOG.Warn("Update master election info failed. %v", err) status = STATUS_LOOKASIDE } case STATUS_COMPETE_MASTER: // there is no one master competeMaster(masterCollection) status = STATUS_LOOKASIDE case STATUS_FOLLOW: if master { masterChanged(DescendMaster) } // there has been already another master. check its heartbeat heartbeat := entry.Heartbeat if time.Now().Unix()-heartbeat >= int64(HeartBeatTimeoutInSeconds) { // I wanna be the master. DON'T care about the success of update masterCollection.UpdateOne(context.Background(), bson.D{{"_id", electionObjectId}},bson.M{"$set": promotion()}) LOG.Info("Expired master found. compete to become master") // wait random time. just disrupt others compete wait(time.Millisecond * time.Duration(rand.Uint32()%2500+1)) } else { // follow current master LOG.Info("Follow current master %v", entry) } status = STATUS_LOOKASIDE case STATUS_SESSION_CLOSE: conn.Close() break Keep } } } retry-- } return fmt.Errorf("unreachable master election mongo %s", uri) } func competeMaster(coll *mongo.Collection) bool { if _, err := coll.InsertOne(context.Background(), promotion()); err == nil { LOG.Info("This node become master with election info %v", master) return true } else if utils.DuplicateKey(err) { LOG.Warn("Another node is compete to master. we hold on a second") } return false } func makeSession(uri string) (*utils.MongoCommunityConn, error) { if conn, err := utils.NewMongoCommunityConn(uri, utils.VarMongoConnectModePrimary, true, utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault, conf.Options.MongoSslRootCaFile); err == nil { return conn, nil } else { return nil, err } } func promotion() bson.D { return bson.D{ {"_id", electionObjectId}, {"pid", os.Getpid()}, {"host", getNetAddr()}, {"heartbeat", time.Now().Unix()}, } } func wait(duration time.Duration) { time.Sleep(duration) } func getNetAddr() string { addressArray, err := net.InterfaceAddrs() if err != nil { LOG.Critical("Get network interface address failed. %v", err) return "error" } for _, ip := range addressArray { if ipnet, ok := ip.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { if ipnet.IP.To4() != nil { return ipnet.IP.String() } } } return "error" }