in collector/docsyncer/doc_syncer.go [373:443]
// Start runs the full document sync for every namespace returned by
// utils.GetDbNamespace for syncer.FromMongoUrl and blocks until each of them
// has been processed.
//
// Namespaces are fed through a channel to a fixed set of worker goroutines
// (conf.Options.FullSyncReaderCollectionParallel of them, at least one); each
// worker calls syncer.collectionSync for one namespace at a time. A failing
// namespace does not stop the remaining ones from being synced.
//
// It returns the error from listing namespaces if that fails, nil if there are
// no namespaces or every namespace synced successfully, and otherwise an error
// describing the first namespace failure that was recorded.
func (syncer *DBSyncer) Start() (syncError error) {
	syncer.startTime = time.Now()

	filterList := filter.NewDocFilterList()
	// get all namespace
	nsList, _, err := utils.GetDbNamespace(syncer.FromMongoUrl, filterList.IterateFilter,
		conf.Options.MongoSslRootCaFile)
	if err != nil {
		return err
	}
	if len(nsList) == 0 {
		LOG.Info("%s finish, but no data", syncer)
		return nil
	}

	// create metric for each collection; done before any worker starts, so the
	// map is not written concurrently from this function
	for _, ns := range nsList {
		syncer.metricNsMap[ns] = NewCollectionMetric()
	}

	collExecutorParallel := conf.Options.FullSyncReaderCollectionParallel
	if collExecutorParallel < 1 {
		// With no workers nothing would ever call wg.Done and wg.Wait below
		// would block forever, so always run at least one executor.
		collExecutorParallel = 1
	}

	var wg sync.WaitGroup
	wg.Add(len(nsList))

	namespaces := make(chan utils.NS, collExecutorParallel)
	nimo.GoRoutine(func() {
		// The sender owns the channel: closing it once every namespace has
		// been queued lets the workers' range loops terminate.
		defer close(namespaces)
		for _, ns := range nsList {
			namespaces <- ns
		}
	})

	// run collection sync in parallel
	var (
		errMu       sync.Mutex // guards syncError, which several workers may set
		nsDoneCount int32      // number of namespaces finished, updated atomically
	)
	for i := 0; i < collExecutorParallel; i++ {
		collExecutorId := GenerateCollExecutorId()
		nimo.GoRoutine(func() {
			for ns := range namespaces {
				toNS := utils.NewNS(syncer.nsTrans.Transform(ns.Str()))
				LOG.Info("%s collExecutor-%d sync ns %v to %v begin", syncer, collExecutorId, ns, toNS)

				err := syncer.collectionSync(collExecutorId, ns, toNS)
				// Use the value returned by the increment so the reported
				// progress corresponds to this namespace's completion.
				done := atomic.AddInt32(&nsDoneCount, 1)
				if err != nil {
					LOG.Critical("%s collExecutor-%d sync ns %v to %v failed. %v",
						syncer, collExecutorId, ns, toNS, err)
					errMu.Lock()
					if syncError == nil {
						// keep the first recorded failure
						syncError = fmt.Errorf("document syncer sync ns %v to %v failed. %v", ns, toNS, err)
					}
					errMu.Unlock()
				} else {
					process := int(done) * 100 / len(nsList)
					LOG.Info("%s collExecutor-%d sync ns %v to %v successful. db syncer-%d progress %v%%",
						syncer, collExecutorId, ns, toNS, syncer.id, process)
				}
				wg.Done()
			}
			LOG.Info("%s collExecutor-%d finish", syncer, collExecutorId)
		})
	}

	// Every wg.Done happens after the corresponding write to syncError, so
	// reading it after Wait is safe without taking errMu.
	wg.Wait()
	return syncError
}