in collector/coordinator/replication.go [41:107]
func (coordinator *ReplicationCoordinator) Run() error {
// check all mongodb deployment and fetch the instance info
if err := coordinator.sanitizeMongoDB(); err != nil {
return err
}
LOG.Info("Collector startup. shard_by[%s] gids[%s]", conf.Options.IncrSyncShardKey, conf.Options.IncrSyncOplogGIDS)
// run extra job if has
if err := RunExtraJob(coordinator.RealSourceIncrSync); err != nil {
return err
}
// all configurations has changed to immutable
// opts, _ := json.Marshal(conf.Options)
opts, _ := json.Marshal(conf.GetSafeOptions())
LOG.Info("Collector configuration %s", string(opts))
// sentinel: full and incr
coordinator.fullSentinel = utils.NewSentinel(utils.TypeFull)
coordinator.fullSentinel.Register()
coordinator.incrSentinel = utils.NewSentinel(utils.TypeIncr)
coordinator.incrSentinel.Register()
syncMode, startTsMap, fullBeginTs, err := coordinator.selectSyncMode(conf.Options.SyncMode)
if err != nil {
return fmt.Errorf("select sync mode failed: %v", err)
}
/*
* Generally speaking, it's better to use several bridge timestamp so that
* each shard match one in sharding mode.
* TODO
*/
fullBegin := fullBeginTs
if val, ok := fullBegin.(int64); ok {
fullBegin = utils.ExtractTimestampForLog(val)
}
LOG.Info("start running with mode[%v], fullBeginTs[%v]", syncMode, fullBegin)
switch syncMode {
case utils.VarSyncModeAll:
if conf.Options.FullSyncReaderOplogStoreDisk {
LOG.Info("run parallel document oplog")
if err := coordinator.parallelDocumentOplog(fullBeginTs); err != nil {
return err
}
} else {
LOG.Info("run serialize document oplog")
if err := coordinator.serializeDocumentOplog(fullBeginTs); err != nil {
return err
}
}
case utils.VarSyncModeFull:
if err := coordinator.startDocumentReplication(); err != nil {
return err
}
case utils.VarSyncModeIncr:
if err := coordinator.startOplogReplication(int64(0), int64(0), startTsMap); err != nil {
return err
}
default:
LOG.Critical("unknown sync mode %v", conf.Options.SyncMode)
return errors.New("unknown sync mode " + conf.Options.SyncMode)
}
return nil
}