in streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala [42:124]
/**
 * Command-line entry point: evaluates the arguments with `read` and writes the
 * resulting string to standard output without a trailing newline.
 *
 * @param args
 *   raw command-line arguments, passed to `read` unchanged
 */
def main(args: Array[String]): Unit = {
  val output = read(args)
  print(output)
}
/**
 * Evaluates one CLI action and returns the text to emit for it.
 *
 * `args(0)` selects the action. For every action except `--vmopt`, `args(1)` is the
 * path of the configuration file (yml/yaml, conf or properties) and the remaining
 * elements are the program arguments handed to `getOption`.
 *
 * Supported actions:
 *   - `--vmopt`: an empty string when the system class loader is a `URLClassLoader`,
 *     otherwise a fixed pair of `--add-opens` JVM flags.
 *   - `--option`: the options recognised in `flinkOptions`, rendered as `-opt [value]`,
 *     followed by `-c <mainClass>` when the configuration holds the `optionMain` key.
 *   - `--property`: every non-empty configuration entry under `propertyPrefix`
 *     (excluding `optionMain`), rendered as `-Dkey=value`.
 *   - `--name`: the trimmed application name from the configuration, or an empty string.
 *   - `--detached`: `"Detached"` when the detached option is present, else `"Attach"`.
 *
 * @param args
 *   command-line arguments: the action, then (except for `--vmopt`) the configuration
 *   file path, then any program arguments
 * @return
 *   the rendered text for the action, or `null` when the action is not recognised
 * @throws IllegalArgumentException
 *   if no action is given, or if an action other than `--vmopt` is given without a
 *   configuration file path
 */
def read(args: Array[String]): String = {
  // Fail with a readable usage message rather than a bare ArrayIndexOutOfBoundsException.
  if (args.isEmpty) {
    throw new IllegalArgumentException(
      "[StreamPark] Usage: missing action, must be (--vmopt|--option|--property|--name|--detached)")
  }
  args.head match {
    case "--vmopt" =>
      // The original note says these flags solve dynamic loading of resources onto the
      // classpath on newer JDKs; they are only emitted when the system class loader is
      // not a URLClassLoader.
      ClassLoader.getSystemClassLoader match {
        case _: URLClassLoader => ""
        case _ =>
          "--add-opens java.base/jdk.internal.loader=ALL-UNNAMED --add-opens jdk.zipfs/jdk.nio.zipfs=ALL-UNNAMED"
      }
    case action =>
      if (args.length < 2) {
        throw new IllegalArgumentException(
          s"[StreamPark] Usage: $action requires a flink.conf file (yml|conf|properties)")
      }
      val conf = args(1)
      // Best effort: any non-fatal failure while loading the file (unsupported extension,
      // unreadable file, parse error) deliberately falls back to an empty configuration.
      val map = Try {
        val extension = conf.split("\\.").last.toLowerCase
        extension match {
          case "yml" | "yaml" => PropertiesUtils.fromYamlFile(conf)
          case "conf" => PropertiesUtils.fromHoconFile(conf)
          case "properties" => PropertiesUtils.fromPropertiesFile(conf)
          case _ =>
            throw new IllegalArgumentException(
              "[StreamPark] Usage:flink.conf file error,must be (yml|conf|properties)")
        }
      }.getOrElse(Map.empty[String, String])
      val programArgs = args.drop(2)
      action match {
        case "--option" =>
          val option = getOption(map, programArgs)
          val rendered = new StringBuilder
          // A parse failure is reported on stderr; whatever was rendered so far is kept.
          Try {
            parser.parse(flinkOptions, option, false).getOptions.foreach {
              opt =>
                rendered.append(s" -${opt.getOpt}")
                if (opt.hasArg) {
                  rendered.append(s" ${opt.getValue()}")
                }
            }
          }.failed.foreach(_.printStackTrace())
          // Skip both a missing key and a null value, as the null-sentinel match did.
          map
            .get(optionMain)
            .filter(_ != null)
            .foreach(mainClass => rendered.append(s" -c $mainClass"))
          rendered.toString.trim
        case "--property" =>
          map
            .collect {
              case (rawKey, rawValue)
                  if rawKey != optionMain && rawKey.startsWith(propertyPrefix) && rawValue.nonEmpty =>
                val key = rawKey.drop(propertyPrefix.length).trim
                val trimmed = rawValue.trim
                // The application name is emitted with spaces replaced by underscores.
                val value =
                  if (key == ConfigKeys.KEY_FLINK_APP_NAME) trimmed.replace(" ", "_") else trimmed
                s"-D$key=$value"
            }
            .mkString(" ")
        case "--name" =>
          map.getOrElse(propertyPrefix.concat(ConfigKeys.KEY_FLINK_APP_NAME), "").trim
        // is detached mode
        case "--detached" =>
          val option = getOption(map, programArgs)
          // Unlike "--option", a parse failure here propagates to the caller.
          val line = parser.parse(FlinkRunOption.allOptions, option, false)
          val detached = line.hasOption(FlinkRunOption.DETACHED_OPTION.getOpt) ||
            line.hasOption(FlinkRunOption.DETACHED_OPTION.getLongOpt)
          if (detached) "Detached" else "Attach"
        // Unknown action: callers receive null (kept for backward compatibility).
        case _ => null
      }
  }
}