in streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala [124:188]
/**
 * Parses the launcher command line and applies the result to `sparkConf`.
 *
 * Recognized options:
 *  - `--conf <file>`: configuration file; the loader is chosen by the file extension
 *    (`conf`, `properties`, `yaml`/`yml`).
 *  - `--checkpoint <value>`: stored in `checkpoint`.
 *  - `--createOnError <boolean>`: stored in `createOnError`.
 *  - `<PARAM_PREFIX>key value`: set on `sparkConf` as `key -> value`. These are applied after
 *    the configuration file, so they override entries loaded from it.
 *
 * Any other argument shape is logged and `printUsageAndExit()` is invoked.
 *
 * @param args raw command-line arguments
 * @throws IllegalArgumentException if the configuration file has an unsupported extension, or
 *                                  if the `--createOnError` value is not a valid boolean
 */
final private def init(args: Array[String]): Unit = {
  var argv = args.toList
  var confFile: Option[String] = None
  val userArgs = ArrayBuffer[(String, String)]()

  while (argv.nonEmpty) {
    argv match {
      case "--conf" :: value :: tail =>
        confFile = Some(value)
        argv = tail
      case "--checkpoint" :: value :: tail =>
        checkpoint = value
        argv = tail
      case "--createOnError" :: value :: tail =>
        createOnError = value.toBoolean
        argv = tail
      case other :: value :: tail if other.startsWith(PARAM_PREFIX) =>
        // Strip the actual prefix instead of assuming it is two characters long.
        userArgs += (other.stripPrefix(PARAM_PREFIX) -> value)
        argv = tail
      case unrecognized =>
        logError(s"Unrecognized options: ${unrecognized.mkString(" ")}")
        printUsageAndExit()
        // Presumably printUsageAndExit() terminates the JVM; clearing argv guarantees the
        // loop still ends if it ever returns.
        argv = Nil
    }
  }

  // Settings from the configuration file are applied first.
  confFile.foreach { path =>
    val localConf = path.split("\\.").last match {
      case "conf" => PropertiesUtils.fromHoconFile(path)
      case "properties" => PropertiesUtils.fromPropertiesFile(path)
      case "yaml" | "yml" => PropertiesUtils.fromYamlFile(path)
      case _ =>
        throw new IllegalArgumentException(
          "[StreamPark] Usage: config file error,must be [properties|yaml|conf]")
    }
    localConf.foreach(arg => sparkConf.set(arg._1, arg._2))
  }

  // Command-line parameters win over the configuration file.
  userArgs.foreach(arg => sparkConf.set(arg._1, arg._2))

  // The default is non-null, so the lookup always yields a usable main class name.
  val appMain = sparkConf.get(KEY_SPARK_MAIN_CLASS, "org.apache.streampark.spark.cli.SqlClient")

  // Fall back to the main class when no (or an empty) application name is configured.
  val appName = Option(sparkConf.get(KEY_SPARK_APP_NAME, null)).filter(_.nonEmpty).getOrElse(appMain)

  // debug mode
  // NOTE(review): the resolved appName is only applied in local mode; confirm that non-local
  // runs are expected to get their application name from elsewhere.
  val localMode = sparkConf.get("spark.master", null) == "local"
  if (localMode) {
    sparkConf.setAppName(s"[LocalDebug] $appName").setMaster("local[*]")
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "10")
  }

  // stop gracefully when the JVM shuts down
  sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")

  // Merge the listeners registered on this instance with any already configured, dropping
  // blank entries so the value never carries stray commas; leave the key untouched when there
  // is nothing to register.
  val extraListeners =
    (sparkListeners.mkString(",") + "," + sparkConf.get("spark.extraListeners", ""))
      .split(",")
      .map(_.trim)
      .filter(_.nonEmpty)
      .mkString(",")
  if (extraListeners.nonEmpty) {
    sparkConf.set("spark.extraListeners", extraListeners)
  }
}