in spark/sql-30/src/main/scala/org/elasticsearch/spark/sql/DefaultSource.scala [349:513]
private def translateFilter(filter: Filter, strictPushDown: Boolean, isES50: Boolean): String = {
// the pushdown can be strict - i.e. use only filters and thus match the value exactly (works with non-analyzed fields)
// or non-strict, meaning queries will be used instead and the values will be analyzed as well
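// e.g. an EqualTo filter becomes a "term" query under strict pushdown and a "match" query otherwise
// (wrapped in a top-level "query" object for pre-5.x versions)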
filter match {
case EqualTo(attribute, value) => {
// if we get a null, translate it into a missing query (we're extra careful - Spark should translate the equals into isMissing anyway)
if (value == null || value == None || value == ()) {
if (isES50) {
return s"""{"bool":{"must_not":{"exists":{"field":"$attribute"}}}}"""
}
else {
return s"""{"missing":{"field":"$attribute"}}"""
}
}
if (strictPushDown) s"""{"term":{"$attribute":${extract(value)}}}"""
else {
if (isES50) {
s"""{"match":{"$attribute":${extract(value)}}}"""
}
else {
s"""{"query":{"match":{"$attribute":${extract(value)}}}}"""
}
}
}
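// comparison filters translate directly to range queries; no strict vs non-strict distinction is made here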
case GreaterThan(attribute, value) => s"""{"range":{"$attribute":{"gt" :${extract(value)}}}}"""
case GreaterThanOrEqual(attribute, value) => s"""{"range":{"$attribute":{"gte":${extract(value)}}}}"""
case LessThan(attribute, value) => s"""{"range":{"$attribute":{"lt" :${extract(value)}}}}"""
case LessThanOrEqual(attribute, value) => s"""{"range":{"$attribute":{"lte":${extract(value)}}}}"""
case In(attribute, values) => {
// when dealing with mixed types (strings and numbers) Spark converts the Strings to null (it gets confused by the type field)
// this leads to incorrect query DSL, hence the nulls are filtered out
val filtered = values filter (_ != null)
if (filtered.isEmpty) {
return ""
}
// furthermore, a match query only makes sense with String types, so for other types a terms query is applied (aka strictPushDown)
val attrType = lazySchema.struct(attribute).dataType
val isStrictType = attrType match {
case DateType |
TimestampType => true
case _ => false
}
if (!strictPushDown && isStrictType) {
if (Utils.LOGGER.isDebugEnabled()) {
Utils.LOGGER.debug(s"Attribute $attribute type $attrType not suitable for match query; using terms (strict) instead")
}
}
if (strictPushDown || isStrictType) s"""{"terms":{"$attribute":${extractAsJsonArray(filtered)}}}"""
else {
if (isES50) {
s"""{"bool":{"should":[${extractMatchArray(attribute, filtered)}]}}"""
}
else {
s"""{"or":{"filters":[${extractMatchArray(attribute, filtered)}]}}"""
}
}
}
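// ES 5.x removed the "missing" query, so null checks are expressed as a negated "exists" query instead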
case IsNull(attribute) => {
if (isES50) {
s"""{"bool":{"must_not":{"exists":{"field":"$attribute"}}}}"""
}
else {
s"""{"missing":{"field":"$attribute"}}"""
}
}
case IsNotNull(attribute) => s"""{"exists":{"field":"$attribute"}}"""
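// logical operators are translated recursively; on ES 5.x they compose through the "bool" query
// (filter/should/must_not), on earlier versions through the and/or/not filters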
case And(left, right) => {
if (isES50) {
s"""{"bool":{"filter":[${translateFilter(left, strictPushDown, isES50)}, ${translateFilter(right, strictPushDown, isES50)}]}}"""
}
else {
s"""{"and":{"filters":[${translateFilter(left, strictPushDown, isES50)}, ${translateFilter(right, strictPushDown, isES50)}]}}"""
}
}
case Or(left, right) => {
if (isES50) {
s"""{"bool":{"should":[{"bool":{"filter":${translateFilter(left, strictPushDown, isES50)}}}, {"bool":{"filter":${translateFilter(right, strictPushDown, isES50)}}}]}}"""
}
else {
s"""{"or":{"filters":[${translateFilter(left, strictPushDown, isES50)}, ${translateFilter(right, strictPushDown, isES50)}]}}"""
}
}
case Not(filterToNeg) => {
if (isES50) {
s"""{"bool":{"must_not":${translateFilter(filterToNeg, strictPushDown, isES50)}}}"""
}
else {
s"""{"not":{"filter":${translateFilter(filterToNeg, strictPushDown, isES50)}}}"""
}
}
// the filters below are available only from Spark 1.3.1 (not 1.3.0)
//
// String Filter notes:
//
// the resulting DSL will be quite slow (linear in the number of terms in the index) but there's no easy way around it
// we could use a regexp filter, however it's a bit overkill and there are plenty of chars to escape
// s"""{"regexp":{"$attribute":"$value.*"}}"""
// as an alternative we could use a query_string query but the analyzed / non-analyzed issue remains and the DSL is slightly more complicated
// s"""{"query":{"query_string":{"default_field":"$attribute","query":"$value*"}}}"""
// instead a wildcard query is used, with the value lowercased (to match analyzed fields)
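// these String filter classes are matched by name (via isClass) rather than with a typed pattern, which keeps
// this match compiling against Spark versions that do not ship them; attribute and value are read positionally
// through productElement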
case f:Product if isClass(f, "org.apache.spark.sql.sources.StringStartsWith") => {
val arg = {
val x = f.productElement(1).toString()
if (!strictPushDown) x.toLowerCase(Locale.ROOT) else x
}
if (isES50) {
s"""{"wildcard":{"${f.productElement(0)}":"$arg*"}}"""
}
else {
s"""{"query":{"wildcard":{"${f.productElement(0)}":"$arg*"}}}"""
}
}
case f:Product if isClass(f, "org.apache.spark.sql.sources.StringEndsWith") => {
val arg = {
val x = f.productElement(1).toString()
if (!strictPushDown) x.toLowerCase(Locale.ROOT) else x
}
if (isES50) {
s"""{"wildcard":{"${f.productElement(0)}":"*$arg"}}"""
}
else {
s"""{"query":{"wildcard":{"${f.productElement(0)}":"*$arg"}}}"""
}
}
case f:Product if isClass(f, "org.apache.spark.sql.sources.StringContains") => {
val arg = {
val x = f.productElement(1).toString()
if (!strictPushDown) x.toLowerCase(Locale.ROOT) else x
}
if (isES50) {
s"""{"wildcard":{"${f.productElement(0)}":"*$arg*"}}"""
}
else {
s"""{"query":{"wildcard":{"${f.productElement(0)}":"*$arg*"}}}"""
}
}
// the filters below are available only from Spark 1.5.0
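// EqualNullSafe is likewise matched by class name and translated the same way as a non-null EqualTo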
case f:Product if isClass(f, "org.apache.spark.sql.sources.EqualNullSafe") => {
val arg = extract(f.productElement(1))
if (strictPushDown) s"""{"term":{"${f.productElement(0)}":$arg}}"""
else {
if (isES50) {
s"""{"match":{"${f.productElement(0)}":$arg}}"""
}
else {
s"""{"query":{"match":{"${f.productElement(0)}":$arg}}}"""
}
}
}
case _ => ""
}
}