override def buildScan()

in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala [344:442]


  /**
   * Builds the RDD backing a Spark SQL scan over this HBase relation.
   *
   * Row-key predicates from `filters` are converted into scan points/ranges,
   * and (when `usePushDownColumnFilter` is enabled) the remaining predicates
   * are shipped to the region servers as a SparkSQLPushDownFilter. If the
   * push-down scan produces no partitions, falls back to a full table scan
   * restricted to the required columns.
   *
   * @param requiredColumns catalog field names the query projects
   * @param filters         Spark SQL filters to push down where possible
   * @return an RDD of rows satisfying the pushed-down predicates
   */
  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {

    val pushDownTuple = buildPushDownPredicatesResource(filters)
    val pushDownRowKeyFilter = pushDownTuple._1
    // Column-level push-down is only honored when the feature flag is on;
    // a null expression downstream means "no column filter".
    val pushDownDynamicLogicExpression =
      if (usePushDownColumnFilter) pushDownTuple._2 else null
    val valueArray = pushDownTuple._3

    logDebug("pushDownRowKeyFilter:           " + pushDownRowKeyFilter.ranges)
    if (pushDownDynamicLogicExpression != null) {
      logDebug(
        "pushDownDynamicLogicExpression: " +
          pushDownDynamicLogicExpression.toExpressionString)
    }
    logDebug("valueArray:                     " + valueArray.length)

    // SparkSQLPushDownFilter's constructor takes a mutable.MutableList, so
    // keep that concrete type here.
    val requiredQualifierDefinitionList =
      new mutable.MutableList[Field]
    requiredColumns.foreach(c => requiredQualifierDefinitionList += catalog.getField(c))

    // retain the information for unit testing checks
    DefaultSourceStaticUtils.populateLatestExecutionRules(
      pushDownRowKeyFilter,
      pushDownDynamicLogicExpression)

    val pushDownFilterJava =
      if (usePushDownColumnFilter && pushDownDynamicLogicExpression != null) {
        Some(
          new SparkSQLPushDownFilter(
            pushDownDynamicLogicExpression,
            valueArray,
            requiredQualifierDefinitionList,
            encoderClsName))
      } else {
        None
      }

    val hRdd = new HBaseTableScanRDD(
      this,
      hbaseContext,
      pushDownFilterJava,
      requiredQualifierDefinitionList.seq)
    pushDownRowKeyFilter.points.foreach(hRdd.addPoint(_))
    pushDownRowKeyFilter.ranges.foreach(hRdd.addRange(_))

    // Compute the projection once on the driver instead of once per row
    // inside the map closures (the original recomputed it for every record).
    val indexedFields = getIndexedProjections(requiredColumns).map(_._1)

    val scanRdd: RDD[Row] = hRdd.map(r => buildRow(indexedFields, r))
    if (scanRdd.partitions.size > 0) {
      scanRdd
    } else {
      // The push-down scan yielded no partitions; fall back to a full table
      // scan limited to the required column qualifiers.
      val scan = new Scan()
      scan.setCacheBlocks(blockCacheEnable)
      scan.setBatch(batchNum)
      scan.setCaching(cacheSize)
      requiredQualifierDefinitionList.foreach(d => scan.addColumn(d.cfBytes, d.colBytes))

      hbaseContext
        .hbaseRDD(TableName.valueOf(tableName), scan)
        .map(r => buildRow(indexedFields, r._2))
    }
  }