collector/coordinator/replication.go (196 lines of code) (raw):

package coordinator import ( "encoding/json" "errors" "fmt" "math" "sync" "github.com/alibaba/MongoShake/v2/collector" conf "github.com/alibaba/MongoShake/v2/collector/configure" utils "github.com/alibaba/MongoShake/v2/common" "github.com/alibaba/MongoShake/v2/oplog" nimo "github.com/gugemichael/nimo4go" LOG "github.com/vinllen/log4go" ) var ( testSelectSyncMode = false ) // ReplicationCoordinator global coordinator instance. consist of // one syncerGroup and a number of workers type ReplicationCoordinator struct { MongoD []*utils.MongoSource // the source mongod MongoS *utils.MongoSource // the source mongos MongoCS *utils.MongoSource // the source mongocs RealSourceFullSync []*utils.MongoSource // point to MongoD if source is mongod, otherwise MongoS RealSourceIncrSync []*utils.MongoSource // point to MongoD if source is mongod, otherwise MongoS // Sentinel listener fullSentinel *utils.Sentinel incrSentinel *utils.Sentinel // syncerGroup and workerGroup number is 1:N in ReplicaSet. // 1:1 while replicated in shard cluster syncerGroup []*collector.OplogSyncer } func (coordinator *ReplicationCoordinator) Run() error { // check all mongodb deployment and fetch the instance info if err := coordinator.sanitizeMongoDB(); err != nil { return err } LOG.Info("Collector startup. shard_by[%s] gids[%s]", conf.Options.IncrSyncShardKey, conf.Options.IncrSyncOplogGIDS) // run extra job if has if err := RunExtraJob(coordinator.RealSourceIncrSync); err != nil { return err } // all configurations has changed to immutable // opts, _ := json.Marshal(conf.Options) opts, _ := json.Marshal(conf.GetSafeOptions()) LOG.Info("Collector configuration %s", string(opts)) // sentinel: full and incr coordinator.fullSentinel = utils.NewSentinel(utils.TypeFull) coordinator.fullSentinel.Register() coordinator.incrSentinel = utils.NewSentinel(utils.TypeIncr) coordinator.incrSentinel.Register() syncMode, startTsMap, fullBeginTs, err := coordinator.selectSyncMode(conf.Options.SyncMode) if err != nil { return fmt.Errorf("select sync mode failed: %v", err) } /* * Generally speaking, it's better to use several bridge timestamp so that * each shard match one in sharding mode. * TODO */ fullBegin := fullBeginTs if val, ok := fullBegin.(int64); ok { fullBegin = utils.ExtractTimestampForLog(val) } LOG.Info("start running with mode[%v], fullBeginTs[%v]", syncMode, fullBegin) switch syncMode { case utils.VarSyncModeAll: if conf.Options.FullSyncReaderOplogStoreDisk { LOG.Info("run parallel document oplog") if err := coordinator.parallelDocumentOplog(fullBeginTs); err != nil { return err } } else { LOG.Info("run serialize document oplog") if err := coordinator.serializeDocumentOplog(fullBeginTs); err != nil { return err } } case utils.VarSyncModeFull: if err := coordinator.startDocumentReplication(); err != nil { return err } case utils.VarSyncModeIncr: if err := coordinator.startOplogReplication(int64(0), int64(0), startTsMap); err != nil { return err } default: LOG.Critical("unknown sync mode %v", conf.Options.SyncMode) return errors.New("unknown sync mode " + conf.Options.SyncMode) } return nil } func (coordinator *ReplicationCoordinator) sanitizeMongoDB() error { var conn *utils.MongoCommunityConn var err error var hasUniqIndex = false rs := map[string]int{} // try to connect CheckpointStorage checkpointStorageUrl := conf.Options.CheckpointStorageUrl if conn, err = utils.NewMongoCommunityConn(checkpointStorageUrl, utils.VarMongoConnectModePrimary, true, utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault, conf.Options.CheckpointStorageUrlMongoSslRootCaFile); conn == nil || !conn.IsGood() || err != nil { LOG.Critical("Connect checkpointStorageUrl[%v] error[%v]. Please add primary node into 'mongo_urls' "+ "if 'context.storage.url' is empty", checkpointStorageUrl, err) return err } conn.Close() for i, src := range coordinator.MongoD { if conn, err = utils.NewMongoCommunityConn(src.URL, conf.Options.MongoConnectMode, true, utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault, conf.Options.MongoSslRootCaFile); conn == nil || !conn.IsGood() || err != nil { LOG.Critical("Connect mongo server error. %v, url : %s. "+ "See https://github.com/alibaba/MongoShake/wiki/FAQ"+ "#q-how-to-solve-the-oplog-tailer-initialize-failed-no-reachable-servers-error", err, src.URL) return err } // a conventional ReplicaSet should have local.oplog.rs collection if conf.Options.SyncMode != utils.VarSyncModeFull && // conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodOplog && conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodOplog && !conn.HasOplogNs(utils.GetListCollectionQueryCondition(conn)) { LOG.Critical("There has no oplog collection in mongo db server") conn.Close() return errors.New("no oplog ns in mongo. " + "See https://github.com/alibaba/MongoShake/wiki/FAQ" + "#q-how-to-solve-the-oplog-tailer-initialize-failed-no-oplog-ns-in-mongo-error") } // check if there has dup server every replica set in RS or Shard rsName := conn.AcquireReplicaSetName() // rsName will be set to default if empty if rsName == "" { rsName = fmt.Sprintf("default-%d", i) LOG.Warn("Source mongodb have empty replica set name, url[%s], change to default[%s]", utils.BlockMongoUrlPassword(src.URL, "***"), rsName) } if _, exist := rs[rsName]; exist { LOG.Critical("There has duplicate replica set name : %s", rsName) conn.Close() return errors.New("duplicated replica set source") } rs[rsName] = 1 src.ReplicaName = rsName // look around if there has unique index if !hasUniqIndex && conf.Options.IncrSyncShardKey == oplog.ShardAutomatic { hasUniqIndex = conn.HasUniqueIndex(utils.GetListCollectionQueryCondition(conn)) } // doesn't reuse current connection conn.Close() } // we choose sharding by collection if there are unique index // existing in collections if conf.Options.IncrSyncShardKey == oplog.ShardAutomatic { if hasUniqIndex { conf.Options.IncrSyncShardKey = oplog.ShardByNamespace } else { conf.Options.IncrSyncShardKey = oplog.ShardByID } } // TODO, check unique exist on given namespace if len(conf.Options.IncrSyncShardByObjectIdWhiteList) != 0 { } return nil } // run incr-sync after full-sync func (coordinator *ReplicationCoordinator) serializeDocumentOplog(fullBeginTs interface{}) error { var err error if err = coordinator.startDocumentReplication(); err != nil { return fmt.Errorf("start document replication failed: %v", err) } // get current newest timestamp var fullFinishTs, oldestTs int64 if conf.Options.SpecialSourceDBFlag != utils.VarSpecialSourceDBFlagAliyunServerless && len(coordinator.MongoD) > 0 { _, fullFinishTs, _, oldestTs, _, err = utils.GetAllTimestamp(coordinator.MongoD, conf.Options.MongoSslRootCaFile) if err != nil { return fmt.Errorf("get full sync finish timestamp failed[%v]", err) } } else { fullFinishTs = int64(math.MaxInt64) } LOG.Info("------------------------full sync done!------------------------") fullBegin := fullBeginTs if val, ok := fullBeginTs.(int64); ok { fullBegin = utils.ExtractTimestampForLog(val) // the oldest oplog is lost if oldestTs >= val { err = fmt.Errorf("incr sync ts[%v] is less than current oldest ts[%v], this error means user's "+ "oplog collection size is too small or full sync continues too long", fullBegin, utils.ExtractTimestampForLog(oldestTs)) LOG.Error(err) return err } LOG.Info("oldestTs[%v] fullBeginTs[%v] fullFinishTs[%v]", utils.ExtractTimestampForLog(oldestTs), fullBegin, utils.ExtractTimestampForLog(fullFinishTs)) } LOG.Info("finish full sync, start incr sync with timestamp: fullBeginTs[%v], fullFinishTs[%v]", fullBegin, utils.ExtractTimestampForLog(fullFinishTs)) return coordinator.startOplogReplication(fullBeginTs, fullFinishTs, nil) } // TODO, set initSyncFinishTs into worker // run full-sync and incr-sync in parallel func (coordinator *ReplicationCoordinator) parallelDocumentOplog(fullBeginTs interface{}) error { var docError error var docWg sync.WaitGroup docWg.Add(1) nimo.GoRoutine(func() { defer docWg.Done() if err := coordinator.startDocumentReplication(); err != nil { docError = LOG.Critical("document Replication error. %v", err) return } LOG.Info("------------------------full sync done!------------------------") }) // during document replication, oplog syncer fetch oplog and store on disk, in order to avoid oplog roll up // fullSyncFinishPosition means no need to check the end time to disable DDL if err := coordinator.startOplogReplication(fullBeginTs, int64(0), nil); err != nil { return LOG.Critical("start oplog replication failed: %v", err) } // wait for document replication to finish, set docEndTs to oplog syncer, start oplog replication docWg.Wait() if docError != nil { return docError } LOG.Info("finish document replication, change oplog replication to %v", utils.LogFetchStage(utils.FetchStageStoreDiskApply)) for _, syncer := range coordinator.syncerGroup { syncer.StartDiskApply() } return nil }