def main()

in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseForeachPartitionExample.scala [36:92]
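
This example writes five rows into an HBase table from a Spark RDD, using hbaseForeachPartition so that each partition shares one HBase Connection and batches its Puts through a single BufferedMutator. For the excerpt to compile on its own, it needs the imports declared at the top of the source file; in particular, HBaseRDDFunctions._ brings the hbaseForeachPartition implicit into scope:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext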


  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      println("Usage: HBaseForeachPartitionExample {tableName} {columnFamily}")
      return
    }

    val tableName = args(0)
    val columnFamily = args(1)

    val sparkConf = new SparkConf().setAppName(
      "HBaseForeachPartitionExample " +
        tableName + " " + columnFamily)
    val sc = new SparkContext(sparkConf)

    try {
      // RDD of (rowKey, Array[(columnFamily, qualifier, value)]) tuples:
      // [(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])]
      val rdd = sc.parallelize(
        Array(
          (
            Bytes.toBytes("1"),
            Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))),
          (
            Bytes.toBytes("2"),
            Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))),
          (
            Bytes.toBytes("3"),
            Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))),
          (
            Bytes.toBytes("4"),
            Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))),
          (
            Bytes.toBytes("5"),
            Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5"))))))

      // Loads hbase-site.xml from the classpath, falling back to defaults
      val conf = HBaseConfiguration.create()

      // HBaseContext ships the configuration to the executors and manages
      // HBase connections for the RDD functions used below
      val hbaseContext = new HBaseContext(sc, conf)

      // hbaseForeachPartition hands each partition an open Connection, so a
      // single BufferedMutator can batch all of that partition's Puts
      rdd.hbaseForeachPartition(
        hbaseContext,
        (it, connection) => {
          val m = connection.getBufferedMutator(TableName.valueOf(tableName))
          try {
            it.foreach(r => {
              // r._1 is the row key; r._2 holds (family, qualifier, value) triples
              val put = new Put(r._1)
              r._2.foreach(putValue => put.addColumn(putValue._1, putValue._2, putValue._3))
              m.mutate(put)
            })
            m.flush()
          } finally {
            m.close()
          }
        })

    } finally {
      sc.stop()
    }
  }
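
Run it with spark-submit, passing the table name and column family as the two arguments. To confirm the write, here is a minimal read-back sketch using the plain HBase client API; it is an illustrative driver-side snippet, not part of the example, and assumes the same tableName and columnFamily values as above:

  import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}

  val verifyConf = HBaseConfiguration.create()
  val verifyConnection = ConnectionFactory.createConnection(verifyConf)
  try {
    val table = verifyConnection.getTable(TableName.valueOf(tableName))
    try {
      // The example wrote row keys "1".."5" under qualifier "1" in columnFamily
      for (key <- 1 to 5) {
        val result = table.get(new Get(Bytes.toBytes(key.toString)))
        println(Bytes.toString(result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes("1"))))
      }
    } finally {
      table.close()
    }
  } finally {
    verifyConnection.close()
  }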