in amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java [482:544]
private void makeRetrievalAttempt() {
MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, operation);
if (publisherSession.prefetchCounters().shouldGetNewRecords()) {
try {
sleepBeforeNextCall();
GetRecordsResponse getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
lastSuccessfulCall = Instant.now();
final List<KinesisClientRecord> records = getRecordsResult.records().stream()
.map(KinesisClientRecord::fromRecord).collect(Collectors.toList());
ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder()
.records(records)
.millisBehindLatest(getRecordsResult.millisBehindLatest())
.cacheEntryTime(lastSuccessfulCall)
.isAtShardEnd(getRecordsRetrievalStrategy.dataFetcher().isShardEndReached())
.childShards(getRecordsResult.childShards())
.build();
PrefetchRecordsRetrieved recordsRetrieved = new PrefetchRecordsRetrieved(processRecordsInput,
calculateHighestSequenceNumber(processRecordsInput), getRecordsResult.nextShardIterator(),
PrefetchRecordsRetrieved.generateBatchUniqueIdentifier());
publisherSession.highestSequenceNumber(recordsRetrieved.lastBatchSequenceNumber);
addArrivedRecordsInput(recordsRetrieved);
drainQueueForRequests();
} catch (PositionResetException pse) {
throw pse;
} catch (RetryableRetrievalException rre) {
log.info("{} : Timeout occurred while waiting for response from Kinesis. Will retry the request.", streamAndShardId);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.info("{} : Thread was interrupted, indicating shutdown was called on the cache.", streamAndShardId);
} catch (ExpiredIteratorException e) {
log.info("{} : records threw ExpiredIteratorException - restarting"
+ " after greatest seqNum passed to customer", streamAndShardId, e);
MetricsUtil.addStreamId(scope, streamId);
scope.addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.COUNT, MetricsLevel.SUMMARY);
publisherSession.dataFetcher().restartIterator();
} catch (ProvisionedThroughputExceededException e) {
// Update the lastSuccessfulCall if we get a throttling exception so that we back off idleMillis
// for the next call
lastSuccessfulCall = Instant.now();
log.error("{} : Exception thrown while fetching records from Kinesis", streamAndShardId, e);
} catch (SdkException e) {
log.error("{} : Exception thrown while fetching records from Kinesis", streamAndShardId, e);
} finally {
MetricsUtil.endScope(scope);
}
} else {
//
// Consumer isn't ready to receive new records will allow prefetch counters to pause
//
try {
publisherSession.prefetchCounters().waitForConsumer();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
log.info("{} : Thread was interrupted while waiting for the consumer. " +
"Shutdown has probably been started", streamAndShardId);
}
}
}