func()

in nimo-shake/incr-sync/syncer.go [319:394]


// getRecords pulls records from the DynamoDB stream starting at the shard
// iterator shardIt and forwards every record to d.batchChan.
//
// Each request waits on the QoS bucket created from conf.Options.QpsIncr and
// asks for at most conf.Options.QpsIncrBatchNum records. The loop follows
// NextShardIterator until the service returns a nil iterator, then closes
// d.batchChan and returns.
//
// Throughput, serialization, and the listed request/server error codes are
// logged and retried with the same iterator after a 5 second pause. Any other
// error is handed to LOG.Crashf.
func (d *Dispatcher) getRecords(shardIt string) {
	limiter := qps.StartQoS(int(conf.Options.QpsIncr))
	defer limiter.Close()

	cursor := &shardIt
	for cursor != nil {
		// Take one token from the QoS bucket before every stream request.
		<-limiter.Bucket

		input := &dynamodbstreams.GetRecordsInput{
			ShardIterator: cursor,
			Limit:         aws.Int64(conf.Options.QpsIncrBatchNum),
		}
		output, err := d.dynamoStreamSession.GetRecords(input)

		switch e := err.(type) {
		case nil:
			// Request succeeded; fall through to record handling below.

		case awserr.Error:
			// Every known-transient code logs its own message and then shares
			// the same back-off; only the default branch opts out of retrying.
			retry := true
			switch e.Code() {
			case dynamodb.ErrCodeProvisionedThroughputExceededException:
				LOG.Warn("%s getRecords get records with iterator[%v] recv ProvisionedThroughputExceededException continue",
					d.String(), *cursor)

			case request.ErrCodeSerialization:
				LOG.Warn("%s getRecords get records with iterator[%v] recv SerializationError[%v] continue",
					d.String(), *cursor, err)

			case request.ErrCodeRequestError, request.CanceledErrorCode,
				request.ErrCodeResponseTimeout, request.HandlerResponseTimeout,
				request.WaiterResourceNotReadyErrorCode, request.ErrCodeRead,
				dynamodb.ErrCodeInternalServerError:
				LOG.Warn("%s getRecords get records with iterator[%v] recv Error[%v] continue",
					d.String(), *cursor, err)

			default:
				retry = false
				LOG.Crashf("%s getRecords scan failed[%v] errcode[%v]", d.String(), err, e.Code())
			}
			if retry {
				// Retry the same iterator after a fixed pause.
				time.Sleep(5 * time.Second)
				continue
			}

		default:
			// Not an AWS SDK error: nothing here knows how to recover from it.
			LOG.Crashf("%s get records with iterator[%v] failed[%v]", d.String(), *cursor, err)
		}

		cursor = output.NextShardIterator
		fetched := len(output.Records)

		LOG.Debug("getRecords shardIt[%s] record_number[%d]", shardIt, fetched)

		if fetched == 0 && cursor != nil {
			// Nothing new yet: remember the latest iterator on the dispatcher
			// and poll again after GetRecordsInterval seconds.
			d.shardIt = *cursor
			time.Sleep(GetRecordsInterval * time.Second)
			continue
		}

		d.metric.AddGet(uint64(fetched))

		// Hand the batch over record by record.
		for i := range output.Records {
			d.batchChan <- output.Records[i]
		}
	}

	// A nil NextShardIterator ended the loop; signal the consumer side.
	close(d.batchChan)
	LOG.Info("%s getRecords exit", d.String())
}