in nimo-shake/full-sync/table-syncer.go [77:143]
// Sync runs the full synchronization of one table.
//
// It first creates the target table from ts.sourceTableDescribe. Unless
// conf.Options.SyncSchemaOnly is set, it then starts a three-stage pipeline:
//
//	fetcher -> parsers (conf.Options.FullDocumentParser goroutines)
//	        -> document syncers (conf.Options.FullDocumentConcurrency goroutines)
//
// and blocks until every parser and every document syncer has returned.
// ts.collectionMetric is updated with the item count and the processing /
// finished status along the way.
func (ts *tableSyncer) Sync() {
	// Channels linking the pipeline stages; they are re-created on every call.
	ts.fetcherChan = make(chan *dynamodb.ScanOutput, fetcherChanSize)
	ts.parserChan = make(chan interface{}, parserChanSize)

	// The target writer is only used here to create the table schema.
	// NOTE(review): it is never released in this function — confirm whether
	// the writer needs an explicit close.
	dst := writer.NewWriter(conf.Options.TargetType, conf.Options.TargetAddress, ts.ns, conf.Options.LogLevel)
	if dst == nil {
		LOG.Crashf("%s create writer failed", ts)
		return
	}

	// Create the table and its indexes from the source description.
	err := dst.CreateTable(ts.sourceTableDescribe)
	if err != nil {
		LOG.Crashf("%s create table failed: %v", ts, err)
		return
	}

	// Give the dynamo proxy time to sync its cache before continuing.
	time.Sleep(10 * time.Second)

	// Schema-only mode stops here; no data is copied.
	if conf.Options.SyncSchemaOnly {
		LOG.Info("sync_schema_only enabled, %s exits", ts)
		return
	}

	// Record the total item count of the table and mark it as in progress.
	itemTotal := ts.count()
	ts.collectionMetric.CollectionStatus = utils.StatusProcessing
	ts.collectionMetric.TotalCount = itemTotal

	// Stage 1: the fetcher fetches all data from DynamoDB. Sync does not wait
	// on it directly.
	go ts.fetcher()

	// Stage 2: parsers take data from the fetcher and hand it to the writers.
	// Several parsers may run in parallel to accelerate.
	var parsers sync.WaitGroup
	parserCount := int(conf.Options.FullDocumentParser)
	runParser := func(id int) {
		ts.parser(id)
		parsers.Done()
	}
	parsers.Add(parserCount)
	for id := 0; id < parserCount; id++ {
		go runParser(id)
	}

	// Stage 3: document syncers consume ts.parserChan.
	var writers sync.WaitGroup
	writerCount := int(conf.Options.FullDocumentConcurrency)
	runWriter := func(id int) {
		LOG.Info("%s create document syncer with id[%v]", ts, id)
		syncer := NewDocumentSyncer(ts.id, ts.ns.Collection, id, ts.parserChan, ts.sourceTableDescribe,
			ts.collectionMetric)
		syncer.Run()
		LOG.Info("%s document syncer with id[%v] exit", ts, id)
		writers.Done()
	}
	writers.Add(writerCount)
	for id := 0; id < writerCount; id++ {
		go runWriter(id)
	}

	// Shutdown order matters: ts.parserChan may only be closed once no parser
	// is running any more; the document syncers are waited for afterwards.
	LOG.Info("%s wait all parsers exiting", ts.String())
	parsers.Wait()
	close(ts.parserChan)

	LOG.Info("%s all parsers exited, wait all writers exiting", ts.String())
	writers.Wait()

	ts.collectionMetric.CollectionStatus = utils.StatusFinish
	LOG.Info("%s finish syncing table", ts.String())
}