flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java [133:145]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    private StartingPosition getNextStartingPosition(final SequenceNumber latestSequenceNumber) {
        // When consuming from a timestamp sentinel/AT_TIMESTAMP ShardIteratorType.
        // If the first RecordBatch is empty, then the latestSequenceNumber would be the timestamp
        // sentinel.
        // This is because we have not yet received any real sequence numbers on this shard.
        // In this condition we should retry from the previous starting position (AT_TIMESTAMP).
        if (SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get().equals(latestSequenceNumber)) {
            Preconditions.checkState(nextStartingPosition.getShardIteratorType() == AT_TIMESTAMP);
            return nextStartingPosition;
        } else {
            return StartingPosition.continueFromSequenceNumber(latestSequenceNumber);
        }
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java [141:153]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    private StartingPosition getNextStartingPosition(final SequenceNumber latestSequenceNumber) {
        // When consuming from a timestamp sentinel/AT_TIMESTAMP ShardIteratorType.
        // If the first RecordBatch has no deaggregated records, then the latestSequenceNumber would
        // be the timestamp sentinel.
        // This is because we have not yet received any real sequence numbers on this shard.
        // In this condition we should retry from the previous starting position (AT_TIMESTAMP).
        if (SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get().equals(latestSequenceNumber)) {
            Preconditions.checkState(nextStartingPosition.getShardIteratorType() == AT_TIMESTAMP);
            return nextStartingPosition;
        } else {
            return StartingPosition.continueFromSequenceNumber(latestSequenceNumber);
        }
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



