in nimo-shake/incr-sync/syncer.go [148:193]
// NewDispatcher builds the Dispatcher for one shard and starts its workers.
//
// It creates a DynamoDB stream session, a target writer and a converter from
// the global conf.Options, wires them together with the supplied checkpoint
// writer and metric, and then launches the batcher, executor and ckptManager
// goroutines before returning.
//
// Every construction failure is reported through LOG.Crashf. The nil returns
// that follow are defensive: they keep a half-initialised Dispatcher from
// being handed out should Crashf ever return to the caller.
func NewDispatcher(id int, shard *utils.ShardNode, ckptWriter checkpoint.Writer,
	metric *utils.ReplicationMetric) *Dispatcher {
	// create dynamo stream client
	dynamoStreamSession, err := utils.CreateDynamoStreamSession(conf.Options.LogLevel)
	if err != nil {
		LOG.Crashf("table[%s] create dynamodb stream session failed[%v]", shard.Table, err)
		return nil
	}

	// namespace of the target: the configured id is used as the database and
	// the shard's table name as the collection
	ns := utils.NS{
		Database:   conf.Options.Id,
		Collection: shard.Table,
	}

	// create target writer
	targetWriter := writer.NewWriter(conf.Options.TargetType, conf.Options.TargetAddress, ns, conf.Options.LogLevel)
	if targetWriter == nil {
		LOG.Crashf("table[%s] create target-writer with type[%v] and address[%v] failed", ns.Collection,
			conf.Options.TargetType, conf.Options.TargetAddress)
		return nil
	}

	// converter
	converter := protocal.NewConverter(conf.Options.ConvertType)
	if converter == nil {
		// The format string has two verbs, so the table name must be passed
		// ahead of the convert type; otherwise the type lands in the table
		// slot and the second verb is reported as missing.
		LOG.Crashf("table[%s] create converter[%v] failed", ns.Collection, conf.Options.ConvertType)
		return nil
	}

	d := &Dispatcher{
		id:                  id,
		shard:               shard,
		dynamoStreamSession: dynamoStreamSession,
		targetWriter:        targetWriter,
		batchChan:           make(chan *dynamodbstreams.Record, DispatcherBatcherChanSize),
		executorChan:        make(chan *ExecuteNode, DispatcherExecuterChanSize),
		converter:           converter,
		ns:                  ns,
		ckptWriter:          ckptWriter,
		metric:              metric,
	}

	// start the background workers of this dispatcher
	go d.batcher()
	go d.executor()
	go d.ckptManager()

	return d
}