def convertToSolrQuery()

in connector/src/main/scala/org/apache/spark/sql/cassandra/SolrPredicateRules.scala [345:428]


  /**
    * Attempts to replace the pushed down filters with a single DSE Search `solr_query` predicate.
    *
    * Filters returned by `getPartitionKeyRestriction` are withheld from the conversion. Of the
    * remaining filters, those accepted by `isConvertibleToSolr` are converted and combined into
    * one JSON `solr_query` string; the others are passed through unchanged as the second
    * argument of the resulting [[AnalyzedPredicates]].
    *
    * The outcome depends on `searchOptimizationEnabled`:
    *  - `Off`: the original `predicates` are returned.
    *  - `On`: the converted predicates are returned.
    *  - `Auto(ratio)`: two `COUNT(*)` requests (all records, matching records) are executed and
    *    the converted predicates are returned unless the matching fraction is greater than
    *    `ratio`, in which case the original `predicates` are returned. Each count is awaited for
    *    at most 5 seconds.
    *
    * If no filter is convertible the original `predicates` are returned whatever the setting.
    *
    * @param predicates                the predicates analyzed so far
    * @param tableDef                  table whose keyspace and name are used for the count requests
    * @param solrIndexedFields         field names handed to `isConvertibleToSolr`
    * @param searchOptimizationEnabled `On`, `Off` or `Auto(ratio)`
    * @param sparkConf                 configuration used to build the [[CassandraConnector]] in `Auto` mode
    * @return either the original `predicates` or the Solr optimized replacement
    */
  def convertToSolrQuery(
    predicates: AnalyzedPredicates,
    tableDef: TableDef,
    solrIndexedFields: Set[String],
    searchOptimizationEnabled: DseSearchOptimizationSetting,
    sparkConf: SparkConf): AnalyzedPredicates = {

    val everyFilter = predicates.handledByCassandra ++ predicates.handledBySpark

    // Partition key restrictions are kept out of the Solr conversion
    val partitionKeyFilters = getPartitionKeyRestriction(predicates, tableDef)
    if (partitionKeyFilters.nonEmpty) {
      logDebug(s"Partition restriction being withheld from Solr Conversion:  $partitionKeyFilters")
    }

    val (convertible, leftForSpark) = (everyFilter -- partitionKeyFilters)
      .partition(filter => isConvertibleToSolr(filter, solrIndexedFields))

    logDebug(s"Converting $convertible to Solr Predicates")

    // Recommendation from Caleb :
    // Using separate filters ["filter","filter"] allows for reuse of filters
    val filterQueries = convertible
      .map(convertToSolrFilter)
      .map { case SolrFilter(query, _) => s""""$query"""" }
      .mkString(", ")

    /*
    See https://docs.datastax.com/en/datastax_enterprise/4.8/datastax_enterprise/srch/srchJSON.html#srchJSON__distQueryShardTol
    These parameters make the estimate queries tolerant of partial results and of shards not
    responding, which is acceptable because only an estimate is needed
    */
    val faultTolerantParams =
      List("\"shards.failover\": false", "\"shards.tolerant\": true").mkString(",")

    val solrJson = s"""{"q":"*:*", "fq":[$filterQueries]}"""
    val solrEstimateJson = s"""{"q":"*:*", "fq":[$filterQueries], $faultTolerantParams}"""

    val solrPredicate: Filter = EqualTo(SolrQuery, solrJson)
    val optimizedPredicates =
      AnalyzedPredicates(Set(solrPredicate) ++ partitionKeyFilters, leftForSpark)

    searchOptimizationEnabled match {
      case _ if convertible.isEmpty =>
        logDebug("No Solr Convertible Filters Found")
        predicates

      case Off =>
        predicates

      case On =>
        logDebug(s"Converted $convertible to $solrPredicate")
        optimizedPredicates

      case Auto(ratio) =>
        val connector = CassandraConnector(sparkConf)
        val countCql =
          s"""SELECT COUNT(*) from "${tableDef.keyspaceName}"."${tableDef.tableName}" where solr_query=?"""

        logDebug("Checking total number of records")
        val (totalRecords: Long, queryRecords: Long) = connector.withSessionDo { session =>
          // Paging is disabled for the count requests: they are fault tolerant and paging cannot
          // be used during a fault tolerant request
          // https://docs.datastax.com/en/drivers/java/2.2/com/datastax/driver/core/Statement.html#setFetchSize-int-
          val unpagedProfile = session.getContext.getConfig.getDefaultProfile
            .withInt(DefaultDriverOption.REQUEST_PAGE_SIZE, -1)
          val totalStatement = SimpleStatement
            .newInstance(countCql, s"""{"q":"*:*", $faultTolerantParams}""")
            .setExecutionProfile(unpagedProfile)
          val matchingStatement = SimpleStatement
            .newInstance(countCql, solrEstimateJson)
            .setExecutionProfile(unpagedProfile)

          // Both requests are submitted before either result is awaited
          // TODO this can probably be done in a more reactive way
          val pendingTotal = session.executeAsync(totalStatement)
          val pendingMatching = session.executeAsync(matchingStatement)

          val total = pendingTotal.toCompletableFuture.get(5, TimeUnit.SECONDS).one().getLong(0)
          val matching = pendingMatching.toCompletableFuture.get(5, TimeUnit.SECONDS).one().getLong(0)
          (total, matching)
        }

        // An empty table yields a ratio of zero rather than a division by zero
        val queryRatio =
          if (totalRecords == 0) 0.0 else queryRecords.toDouble / totalRecords.toDouble

        if (queryRatio > ratio) {
          logDebug(s"Requesting $queryRatio of the total records. Required to be less than $ratio for DSE Search, falling back to Full Table Scan")
          predicates
        } else {
          logDebug(s"Requesting $queryRatio of the total records. Less than $ratio, using DSE Search Optimized request")
          optimizedPredicates
        }
    }
  }