in streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala [204:306]
def initEnvironment(tableMode: TableMode): Unit = {
val builder = EnvironmentSettings.newInstance()
val parameter = configuration.parameter
Try(PlannerType.withName(parameter.get(KEY_FLINK_TABLE_PLANNER)))
.getOrElse(PlannerType.blink) match {
case PlannerType.blink =>
val useBlinkPlanner =
Try(builder.getClass.getDeclaredMethod("useBlinkPlanner")).getOrElse(null)
if (useBlinkPlanner == null) {
logWarn("useBlinkPlanner deprecated")
} else {
useBlinkPlanner.setAccessible(true)
useBlinkPlanner.invoke(builder)
logInfo("blinkPlanner will be use.")
}
case PlannerType.old =>
val useOldPlanner = Try(builder.getClass.getDeclaredMethod("useOldPlanner")).getOrElse(null)
if (useOldPlanner == null) {
logWarn("useOldPlanner deprecated")
} else {
useOldPlanner.setAccessible(true)
useOldPlanner.invoke(builder)
logInfo("useOldPlanner will be use.")
}
case PlannerType.any =>
val useAnyPlanner = Try(builder.getClass.getDeclaredMethod("useAnyPlanner")).getOrElse(null)
if (useAnyPlanner == null) {
logWarn("useAnyPlanner deprecated")
} else {
logInfo("useAnyPlanner will be use.")
useAnyPlanner.setAccessible(true)
useAnyPlanner.invoke(builder)
}
}
val mode = Try(TableMode.withName(parameter.get(KEY_FLINK_TABLE_MODE))).getOrElse(tableMode)
mode match {
case TableMode.batch =>
logInfo(s"components should work in $tableMode mode")
builder.inBatchMode()
case TableMode.streaming =>
logInfo(s"components should work in $tableMode mode")
builder.inStreamingMode()
}
val buildWith =
(parameter.get(KEY_FLINK_TABLE_CATALOG), parameter.get(KEY_FLINK_TABLE_DATABASE))
buildWith match {
case (x: String, y: String) if x != null && y != null =>
logInfo(s"with built in catalog: $x")
logInfo(s"with built in database: $y")
builder.withBuiltInCatalogName(x)
builder.withBuiltInDatabaseName(y)
case (x: String, _) if x != null =>
logInfo(s"with built in catalog: $x")
builder.withBuiltInCatalogName(x)
case (_, y: String) if y != null =>
logInfo(s"with built in database: $y")
builder.withBuiltInDatabaseName(y)
case _ =>
}
val setting = builder.build()
tableMode match {
case TableMode.batch => localTableEnv = TableEnvironment.create(setting)
case TableMode.streaming =>
initEnvironment()
if (streamEnvConfFunc != null) {
streamEnvConfFunc(streamEnvironment, parameter)
}
if (javaStreamEnvConfFunc != null) {
javaStreamEnvConfFunc.configuration(streamEnvironment.getJavaEnv, parameter)
}
localStreamTableEnv = StreamTableEnvironment.create(streamEnvironment, setting)
}
val appName = parameter.getAppName()
if (appName != null) {
tableMode match {
case TableMode.batch =>
localTableEnv.getConfig.getConfiguration.setString(PipelineOptions.NAME, appName)
case TableMode.streaming =>
localStreamTableEnv.getConfig.getConfiguration.setString(PipelineOptions.NAME, appName)
}
}
apiType match {
case ApiType.java =>
if (javaTableEnvConfFunc != null) {
tableMode match {
case TableMode.batch =>
javaTableEnvConfFunc.configuration(localTableEnv.getConfig, parameter)
case TableMode.streaming =>
javaTableEnvConfFunc.configuration(localStreamTableEnv.getConfig, parameter)
}
}
case ApiType.scala =>
if (tableConfFunc != null) {
tableMode match {
case TableMode.batch => tableConfFunc(localTableEnv.getConfig, parameter)
case TableMode.streaming => tableConfFunc(localStreamTableEnv.getConfig, parameter)
}
}
}
}