public RecordsWithSplitIds fetch()

in flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java [81:145]


    public RecordsWithSplitIds<Record> fetch() throws IOException {
        KinesisShardSplitState splitState = assignedSplits.poll();

        // When there are no assigned splits, return quickly
        if (skipWhenNoAssignedSplit(splitState)) {
            return INCOMPLETE_SHARD_EMPTY_RECORDS;
        }

        if (skipUntilScheduledFetchTime(splitState)) {
            assignedSplits.add(splitState);
            return INCOMPLETE_SHARD_EMPTY_RECORDS;
        }

        // When assigned splits have been paused, skip the split
        if (pausedSplitIds.contains(splitState.getSplitId())) {
            assignedSplits.add(splitState);
            return INCOMPLETE_SHARD_EMPTY_RECORDS;
        }

        RecordBatch recordBatch;
        try {
            recordBatch = fetchRecords(splitState);
            scheduleNextFetchTime(splitState, recordBatch);
        } catch (ResourceNotFoundException e) {
            LOG.warn(
                    "Failed to fetch records from shard {}: shard no longer exists. Marking split as complete",
                    splitState.getSplitId());
            return new KinesisRecordsWithSplitIds(
                    Collections.emptyIterator(), splitState.getSplitId(), true);
        }

        if (recordBatch == null) {
            assignedSplits.add(splitState);
            return INCOMPLETE_SHARD_EMPTY_RECORDS;
        }

        if (!recordBatch.isCompleted()) {
            assignedSplits.add(splitState);
        }

        shardMetricGroupMap
                .get(splitState.getShardId())
                .setMillisBehindLatest(recordBatch.getMillisBehindLatest());

        if (recordBatch.getRecords().isEmpty()) {
            if (recordBatch.isCompleted()) {
                return new KinesisRecordsWithSplitIds(
                        Collections.emptyIterator(), splitState.getSplitId(), true);
            } else {
                return INCOMPLETE_SHARD_EMPTY_RECORDS;
            }
        }

        splitState.setNextStartingPosition(
                StartingPosition.continueFromSequenceNumber(
                        recordBatch
                                .getRecords()
                                .get(recordBatch.getRecords().size() - 1)
                                .sequenceNumber()));

        return new KinesisRecordsWithSplitIds(
                recordBatch.getRecords().iterator(),
                splitState.getSplitId(),
                recordBatch.isCompleted());
    }