def transverseFilterTree()

in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala [486:613]


  /**
   * Recursively converts a Spark data source filter into a [[DynamicLogicExpression]].
   *
   * While walking the tree this method has two side effects:
   *  - every comparison value it encodes is appended to `valueArray`, and the expression it
   *    returns refers to that value by its index in `valueArray`;
   *  - predicates on a row-key field narrow `parentRowKeyFilter`.
   *
   * For `Or` and `And` the left subtree writes into `parentRowKeyFilter` directly, while the
   * right subtree writes into a fresh [[RowKeyFilter]] that is then merged into the parent with
   * `mergeUnion` (for `Or`) or `mergeIntersect` (for `And`).
   *
   * Filter types that are not handled here produce a [[PassThroughLogicExpression]]. The same
   * expression is returned when `catalog.getField` yields `null` for the attribute: in that case
   * nothing is appended to `valueArray`, so an index-based expression would point at `-1` or at
   * a value that belongs to a different predicate.
   * NOTE(review): this assumes a pass-through expression is an acceptable, non-restricting
   * fallback for an unresolved field, as it already is for unsupported filter types.
   *
   * @param parentRowKeyFilter row-key filter narrowed in place by row-key predicates
   * @param valueArray accumulator of encoded comparison values, appended to in place
   * @param filter the filter (sub)tree to convert
   * @return the logic expression equivalent to `filter`
   */
  def transverseFilterTree(
      parentRowKeyFilter: RowKeyFilter,
      valueArray: mutable.MutableList[Array[Byte]],
      filter: Filter): DynamicLogicExpression = {

    // Appends an encoded comparison value and returns the index it was stored at.
    def appendValue(bytes: Array[Byte]): Int = {
      valueArray += bytes
      valueArray.length - 1
    }

    /**
     * Intersects `parentRowKeyFilter` with the row-key ranges implied by a range comparison.
     *
     * The encoder may split one predicate into several byte-array boundaries. Each boundary
     * becomes its own [[RowKeyFilter]]; these are unioned together and the union is intersected
     * with the parent. When `encoder.ranges` returns no result (per the original note: the data
     * type is not supported) the parent filter is left untouched.
     *
     * @param useLess `true` to use the `less` boundaries (LessThan / LessThanOrEqual),
     *                `false` to use the `greater` boundaries (GreaterThan / GreaterThanOrEqual)
     * @param strict  `true` for the non-inclusive comparisons: only the first boundary gets an
     *                exclusive edge (the upper bound for `less`, the lower bound for `greater`);
     *                every other edge is inclusive
     */
    def restrictRowKey(value: Any, useLess: Boolean, strict: Boolean): Unit = {
      encoder.ranges(value).foreach { bounds =>
        val segments = if (useLess) bounds.less else bounds.greater
        val segmentFilters = segments.zipWithIndex.map {
          case (segment, position) =>
            // Only the very first segment carries the exclusive edge of a strict comparison.
            val edgeInclusive = !strict || position > 0
            val range =
              if (useLess) new ScanRange(segment.upper, edgeInclusive, segment.low, true)
              else new ScanRange(segment.upper, true, segment.low, edgeInclusive)
            new RowKeyFilter(null, range)
        }
        parentRowKeyFilter.mergeIntersect(segmentFilters.reduce((i, j) => i.mergeUnion(j)))
      }
    }

    filter match {
      case EqualTo(attr, value) =>
        val field = catalog.getField(attr)
        if (field == null) {
          new PassThroughLogicExpression
        } else {
          if (field.isRowKey) {
            parentRowKeyFilter.mergeIntersect(new RowKeyFilter(Utils.toBytes(value, field), null))
          }
          // Encoded separately from the row-key point above, as in the original, so the
          // row-key filter and the value array never share one array instance.
          new EqualLogicExpression(attr, appendValue(Utils.toBytes(value, field)), false)
        }

      case LessThan(attr, value) =>
        val field = catalog.getField(attr)
        if (field == null) {
          new PassThroughLogicExpression
        } else {
          if (field.isRowKey) {
            restrictRowKey(value, useLess = true, strict = true)
          }
          new LessThanLogicExpression(attr, appendValue(encoder.encode(field.dt, value)))
        }

      case GreaterThan(attr, value) =>
        val field = catalog.getField(attr)
        if (field == null) {
          new PassThroughLogicExpression
        } else {
          if (field.isRowKey) {
            restrictRowKey(value, useLess = false, strict = true)
          }
          new GreaterThanLogicExpression(attr, appendValue(encoder.encode(field.dt, value)))
        }

      case LessThanOrEqual(attr, value) =>
        val field = catalog.getField(attr)
        if (field == null) {
          new PassThroughLogicExpression
        } else {
          if (field.isRowKey) {
            restrictRowKey(value, useLess = true, strict = false)
          }
          new LessThanOrEqualLogicExpression(attr, appendValue(encoder.encode(field.dt, value)))
        }

      case GreaterThanOrEqual(attr, value) =>
        val field = catalog.getField(attr)
        if (field == null) {
          new PassThroughLogicExpression
        } else {
          if (field.isRowKey) {
            restrictRowKey(value, useLess = false, strict = false)
          }
          new GreaterThanOrEqualLogicExpression(
            attr,
            appendValue(encoder.encode(field.dt, value)))
        }

      case StringStartsWith(attr, value) =>
        val field = catalog.getField(attr)
        if (field == null) {
          new PassThroughLogicExpression
        } else {
          if (field.isRowKey) {
            // Scan from the prefix (inclusive) up to the incremented prefix (exclusive).
            val prefix = Utils.toBytes(value, field)
            val endRange = Utils.incrementByteArray(prefix)
            parentRowKeyFilter.mergeIntersect(
              new RowKeyFilter(null, new ScanRange(endRange, false, prefix, true)))
          }
          new StartsWithLogicExpression(attr, appendValue(Utils.toBytes(value, field)))
        }

      case Or(left, right) =>
        val leftExpression = transverseFilterTree(parentRowKeyFilter, valueArray, left)
        // The right side gets its own filter so it can be unioned with the left side's result.
        val rightSideRowKeyFilter = new RowKeyFilter
        val rightExpression = transverseFilterTree(rightSideRowKeyFilter, valueArray, right)
        parentRowKeyFilter.mergeUnion(rightSideRowKeyFilter)
        new OrLogicExpression(leftExpression, rightExpression)

      case And(left, right) =>
        val leftExpression = transverseFilterTree(parentRowKeyFilter, valueArray, left)
        // The right side gets its own filter so it can be intersected with the left side's.
        val rightSideRowKeyFilter = new RowKeyFilter
        val rightExpression = transverseFilterTree(rightSideRowKeyFilter, valueArray, right)
        parentRowKeyFilter.mergeIntersect(rightSideRowKeyFilter)
        new AndLogicExpression(leftExpression, rightExpression)

      case IsNull(attr) =>
        new IsNullLogicExpression(attr, false)

      case IsNotNull(attr) =>
        new IsNullLogicExpression(attr, true)

      case _ =>
        // Unsupported filter type: emit an expression that does not restrict anything.
        new PassThroughLogicExpression
    }
  }