private[cassandra] def apply()

in connector/src/main/scala/org/apache/spark/sql/cassandra/SolrPredicateRules.scala [51:97]


  /**
    * Decides whether the supplied predicates may be rewritten into a Solr query and, when
    * they may, delegates the rewrite to `convertToSolrQuery`.
    *
    * The predicates are first pruned: every filter reported by `findUselessIsNotNulls` is
    * removed from the Spark-handled set. The rewrite is then skipped (and the pruned
    * predicates returned as they are) when any of the following holds, checked in this order:
    *  - some node whose distance is not `IGNORED` does not list the "Search" workload,
    *  - `searchOptimizationEnabled.enabled` is false,
    *  - a partition key restriction is fully handled by Cassandra and nothing is left to Spark,
    *  - `alreadyContainsSolrQuery` reports a manual Solr query.
    * The first matching reason is logged at debug level.
    *
    * @param predicates            predicates already split between Cassandra and Spark
    * @param tableDef              definition of the table being queried
    * @param sparkConf             configuration used to open the Cassandra session and
    *                              passed on to the Solr conversion
    * @param getSolrIndexedColumns lookup of the Solr indexed column names; only invoked
    *                              when the rewrite is actually attempted
    * @return the pruned predicates, or the result of `convertToSolrQuery`
    */
  private[cassandra] def apply(
    predicates: AnalyzedPredicates,
    tableDef: TableDef,
    sparkConf: SparkConf,
    getSolrIndexedColumns: (TableDef, SparkConf) => Set[String]): AnalyzedPredicates = {

    // Nothing Solr specific about this pruning; it could equally live in the SCC itself.
    val redundantIsNotNulls = findUselessIsNotNulls(
      predicates.handledByCassandra ++ predicates.handledBySpark,
      tableDef)
    val usefulPredicates = AnalyzedPredicates(
      predicates.handledByCassandra,
      predicates.handledBySpark -- redundantIsNotNulls)

    val partitionKeyFilters =
      getPartitionKeyRestriction(usefulPredicates, tableDef).asInstanceOf[Set[Filter]]

    // True only when the restriction is non-empty, entirely pushed down to Cassandra,
    // and Spark has no filters left to evaluate.
    val primaryKeyRestrictionExists =
      partitionKeyFilters.nonEmpty &&
      partitionKeyFilters.subsetOf(usefulPredicates.handledByCassandra) &&
      usefulPredicates.handledBySpark.isEmpty

    // Nodes with an IGNORED distance are left out of the check.
    val everyTargetNodeRunsSearch = CassandraConnector(sparkConf).withSessionDo { session =>
      session.getMetadata.getNodes.values().asScala
        .filter(node => node.getDistance != NodeDistance.IGNORED)
        .forall { node =>
          Option(node.getExtras.get(DseNodeProperties.DSE_WORKLOADS))
            .exists(_.asInstanceOf[java.util.Set[String]].contains("Search"))
        }
    }

    // All conditions are evaluated up front, in the same order as they are reported below.
    val searchMissingOnTargets = !everyTargetNodeRunsSearch
    val optimizationsDisabled = !searchOptimizationEnabled.enabled
    val manualSolrQueryPresent = alreadyContainsSolrQuery(usefulPredicates)

    val rejectionReason: Option[String] =
      if (searchMissingOnTargets) {
        Some("Search is not enabled on DSE Target nodes.")
      } else if (optimizationsDisabled) {
        Some("Automatic Search optimizations for Spark SQL are disabled.")
      } else if (primaryKeyRestrictionExists) {
        Some("There is a primary key restriction present")
      } else if (manualSolrQueryPresent) {
        Some("Manual Solr query (solr_query = xxx) present.")
      } else {
        None
      }

    rejectionReason
      .map { reasonForFailure =>
        logDebug(s"Not using Solr Optimizations. $reasonForFailure")
        usefulPredicates
      }
      .getOrElse(
        convertToSolrQuery(
          usefulPredicates,
          tableDef,
          getSolrIndexedColumns(tableDef, sparkConf),
          searchOptimizationEnabled,
          sparkConf))
  }