def main()

in src/main/scala/com/aliyun/emr/example/spark/streaming/SparkHBaseDemo.scala [48:118]


  /**
   * Entry point of the Spark Streaming + HBase word-count demo.
   *
   * Reads messages from an ONS topic, splits every message body on single
   * spaces, counts the words of each micro-batch and writes one HBase row per
   * (word, count) pair into the requested table.
   *
   * @param args exactly seven positional arguments, in this order:
   *             accessKeyId, accessKeySecret, consumerId, topic, subExpression,
   *             tableName, quorum. Any other argument count prints the usage
   *             text and exits with status 1.
   */
  def main(args: Array[String]): Unit = {
    // Local import keeps the method self-contained without touching file-level imports.
    import java.nio.charset.StandardCharsets

    // The destructuring below needs exactly seven values; any other count would
    // otherwise surface as a scala.MatchError instead of the usage text.
    if (args.length != 7) {
      System.err.println(
        """Usage: spark-submit --class SparkHBaseDemo examples-1.0-SNAPSHOT-shaded.jar <accessKeyId> <accessKeySecret>
          |         <consumerId> <topic> <subExpression> <tableName> <quorum>
          |
          |Arguments:
          |
          |    accessKeyId      Aliyun Access Key ID.
          |    accessKeySecret  Aliyun Key Secret.
          |    consumerId       ONS ConsumerID.
          |    topic            ONS topic.
          |    subExpression    * for all, or some specific tag.
          |    tableName        The name of HBase table.
          |    quorum           HBase quorum setting.
          |
        """.stripMargin)
      System.exit(1)
    }

    val Array(accessKeyId, accessKeySecret, consumerId, topic, subExpression, tname, quorum) = args

    // The column family and the qualifier are both named "count".
    val COLUMN_FAMILY_BYTES = Bytes.toBytes("count")
    val COLUMN_QUALIFIER_BYTES = Bytes.toBytes("count")

    val batchInterval = Seconds(2)

    val conf = new SparkConf().setAppName("E-MapReduce Demo 9: Spark HBase Demo (Scala)")
    val ssc = new StreamingContext(conf, batchInterval)

    // Extracts the raw payload of every ONS message.
    def func: Message => Array[Byte] = msg => msg.getBody
    val onsStream = OnsUtils.createStream(
        ssc,
        consumerId,
        topic,
        subExpression,
        accessKeyId,
        accessKeySecret,
        StorageLevel.MEMORY_AND_DISK_2,
        func)

    onsStream.foreachRDD(rdd => {
      // Decode with an explicit charset so the result does not depend on the
      // default encoding of whichever executor JVM handles the record.
      // NOTE(review): assumes producers publish UTF-8 text - confirm.
      rdd.map(bytes => new String(bytes, StandardCharsets.UTF_8))
        .flatMap(line => line.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)
        // foreachPartition is the action intended for side effects; it replaces
        // the former mapPartitions(... Iterator.empty).count() workaround.
        .foreachPartition { words =>
          // The connection is not closed here; only the table handle is.
          // NOTE(review): presumably ConnectionUtil hands out a shared connection - confirm.
          val conn = ConnectionUtil.getDefaultConn(quorum)
          val t = conn.getTable(TableName.valueOf(tname))
          try {
            // Write in batches of at most 100 puts per call.
            words.grouped(100).foreach { slice =>
              val puts = slice.map { word =>
                println(s"word: $word")
                // Read the clock once so the row-key suffix and the cell
                // timestamp of a put always agree.
                val now = System.currentTimeMillis()
                val put = new Put(Bytes.toBytes(word._1 + now))
                put.addColumn(COLUMN_FAMILY_BYTES, COLUMN_QUALIFIER_BYTES,
                  now, Bytes.toBytes(word._2))
                put
              }.toList
              t.put(puts)
            }
          } finally {
            t.close()
          }
        }
    })

    ssc.start()
    ssc.awaitTermination()
  }