in integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala [115:215]
def createCarbonFilter(schema: StructType,
predicate: sources.Filter): Option[CarbonExpression] = {
val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap
def createFilter(predicate: sources.Filter): Option[CarbonExpression] = {
predicate match {
case sources.EqualTo(name, value) =>
Some(new EqualToExpression(getCarbonExpression(name),
getCarbonLiteralExpression(name, value)))
case sources.Not(sources.EqualTo(name, value)) =>
Some(new NotEqualsExpression(getCarbonExpression(name),
getCarbonLiteralExpression(name, value)))
case sources.EqualNullSafe(name, value) =>
Some(new EqualToExpression(getCarbonExpression(name),
getCarbonLiteralExpression(name, value)))
case sources.Not(sources.EqualNullSafe(name, value)) =>
Some(new NotEqualsExpression(getCarbonExpression(name),
getCarbonLiteralExpression(name, value)))
case sources.GreaterThan(name, value) =>
Some(new GreaterThanExpression(getCarbonExpression(name),
getCarbonLiteralExpression(name, value)))
case sources.LessThan(name, value) =>
Some(new LessThanExpression(getCarbonExpression(name),
getCarbonLiteralExpression(name, value)))
case sources.GreaterThanOrEqual(name, value) =>
Some(new GreaterThanEqualToExpression(getCarbonExpression(name),
getCarbonLiteralExpression(name, value)))
case sources.LessThanOrEqual(name, value) =>
Some(new LessThanEqualToExpression(getCarbonExpression(name),
getCarbonLiteralExpression(name, value)))
case sources.In(name, values) =>
if (values.length == 1 && values(0) == null) {
Some(new FalseExpression(getCarbonExpression(name)))
} else {
Some(new InExpression(getCarbonExpression(name),
new ListExpression(
convertToJavaList(values.filterNot(_ == null)
.map(filterValues => getCarbonLiteralExpression(name, filterValues)).toList))))
}
case sources.Not(sources.In(name, values)) =>
if (values.contains(null)) {
Some(new FalseExpression(getCarbonExpression(name)))
} else {
Some(new NotInExpression(getCarbonExpression(name),
new ListExpression(
convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
}
case sources.IsNull(name) =>
Some(new EqualToExpression(getCarbonExpression(name),
getCarbonLiteralExpression(name, null), true))
case sources.IsNotNull(name) =>
Some(new NotEqualsExpression(getCarbonExpression(name),
getCarbonLiteralExpression(name, null), true))
case sources.And(lhs, rhs) =>
(createFilter(lhs) ++ createFilter(rhs)).reduceOption(new AndExpression(_, _))
case sources.Or(lhs, rhs) =>
for {
lhsFilter <- createFilter(lhs)
rhsFilter <- createFilter(rhs)
} yield {
new OrExpression(lhsFilter, rhsFilter)
}
case sources.StringStartsWith(name, value) if value.length > 0 =>
Some(new StartsWithExpression(getCarbonExpression(name),
getCarbonLiteralExpression(name, value)))
case _ => None
}
}
def getCarbonExpression(name: String) = {
new CarbonColumnExpression(name,
CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataTypeOf(name)))
}
def getCarbonLiteralExpression(name: String, value: Any): CarbonExpression = {
val dataTypeOfAttribute =
CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataTypeOf(name))
val dataType =
if (Option(value).isDefined &&
dataTypeOfAttribute == CarbonDataTypes.STRING &&
value.isInstanceOf[Double]) {
CarbonDataTypes.DOUBLE
} else {
dataTypeOfAttribute
}
val dataValue = if (dataTypeOfAttribute.equals(CarbonDataTypes.BINARY)
&& Option(value).isDefined) {
new String(value.asInstanceOf[Array[Byte]])
} else if (dataTypeOfAttribute.equals(CarbonDataTypes.DATE) &&
value.isInstanceOf[java.sql.Date]) {
// In case of Date object , convert back to int.
DateTimeUtils.fromJavaDate(value.asInstanceOf[java.sql.Date])
} else {
value
}
new CarbonLiteralExpression(dataValue, dataType)
}
createFilter(predicate)
}