private void makeRetrievalAttempt()

in amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java [482:544]


        /**
         * Runs one iteration of the prefetch loop.
         *
         * <p>If the prefetch counters report that new records should be fetched, this sleeps via
         * {@code sleepBeforeNextCall()}, requests up to {@code maxRecordsPerCall} records from the
         * retrieval strategy, wraps the response in a {@link PrefetchRecordsRetrieved}, records its
         * last batch sequence number on the publisher session, hands it to
         * {@code addArrivedRecordsInput} and then calls {@code drainQueueForRequests()}. Otherwise it
         * waits on the prefetch counters for the consumer.
         *
         * <p>Exception handling on the fetch path:
         * <ul>
         *   <li>{@link PositionResetException} is rethrown to the caller unchanged.</li>
         *   <li>{@link RetryableRetrievalException} is logged; nothing else is done in this call.</li>
         *   <li>{@link InterruptedException} restores the thread's interrupt flag and is logged.</li>
         *   <li>{@link ExpiredIteratorException} emits the {@code EXPIRED_ITERATOR_METRIC} count and
         *       restarts the data fetcher's iterator.</li>
         *   <li>{@link ProvisionedThroughputExceededException} refreshes {@code lastSuccessfulCall}
         *       and is logged.</li>
         *   <li>Any other {@link SdkException} is logged.</li>
         * </ul>
         *
         * <p>The metrics scope is created only on the fetch path, so that every scope that is created
         * is also ended by the {@code finally} block. Previously a scope was also created on the wait
         * path, where it was never ended.
         *
         * @throws PositionResetException if thrown while fetching or queueing records
         */
        private void makeRetrievalAttempt() {
            if (publisherSession.prefetchCounters().shouldGetNewRecords()) {
                // Created here rather than at method entry: the wait branch below never used or
                // ended the scope, so creating it unconditionally left scopes dangling.
                final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, operation);
                try {
                    sleepBeforeNextCall();
                    final GetRecordsResponse getRecordsResult =
                            getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
                    lastSuccessfulCall = Instant.now();

                    final List<KinesisClientRecord> records = getRecordsResult.records().stream()
                            .map(KinesisClientRecord::fromRecord).collect(Collectors.toList());
                    final ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder()
                            .records(records)
                            .millisBehindLatest(getRecordsResult.millisBehindLatest())
                            .cacheEntryTime(lastSuccessfulCall)
                            .isAtShardEnd(getRecordsRetrievalStrategy.dataFetcher().isShardEndReached())
                            .childShards(getRecordsResult.childShards())
                            .build();

                    final PrefetchRecordsRetrieved recordsRetrieved = new PrefetchRecordsRetrieved(processRecordsInput,
                            calculateHighestSequenceNumber(processRecordsInput), getRecordsResult.nextShardIterator(),
                            PrefetchRecordsRetrieved.generateBatchUniqueIdentifier());
                    publisherSession.highestSequenceNumber(recordsRetrieved.lastBatchSequenceNumber);
                    addArrivedRecordsInput(recordsRetrieved);
                    drainQueueForRequests();
                } catch (PositionResetException pse) {
                    // Listed first and rethrown so none of the handlers below can swallow it.
                    throw pse;
                } catch (RetryableRetrievalException rre) {
                    log.info("{} :  Timeout occurred while waiting for response from Kinesis.  Will retry the request.", streamAndShardId);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so callers further up can observe the interruption.
                    Thread.currentThread().interrupt();
                    log.info("{} :  Thread was interrupted, indicating shutdown was called on the cache.", streamAndShardId);
                } catch (ExpiredIteratorException e) {
                    // The trailing Throwable argument has no placeholder; SLF4J logs it as the exception.
                    log.info("{} :  records threw ExpiredIteratorException - restarting"
                            + " after greatest seqNum passed to customer", streamAndShardId, e);

                    MetricsUtil.addStreamId(scope, streamId);
                    scope.addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.COUNT, MetricsLevel.SUMMARY);

                    publisherSession.dataFetcher().restartIterator();
                } catch (ProvisionedThroughputExceededException e) {
                    // Update the lastSuccessfulCall if we get a throttling exception so that we back off idleMillis
                    // for the next call
                    lastSuccessfulCall = Instant.now();
                    log.error("{} :  Exception thrown while fetching records from Kinesis", streamAndShardId, e);
                } catch (SdkException e) {
                    log.error("{} :  Exception thrown while fetching records from Kinesis", streamAndShardId, e);
                } finally {
                    MetricsUtil.endScope(scope);
                }
            } else {
                //
                // The consumer isn't ready to receive new records, so wait on the prefetch counters
                // instead of fetching.
                //
                try {
                    publisherSession.prefetchCounters().waitForConsumer();
                } catch (InterruptedException ie) {
                    // Restore the interrupt flag so callers further up can observe the interruption.
                    Thread.currentThread().interrupt();
                    log.info("{} :  Thread was interrupted while waiting for the consumer.  " +
                            "Shutdown has probably been started", streamAndShardId);
                }
            }
        }