in linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala [101:306]
/**
 * Builds the [[EnvironmentContext]] used to deploy this Flink EngineConn.
 *
 * Assembly happens in two stages, mirrored by the comments below:
 *  - Step 1: environment-level configuration — Flink home/conf dirs, dist jar, provided lib
 *    paths, ship directories, Flink version, and resource settings (queue, parallelism,
 *    JM/TM memory, task slots).
 *  - Step 2: application-level configuration — YARN-specific options (skipped for Kubernetes
 *    targets), user classpaths, extra `flink.`-prefixed options, Kerberos, metrics reporter,
 *    savepoint restore, and finally the deployment target:
 *    YARN application (when a main-class jar is supplied) / per-job (once engine) / session,
 *    or Kubernetes when the execution target says so (Kubernetes overrides the YARN choice).
 *
 * Side effect: records the Flink version in [[FlinkVersionThreadLocal]] for the current thread.
 *
 * @param engineCreationContext creation context carrying the user-supplied options and labels
 * @return a fully configured [[EnvironmentContext]]
 * @throws FlinkInitFailedException if the Kubernetes config file is missing for a Kubernetes
 *                                  target, or the main-class jar cannot be resolved either as
 *                                  a local file or as a classpath resource
 */
protected def createEnvironmentContext(
    engineCreationContext: EngineCreationContext
): EnvironmentContext = {
  val options = engineCreationContext.getOptions
  val flinkExecutionTarget = FLINK_EXECUTION_TARGET.getValue(options)
  // Bundled defaults shipped with the plugin; user options are layered on top of these.
  val defaultEnv =
    Environment.parse(this.getClass.getClassLoader.getResource("flink-sql-defaults.yaml"))
  val hadoopConfDir = EnvConfiguration.HADOOP_CONF_DIR.getValue(options)
  val flinkHome = FLINK_HOME.getValue(options)
  val flinkConfDir = FLINK_CONF_DIR.getValue(options)
  val flinkProvidedLibPath = FLINK_PROVIDED_LIB_PATH.getValue(options)
  val flinkVersion = FlinkEnvConfiguration.FLINK_VERSION.getValue(options)
  var flinkDistJarPath = FLINK_DIST_JAR_PATH.getValue(options)
  // Flink 1.12.2 dist jars carry the Scala binary-version suffix in their file name.
  if (
    StringUtils.isNotBlank(flinkVersion) && flinkVersion.equalsIgnoreCase(FLINK_1_12_2_VERSION)
  ) {
    flinkDistJarPath = flinkDistJarPath.replaceFirst("flink-dist", "flink-dist_2.11")
  }
  // Local lib path
  val providedLibDirsArray = FLINK_LIB_LOCAL_PATH.getValue(options).split(",")
  // Ship directories
  val shipDirsArray = getShipDirectories(options)
  // Other params passed through to the EnvironmentContext
  val flinkClientType = GovernanceCommonConf.EC_APP_MANAGE_MODE.getValue(options)
  val otherParams = new util.HashMap[String, Any]()
  if (ManagerUtil.isManager) {
    logger.info(
      s"support parallelism : ${AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM.getHotValue()}"
    )
  }
  otherParams.put(GovernanceCommonConf.EC_APP_MANAGE_MODE.key, flinkClientType.toLowerCase())
  FlinkVersionThreadLocal.setFlinkVersion(flinkVersion)
  val context = new EnvironmentContext(
    defaultEnv,
    new Configuration,
    hadoopConfDir,
    flinkConfDir,
    flinkHome,
    flinkDistJarPath,
    flinkProvidedLibPath,
    providedLibDirsArray,
    shipDirsArray,
    new util.ArrayList[URL],
    flinkExecutionTarget,
    flinkVersion,
    otherParams
  )
  // Step1: environment-level configurations
  val jobName = options.getOrDefault("flink.app.name", "EngineConn-Flink")
  val yarnQueue = LINKIS_QUEUE_NAME.getValue(options)
  val parallelism = FLINK_APP_DEFAULT_PARALLELISM.getValue(options)
  val jobManagerMemory = LINKIS_FLINK_JOB_MANAGER_MEMORY.getValue(options) + "M"
  val taskManagerMemory = LINKIS_FLINK_TASK_MANAGER_MEMORY.getValue(options) + "M"
  val numberOfTaskSlots = LINKIS_FLINK_TASK_SLOTS.getValue(options)
  logger.info(
    s"Use yarn queue $yarnQueue, and set parallelism = $parallelism, jobManagerMemory = $jobManagerMemory, taskManagerMemory = $taskManagerMemory, numberOfTaskSlots = $numberOfTaskSlots."
  )
  // Step2: application-level configurations
  // construct app-config
  val flinkConfig = context.getFlinkConfig
  // construct jar-dependencies: user-provided lib paths plus the global provided lib paths
  val flinkUserProvidedLibPath = FLINK_PROVIDED_USER_LIB_PATH.getValue(options).split(",")
  val providedLibDirList =
    Lists.newArrayList(flinkUserProvidedLibPath.filter(StringUtils.isNotBlank): _*)
  val flinkProvidedLibPathList =
    Lists.newArrayList(flinkProvidedLibPath.split(",").filter(StringUtils.isNotBlank): _*)
  // Add the global lib path to user lib path list
  if (flinkProvidedLibPathList != null && flinkProvidedLibPathList.size() > 0) {
    providedLibDirList.addAll(flinkProvidedLibPathList)
  }
  // YARN-only options; Kubernetes deployments do not understand them.
  if (
    !FlinkExecutionTargetType
      .isKubernetesExecutionTargetType(flinkExecutionTarget)
  ) {
    flinkConfig.set(YarnConfigOptions.PROVIDED_LIB_DIRS, providedLibDirList)
    // construct jar-dependencies
    flinkConfig.set(YarnConfigOptions.SHIP_FILES, context.getShipDirs)
    // yarn application name
    flinkConfig.set(YarnConfigOptions.APPLICATION_NAME, jobName)
    // yarn queue
    flinkConfig.set(YarnConfigOptions.APPLICATION_QUEUE, yarnQueue)
  }
  // set user classpaths
  val classpaths = FLINK_APPLICATION_CLASSPATH.getValue(options)
  if (StringUtils.isNotBlank(classpaths)) {
    logger.info(s"Add $classpaths to flink application classpath.")
    flinkConfig.set(PipelineOptions.CLASSPATHS, util.Arrays.asList(classpaths.split(","): _*))
  }
  // Configure resource/concurrency
  flinkConfig.setInteger(CoreOptions.DEFAULT_PARALLELISM, parallelism)
  flinkConfig.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse(jobManagerMemory))
  flinkConfig.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse(taskManagerMemory))
  flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numberOfTaskSlots)
  // Pass through extra `flink.`-prefixed options verbatim (prefix stripped). The
  // java-opts value is optionally merged with yaml-provided opts when merging is enabled.
  options.asScala.filter { case (key, _) => key.startsWith(FLINK_CONFIG_PREFIX) }.foreach {
    case (key, value) =>
      var flinkConfigValue = value
      if (
        FlinkEnvConfiguration.FLINK_YAML_MERGE_ENABLE.getValue && key
          .equals(FLINK_CONFIG_PREFIX + FLINK_ENV_JAVA_OPTS.getValue)
      ) {
        flinkConfigValue = getExtractJavaOpts(value)
      }
      flinkConfig.setString(key.substring(FLINK_CONFIG_PREFIX.length), flinkConfigValue)
  }
  // set kerberos config
  if (FLINK_KERBEROS_ENABLE.getValue(options)) {
    flinkConfig.set(
      SecurityOptions.KERBEROS_LOGIN_CONTEXTS,
      FLINK_KERBEROS_LOGIN_CONTEXTS.getValue(options)
    )
    flinkConfig.set(
      SecurityOptions.KERBEROS_KRB5_PATH,
      FLINK_KERBEROS_CONF_PATH.getValue(options)
    )
    flinkConfig.set(
      SecurityOptions.KERBEROS_LOGIN_PRINCIPAL,
      FLINK_KERBEROS_LOGIN_PRINCIPAL.getValue(options)
    )
    flinkConfig.set(
      SecurityOptions.KERBEROS_LOGIN_KEYTAB,
      FLINK_KERBEROS_LOGIN_KEYTAB.getValue(options)
    )
  }
  // set metrics reporter config
  if (FLINK_REPORTER_ENABLE.getValue(options)) {
    flinkConfig.set(MetricOptions.REPORTER_CLASS, FLINK_REPORTER_CLASS.getValue(options))
    flinkConfig.set(
      MetricOptions.REPORTER_INTERVAL,
      Duration.ofMillis(FLINK_REPORTER_INTERVAL.getValue(options).toLong)
    )
  }
  // set savePoint
  val savePointPath = FLINK_SAVE_POINT_PATH.getValue(options)
  if (StringUtils.isNotBlank(savePointPath)) {
    val allowNonRestoredState = FLINK_APP_ALLOW_NON_RESTORED_STATUS.getValue(options).toBoolean
    val savepointRestoreSettings =
      SavepointRestoreSettings.forPath(savePointPath, allowNonRestoredState)
    SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, flinkConfig)
  }
  // Configure user-entrance jar. Can be HDFS file, but only support 1 jar
  val flinkMainClassJar = FLINK_APPLICATION_MAIN_CLASS_JAR.getValue(options)
  if (
    StringUtils.isNotBlank(flinkMainClassJar) && FlinkExecutionTargetType
      .isYarnExecutionTargetType(flinkExecutionTarget)
  ) {
    val flinkMainClassJarPath =
      if (new File(flinkMainClassJar).exists()) flinkMainClassJar
      else {
        // Fall back to the classpath; fail fast with a descriptive error instead of the
        // NullPointerException getResource(...).getPath would raise for a missing resource.
        val mainClassJarResource = getClass.getClassLoader.getResource(flinkMainClassJar)
        if (mainClassJarResource == null) {
          throw new FlinkInitFailedException(
            s"Cannot resolve main class jar $flinkMainClassJar: it is neither an existing local file nor a classpath resource."
          )
        }
        mainClassJarResource.getPath
      }
    logger.info(
      s"Ready to use $flinkMainClassJarPath as main class jar to submit application to Yarn."
    )
    flinkConfig.set(PipelineOptions.JARS, Collections.singletonList(flinkMainClassJarPath))
    flinkConfig.set(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName)
    flinkConfig.setBoolean(DeploymentOptions.ATTACHED, FLINK_EXECUTION_ATTACHED.getValue(options))
    context.setDeploymentTarget(YarnDeploymentTarget.APPLICATION.getName)
    addApplicationLabels(engineCreationContext)
  } else if (isOnceEngineConn(engineCreationContext.getLabels())) {
    flinkConfig.set(DeploymentOptions.TARGET, YarnDeploymentTarget.PER_JOB.getName)
  } else {
    flinkConfig.set(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
  }
  // set kubernetes config (overrides the YARN deployment target chosen above)
  if (
    StringUtils.isNotBlank(flinkExecutionTarget) && FlinkExecutionTargetType
      .isKubernetesExecutionTargetType(flinkExecutionTarget)
  ) {
    flinkConfig.set(DeploymentOptions.TARGET, flinkExecutionTarget)
    context.setDeploymentTarget(flinkExecutionTarget)
    val kubernetesConfigFile = FLINK_KUBERNETES_CONFIG_FILE.getValue(options)
    if (StringUtils.isBlank(kubernetesConfigFile)) {
      throw new FlinkInitFailedException(KUBERNETES_CONFIG_FILE_EMPTY.getErrorDesc)
    }
    flinkConfig.set(KubernetesConfigOptions.KUBE_CONFIG_FILE, kubernetesConfigFile)
    flinkConfig.set(
      KubernetesConfigOptions.NAMESPACE,
      FLINK_KUBERNETES_NAMESPACE.getValue(options)
    )
    flinkConfig.set(
      KubernetesConfigOptions.CONTAINER_IMAGE,
      FLINK_KUBERNETES_CONTAINER_IMAGE.getValue(options)
    )
    val kubernetesClusterId = FLINK_KUBERNETES_CLUSTER_ID.getValue(options)
    if (StringUtils.isNotBlank(kubernetesClusterId)) {
      flinkConfig.set(KubernetesConfigOptions.CLUSTER_ID, kubernetesClusterId)
    }
    val serviceAccount = FLINK_KUBERNETES_SERVICE_ACCOUNT.getValue(options)
    flinkConfig.set(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT, serviceAccount)
    val flinkMainClassJar = FLINK_APPLICATION_MAIN_CLASS_JAR.getValue(options)
    if (StringUtils.isNotBlank(flinkMainClassJar)) {
      flinkConfig.set(PipelineOptions.JARS, Collections.singletonList(flinkMainClassJar))
    }
  }
  context
}