func()

in collector/coordinator/full.go [61:234]


// startDocumentReplication runs the full (document) synchronization stage.
//
// In order, it:
//  1. fetches the sharding chunk map when the source is sharded and is read
//     directly from mongod (coordinator.MongoS == nil);
//  2. lists the namespaces to sync and, unless the source is flagged as
//     aliyun serverless or there is no MongoD source, snapshots the current
//     timestamps for the later checkpoint;
//  3. connects to the first tunnel address (skipped in executor debug mode),
//     drops destination collections via docsyncer.StartDropDestCollection and,
//     for sharding -> sharding, syncs the namespace sharding spec;
//  4. fetches indexes and creates them before the copy when configured as
//     background;
//  5. starts one DBSyncer goroutine per entry of coordinator.RealSourceFullSync,
//     all sharing a single QoS limiter, and waits for all of them;
//  6. creates indexes after the copy when configured as foreground, and writes
//     the checkpoint unless running in full-only mode or against aliyun
//     serverless.
//
// It returns the first error encountered. If several DBSyncers fail, only the
// first recorded failure is returned; every failure is logged.
func (coordinator *ReplicationCoordinator) startDocumentReplication() error {

	fromIsSharding := coordinator.SourceIsSharding()

	var shardingChunkMap sharding.ShardingChunkMap
	var err error
	// init orphan sharding chunk map if source is mongod(get data directly from mongod)
	if fromIsSharding && coordinator.MongoS == nil {
		LOG.Info("source is mongod, need to fetching chunk map")
		shardingChunkMap, err = fetchChunkMap(fromIsSharding)
		if err != nil {
			LOG.Critical("fetch chunk map failed[%v]", err)
			return err
		}
	} else {
		LOG.Info("source is replica or mongos, no need to fetching chunk map")
	}

	filterList := filter.NewDocFilterList()
	// get all namespace need to sync
	nsSet, _, err := utils.GetAllNamespace(coordinator.RealSourceFullSync, filterList.IterateFilter,
		conf.Options.MongoSslRootCaFile)
	if err != nil {
		return err
	}
	LOG.Info("all namespace: %v", nsSet)

	// Timestamps are captured BEFORE the copy starts; they are written as the
	// checkpoint once the full sync has finished.
	var ckptMap map[string]utils.TimestampNode
	if conf.Options.SpecialSourceDBFlag != utils.VarSpecialSourceDBFlagAliyunServerless && len(coordinator.MongoD) > 0 {
		// get current newest timestamp
		ckptMap, err = getTimestampMap(coordinator.MongoD, conf.Options.MongoSslRootCaFile)
		if err != nil {
			return err
		}
	}

	// create target client
	// Guard the index below: an empty address list would otherwise panic.
	if len(conf.Options.TunnelAddress) == 0 {
		return fmt.Errorf("tunnel address is empty, no target to run full sync against")
	}
	toUrl := conf.Options.TunnelAddress[0]
	// toConn stays nil in executor debug mode; it is still handed to the
	// docsyncer helpers below.
	// NOTE(review): presumably those helpers tolerate a nil connection - confirm.
	var toConn *utils.MongoCommunityConn
	if !conf.Options.FullSyncExecutorDebug {
		if toConn, err = utils.NewMongoCommunityConn(toUrl, utils.VarMongoConnectModePrimary, true,
			utils.ReadWriteConcernLocal, utils.ReadWriteConcernDefault, conf.Options.TunnelMongoSslRootCaFile); err != nil {
			return err
		}
		defer toConn.Close()
	}

	// create namespace transform
	trans := transform.NewNamespaceTransform(conf.Options.TransformNamespace)

	// drop target collection if possible
	if err := docsyncer.StartDropDestCollection(nsSet, toConn, trans); err != nil {
		return err
	}

	// enable shard if sharding -> sharding
	shardingSync := docsyncer.IsShardingToSharding(fromIsSharding, toConn)
	if shardingSync {
		// prefer the mongos url, fall back to the config server url
		var connString string
		if len(conf.Options.MongoSUrl) > 0 {
			connString = conf.Options.MongoSUrl
		} else {
			connString = conf.Options.MongoCsUrl
		}
		if err := docsyncer.StartNamespaceSpecSyncForSharding(connString, toConn, trans); err != nil {
			return err
		}
	}

	// fetch all indexes
	var indexMap map[utils.NS][]bson.D
	if conf.Options.FullSyncCreateIndex != utils.VarFullSyncCreateIndexNone {
		if indexMap, err = fetchIndexes(coordinator.RealSourceFullSync, filterList.IterateFilter); err != nil {
			return fmt.Errorf("fetch index failed[%v]", err)
		}

		// print
		LOG.Info("index list below: ----------")
		for ns, index := range indexMap {
			// LOG.Info("collection[%v] -> %s", ns, utils.MarshalStruct(index))
			LOG.Info("collection[%v] -> %v", ns, index)
		}
		LOG.Info("index list above: ----------")

		// background indexes are created before the documents are copied
		if conf.Options.FullSyncCreateIndex == utils.VarFullSyncCreateIndexBackground {
			if err := docsyncer.StartIndexSync(indexMap, toUrl, trans, true); err != nil {
				return fmt.Errorf("create background index failed[%v]", err)
			}
		}
	}

	// global qps limit, all dbsyncer share only 1 Qos
	qos := utils.StartQoS(0, int64(conf.Options.FullSyncReaderDocumentBatchSize), &utils.FullSentinelOptions.TPS)

	// start sync each db
	var wg sync.WaitGroup
	// replError is written by the syncer goroutines, so every write goes
	// through replErrorLock. The read after wg.Wait() needs no lock because
	// all writers have finished by then.
	var replErrorLock sync.Mutex
	var replError error
	for i, src := range coordinator.RealSourceFullSync {
		var orphanFilter *filter.OrphanFilter
		if conf.Options.FullSyncExecutorFilterOrphanDocument && shardingChunkMap != nil {
			// fall back to an empty chunk map when this replica has none
			dbChunkMap := make(sharding.DBChunkMap)
			if chunkMap, ok := shardingChunkMap[src.ReplicaName]; ok {
				dbChunkMap = chunkMap
			} else {
				LOG.Warn("document syncer %v has no chunk map", src.ReplicaName)
			}
			orphanFilter = filter.NewOrphanFilter(src.ReplicaName, dbChunkMap)
		}

		// Mask the password once per iteration. Besides keeping credentials
		// out of the log, this gives the goroutine below a per-iteration value
		// to capture instead of the range variable src, which is shared across
		// iterations before Go 1.22.
		maskedURL := utils.BlockMongoUrlPassword(src.URL, "***")

		dbSyncer := docsyncer.NewDBSyncer(i, src.URL, src.ReplicaName, toUrl, trans, orphanFilter, qos, fromIsSharding)
		dbSyncer.Init()
		LOG.Info("document syncer-%d do replication for url=%v", i, maskedURL)

		wg.Add(1)
		nimo.GoRoutine(func() {
			defer wg.Done()
			if err := dbSyncer.Start(); err != nil {
				LOG.Critical("document replication for url=%v failed. %v",
					maskedURL, err)
				// keep only the first failure; later ones are logged above
				replErrorLock.Lock()
				if replError == nil {
					replError = err
				}
				replErrorLock.Unlock()
			}
			dbSyncer.Close()
		})
	}

	// start http server.
	nimo.GoRoutine(func() {
		// before starting, we must register all interface
		if err := utils.FullSyncHttpApi.Listen(); err != nil {
			LOG.Critical("start full sync server with port[%v] failed: %v", conf.Options.FullSyncHTTPListenPort,
				err)
		}
	})

	// wait all db finished
	wg.Wait()
	if replError != nil {
		return replError
	}

	// create index if == foreground
	if conf.Options.FullSyncCreateIndex == utils.VarFullSyncCreateIndexForeground {
		if err := docsyncer.StartIndexSync(indexMap, toUrl, trans, false); err != nil {
			return fmt.Errorf("create forground index failed[%v]", err)
		}
	}

	// update checkpoint after full sync
	// do not update checkpoint when source is "aliyun_serverless"
	if conf.Options.SyncMode != utils.VarSyncModeFull && conf.Options.SpecialSourceDBFlag != utils.VarSpecialSourceDBFlagAliyunServerless {
		// need merge to one when from mongos and fetch_mothod=="change_stream"
		if coordinator.MongoS != nil && conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodChangeStream {
			// Collapse the per-replica timestamps into one entry keyed by the
			// mongos name, using the smallest "Newest" value.
			// NOTE(review): if ckptMap is empty here, the merged value stays
			// math.MaxInt64 - confirm this cannot happen or is intended.
			var smallestNew int64 = math.MaxInt64
			for _, val := range ckptMap {
				if smallestNew > val.Newest {
					smallestNew = val.Newest
				}
			}
			ckptMap = map[string]utils.TimestampNode{
				coordinator.MongoS.ReplicaName: {
					Newest: smallestNew,
				},
			}
		}

		LOG.Info("try to set checkpoint with map[%v]", ckptMap)
		if err := docsyncer.Checkpoint(ckptMap); err != nil {
			return err
		}
	}

	LOG.Info("document syncer sync end")
	return nil
}