in src/main/scala/com/aliyun/emr/example/spark/streaming/SparkHBaseDemo.scala [48:118]
/**
 * Entry point: consumes messages from an Aliyun ONS topic, counts the
 * space-separated words in each 2-second batch, and writes every
 * (word, count) pair to an HBase table.
 *
 * Expects exactly seven arguments, in order:
 * accessKeyId, accessKeySecret, consumerId, topic, subExpression, tableName, quorum.
 * With any other argument count it prints usage and exits with status 1.
 *
 * Each count is stored under row key `word + timestamp`, in column family
 * `count`, qualifier `count`, with the cell timestamp equal to the one used
 * in the row key.
 */
def main(args: Array[String]): Unit = {
  // The destructuring below requires exactly 7 arguments; any other count
  // would otherwise surface as a MatchError instead of the usage text.
  if (args.length != 7) {
    System.err.println(
      """Usage: spark-submit --class SparkHBaseDemo examples-1.0-SNAPSHOT-shaded.jar <accessKeyId> <accessKeySecret>
        |  <consumerId> <topic> <subExpression> <tableName> <quorum>
        |
        |Arguments:
        |
        |  accessKeyId      Aliyun Access Key ID.
        |  accessKeySecret  Aliyun Key Secret.
        |  consumerId       ONS ConsumerID.
        |  topic            ONS topic.
        |  subExpression    * for all, or some specific tag.
        |  tableName        The name of HBase table.
        |  quorum           HBase quorum setting.
        |
      """.stripMargin)
    System.exit(1)
  }
  val Array(accessKeyId, accessKeySecret, consumerId, topic, subExpression, tname, quorum) = args
  val COLUMN_FAMILY_BYTES = Bytes.toBytes("count")
  val COLUMN_QUALIFIER_BYTES = Bytes.toBytes("count")
  val batchInterval = Seconds(2)
  val conf = new SparkConf().setAppName("E-MapReduce Demo 9: Spark HBase Demo (Scala)")
  val ssc = new StreamingContext(conf, batchInterval)

  // Each stream element is the raw body of an ONS message.
  val func: Message => Array[Byte] = msg => msg.getBody
  val onsStream = OnsUtils.createStream(
    ssc,
    consumerId,
    topic,
    subExpression,
    accessKeyId,
    accessKeySecret,
    StorageLevel.MEMORY_AND_DISK_2,
    func)

  onsStream.foreachRDD { rdd =>
    rdd
      // Decode explicitly as UTF-8 (the charset Bytes.toBytes uses on the
      // write side) rather than the JVM's platform-default charset.
      .map(bytes => new String(bytes, java.nio.charset.StandardCharsets.UTF_8))
      // split(" ") yields "" for repeated/leading/trailing spaces; drop those
      // so they are not counted and written as rows.
      .flatMap(line => line.split(" ").filter(_.nonEmpty))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      // foreachPartition is the action intended for side effects; it replaces
      // the previous mapPartitions(... Iterator.empty).count() workaround.
      .foreachPartition { words =>
        // Avoid opening a table for partitions that have nothing to write.
        if (words.hasNext) {
          // NOTE(review): the connection itself is never closed here;
          // presumably ConnectionUtil.getDefaultConn hands out a shared
          // connection — confirm before adding a close().
          val conn = ConnectionUtil.getDefaultConn(quorum)
          val t = conn.getTable(TableName.valueOf(tname))
          try {
            // Send puts to HBase in batches of at most 100.
            words.grouped(100).foreach { slice =>
              val puts = slice.map { word =>
                println(s"word: $word")
                // One timestamp per word so the row-key suffix and the cell
                // timestamp agree.
                val ts = System.currentTimeMillis()
                val put = new Put(Bytes.toBytes(word._1 + ts))
                put.addColumn(COLUMN_FAMILY_BYTES, COLUMN_QUALIFIER_BYTES, ts, Bytes.toBytes(word._2))
                put
              }.toList
              t.put(puts)
            }
          } finally {
            t.close()
          }
        }
      }
  }
  ssc.start()
  ssc.awaitTermination()
}