def main()

in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseMapPartitionExample.scala [36:98]


  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      println("Usage: HBaseMapPartitionExample {tableName}")
      return
    }

    val tableName = args(0)

    val sparkConf = new SparkConf().setAppName("HBaseMapPartitionExample " + tableName)
    val sc = new SparkContext(sparkConf)

    try {

      // RDD of raw row keys, one Array[Byte] per record
      val rdd = sc.parallelize(
        Array(
          Bytes.toBytes("1"),
          Bytes.toBytes("2"),
          Bytes.toBytes("3"),
          Bytes.toBytes("4"),
          Bytes.toBytes("5"),
          Bytes.toBytes("6"),
          Bytes.toBytes("7")))

      val conf = HBaseConfiguration.create()

      val hbaseContext = new HBaseContext(sc, conf)
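      // The HBaseContext ships the HBase configuration to the executors, so
      // hbaseMapPartitions (below) can hand each partition an open Connection
      // along with that partition's iterator of row keys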

      val getRdd = rdd.hbaseMapPartitions[String](
        hbaseContext,
        (it, connection) => {
          val table = connection.getTable(TableName.valueOf(tableName))
          it.map {
            r =>
              // batching would be faster; see the batched sketch after this
              // method. This is just an example
              val result = table.get(new Get(r))

              // listCells() returns null when the row does not exist; the
              // example assumes these keys are present in the table
              val cellIt = result.listCells().iterator()
              val b = new StringBuilder

              b.append(Bytes.toString(result.getRow) + ":")

              while (cellIt.hasNext) {
                val cell = cellIt.next()
                // getQualifierArray/getValueArray return the cell's whole
                // backing array, so slice with the matching offset and length
                val q = Bytes.toString(
                  cell.getQualifierArray, cell.getQualifierOffset, cell.getQualifierLength)
                if (q.equals("counter")) {
                  b.append("(" + q + "," + Bytes.toLong(cell.getValueArray, cell.getValueOffset) + ")")
                } else {
                  b.append("(" + q + "," + Bytes.toString(
                    cell.getValueArray, cell.getValueOffset, cell.getValueLength) + ")")
                }
              }
              b.toString()
          }
        })

      getRdd
        .collect()
        .foreach(println)

    } finally {
      sc.stop()
    }
  }
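
The comment inside the mapping function notes that batching the Gets would be faster. Below is a minimal sketch of that variant, not part of the original example: it assumes the same surrounding setup (rdd, hbaseContext, tableName), an extra import scala.collection.JavaConverters._ for .asJava, and an arbitrary batch size of 100. Table.get(java.util.List[Get]) issues one multi-get per group of keys instead of one RPC per row.

  // Sketch only: a batched variant of the mapping function above
  val batchedRdd = rdd.hbaseMapPartitions[String](
    hbaseContext,
    (it, connection) => {
      val table = connection.getTable(TableName.valueOf(tableName))
      // Group the partition's row keys and issue one multi-get per group
      it.grouped(100).flatMap { keys =>
        val gets = keys.map(k => new Get(k)).asJava
        table.get(gets).iterator.map { result =>
          val b = new StringBuilder
          b.append(Bytes.toString(result.getRow) + ":")
          val cellIt = result.listCells().iterator()
          while (cellIt.hasNext) {
            val cell = cellIt.next()
            val q = Bytes.toString(
              cell.getQualifierArray, cell.getQualifierOffset, cell.getQualifierLength)
            if (q.equals("counter")) {
              b.append("(" + q + "," + Bytes.toLong(cell.getValueArray, cell.getValueOffset) + ")")
            } else {
              b.append("(" + q + "," + Bytes.toString(
                cell.getValueArray, cell.getValueOffset, cell.getValueLength) + ")")
            }
          }
          b.toString()
        }
      }
    })

Collecting and printing works exactly as before: batchedRdd.collect().foreach(println).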