public RecordsWithSplitIds fetch()

in flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReader.java [97:174]


    public RecordsWithSplitIds<Record> fetch() throws IOException {
        if (assignedSplits.isEmpty()) {
            return INCOMPLETE_SHARD_EMPTY_RECORDS;
        }
        DynamoDbStreamsShardSplitWithContext splitContext = assignedSplits.poll();

        if (pausedSplitIds.contains(splitContext.splitState.getSplitId())) {
            assignedSplits.add(splitContext);
            return INCOMPLETE_SHARD_EMPTY_RECORDS;
        }

        long currentTime = System.currentTimeMillis();
        long nextEligibleTime = getNextEligibleTime(splitContext);

        LOG.debug(
                "Polling split: {}, currentTime: {}, eligibleTime: {}, wasEmptyPoll: {}",
                splitContext.splitState.getSplitId(),
                currentTime,
                nextEligibleTime,
                splitContext.wasLastPollEmpty);

        // Check if split is not ready due to empty poll and non-empty poll delay
        if (nextEligibleTime > currentTime) {
            assignedSplits.add(splitContext);
            sleep(1);
            return INCOMPLETE_SHARD_EMPTY_RECORDS;
        }

        GetRecordsResponse getRecordsResponse =
                dynamodbStreams.getRecords(
                        splitContext.splitState.getStreamArn(),
                        splitContext.splitState.getShardId(),
                        splitContext.splitState.getNextStartingPosition());
        boolean isComplete = getRecordsResponse.nextShardIterator() == null;
        boolean isEmptyPoll = hasNoRecords(getRecordsResponse);

        splitContext.lastPollTimeMillis = currentTime;
        splitContext.wasLastPollEmpty = isEmptyPoll;

        if (isEmptyPoll) {
            if (isComplete) {
                return new DynamoDbStreamRecordsWithSplitIds(
                        Collections.emptyIterator(), splitContext.splitState.getSplitId(), true);
            } else {
                assignedSplits.add(splitContext);
                return INCOMPLETE_SHARD_EMPTY_RECORDS;
            }
        } else {
            DynamoDbStreamsShardMetrics shardMetrics =
                    shardMetricGroupMap.get(splitContext.splitState.getShardId());
            Record lastRecord =
                    getRecordsResponse.records().get(getRecordsResponse.records().size() - 1);
            shardMetrics.setMillisBehindLatest(
                    Math.max(
                            System.currentTimeMillis()
                                    - lastRecord
                                            .dynamodb()
                                            .approximateCreationDateTime()
                                            .toEpochMilli(),
                            0));
        }

        splitContext.splitState.setNextStartingPosition(
                StartingPosition.continueFromSequenceNumber(
                        getRecordsResponse
                                .records()
                                .get(getRecordsResponse.records().size() - 1)
                                .dynamodb()
                                .sequenceNumber()));

        if (!isComplete) {
            assignedSplits.add(splitContext);
        }
        return new DynamoDbStreamRecordsWithSplitIds(
                getRecordsResponse.records().iterator(),
                splitContext.splitState.getSplitId(),
                isComplete);
    }