private def translateFilter()

in spark/sql-30/src/main/scala/org/elasticsearch/spark/sql/DefaultSource.scala [349:513]


  private def translateFilter(filter: Filter, strictPushDown: Boolean, isES50: Boolean):String = {
    // the pushdown can be strict - i.e. use only filters and thus match the value exactly (works with non-analyzed)
    // or non-strict meaning queries will be used instead that is the filters will be analyzed as well
    filter match {

      case EqualTo(attribute, value)            => {
        // if we get a null, translate it into a missing query (we're extra careful - Spark should translate the equals into isMissing anyway)
        if (value == null || value == None || value == ()) {
          if (isES50) {
            s"""{"bool":{"must_not":{"exists":{"field":"$attribute"}}}}"""
          }
          else {
            s"""{"missing":{"field":"$attribute"}}"""  
          }
        }

        if (strictPushDown) s"""{"term":{"$attribute":${extract(value)}}}"""
        else {
          if (isES50) {
            s"""{"match":{"$attribute":${extract(value)}}}"""
          }
          else {
            s"""{"query":{"match":{"$attribute":${extract(value)}}}}"""
          }
        }
      }
      case GreaterThan(attribute, value)        => s"""{"range":{"$attribute":{"gt" :${extract(value)}}}}"""
      case GreaterThanOrEqual(attribute, value) => s"""{"range":{"$attribute":{"gte":${extract(value)}}}}"""
      case LessThan(attribute, value)           => s"""{"range":{"$attribute":{"lt" :${extract(value)}}}}"""
      case LessThanOrEqual(attribute, value)    => s"""{"range":{"$attribute":{"lte":${extract(value)}}}}"""
      case In(attribute, values)                => {
        // when dealing with mixed types (strings and numbers) Spark converts the Strings to null (gets confused by the type field)
        // this leads to incorrect query DSL hence why nulls are filtered
        val filtered = values filter (_ != null)
        if (filtered.isEmpty) {
          return ""
        }

        // further more, match query only makes sense with String types so for other types apply a terms query (aka strictPushDown)
        val attrType = lazySchema.struct(attribute).dataType
        val isStrictType = attrType match {
          case DateType |
               TimestampType => true
          case _             => false
        }

        if (!strictPushDown && isStrictType) {
          if (Utils.LOGGER.isDebugEnabled()) {
            Utils.LOGGER.debug(s"Attribute $attribute type $attrType not suitable for match query; using terms (strict) instead")
          }
        }

        if (strictPushDown || isStrictType) s"""{"terms":{"$attribute":${extractAsJsonArray(filtered)}}}"""
        else {
          if (isES50) {
            s"""{"bool":{"should":[${extractMatchArray(attribute, filtered)}]}}"""
          }
          else {
            s"""{"or":{"filters":[${extractMatchArray(attribute, filtered)}]}}"""  
          }
        }
      }
      case IsNull(attribute)                    => {
        if (isES50) {
          s"""{"bool":{"must_not":{"exists":{"field":"$attribute"}}}}"""
        }
        else {
          s"""{"missing":{"field":"$attribute"}}"""  
        }
      }
      case IsNotNull(attribute)                 => s"""{"exists":{"field":"$attribute"}}"""
      case And(left, right)                     => {
        if (isES50) {
          s"""{"bool":{"filter":[${translateFilter(left, strictPushDown, isES50)}, ${translateFilter(right, strictPushDown, isES50)}]}}"""
        }
        else {
          s"""{"and":{"filters":[${translateFilter(left, strictPushDown, isES50)}, ${translateFilter(right, strictPushDown, isES50)}]}}"""  
        }
      }
      case Or(left, right)                      => {
        if (isES50) {
          s"""{"bool":{"should":[{"bool":{"filter":${translateFilter(left, strictPushDown, isES50)}}}, {"bool":{"filter":${translateFilter(right, strictPushDown, isES50)}}}]}}"""
        }
        else {
          s"""{"or":{"filters":[${translateFilter(left, strictPushDown, isES50)}, ${translateFilter(right, strictPushDown, isES50)}]}}"""
        }
      }
      case Not(filterToNeg)                     => {
        if (isES50) {
          s"""{"bool":{"must_not":${translateFilter(filterToNeg, strictPushDown, isES50)}}}"""
        }
        else {        
          s"""{"not":{"filter":${translateFilter(filterToNeg, strictPushDown, isES50)}}}"""
        }
      }

      // the filter below are available only from Spark 1.3.1 (not 1.3.0)

      //
      // String Filter notes:
      //
      // the DSL will be quite slow (linear to the number of terms in the index) but there's no easy way around them
      // we could use regexp filter however it's a bit overkill and there are plenty of chars to escape
      // s"""{"regexp":{"$attribute":"$value.*"}}"""
      // as an alternative we could use a query string but still, the analyzed / non-analyzed is there as the DSL is slightly more complicated
      // s"""{"query":{"query_string":{"default_field":"$attribute","query":"$value*"}}}"""
      // instead wildcard query is used, with the value lowercased (to match analyzed fields)

      case f:Product if isClass(f, "org.apache.spark.sql.sources.StringStartsWith") => {
        val arg = {
          val x = f.productElement(1).toString()
          if (!strictPushDown) x.toLowerCase(Locale.ROOT) else x
        }
        if (isES50) {
          s"""{"wildcard":{"${f.productElement(0)}":"$arg*"}}"""
        }
        else {
          s"""{"query":{"wildcard":{"${f.productElement(0)}":"$arg*"}}}"""  
        }
      }

      case f:Product if isClass(f, "org.apache.spark.sql.sources.StringEndsWith")   => {
        val arg = {
          val x = f.productElement(1).toString()
          if (!strictPushDown) x.toLowerCase(Locale.ROOT) else x
        }
        if (isES50) {
          s"""{"wildcard":{"${f.productElement(0)}":"*$arg"}}"""
        }
        else {
          s"""{"query":{"wildcard":{"${f.productElement(0)}":"*$arg"}}}"""
        }
      }

      case f:Product if isClass(f, "org.apache.spark.sql.sources.StringContains")   => {
        val arg = {
          val x = f.productElement(1).toString()
          if (!strictPushDown) x.toLowerCase(Locale.ROOT) else x
        }
        if (isES50) {
          s"""{"wildcard":{"${f.productElement(0)}":"*$arg*"}}"""
        }
        else {
          s"""{"query":{"wildcard":{"${f.productElement(0)}":"*$arg*"}}}"""
        }
      }

      // the filters below are available only from Spark 1.5.0

      case f:Product if isClass(f, "org.apache.spark.sql.sources.EqualNullSafe")    => {
        val arg = extract(f.productElement(1))
        if (strictPushDown) s"""{"term":{"${f.productElement(0)}":$arg}}"""
        else {
          if (isES50) {
            s"""{"match":{"${f.productElement(0)}":$arg}}"""
          }
          else {
            s"""{"query":{"match":{"${f.productElement(0)}":$arg}}}"""
          }
        }
      }

      case _                                                                        => ""
    }
  }