in nimo-shake/incr-sync/syncer.go [62:125]
// Start launches incremental synchronization for every table/stream pair in
// streamMap and then blocks forever.
//
// For each table it creates a shard channel, starts one fetcher goroutine that
// is handed that channel, and starts conf.Options.IncreaseConcurrency worker
// goroutines that receive shards from the channel and run a dispatcher for
// each of them.
//
// Shard state is tracked in GlobalShardMap, guarded by GlobalShardLock:
//
//	0 - not running (zero value, i.e. never seen before)
//	1 - running
//	2 - finished
//
// The state check and the transition to "running" are performed under a single
// lock acquisition so that two workers can never both claim the same shard.
//
// ckptWriter is stored in the package-level ckptWriterG and passed to every
// fetcher and dispatcher. This function never returns.
func Start(streamMap map[string]*dynamodbstreams.Stream, ckptWriter checkpoint.Writer) {
	replMetric = utils.NewMetric(utils.TypeIncr, utils.METRIC_CKPT_TIMES|utils.METRIC_SUCCESS|utils.METRIC_TPS)
	ckptWriterG = ckptWriter

	for table, stream := range streamMap {
		LOG.Info("table[%v] stream[%v] begin", table, *stream.StreamArn)

		shardChan := make(chan *utils.ShardNode, ShardChanSize)

		fetcher := NewFetcher(table, stream, shardChan, ckptWriter, replMetric)
		if fetcher == nil {
			// NOTE(review): presumably Crashf terminates the process, otherwise
			// fetcher.Run() below would be called on nil - confirm.
			LOG.Crashf("table[%v] stream[%v] start fetcher failed", table, *stream.StreamArn)
		}
		go fetcher.Run()

		for i := 0; i < int(conf.Options.IncreaseConcurrency); i++ {
			go func(id int, table string) {
				for {
					shard := <-shardChan
					shardID := *shard.Shard.ShardId

					LOG.Info("table[%s] dispatch id[%v] starts shard[%v]", table, id, shardID)

					// Check whether the shard is already running or finished and,
					// if it is not, claim it by marking it as running. Both steps
					// happen in one critical section; doing them separately would
					// let two workers observe 0 and process the same shard twice.
					GlobalShardLock.Lock()
					flag := GlobalShardMap[shardID]
					if flag == 0 {
						GlobalShardMap[shardID] = 1
					}
					GlobalShardLock.Unlock()

					switch flag {
					case 0:
						LOG.Info("table[%s] dispatch id[%v] shard[%v] isn't running, need to run",
							table, id, shardID)
					case 1:
						LOG.Warn("table[%s] dispatch id[%v] shard[%v] is running, no need to run again",
							table, id, shardID)
						continue
					case 2:
						LOG.Warn("table[%s] dispatch id[%v] shard[%v] is finished, no need to run again",
							table, id, shardID)
						continue
					}

					// Run the dispatcher for this shard; the call is synchronous,
					// so the finished flag below is only set after Run returns.
					d := NewDispatcher(id, shard, ckptWriter, replMetric)
					d.Run()

					// set finished flag
					GlobalShardLock.Lock()
					GlobalShardMap[shardID] = 2
					GlobalShardLock.Unlock()

					// update table epoch
					GlobalFetcherLock.Lock()
					GlobalFetcherMoreFlag[shard.Table] += 1
					GlobalFetcherLock.Unlock()

					LOG.Info("dispatch id[%v] finishes shard[%v]", id, shardID)
				}
			}(i, table)
		}
	}

	// Block forever; all work happens in the goroutines started above.
	select {}
}