in nimo-shake/full-sync/syncer.go [28:90]
func Start(dynamoSession *dynamodb.DynamoDB, w writer.Writer) {
// fetch all tables
LOG.Info("start fetching table list")
tableList, err := utils.FetchTableList(dynamoSession)
if err != nil {
LOG.Crashf("fetch table list failed[%v]", err)
}
LOG.Info("finish fetching table list: %v", tableList)
tableList = filter.FilterList(tableList)
if err := checkTableExists(tableList, w); err != nil {
if !strings.Contains(err.Error(), "ResourceNotFoundException") {
LOG.Crashf("check table exists failed[%v]", err)
return
}
}
LOG.Info("start syncing: %v", tableList)
metricNsMapLock.Lock()
for _, table := range tableList {
metricNsMap[table] = utils.NewCollectionMetric()
}
metricNsMapLock.Unlock()
fullChan := make(chan string, len(tableList))
for _, table := range tableList {
fullChan <- table
}
var wg sync.WaitGroup
wg.Add(len(tableList))
for i := 0; i < int(conf.Options.FullConcurrency); i++ {
go func(id int) {
for {
table, ok := <-fullChan
if !ok {
// chan closed
break
}
// no need to lock map because the map size won't change
ts := NewTableSyncer(id, table, metricNsMap[table])
if ts == nil {
LOG.Crashf("tableSyncer[%v] create failed", id)
}
LOG.Info("tableSyncer[%v] starts sync table[%v]", id, table)
ts.Sync()
LOG.Info("tableSyncer[%v] finish sync table[%v]", id, table)
ts.Close()
wg.Done()
}
}(i)
}
wg.Wait()
close(fullChan)
LOG.Info("finish syncing all tables and indexes!")
}