func startup()

in cmd/collector/collector.go [94:156]


func startup() {
	// leader election at the beginning
	selectLeader()

	// initialize http api
	utils.FullSyncInitHttpApi(conf.Options.FullSyncHTTPListenPort)
	utils.IncrSyncInitHttpApi(conf.Options.IncrSyncHTTPListenPort)
	coordinator := &coordinator.ReplicationCoordinator{
		MongoD: make([]*utils.MongoSource, len(conf.Options.MongoUrls)),
	}

	// register conf
	utils.FullSyncHttpApi.RegisterAPI("/conf", nimo.HttpGet, func([]byte) interface{} {
		return conf.GetSafeOptions()
	})
	utils.IncrSyncHttpApi.RegisterAPI("/conf", nimo.HttpGet, func([]byte) interface{} {
		return conf.GetSafeOptions()
	})

	// init
	for i, src := range conf.Options.MongoUrls {
		coordinator.MongoD[i] = new(utils.MongoSource)
		coordinator.MongoD[i].URL = src
		if len(conf.Options.IncrSyncOplogGIDS) != 0 {
			coordinator.MongoD[i].Gids = conf.Options.IncrSyncOplogGIDS
		}
	}
	if conf.Options.MongoSUrl != "" {
		coordinator.MongoS = &utils.MongoSource{
			URL:         conf.Options.MongoSUrl,
			ReplicaName: "mongos",
		}
		coordinator.RealSourceFullSync = []*utils.MongoSource{coordinator.MongoS}
		coordinator.RealSourceIncrSync = []*utils.MongoSource{coordinator.MongoS}
		if conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodOplog {
			coordinator.RealSourceIncrSync = coordinator.MongoD
		}
	} else {
		coordinator.RealSourceFullSync = coordinator.MongoD
		coordinator.RealSourceIncrSync = coordinator.MongoD
	}

	if conf.Options.MongoCsUrl != "" {
		coordinator.MongoCS = &utils.MongoSource{
			URL: conf.Options.MongoCsUrl,
		}
	}

	// start mongodb replication
	if err := coordinator.Run(); err != nil {
		// initial or connection established failed
		LOG.Critical(fmt.Sprintf("run replication failed: %v", err))
		crash(err.Error(), -6)
	}

	// if the sync mode is "document", mongoshake should exit here.
	if conf.Options.SyncMode == utils.VarSyncModeFull {
		return
	}

	// do not exit
	select {}
}