in connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraScanBuilder.scala [211:236]
override def pushedFilters(): Array[Filter] = filtersForCassandra
/** Construct where clause from pushdown filters */
private def cqlWhereClause = CassandraScanBuilder.filterToCqlWhereClause(tableDef, filtersForCassandra)
/** Is convertable to joinWithCassandraTable if query
* - uses all partition key columns
* - spans multiple partitions
* - contains IN key values and the cartesian set of those values is greater than threshold
*/
private def isConvertableToJoinWithCassandra(predicates: AnalyzedPredicates): Boolean = {
val inClauseConversionThreshold = consolidatedConf.getLong(InClauseToJoinWithTableConversionThreshold.name, InClauseToJoinWithTableConversionThreshold.default)
if (inClauseConversionThreshold == 0L || !pushdownEnabled) {
false
} else {
val partitionFilters = eqAndInColumnFilters(tableDef.partitionKey, predicates)
val clusteringFilters = eqAndInColumnFilters(tableDef.clusteringColumns, predicates)
val inClauseValuesCartesianSize = (partitionFilters ++ clusteringFilters).foldLeft(1L) {
case (cartSize, In(_, values)) => cartSize * values.length
case (cartSize, _) => cartSize
}
partitionFilters.exists(_.isInstanceOf[In]) &&
tableDef.partitionKey.length == partitionFilters.length &&
inClauseValuesCartesianSize >= inClauseConversionThreshold
}
}