protected def createEnvironmentContext()

in linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala [101:306]


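  /**
   * Builds the EnvironmentContext used to launch the Flink engine connector: resolves the
   * Flink/Hadoop directories, assembles the Flink Configuration (resources, classpaths,
   * Kerberos, metrics reporter, savepoint restore) and selects the deployment target
   * (YARN application / per-job / session, or Kubernetes).
   */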
  protected def createEnvironmentContext(
      engineCreationContext: EngineCreationContext
  ): EnvironmentContext = {
    val options = engineCreationContext.getOptions
    val flinkExecutionTarget = FLINK_EXECUTION_TARGET.getValue(options)

    val defaultEnv =
      Environment.parse(this.getClass.getClassLoader.getResource("flink-sql-defaults.yaml"))
    val hadoopConfDir = EnvConfiguration.HADOOP_CONF_DIR.getValue(options)
    val flinkHome = FLINK_HOME.getValue(options)
    val flinkConfDir = FLINK_CONF_DIR.getValue(options)
    val flinkProvidedLibPath = FLINK_PROVIDED_LIB_PATH.getValue(options)
    val flinkVersion = FlinkEnvConfiguration.FLINK_VERSION.getValue(options)
    var flinkDistJarPath = FLINK_DIST_JAR_PATH.getValue(options)
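    // Flink 1.12.x ships Scala-suffixed dist jars; for 1.12.2 the bundled artifact is the
    // Scala 2.11 build, so rewrite the jar name accordingly.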
    if (
        StringUtils.isNotBlank(flinkVersion) && flinkVersion.equalsIgnoreCase(FLINK_1_12_2_VERSION)
    ) {
      flinkDistJarPath = flinkDistJarPath.replaceFirst("flink-dist", "flink-dist_2.11")
    }
    // Local lib path
    val providedLibDirsArray = FLINK_LIB_LOCAL_PATH.getValue(options).split(",")
    // Ship directories
    val shipDirsArray = getShipDirectories(options)
    // Other parameters propagated to the EnvironmentContext
    val flinkClientType = GovernanceCommonConf.EC_APP_MANAGE_MODE.getValue(options)
    val otherParams = new util.HashMap[String, Any]()
    val isManager = ManagerUtil.isManager
    if (isManager) {
      logger.info(
        s"support parallelism : ${AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM.getHotValue()}"
      )
    }
    otherParams.put(GovernanceCommonConf.EC_APP_MANAGE_MODE.key, flinkClientType.toLowerCase())
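    // Record the Flink version in a thread-local so later version-dependent lookups on this
    // thread can adapt to it.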
    FlinkVersionThreadLocal.setFlinkVersion(flinkVersion)
    val context = new EnvironmentContext(
      defaultEnv,
      new Configuration,
      hadoopConfDir,
      flinkConfDir,
      flinkHome,
      flinkDistJarPath,
      flinkProvidedLibPath,
      providedLibDirsArray,
      shipDirsArray,
      new util.ArrayList[URL],
      flinkExecutionTarget,
      flinkVersion,
      otherParams
    )
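    // The Configuration passed in above starts empty; the steps below populate it.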
    // Step1: environment-level configurations
    val jobName = options.getOrDefault("flink.app.name", "EngineConn-Flink")
    val yarnQueue = LINKIS_QUEUE_NAME.getValue(options)
    val parallelism = FLINK_APP_DEFAULT_PARALLELISM.getValue(options)
    val jobManagerMemory = LINKIS_FLINK_JOB_MANAGER_MEMORY.getValue(options) + "M"
    val taskManagerMemory = LINKIS_FLINK_TASK_MANAGER_MEMORY.getValue(options) + "M"
    val numberOfTaskSlots = LINKIS_FLINK_TASK_SLOTS.getValue(options)
    logger.info(
      s"Use yarn queue $yarnQueue, and set parallelism = $parallelism, jobManagerMemory = $jobManagerMemory, taskManagerMemory = $taskManagerMemory, numberOfTaskSlots = $numberOfTaskSlots."
    )
    // Step2: application-level configurations
    // construct app-config
    val flinkConfig = context.getFlinkConfig
    // construct jar-dependencies
    val flinkUserProvidedLibPath = FLINK_PROVIDED_USER_LIB_PATH.getValue(options).split(",")
    val providedLibDirList =
      Lists.newArrayList(flinkUserProvidedLibPath.filter(StringUtils.isNotBlank): _*)
    val flinkProvidedLibPathList =
      Lists.newArrayList(flinkProvidedLibPath.split(",").filter(StringUtils.isNotBlank): _*)
    // Add the global lib path to user lib path list
    if (!flinkProvidedLibPathList.isEmpty) {
      providedLibDirList.addAll(flinkProvidedLibPathList)
    }
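    // The options below are YARN-specific, so they are skipped for Kubernetes targets.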
    if (
        !FlinkExecutionTargetType
          .isKubernetesExecutionTargetType(flinkExecutionTarget)
    ) {
      flinkConfig.set(YarnConfigOptions.PROVIDED_LIB_DIRS, providedLibDirList)
      // Ship directories uploaded alongside the application
      flinkConfig.set(YarnConfigOptions.SHIP_FILES, context.getShipDirs)
      // yarn application name
      flinkConfig.set(YarnConfigOptions.APPLICATION_NAME, jobName)
      // yarn queue
      flinkConfig.set(YarnConfigOptions.APPLICATION_QUEUE, yarnQueue)
    }
    // set user classpaths
    val classpaths = FLINK_APPLICATION_CLASSPATH.getValue(options)
    if (StringUtils.isNotBlank(classpaths)) {
      logger.info(s"Add $classpaths to flink application classpath.")
      flinkConfig.set(PipelineOptions.CLASSPATHS, util.Arrays.asList(classpaths.split(","): _*))
    }
    // Configure resource/concurrency
    flinkConfig.setInteger(CoreOptions.DEFAULT_PARALLELISM, parallelism)
    flinkConfig.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse(jobManagerMemory))
    flinkConfig.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse(taskManagerMemory))
    flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numberOfTaskSlots)
    // Pass through extra Flink options: strip FLINK_CONFIG_PREFIX and apply the raw key.
    options.asScala.filter { case (key, _) => key.startsWith(FLINK_CONFIG_PREFIX) }.foreach {
      case (key, value) =>
        var flinkConfigValue = value
        if (
            FlinkEnvConfiguration.FLINK_YAML_MERGE_ENABLE.getValue && key
              .equals(FLINK_CONFIG_PREFIX + FLINK_ENV_JAVA_OPTS.getValue)
        ) {
          flinkConfigValue = getExtractJavaOpts(value)
        }
        flinkConfig.setString(key.substring(FLINK_CONFIG_PREFIX.length), flinkConfigValue)
    }
    // set kerberos config
    if (FLINK_KERBEROS_ENABLE.getValue(options)) {
      flinkConfig.set(
        SecurityOptions.KERBEROS_LOGIN_CONTEXTS,
        FLINK_KERBEROS_LOGIN_CONTEXTS.getValue(options)
      )
      flinkConfig.set(
        SecurityOptions.KERBEROS_KRB5_PATH,
        FLINK_KERBEROS_CONF_PATH.getValue(options)
      )
      flinkConfig.set(
        SecurityOptions.KERBEROS_LOGIN_PRINCIPAL,
        FLINK_KERBEROS_LOGIN_PRINCIPAL.getValue(options)
      )
      flinkConfig.set(
        SecurityOptions.KERBEROS_LOGIN_KEYTAB,
        FLINK_KERBEROS_LOGIN_KEYTAB.getValue(options)
      )
    }
    if (FLINK_REPORTER_ENABLE.getValue(options)) {
      flinkConfig.set(MetricOptions.REPORTER_CLASS, FLINK_REPORTER_CLASS.getValue(options))
      flinkConfig.set(
        MetricOptions.REPORTER_INTERVAL,
        Duration.ofMillis(FLINK_REPORTER_INTERVAL.getValue(options).toLong)
      )
    }
    // Restore from a savepoint if a path is configured
    val savePointPath = FLINK_SAVE_POINT_PATH.getValue(options)
    if (StringUtils.isNotBlank(savePointPath)) {
      val allowNonRestoredState = FLINK_APP_ALLOW_NON_RESTORED_STATUS.getValue(options).toBoolean
      val savepointRestoreSettings =
        SavepointRestoreSettings.forPath(savePointPath, allowNonRestoredState)
      SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, flinkConfig)
    }

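    // Deployment target selection: application mode when a main-class jar is provided on YARN,
    // per-job mode for once-type engine conns, session mode otherwise.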
    // Configure the user entrance jar. It may be an HDFS file, but only one jar is supported.
    val flinkMainClassJar = FLINK_APPLICATION_MAIN_CLASS_JAR.getValue(options)
    if (
        StringUtils.isNotBlank(flinkMainClassJar) && FlinkExecutionTargetType
          .isYarnExecutionTargetType(flinkExecutionTarget)
    ) {
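      // Prefer a local file path; otherwise fall back to a resource shipped on the classpath.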
      val flinkMainClassJarPath =
        if (new File(flinkMainClassJar).exists()) flinkMainClassJar
        else getClass.getClassLoader.getResource(flinkMainClassJar).getPath
      logger.info(
        s"Ready to use $flinkMainClassJarPath as main class jar to submit application to Yarn."
      )
      flinkConfig.set(PipelineOptions.JARS, Collections.singletonList(flinkMainClassJarPath))
      flinkConfig.set(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName)
      flinkConfig.setBoolean(DeploymentOptions.ATTACHED, FLINK_EXECUTION_ATTACHED.getValue(options))
      context.setDeploymentTarget(YarnDeploymentTarget.APPLICATION.getName)
      addApplicationLabels(engineCreationContext)
    } else if (isOnceEngineConn(engineCreationContext.getLabels())) {
      flinkConfig.set(DeploymentOptions.TARGET, YarnDeploymentTarget.PER_JOB.getName)
    } else {
      flinkConfig.set(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
    }

    // Kubernetes settings; a Kubernetes target overrides the YARN deployment target chosen above.
    if (
        StringUtils.isNotBlank(flinkExecutionTarget) && FlinkExecutionTargetType
          .isKubernetesExecutionTargetType(flinkExecutionTarget)
    ) {
      flinkConfig.set(DeploymentOptions.TARGET, flinkExecutionTarget)
      context.setDeploymentTarget(flinkExecutionTarget)

      val kubernetesConfigFile = FLINK_KUBERNETES_CONFIG_FILE.getValue(options)
      if (StringUtils.isBlank(kubernetesConfigFile)) {
        throw new FlinkInitFailedException(KUBERNETES_CONFIG_FILE_EMPTY.getErrorDesc)
      }

      flinkConfig.set(KubernetesConfigOptions.KUBE_CONFIG_FILE, kubernetesConfigFile)

      flinkConfig.set(
        KubernetesConfigOptions.NAMESPACE,
        FLINK_KUBERNETES_NAMESPACE.getValue(options)
      )
      flinkConfig.set(
        KubernetesConfigOptions.CONTAINER_IMAGE,
        FLINK_KUBERNETES_CONTAINER_IMAGE.getValue(options)
      )
      val kubernetesClusterId = FLINK_KUBERNETES_CLUSTER_ID.getValue(options)
      if (StringUtils.isNotBlank(kubernetesClusterId)) {
        flinkConfig.set(KubernetesConfigOptions.CLUSTER_ID, kubernetesClusterId)
      }

      val serviceAccount = FLINK_KUBERNETES_SERVICE_ACCOUNT.getValue(options)
      flinkConfig.set(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT, serviceAccount)

      val flinkMainClassJar = FLINK_APPLICATION_MAIN_CLASS_JAR.getValue(options)
      if (StringUtils.isNotBlank(flinkMainClassJar)) {
        flinkConfig.set(PipelineOptions.JARS, Collections.singletonList(flinkMainClassJar))
      }
    }
    context
  }
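
For orientation, a minimal sketch of driving this method from a test, assuming FlinkEngineConnFactory is directly instantiable and that DefaultEngineCreationContext from linkis-engineconn-common is available; the option key and values below are illustrative, not taken from this file:

  // Hypothetical harness: expose the protected factory method via a subclass.
  class TestableFlinkEngineConnFactory extends FlinkEngineConnFactory {
    def buildContext(ctx: EngineCreationContext): EnvironmentContext =
      createEnvironmentContext(ctx)
  }

  val creationContext = new DefaultEngineCreationContext
  val options = new java.util.HashMap[String, String]()
  options.put("flink.app.name", "demo-flink-app") // picked up as the YARN application name
  creationContext.setOptions(options)

  val envContext = new TestableFlinkEngineConnFactory().buildContext(creationContext)
  // The returned context carries the populated Flink Configuration and deployment target.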