in cmd/collector/collector.go [94:156]
// startup boots the collector process: it runs leader election, brings up the
// full-sync and incr-sync HTTP APIs, assembles a ReplicationCoordinator from
// the loaded configuration and runs it.
//
// It returns only when the configured sync mode equals utils.VarSyncModeFull;
// for every other mode it blocks forever after the coordinator has been
// started.
func startup() {
	// Leader election has to happen before anything else is initialized.
	selectLeader()

	// Bring up both HTTP APIs on their configured ports.
	utils.FullSyncInitHttpApi(conf.Options.FullSyncHTTPListenPort)
	utils.IncrSyncInitHttpApi(conf.Options.IncrSyncHTTPListenPort)

	// Expose the sanitized configuration under /conf on both APIs.
	safeConf := func([]byte) interface{} {
		return conf.GetSafeOptions()
	}
	utils.FullSyncHttpApi.RegisterAPI("/conf", nimo.HttpGet, safeConf)
	utils.IncrSyncHttpApi.RegisterAPI("/conf", nimo.HttpGet, safeConf)

	// One MongoSource per configured mongod URL; the GID filter is attached
	// only when at least one GID is configured.
	gids := conf.Options.IncrSyncOplogGIDS
	coord := &coordinator.ReplicationCoordinator{
		MongoD: make([]*utils.MongoSource, len(conf.Options.MongoUrls)),
	}
	for idx, url := range conf.Options.MongoUrls {
		source := &utils.MongoSource{URL: url}
		if len(gids) > 0 {
			source.Gids = gids
		}
		coord.MongoD[idx] = source
	}

	// By default both sync phases read from the mongod sources. When a mongos
	// URL is configured, full sync reads from mongos instead, and so does
	// incremental sync unless the fetch method is the oplog one.
	coord.RealSourceFullSync = coord.MongoD
	coord.RealSourceIncrSync = coord.MongoD
	if conf.Options.MongoSUrl != "" {
		coord.MongoS = &utils.MongoSource{
			URL:         conf.Options.MongoSUrl,
			ReplicaName: "mongos",
		}
		coord.RealSourceFullSync = []*utils.MongoSource{coord.MongoS}
		if conf.Options.IncrSyncMongoFetchMethod != utils.VarIncrSyncMongoFetchMethodOplog {
			coord.RealSourceIncrSync = []*utils.MongoSource{coord.MongoS}
		}
	}

	// The source built from MongoCsUrl is optional.
	if conf.Options.MongoCsUrl != "" {
		coord.MongoCS = &utils.MongoSource{
			URL: conf.Options.MongoCsUrl,
		}
	}

	// Start replication; a failure here means initialization or connection
	// setup went wrong, which is logged and handed to crash with code -6.
	err := coord.Run()
	if err != nil {
		LOG.Critical(fmt.Sprintf("run replication failed: %v", err))
		crash(err.Error(), -6)
	}

	// Full-sync-only mode is finished at this point; every other mode keeps
	// the calling goroutine parked forever.
	if conf.Options.SyncMode != utils.VarSyncModeFull {
		select {}
	}
}