private def pushFiltersToSpark()

in connector/src/main/scala/org/apache/spark/sql/cassandra/InClausePredicateRules.scala [34:70]


  /**
   * Hands the given filters over to Spark.
   *
   * @param predicates current split of predicates between Cassandra and Spark
   * @param filters    filters that Spark should evaluate instead of Cassandra
   * @return a copy of `predicates` in which `filters` are added to `handledBySpark` and removed
   *         from `handledByCassandra`
   */
  private def pushFiltersToSpark(predicates: AnalyzedPredicates, filters: Set[Filter]): AnalyzedPredicates = {
    val sparkSide = predicates.handledBySpark ++ filters
    val cassandraSide = predicates.handledByCassandra -- filters
    predicates.copy(handledByCassandra = cassandraSide, handledBySpark = sparkSide)
  }

  /**
   * Selects the filters that target one of the given columns.
   *
   * A filter is kept when `columnName(filter)` equals the `columnName` of any column in `columns`.
   *
   * @param filters candidate filters
   * @param columns columns of interest
   * @return the subset of `filters` referring to a column in `columns`
   */
  private def columnsFilters(filters: Set[Filter], columns: Seq[ColumnDef]): Set[Filter] = {
    def targetsOneOfColumns(filter: Filter): Boolean =
      columns.exists(column => column.columnName == columnName(filter))

    filters.filter(targetsOneOfColumns)
  }

  /**
   * Stops pushing down 'IN' predicates whose number of key combinations is too large for
   * Cassandra-side evaluation, leaving them to be filtered by Spark instead.
   *
   * The 'IN' filters currently handled by Cassandra are split into those on partition key columns
   * and those on clustering columns. The cross-product size of each group is then computed. These
   * sizes are compared with the `InClauseToFullTableScanConversionThreshold` setting read from
   * `sparkConf`:
   *  - total (partition x clustering) below the threshold: `predicates` is returned unchanged;
   *  - otherwise, partition key combinations alone below the threshold: only the clustering column
   *    'IN' filters are moved to Spark;
   *  - otherwise: both partition key and clustering column 'IN' filters are moved to Spark, which
   *    results in a full table scan with Spark-side filtering.
   *
   * @param predicates predicates currently split between Cassandra and Spark
   * @param tableDef   table definition supplying the partition key and clustering columns
   * @param sparkConf  configuration the threshold is read from (falls back to its default)
   * @return `predicates`, possibly with some 'IN' filters moved from Cassandra to Spark
   */
  private def filterOutHugeInClausePredicates(
    predicates: AnalyzedPredicates,
    tableDef: TableDef,
    sparkConf: SparkConf): AnalyzedPredicates = {
    val fullTableScanConversionThreshold = sparkConf.getLong(
      InClauseToFullTableScanConversionThreshold.name,
      InClauseToFullTableScanConversionThreshold.default)

    val inFilters = predicates.handledByCassandra.filter(isInPredicate)

    val partitionColumnsFilters = columnsFilters(inFilters, tableDef.partitionKey)
    val clusteringColumnsFilters = columnsFilters(inFilters, tableDef.clusteringColumns)

    val partitionCartesianSize = inCrossProductSize(partitionColumnsFilters)
    val clusteringCartesianSize = inCrossProductSize(clusteringColumnsFilters)

    // Multiply in BigInt. With a fixed-width product, two large cartesian sizes can overflow and
    // wrap around to a small or negative value. That value would wrongly pass the threshold check
    // and push every key combination down to Cassandra.
    val totalCartesianSize = BigInt(partitionCartesianSize) * BigInt(clusteringCartesianSize)

    if (totalCartesianSize < BigInt(fullTableScanConversionThreshold)) {
      predicates
    } else if (partitionCartesianSize < fullTableScanConversionThreshold) {
      // Partition key combinations alone are acceptable; only clustering 'IN' filters go to Spark.
      logInfo(s"Number of key combinations in 'IN' clauses exceeds ${InClauseToFullTableScanConversionThreshold.name} " +
        s"($fullTableScanConversionThreshold), clustering columns filters are not pushed down.")
      pushFiltersToSpark(predicates, clusteringColumnsFilters)
    } else {
      logInfo(s"Number of key combinations in 'IN' clauses exceeds ${InClauseToFullTableScanConversionThreshold.name} " +
        s"($fullTableScanConversionThreshold), partition key filters are not pushed down. This results in full table " +
        s"scan with Spark side filtering.")
      pushFiltersToSpark(predicates, partitionColumnsFilters ++ clusteringColumnsFilters)
    }
  }