func NewDispatcher()

in nimo-shake/incr-sync/syncer.go [148:193]


// NewDispatcher builds the Dispatcher for a single shard and starts its
// background goroutines (batcher, executor and ckptManager).
//
// Parameters:
//   - id: identifier stored on the dispatcher.
//   - shard: the shard to dispatch; shard.Table is used as the namespace
//     collection and in log messages.
//   - ckptWriter: checkpoint writer stored on the dispatcher.
//   - metric: replication metric stored on the dispatcher.
//
// The stream session, target writer and converter are all created from
// conf.Options. If any of them cannot be created the failure is reported via
// LOG.Crashf and nil is returned; no goroutine is started in that case.
// NOTE(review): LOG.Crashf presumably aborts the process/goroutine itself, in
// which case the explicit `return nil` statements are only a safety net —
// confirm against the logging package.
func NewDispatcher(id int, shard *utils.ShardNode, ckptWriter checkpoint.Writer,
	metric *utils.ReplicationMetric) *Dispatcher {
	// create dynamo stream client
	dynamoStreamSession, err := utils.CreateDynamoStreamSession(conf.Options.LogLevel)
	if err != nil {
		LOG.Crashf("table[%s] create dynamodb stream session failed[%v]", shard.Table, err)
		return nil
	}

	ns := utils.NS{
		Database:   conf.Options.Id,
		Collection: shard.Table,
	}

	// create target writer
	targetWriter := writer.NewWriter(conf.Options.TargetType, conf.Options.TargetAddress, ns, conf.Options.LogLevel)
	if targetWriter == nil {
		LOG.Crashf("table[%s] create target-writer with type[%v] and address[%v] failed", ns.Collection,
			conf.Options.TargetType, conf.Options.TargetAddress)
		// Never hand a dispatcher with a nil writer to the worker goroutines.
		return nil
	}

	// converter
	converter := protocal.NewConverter(conf.Options.ConvertType)
	if converter == nil {
		// Both verbs need an argument: the table name first, then the convert type.
		LOG.Crashf("table[%s] create converter[%v] failed", ns.Collection, conf.Options.ConvertType)
		return nil
	}

	d := &Dispatcher{
		id:                  id,
		shard:               shard,
		dynamoStreamSession: dynamoStreamSession,
		targetWriter:        targetWriter,
		batchChan:           make(chan *dynamodbstreams.Record, DispatcherBatcherChanSize),
		executorChan:        make(chan *ExecuteNode, DispatcherExecuterChanSize),
		converter:           converter,
		ns:                  ns,
		ckptWriter:          ckptWriter,
		metric:              metric,
	}

	// Start the workers only after the dispatcher is fully populated, since
	// they are methods on d and run concurrently with the caller.
	go d.batcher()
	go d.executor()
	go d.ckptManager()

	return d
}