def createIMain()

in flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala [152:272]


  /**
   * Declared without a body here; the implementation is supplied elsewhere
   * (presumably by a Scala-version-specific subclass -- TODO confirm).
   *
   * @param settings settings handed to the implementation when building the `IMain`
   * @param out      print writer handed to the implementation
   *                 (presumably where the REPL writes its output -- TODO confirm)
   * @return the `IMain` instance produced by the implementation
   */
  def createIMain(settings: Settings, out: JPrintWriter): IMain

  /**
   * Declared without a body here; the implementation is supplied elsewhere
   * (presumably by a Scala-version-specific subclass -- TODO confirm).
   *
   * @return the `Settings` instance produced by the implementation
   */
  def createSettings(): Settings

  /**
   * Builds the `Config` for this interpreter from environment variables and the
   * interpreter `properties`, assigning several fields of this instance along the way.
   *
   * Fields assigned here: `flinkVersion`, `flinkShims`, `mode`, `configuration`,
   * `userUdfJars`, `userJars`, `defaultParallelism` and `defaultSqlParallelism`.
   * The `scala.color` system property is set to "true" unless
   * `zeppelin.flink.scala.color` is configured to false.
   *
   * @return the config carrying the execution mode, the yarn settings (job/task manager
   *         memory, application name, slots, queue), the external jars and, in REMOTE
   *         mode, the remote host and port
   * @throws IOException           in K8s application mode when `flink.app.jar` is unset or
   *                               does not start with `local://`
   * @throws InterpreterException  in REMOTE mode when `flink.execution.remote.host` or
   *                               `flink.execution.remote.port` is not specified
   * @throws NumberFormatException when the configured slot count or remote port is not a
   *                               valid integer
   */
  private def initFlinkConfig(): Config = {

    this.flinkVersion = new FlinkVersion(EnvironmentInformation.getVersion)
    LOGGER.info("Using flink: " + flinkVersion)
    this.flinkShims = FlinkShims.getInstance(flinkVersion, properties)

    // Directory locations default to the process environment; the application modes
    // below replace some of them with paths under the current working directory.
    var flinkHome = sys.env.getOrElse("FLINK_HOME", "")
    var flinkConfDir = sys.env.getOrElse("FLINK_CONF_DIR", "")
    val hadoopConfDir = sys.env.getOrElse("HADOOP_CONF_DIR", "")
    val yarnConfDir = sys.env.getOrElse("YARN_CONF_DIR", "")
    var hiveConfDir = sys.env.getOrElse("HIVE_CONF_DIR", "")

    // Normalise e.g. "yarn-application" to "YARN_APPLICATION" before the enum lookup.
    // Locale.ROOT keeps the upper-casing independent of the JVM default locale: with a
    // Turkish locale the plain toUpperCase maps 'i' to the dotted capital 'İ', which
    // would make withName fail for otherwise valid mode names.
    mode = ExecutionMode.withName(
      properties.getProperty("flink.execution.mode", "LOCAL")
        .replace("-", "_")
        .toUpperCase(java.util.Locale.ROOT))

    if (ExecutionMode.isYarnApplicationMode(mode)) {
      // use current yarn container working directory as FLINK_HOME, FLINK_CONF_DIR and HIVE_CONF_DIR
      val workingDirectory = new File(".").getAbsolutePath
      flinkHome = workingDirectory
      flinkConfDir = workingDirectory
      hiveConfDir = workingDirectory
    }
    if (ExecutionMode.isK8sApplicationMode(mode)) {
      // use current pod working directory as FLINK_HOME; the conf dirs live in its "conf" sub-directory
      val workingDirectory = new File(".").getAbsolutePath
      flinkHome = workingDirectory
      flinkConfDir = workingDirectory + "/conf"
      hiveConfDir = workingDirectory + "/conf"
    }

    LOGGER.info("FLINK_HOME: " + flinkHome)
    LOGGER.info("FLINK_CONF_DIR: " + flinkConfDir)
    LOGGER.info("HADOOP_CONF_DIR: " + hadoopConfDir)
    LOGGER.info("YARN_CONF_DIR: " + yarnConfDir)
    LOGGER.info("HIVE_CONF_DIR: " + hiveConfDir)

    this.configuration = GlobalConfiguration.loadConfiguration(flinkConfDir)
    var config = Config(executionMode = mode)

    // Each setting below is looked up under its Flink key first and falls back to the
    // "flink.*" interpreter key, then to a hard-coded default.
    val jmMemory = properties.getProperty("jobmanager.memory.process.size",
      properties.getProperty("flink.jm.memory", "1024"))
    config = config.copy(yarnConfig =
      Some(ensureYarnConfig(config)
        .copy(jobManagerMemory = Some(jmMemory))))

    val tmMemory = properties.getProperty("taskmanager.memory.process.size",
      properties.getProperty("flink.tm.memory", "1024"))
    config = config.copy(yarnConfig =
      Some(ensureYarnConfig(config)
        .copy(taskManagerMemory = Some(tmMemory))))

    val appName = properties.getProperty("yarn.application.name",
      properties.getProperty("flink.yarn.appName", "Flink Yarn App Name"))
    config = config.copy(yarnConfig =
      Some(ensureYarnConfig(config)
        .copy(name = Some(appName))))

    val slotNum = Integer.parseInt(properties.getProperty("taskmanager.numberOfTaskSlots",
      properties.getProperty("flink.tm.slot", "1")))
    config = config.copy(yarnConfig =
      Some(ensureYarnConfig(config)
        .copy(slots = Some(slotNum))))

    val queue = properties.getProperty("yarn.application.queue",
      properties.getProperty("flink.yarn.queue", "default"))
    config = config.copy(yarnConfig =
      Some(ensureYarnConfig(config)
        .copy(queue = Some(queue))))

    this.userUdfJars = getUserUdfJars()

    this.userJars = getUserJarsExceptUdfJars ++ this.userUdfJars
    if (ExecutionMode.isK8sApplicationMode(mode)) {
      var flinkAppJar = properties.getProperty("flink.app.jar")
      if (flinkAppJar != null && flinkAppJar.startsWith("local://")) {
        // drop the 8-character "local://" scheme prefix to obtain the plain path
        flinkAppJar = flinkAppJar.substring(8)
        this.userJars = this.userJars :+ flinkAppJar
      } else {
        throw new IOException("flink.app.jar is not set or invalid, flink.app.jar: " + flinkAppJar)
      }
    }
    LOGGER.info("UserJars: " + userJars.mkString(","))
    config = config.copy(externalJars = Some(userJars.toArray))
    LOGGER.info("Config: " + config)
    // Written before the interpreter properties are copied in, so a property with the
    // same key set explicitly by the user replaces this value.
    configuration.setString("flink.yarn.jars", userJars.mkString(":"))

    // load other configuration from interpreter properties
    properties.asScala.foreach(entry => configuration.setString(entry._1, entry._2))
    this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM)
    this.defaultSqlParallelism = configuration.getInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM)
    LOGGER.info("Default Parallelism: " + this.defaultParallelism)
    LOGGER.info("Default SQL Parallelism: " + this.defaultSqlParallelism)

    // set scala.color
    if (properties.getProperty("zeppelin.flink.scala.color", "true").toBoolean) {
      System.setProperty("scala.color", "true")
    }

    // set host/port when it is remote mode
    if (config.executionMode == ExecutionMode.REMOTE) {
      val host = properties.getProperty("flink.execution.remote.host")
      val port = properties.getProperty("flink.execution.remote.port")
      if (host == null) {
        throw new InterpreterException("flink.execution.remote.host is not " +
          "specified when using REMOTE mode")
      }
      if (port == null) {
        throw new InterpreterException("flink.execution.remote.port is not " +
          "specified when using REMOTE mode")
      }
      config = config.copy(host = Some(host))
        .copy(port = Some(Integer.parseInt(port)))
    }

    config
  }