in nimo-shake/full-sync/table-syncer.go [149:238]
// fetcher scans the source table with conf.Options.FullReadConcurrency
// parallel readers, one per DynamoDB scan segment, and sends every scan page
// to ts.fetcherChan. It blocks until every reader has finished its segment and
// then closes ts.fetcherChan.
//
// Each reader receives from qos.Bucket before issuing a request (presumably a
// rate-limit token sized by conf.Options.QpsFull - confirm in package qps).
// Throughput, serialization and request-level errors listed in the switch
// below are retried after a fixed pause; every other scan error is passed to
// LOG.Crashf.
func (ts *tableSyncer) fetcher() {
	// Pause before re-issuing a scan that failed with a retryable error.
	const retryInterval = 5 * time.Second

	// Read the concurrency once so that the WaitGroup counter, the number of
	// goroutines and TotalSegments are guaranteed to agree.
	readers := int(conf.Options.FullReadConcurrency)

	LOG.Info("%s start fetcher with %v reader", ts.String(), conf.Options.FullReadConcurrency)

	qos := qps.StartQoS(int(conf.Options.QpsFull))
	defer qos.Close()

	var wg sync.WaitGroup
	wg.Add(readers)
	for i := 0; i < readers; i++ {
		go func(segmentID int64) {
			LOG.Info("%s start reader[%v]", ts.String(), segmentID)
			defer LOG.Info("%s stop reader[%v]", ts.String(), segmentID)

			// nil on the first request: the scan starts at the beginning of the segment.
			var previousKey map[string]*dynamodb.AttributeValue
			for {
				// Blocks until the QoS bucket yields; retries pass through here as well.
				<-qos.Bucket

				startT := time.Now()
				scanInput := &dynamodb.ScanInput{
					TableName:         aws.String(ts.ns.Collection),
					TotalSegments:     aws.Int64(int64(readers)),
					Segment:           aws.Int64(segmentID),
					ExclusiveStartKey: previousKey,
					Limit:             aws.Int64(conf.Options.QpsFullBatchNum),
				}
				if len(conf.Options.FullFilterExpression) > 0 {
					scanInput.FilterExpression = aws.String(conf.Options.FullFilterExpression)
					scanInput.ExpressionAttributeValues = utils.ParseAttributes(conf.Options.FullFilterAttributeValues)
				}

				out, err := ts.sourceConn.Scan(scanInput)
				if err != nil {
					// TODO check network error and retry
					if aerr, ok := err.(awserr.Error); ok {
						switch aerr.Code() {
						case dynamodb.ErrCodeProvisionedThroughputExceededException:
							LOG.Warn("%s fetcher reader[%v] recv ProvisionedThroughputExceededException continue",
								ts.String(), segmentID)
							time.Sleep(retryInterval)
							continue
						case request.ErrCodeSerialization:
							LOG.Warn("%s fetcher reader[%v] recv SerializationError[%v] continue",
								ts.String(), segmentID, err)
							time.Sleep(retryInterval)
							continue
						case request.ErrCodeRequestError, request.CanceledErrorCode,
							request.ErrCodeResponseTimeout, request.HandlerResponseTimeout,
							request.WaiterResourceNotReadyErrorCode, request.ErrCodeRead:
							LOG.Warn("%s fetcher reader[%v] recv Error[%v] continue",
								ts.String(), segmentID, err)
							time.Sleep(retryInterval)
							continue
						default:
							// NOTE(review): LOG.Crashf is expected to abort here - confirm against the logger.
							LOG.Crashf("%s fetcher scan failed[%v] errcode[%v]", ts.String(), err, aerr.Code())
						}
					} else {
						LOG.Crashf("%s fetcher scan failed[%v]", ts.String(), err)
					}
				}
				scanDuration := time.Since(startT)

				// Pass the page on and measure how long the send blocks.
				startT = time.Now()
				ts.fetcherChan <- out
				writeFetcherChan := time.Since(startT)

				// aws.Int64Value tolerates a nil Count, so a log line can never
				// panic the reader.
				LOG.Info("%s fetcher reader[%v] ts.fetcherChan.len[%v] "+
					"scanTime[%v] scanCount[%v] writeFetcherChanTime[%v]",
					ts.String(), segmentID, len(ts.fetcherChan), scanDuration,
					aws.Int64Value(out.Count), writeFetcherChan)

				// DynamoDB documents an empty LastEvaluatedKey as the end of the
				// scan, so a nil and an empty map both mean the segment is done.
				previousKey = out.LastEvaluatedKey
				if len(previousKey) == 0 {
					break
				}
			}

			// Deliberately not deferred: if LOG.Crashf panics, the WaitGroup must
			// stay held so that ts.fetcherChan is not closed as if the scan had
			// completed.
			wg.Done()
		}(int64(i))
	}

	wg.Wait()
	LOG.Info("%s close fetcher", ts.String())
	close(ts.fetcherChan)
}