in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala [486:613]
/**
 * Recursively translates a Spark data-source `Filter` tree into a `DynamicLogicExpression`,
 * narrowing `parentRowKeyFilter` whenever a predicate targets a row-key field.
 *
 * For each leaf predicate whose attribute is found in the catalog, the encoded comparison
 * value is appended to `valueArray`, and the returned expression refers to it by the index
 * `valueArray.length - 1`.
 *
 * NOTE(review): when `catalog.getField` yields null nothing is appended, so that index then
 * points at whatever was appended last (or is -1 for an empty array). Confirm whether
 * `getField` can actually return null for a pushed-down attribute.
 *
 * @param parentRowKeyFilter row-key filter that receives `mergeIntersect` / `mergeUnion`
 *                           calls (their return values are not used here)
 * @param valueArray         accumulator of encoded query values, appended in traversal order
 * @param filter             the filter (sub)tree to translate
 * @return the logic expression for `filter`; filter types not handled below become a
 *         `PassThroughLogicExpression`
 */
def transverseFilterTree(
    parentRowKeyFilter: RowKeyFilter,
    valueArray: mutable.MutableList[Array[Byte]],
    filter: Filter): DynamicLogicExpression = {

  // Unions the candidate filters into a single filter and intersects that union into the
  // parent filter. An empty candidate list leaves the parent untouched instead of throwing.
  def intersectWithUnionOf(candidates: Seq[RowKeyFilter]): Unit =
    candidates
      .reduceOption((merged, next) => merged.mergeUnion(next))
      .foreach(union => parentRowKeyFilter.mergeIntersect(union))

  filter match {
    case EqualTo(attr, value) =>
      val field = catalog.getField(attr)
      if (field != null) {
        if (field.isRowKey) {
          parentRowKeyFilter.mergeIntersect(new RowKeyFilter(Utils.toBytes(value, field), null))
        }
        valueArray += Utils.toBytes(value, field)
      }
      new EqualLogicExpression(attr, valueArray.length - 1, false)

    // Range predicates: the encoder may split a predicate into several byte-array ranges.
    // Each range is mapped to a RowKeyFilter, the filters are unioned, and the union is
    // intersected into parentRowKeyFilter. If the encoder yields None for the value
    // (unsupported data type, per the original note), parentRowKeyFilter is not touched.
    case LessThan(attr, value) =>
      val field = catalog.getField(attr)
      if (field != null) {
        if (field.isRowKey) {
          encoder.ranges(value).foreach { bounds =>
            // LessThan is strict, so the first range gets `false` for its upper-bound flag;
            // every later range uses `true` for both flags.
            val candidates = bounds.less.zipWithIndex.map { case (range, position) =>
              new RowKeyFilter(null, new ScanRange(range.upper, position > 0, range.low, true))
            }
            intersectWithUnionOf(candidates)
          }
        }
        valueArray += encoder.encode(field.dt, value)
      }
      new LessThanLogicExpression(attr, valueArray.length - 1)

    case GreaterThan(attr, value) =>
      val field = catalog.getField(attr)
      if (field != null) {
        if (field.isRowKey) {
          encoder.ranges(value).foreach { bounds =>
            // GreaterThan is strict, so the first range gets `false` for its lower-bound
            // flag; every later range uses `true` for both flags.
            val candidates = bounds.greater.zipWithIndex.map { case (range, position) =>
              new RowKeyFilter(null, new ScanRange(range.upper, true, range.low, position > 0))
            }
            intersectWithUnionOf(candidates)
          }
        }
        valueArray += encoder.encode(field.dt, value)
      }
      new GreaterThanLogicExpression(attr, valueArray.length - 1)

    case LessThanOrEqual(attr, value) =>
      val field = catalog.getField(attr)
      if (field != null) {
        if (field.isRowKey) {
          encoder.ranges(value).foreach { bounds =>
            val candidates = bounds.less.map { range =>
              new RowKeyFilter(null, new ScanRange(range.upper, true, range.low, true))
            }
            intersectWithUnionOf(candidates)
          }
        }
        valueArray += encoder.encode(field.dt, value)
      }
      new LessThanOrEqualLogicExpression(attr, valueArray.length - 1)

    case GreaterThanOrEqual(attr, value) =>
      val field = catalog.getField(attr)
      if (field != null) {
        if (field.isRowKey) {
          encoder.ranges(value).foreach { bounds =>
            val candidates = bounds.greater.map { range =>
              new RowKeyFilter(null, new ScanRange(range.upper, true, range.low, true))
            }
            intersectWithUnionOf(candidates)
          }
        }
        valueArray += encoder.encode(field.dt, value)
      }
      new GreaterThanOrEqualLogicExpression(attr, valueArray.length - 1)

    case StringStartsWith(attr, value) =>
      val field = catalog.getField(attr)
      if (field != null) {
        if (field.isRowKey) {
          // Restrict the scan to the range bounded by the prefix itself and by
          // Utils.incrementByteArray(prefix).
          val prefix = Utils.toBytes(value, field)
          val endRange = Utils.incrementByteArray(prefix)
          parentRowKeyFilter.mergeIntersect(
            new RowKeyFilter(null, new ScanRange(endRange, false, prefix, true)))
        }
        valueArray += Utils.toBytes(value, field)
      }
      new StartsWithLogicExpression(attr, valueArray.length - 1)

    case Or(left, right) =>
      // The left branch works on the parent filter directly; the right branch gets a fresh
      // filter that is then unioned into the parent.
      val leftExpression = transverseFilterTree(parentRowKeyFilter, valueArray, left)
      val rightSideRowKeyFilter = new RowKeyFilter
      val rightExpression = transverseFilterTree(rightSideRowKeyFilter, valueArray, right)
      parentRowKeyFilter.mergeUnion(rightSideRowKeyFilter)
      new OrLogicExpression(leftExpression, rightExpression)

    case And(left, right) =>
      // Same shape as Or, except the right branch's filter is intersected into the parent.
      val leftExpression = transverseFilterTree(parentRowKeyFilter, valueArray, left)
      val rightSideRowKeyFilter = new RowKeyFilter
      val rightExpression = transverseFilterTree(rightSideRowKeyFilter, valueArray, right)
      parentRowKeyFilter.mergeIntersect(rightSideRowKeyFilter)
      new AndLogicExpression(leftExpression, rightExpression)

    case IsNull(attr) =>
      new IsNullLogicExpression(attr, false)

    case IsNotNull(attr) =>
      new IsNullLogicExpression(attr, true)

    case _ =>
      // Filter types not handled above place no constraint on the row key here.
      new PassThroughLogicExpression
  }
}