nimo-shake/full-sync/table-syncer.go:
package full_sync

import (
	"fmt"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/request"
	"github.com/aws/aws-sdk-go/service/dynamodb"
	LOG "github.com/vinllen/log4go"

	utils "nimo-shake/common"
	conf "nimo-shake/configure"
	"nimo-shake/protocal"
	"nimo-shake/qps"
	"nimo-shake/writer"
)

const (
	fetcherChanSize = 1024
	parserChanSize  = 81920
)
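
// tableSyncer drives the full sync of a single DynamoDB table: a fetcher
// scans the source table in parallel segments, parsers convert the fetched
// items, and document syncers write the converted items to the target.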
type tableSyncer struct {
	id                  int
	ns                  utils.NS
	sourceConn          *dynamodb.DynamoDB
	sourceTableDescribe *dynamodb.TableDescription
	fetcherChan         chan *dynamodb.ScanOutput // channel between fetcher and parser
	parserChan          chan interface{}          // channel between parser and writer
	converter           protocal.Converter        // converts DynamoDB items into the target format
	collectionMetric    *utils.CollectionMetric
}
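
// NewTableSyncer builds a syncer for the given table: it opens a DynamoDB
// session, describes the source table, and creates the configured converter.
// It returns nil if any step fails.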
func NewTableSyncer(id int, table string, collectionMetric *utils.CollectionMetric) *tableSyncer {
	sourceConn, err := utils.CreateDynamoSession(conf.Options.LogLevel)
	if err != nil {
		LOG.Error("tableSyncer[%v] with table[%v] create dynamodb session error[%v]", id, table, err)
		return nil
	}

	// describe source table
	tableDescription, err := sourceConn.DescribeTable(&dynamodb.DescribeTableInput{
		TableName: aws.String(table),
	})
	if err != nil {
		LOG.Error("tableSyncer[%v] with table[%v] describe failed[%v]", id, table, err)
		return nil
	}

	converter := protocal.NewConverter(conf.Options.ConvertType)
	if converter == nil {
		LOG.Error("tableSyncer[%v] with table[%v] create converter failed", id, table)
		return nil
	}

	return &tableSyncer{
		id:                  id,
		sourceConn:          sourceConn,
		sourceTableDescribe: tableDescription.Table,
		converter:           converter,
		ns: utils.NS{
			Database:   conf.Options.Id,
			Collection: table,
		},
		collectionMetric: collectionMetric,
	}
}
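
// String returns the log prefix identifying this syncer.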
func (ts *tableSyncer) String() string {
	return fmt.Sprintf("tableSyncer[%v] with table[%v]", ts.id, ts.ns.Collection)
}
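
// Sync runs the full-sync pipeline for one table: it creates the target
// table from the source description, then starts one fetcher, a pool of
// parsers, and a pool of document syncers connected by buffered channels,
// and blocks until the pipeline drains.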
func (ts *tableSyncer) Sync() {
	ts.fetcherChan = make(chan *dynamodb.ScanOutput, fetcherChanSize)
	ts.parserChan = make(chan interface{}, parserChanSize)

	targetWriter := writer.NewWriter(conf.Options.TargetType, conf.Options.TargetAddress, ts.ns, conf.Options.LogLevel)
	if targetWriter == nil {
		LOG.Crashf("%s create writer failed", ts)
		return
	}

	// create the target table and indexes from the source description
	if err := targetWriter.CreateTable(ts.sourceTableDescribe); err != nil {
		LOG.Crashf("%s create table failed: %v", ts, err)
		return
	}

	// wait for the dynamo proxy to sync its cache
	time.Sleep(10 * time.Second)

	if conf.Options.SyncSchemaOnly {
		LOG.Info("sync_schema_only enabled, %s exits", ts)
		return
	}

	// total item count of the table, used for progress reporting
	totalCount := ts.count()
	ts.collectionMetric.CollectionStatus = utils.StatusProcessing
	ts.collectionMetric.TotalCount = totalCount

	// start the fetcher to scan all data from DynamoDB
	go ts.fetcher()

	// start parsers to take data from the fetcher and feed the writers;
	// several parsers can run in parallel to speed up conversion
	var wgParser sync.WaitGroup
	wgParser.Add(int(conf.Options.FullDocumentParser))
	for i := 0; i < int(conf.Options.FullDocumentParser); i++ {
		go func(id int) {
			ts.parser(id)
			wgParser.Done()
		}(i)
	}

	// start writers
	var wgWriter sync.WaitGroup
	wgWriter.Add(int(conf.Options.FullDocumentConcurrency))
	for i := 0; i < int(conf.Options.FullDocumentConcurrency); i++ {
		go func(id int) {
			LOG.Info("%s create document syncer with id[%v]", ts, id)
			ds := NewDocumentSyncer(ts.id, ts.ns.Collection, id, ts.parserChan, ts.sourceTableDescribe,
				ts.collectionMetric)
			ds.Run()
			LOG.Info("%s document syncer with id[%v] exit", ts, id)
			wgWriter.Done()
		}(i)
	}

	LOG.Info("%s waits for all parsers to exit", ts.String())
	wgParser.Wait()      // wait for all parsers to exit
	close(ts.parserChan) // close the parser channel so the writers can drain and exit

	LOG.Info("%s all parsers exited, waiting for all writers to exit", ts.String())
	wgWriter.Wait() // wait for all writers to exit

	ts.collectionMetric.CollectionStatus = utils.StatusFinish
	LOG.Info("%s finish syncing table", ts.String())
}

func (ts *tableSyncer) Close() {
	// TODO: the dynamo session doesn't expose a close function?
}
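
// fetcher scans the source table with FullReadConcurrency parallel segment
// readers (DynamoDB parallel Scan), throttled by the full-sync QoS bucket,
// and pushes each ScanOutput batch onto fetcherChan. Throughput-exceeded and
// other transient AWS errors are retried after a back-off; any other error
// aborts the process.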
func (ts *tableSyncer) fetcher() {
	LOG.Info("%s start fetcher with %v readers", ts.String(), conf.Options.FullReadConcurrency)

	qos := qps.StartQoS(int(conf.Options.QpsFull))
	defer qos.Close()

	var wg sync.WaitGroup
	wg.Add(int(conf.Options.FullReadConcurrency))
	for i := 0; i < int(conf.Options.FullReadConcurrency); i++ {
		go func(segmentId int64) {
			LOG.Info("%s start reader[%v]", ts.String(), segmentId)
			defer LOG.Info("%s stop reader[%v]", ts.String(), segmentId)

			// nil means the scan starts from the beginning of the segment
			var previousKey map[string]*dynamodb.AttributeValue
			for {
				<-qos.Bucket // rate limit

				startT := time.Now()
				scanInput := &dynamodb.ScanInput{
					TableName:         aws.String(ts.ns.Collection),
					TotalSegments:     aws.Int64(int64(conf.Options.FullReadConcurrency)),
					Segment:           aws.Int64(segmentId),
					ExclusiveStartKey: previousKey,
					Limit:             aws.Int64(conf.Options.QpsFullBatchNum),
				}
				if len(conf.Options.FullFilterExpression) > 0 {
					scanInput.FilterExpression = aws.String(conf.Options.FullFilterExpression)
					scanInput.ExpressionAttributeValues = utils.ParseAttributes(conf.Options.FullFilterAttributeValues)
				}

				out, err := ts.sourceConn.Scan(scanInput)
				if err != nil {
					// TODO check network error and retry
					if aerr, ok := err.(awserr.Error); ok {
						switch aerr.Code() {
						case dynamodb.ErrCodeProvisionedThroughputExceededException:
							LOG.Warn("%s fetcher reader[%v] recv ProvisionedThroughputExceededException, continue",
								ts.String(), segmentId)
							time.Sleep(5 * time.Second)
							continue
						case request.ErrCodeSerialization:
							LOG.Warn("%s fetcher reader[%v] recv SerializationError[%v], continue",
								ts.String(), segmentId, err)
							time.Sleep(5 * time.Second)
							continue
						case request.ErrCodeRequestError, request.CanceledErrorCode,
							request.ErrCodeResponseTimeout, request.HandlerResponseTimeout,
							request.WaiterResourceNotReadyErrorCode, request.ErrCodeRead:
							LOG.Warn("%s fetcher reader[%v] recv error[%v], continue",
								ts.String(), segmentId, err)
							time.Sleep(5 * time.Second)
							continue
						default:
							LOG.Crashf("%s fetcher scan failed[%v] errcode[%v]", ts.String(), err, aerr.Code())
						}
					} else {
						LOG.Crashf("%s fetcher scan failed[%v]", ts.String(), err)
					}
				}
				scanDuration := time.Since(startT)

				// pass the result batch to the parsers
				startT = time.Now()
				ts.fetcherChan <- out
				writeFetcherChan := time.Since(startT)

				LOG.Info("%s fetcher reader[%v] ts.fetcherChan.len[%v] "+
					"scanTime[%v] scanCount[%v] writeFetcherChanTime[%v]",
					ts.String(), segmentId, len(ts.fetcherChan), scanDuration, *out.Count, writeFetcherChan)

				previousKey = out.LastEvaluatedKey
				if previousKey == nil {
					// scan of this segment is complete
					break
				}
			}
			wg.Done()
		}(int64(i))
	}

	wg.Wait()
	LOG.Info("%s close fetcher", ts.String())
	close(ts.fetcherChan)
}
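
// parser drains ScanOutput batches from fetcherChan, converts each item with
// the configured converter, and pushes the results onto parserChan for the
// document syncers. It exits once fetcherChan is closed and drained.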
func (ts *tableSyncer) parser(id int) {
	LOG.Info("%s start parser[%v]", ts.String(), id)
	for {
		startT := time.Now()
		data, ok := <-ts.fetcherChan
		if !ok {
			// fetcherChan closed: all segments have been scanned
			break
		}
		readFetcherChanDuration := time.Since(startT)

		LOG.Debug("%s parser[%v] read data[%v]", ts.String(), id, data)

		var parserDuration, writeParseChanDuration time.Duration = 0, 0
		for _, ele := range data.Items {
			startT = time.Now()
			out, err := ts.converter.Run(ele)
			parserDuration += time.Since(startT)
			if err != nil {
				LOG.Crashf("%s parser[%v] parse ele[%v] failed[%v]", ts.String(), id, ele, err)
			}

			startT = time.Now()
			ts.parserChan <- out
			writeParseChanDuration += time.Since(startT)
		}

		LOG.Info("%s parser[%v] readFetcherChanTime[%v] parserTime[%v]"+
			" writeParseChanTime[%v] parserChan.len[%v]",
			ts.String(), id, readFetcherChanDuration, parserDuration, writeParseChanDuration, len(ts.parserChan))
	}
	LOG.Info("%s close parser", ts.String())
}
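
// count returns the item count from the cached table description. DynamoDB
// refreshes ItemCount only periodically (roughly every six hours), so this
// value is approximate and is used for progress reporting only.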
func (ts *tableSyncer) count() uint64 {
	return uint64(*ts.sourceTableDescribe.ItemCount)
}
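
// Usage sketch (illustrative only, not part of the original file): the
// full-sync coordinator is expected to create one tableSyncer per table and
// run it to completion; the zero-value CollectionMetric is an assumption made
// for this example.
//
//	metric := &utils.CollectionMetric{}
//	if syncer := NewTableSyncer(0, "my_table", metric); syncer != nil {
//		syncer.Sync()
//		syncer.Close()
//	}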