override def buildScan()

in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala [311:392]
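Builds the RDD[Row] that serves a Spark SQL query against the HBase-backed relation: filters that can be pushed down become row-key points/ranges plus an optional server-side filter, and a full table scan is used only when the pushed-down scan yields nothing to read.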


  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
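    // Decompose the Spark SQL filters into (1) a row-key filter carrying
    // fixed points and ranges, (2) a dynamic logic expression that can be
    // evaluated server side, and (3) the query values it references.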

    val pushDownTuple = buildPushDownPredicatesResource(filters)
    val pushDownRowKeyFilter = pushDownTuple._1
    var pushDownDynamicLogicExpression = pushDownTuple._2
    val valueArray = pushDownTuple._3

    // Column-filter pushdown may be switched off; drop the expression so no
    // server-side SparkSQLPushDownFilter is built below.
    if (!usePushDownColumnFilter) {
      pushDownDynamicLogicExpression = null
    }

    logDebug("pushDownRowKeyFilter:           " + pushDownRowKeyFilter.ranges)
    if (pushDownDynamicLogicExpression != null) {
      logDebug("pushDownDynamicLogicExpression: " +
        pushDownDynamicLogicExpression.toExpressionString)
    }
    logDebug("valueArray:                     " + valueArray.length)

    // Resolve each projected column name to its Field definition in the
    // catalog (column family, qualifier, and type).
    val requiredQualifierDefinitionList = new mutable.MutableList[Field]
    requiredColumns.foreach(c => requiredQualifierDefinitionList += catalog.getField(c))

    // Retain the information for unit testing checks.
    DefaultSourceStaticUtils.populateLatestExecutionRules(pushDownRowKeyFilter,
      pushDownDynamicLogicExpression)

    // Note: getList and rddList are populated/declared here but never read
    // again in this method; the pushed-down points are actually applied via
    // hRdd.addPoint(...) below.
    val getList = new util.ArrayList[Get]()
    val rddList = new util.ArrayList[RDD[Row]]()

    // Build a Get per fixed row-key point, requesting the row-key fields.
    pushDownRowKeyFilter.points.foreach(p => {
      val get = new Get(p)
      requiredQualifierDefinitionList.foreach(d => {
        if (d.isRowKey)
          get.addColumn(d.cfBytes, d.colBytes)
      })
      getList.add(get)
    })

    // When column-filter pushdown is enabled, wrap the dynamic logic
    // expression and its values in a server-side SparkSQLPushDownFilter.
    val pushDownFilterJava =
      if (usePushDownColumnFilter && pushDownDynamicLogicExpression != null) {
        Some(new SparkSQLPushDownFilter(pushDownDynamicLogicExpression,
          valueArray, requiredQualifierDefinitionList, encoderClsName))
      } else {
        None
      }
    val hRdd = new HBaseTableScanRDD(this, hbaseContext, pushDownFilterJava,
      requiredQualifierDefinitionList.seq)
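    // Seed the scan RDD with the pushed-down row-key points and ranges.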
    pushDownRowKeyFilter.points.foreach(hRdd.addPoint(_))
    pushDownRowKeyFilter.ranges.foreach(hRdd.addRange(_))

    var resultRDD: RDD[Row] = {
      // Resolve the projection once, instead of per row inside the closure.
      val indexedFields = getIndexedProjections(requiredColumns).map(_._1)
      val tmp = hRdd.map(r => buildRow(indexedFields, r))
      // Fall back to a full table scan below if the RDD has no partitions.
      if (tmp.partitions.size > 0) {
        tmp
      } else {
        null
      }
    }

    if (resultRDD == null) {
      // The pushed-down scan yielded no partitions: scan the whole table,
      // still restricted to the required column families and qualifiers.
      val scan = new Scan()
      scan.setCacheBlocks(blockCacheEnable)
      scan.setBatch(batchNum)
      scan.setCaching(cacheSize)
      requiredQualifierDefinitionList.foreach(d =>
        scan.addColumn(d.cfBytes, d.colBytes))

      val indexedFields = getIndexedProjections(requiredColumns).map(_._1)
      resultRDD = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan)
        .map(r => buildRow(indexedFields, r._2))
    }
    resultRDD
  }
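
For context, here is a minimal sketch of the read path that ends up in this method. The catalog JSON, the table name "t1", and the column names are illustrative assumptions, not taken from the source above; the format name and the HBaseTableCatalog.tableCatalog option key follow the hbase-spark connector's documented usage.

  import org.apache.spark.sql.SparkSession
  import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog

  object BuildScanExample {
    // Hypothetical catalog mapping SQL columns onto HBase table "t1".
    val catalog =
      """{
        |  "table": {"namespace": "default", "name": "t1"},
        |  "rowkey": "key",
        |  "columns": {
        |    "col0": {"cf": "rowkey", "col": "key", "type": "string"},
        |    "col1": {"cf": "cf1", "col": "c1", "type": "int"}
        |  }
        |}""".stripMargin

    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder().appName("buildScan-demo").getOrCreate()

      val df = spark.read
        .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
        .format("org.apache.hadoop.hbase.spark")
        .load()

      // The row-key equality below reaches buildScan as a point, so it is
      // served by the pushed-down scan RDD rather than a full table scan;
      // the col1 predicate is a candidate for the SparkSQLPushDownFilter.
      df.filter("col0 = 'row005' AND col1 > 10").select("col0", "col1").show()

      spark.stop()
    }
  }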