in flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader.java [62:95]
/**
 * Fetches one batch of records for the split at the head of {@code assignedSplits}.
 *
 * <p>The split is taken off the queue, one {@code getRecords} call is issued from its next
 * starting position, and the split is put back on the queue only if it still has data to read.
 * A response whose {@code nextShardIterator()} is {@code null} is treated as the end of the
 * shard: the split is reported as finished and is <em>not</em> re-queued, so it is never polled
 * (and never reported as finished) a second time.
 *
 * @return the fetched records tagged with the split id and a completion flag, or {@code
 *     INCOMPLETE_SHARD_EMPTY_RECORDS} when no split is assigned or the split returned no
 *     records but is not yet complete
 * @throws IOException declared by the method signature; propagated to the caller
 */
public RecordsWithSplitIds<Record> fetch() throws IOException {
    KinesisShardSplitState splitState = assignedSplits.poll();
    if (splitState == null) {
        // Nothing is assigned to this reader right now.
        return INCOMPLETE_SHARD_EMPTY_RECORDS;
    }

    GetRecordsResponse getRecordsResponse =
            kinesis.getRecords(
                    splitState.getStreamArn(),
                    splitState.getShardId(),
                    splitState.getNextStartingPosition());
    // No follow-up iterator means there is nothing further to read from this shard.
    boolean isComplete = getRecordsResponse.nextShardIterator() == null;

    if (hasNoRecords(getRecordsResponse)) {
        if (isComplete) {
            // Finished split with no trailing records: report completion, do not re-queue.
            return new KinesisRecordsWithSplitIds(
                    Collections.emptyIterator(), splitState.getSplitId(), true);
        }
        // Still open but momentarily empty: keep polling it on later fetches.
        assignedSplits.add(splitState);
        return INCOMPLETE_SHARD_EMPTY_RECORDS;
    }

    // Advance the read position to just after the last record of this batch.
    splitState.setNextStartingPosition(
            StartingPosition.continueFromSequenceNumber(
                    getRecordsResponse
                            .records()
                            .get(getRecordsResponse.records().size() - 1)
                            .sequenceNumber()));

    // Only splits that still have data are put back; re-queuing a completed split would
    // cause another getRecords call on it and a duplicate "finished" notification.
    if (!isComplete) {
        assignedSplits.add(splitState);
    }
    return new KinesisRecordsWithSplitIds(
            getRecordsResponse.records().iterator(), splitState.getSplitId(), isComplete);
}