in flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java [77:118]
public RecordsWithSplitIds<CassandraRow> fetch() {
    Map<String, Collection<CassandraRow>> recordsBySplit = new HashMap<>();
    Set<String> finishedSplits = new HashSet<>();
    Metadata clusterMetadata = cluster.getMetadata();
    String partitionKey = getPartitionKey(clusterMetadata);
    String finalQuery = generateRangeQuery(query, partitionKey);
    PreparedStatement preparedStatement = session.prepare(finalQuery);
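    // the prepared range query carries two bind markers: the start and end tokens of the
    // split's ring range, bound below with setToken(0, ...) and setToken(1, ...)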
    // Set wakeup to false to start consuming
    wakeup.compareAndSet(true, false);
    for (CassandraSplit cassandraSplit : unprocessedSplits) {
        // allow interrupting the reading of splits, especially the blocking session.execute()
        // call, as requested by the SplitReader API
        if (wakeup.get()) {
            break;
        }
        try {
            Token startToken =
                    clusterMetadata.newToken(cassandraSplit.getRingRangeStart().toString());
            Token endToken =
                    clusterMetadata.newToken(cassandraSplit.getRingRangeEnd().toString());
            final ResultSet resultSet =
                    session.execute(
                            preparedStatement
                                    .bind()
                                    .setToken(0, startToken)
                                    .setToken(1, endToken));
            // add all the records of the split to the output (in memory).
            // This is safe because each split has a configurable maximum memory size
            addRecordsToOutput(resultSet, cassandraSplit, recordsBySplit);
            // add the already read (possibly empty) split to the finished splits
            finishedSplits.add(cassandraSplit.splitId());
            // for reentrant calls: if fetch() is restarted,
            // do not reprocess already processed splits
            // (removal during iteration requires unprocessedSplits to be a concurrent collection)
            unprocessedSplits.remove(cassandraSplit);
        } catch (Exception ex) {
            LOG.error("Error while reading split", ex);
        }
    }
    return new RecordsBySplits<>(recordsBySplit, finishedSplits);
}
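
For context, a minimal sketch of the kind of statement this method ends up executing. The exact strings produced by getPartitionKey and generateRangeQuery are not shown in this excerpt, so the keyspace, table, partition key, and query shape below are illustrative assumptions; the snippet reuses the same session and cluster fields as fetch() above.

// Illustrative sketch only: assumes the user query "SELECT * FROM ks.tbl;" and partition key "id".
// generateRangeQuery is expected to inject a token-range predicate on the partition key,
// yielding a query shaped roughly like this, with one bind marker per range bound:
String rangeQuery = "SELECT * FROM ks.tbl WHERE (token(id) >= ?) AND (token(id) < ?);";
PreparedStatement stmt = session.prepare(rangeQuery);
// Each CassandraSplit covers a [ringRangeStart, ringRangeEnd) slice of the token ring;
// the two markers are bound to those bounds, mirroring setToken(0, ...) / setToken(1, ...) above.
Token start = cluster.getMetadata().newToken("-9223372036854775808"); // hypothetical Murmur3 token
Token end = cluster.getMetadata().newToken("0");                      // hypothetical Murmur3 token
ResultSet rs = session.execute(stmt.bind().setToken(0, start).setToken(1, end));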