in connector/src/main/scala/com/datastax/spark/connector/datasource/ScanHelper.scala [132:158]
/**
 * Tells whether the predicates of `clause` restrict every partition key column of `tableDef`.
 *
 * Only equality, `IN` and `IN`-list predicates on partition key columns count as restrictions.
 * A range predicate on a partition key column is ignored when that column is the target of an
 * index whose class name contains `StorageAttachedIndex`, and rejected otherwise.
 *
 * @param tableDef definition of the table being scanned
 * @param clause   where clause whose predicates are parsed and inspected
 * @return `true` when at least one partition key column is restricted and the number of
 *         restricted columns equals the number of partition key columns; `false` when the
 *         restriction is partial (or absent) but every restricted column is indexed
 * @throws UnsupportedOperationException if a range predicate targets a partition key column
 *         without a storage-attached index, or if the partition key is only partially
 *         restricted and some restricted column is not indexed
 */
def containsPartitionKey(tableDef: TableDef, clause: CqlWhereClause): Boolean = {
  val partitionKeyNames = tableDef.partitionKey.map(_.columnName).toSet
  val parsedPredicates: Seq[Predicate] = clause.predicates.flatMap(CqlWhereParser.parse)
  val saiTargets = tableDef.indexes
    .filter(_.className.contains(StorageAttachedIndex))
    .map(_.target)

  // Yields the partition key column restricted by a predicate, if any. Predicates on other
  // columns, and range predicates on SAI-indexed key columns, contribute nothing.
  def restrictedKeyColumn(predicate: Predicate): Option[String] = predicate match {
    case EqPredicate(c, _) if partitionKeyNames.contains(c) => Some(c)
    case InPredicate(c) if partitionKeyNames.contains(c) => Some(c)
    case InListPredicate(c, _) if partitionKeyNames.contains(c) => Some(c)
    case RangePredicate(c, _, _) if partitionKeyNames.contains(c) && !saiTargets.contains(c) =>
      throw new UnsupportedOperationException(
        s"Range predicates on partition key columns (here: $c) are " +
          s"not supported in where. Use filter instead.")
    case _ => None
  }

  val restrictedColumns: Set[String] =
    parsedPredicates.flatMap(p => restrictedKeyColumn(p)).toSet

  // Both flags are computed up front, exactly as the checks below need them.
  val keyFullyRestricted =
    restrictedColumns.nonEmpty && restrictedColumns.size == partitionKeyNames.size
  val restrictedAllIndexed = restrictedColumns.forall(name => tableDef.isIndexed(name))

  if (keyFullyRestricted || restrictedAllIndexed) {
    keyFullyRestricted
  } else {
    val missing = partitionKeyNames -- restrictedColumns
    throw new UnsupportedOperationException(
      s"Partition key predicate must include all partition key columns or partition key columns need" +
        s" to be indexed. Missing columns: ${missing.mkString(",")}"
    )
  }
}