in flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java [81:145]
public RecordsWithSplitIds<Record> fetch() throws IOException {
KinesisShardSplitState splitState = assignedSplits.poll();
// When there are no assigned splits, return quickly
if (skipWhenNoAssignedSplit(splitState)) {
return INCOMPLETE_SHARD_EMPTY_RECORDS;
}
if (skipUntilScheduledFetchTime(splitState)) {
assignedSplits.add(splitState);
return INCOMPLETE_SHARD_EMPTY_RECORDS;
}
// When assigned splits have been paused, skip the split
if (pausedSplitIds.contains(splitState.getSplitId())) {
assignedSplits.add(splitState);
return INCOMPLETE_SHARD_EMPTY_RECORDS;
}
RecordBatch recordBatch;
try {
recordBatch = fetchRecords(splitState);
scheduleNextFetchTime(splitState, recordBatch);
} catch (ResourceNotFoundException e) {
LOG.warn(
"Failed to fetch records from shard {}: shard no longer exists. Marking split as complete",
splitState.getSplitId());
return new KinesisRecordsWithSplitIds(
Collections.emptyIterator(), splitState.getSplitId(), true);
}
if (recordBatch == null) {
assignedSplits.add(splitState);
return INCOMPLETE_SHARD_EMPTY_RECORDS;
}
if (!recordBatch.isCompleted()) {
assignedSplits.add(splitState);
}
shardMetricGroupMap
.get(splitState.getShardId())
.setMillisBehindLatest(recordBatch.getMillisBehindLatest());
if (recordBatch.getRecords().isEmpty()) {
if (recordBatch.isCompleted()) {
return new KinesisRecordsWithSplitIds(
Collections.emptyIterator(), splitState.getSplitId(), true);
} else {
return INCOMPLETE_SHARD_EMPTY_RECORDS;
}
}
splitState.setNextStartingPosition(
StartingPosition.continueFromSequenceNumber(
recordBatch
.getRecords()
.get(recordBatch.getRecords().size() - 1)
.sequenceNumber()));
return new KinesisRecordsWithSplitIds(
recordBatch.getRecords().iterator(),
splitState.getSplitId(),
recordBatch.isCompleted());
}