cmd/collector/collector.go (140 lines of code) (raw):

// +build darwin linux windows package main import ( "flag" "fmt" "go.mongodb.org/mongo-driver/bson/primitive" "os" "strconv" "syscall" conf "github.com/alibaba/MongoShake/v2/collector/configure" "github.com/alibaba/MongoShake/v2/collector/coordinator" utils "github.com/alibaba/MongoShake/v2/common" "github.com/alibaba/MongoShake/v2/quorum" nimo "github.com/gugemichael/nimo4go" LOG "github.com/vinllen/log4go" ) type Exit struct{ Code int } func main() { var err error defer handleExit() defer LOG.Close() defer utils.Goodbye() // argument options configuration := flag.String("conf", "", "configure file absolute path") verbose := flag.Int("verbose", 0, "where log goes to: 0 - file,1 - file+stdout,2 - stdout") version := flag.Bool("version", false, "show version") flag.Parse() if *configuration == "" || *version == true { fmt.Println(utils.BRANCH) panic(Exit{0}) } var file *os.File if file, err = os.Open(*configuration); err != nil { crash(fmt.Sprintf("Configure file open failed. %v", err), -1) } defer file.Close() // read fcv and do comparison if _, err := conf.CheckFcv(*configuration, utils.FcvConfiguration.FeatureCompatibleVersion); err != nil { crash(err.Error(), -5) } configure := nimo.NewConfigLoader(file) configure.SetDateFormat(utils.GolangSecurityTime) if err := configure.Load(&conf.Options); err != nil { crash(fmt.Sprintf("Configure file %s parse failed. %v", *configuration, err), -2) } // verify collector options and revise if err = SanitizeOptions(); err != nil { crash(fmt.Sprintf("Conf.Options check failed: %s", err.Error()), -4) } if err := utils.InitialLogger(conf.Options.LogDirectory, conf.Options.LogFileName, conf.Options.LogLevel, conf.Options.LogFlush, *verbose); err != nil { crash(fmt.Sprintf("initial log.dir[%v] log.name[%v] failed[%v].", conf.Options.LogDirectory, conf.Options.LogFileName, err), -2) } else { LOG.Info("log init succ. log.dir[%v] log.name[%v] log.level[%v]", conf.Options.LogDirectory, conf.Options.LogFileName, conf.Options.LogLevel) } LOG.Info("MongoDB Version Source[%v] Target[%v]", conf.Options.SourceDBVersion, conf.Options.TargetDBVersion) conf.Options.Version = utils.BRANCH nimo.Profiling(int(conf.Options.SystemProfilePort)) signalProfile, _ := strconv.Atoi(utils.SIGNALPROFILE) signalStack, _ := strconv.Atoi(utils.SIGNALSTACK) if signalProfile > 0 { nimo.RegisterSignalForProfiling(syscall.Signal(signalProfile)) // syscall.SIGUSR2 nimo.RegisterSignalForPrintStack(syscall.Signal(signalStack), func(bytes []byte) { // syscall.SIGUSR1 LOG.Info(string(bytes)) }) } utils.Welcome() utils.Mkdirs(conf.Options.LogDirectory) // get exclusive process lock and write pid if utils.WritePidById(conf.Options.LogDirectory, conf.Options.Id) { startup() } } func startup() { // leader election at the beginning selectLeader() // initialize http api utils.FullSyncInitHttpApi(conf.Options.FullSyncHTTPListenPort) utils.IncrSyncInitHttpApi(conf.Options.IncrSyncHTTPListenPort) coordinator := &coordinator.ReplicationCoordinator{ MongoD: make([]*utils.MongoSource, len(conf.Options.MongoUrls)), } // register conf utils.FullSyncHttpApi.RegisterAPI("/conf", nimo.HttpGet, func([]byte) interface{} { return conf.GetSafeOptions() }) utils.IncrSyncHttpApi.RegisterAPI("/conf", nimo.HttpGet, func([]byte) interface{} { return conf.GetSafeOptions() }) // init for i, src := range conf.Options.MongoUrls { coordinator.MongoD[i] = new(utils.MongoSource) coordinator.MongoD[i].URL = src if len(conf.Options.IncrSyncOplogGIDS) != 0 { coordinator.MongoD[i].Gids = conf.Options.IncrSyncOplogGIDS } } if conf.Options.MongoSUrl != "" { coordinator.MongoS = &utils.MongoSource{ URL: conf.Options.MongoSUrl, ReplicaName: "mongos", } coordinator.RealSourceFullSync = []*utils.MongoSource{coordinator.MongoS} coordinator.RealSourceIncrSync = []*utils.MongoSource{coordinator.MongoS} if conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodOplog { coordinator.RealSourceIncrSync = coordinator.MongoD } } else { coordinator.RealSourceFullSync = coordinator.MongoD coordinator.RealSourceIncrSync = coordinator.MongoD } if conf.Options.MongoCsUrl != "" { coordinator.MongoCS = &utils.MongoSource{ URL: conf.Options.MongoCsUrl, } } // start mongodb replication if err := coordinator.Run(); err != nil { // initial or connection established failed LOG.Critical(fmt.Sprintf("run replication failed: %v", err)) crash(err.Error(), -6) } // if the sync mode is "document", mongoshake should exit here. if conf.Options.SyncMode == utils.VarSyncModeFull { return } // do not exit select {} } func selectLeader() { // first of all. ensure we are the Master if conf.Options.MasterQuorum && conf.Options.CheckpointStorage == utils.VarCheckpointStorageDatabase { // election become to Master. keep waiting if we are the candidate. election id is must fixed objectId, _ := primitive.ObjectIDFromHex("5204af979955496907000001") quorum.UseElectionObjectId(objectId) go quorum.BecomeMaster(conf.Options.CheckpointStorageUrl, utils.VarCheckpointStorageDbReplicaDefault) // wait until become to a real master <-quorum.MasterPromotionNotifier } else { quorum.AlwaysMaster() } } func crash(msg string, errCode int) { fmt.Println(msg) panic(Exit{errCode}) } func handleExit() { if e := recover(); e != nil { if exit, ok := e.(Exit); ok == true { os.Exit(exit.Code) } panic(e) } }