func Start()

in nimo-shake/full-sync/syncer.go [28:90]


// Start runs the full-sync stage and blocks until every table has been synced.
//
// It fetches the table list through utils.FetchTableList using dynamoSession,
// narrows it with filter.FilterList, runs checkTableExists against w,
// registers one collection metric per table in metricNsMap, and then syncs the
// tables concurrently with up to conf.Options.FullConcurrency workers.
//
// Failures while fetching the table list or checking the tables are reported
// through LOG.Crashf. An error from checkTableExists whose message contains
// "ResourceNotFoundException" is tolerated and the sync proceeds.
func Start(dynamoSession *dynamodb.DynamoDB, w writer.Writer) {
	// fetch all tables
	LOG.Info("start fetching table list")
	tableList, err := utils.FetchTableList(dynamoSession)
	if err != nil {
		LOG.Crashf("fetch table list failed[%v]", err)
	}
	LOG.Info("finish fetching table list: %v", tableList)

	tableList = filter.FilterList(tableList)

	if err := checkTableExists(tableList, w); err != nil {
		// NOTE(review): ResourceNotFoundException is deliberately ignored here;
		// presumably a missing table is handled later in the sync — confirm.
		if !strings.Contains(err.Error(), "ResourceNotFoundException") {
			LOG.Crashf("check table exists failed[%v]", err)
			return
		}
	}

	LOG.Info("start syncing: %v", tableList)

	metricNsMapLock.Lock()
	for _, table := range tableList {
		metricNsMap[table] = utils.NewCollectionMetric()
	}
	metricNsMapLock.Unlock()

	// Queue every table up front and close the channel right away: the buffer
	// holds all entries, and closing lets each worker leave its range loop as
	// soon as the queue is drained.
	fullChan := make(chan string, len(tableList))
	for _, table := range tableList {
		fullChan <- table
	}
	close(fullChan)

	// A non-positive concurrency would start no workers and leave the queued
	// tables unprocessed, so fall back to a single worker. There is also no
	// point in starting more workers than there are tables.
	workers := int(conf.Options.FullConcurrency)
	if workers < 1 {
		LOG.Info("full concurrency[%v] is invalid, fall back to 1", conf.Options.FullConcurrency)
		workers = 1
	}
	if workers > len(tableList) {
		workers = len(tableList)
	}

	// The wait group tracks workers rather than tables: a worker is done once
	// the closed channel has been fully drained.
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func(id int) {
			defer wg.Done()

			for table := range fullChan {
				// no need to lock map because the map size won't change
				ts := NewTableSyncer(id, table, metricNsMap[table])
				if ts == nil {
					LOG.Crashf("tableSyncer[%v] create failed", id)
				}

				LOG.Info("tableSyncer[%v] starts sync table[%v]", id, table)
				ts.Sync()
				LOG.Info("tableSyncer[%v] finish sync table[%v]", id, table)
				ts.Close()
			}
		}(i)
	}

	wg.Wait()

	LOG.Info("finish syncing all tables and indexes!")
}