in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseForeachPartitionExample.scala [36:92]
def main(args: Array[String]): Unit = {
  if (args.length < 2) {
    println("Usage: HBaseForeachPartitionExample {tableName} {columnFamily}")
    return
  }

  val tableName = args(0)
  val columnFamily = args(1)

  val sparkConf = new SparkConf().setAppName(
    "HBaseForeachPartitionExample " + tableName + " " + columnFamily)
  val sc = new SparkContext(sparkConf)
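
  // Stop the SparkContext in the finally block so it is released even if the
  // HBase writes fail.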
  try {
    // Each element is (rowKey, Array[(columnFamily, qualifier, value)])
    val rdd = sc.parallelize(
      Array(
        (
          Bytes.toBytes("1"),
          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))),
        (
          Bytes.toBytes("2"),
          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))),
        (
          Bytes.toBytes("3"),
          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))),
        (
          Bytes.toBytes("4"),
          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))),
        (
          Bytes.toBytes("5"),
          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5"))))))
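
    // HBaseContext broadcasts the HBase configuration to the executors and
    // manages the Connection handed to the per-partition function below.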
    val conf = HBaseConfiguration.create()
    val hbaseContext = new HBaseContext(sc, conf)
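
    // hbaseForeachPartition gives each partition its iterator together with an
    // open Connection, so a single BufferedMutator can batch every Put in the
    // partition instead of opening a connection per record.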
    rdd.hbaseForeachPartition(
      hbaseContext,
      (it, connection) => {
        val mutator = connection.getBufferedMutator(TableName.valueOf(tableName))
        try {
          it.foreach { r =>
            val put = new Put(r._1) // r._1 is the row key
            r._2.foreach(cell => put.addColumn(cell._1, cell._2, cell._3))
            mutator.mutate(put)
          }
          mutator.flush()
        } finally {
          // Close the mutator even if a mutate or flush fails.
          mutator.close()
        }
      })
  } finally {
    sc.stop()
  }
}
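
// A typical invocation, as a sketch only (the jar name is an assumption):
//   spark-submit \
//     --class org.apache.hadoop.hbase.spark.example.rdd.HBaseForeachPartitionExample \
//     hbase-spark-examples.jar myTable myColumnFamily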