func()

in nimo-shake/full-sync/table-syncer.go [77:143]


// Sync runs the full synchronization pipeline for one table.
//
// The steps, in order:
//  1. allocate ts.fetcherChan and ts.parserChan;
//  2. build the target writer and create the table from
//     ts.sourceTableDescribe, reporting either failure through LOG.Crashf;
//  3. sleep 10 seconds, then return early when conf.Options.SyncSchemaOnly
//     is set;
//  4. store the result of ts.count() in the collection metric and mark the
//     metric as processing;
//  5. start one fetcher goroutine, conf.Options.FullDocumentParser parser
//     goroutines and conf.Options.FullDocumentConcurrency document syncers;
//  6. wait for every parser, close ts.parserChan, wait for every document
//     syncer, and finally mark the collection metric as finished.
func (ts *tableSyncer) Sync() {
	ts.fetcherChan = make(chan *dynamodb.ScanOutput, fetcherChanSize)
	ts.parserChan = make(chan interface{}, parserChanSize)

	// Prepare the destination: build the writer, then create the table and
	// index from the source description.
	dst := writer.NewWriter(conf.Options.TargetType, conf.Options.TargetAddress, ts.ns, conf.Options.LogLevel)
	if dst == nil {
		LOG.Crashf("%s create writer failed", ts)
		return
	}
	createErr := dst.CreateTable(ts.sourceTableDescribe)
	if createErr != nil {
		LOG.Crashf("%s create table failed: %v", ts, createErr)
		return
	}

	// Give the dynamo proxy time to sync its cache.
	time.Sleep(10 * time.Second)

	// Schema-only mode stops here, after the table has been created.
	if conf.Options.SyncSchemaOnly {
		LOG.Info("sync_schema_only enabled, %s exits", ts)
		return
	}

	// Publish the total item count of the table together with the new status.
	itemCount := ts.count()
	ts.collectionMetric.CollectionStatus = utils.StatusProcessing
	ts.collectionMetric.TotalCount = itemCount

	// Stage 1: a single fetcher pulls all data from DynamoDB.
	go ts.fetcher()

	// Stage 2: parsers take data from the fetcher and hand it on to the
	// writers; several of them can run at once to speed things up.
	parserCount := int(conf.Options.FullDocumentParser)
	var parsers sync.WaitGroup
	parsers.Add(parserCount)
	runParser := func(workerID int) {
		ts.parser(workerID)
		parsers.Done()
	}
	for workerID := 0; workerID < parserCount; workerID++ {
		go runParser(workerID)
	}

	// Stage 3: document syncers, each constructed with ts.parserChan.
	syncerCount := int(conf.Options.FullDocumentConcurrency)
	var syncers sync.WaitGroup
	syncers.Add(syncerCount)
	runSyncer := func(workerID int) {
		LOG.Info("%s create document syncer with id[%v]", ts, workerID)
		docSyncer := NewDocumentSyncer(ts.id, ts.ns.Collection, workerID, ts.parserChan, ts.sourceTableDescribe,
			ts.collectionMetric)
		docSyncer.Run()
		LOG.Info("%s document syncer with id[%v] exit", ts, workerID)
		syncers.Done()
	}
	for workerID := 0; workerID < syncerCount; workerID++ {
		go runSyncer(workerID)
	}

	// Shut down in pipeline order: the parser channel is closed only after
	// every parser has returned, and only then are the syncers awaited.
	LOG.Info("%s wait all parsers exiting", ts.String())
	parsers.Wait()
	close(ts.parserChan)

	LOG.Info("%s all parsers exited, wait all writers exiting", ts.String())
	syncers.Wait()

	ts.collectionMetric.CollectionStatus = utils.StatusFinish
	LOG.Info("%s finish syncing table", ts.String())
}