collector/coordinator/incr.go (64 lines of code) (raw):
package coordinator
import (
"errors"
"github.com/alibaba/MongoShake/v2/collector"
conf "github.com/alibaba/MongoShake/v2/collector/configure"
utils "github.com/alibaba/MongoShake/v2/common"
"fmt"
nimo "github.com/gugemichael/nimo4go"
LOG "github.com/vinllen/log4go"
)
func (coordinator *ReplicationCoordinator) startOplogReplication(oplogStartPosition interface{},
fullSyncFinishPosition int64,
startTsMap map[string]int64) error {
// prepare all syncer. only one syncer while source is ReplicaSet or mongos
// otherwise one syncer connects to one shard
LOG.Info("start incr replication")
for i, src := range coordinator.RealSourceIncrSync {
var syncerTs interface{}
if len(coordinator.MongoD) > 0 {
// read from shard or replicaset
if val, ok := oplogStartPosition.(int64); ok && val == 0 {
if v, ok := startTsMap[src.ReplicaName]; !ok {
return fmt.Errorf("replia[%v] not exists on startTsMap[%v]", src.ReplicaName, startTsMap)
} else {
syncerTs = v
}
} else {
syncerTs = oplogStartPosition
}
} else {
// read from mongos
syncerTs = oplogStartPosition
LOG.Info("read from mongos src.ReplicaName:%s ts:%v", src.ReplicaName, startTsMap[src.ReplicaName])
if len(conf.Options.MongoSUrl) > 0 && len(conf.Options.MongoCsUrl) == 0 && len(conf.Options.MongoUrls) == 0 {
if v, ok := startTsMap[src.ReplicaName]; ok {
syncerTs = v
LOG.Info("read from mongos and set ts src.ReplicaName:%s ts:%v", src.ReplicaName, startTsMap[src.ReplicaName])
}
}
}
LOG.Info("RealSourceIncrSync[%d]: %s, startTimestamp[%v]", i, src, syncerTs)
syncer := collector.NewOplogSyncer(src.ReplicaName, syncerTs, fullSyncFinishPosition, src.URL,
src.Gids)
// syncerGroup http api registry
syncer.Init()
coordinator.syncerGroup = append(coordinator.syncerGroup, syncer)
}
// set to group 0 as a leader
coordinator.syncerGroup[0].SyncGroup = coordinator.syncerGroup
// prepare worker routine and bind it to syncer
for i := 0; i < conf.Options.IncrSyncWorker; i++ {
syncer := coordinator.syncerGroup[i%len(coordinator.syncerGroup)]
w := collector.NewWorker(syncer, uint32(i))
if !w.Init() {
return errors.New("worker initialize error")
}
w.SetInitSyncFinishTs(fullSyncFinishPosition)
// syncer and worker are independent. the relationship between
// them needs binding here. one worker definitely belongs to a specific
// syncer. However individual syncer could bind multi workers (if source
// of overall replication is single mongodb replica)
syncer.Bind(w)
go w.StartWorker()
}
for _, syncer := range coordinator.syncerGroup {
go syncer.Start()
}
// start http server
nimo.GoRoutine(func() {
if err := utils.IncrSyncHttpApi.Listen(); err != nil {
LOG.Critical("start incr sync server with port[%v] failed: %v", conf.Options.IncrSyncHTTPListenPort,
err)
}
})
return nil
}