override def unhandledFilters()

in spark/sql-30/src/main/scala/org/elasticsearch/spark/sql/DefaultSource.scala [304:342]


  /**
   * Reports which of the supplied filters are not recognized in their entirety.
   *
   * The input array is handed back untouched when `Utils.isKeepHandledFilters(cfg)` holds, or
   * when the array is `null` or empty. Otherwise each filter is inspected recursively and only
   * those containing at least one unrecognized part are kept. The outcome is logged at trace level.
   *
   * @param filters candidate filters; may be `null` or empty
   * @return the filters regarded as unhandled (the very same `filters` reference on the
   *         pass-through path)
   */
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
    val passThrough = Utils.isKeepHandledFilters(cfg) || filters == null || filters.isEmpty
    if (passThrough) {
      filters
    } else {
      // A filter counts as recognized only when every node of its tree (And / Or / Not included)
      // is recognized; anything else is kept, even though part of it might still be pushed.
      def fullyRecognized(candidate: Filter): Boolean = candidate match {
        // plain comparisons and null checks
        case EqualTo(_, _) | GreaterThan(_, _) | GreaterThanOrEqual(_, _) |
             LessThan(_, _) | LessThanOrEqual(_, _) | IsNull(_) | IsNotNull(_) =>
          true
        // In is problematic - see translate, so it is always reported as unhandled
        case In(_, _) =>
          false
        // composites are recognized only if all of their operands are
        case And(lhs, rhs) =>
          fullyRecognized(lhs) && fullyRecognized(rhs)
        case Or(lhs, rhs) =>
          fullyRecognized(lhs) && fullyRecognized(rhs)
        case Not(inner) =>
          fullyRecognized(inner)
        // matched by class name: string filters (Spark 1.3.1+) and EqualNullSafe (Spark 1.5+)
        case p: Product
            if isClass(p, "org.apache.spark.sql.sources.StringStartsWith") ||
              isClass(p, "org.apache.spark.sql.sources.StringEndsWith") ||
              isClass(p, "org.apache.spark.sql.sources.StringContains") ||
              isClass(p, "org.apache.spark.sql.sources.EqualNullSafe") =>
          true
        // unknown
        case _ =>
          false
      }

      val remaining = filters.filterNot(fullyRecognized)
      if (Utils.LOGGER.isTraceEnabled()) {
        Utils.LOGGER.trace(s"Unhandled filters from ${filters.mkString("[", ",", "]")} to ${remaining.mkString("[", ",", "]")}")
      }
      remaining
    }
  }