in flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java [157:184]
/**
 * Rewrites the user-provided SELECT query so that it is restricted to a token range of the
 * partition key, leaving two bind markers ({@code ?}) for the range bounds: the first for the
 * inclusive lower bound, the second for the exclusive upper bound.
 *
 * <p>If the query already has a WHERE clause, the token restriction is inserted right after
 * the WHERE keyword and AND-ed with the existing conditions. Otherwise a new WHERE clause is
 * inserted right after the position reported by group 2 of {@code
 * CassandraSource.SELECT_REGEXP} (expected to be the end of {@code keyspace.table}).
 *
 * <p>The WHERE keyword is looked up as a whole word, case-insensitively, and only after the
 * table name, so identifiers that merely contain "where" (e.g. a column {@code somewhere} or a
 * table {@code nowhere}) are not mistaken for the keyword.
 *
 * @param query the user query; must match {@code CassandraSource.SELECT_REGEXP}
 * @param partitionKey the partition key expression placed inside {@code token(...)}
 * @return the query with the token range restriction added
 * @throws IllegalStateException if {@code query} does not match {@code
 *     CassandraSource.SELECT_REGEXP}
 */
static String generateRangeQuery(String query, String partitionKey) {
    final Matcher queryMatcher = CassandraSource.SELECT_REGEXP.matcher(query);
    if (!queryMatcher.matches()) {
        throw new IllegalStateException(
                String.format(
                        "Failed to generate range query out of the provided query: %s", query));
    }
    // end of keyspace.table
    final int tableEnd = queryMatcher.end(2);
    // Search the original string (not a lower-cased copy, whose length may differ depending on
    // the locale) so that the found offsets are valid for the substring calls below.
    // Fully qualified because this change must not depend on the file's import block.
    final Matcher whereMatcher =
            java.util.regex.Pattern.compile(
                            "\\bwhere\\b", java.util.regex.Pattern.CASE_INSENSITIVE)
                    .matcher(query);
    final int insertionPoint;
    final String filter;
    // A WHERE clause can only appear after the table name; Math.max guards against a
    // non-participating group (end() == -1), which find(int) would reject.
    if (whereMatcher.find(Math.max(tableEnd, 0))) {
        // insert right after the WHERE keyword and chain with the existing conditions
        insertionPoint = whereMatcher.end();
        filter =
                String.format(
                        " (token(%s) >= ?) AND (token(%s) < ?) AND",
                        partitionKey, partitionKey);
    } else {
        // no WHERE clause yet: open one right after keyspace.table
        insertionPoint = tableEnd;
        filter =
                String.format(
                        " WHERE (token(%s) >= ?) AND (token(%s) < ?)",
                        partitionKey, partitionKey);
    }
    return String.format(
            "%s%s%s",
            query.substring(0, insertionPoint), filter, query.substring(insertionPoint));
}