func()

in nimo-shake/full-sync/table-syncer.go [149:238]


// fetcher runs a parallel Scan over the source table and forwards every page
// of results to ts.fetcherChan.
//
// It starts conf.Options.FullReadConcurrency reader goroutines; reader i scans
// segment i out of that many total segments. Before each Scan request a reader
// receives one value from qos.Bucket (created with conf.Options.QpsFull), so
// every request, including retries, waits on that channel. Throttling,
// serialization and request-level errors are retried after a fixed pause; any
// other error is reported through LOG.Crashf.
//
// fetcher blocks until every reader has finished its segment and then closes
// ts.fetcherChan.
func (ts *tableSyncer) fetcher() {
	LOG.Info("%s start fetcher with %v reader", ts.String(), conf.Options.FullReadConcurrency)

	qos := qps.StartQoS(int(conf.Options.QpsFull))
	defer qos.Close()

	// Pause between attempts after a retryable Scan error.
	const retryInterval = 5 * time.Second

	// Loop-invariant settings, converted once instead of on every request.
	readerCount := int(conf.Options.FullReadConcurrency)
	totalSegments := int64(conf.Options.FullReadConcurrency)
	batchLimit := conf.Options.QpsFullBatchNum
	filterExpr := conf.Options.FullFilterExpression
	hasFilter := len(filterExpr) > 0

	var wg sync.WaitGroup
	wg.Add(readerCount)
	for i := 0; i < readerCount; i++ {
		go func(segmentId int64) {
			LOG.Info("%s start reader[%v]", ts.String(), segmentId)
			defer LOG.Info("%s stop reader[%v]", ts.String(), segmentId)

			// Pagination cursor; nil on the first request so the scan starts
			// at the beginning of the segment.
			var previousKey map[string]*dynamodb.AttributeValue
			for {
				// Wait for the rate limiter before issuing a request.
				<-qos.Bucket

				startT := time.Now()
				scanInput := &dynamodb.ScanInput{
					TableName:         aws.String(ts.ns.Collection),
					TotalSegments:     aws.Int64(totalSegments),
					Segment:           aws.Int64(segmentId),
					ExclusiveStartKey: previousKey,
					Limit:             aws.Int64(batchLimit),
				}
				if hasFilter {
					scanInput.FilterExpression = aws.String(filterExpr)
					scanInput.ExpressionAttributeValues = utils.ParseAttributes(conf.Options.FullFilterAttributeValues)
				}
				out, err := ts.sourceConn.Scan(scanInput)
				if err != nil {
					// TODO check network error and retry
					aerr, ok := err.(awserr.Error)
					if !ok {
						LOG.Crashf("%s fetcher scan failed[%v]", ts.String(), err)
					} else {
						switch aerr.Code() {
						case dynamodb.ErrCodeProvisionedThroughputExceededException:
							LOG.Warn("%s fetcher reader[%v] recv ProvisionedThroughputExceededException continue",
								ts.String(), segmentId)
							time.Sleep(retryInterval)
							continue

						case request.ErrCodeSerialization:
							LOG.Warn("%s fetcher reader[%v] recv SerializationError[%v] continue",
								ts.String(), segmentId, err)
							time.Sleep(retryInterval)
							continue

						case request.ErrCodeRequestError, request.CanceledErrorCode,
							request.ErrCodeResponseTimeout, request.HandlerResponseTimeout,
							request.WaiterResourceNotReadyErrorCode, request.ErrCodeRead:
							LOG.Warn("%s fetcher reader[%v] recv Error[%v] continue",
								ts.String(), segmentId, err)
							time.Sleep(retryInterval)
							continue

						default:
							LOG.Crashf("%s fetcher scan failed[%v] errcode[%v]", ts.String(), err, aerr.Code())
						}
					}
				}
				scanDuration := time.Since(startT)

				// Hand the page to the parser; time how long the send blocks.
				startT = time.Now()
				ts.fetcherChan <- out
				writeFetcherChan := time.Since(startT)

				// aws.Int64Value is nil-safe, so a response without Count
				// cannot panic inside a log statement.
				LOG.Info("%s fetcher reader[%v] ts.fetcherChan.len[%v] "+
					"scanTime[%v] scanCount[%v] writeFetcherChanTime[%v]",
					ts.String(), segmentId, len(ts.fetcherChan), scanDuration,
					aws.Int64Value(out.Count), writeFetcherChan)

				// An empty LastEvaluatedKey (nil or zero-length map) marks the
				// last page of this segment.
				previousKey = out.LastEvaluatedKey
				if len(previousKey) == 0 {
					break
				}
			}
			// Done is signalled only after the segment completes normally. It
			// is intentionally not deferred: NOTE(review) if LOG.Crashf
			// unwinds via panic (TODO confirm), a deferred Done could let
			// fetcher close ts.fetcherChan as though the scan had finished.
			wg.Done()
		}(int64(i))
	}
	wg.Wait()

	LOG.Info("%s close fetcher", ts.String())
	close(ts.fetcherChan)
}