func()

in collector/coordinator/replication.go [41:107]


// Run drives one collector session from pre-flight checks to replication.
//
// Steps, in order:
//  1. sanitizeMongoDB checks the MongoDB deployments; its error aborts the run.
//  2. RunExtraJob is invoked with coordinator.RealSourceIncrSync.
//  3. The options returned by conf.GetSafeOptions are logged as JSON.
//  4. The full and incr sentinels are created and registered.
//  5. selectSyncMode resolves the configured sync mode, and the matching
//     replication routine is started.
//
// It returns the first error reported by any step, an error for an
// unrecognized sync mode, or nil once the selected routine returns without
// error.
func (coordinator *ReplicationCoordinator) Run() error {
	// check all mongodb deployment and fetch the instance info
	if err := coordinator.sanitizeMongoDB(); err != nil {
		return err
	}
	LOG.Info("Collector startup. shard_by[%s] gids[%s]", conf.Options.IncrSyncShardKey, conf.Options.IncrSyncOplogGIDS)

	// run extra job if has
	if err := RunExtraJob(coordinator.RealSourceIncrSync); err != nil {
		return err
	}

	// All configuration is treated as immutable from here on, so dump it once.
	// GetSafeOptions is used rather than conf.Options directly (presumably to
	// keep sensitive fields out of the log; confirm against its definition).
	// A marshal failure only affects this log line, so warn and carry on
	// instead of silently printing an empty configuration.
	if opts, err := json.Marshal(conf.GetSafeOptions()); err != nil {
		LOG.Warn("Collector configuration cannot be marshaled: %v", err)
	} else {
		LOG.Info("Collector configuration %s", string(opts))
	}

	// sentinel: full and incr
	coordinator.fullSentinel = utils.NewSentinel(utils.TypeFull)
	coordinator.fullSentinel.Register()
	coordinator.incrSentinel = utils.NewSentinel(utils.TypeIncr)
	coordinator.incrSentinel.Register()

	syncMode, startTsMap, fullBeginTs, err := coordinator.selectSyncMode(conf.Options.SyncMode)
	if err != nil {
		return fmt.Errorf("select sync mode failed: %v", err)
	}

	// TODO: in sharding mode it would be better to use several bridge
	// timestamps, one per shard, instead of the single fullBeginTs used below.

	// For logging only: when fullBeginTs holds an int64, print it through
	// utils.ExtractTimestampForLog; any other value is logged as-is. The
	// original fullBeginTs is what gets passed on to the replication routines.
	fullBegin := fullBeginTs
	if val, ok := fullBegin.(int64); ok {
		fullBegin = utils.ExtractTimestampForLog(val)
	}
	LOG.Info("start running with mode[%v], fullBeginTs[%v]", syncMode, fullBegin)

	switch syncMode {
	case utils.VarSyncModeAll:
		// Full + incremental: the FullSyncReaderOplogStoreDisk option picks
		// between the parallel and the serialized document/oplog routine.
		if conf.Options.FullSyncReaderOplogStoreDisk {
			LOG.Info("run parallel document oplog")
			if err := coordinator.parallelDocumentOplog(fullBeginTs); err != nil {
				return err
			}
		} else {
			LOG.Info("run serialize document oplog")
			if err := coordinator.serializeDocumentOplog(fullBeginTs); err != nil {
				return err
			}
		}
	case utils.VarSyncModeFull:
		if err := coordinator.startDocumentReplication(); err != nil {
			return err
		}
	case utils.VarSyncModeIncr:
		// Incremental only: both leading int64 arguments are passed as zero
		// and the per-source start positions come from startTsMap.
		if err := coordinator.startOplogReplication(int64(0), int64(0), startTsMap); err != nil {
			return err
		}
	default:
		LOG.Critical("unknown sync mode %v", conf.Options.SyncMode)
		return errors.New("unknown sync mode " + conf.Options.SyncMode)
	}

	return nil
}