in streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala [407:482]
/**
 * Builds the argument list that is passed to the submitted program.
 *
 * The list starts with the user supplied arguments (`submitRequest.args`, tokenized by
 * [[tokenizeProgramArgs]]). For `ApplicationType.STREAMPARK_FLINK` applications the flink
 * conf, the zipped effective application name and the parallelism are appended, followed by
 * the development-mode specific options:
 *
 *  - `FLINK_SQL`: the SQL text, plus the application conf when one is set.
 *  - any other mode: the application conf, unless it is missing or starts with `"json:"`.
 *
 * @param submitRequest
 *   the submit request to read the arguments and application settings from
 * @return
 *   the program arguments as a Java list
 */
private[this] def extractProgramArgs(submitRequest: SubmitRequest): JavaList[String] = {
  val programArgs = new ArrayBuffer[String]()
  programArgs ++= tokenizeProgramArgs(submitRequest.args)

  if (submitRequest.applicationType == ApplicationType.STREAMPARK_FLINK) {
    programArgs += PARAM_KEY_FLINK_CONF
    programArgs += submitRequest.flinkYaml
    programArgs += PARAM_KEY_APP_NAME
    programArgs += DeflaterUtils.zipString(submitRequest.effectiveAppName)
    programArgs += PARAM_KEY_FLINK_PARALLELISM
    programArgs += getParallelism(submitRequest).toString

    // appConf may be null; wrapping it once keeps both branches null-safe and avoids
    // ever putting a literal null into the argument list.
    val appConf = Option(submitRequest.appConf)
    submitRequest.developmentMode match {
      case DevelopmentMode.FLINK_SQL =>
        programArgs += PARAM_KEY_FLINK_SQL
        programArgs += submitRequest.flinkSQL
        appConf.foreach {
          conf =>
            programArgs += PARAM_KEY_APP_CONF
            programArgs += conf
        }
      case _ =>
        // A conf starting with "json:" is deliberately not forwarded. This branch must match
        // unconditionally: a guarded `case _ if ...` without a fallback raises a
        // scala.MatchError whenever the guard is false.
        appConf.filterNot(_.startsWith("json:")).foreach {
          conf =>
            programArgs += PARAM_KEY_APP_CONF
            programArgs += conf
        }
    }
  }
  programArgs.toList.asJava
}

/**
 * Splits the raw argument string into individual program arguments.
 *
 * Arguments are separated by whitespace. A group opened by a token starting with `"""` and
 * closed by a token ending with `"""` is kept together as ONE argument: its tokens are
 * re-joined with a single space, the surrounding quotes are removed and the result is
 * trimmed. A token that both opens and closes the group (e.g. `"""abc"""`) yields the text in
 * between. A group that is never closed extends to the end of the input.
 *
 * @param args
 *   the raw argument string, may be null or empty
 * @return
 *   the parsed arguments in their original order; empty when `args` is null or empty
 */
private[this] def tokenizeProgramArgs(args: String): Seq[String] = {
  if (StringUtils.isNotEmpty(args)) {
    val multiLineChar = "\"\"\""
    // split() produces a leading empty token when the input starts with whitespace;
    // dropping it prevents an empty string from being passed on as a program argument.
    val tokens = args.split("\\s+").filter(_.nonEmpty)
    if (!tokens.exists(_.startsWith(multiLineChar))) {
      tokens.toSeq
    } else {
      val parsed = new ArrayBuffer[String]()
      // tokens of the quoted group that is currently open
      val pending = new ArrayBuffer[String]()

      @tailrec
      def processElement(index: Int, multiLine: Boolean): Unit = {
        if (index == tokens.length) {
          // an unterminated group is emitted as-is rather than silently dropped
          if (pending.nonEmpty) {
            parsed += pending.mkString(" ")
          }
        } else {
          val next = index + 1
          val elem = tokens(index)
          if (multiLine) {
            if (elem.endsWith(multiLineChar)) {
              pending += elem.dropRight(multiLineChar.length)
              parsed += pending.mkString(" ")
              pending.clear()
              processElement(next, multiLine = false)
            } else {
              pending += elem
              processElement(next, multiLine = true)
            }
          } else if (elem.startsWith(multiLineChar)) {
            val opened = elem.drop(multiLineChar.length)
            if (opened.endsWith(multiLineChar)) {
              // the group opens and closes within the same token
              parsed += opened.dropRight(multiLineChar.length)
              processElement(next, multiLine = false)
            } else {
              pending += opened
              processElement(next, multiLine = true)
            }
          } else {
            parsed += elem
            processElement(next, multiLine = false)
          }
        }
      }

      processElement(0, multiLine = false)
      parsed.map(_.trim).toSeq
    }
  } else {
    Seq.empty[String]
  }
}