in connector/src/main/scala/com/datastax/spark/connector/rdd/AbstractCassandraJoin.scala [74:113]
protected def checkValidJoin(): Seq[ColumnRef] = {
  val partitionKeyColumnNames = tableDef.partitionKey.map(_.columnName).toSet
  val primaryKeyColumnNames = tableDef.primaryKey.map(_.columnName).toSet
  val colNames = joinColumnNames.map(_.columnName).toSet

  // Initialize the RowWriter used for accessing Cassandra; columnNames is evaluated
  // only for its side effect and the result is discarded
  rowWriter.columnNames

  // Every join column must be part of the table's PRIMARY KEY
  def checkSingleColumn(column: ColumnRef): Unit = {
    require(
      primaryKeyColumnNames.contains(column.columnName),
      s"Can't pushdown join on column $column because it is not part of the PRIMARY KEY"
    )
  }

  // Make sure the requested clustering columns form a prefix of the clustering key,
  // i.e. every clustering column from the 0th position up to the highest one requested
  // is included in the join:
  val chosenClusteringColumns = tableDef.clusteringColumns
    .filter(cc => colNames.contains(cc.columnName))
  if (!tableDef.clusteringColumns.startsWith(chosenClusteringColumns)) {
    val maxCol = chosenClusteringColumns.last
    val maxIndex = maxCol.componentIndex.get
    val requiredColumns = tableDef.clusteringColumns.takeWhile(_.componentIndex.get <= maxIndex)
    val missingColumns = requiredColumns.toSet -- chosenClusteringColumns.toSet
    throw new IllegalArgumentException(
      s"Can't pushdown join on column $maxCol without also specifying [ $missingColumns ]"
    )
  }

  // The full partition key is always required for a pushed-down join
  val missingPartitionKeys = partitionKeyColumnNames -- colNames
  require(
    missingPartitionKeys.isEmpty,
    s"Can't join without the full partition key. Missing: [ $missingPartitionKeys ]"
  )

  // Build the join query now so that conflicting where clauses are reported here;
  // the generated query string itself is discarded
  JoinHelper.getJoinQueryString(tableDef, joinColumnNames, CqlQueryParts(selectedColumnRefs, where, limit, clusteringOrder))

  joinColumnNames.foreach(checkSingleColumn)
  joinColumnNames
}
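
Below is a minimal usage sketch (not part of this file) of how these checks can surface through the public `joinWithCassandraTable` API. It assumes an existing `SparkContext` named `sc` and a hypothetical table created as `CREATE TABLE test.kv (pk int, ck1 int, ck2 int, v int, PRIMARY KEY ((pk), ck1, ck2))`; the keyspace, table, and column names are illustrative only.

import com.datastax.spark.connector._

val keys = sc.parallelize(Seq((1, 1), (2, 2)))

// Accepted: the full partition key is present and no clustering columns are requested
keys.map { case (pk, _) => Tuple1(pk) }
  .joinWithCassandraTable("test", "kv", joinColumns = SomeColumns("pk"))

// Accepted: full partition key plus a prefix (ck1) of the clustering key
keys.joinWithCassandraTable("test", "kv", joinColumns = SomeColumns("pk", "ck1"))

// Rejected: the partition key column pk is missing from the join columns,
// so the "Can't join without the full partition key" requirement fails
keys.map { case (_, ck1) => Tuple1(ck1) }
  .joinWithCassandraTable("test", "kv", joinColumns = SomeColumns("ck1"))

// Rejected: ck2 is requested without ck1, so the clustering-prefix check
// throws IllegalArgumentException
keys.joinWithCassandraTable("test", "kv", joinColumns = SomeColumns("pk", "ck2"))

// Rejected: v is a regular column, so checkSingleColumn fails because it is
// not part of the PRIMARY KEY
keys.map { case (pk, _) => (pk, 0) }
  .joinWithCassandraTable("test", "kv", joinColumns = SomeColumns("pk", "v"))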