def main()

in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutTimestampExample.scala [35:78]


  /**
   * Example entry point: bulk-puts five hard-coded rows into an HBase table through
   * `HBaseContext.bulkPut`, writing every cell with one shared, explicitly supplied timestamp.
   *
   * @param args `args(0)` is the target table name and `args(1)` is the column family to write
   *             to. When fewer than two arguments are supplied, a usage message is printed to
   *             standard output and nothing else is done.
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.out.println("HBaseBulkPutTimestampExample {tableName} {columnFamily} are missing an argument")
    } else {
      val tableName = args(0)
      val columnFamily = args(1)

      val sparkConf =
        new SparkConf().setAppName(s"HBaseBulkPutTimestampExample $tableName $columnFamily")
      val sc = new SparkContext(sparkConf)

      try {
        // Each record is (rowKey, cells); each cell is (family, qualifier, value), all as bytes.
        val rdd = sc.parallelize(Array(
          (Bytes.toBytes("6"),
            Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))),
          (Bytes.toBytes("7"),
            Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))),
          (Bytes.toBytes("8"),
            Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))),
          (Bytes.toBytes("9"),
            Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))),
          (Bytes.toBytes("10"),
            Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5"))))))

        val conf = HBaseConfiguration.create()

        // Read the clock once, before the bulk put, so every cell written below carries the
        // same timestamp.
        val timeStamp = System.currentTimeMillis()

        val hbaseContext = new HBaseContext(sc, conf)
        hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
          TableName.valueOf(tableName),
          (putRecord) => {
            // One Put per record, keyed by the record's row key.
            val put = new Put(putRecord._1)
            putRecord._2.foreach { case (family, qualifier, value) =>
              put.addColumn(family, qualifier, timeStamp, value)
            }
            put
          })
      } finally {
        // Always stop the SparkContext, even if the bulk put throws.
        sc.stop()
      }
    }
  }