in nimo-shake/full-sync/document-syncer.go [70:143]
// Run drains ds.inputChan, groups the received items into batches and hands
// every batch to ds.write.
//
// A flush is triggered when any of the following holds:
//   - the input channel has been closed (the loop ends right after the flush),
//   - no item arrived within batchTimeout seconds,
//   - the batch holds at least conf.Options.FullDocumentWriteBatch items,
//   - the accumulated protocal.RawData size reaches batchSize.
//
// A write failure is reported through LOG.Crashf. Once the loop has ended, a
// background goroutine closes ds.writer five minutes later.
func (ds *documentSyncer) Run() {
	batchNumber := int(conf.Options.FullDocumentWriteBatch)
	LOG.Info("%s start with batchSize[%v]", ds.String(), batchNumber)

	var data interface{}
	var ok bool
	batchGroup := make([]interface{}, 0, batchNumber)
	timeout := false
	batchGroupSize := 0
	exit := false

	// A single timer is reused for the idle timeout. Calling time.After inside
	// the loop would allocate a fresh timer for every received item, and on Go
	// versions before 1.23 each of them stays alive until it fires.
	idleTimer := time.NewTimer(time.Second * batchTimeout)
	defer idleTimer.Stop()

	for {
		// Re-arm the timer for this iteration. If it already fired but its
		// tick was not consumed, drain the channel first so that a stale tick
		// cannot be mistaken for a timeout of the current wait.
		if !idleTimer.Stop() {
			select {
			case <-idleTimer.C:
			default:
			}
		}
		idleTimer.Reset(time.Second * batchTimeout)

		startT := time.Now()
		select {
		case data, ok = <-ds.inputChan:
			if !ok {
				// Receiving from a closed channel yields the zero value, so
				// data is nil here and nothing is appended below.
				exit = true
				LOG.Info("%s channel already closed, flushing cache and exiting...", ds.String())
			}
		case <-idleTimer.C:
			// Nothing arrived in time: flush whatever has been collected.
			timeout = true
			data = nil
		}
		readParserChanDuration := time.Since(startT)

		LOG.Debug("exit[%v], timeout[%v], len(batchGroup)[%v], batchGroupSize[%v], data[%v]", exit, timeout,
			len(batchGroup), batchGroupSize, data)

		if data != nil {
			if UT_TestDocumentSyncer {
				// With this flag set the item is queued as-is, without any
				// size accounting.
				batchGroup = append(batchGroup, data)
			} else {
				// NOTE(review): items of any other type are silently dropped
				// because the switch has no default case; confirm that
				// producers only ever send these two types.
				switch v := data.(type) {
				case protocal.RawData:
					// Entries with a non-positive size are skipped.
					if v.Size > 0 {
						batchGroup = append(batchGroup, v.Data)
						batchGroupSize += v.Size
					}
				case map[string]*dynamodb.AttributeValue:
					// No size is tracked for this type, so only the item
					// count or the timeout can trigger a flush.
					batchGroup = append(batchGroup, v)
				}
			}
		}

		if exit || timeout || len(batchGroup) >= batchNumber || batchGroupSize >= batchSize {
			startT = time.Now()
			batchGroupLen := len(batchGroup)
			if len(batchGroup) != 0 {
				if err := ds.write(batchGroup); err != nil {
					LOG.Crashf("%s write data failed[%v]", ds.String(), err)
				}
				// A fresh slice is allocated instead of truncating the old
				// one, so the slice handed to ds.write is never modified
				// afterwards.
				batchGroup = make([]interface{}, 0, batchNumber)
				batchGroupSize = 0
			}
			writeDestDBDuration := time.Since(startT)
			LOG.Info("%s write db batch[%v] parserChan.len[%v] readParserChanTime[%v] writeDestDbTime[%v]",
				ds.String(), batchGroupLen, len(ds.inputChan), readParserChanDuration, writeDestDBDuration)
			if exit {
				break
			}
			timeout = false
		}
	}

	// The writer is closed five minutes after the loop has ended.
	// NOTE(review): presumably this leaves time for in-flight work on the
	// writer to complete; the reason is not visible here - confirm.
	go func() {
		<-time.NewTimer(time.Minute * 5).C
		ds.writer.Close()
		LOG.Info("%s full-sync writer close", ds.String())
	}()
	LOG.Info("%s finish writing", ds.String())
}