nimo-shake/full-sync/document-syncer.go (131 lines of code) (raw):
package full_sync
import (
"fmt"
"time"
"nimo-shake/common"
"nimo-shake/configure"
"nimo-shake/writer"
"github.com/aws/aws-sdk-go/service/dynamodb"
LOG "github.com/vinllen/log4go"
"nimo-shake/protocal"
"sync/atomic"
)
const (
	// batchSize is the flush threshold on the accumulated byte size of a
	// pending batch; kept well under mongodb's 16MB document/message limit.
	batchSize = 2 * utils.MB // mongodb limit: 16MB
	// batchTimeout is the longest a partial batch waits (in seconds) before
	// being flushed even though neither size threshold has been reached.
	batchTimeout = 1 // seconds
)
var (
	// UT_TestDocumentSyncer, when true, redirects write() output into
	// UT_TestDocumentSyncer_Chan instead of the real writer (unit tests only).
	UT_TestDocumentSyncer = false
	// UT_TestDocumentSyncer_Chan receives each batch that would have been
	// written when UT_TestDocumentSyncer is enabled.
	UT_TestDocumentSyncer_Chan chan []interface{}
)
/*------------------------------------------------------*/
// one document link corresponding to one documentSyncer.
// A documentSyncer drains parsed documents from inputChan and bulk-writes
// them into one target namespace.
type documentSyncer struct {
	tableSyncerId int              // id of the owning table-syncer (used in log prefixes)
	id            int              // documentSyncer id
	ns            utils.NS         // target namespace: Database = conf.Options.Id, Collection = source table name
	inputChan     chan interface{} // parserChan in table-syncer
	writer        writer.Writer    // destination writer, created per conf.Options.TargetType
	// collectionMetric.FinishCount is advanced as batches are written out.
	collectionMetric *utils.CollectionMetric
}
// NewDocumentSyncer builds a documentSyncer that drains inputChan and writes
// into the target namespace {conf.Options.Id, table}. It crashes the process
// (LOG.Crashf) if the target writer cannot be created.
//
// Parameters:
//   tableSyncerId    - id of the owning table-syncer (used in logs)
//   table            - source DynamoDB table name, used as the target collection
//   id               - id of this documentSyncer within the table-syncer
//   inputChan        - parser channel feeding parsed documents
//   tableDescribe    - DynamoDB table description, forwarded to the writer
//   collectionMetric - shared metric advanced as documents are written
func NewDocumentSyncer(tableSyncerId int, table string, id int, inputChan chan interface{},
	tableDescribe *dynamodb.TableDescription, collectionMetric *utils.CollectionMetric) *documentSyncer {
	ns := utils.NS{
		Database:   conf.Options.Id,
		Collection: table,
	}

	w := writer.NewWriter(conf.Options.TargetType, conf.Options.TargetAddress, ns, conf.Options.LogLevel)
	if w == nil {
		// fix: report the documentSyncer id under documentSyncer[...] (matching
		// String()) and the table name separately; the original passed the
		// table name where the id belongs, producing a misleading log line.
		LOG.Crashf("tableSyncer[%v] documentSyncer[%v] table[%v] create writer failed", tableSyncerId, id, table)
	}
	w.PassTableDesc(tableDescribe)

	return &documentSyncer{
		tableSyncerId:    tableSyncerId,
		id:               id,
		inputChan:        inputChan,
		writer:           w,
		ns:               ns,
		collectionMetric: collectionMetric,
	}
}
// String returns the log prefix identifying this documentSyncer:
// its owning table-syncer id, its own id, and the target namespace.
func (ds *documentSyncer) String() string {
	identity := fmt.Sprintf("tableSyncer[%v] documentSyncer[%v] ns[%v]",
		ds.tableSyncerId, ds.id, ds.ns)
	return identity
}
// Close releases the underlying target writer.
// NOTE(review): Run() also closes the writer via a delayed goroutine, so the
// writer may be closed twice — confirm writer.Close is safe to call twice.
func (ds *documentSyncer) Close() {
	ds.writer.Close()
}
// Run is the syncer main loop: it drains ds.inputChan into an in-memory
// batch and flushes the batch to the target writer when any of these fires:
//   - the input channel is closed (exit: flush the remainder, then stop)
//   - no document arrives within batchTimeout seconds
//   - the batch holds conf.Options.FullDocumentWriteBatch documents
//   - the accumulated byte size reaches batchSize
// A failed write crashes the process via LOG.Crashf.
func (ds *documentSyncer) Run() {
	batchNumber := int(conf.Options.FullDocumentWriteBatch)
	LOG.Info("%s start with batchSize[%v]", ds.String(), batchNumber)

	var data interface{}
	var ok bool
	batchGroup := make([]interface{}, 0, batchNumber)
	timeout := false
	// byte size of the pending batch; only maintained for protocal.RawData input
	batchGroupSize := 0
	exit := false
	for {
		StartT := time.Now()
		select {
		case data, ok = <-ds.inputChan:
			if !ok {
				// upstream parser closed the channel: flush the cache and leave the loop
				exit = true
				LOG.Info("%s channel already closed, flushing cache and exiting...", ds.String())
			}
		case <-time.After(time.Second * batchTimeout):
			// timeout: force a flush of whatever partial batch is cached
			timeout = true
			data = nil
		}
		readParserChanDuration := time.Since(StartT)

		LOG.Debug("exit[%v], timeout[%v], len(batchGroup)[%v], batchGroupSize[%v], data[%v]", exit, timeout,
			len(batchGroup), batchGroupSize, data)

		if data != nil {
			if UT_TestDocumentSyncer {
				// unit-test mode: queue the raw value without size accounting
				batchGroup = append(batchGroup, data)
			} else {
				switch v := data.(type) {
				case protocal.RawData:
					// pre-serialized document with a known byte size
					if v.Size > 0 {
						batchGroup = append(batchGroup, v.Data)
						batchGroupSize += v.Size
					}
				case map[string]*dynamodb.AttributeValue:
					// raw DynamoDB attribute map; its byte size is unknown here
					batchGroup = append(batchGroup, v)
					// meaningless batchGroupSize
				}
			}
		}

		// flush on exit, timeout, or when either batch threshold is reached
		if exit || timeout || len(batchGroup) >= batchNumber || batchGroupSize >= batchSize {
			StartT = time.Now()
			batchGroupLen := len(batchGroup)
			if len(batchGroup) != 0 {
				if err := ds.write(batchGroup); err != nil {
					LOG.Crashf("%s write data failed[%v]", ds.String(), err)
				}
				// start a fresh batch; the old backing array is handed to write()
				batchGroup = make([]interface{}, 0, batchNumber)
				batchGroupSize = 0
			}
			writeDestDBDuration := time.Since(StartT)

			LOG.Info("%s write db batch[%v] parserChan.len[%v] readParserChanTime[%v] writeDestDbTime[%v]",
				ds.String(), batchGroupLen, len(ds.inputChan), readParserChanDuration, writeDestDBDuration)

			if exit {
				break
			}
			timeout = false
		}
	}

	// NOTE(review): the writer is closed on a fixed 5-minute timer after the
	// loop ends — presumably to let any in-flight use settle — and Close()
	// closes the same writer, so a double close is possible. Confirm the
	// writer implementation tolerates being closed twice.
	go func() {
		<-time.NewTimer(time.Minute * 5).C
		ds.writer.Close()
		LOG.Info("%s full-sync writer close", ds.String())
	}()

	LOG.Info("%s finish writing", ds.String())
}
// write flushes one batch of documents to the target writer. An empty batch
// is a no-op. In unit-test mode the batch is forwarded to
// UT_TestDocumentSyncer_Chan instead of being written.
//
// On success, collectionMetric.FinishCount is advanced by the number of
// documents written; on failure the writer's error is returned unchanged.
func (ds *documentSyncer) write(input []interface{}) error {
	LOG.Debug("%s writing data with length[%v]", ds.String(), len(input))
	if len(input) == 0 {
		return nil
	}

	if UT_TestDocumentSyncer {
		UT_TestDocumentSyncer_Chan <- input
		return nil
	}

	if err := ds.writer.WriteBulk(input); err != nil {
		return err
	}

	// fix: advance FinishCount only after a successful bulk write; the
	// previous deferred increment counted documents even when WriteBulk
	// returned an error, inflating the progress metric.
	atomic.AddUint64(&ds.collectionMetric.FinishCount, uint64(len(input)))
	return nil
}