def main()

in streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala [42:124]


  def main(args: Array[String]): Unit = print(read(args))

  def read(args: Array[String]): String = {
    args(0) match {
      case "--vmopt" =>
        // solved jdk1.8+ dynamic loading of resources to the classpath problem
        ClassLoader.getSystemClassLoader match {
          case c if c.isInstanceOf[URLClassLoader] => ""
          case _ =>
            "--add-opens java.base/jdk.internal.loader=ALL-UNNAMED --add-opens jdk.zipfs/jdk.nio.zipfs=ALL-UNNAMED"
        }
      case action =>
        val conf = args(1)
        val map = Try {
          val extension = conf.split("\\.").last.toLowerCase
          extension match {
            case "yml" | "yaml" => PropertiesUtils.fromYamlFile(conf)
            case "conf" => PropertiesUtils.fromHoconFile(conf)
            case "properties" => PropertiesUtils.fromPropertiesFile(conf)
            case _ =>
              throw new IllegalArgumentException(
                "[StreamPark] Usage:flink.conf file error,must be (yml|conf|properties)")
          }
        } match {
          case Success(value) => value
          case _ => Map.empty[String, String]
        }
        val programArgs = args.drop(2)
        action match {
          case "--option" =>
            val option = getOption(map, programArgs)
            val buffer = new StringBuffer()
            Try {
              val line = parser.parse(flinkOptions, option, false)
              line.getOptions.foreach(x => {
                buffer.append(s" -${x.getOpt}")
                if (x.hasArg) {
                  buffer.append(s" ${x.getValue()}")
                }
              })
            } match {
              case Failure(exception) => exception.printStackTrace()
              case _ =>
            }
            map.getOrElse(optionMain, null) match {
              case null =>
              case mainClass => buffer.append(s" -c $mainClass")
            }
            buffer.toString.trim
          case "--property" =>
            val buffer = new StringBuffer()
            map
              .filter(x => x._1 != optionMain && x._1.startsWith(propertyPrefix) && x._2.nonEmpty)
              .foreach {
                x =>
                  val key = x._1.drop(propertyPrefix.length).trim
                  val value = x._2.trim
                  if (key == ConfigKeys.KEY_FLINK_APP_NAME) {
                    buffer.append(s" -D$key=${value.replace(" ", "_")}")
                  } else {
                    buffer.append(s" -D$key=$value")
                  }
              }
            buffer.toString.trim
          case "--name" =>
            map
              .getOrElse(propertyPrefix.concat(ConfigKeys.KEY_FLINK_APP_NAME), "")
              .trim match {
              case appName if appName.nonEmpty => appName
              case _ => ""
            }
          // is detached mode
          case "--detached" =>
            val option = getOption(map, programArgs)
            val line = parser.parse(FlinkRunOption.allOptions, option, false)
            val detached = line.hasOption(FlinkRunOption.DETACHED_OPTION.getOpt) || line.hasOption(
              FlinkRunOption.DETACHED_OPTION.getLongOpt)
            val mode = if (detached) "Detached" else "Attach"
            mode
          case _ => null
        }
    }
  }