collector/coordinator/full.go:
package coordinator
import (
"fmt"
"math"
"sync"
conf "github.com/alibaba/MongoShake/v2/collector/configure"
"github.com/alibaba/MongoShake/v2/collector/docsyncer"
"github.com/alibaba/MongoShake/v2/collector/filter"
"github.com/alibaba/MongoShake/v2/collector/transform"
utils "github.com/alibaba/MongoShake/v2/common"
"github.com/alibaba/MongoShake/v2/sharding"
"github.com/gugemichael/nimo4go"
LOG "github.com/vinllen/log4go"
"go.mongodb.org/mongo-driver/bson"
)
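// fetchChunkMap loads the chunk distribution from the config server so that
// orphan documents can be filtered during full sync. It returns nil when the
// source is not a sharded cluster, when oplog is fetched via change stream,
// or when orphan-document filtering is disabled; it also requires the
// balancer of the source cluster to be stopped.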
func fetchChunkMap(isSharding bool) (sharding.ShardingChunkMap, error) {
// return directly if source is replica set or fetch method is change stream
if !isSharding || conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodChangeStream {
return nil, nil
}
ok, err := sharding.GetBalancerStatusByUrl(conf.Options.MongoCsUrl)
if err != nil {
return nil, fmt.Errorf("obtain balance status from mongo_cs_url=%s error. %v",
conf.Options.MongoCsUrl, err)
}
if ok {
return nil, fmt.Errorf("source mongodb sharding need to stop balancer when document replication occur")
}
// fetch the chunk map only when orphan-document filtering is enabled
if conf.Options.FullSyncExecutorFilterOrphanDocument {
LOG.Info("begin to get chunk map from config.chunks of source mongodb sharding")
return sharding.GetChunkMapByUrl(conf.Options.MongoCsUrl)
}
return nil, nil
}
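// getTimestampMap fetches the newest oplog timestamp of every source replica
// set. The result is later written as the starting checkpoint for incremental
// sync, so it is skipped when the sync mode is full only.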
func getTimestampMap(sources []*utils.MongoSource, sslRootFile string) (map[string]utils.TimestampNode, error) {
// no need to fetch if sync mode is full only
if conf.Options.SyncMode == utils.VarSyncModeFull {
return nil, nil
}
var ckptMap map[string]utils.TimestampNode
var err error
ckptMap, _, _, _, _, err = utils.GetAllTimestamp(sources, sslRootFile)
if err != nil {
return nil, fmt.Errorf("fetch source all timestamp failed: %v", err)
}
return ckptMap, nil
}
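// startDocumentReplication runs the full-sync phase: it fetches the chunk map
// (for orphan filtering), collects the namespaces and the newest oplog
// timestamps, prepares the target (drop collections, enable sharding, create
// indexes), starts one DBSyncer per source in parallel, and finally persists
// the checkpoint so that incremental sync can resume from it.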
func (coordinator *ReplicationCoordinator) startDocumentReplication() error {
fromIsSharding := coordinator.SourceIsSharding()
var shardingChunkMap sharding.ShardingChunkMap
var err error
// init the orphan-document chunk map if the source is mongod (data is read directly from mongod)
if fromIsSharding && coordinator.MongoS == nil {
LOG.Info("source is mongod, need to fetching chunk map")
shardingChunkMap, err = fetchChunkMap(fromIsSharding)
if err != nil {
LOG.Critical("fetch chunk map failed[%v]", err)
return err
}
} else {
LOG.Info("source is replica or mongos, no need to fetching chunk map")
}
filterList := filter.NewDocFilterList()
// get all namespaces that need to be synced
nsSet, _, err := utils.GetAllNamespace(coordinator.RealSourceFullSync, filterList.IterateFilter,
conf.Options.MongoSslRootCaFile)
if err != nil {
return err
}
LOG.Info("all namespace: %v", nsSet)
var ckptMap map[string]utils.TimestampNode
if conf.Options.SpecialSourceDBFlag != utils.VarSpecialSourceDBFlagAliyunServerless && len(coordinator.MongoD) > 0 {
// get current newest timestamp
ckptMap, err = getTimestampMap(coordinator.MongoD, conf.Options.MongoSslRootCaFile)
if err != nil {
return err
}
}
// create target client
toUrl := conf.Options.TunnelAddress[0]
var toConn *utils.MongoCommunityConn
if !conf.Options.FullSyncExecutorDebug {
if toConn, err = utils.NewMongoCommunityConn(toUrl, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernLocal, utils.ReadWriteConcernDefault, conf.Options.TunnelMongoSslRootCaFile); err != nil {
return err
}
defer toConn.Close()
}
// create namespace transform
trans := transform.NewNamespaceTransform(conf.Options.TransformNamespace)
// drop target collections if configured
if err := docsyncer.StartDropDestCollection(nsSet, toConn, trans); err != nil {
return err
}
// enable sharding on the target when syncing sharding -> sharding
shardingSync := docsyncer.IsShardingToSharding(fromIsSharding, toConn)
if shardingSync {
var connString string
if len(conf.Options.MongoSUrl) > 0 {
connString = conf.Options.MongoSUrl
} else {
connString = conf.Options.MongoCsUrl
}
if err := docsyncer.StartNamespaceSpecSyncForSharding(connString, toConn, trans); err != nil {
return err
}
}
// fetch all indexes
var indexMap map[utils.NS][]bson.D
if conf.Options.FullSyncCreateIndex != utils.VarFullSyncCreateIndexNone {
if indexMap, err = fetchIndexes(coordinator.RealSourceFullSync, filterList.IterateFilter); err != nil {
return fmt.Errorf("fetch index failed[%v]", err)
}
// print
LOG.Info("index list below: ----------")
for ns, index := range indexMap {
// LOG.Info("collection[%v] -> %s", ns, utils.MarshalStruct(index))
LOG.Info("collection[%v] -> %v", ns, index)
}
LOG.Info("index list above: ----------")
if conf.Options.FullSyncCreateIndex == utils.VarFullSyncCreateIndexBackground {
if err := docsyncer.StartIndexSync(indexMap, toUrl, trans, true); err != nil {
return fmt.Errorf("create background index failed[%v]", err)
}
}
}
// global qps limit: all dbSyncers share a single QoS instance
qos := utils.StartQoS(0, int64(conf.Options.FullSyncReaderDocumentBatchSize), &utils.FullSentinelOptions.TPS)
// start sync each db
var wg sync.WaitGroup
var replError error
for i, src := range coordinator.RealSourceFullSync {
var orphanFilter *filter.OrphanFilter
if conf.Options.FullSyncExecutorFilterOrphanDocument && shardingChunkMap != nil {
dbChunkMap := make(sharding.DBChunkMap)
if chunkMap, ok := shardingChunkMap[src.ReplicaName]; ok {
dbChunkMap = chunkMap
} else {
LOG.Warn("document syncer %v has no chunk map", src.ReplicaName)
}
orphanFilter = filter.NewOrphanFilter(src.ReplicaName, dbChunkMap)
}
dbSyncer := docsyncer.NewDBSyncer(i, src.URL, src.ReplicaName, toUrl, trans, orphanFilter, qos, fromIsSharding)
dbSyncer.Init()
LOG.Info("document syncer-%d do replication for url=%v", i, src.URL)
wg.Add(1)
srcURL := src.URL // copy the loop variable so the goroutine reports the correct URL
nimo.GoRoutine(func() {
defer wg.Done()
if err := dbSyncer.Start(); err != nil {
LOG.Critical("document replication for url=%v failed. %v",
utils.BlockMongoUrlPassword(srcURL, "***"), err)
replError = err // if several syncers fail, the last error reported wins
}
dbSyncer.Close()
})
}
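// the dbSyncers now run concurrently; the monitoring HTTP server is started
// below and the main goroutine then waits for every syncer to finish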
// start http server.
nimo.GoRoutine(func() {
// all HTTP interfaces must be registered before the listener starts
if err := utils.FullSyncHttpApi.Listen(); err != nil {
LOG.Critical("start full sync server with port[%v] failed: %v", conf.Options.FullSyncHTTPListenPort,
err)
}
})
// wait until all dbSyncers finish
wg.Wait()
if replError != nil {
return replError
}
// create indexes after data sync if index creation mode is foreground
if conf.Options.FullSyncCreateIndex == utils.VarFullSyncCreateIndexForeground {
if err := docsyncer.StartIndexSync(indexMap, toUrl, trans, false); err != nil {
return fmt.Errorf("create forground index failed[%v]", err)
}
}
// update checkpoint after full sync
// do not update checkpoint when source is "aliyun_serverless"
if conf.Options.SyncMode != utils.VarSyncModeFull && conf.Options.SpecialSourceDBFlag != utils.VarSpecialSourceDBFlagAliyunServerless {
// merge the timestamps into one when the source is mongos and fetch_method == "change_stream"
if coordinator.MongoS != nil && conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodChangeStream {
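// take the smallest "newest" timestamp across all shards so that no
// shard's oplog is skipped when resuming from a single mongos change stream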
var smallestNew int64 = math.MaxInt64
for _, val := range ckptMap {
if smallestNew > val.Newest {
smallestNew = val.Newest
}
}
ckptMap = map[string]utils.TimestampNode{
coordinator.MongoS.ReplicaName: {
Newest: smallestNew,
},
}
}
LOG.Info("try to set checkpoint with map[%v]", ckptMap)
if err := docsyncer.Checkpoint(ckptMap); err != nil {
return err
}
}
LOG.Info("document syncer sync end")
return nil
}
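// SourceIsSharding reports whether the source is a sharded cluster: with the
// change-stream fetch method a mongos address must be configured, otherwise
// more than one mongo_urls entry implies multiple shards.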
func (coordinator *ReplicationCoordinator) SourceIsSharding() bool {
if conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodChangeStream {
return coordinator.MongoS != nil
} else {
return len(conf.Options.MongoUrls) > 1
}
}