collector/docsyncer/doc_syncer.go (483 lines of code) (raw):

package docsyncer import ( "errors" "fmt" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "sort" "sync" "sync/atomic" "time" "github.com/alibaba/MongoShake/v2/collector/ckpt" conf "github.com/alibaba/MongoShake/v2/collector/configure" "github.com/alibaba/MongoShake/v2/collector/filter" "github.com/alibaba/MongoShake/v2/collector/transform" utils "github.com/alibaba/MongoShake/v2/common" nimo "github.com/gugemichael/nimo4go" LOG "github.com/vinllen/log4go" "go.mongodb.org/mongo-driver/bson" ) const ( MAX_BUFFER_BYTE_SIZE = 12 * 1024 * 1024 ) func IsShardingToSharding(fromIsSharding bool, toConn *utils.MongoCommunityConn) bool { if conf.Options.FullSyncExecutorDebug { LOG.Info("full_sync.executor.debug set, no need to check IsShardingToSharding") return false } var source, target string if fromIsSharding { source = "sharding" } else { source = "replica" } err := toConn.Client.Database("config").Collection("version").FindOne(nil, bson.M{}).Err() if err != nil { target = "replica" } else { target = "sharding" } LOG.Info("replication from [%s] to [%s]", source, target) if source == "sharding" && target == "sharding" { return true } return false } func in(target string, str_array []string) bool { sort.Strings(str_array) index := sort.SearchStrings(str_array, target) if index < len(str_array) && str_array[index] == target { return true } return false } func StartDropDestCollection(nsSet map[utils.NS]struct{}, toConn *utils.MongoCommunityConn, nsTrans *transform.NamespaceTransform) error { if conf.Options.FullSyncExecutorDebug { LOG.Info("full_sync.executor.debug set, no need to drop collection") return nil } for ns := range nsSet { toNS := utils.NewNS(nsTrans.Transform(ns.Str())) if !conf.Options.FullSyncCollectionDrop { // do not drop colNames, err := toConn.Client.Database(toNS.Database).ListCollectionNames(nil, utils.GetListCollectionQueryCondition(toConn)) if err != nil { LOG.Critical("Get collection names of db %v of dest mongodb failed. %v", toNS.Database, err) return err } // judge whether toNs exists for _, colName := range colNames { if colName == toNS.Collection { LOG.Warn("ns %v to be synced already exists in dest mongodb", toNS) break } } } else { // need drop err := toConn.Client.Database(toNS.Database).Collection(toNS.Collection).Drop(nil) if err != nil && err.Error() != "ns not found" { LOG.Critical("Drop collection ns %v of dest mongodb failed. %v", toNS, err) return errors.New(fmt.Sprintf("Drop collection ns %v of dest mongodb failed. %v", toNS, err)) } } } return nil } func StartNamespaceSpecSyncForSharding(csUrl string, toConn *utils.MongoCommunityConn, nsTrans *transform.NamespaceTransform) error { LOG.Info("document syncer namespace spec for sharding begin") var fromConn *utils.MongoCommunityConn var err error if fromConn, err = utils.NewMongoCommunityConn(csUrl, utils.VarMongoConnectModePrimary, true, utils.ReadWriteConcernMajority, utils.ReadWriteConcernDefault, conf.Options.MongoSslRootCaFile); err != nil { LOG.Info("Connect to [%s] failed. err[%v]", csUrl, err) return err } defer fromConn.Close() filterList := filter.NewDocFilterList() dbTrans := transform.NewDBTransform(conf.Options.TransformNamespace) type dbSpec struct { Db string `bson:"_id"` Partitioned bool `bson:"partitioned"` } var dbSpecDoc dbSpec var docCursor *mongo.Cursor // enable sharding for db docCursor, err = fromConn.Client.Database("config").Collection("databases").Find(nil, bson.M{}) if err != nil { return err } for docCursor.Next(nil) { err = bson.Unmarshal(docCursor.Current, &dbSpecDoc) if err != nil { LOG.Error("parse docCursor.Current[%v] failed", docCursor.Current) continue } if dbSpecDoc.Partitioned { if filterList.IterateFilter(dbSpecDoc.Db + ".$cmd") { LOG.Debug("DB is filtered. %v", dbSpecDoc.Db) continue } var todbSpecDoc dbSpec todbList := dbTrans.Transform(dbSpecDoc.Db) for _, todb := range todbList { err = toConn.Client.Database("config").Collection("databases").FindOne(nil, bson.D{{"_id", todb}}).Decode(&todbSpecDoc) if err == nil && todbSpecDoc.Partitioned { continue } err = toConn.Client.Database("admin").RunCommand(nil, bson.D{{"enablesharding", todb}}).Err() if err != nil { LOG.Critical("Enable sharding for db %v of dest mongodb failed. %v", todb, err) return errors.New(fmt.Sprintf("Enable sharding for db %v of dest mongodb failed. %v", todb, err)) } LOG.Info("Enable sharding for db %v of dest mongodb successful", todb) } } } if err := docCursor.Close(nil); err != nil { LOG.Critical("Close iterator of config.database failed. %v", err) } type colSpec struct { Ns string `bson:"_id"` Key *bson.Raw `bson:"key"` Unique bool `bson:"unique"` Dropped bool `bson:"dropped"` } var colSpecDoc colSpec var colDocCursor *mongo.Cursor // enable sharding for db(shardCollection) colDocCursor, err = fromConn.Client.Database("config").Collection( "collections").Find(nil, bson.D{}) for colDocCursor.Next(nil) { err = bson.Unmarshal(colDocCursor.Current, &colSpecDoc) if err != nil { LOG.Error("parse colDocCursor.Current[%v] failed", colDocCursor.Current) continue } if !colSpecDoc.Dropped { if filterList.IterateFilter(colSpecDoc.Ns) { LOG.Debug("Namespace is filtered. %v", colSpecDoc.Ns) continue } toNs := nsTrans.Transform(colSpecDoc.Ns) err = toConn.Client.Database("admin").RunCommand(nil, bson.D{{"shardCollection", toNs}, {"key", colSpecDoc.Key}, {"unique", colSpecDoc.Unique}}).Err() if err != nil && !in(toNs, conf.Options.SkipNSShareKeyVerify) { LOG.Critical("Shard collection for ns %v of dest mongodb failed. %v", toNs, err) return errors.New(fmt.Sprintf("Shard collection for ns %v of dest mongodb failed. %v", toNs, err)) } LOG.Info("Shard collection for ns %v of dest mongodb successful", toNs) } } if err = docCursor.Close(nil); err != nil { LOG.Critical("Close iterator of config.collections failed. %v", err) } LOG.Info("document syncer namespace spec for sharding successful") return nil } func StartIndexSync(indexMap map[utils.NS][]bson.D, toUrl string, nsTrans *transform.NamespaceTransform, background bool) (syncError error) { if conf.Options.FullSyncExecutorDebug { LOG.Info("full_sync.executor.debug set, no need to sync index") return nil } type IndexNS struct { ns utils.NS indexList []bson.D } LOG.Info("start writing index with background[%v], indexMap length[%v]", background, len(indexMap)) if len(indexMap) == 0 { LOG.Info("finish writing index, but no data") return nil } collExecutorParallel := conf.Options.FullSyncReaderCollectionParallel namespaces := make(chan *IndexNS, collExecutorParallel) nimo.GoRoutine(func() { for ns, indexList := range indexMap { namespaces <- &IndexNS{ns: ns, indexList: indexList} } close(namespaces) }) var wg sync.WaitGroup wg.Add(collExecutorParallel) for i := 0; i < collExecutorParallel; i++ { nimo.GoRoutine(func() { var conn *utils.MongoCommunityConn var err error if conn, err = utils.NewMongoCommunityConn(toUrl, utils.VarMongoConnectModePrimary, true, utils.ReadWriteConcernLocal, utils.ReadWriteConcernMajority, conf.Options.TunnelMongoSslRootCaFile); err != nil { LOG.Error("write index but create client fail: %v", err) return } defer conn.Close() defer wg.Done() for { indexNs, ok := <-namespaces if !ok { break } ns := indexNs.ns toNS := ns if nsTrans != nil { toNS = utils.NewNS(nsTrans.Transform(ns.Str())) } for _, index := range indexNs.indexList { // ignore _id if utils.HaveIdIndexKey(index) { continue } newIndex := bson.D{} for _, v := range index { if v.Key == "ns" || v.Key == "v" || v.Key == "background" { continue } newIndex = append(newIndex, v) } newIndex = append(newIndex, primitive.E{Key: "background", Value: background}) if out := conn.Client.Database(toNS.Database).RunCommand(nil, bson.D{ {"createIndexes", toNS.Collection}, {"indexes", []bson.D{newIndex}}, }); out.Err() != nil { LOG.Warn("Create indexes for ns %v of dest mongodb failed. %v", ns, out.Err()) } } LOG.Info("Create indexes for ns %v of dest mongodb finish", toNS) } }) } wg.Wait() LOG.Info("finish writing index") return syncError } func Checkpoint(ckptMap map[string]utils.TimestampNode) error { for name, ts := range ckptMap { ckptManager := ckpt.NewCheckpointManager(name, 0) ckptManager.Get() // load checkpoint in ckptManager if err := ckptManager.Update(ts.Newest); err != nil { return err } } return nil } /************************************************************************/ // 1 shard -> 1 DBSyncer type DBSyncer struct { // syncer id id int // source mongodb url FromMongoUrl string fromReplset string // destination mongodb url ToMongoUrl string // start time of sync startTime time.Time // source is sharding? FromIsSharding bool nsTrans *transform.NamespaceTransform // filter orphan duplicate record orphanFilter *filter.OrphanFilter mutex sync.Mutex qos *utils.Qos // not owned replMetric *utils.ReplicationMetric // below are metric info metricNsMapLock sync.Mutex metricNsMap map[utils.NS]*CollectionMetric // namespace map: db.collection -> collection metric } func NewDBSyncer( id int, fromMongoUrl string, fromReplset string, toMongoUrl string, nsTrans *transform.NamespaceTransform, orphanFilter *filter.OrphanFilter, qos *utils.Qos, fromIsSharding bool) *DBSyncer { syncer := &DBSyncer{ id: id, FromMongoUrl: fromMongoUrl, fromReplset: fromReplset, ToMongoUrl: toMongoUrl, nsTrans: nsTrans, orphanFilter: orphanFilter, qos: qos, metricNsMap: make(map[utils.NS]*CollectionMetric), replMetric: utils.NewMetric(fromReplset, utils.TypeFull, utils.METRIC_TPS|utils.METRIC_SUCCESS), FromIsSharding: fromIsSharding, } return syncer } func (syncer *DBSyncer) String() string { return fmt.Sprintf("DBSyncer id[%v] source[%v] target[%v] startTime[%v]", syncer.id, utils.BlockMongoUrlPassword(syncer.FromMongoUrl, "***"), utils.BlockMongoUrlPassword(syncer.ToMongoUrl, "***"), syncer.startTime) } func (syncer *DBSyncer) Init() { syncer.RestAPI() } func (syncer *DBSyncer) Close() { LOG.Info("syncer[%v] closed", syncer) syncer.replMetric.Close() //sleep 1 second for metric routine exit gracefully time.Sleep(1 * time.Second) } func (syncer *DBSyncer) Start() (syncError error) { syncer.startTime = time.Now() var wg sync.WaitGroup filterList := filter.NewDocFilterList() // get all namespace nsList, _, err := utils.GetDbNamespace(syncer.FromMongoUrl, filterList.IterateFilter, conf.Options.MongoSslRootCaFile) if err != nil { return err } if len(nsList) == 0 { LOG.Info("%s finish, but no data", syncer) return } // create metric for each collection for _, ns := range nsList { syncer.metricNsMap[ns] = NewCollectionMetric() } collExecutorParallel := conf.Options.FullSyncReaderCollectionParallel namespaces := make(chan utils.NS, collExecutorParallel) wg.Add(len(nsList)) nimo.GoRoutine(func() { for _, ns := range nsList { namespaces <- ns } }) // run collection sync in parallel var nsDoneCount int32 = 0 for i := 0; i < collExecutorParallel; i++ { collExecutorId := GenerateCollExecutorId() nimo.GoRoutine(func() { for { ns, ok := <-namespaces if !ok { break } toNS := utils.NewNS(syncer.nsTrans.Transform(ns.Str())) LOG.Info("%s collExecutor-%d sync ns %v to %v begin", syncer, collExecutorId, ns, toNS) err := syncer.collectionSync(collExecutorId, ns, toNS) atomic.AddInt32(&nsDoneCount, 1) if err != nil { LOG.Critical("%s collExecutor-%d sync ns %v to %v failed. %v", syncer, collExecutorId, ns, toNS, err) syncError = fmt.Errorf("document syncer sync ns %v to %v failed. %v", ns, toNS, err) } else { process := int(atomic.LoadInt32(&nsDoneCount)) * 100 / len(nsList) LOG.Info("%s collExecutor-%d sync ns %v to %v successful. db syncer-%d progress %v%%", syncer, collExecutorId, ns, toNS, syncer.id, process) } wg.Done() } LOG.Info("%s collExecutor-%d finish", syncer, collExecutorId) }) } wg.Wait() close(namespaces) return syncError } // start sync single collection func (syncer *DBSyncer) collectionSync(collExecutorId int, ns utils.NS, toNS utils.NS) error { // writer colExecutor := NewCollectionExecutor(collExecutorId, syncer.ToMongoUrl, toNS, syncer, conf.Options.TunnelMongoSslRootCaFile) if err := colExecutor.Start(); err != nil { return fmt.Errorf("start collectionSync failed: %v", err) } // splitter reader splitter := NewDocumentSplitter(syncer.FromMongoUrl, conf.Options.MongoSslRootCaFile, ns) if splitter == nil { return fmt.Errorf("create splitter failed") } defer splitter.Close() // metric collectionMetric := syncer.metricNsMap[ns] collectionMetric.CollectionStatus = StatusProcessing collectionMetric.TotalCount = splitter.count // run in several pieces var wg sync.WaitGroup wg.Add(conf.Options.FullSyncReaderParallelThread) for i := 0; i < conf.Options.FullSyncReaderParallelThread; i++ { go func() { defer wg.Done() for { reader, ok := <-splitter.readerChan if !ok || reader == nil { break } if err := syncer.splitSync(reader, colExecutor, collectionMetric); err != nil { LOG.Crashf("%v", err) } } }() } wg.Wait() LOG.Info("%s all readers finish, wait all writers finish", syncer) // close writer if err := colExecutor.Wait(); err != nil { return fmt.Errorf("close writer failed: %v", err) } /* * in the former version, we fetch indexes after all data finished. However, it'll * have problem if the index is build/delete/update in the full-sync stage, the oplog * will be replayed again, e.g., build index, which must be wrong. */ // fetch index // set collection finish collectionMetric.CollectionStatus = StatusFinish return nil } func (syncer *DBSyncer) splitSync(reader *DocumentReader, colExecutor *CollectionExecutor, collectionMetric *CollectionMetric) error { bufferSize := conf.Options.FullSyncReaderDocumentBatchSize buffer := make([]*bson.Raw, 0, bufferSize) bufferByteSize := 0 for { doc, err := reader.NextDoc() // doc, err := reader.NextDocMgo() if err != nil { return fmt.Errorf("splitter reader[%v] get next document failed: %v", reader, err) } else if doc == nil { atomic.AddUint64(&collectionMetric.FinishCount, uint64(len(buffer))) colExecutor.Sync(buffer) syncer.replMetric.AddSuccess(uint64(len(buffer))) // only used to calculate the tps which is extract from "success" break } syncer.replMetric.AddGet(1) if bufferByteSize+len(doc) > MAX_BUFFER_BYTE_SIZE || len(buffer) >= bufferSize { atomic.AddUint64(&collectionMetric.FinishCount, uint64(len(buffer))) colExecutor.Sync(buffer) syncer.replMetric.AddSuccess(uint64(len(buffer))) // only used to calculate the tps which is extract from "success" buffer = make([]*bson.Raw, 0, bufferSize) bufferByteSize = 0 } // transform dbref for document if len(conf.Options.TransformNamespace) > 0 && conf.Options.IncrSyncDBRef { var docData bson.D if err := bson.Unmarshal(doc, &docData); err != nil { LOG.Error("splitter reader[%v] do bson unmarshal %v failed. %v", reader, doc, err) } else { docData = transform.TransformDBRef(docData, reader.ns.Database, syncer.nsTrans) if v, err := bson.Marshal(docData); err != nil { LOG.Warn("splitter reader[%v] do bson marshal %v failed. %v", reader, docData, err) } else { doc = v } } } buffer = append(buffer, &doc) bufferByteSize += len(doc) } LOG.Info("splitter reader finishes: %v", reader) reader.Close() // reader.CloseMgo() return nil } /************************************************************************/ // restful api func (syncer *DBSyncer) RestAPI() { // progress api type OverviewInfo struct { Progress string `json:"progress"` // synced_collection_number / total_collection_number TotalCollection int `json:"total_collection_number"` // total collection FinishedCollection int `json:"finished_collection_number"` // finished ProcessingCollection int `json:"processing_collection_number"` // in processing WaitCollection int `json:"wait_collection_number"` // wait start CollectionMetric map[string]string `json:"collection_metric"` // collection_name -> process } utils.FullSyncHttpApi.RegisterAPI("/progress", nimo.HttpGet, func([]byte) interface{} { ret := OverviewInfo{ CollectionMetric: make(map[string]string), } syncer.metricNsMapLock.Lock() defer syncer.metricNsMapLock.Unlock() ret.TotalCollection = len(syncer.metricNsMap) for ns, collectionMetric := range syncer.metricNsMap { ret.CollectionMetric[ns.Str()] = collectionMetric.String() switch collectionMetric.CollectionStatus { case StatusWaitStart: ret.WaitCollection += 1 case StatusProcessing: ret.ProcessingCollection += 1 case StatusFinish: ret.FinishedCollection += 1 } } if ret.TotalCollection == 0 { ret.Progress = "100%" } else { ret.Progress = fmt.Sprintf("%.2f%%", float64(ret.FinishedCollection)/float64(ret.TotalCollection)*100) } return ret }) /***************************************************/ }