in connector/src/main/scala/org/apache/spark/sql/cassandra/SolrPredicateRules.scala [345:428]
/**
 * Attempts to rewrite convertible predicates into a single DSE Search (Solr)
 * `solr_query` predicate.
 *
 * Partition-key restrictions are withheld from Solr conversion so Cassandra can
 * still handle them directly. The remaining predicates are partitioned into those
 * expressible as Solr filter queries and those that must stay in Spark.
 *
 * Behavior by optimization setting:
 *  - `Off`: the original predicates are returned untouched.
 *  - `On`: convertible predicates are replaced by one combined `solr_query` filter.
 *  - `Auto(ratio)`: two COUNT queries estimate the fraction of rows matched; the
 *    Solr optimization is used only when that fraction is at most `ratio`,
 *    otherwise the original predicates (full table scan) are kept.
 *
 * @param predicates                predicates as analyzed by the connector
 * @param tableDef                  table metadata (keyspace/table names, key columns)
 * @param solrIndexedFields         names of columns backed by a Solr index
 * @param searchOptimizationEnabled Off / On / Auto(ratio) setting
 * @param sparkConf                 used to build a CassandraConnector for Auto's count estimates
 * @return predicates with Solr-convertible filters folded into one `solr_query`
 *         EqualTo filter, or the original predicates when conversion is disabled,
 *         impossible, or judged not worthwhile
 */
def convertToSolrQuery(
    predicates: AnalyzedPredicates,
    tableDef: TableDef,
    solrIndexedFields: Set[String],
    searchOptimizationEnabled: DseSearchOptimizationSetting,
    sparkConf: SparkConf): AnalyzedPredicates = {

  val allPredicates = predicates.handledByCassandra ++ predicates.handledBySpark

  // Partition-key restrictions stay with Cassandra rather than being routed through Solr.
  val pkRestriction = getPartitionKeyRestriction(predicates, tableDef)
  if (pkRestriction.nonEmpty)
    logDebug(s"Partition restriction being withheld from Solr Conversion: $pkRestriction")

  val possibleSolrPredicates = allPredicates -- pkRestriction
  val (solrConvertibleFilters, sparkFilters) = possibleSolrPredicates
    .partition(isConvertibleToSolr(_, solrIndexedFields))

  if (solrConvertibleFilters.isEmpty) {
    // Nothing convertible: skip building any Solr query strings at all.
    logDebug("No Solr Convertible Filters Found")
    predicates
  } else {
    logDebug(s"Converting $solrConvertibleFilters to Solr Predicates")
    val solrFilters = solrConvertibleFilters.map(convertToSolrFilter)

    // Recommendation from Caleb:
    // Using separate filters ["filter","filter"] allows for reuse of filters
    val combinedFilterQuery = solrFilters
      .map { case SolrFilter(query, _) => s""""$query"""" } // references unused here
      .mkString(", ")
    val solrString = s"""{"q":"*:*", "fq":[$combinedFilterQuery]}"""

    /*
    See https://docs.datastax.com/en/datastax_enterprise/4.8/datastax_enterprise/srch/srchJSON.html#srchJSON__distQueryShardTol
    By setting these parameters for our estimate queries we will be more tolerant to partial results and shards not responding
    Which is ok because we are just trying to get an estimate
    */
    val FaultTolerant = Seq("\"shards.failover\": false", "\"shards.tolerant\": true").mkString(",")
    val solrStringNoFailoverTolerant = s"""{"q":"*:*", "fq":[$combinedFilterQuery], $FaultTolerant}"""

    val combinedSolrFilter: Filter = EqualTo(SolrQuery, solrString)
    val optimizedPredicates = AnalyzedPredicates(Set(combinedSolrFilter) ++ pkRestriction, sparkFilters)

    searchOptimizationEnabled match {
      case Auto(ratio) =>
        val conn = CassandraConnector(sparkConf)
        val request = s"""SELECT COUNT(*) from "${tableDef.keyspaceName}"."${tableDef.tableName}" where solr_query=?"""
        logDebug("Checking total number of records")
        val (totalRecords: Long, queryRecords: Long) = conn.withSessionDo { session =>
          // Disable Paging for the count requests since we are fault tolerant and paging cannot
          // be used during a fault tolerant request
          // https://docs.datastax.com/en/drivers/java/2.2/com/datastax/driver/core/Statement.html#setFetchSize-int-
          val pagingDisabled = session.getContext.getConfig.getDefaultProfile
            .withInt(DefaultDriverOption.REQUEST_PAGE_SIZE, -1)
          val totalRequest = SimpleStatement.newInstance(request, s"""{"q":"*:*", $FaultTolerant}""")
            .setExecutionProfile(pagingDisabled)
          val queryRequest = SimpleStatement.newInstance(request, solrStringNoFailoverTolerant)
            .setExecutionProfile(pagingDisabled)
          // Fire both counts concurrently, then block (bounded at 5s each) for the results.
          val totalFuture = session.executeAsync(totalRequest)
          val queryFuture = session.executeAsync(queryRequest) //TODO THIS can be done in a more reactive way I believe
          (totalFuture.toCompletableFuture.get(5, TimeUnit.SECONDS).one().getLong(0),
            queryFuture.toCompletableFuture.get(5, TimeUnit.SECONDS).one().getLong(0))
        }
        // Guard against divide-by-zero on an empty table.
        val queryRatio = if (totalRecords == 0) 0 else queryRecords.toDouble / totalRecords.toDouble
        if (queryRatio > ratio) {
          logDebug(s"Requesting $queryRatio of the total records. Required to be less than $ratio for DSE Search, falling back to Full Table Scan")
          predicates
        } else {
          logDebug(s"Requesting $queryRatio of the total records. Less than $ratio, using DSE Search Optimized request")
          optimizedPredicates
        }
      case On =>
        logDebug(s"Converted $solrConvertibleFilters to $combinedSolrFilter")
        optimizedPredicates
      case Off =>
        predicates
    }
  }
}