in spark/sql-30/src/main/scala/org/elasticsearch/spark/sql/DefaultSource.scala [304:342]
/**
 * Reports which of the given pushed-down filters Spark must still evaluate itself.
 *
 * A filter is considered "handled" when this source can translate it fully into an
 * Elasticsearch query; anything else is returned so Spark re-applies it after the scan.
 * When the user opts to keep handled filters (see `Utils.isKeepHandledFilters`), every
 * filter is returned so Spark double-checks all of them.
 *
 * @param filters filters Spark considered for push-down (may be null or empty)
 * @return the subset of `filters` that Spark must apply on its side
 */
override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
  if (Utils.isKeepHandledFilters(cfg) || filters == null || filters.isEmpty) {
    // Nothing to inspect, or the user explicitly wants Spark to re-apply everything.
    filters
  } else {
    // Walk the filter tree (And / Or / Not are composites) and check whether every
    // leaf is one we recognize. A composite counts as unhandled if ANY child is,
    // so Spark conservatively re-applies it even though part of it may be pushed down.
    def unhandled(filter: Filter): Boolean = {
      filter match {
        // simple comparisons and null checks translate cleanly
        case EqualTo(_, _)            => false
        case GreaterThan(_, _)        => false
        case GreaterThanOrEqual(_, _) => false
        case LessThan(_, _)           => false
        case LessThanOrEqual(_, _)    => false
        // In is problematic - see translate, don't filter it
        case In(_, _)                 => true
        case IsNull(_)                => false
        case IsNotNull(_)             => false
        // composites: unhandled if any child is unhandled
        case And(left, right)         => unhandled(left) || unhandled(right)
        case Or(left, right)          => unhandled(left) || unhandled(right)
        case Not(pred)                => unhandled(pred)
        // Spark 1.3.1+ string filters - matched reflectively since the classes may
        // not exist on older Spark versions on the classpath
        case f: Product if isClass(f, "org.apache.spark.sql.sources.StringStartsWith") => false
        case f: Product if isClass(f, "org.apache.spark.sql.sources.StringEndsWith")   => false
        case f: Product if isClass(f, "org.apache.spark.sql.sources.StringContains")   => false
        // Spark 1.5+
        case f: Product if isClass(f, "org.apache.spark.sql.sources.EqualNullSafe")    => false
        // anything unknown stays with Spark
        case _                        => true
      }
    }
    val filtered = filters.filter(unhandled)
    if (Utils.LOGGER.isTraceEnabled()) {
      Utils.LOGGER.trace(s"Unhandled filters from ${filters.mkString("[", ",", "]")} to ${filtered.mkString("[", ",", "]")}")
    }
    filtered
  }
}