in connector/src/main/scala/org/apache/spark/sql/cassandra/SolrPredicateRules.scala [51:97]
private[cassandra] def apply(
  predicates: AnalyzedPredicates,
  tableDef: TableDef,
  sparkConf: SparkConf,
  getSolrIndexedColumns: (TableDef, SparkConf) => Set[String]): AnalyzedPredicates = {

  // Drop IsNotNull filters that carry no information. (Not Solr-specific;
  // this could live in the SCC itself.)
  val redundantIsNotNulls =
    findUselessIsNotNulls(predicates.handledByCassandra ++ predicates.handledBySpark, tableDef)
  val prunedPredicates =
    AnalyzedPredicates(predicates.handledByCassandra, predicates.handledBySpark -- redundantIsNotNulls)

  val partitionKeyFilters =
    getPartitionKeyRestriction(prunedPredicates, tableDef).asInstanceOf[Set[Filter]]
  // A complete primary-key restriction already handled by Cassandra (with
  // nothing left for Spark) makes a Solr query unnecessary.
  val fullyRestrictedByPrimaryKey =
    partitionKeyFilters.nonEmpty &&
      partitionKeyFilters.subsetOf(prunedPredicates.handledByCassandra) &&
      prunedPredicates.handledBySpark.isEmpty

  // Solr optimizations are only usable when every non-ignored target node
  // advertises the "Search" workload.
  val allNodesRunSearch = CassandraConnector(sparkConf).withSessionDo { session =>
    session.getMetadata.getNodes.values().asScala
      .filter(_.getDistance != NodeDistance.IGNORED)
      .forall { node =>
        Option(node.getExtras.get(DseNodeProperties.DSE_WORKLOADS))
          .exists(_.asInstanceOf[java.util.Set[String]].contains("Search"))
      }
  }

  // First disqualifying condition (if any) together with a human-readable reason.
  val disqualification = Seq[(Boolean, String)](
    (!allNodesRunSearch, "Search is not enabled on DSE Target nodes."),
    (!searchOptimizationEnabled.enabled, "Automatic Search optimizations for Spark SQL are disabled."),
    (fullyRestrictedByPrimaryKey, "There is a primary key restriction present"),
    (alreadyContainsSolrQuery(prunedPredicates), "Manual Solr query (solr_query = xxx) present.")
  ).collectFirst { case (true, reason) => reason }

  disqualification match {
    case Some(reason) =>
      // Fall back to the pruned predicates untouched when Solr cannot be used.
      logDebug(s"Not using Solr Optimizations. $reason")
      prunedPredicates
    case None =>
      convertToSolrQuery(
        prunedPredicates,
        tableDef,
        getSolrIndexedColumns(tableDef, sparkConf),
        searchOptimizationEnabled,
        sparkConf)
  }
}