private[this] def extractProgramArgs()

in streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala [407:482]


  /**
   * Builds the argument list that is passed to the user program.
   *
   * `submitRequest.args` is split on whitespace. A run of tokens that starts with a token
   * beginning with three double quotes and ends with a token ending in three double quotes is
   * collapsed into a single argument: the quote markers are removed, the tokens are re-joined with
   * one space, and the result is trimmed. A block that is never closed is emitted with whatever
   * was collected up to the end of the input.
   *
   * For `ApplicationType.STREAMPARK_FLINK` applications the flink conf, the app name (passed
   * through `DeflaterUtils.zipString`), the parallelism and, depending on the development mode,
   * the Flink SQL and/or the app conf are appended as key/value pairs after the user arguments.
   *
   * @param submitRequest
   *   the request providing the raw argument string and the application settings
   * @return
   *   the program arguments as a Java list, user arguments first
   */
  private[this] def extractProgramArgs(submitRequest: SubmitRequest): JavaList[String] = {

    val multiLineChar = "\"\"\""
    val programArgs = new ArrayBuffer[String]()

    // Walks the tokens once. `pending` holds the pieces of the currently open triple-quoted block
    // in reverse order; `multiLine` tells whether such a block is open.
    @tailrec def collectArgs(
        tokens: List[String],
        pending: List[String],
        multiLine: Boolean): Unit = tokens match {
      case Nil =>
        // Unterminated block: keep what was gathered instead of silently dropping it.
        if (pending.nonEmpty) {
          programArgs += pending.reverse.mkString(" ").trim
        }

      case token :: rest if multiLine =>
        if (token.endsWith(multiLineChar)) {
          val pieces = token.dropRight(multiLineChar.length) :: pending
          programArgs += pieces.reverse.mkString(" ").trim
          collectArgs(rest, Nil, multiLine = false)
        } else {
          collectArgs(rest, token :: pending, multiLine = true)
        }

      case token :: rest if token.startsWith(multiLineChar) =>
        val opened = token.drop(multiLineChar.length)
        if (opened.endsWith(multiLineChar)) {
          // The block opens and closes inside one token, e.g. """value""". Without this check
          // the block would stay open and swallow every following token.
          programArgs += opened.dropRight(multiLineChar.length).trim
          collectArgs(rest, Nil, multiLine = false)
        } else {
          collectArgs(rest, opened :: Nil, multiLine = true)
        }

      case token :: rest =>
        programArgs += token
        collectArgs(rest, Nil, multiLine = false)
    }

    if (StringUtils.isNotEmpty(submitRequest.args)) {
      // String.split yields a leading "" when the input starts with whitespace; drop it so that
      // no empty argument reaches the user program.
      val tokens = submitRequest.args.split("\\s+").filter(_.nonEmpty).toList
      collectArgs(tokens, Nil, multiLine = false)
    }

    if (submitRequest.applicationType == ApplicationType.STREAMPARK_FLINK) {
      programArgs += PARAM_KEY_FLINK_CONF
      programArgs += submitRequest.flinkYaml
      programArgs += PARAM_KEY_APP_NAME
      programArgs += DeflaterUtils.zipString(submitRequest.effectiveAppName)
      programArgs += PARAM_KEY_FLINK_PARALLELISM
      programArgs += getParallelism(submitRequest).toString
      submitRequest.developmentMode match {
        case DevelopmentMode.FLINK_SQL =>
          programArgs += PARAM_KEY_FLINK_SQL
          programArgs += submitRequest.flinkSQL
          if (submitRequest.appConf != null) {
            programArgs += PARAM_KEY_APP_CONF
            programArgs += submitRequest.appConf
          }
        case _ =>
          // The fallback case must be unconditional: with the guard on the case itself, an
          // app conf starting with "json:" matched no case and raised a MatchError.
          val appConf = submitRequest.appConf
          // NOTE(review): a null appConf is still appended (as before), which puts a null entry
          // into the argument list; confirm whether it should be skipped like in the SQL branch.
          if (appConf == null || !appConf.startsWith("json:")) {
            programArgs += PARAM_KEY_APP_CONF
            programArgs += appConf
          }
      }
    }
    programArgs.toList.asJava
  }