in nimo-shake/incr-sync/syncer.go [404:526]
// batcher reads stream records from d.batchChan, groups consecutive records
// that carry the same event name into an ExecuteNode and forwards each
// finished node on d.executorChan.
//
// The current node is flushed when:
//   - the incoming record's event name differs from the previous one,
//   - the node already holds BatcherNumber records,
//   - batchSize reaches BatcherSize (NOTE: batchSize is never incremented in
//     this function, so this limit is currently inactive),
//   - no record arrives within IncrBatcherTimeout seconds, or
//   - d.batchChan is closed.
//
// When d.batchChan is closed the pending node (if any) is flushed, the exit is
// logged and d.executorChan is closed.
func (d *Dispatcher) batcher() {
	// newNode builds an empty batch for the given event type with both lists
	// pre-sized, so every node is constructed the same way.
	newNode := func(tp string) *ExecuteNode {
		return &ExecuteNode{
			tp:      tp,
			operate: make([]interface{}, 0, BatcherNumber), // need fetch data field when type is RawData
			index:   make([]interface{}, 0, BatcherNumber), // index list
		}
	}

	node := newNode("")

	// appendEntry stores one converted record in the current node using the
	// representation selected by the target writer type. The value is only
	// appended when hasValue is true (remove events carry a key only).
	appendEntry := func(key, value interface{}, hasValue bool) {
		switch d.targetWriter.(type) {
		case *writer.MongoCommunityWriter:
			if hasValue {
				node.operate = append(node.operate, value.(protocal.RawData).Data)
			}
			node.index = append(node.index, key.(protocal.RawData).Data)
		case *writer.DynamoProxyWriter:
			if hasValue {
				node.operate = append(node.operate, value)
			}
			node.index = append(node.index, key)
		default:
			LOG.Crashf("unknown operator")
		}
	}

	var preEvent string
	var batchNr int
	var batchSize int

	// A single reusable timer replaces a fresh time.After per iteration, which
	// would allocate a new timer for every record received.
	idle := time.NewTimer(time.Second * IncrBatcherTimeout)
	defer idle.Stop()

	for {
		// Re-arm the idle timer so the timeout only measures the wait for the
		// next record. Stop reports false when the timer already fired; the
		// non-blocking receive discards a tick that was not consumed.
		if !idle.Stop() {
			select {
			case <-idle.C:
			default:
			}
		}
		idle.Reset(time.Second * IncrBatcherTimeout)

		var record *dynamodbstreams.Record
		ok := true
		timeout := false
		select {
		case record, ok = <-d.batchChan:
		case <-idle.C:
			timeout = true
		}

		if !ok || timeout {
			// Flush whatever has been collected so far.
			if len(node.operate) != 0 || len(node.index) != 0 {
				d.executorChan <- node
				node = newNode("")
				preEvent = ""
				batchNr = 0
				batchSize = 0
			}
			if !ok {
				// channel close
				break
			}
			// timeout
			continue
		}

		if *record.EventName != preEvent || batchNr >= BatcherNumber || batchSize >= BatcherSize {
			// need split
			if len(node.operate) != 0 || len(node.index) != 0 {
				// preEvent != ""
				d.executorChan <- node
			}
			node = newNode(*record.EventName)
			preEvent = *record.EventName
			batchNr = 0
			batchSize = 0
		}

		// parse index
		index, err := d.converter.Run(record.Dynamodb.Keys)
		if err != nil {
			LOG.Crashf("%s convert parse[%v] failed[%v]", d.String(), record.Dynamodb.Keys, err)
		}

		// LOG.Info("~~~op[%v] data: %v", *record.EventName, record)

		// batch into list
		switch *record.EventName {
		case EventInsert:
			value, err := d.converter.Run(record.Dynamodb.NewImage)
			if err != nil {
				LOG.Crashf("%s converter do insert meets error[%v]", d.String(), err)
			}
			appendEntry(index, value, true)
		case EventMODIFY:
			value, err := d.converter.Run(record.Dynamodb.NewImage)
			if err != nil {
				// Previously reported as "insert", which pointed at the wrong operation.
				LOG.Crashf("%s converter do modify meets error[%v]", d.String(), err)
			}
			appendEntry(index, value, true)
		case EventRemove:
			appendEntry(index, nil, false)
		default:
			LOG.Crashf("%s unknown event name[%v]", d.String(), *record.EventName)
		}

		// Remember the position and creation time of the newest record in the node.
		node.lastSequenceNumber = *record.Dynamodb.SequenceNumber
		if record.Dynamodb.ApproximateCreationDateTime != nil {
			node.approximateCreationDateTime = record.Dynamodb.ApproximateCreationDateTime.String()
		}

		batchNr += 1
		// batchSize += index.Size
	}

	LOG.Info("%s batcher exit", d.String())
	close(d.executorChan)
}