in flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala [152:272]
def createIMain(settings: Settings, out: JPrintWriter): IMain
def createSettings(): Settings
private def initFlinkConfig(): Config = {
this.flinkVersion = new FlinkVersion(EnvironmentInformation.getVersion)
LOGGER.info("Using flink: " + flinkVersion)
this.flinkShims = FlinkShims.getInstance(flinkVersion, properties)
var flinkHome = sys.env.getOrElse("FLINK_HOME", "")
var flinkConfDir = sys.env.getOrElse("FLINK_CONF_DIR", "")
val hadoopConfDir = sys.env.getOrElse("HADOOP_CONF_DIR", "")
val yarnConfDir = sys.env.getOrElse("YARN_CONF_DIR", "")
var hiveConfDir = sys.env.getOrElse("HIVE_CONF_DIR", "")
mode = ExecutionMode.withName(
properties.getProperty("flink.execution.mode", "LOCAL")
.replace("-", "_")
.toUpperCase)
if (ExecutionMode.isYarnApplicationMode(mode)) {
// use current yarn container working directory as FLINK_HOME, FLINK_CONF_DIR and HIVE_CONF_DIR
val workingDirectory = new File(".").getAbsolutePath
flinkHome = workingDirectory
flinkConfDir = workingDirectory
hiveConfDir = workingDirectory
}
if (ExecutionMode.isK8sApplicationMode(mode)) {
// use current pod working directory as FLINK_HOME
val workingDirectory = new File(".").getAbsolutePath
flinkHome = workingDirectory
flinkConfDir = workingDirectory + "/conf"
hiveConfDir = workingDirectory + "/conf"
}
LOGGER.info("FLINK_HOME: " + flinkHome)
LOGGER.info("FLINK_CONF_DIR: " + flinkConfDir)
LOGGER.info("HADOOP_CONF_DIR: " + hadoopConfDir)
LOGGER.info("YARN_CONF_DIR: " + yarnConfDir)
LOGGER.info("HIVE_CONF_DIR: " + hiveConfDir)
this.configuration = GlobalConfiguration.loadConfiguration(flinkConfDir)
var config = Config(executionMode = mode)
val jmMemory = properties.getProperty("jobmanager.memory.process.size",
properties.getProperty("flink.jm.memory", "1024"))
config = config.copy(yarnConfig =
Some(ensureYarnConfig(config)
.copy(jobManagerMemory = Some(jmMemory))))
val tmMemory = properties.getProperty("taskmanager.memory.process.size",
properties.getProperty("flink.tm.memory", "1024"))
config = config.copy(yarnConfig =
Some(ensureYarnConfig(config)
.copy(taskManagerMemory = Some(tmMemory))))
val appName = properties.getProperty("yarn.application.name",
properties.getProperty("flink.yarn.appName", "Flink Yarn App Name"))
config = config.copy(yarnConfig =
Some(ensureYarnConfig(config)
.copy(name = Some(appName))))
val slotNum = Integer.parseInt(properties.getProperty("taskmanager.numberOfTaskSlots",
properties.getProperty("flink.tm.slot", "1")))
config = config.copy(yarnConfig =
Some(ensureYarnConfig(config)
.copy(slots = Some(slotNum))))
val queue = properties.getProperty("yarn.application.queue",
properties.getProperty("flink.yarn.queue", "default"))
config = config.copy(yarnConfig =
Some(ensureYarnConfig(config)
.copy(queue = Some(queue))))
this.userUdfJars = getUserUdfJars()
this.userJars = getUserJarsExceptUdfJars ++ this.userUdfJars
if (ExecutionMode.isK8sApplicationMode(mode)) {
var flinkAppJar = properties.getProperty("flink.app.jar")
if (flinkAppJar != null && flinkAppJar.startsWith("local://")) {
flinkAppJar = flinkAppJar.substring(8)
this.userJars = this.userJars :+ flinkAppJar
} else {
throw new IOException("flink.app.jar is not set or invalid, flink.app.jar: " + flinkAppJar)
}
}
LOGGER.info("UserJars: " + userJars.mkString(","))
config = config.copy(externalJars = Some(userJars.toArray))
LOGGER.info("Config: " + config)
configuration.setString("flink.yarn.jars", userJars.mkString(":"))
// load other configuration from interpreter properties
properties.asScala.foreach(entry => configuration.setString(entry._1, entry._2))
this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM)
this.defaultSqlParallelism = configuration.getInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM)
LOGGER.info("Default Parallelism: " + this.defaultParallelism)
LOGGER.info("Default SQL Parallelism: " + this.defaultSqlParallelism)
// set scala.color
if (properties.getProperty("zeppelin.flink.scala.color", "true").toBoolean) {
System.setProperty("scala.color", "true")
}
// set host/port when it is remote mode
if (config.executionMode == ExecutionMode.REMOTE) {
val host = properties.getProperty("flink.execution.remote.host")
val port = properties.getProperty("flink.execution.remote.port")
if (host == null) {
throw new InterpreterException("flink.execution.remote.host is not " +
"specified when using REMOTE mode")
}
if (port == null) {
throw new InterpreterException("flink.execution.remote.port is not " +
"specified when using REMOTE mode")
}
config = config.copy(host = Some(host))
.copy(port = Some(Integer.parseInt(port)))
}
config
}