in flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala [274:426]
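/**
 * Creates the FlinkILoop (the embedded Scala shell), connects to or launches a
 * Flink cluster according to the configured execution mode, and initializes the
 * REPL session with the batch/streaming environments and common imports.
 */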
private def createFlinkILoop(config: Config): Unit = {
val printReplOutput = properties.getProperty("zeppelin.flink.printREPLOutput", "true").toBoolean
val replOut = if (printReplOutput) {
new JPrintWriter(interpreterOutput, true)
} else {
new JPrintWriter(Console.out, true)
}
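// Build the FlinkILoop (embedded Scala shell) and, where applicable, the client
// of the cluster it talks to; both are assigned to fields once this block completes.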
val (iLoop, cluster) = {
// Workaround: verify that the hadoop jars are on the classpath before starting in yarn mode
if (mode == ExecutionMode.YARN) {
try {
Class.forName(classOf[FlinkYarnSessionCli].getName)
} catch {
case e: ClassNotFoundException =>
throw new InterpreterException("Unable to load FlinkYarnSessionCli for yarn mode", e)
case e: NoClassDefFoundError =>
throw new InterpreterException("No hadoop jar found, make sure you have hadoop command in your PATH", e)
}
}
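// fetchConnectionInfo resolves the effective Flink configuration and, depending on
// the execution mode, either launches a cluster and returns its client (local/yarn)
// or returns no client (application/remote modes); the match below covers both cases.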
val (effectiveConfig, cluster) = fetchConnectionInfo(config, configuration, flinkShims)
this.configuration = effectiveConfig
cluster match {
case Some(clusterClient) =>
// local or yarn mode
if (mode == ExecutionMode.LOCAL) {
LOGGER.info("Starting FlinkCluster in local mode")
this.jmWebUrl = clusterClient.getWebInterfaceURL
this.displayedJMWebUrl = this.jmWebUrl
} else if (mode == ExecutionMode.YARN) {
LOGGER.info("Starting FlinkCluster in yarn mode")
this.jmWebUrl = clusterClient.getWebInterfaceURL
this.yarnAppId = HadoopUtils.getYarnAppId(clusterClient)
this.displayedJMWebUrl = getDisplayedJMWebUrl(yarnAppId)
} else {
throw new InterpreterException("Starting FlinkCluster in invalid mode: " + mode)
}
case None =>
// application mode (yarn/k8s) or remote mode: no cluster client was created locally
if (ExecutionMode.isYarnApplicationMode(mode)) {
// get yarnAppId from env `_APP_ID`
this.yarnAppId = System.getenv("_APP_ID")
LOGGER.info("Use FlinkCluster in yarn application mode, appId: {}", yarnAppId)
this.jmWebUrl = "http://localhost:" + HadoopUtils.getFlinkRestPort(yarnAppId)
this.displayedJMWebUrl = getDisplayedJMWebUrl(yarnAppId)
} else if (ExecutionMode.isK8sApplicationMode(mode)) {
LOGGER.info("Use FlinkCluster in kubernetes-application mode")
this.jmWebUrl = "http://localhost:" + configuration.getInteger("rest.port", 8081)
this.displayedJMWebUrl = this.jmWebUrl
} else {
LOGGER.info("Use FlinkCluster in remote mode")
this.jmWebUrl = "http://" + config.host.get + ":" + config.port.get
this.displayedJMWebUrl = getDisplayedJMWebUrl("")
}
}
LOGGER.info(s"\nConnecting to Flink cluster: " + this.jmWebUrl)
if (InterpreterContext.get() != null) {
InterpreterContext.get().getIntpEventClient.sendWebUrlInfo(this.jmWebUrl)
}
LOGGER.info("externalJars: " +
config.externalJars.getOrElse(Array.empty[String]).mkString(":"))
val classLoader = Thread.currentThread().getContextClassLoader
try {
// Use the FlinkClassLoader to initialize FlinkILoop, otherwise TableFactoryService
// cannot find the TableFactory implementations properly
Thread.currentThread().setContextClassLoader(getFlinkClassLoader)
val repl = new FlinkILoop(configuration, config.externalJars, None, replOut, mode,
JExecutionEnvironment.getExecutionEnvironment,
JStreamExecutionEnvironment.getExecutionEnvironment,
this)
(repl, cluster)
} catch {
case e: Exception =>
LOGGER.error(ExceptionUtils.getStackTrace(e))
throw e
} finally {
Thread.currentThread().setContextClassLoader(classLoader)
}
}
this.flinkILoop = iLoop
this.cluster = cluster
val settings = createSettings()
val outputDir = Files.createTempDirectory("flink-repl")
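// -Yrepl-class-based makes the REPL wrap each line in a class rather than an object,
// so the resulting instances (and the closures they capture) can be serialized and
// shipped to the cluster; -Yrepl-outdir is where the REPL writes its generated class
// files so they can be picked up when submitting jobs.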
val interpArguments = List(
"-Yrepl-class-based",
"-Yrepl-outdir", s"${outputDir.toFile.getAbsolutePath}"
)
settings.processArguments(interpArguments, true)
flinkILoop.settings = settings
flinkILoop.intp = createIMain(settings, replOut)
flinkILoop.initEnvironments()
flinkILoop.intp.beQuietDuring {
// bind the batch (benv) and streaming (senv) execution environments into the shell
flinkILoop.intp.bind("benv", flinkILoop.scalaBenv)
flinkILoop.intp.bind("senv", flinkILoop.scalaSenv)
val packageImports = Seq[String](
"org.apache.flink.core.fs._",
"org.apache.flink.core.fs.local._",
"org.apache.flink.api.common.io._",
"org.apache.flink.api.common.aggregators._",
"org.apache.flink.api.common.accumulators._",
"org.apache.flink.api.common.distributions._",
"org.apache.flink.api.common.operators._",
"org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint",
"org.apache.flink.api.common.functions._",
"org.apache.flink.api.java.io._",
"org.apache.flink.api.java.aggregation._",
"org.apache.flink.api.java.functions._",
"org.apache.flink.api.java.operators._",
"org.apache.flink.api.java.sampling._",
"org.apache.flink.api.scala._",
"org.apache.flink.api.scala.utils._",
"org.apache.flink.streaming.api.scala._",
"org.apache.flink.streaming.api.windowing.time._",
"org.apache.flink.types.Row"
)
flinkILoop.intp.interpret("import " + packageImports.mkString(", "))
flinkILoop.intp.interpret("import org.apache.flink.table.api._")
flinkILoop.intp.interpret("import org.apache.flink.table.api.bridge.scala._")
flinkILoop.intp.interpret("import org.apache.flink.table.functions.ScalarFunction")
flinkILoop.intp.interpret("import org.apache.flink.table.functions.AggregateFunction")
flinkILoop.intp.interpret("import org.apache.flink.table.functions.TableFunction")
flinkILoop.intp.interpret("import org.apache.flink.table.functions.TableAggregateFunction")
}
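// in0 is always None here, so the fold below falls through to chooseReader, which
// picks the default interactive reader for these settings.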
val in0 = None
val reader = in0.fold(flinkILoop.chooseReader(settings))(r =>
SimpleReader(r, replOut, interactive = true))
flinkILoop.in = reader
flinkILoop.initializeSynchronous()
flinkILoop.intp.setContextClassLoader()
reader.postInit()
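// Keep a handle on the reader's completion so the interpreter can provide code
// completion to the frontend.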
this.scalaCompletion = reader.completion
this.benv = flinkILoop.scalaBenv
this.senv = flinkILoop.scalaSenv
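// Apply user-configured defaults: the stream time characteristic (EventTime,
// IngestionTime or ProcessingTime) and the default parallelism for both environments.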
val timeType = properties.getProperty("flink.senv.timecharacteristic", "EventTime")
this.senv.setStreamTimeCharacteristic(TimeCharacteristic.valueOf(timeType))
this.benv.setParallelism(configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM))
this.senv.setParallelism(configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM))
setAsContext()
}
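// With benv/senv bound and the imports in place, a notebook paragraph can run Flink
// programs directly against this session. A minimal sketch (the input strings here
// are illustrative):
//
//   benv.fromElements("to be", "or not to be")
//     .flatMap(_.split(" "))
//     .map((_, 1))
//     .groupBy(0)
//     .sum(1)
//     .print()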