public GetRecordsResponse getRecords()

in flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java [92:141]


    public GetRecordsResponse getRecords(
            String streamArn, String shardId, StartingPosition startingPosition) {
        // Reads the next batch of records for the given shard.
        //
        // The shard iterator is taken from shardIdToIteratorStore when one is cached for
        // shardId; otherwise it is obtained via getShardIterator(streamArn, shardId,
        // startingPosition). After every successful read the cached iterator is advanced
        // to the response's nextShardIterator (when that is non-null).
        //
        // Returns EMPTY_GET_RECORDS_RESPONSE when no shard iterator can be obtained or when
        // GetRecords raises ResourceNotFoundException. Exceptions raised by a retry attempt
        // inside a catch handler are not handled here and propagate to the caller.
        String shardIterator =
                shardIdToIteratorStore.computeIfAbsent(
                        shardId, (s) -> getShardIterator(streamArn, s, startingPosition));

        if (shardIterator == null) {
            return EMPTY_GET_RECORDS_RESPONSE;
        }
        try {
            return getRecordsAndAdvanceIterator(shardId, shardIterator);
        } catch (ExpiredIteratorException e) {
            LOG.info(
                    "Received ExpiredIteratorException from GetRecords. "
                            + "Calling GetShardIterator for shard: {} with position: {}",
                    shardId,
                    startingPosition);
            // Eagerly retry getRecords() if the iterator is expired
            return getRecordsAndAdvanceIterator(
                    shardId, getShardIterator(streamArn, shardId, startingPosition));
        } catch (TrimmedDataAccessException e) {
            // TrimmedDataAccessException means that the record pointed by shard iterator has
            // expired.
            // We should read the shard back from TRIM_HORIZON
            LOG.warn(
                    "Received TrimmedDataAccessException from GetRecords. "
                            + "Calling GetShardIterator for shard: {} with TRIM_HORIZON",
                    shardId);
            return getRecordsAndAdvanceIterator(
                    shardId, getShardIterator(streamArn, shardId, StartingPosition.fromStart()));
        } catch (ResourceNotFoundException e) {
            LOG.warn(
                    "Received ResourceNotFoundException from GetRecords for shard: {}. "
                            + "This might indicate that there is restore happening from stale snapshot or data loss from backpressure",
                    shardId);
            return EMPTY_GET_RECORDS_RESPONSE;
        }
    }

    /**
     * Calls GetRecords with the given shard iterator and, when the response carries a non-null
     * next shard iterator, stores it in {@code shardIdToIteratorStore} under {@code shardId}.
     *
     * @param shardId shard whose cached iterator should be advanced
     * @param shardIterator iterator to read with; may be {@code null} when no iterator could be
     *     obtained for the shard
     * @return the GetRecords response, or {@code EMPTY_GET_RECORDS_RESPONSE} when {@code
     *     shardIterator} is {@code null}
     */
    private GetRecordsResponse getRecordsAndAdvanceIterator(String shardId, String shardIterator) {
        // A freshly requested iterator can be null (the first-attempt path guards against the
        // same condition); never hand a null iterator to GetRecords.
        if (shardIterator == null) {
            return EMPTY_GET_RECORDS_RESPONSE;
        }
        GetRecordsResponse getRecordsResponse = getRecords(shardIterator);
        String nextShardIterator = getRecordsResponse.nextShardIterator();
        if (nextShardIterator != null) {
            shardIdToIteratorStore.put(shardId, nextShardIterator);
        }
        return getRecordsResponse;
    }