func Start()

in nimo-shake/incr-sync/syncer.go [62:125]


func Start(streamMap map[string]*dynamodbstreams.Stream, ckptWriter checkpoint.Writer) {
	replMetric = utils.NewMetric(utils.TypeIncr, utils.METRIC_CKPT_TIMES|utils.METRIC_SUCCESS|utils.METRIC_TPS)
	ckptWriterG = ckptWriter
	for table, stream := range streamMap {
		LOG.Info("table[%v] stream[%v] begin", table, *stream.StreamArn)

		shardChan := make(chan *utils.ShardNode, ShardChanSize)
		fetcher := NewFetcher(table, stream, shardChan, ckptWriter, replMetric)
		if fetcher == nil {
			LOG.Crashf("table[%v] stream[%v] start fetcher failed", table, *stream.StreamArn)
		}

		go fetcher.Run()

		for i := 0; i < int(conf.Options.IncreaseConcurrency); i++ {
			go func(id int, table string) {
				for {
					shard := <-shardChan
					LOG.Info("table[%s] dispatch id[%v] starts shard[%v]", table, id, *shard.Shard.ShardId)

					// check whether current shard is running or finished
					GlobalShardLock.Lock()
					flag := GlobalShardMap[*shard.Shard.ShardId]
					GlobalShardLock.Unlock()
					switch flag {
					case 0:
						LOG.Info("table[%s] dispatch id[%v] shard[%v] isn't running, need to run",
							table, id, *shard.Shard.ShardId)
					case 1:
						LOG.Warn("table[%s] dispatch id[%v] shard[%v] is running, no need to run again",
							table, id, *shard.Shard.ShardId)
						continue
					case 2:
						LOG.Warn("table[%s] dispatch id[%v] shard[%v] is finished, no need to run again",
							table, id, *shard.Shard.ShardId)
						continue
					}

					// set running flag
					GlobalShardLock.Lock()
					GlobalShardMap[*shard.Shard.ShardId] = 1
					GlobalShardLock.Unlock()

					d := NewDispatcher(id, shard, ckptWriter, replMetric)
					d.Run()

					// set finished flag
					GlobalShardLock.Lock()
					GlobalShardMap[*shard.Shard.ShardId] = 2
					GlobalShardLock.Unlock()

					// update table epoch
					GlobalFetcherLock.Lock()
					GlobalFetcherMoreFlag[shard.Table] += 1
					GlobalFetcherLock.Unlock()

					LOG.Info("dispatch id[%v] finishes shard[%v]", id, *shard.Shard.ShardId)
				}
			}(i, table)
		}
	}

	select {}
}