private def createFlinkILoop()

in flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala [274:426]


  private def createFlinkILoop(config: Config): Unit = {
    val printReplOutput = properties.getProperty("zeppelin.flink.printREPLOutput", "true").toBoolean
    val replOut = if (printReplOutput) {
      new JPrintWriter(interpreterOutput, true)
    } else {
      new JPrintWriter(Console.out, true)
    }

    val (iLoop, cluster) = {
      // workaround of checking hadoop jars in yarn mode
      if (mode == ExecutionMode.YARN) {
        try {
          Class.forName(classOf[FlinkYarnSessionCli].getName)
        } catch {
          case e: ClassNotFoundException =>
            throw new InterpreterException("Unable to load FlinkYarnSessionCli for yarn mode", e)
          case e: NoClassDefFoundError =>
            throw new InterpreterException("No hadoop jar found, make sure you have hadoop command in your PATH", e)
        }
      }

      val (effectiveConfig, cluster) = fetchConnectionInfo(config, configuration, flinkShims)
      this.configuration = effectiveConfig
      cluster match {
        case Some(clusterClient) =>
          // local mode or yarn
          if (mode == ExecutionMode.LOCAL) {
            LOGGER.info("Starting FlinkCluster in local mode")
            this.jmWebUrl = clusterClient.getWebInterfaceURL
            this.displayedJMWebUrl = this.jmWebUrl
          } else if (mode == ExecutionMode.YARN) {
            LOGGER.info("Starting FlinkCluster in yarn mode")
            this.jmWebUrl = clusterClient.getWebInterfaceURL
            this.yarnAppId = HadoopUtils.getYarnAppId(clusterClient)
            this.displayedJMWebUrl = getDisplayedJMWebUrl(yarnAppId)
          } else {
            throw new Exception("Starting FlinkCluster in invalid mode: " + mode)
          }
        case None =>
          // yarn-application mode
          if (ExecutionMode.isYarnApplicationMode(mode)) {
            // get yarnAppId from env `_APP_ID`
            this.yarnAppId = System.getenv("_APP_ID")
            LOGGER.info("Use FlinkCluster in yarn application mode, appId: {}", yarnAppId)
            this.jmWebUrl = "http://localhost:" + HadoopUtils.getFlinkRestPort(yarnAppId)
            this.displayedJMWebUrl = getDisplayedJMWebUrl(yarnAppId)
          } else if (ExecutionMode.isK8sApplicationMode(mode)) {
            LOGGER.info("Use FlinkCluster in kubernetes-application mode")
            this.jmWebUrl = "http://localhost:" + configuration.getInteger("rest.port", 8081)
            this.displayedJMWebUrl = this.jmWebUrl
          } else {
            LOGGER.info("Use FlinkCluster in remote mode")
            this.jmWebUrl = "http://" + config.host.get + ":" + config.port.get
            this.displayedJMWebUrl = getDisplayedJMWebUrl("")
          }
      }

      LOGGER.info(s"\nConnecting to Flink cluster: " + this.jmWebUrl)
      if (InterpreterContext.get() != null) {
        InterpreterContext.get().getIntpEventClient.sendWebUrlInfo(this.jmWebUrl)
      }
      LOGGER.info("externalJars: " +
        config.externalJars.getOrElse(Array.empty[String]).mkString(":"))
      val classLoader = Thread.currentThread().getContextClassLoader
      try {
        // use FlinkClassLoader to initialize FlinkILoop, otherwise TableFactoryService could not find
        // the TableFactory properly
        Thread.currentThread().setContextClassLoader(getFlinkClassLoader)
        val repl = new FlinkILoop(configuration, config.externalJars, None, replOut, mode,
          JExecutionEnvironment.getExecutionEnvironment,
          JStreamExecutionEnvironment.getExecutionEnvironment,
          this)
        (repl, cluster)
      } catch {
        case e: Exception =>
          LOGGER.error(ExceptionUtils.getStackTrace(e))
          throw e
      } finally {
        Thread.currentThread().setContextClassLoader(classLoader)
      }
    }

    this.flinkILoop = iLoop
    this.cluster = cluster

    val settings = createSettings()

    val outputDir = Files.createTempDirectory("flink-repl");
    val interpArguments = List(
      "-Yrepl-class-based",
      "-Yrepl-outdir", s"${outputDir.toFile.getAbsolutePath}"
    )
    settings.processArguments(interpArguments, true)

    flinkILoop.settings = settings
    flinkILoop.intp = createIMain(settings, replOut)
    flinkILoop.initEnvironments()

    flinkILoop.intp.beQuietDuring {
      // set execution environment
      flinkILoop.intp.bind("benv", flinkILoop.scalaBenv)
      flinkILoop.intp.bind("senv", flinkILoop.scalaSenv)

      val packageImports = Seq[String](
        "org.apache.flink.core.fs._",
        "org.apache.flink.core.fs.local._",
        "org.apache.flink.api.common.io._",
        "org.apache.flink.api.common.aggregators._",
        "org.apache.flink.api.common.accumulators._",
        "org.apache.flink.api.common.distributions._",
        "org.apache.flink.api.common.operators._",
        "org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint",
        "org.apache.flink.api.common.functions._",
        "org.apache.flink.api.java.io._",
        "org.apache.flink.api.java.aggregation._",
        "org.apache.flink.api.java.functions._",
        "org.apache.flink.api.java.operators._",
        "org.apache.flink.api.java.sampling._",
        "org.apache.flink.api.scala._",
        "org.apache.flink.api.scala.utils._",
        "org.apache.flink.streaming.api.scala._",
        "org.apache.flink.streaming.api.windowing.time._",
        "org.apache.flink.types.Row"
      )

      flinkILoop.intp.interpret("import " + packageImports.mkString(", "))
      flinkILoop.intp.interpret("import org.apache.flink.table.api._")
      flinkILoop.intp.interpret("import org.apache.flink.table.api.bridge.scala._")
      flinkILoop.intp.interpret("import org.apache.flink.table.functions.ScalarFunction")
      flinkILoop.intp.interpret("import org.apache.flink.table.functions.AggregateFunction")
      flinkILoop.intp.interpret("import org.apache.flink.table.functions.TableFunction")
      flinkILoop.intp.interpret("import org.apache.flink.table.functions.TableAggregateFunction")
    }

    val in0 = None
    val reader = in0.fold(flinkILoop.chooseReader(settings))(r =>
      SimpleReader(r, replOut, interactive = true))

    flinkILoop.in = reader
    flinkILoop.initializeSynchronous()
    flinkILoop.intp.setContextClassLoader()
    reader.postInit()
    this.scalaCompletion = reader.completion

    this.benv = flinkILoop.scalaBenv
    this.senv = flinkILoop.scalaSenv
    val timeType = properties.getProperty("flink.senv.timecharacteristic", "EventTime")
    this.senv.setStreamTimeCharacteristic(TimeCharacteristic.valueOf(timeType))
    this.benv.setParallelism(configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM))
    this.senv.setParallelism(configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM))

    setAsContext()
  }