def runJob()

in src/main/scala/com/aliyun/emr/example/spark/streaming/benchmark/AbstractStreaming.scala [17:46]
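
A sketch of the imports this method relies on, inferred from the classes it references (the enclosing file may declare or group them differently):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}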


  def runJob(args: Array[String]): Unit = {
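    // Load the benchmark configuration from args(0) and derive receiver/executor sizing from it.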
    config = loadConfig(args(0))
    val receiverCores = config.getProperty("partition.number").toInt /
      config.getProperty("kafka.partition.receiver.factor").toInt
    val executorCore = (config.getProperty("cluster.cores.total").toInt *
      config.getProperty("cpu.core.factor").toFloat - receiverCores).toInt /
      config.getProperty("spark.executor.instances").toInt
    val executorMem = config.getProperty("cluster.memory.per.node.mb").toInt *
      config.getProperty("cluster.worker.node.number").toInt /
      config.getProperty("spark.executor.instances").toInt
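    // Build the SparkConf: application name, YARN ApplicationMaster resources,
    // per-executor cores/memory, and the receiver block interval.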
    val sparkConf = new SparkConf()
      .setAppName(config.getProperty("name"))
      .set("spark.yarn.am.memory.mb", config.getProperty("spark.yarn.am.memory.mb") + "m")
      .set("spark.yarn.am.cores", config.getProperty("spark.yarn.am.cores"))
      .set("spark.executor.instances", config.getProperty("spark.executor.instances"))
      .set("spark.executor.cores", executorCore.toString)
      .set("spark.executor.memory", executorMem + "m")
      .set("spark.streaming.blockInterval", config.getProperty("spark.streaming.blockInterval"))
    // Batch interval is duration.ms milliseconds.
    val ssc = new StreamingContext(
      new SparkContext(sparkConf),
      Duration(config.getProperty("duration.ms").toLong))

    // Kafka consumer settings for the new (0.10+) consumer API: start from the latest offsets and auto-commit.
    val kafkaParam = Map[String, Object](
      "bootstrap.servers" -> config.getProperty("broker.list"),
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> config.getProperty("consumer.group"),
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
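    // Create a direct (receiver-less) stream subscribed to the benchmark topic.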
    val stream = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array(config.getProperty("topic")), kafkaParam))

    // execute(...) is the per-benchmark hook, presumably implemented by concrete subclasses of AbstractStreaming.
    execute(stream)

    // Start the streaming job and block until it terminates.
    ssc.start()
    ssc.awaitTermination()
  }
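
For illustration, a hypothetical subclass showing one way execute could consume the stream built above. The class name, the record-counting body, and the assumed signature of execute are illustrative only and not taken from the source:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream

class CountRecords extends AbstractStreaming {
  // Hypothetical benchmark body: count the records of every micro-batch on the driver.
  override def execute(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
    stream.foreachRDD(rdd => println(s"records in batch: ${rdd.count()}"))
  }
}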