in flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReader.java [97:174]
public RecordsWithSplitIds<Record> fetch() throws IOException {
    // Handles at most one assigned split per invocation: the split is taken from the queue,
    // polled if it is neither paused nor still waiting for its next eligible poll time, and
    // put back into the queue unless its shard has been read to the end.
    if (assignedSplits.isEmpty()) {
        return INCOMPLETE_SHARD_EMPTY_RECORDS;
    }

    final DynamoDbStreamsShardSplitWithContext current = assignedSplits.poll();
    final String splitId = current.splitState.getSplitId();

    // A paused split is returned to the queue untouched.
    if (pausedSplitIds.contains(splitId)) {
        assignedSplits.add(current);
        return INCOMPLETE_SHARD_EMPTY_RECORDS;
    }

    final long currentTime = System.currentTimeMillis();
    final long nextEligibleTime = getNextEligibleTime(current);
    LOG.debug(
            "Polling split: {}, currentTime: {}, eligibleTime: {}, wasEmptyPoll: {}",
            splitId,
            currentTime,
            nextEligibleTime,
            current.wasLastPollEmpty);

    // The split is not due yet (its poll delay has not elapsed): requeue it, pause via
    // sleep(1) before returning (presumably to keep the caller from spinning), and
    // report nothing.
    if (currentTime < nextEligibleTime) {
        assignedSplits.add(current);
        sleep(1);
        return INCOMPLETE_SHARD_EMPTY_RECORDS;
    }

    final GetRecordsResponse response =
            dynamodbStreams.getRecords(
                    current.splitState.getStreamArn(),
                    current.splitState.getShardId(),
                    current.splitState.getNextStartingPosition());

    // A missing next shard iterator is treated as "this split is finished".
    final boolean shardFinished = response.nextShardIterator() == null;
    final boolean nothingFetched = hasNoRecords(response);

    // Poll bookkeeping uses the timestamp captured before the request was issued.
    current.lastPollTimeMillis = currentTime;
    current.wasLastPollEmpty = nothingFetched;

    if (nothingFetched) {
        if (!shardFinished) {
            assignedSplits.add(current);
            return INCOMPLETE_SHARD_EMPTY_RECORDS;
        }
        // Finished and empty: emit an empty batch that marks the split as finished.
        return new DynamoDbStreamRecordsWithSplitIds(Collections.emptyIterator(), splitId, true);
    }

    final DynamoDbStreamsShardMetrics metrics =
            shardMetricGroupMap.get(current.splitState.getShardId());
    final Record newestRecord = response.records().get(response.records().size() - 1);

    // Lag is the wall-clock distance to the newest fetched record, floored at zero.
    final long now = System.currentTimeMillis();
    final long newestCreatedAt =
            newestRecord.dynamodb().approximateCreationDateTime().toEpochMilli();
    metrics.setMillisBehindLatest(Math.max(now - newestCreatedAt, 0));

    // The next poll continues from the sequence number of the newest fetched record.
    current.splitState.setNextStartingPosition(
            StartingPosition.continueFromSequenceNumber(
                    newestRecord.dynamodb().sequenceNumber()));

    if (!shardFinished) {
        assignedSplits.add(current);
    }
    return new DynamoDbStreamRecordsWithSplitIds(
            response.records().iterator(), splitId, shardFinished);
}