public RecordsWithSplitIds fetch()

in flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java [77:118]


    public RecordsWithSplitIds<CassandraRow> fetch() {
        Map<String, Collection<CassandraRow>> recordsBySplit = new HashMap<>();
        Set<String> finishedSplits = new HashSet<>();

        Metadata clusterMetadata = cluster.getMetadata();
        String partitionKey = getPartitionKey(clusterMetadata);
        String finalQuery = generateRangeQuery(query, partitionKey);
        PreparedStatement preparedStatement = session.prepare(finalQuery);

        // Set wakeup to false to start consuming
        wakeup.compareAndSet(true, false);
        for (CassandraSplit cassandraSplit : unprocessedSplits) {
            // allow to interrupt the reading of splits especially the blocking session.execute()
            // call as requested in the API
            if (wakeup.get()) {
                break;
            }
            try {
                Token startToken =
                        clusterMetadata.newToken(cassandraSplit.getRingRangeStart().toString());
                Token endToken =
                        clusterMetadata.newToken(cassandraSplit.getRingRangeEnd().toString());
                final ResultSet resultSet =
                        session.execute(
                                preparedStatement
                                        .bind()
                                        .setToken(0, startToken)
                                        .setToken(1, endToken));
                // add all the records of the split to the output (in memory).
                // It is safe because each split has a configurable maximum memory size
                addRecordsToOutput(resultSet, cassandraSplit, recordsBySplit);
                // add the already read (or even empty) split to finished splits
                finishedSplits.add(cassandraSplit.splitId());
                // for reentrant calls: if fetch is restarted,
                // do not reprocess the already processed splits
                unprocessedSplits.remove(cassandraSplit);
            } catch (Exception ex) {
                LOG.error("Error while reading split ", ex);
            }
        }
        return new RecordsBySplits<>(recordsBySplit, finishedSplits);
    }