def initEnvironment()

in streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala [204:306]


  def initEnvironment(tableMode: TableMode): Unit = {
    val builder = EnvironmentSettings.newInstance()
    val parameter = configuration.parameter
    Try(PlannerType.withName(parameter.get(KEY_FLINK_TABLE_PLANNER)))
      .getOrElse(PlannerType.blink) match {
      case PlannerType.blink =>
        val useBlinkPlanner =
          Try(builder.getClass.getDeclaredMethod("useBlinkPlanner")).getOrElse(null)
        if (useBlinkPlanner == null) {
          logWarn("useBlinkPlanner deprecated")
        } else {
          useBlinkPlanner.setAccessible(true)
          useBlinkPlanner.invoke(builder)
          logInfo("blinkPlanner will be use.")
        }
      case PlannerType.old =>
        val useOldPlanner = Try(builder.getClass.getDeclaredMethod("useOldPlanner")).getOrElse(null)
        if (useOldPlanner == null) {
          logWarn("useOldPlanner deprecated")
        } else {
          useOldPlanner.setAccessible(true)
          useOldPlanner.invoke(builder)
          logInfo("useOldPlanner will be use.")
        }
      case PlannerType.any =>
        val useAnyPlanner = Try(builder.getClass.getDeclaredMethod("useAnyPlanner")).getOrElse(null)
        if (useAnyPlanner == null) {
          logWarn("useAnyPlanner deprecated")
        } else {
          logInfo("useAnyPlanner will be use.")
          useAnyPlanner.setAccessible(true)
          useAnyPlanner.invoke(builder)
        }
    }

    val mode = Try(TableMode.withName(parameter.get(KEY_FLINK_TABLE_MODE))).getOrElse(tableMode)
    mode match {
      case TableMode.batch =>
        logInfo(s"components should work in $tableMode mode")
        builder.inBatchMode()
      case TableMode.streaming =>
        logInfo(s"components should work in $tableMode mode")
        builder.inStreamingMode()
    }

    val buildWith =
      (parameter.get(KEY_FLINK_TABLE_CATALOG), parameter.get(KEY_FLINK_TABLE_DATABASE))
    buildWith match {
      case (x: String, y: String) if x != null && y != null =>
        logInfo(s"with built in catalog: $x")
        logInfo(s"with built in database: $y")
        builder.withBuiltInCatalogName(x)
        builder.withBuiltInDatabaseName(y)
      case (x: String, _) if x != null =>
        logInfo(s"with built in catalog: $x")
        builder.withBuiltInCatalogName(x)
      case (_, y: String) if y != null =>
        logInfo(s"with built in database: $y")
        builder.withBuiltInDatabaseName(y)
      case _ =>
    }
    val setting = builder.build()
    tableMode match {
      case TableMode.batch => localTableEnv = TableEnvironment.create(setting)
      case TableMode.streaming =>
        initEnvironment()
        if (streamEnvConfFunc != null) {
          streamEnvConfFunc(streamEnvironment, parameter)
        }
        if (javaStreamEnvConfFunc != null) {
          javaStreamEnvConfFunc.configuration(streamEnvironment.getJavaEnv, parameter)
        }
        localStreamTableEnv = StreamTableEnvironment.create(streamEnvironment, setting)
    }
    val appName = parameter.getAppName()
    if (appName != null) {
      tableMode match {
        case TableMode.batch =>
          localTableEnv.getConfig.getConfiguration.setString(PipelineOptions.NAME, appName)
        case TableMode.streaming =>
          localStreamTableEnv.getConfig.getConfiguration.setString(PipelineOptions.NAME, appName)
      }
    }

    apiType match {
      case ApiType.java =>
        if (javaTableEnvConfFunc != null) {
          tableMode match {
            case TableMode.batch =>
              javaTableEnvConfFunc.configuration(localTableEnv.getConfig, parameter)
            case TableMode.streaming =>
              javaTableEnvConfFunc.configuration(localStreamTableEnv.getConfig, parameter)
          }
        }
      case ApiType.scala =>
        if (tableConfFunc != null) {
          tableMode match {
            case TableMode.batch => tableConfFunc(localTableEnv.getConfig, parameter)
            case TableMode.streaming => tableConfFunc(localStreamTableEnv.getConfig, parameter)
          }
        }
    }
  }