final private def init(args: Array[String])

in streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala [124:188]


  final private def init(args: Array[String]): Unit = {
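    // Framework flags (--conf, --checkpoint, --createOnError) are consumed here;
    // any other "--key value" pair is collected and later applied to the SparkConf.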
    var argv = args.toList
    var conf: String = null
    val userArgs = ArrayBuffer[(String, String)]()

    while (argv.nonEmpty) {
      argv match {
        case "--conf" :: value :: tail =>
          conf = value
          argv = tail
        case "--checkpoint" :: value :: tail =>
          checkpoint = value
          argv = tail
        case "--createOnError" :: value :: tail =>
          createOnError = value.toBoolean
          argv = tail
        case Nil => // unreachable: the while guard ensures argv is non-empty
        case other :: value :: tail if other.startsWith(PARAM_PREFIX) =>
          // strip the "--" prefix; the pair is forwarded to the SparkConf below
          userArgs += other.drop(PARAM_PREFIX.length) -> value
          argv = tail
        case tail =>
          // a lone or malformed option; printUsageAndExit is expected to terminate
          // the process, otherwise this loop would spin on the same token
          logError(s"Unrecognized options: ${tail.mkString(" ")}")
          printUsageAndExit()
      }
    }

    if (conf != null) {
      // Dispatch on the file extension: .conf is parsed as HOCON,
      // .properties as Java properties, .yaml/.yml as YAML.
      val localConf = conf.split("\\.").last match {
        case "conf" => PropertiesUtils.fromHoconFile(conf)
        case "properties" => PropertiesUtils.fromPropertiesFile(conf)
        case "yaml" | "yml" => PropertiesUtils.fromYamlFile(conf)
        case _ =>
          throw new IllegalArgumentException(
            "[StreamPark] Usage: config file error, must be one of [properties|yaml|conf]")
      }
      localConf.foreach { case (k, v) => sparkConf.set(k, v) }
    }
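    // Command-line "--key value" pairs are applied last, so they override entries
    // loaded from the config file.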
    userArgs.foreach { case (k, v) => sparkConf.set(k, v) }

    val appMain = sparkConf.get(KEY_SPARK_MAIN_CLASS, "org.apache.streampark.spark.cli.SqlClient")
    // the default above means `get` never returns null; also reject an explicitly empty value
    if (appMain == null || appMain.isEmpty) {
      logError(s"[StreamPark] parameter: $KEY_SPARK_MAIN_CLASS must not be empty!")
      System.exit(1)
    }

    // fall back to the main class name when no explicit app name is configured
    val appName = sparkConf.get(KEY_SPARK_APP_NAME, null) match {
      case null | "" => appMain
      case name => name
    }

    // local debug mode: note that only a bare "local" master value takes this
    // branch, not variants such as "local[2]"
    val localMode = sparkConf.get("spark.master", null) == "local"
    if (localMode) {
      sparkConf.setAppName(s"[LocalDebug] $appName").setMaster("local[*]")
      // throttle Kafka ingestion so a single debug machine is not overwhelmed
      sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "10")
    }
    // stop gracefully so in-flight streaming batches can finish on shutdown
    sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")

    // Merge listeners registered in code with any configured via spark.extraListeners,
    // filtering empty entries so a bare or trailing comma is never written back.
    val extraListeners =
      (sparkListeners ++ sparkConf.get("spark.extraListeners", "").split(","))
        .filter(_.nonEmpty)
        .mkString(",")
    if (extraListeners.nonEmpty) {
      sparkConf.set("spark.extraListeners", extraListeners)
    }
  }
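
For reference, a self-contained sketch (hypothetical object name `InitArgsSketch`; assumes `PARAM_PREFIX` is "--", as the `drop` above implies) of how a typical argument vector is split between the framework flags and the user-supplied SparkConf pairs:

  object InitArgsSketch extends App {
    val PARAM_PREFIX = "--"
    var argv = List(
      "--conf", "conf/app.properties",    // consumed by the "--conf" case
      "--checkpoint", "hdfs:///tmp/ckpt", // consumed by the "--checkpoint" case
      "--spark.executor.memory", "2g")    // falls through to the user-arg case
    var conf: String = null
    var checkpoint: String = null
    val userArgs = scala.collection.mutable.ArrayBuffer[(String, String)]()

    while (argv.nonEmpty) {
      argv match {
        case "--conf" :: value :: tail => conf = value; argv = tail
        case "--checkpoint" :: value :: tail => checkpoint = value; argv = tail
        case other :: value :: tail if other.startsWith(PARAM_PREFIX) =>
          userArgs += other.drop(PARAM_PREFIX.length) -> value
          argv = tail
        case tail => sys.error(s"Unrecognized options: ${tail.mkString(" ")}")
      }
    }
    // prints: conf/app.properties, hdfs:///tmp/ckpt, ArrayBuffer((spark.executor.memory,2g))
    println(s"$conf, $checkpoint, $userArgs")
  }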