in connector/src/main/scala/org/apache/spark/sql/cassandra/DsePredicateRules.scala [88:109]
/** Moves filters from the Spark-handled side of `predicates` to the Cassandra-handled side.
  *
  * A filter is moved when all of the following hold:
  *  - `eligibleForPushDown` accepts it,
  *  - its column is the target column of one of the indexes returned by `saiIndexes(table)`,
  *  - no equality predicate on that column is already handled by Cassandra, either from
  *    the input or moved earlier during this call.
  *
  * Only equality predicates claim a column, so several non-equality filters on the same
  * column may all be moved. The outcome depends on the iteration order of
  * `predicates.handledBySpark`, because an equality filter seen earlier blocks later
  * filters on the same column.
  *
  * @param predicates          current split of filters between Cassandra and Spark
  * @param table               table whose indexes decide which columns qualify
  * @param eligibleForPushDown additional per-filter test a filter must pass to be moved
  * @return a new split with the moved filters appended to the Cassandra side and the
  *         remaining filters left on the Spark side
  */
private def pushDownPredicates(predicates: AnalyzedPredicates, table: TableDef)
                              (eligibleForPushDown: Filter => Boolean): AnalyzedPredicates = {
  val saiColumns = saiIndexes(table).map(_.targetColumn)

  // Columns that already have an equality predicate on the Cassandra side. This set grows
  // while the Spark-side filters are being partitioned below.
  val equalityColumns = (
    for (pushed <- predicates.handledByCassandra if FilterOps.isEqualToPredicate(pushed))
      yield FilterOps.columnName(pushed)
  ).to(collection.mutable.Set)

  val (movedToCassandra, keptInSpark) = predicates.handledBySpark.partition { candidate =>
    val isEquality = FilterOps.isEqualToPredicate(candidate)
    // The column name is resolved only for filters that passed the eligibility test.
    eligibleForPushDown(candidate) && {
      val column = FilterOps.columnName(candidate)
      val pushable = saiColumns.contains(column) && !equalityColumns.contains(column)
      if (pushable && isEquality) {
        // Claim the column so that later filters on it stay on the Spark side.
        equalityColumns += column
      }
      pushable
    }
  }

  AnalyzedPredicates(predicates.handledByCassandra ++ movedToCassandra, keptInSpark)
}