in nimo-shake/incr-sync/syncer.go [319:394]
// getRecords repeatedly calls GetRecords on the DynamoDB stream, starting from
// the iterator shardIt, and pushes every returned record into d.batchChan.
//
// Each request first receives from the QoS bucket created from
// conf.Options.QpsIncr and asks for at most conf.Options.QpsIncrBatchNum
// records. A fixed set of AWS error codes is retried with the same iterator
// after a 5 second pause; every other error is handed to LOG.Crashf.
//
// The loop stops once the response carries no NextShardIterator (presumably
// because the shard has been closed -- confirm against the GetRecords API
// docs). d.batchChan is closed before the function returns.
func (d *Dispatcher) getRecords(shardIt string) {
	limiter := qps.StartQoS(int(conf.Options.QpsIncr))
	defer limiter.Close()

	// Pause applied before re-issuing a request that failed with a retriable error.
	const retryBackoff = 5 * time.Second

	for cursor := &shardIt; cursor != nil; {
		// Block until the QoS bucket yields a value.
		<-limiter.Bucket

		resp, err := d.dynamoStreamSession.GetRecords(&dynamodbstreams.GetRecordsInput{
			ShardIterator: cursor,
			Limit:         aws.Int64(conf.Options.QpsIncrBatchNum),
		})
		if err != nil {
			retriable := false
			if awsErr, isAWSErr := err.(awserr.Error); !isAWSErr {
				LOG.Crashf("%s get records with iterator[%v] failed[%v]", d.String(), *cursor, err)
			} else {
				switch awsErr.Code() {
				case dynamodb.ErrCodeProvisionedThroughputExceededException:
					retriable = true
					LOG.Warn("%s getRecords get records with iterator[%v] recv ProvisionedThroughputExceededException continue",
						d.String(), *cursor)
				case request.ErrCodeSerialization:
					retriable = true
					LOG.Warn("%s getRecords get records with iterator[%v] recv SerializationError[%v] continue",
						d.String(), *cursor, err)
				case request.ErrCodeRequestError, request.CanceledErrorCode,
					request.ErrCodeResponseTimeout, request.HandlerResponseTimeout,
					request.WaiterResourceNotReadyErrorCode, request.ErrCodeRead,
					dynamodb.ErrCodeInternalServerError:
					retriable = true
					LOG.Warn("%s getRecords get records with iterator[%v] recv Error[%v] continue",
						d.String(), *cursor, err)
				default:
					LOG.Crashf("%s getRecords scan failed[%v] errcode[%v]", d.String(), err, awsErr.Code())
				}
			}
			if retriable {
				// Try again with the same iterator after backing off.
				time.Sleep(retryBackoff)
				continue
			}
		}

		cursor = resp.NextShardIterator
		fetched := len(resp.Records)
		// Note: this logs the iterator the function was started with, not the current one.
		LOG.Debug("getRecords shardIt[%s] record_number[%d]", shardIt, fetched)

		if fetched == 0 && cursor != nil {
			// Nothing new yet: remember the latest iterator on the dispatcher
			// and wait before polling again.
			d.shardIt = *cursor // update shardIt
			time.Sleep(GetRecordsInterval * time.Second)
			continue
		}

		d.metric.AddGet(uint64(fetched))

		// Hand the records over to the consumer of batchChan in arrival order.
		for _, rec := range resp.Records {
			d.batchChan <- rec
		}
	}

	// No further iterator: tell the consumer that no more records will arrive.
	close(d.batchChan)
	LOG.Info("%s getRecords exit", d.String())
}