in streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala [105:140]
/**
 * Loads the application configuration referenced by `config` into a key/value map.
 *
 * `config` is interpreted according to its prefix:
 *  - `yaml://`, `conf://`, `prop://`: the remainder of the string is passed through
 *    `DeflaterUtils.unzipString` and parsed as YAML, HOCON or properties text respectively;
 *  - `hdfs://`: the text is read via `HdfsUtils.read` and parsed according to the file extension;
 *  - anything else: treated as a local file path, read via `FileUtils.readFile` and parsed
 *    according to the file extension (`yml`/`yaml`, `conf` or `properties`, case-insensitive).
 *
 * @param config an inline configuration with one of the scheme prefixes above, or a path to a
 *               configuration file
 * @return the parsed entries, with entries whose value is empty removed
 * @throws IllegalArgumentException if the file extension is not supported, or if a local
 *                                  configuration file does not exist
 */
def parseConfig(config: String): Map[String, String] = {
  // "yaml://", "conf://" and "prop://" are all exactly 7 characters long, so drop(7) strips
  // whichever of these prefixes is present. Declared lazy so it is only evaluated by the
  // inline branches below and never for hdfs/local file paths.
  lazy val content = DeflaterUtils.unzipString(config.drop(7))

  // Parses already-loaded text, choosing the parser from the extension of `config`.
  def readConfig(text: String): Map[String, String] = {
    // Locale.ROOT keeps the extension match independent of the JVM default locale; with the
    // no-arg toLowerCase a Turkish default locale maps 'I' to a dotless 'ı', so an upper-case
    // extension such as "PROPERTIES" would no longer match "properties".
    val format = config.split("\\.").last.toLowerCase(java.util.Locale.ROOT)
    format match {
      case "yml" | "yaml" => PropertiesUtils.fromYamlText(text)
      case "conf" => PropertiesUtils.fromHoconText(text)
      case "properties" => PropertiesUtils.fromPropertiesText(text)
      case _ =>
        throw new IllegalArgumentException(
          "[StreamPark] Usage: application config file error,must be [yaml|conf|properties]")
    }
  }

  val map = config match {
    case x if x.startsWith("yaml://") => PropertiesUtils.fromYamlText(content)
    case x if x.startsWith("conf://") =>
      PropertiesUtils.fromHoconText(content)
    case x if x.startsWith("prop://") =>
      PropertiesUtils.fromPropertiesText(content)
    case x if x.startsWith("hdfs://") =>
      // When the configuration file lives on HDFS, the user needs to copy the HDFS-related
      // configuration files into the resources directory.
      val text = HdfsUtils.read(x)
      readConfig(text)
    case _ =>
      // No recognised scheme prefix: treat the value as a path on the local file system.
      val configFile = new File(config)
      require(
        configFile.exists(),
        s"[StreamPark] Usage: application config file: $configFile is not found!!!")
      val text = FileUtils.readFile(configFile)
      readConfig(text)
  }
  // Discard entries whose value is an empty string.
  map.filter(_._2.nonEmpty)
}