in streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala [122:170]
def hasProp(key: String): Boolean = MapUtils.isNotEmpty(properties) && properties.containsKey(key)
def getProp(key: String): Any = properties.get(key)
def hasExtra(key: String): Boolean = MapUtils.isNotEmpty(extraParameter) && extraParameter.containsKey(key)
def getExtra(key: String): Any = extraParameter.get(key)
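/**
 * Resolves `appConf` into a flat key -> value map. The configuration source is selected by the
 * URI-style prefix of `appConf`: `json://` carries inline JSON, while `yaml://`, `conf://` and
 * `prop://` carry a zip-compressed payload that is inflated via DeflaterUtils; `hdfs://` reads
 * the file from HDFS and infers its format from the file extension. For the non-JSON formats,
 * only entries whose key starts with `prefix` are kept, and the prefix is stripped from the key.
 */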
private[this] def getParameterMap(prefix: String = ""): Map[String, String] = {
  if (this.appConf == null) {
    return Map.empty[String, String]
  }
  val format = this.appConf.substring(0, 7)
  if (format == "json://") {
    val json = this.appConf.drop(7)
    new ObjectMapper()
      .readValue[JavaMap[String, String]](json, classOf[JavaMap[String, String]])
      .toMap
      .filter(_._2 != null)
  } else {
    lazy val content = DeflaterUtils.unzipString(this.appConf.trim.drop(7))
    val map = format match {
      case "yaml://" => PropertiesUtils.fromYamlText(content)
      case "conf://" => PropertiesUtils.fromHoconText(content)
      case "prop://" => PropertiesUtils.fromPropertiesText(content)
      case "hdfs://" =>
        /**
         * If the configuration file is read from HDFS, the HDFS-related configuration files
         * must be copied into the resources directory.
         */
        val text = HdfsUtils.read(this.appConf)
        val extension = this.appConf.split("\\.").last.toLowerCase
        extension match {
          case "yml" | "yaml" => PropertiesUtils.fromYamlText(text)
          case "conf" => PropertiesUtils.fromHoconText(text)
          case "properties" => PropertiesUtils.fromPropertiesText(text)
          case _ =>
            throw new IllegalArgumentException(
              "[StreamPark] Usage: application config format error, must be [yaml|conf|properties]")
        }
      case _ =>
        throw new IllegalArgumentException("[StreamPark] application config format error.")
    }
    map
      .filter(_._1.startsWith(prefix))
      .filter(_._2.nonEmpty)
      .map(x => x._1.drop(prefix.length) -> x._2)
  }
}
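
// Illustration only, not part of SubmitRequest.scala: a minimal, self-contained sketch of the
// json:// branch above, decoding an inline JSON payload with Jackson and then narrowing the
// result by a key prefix (a step getParameterMap applies to the non-JSON formats). The appConf
// value and the flink.app.* keys below are made-up examples.
object AppConfJsonSketch {
  import com.fasterxml.jackson.databind.ObjectMapper
  import scala.collection.JavaConverters._

  def main(args: Array[String]): Unit = {
    val appConf = """json://{"flink.app.name":"demo","flink.app.parallelism":"2"}"""
    val json = appConf.drop("json://".length)
    val parsed = new ObjectMapper()
      .readValue(json, classOf[java.util.Map[String, String]])
      .asScala
      .toMap
      .filter(_._2 != null)
    // Keep only keys under the chosen prefix and strip it from the resulting keys.
    val prefix = "flink.app."
    val scoped = parsed
      .filter(_._1.startsWith(prefix))
      .map(x => x._1.drop(prefix.length) -> x._2)
    println(scoped) // e.g. Map(name -> demo, parallelism -> 2)
  }
}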