in streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala [98:188]
/**
 * Assembles the Flink [[Configuration]] used to submit the job described by `submitRequest`.
 *
 * Starting from the configuration returned by `getCommandLineAndFlinkConfig`, the following is
 * applied in order:
 *   - job-type specific settings: Python options for PyFlink jobs, otherwise the program
 *     options derived from the user jar (only when a user jar is present);
 *   - the common pipeline / deployment / savepoint / application options;
 *   - `state.checkpoints.num-retained`, taken from `getFlinkDefaultConfiguration` for the
 *     request's Flink home when the request itself does not carry that property;
 *   - savepoint restore options, when a non-blank savepoint is given;
 *   - every request property whose key starts with `env.`.
 *
 * Side effects visible in this method: for PyFlink jobs the `ENV_FLINK_OPT_DIR` environment
 * variable may be set through `SystemPropertyUtils.setEnv`, and the JVM-options entry of
 * `submitRequest.properties` may be rewritten in place to prepend `-Dfile.encoding=UTF-8`.
 *
 * NOTE(review): `safeSet` is a project helper not shown here; it presumably skips null/empty
 * values - confirm before relying on that.
 *
 * @param submitRequest the submission request to translate into Flink settings
 * @return the configuration obtained from `getCommandLineAndFlinkConfig`, after the settings
 *         above have been applied to it
 */
private[this] def prepareConfig(submitRequest: SubmitRequest): Configuration = {
  val (commandLine, flinkConfig) = getCommandLineAndFlinkConfig(submitRequest)

  // PyFlink: point Flink at the local python venv and make sure the opt dir variable is set.
  def applyPyFlinkSettings(): Unit = {
    val venvPath: String = Workspace.local.APP_PYTHON_VENV
    AssertUtils.required(FsOperator.lfs.exists(venvPath), s"$venvPath File does not exist")
    flinkConfig
      // python.archives
      .safeSet(PythonOptions.PYTHON_ARCHIVES, venvPath)
      // python.client.executable
      .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, Constants.PYTHON_EXECUTABLE)
      // python.executable
      .safeSet(PythonOptions.PYTHON_EXECUTABLE, Constants.PYTHON_EXECUTABLE)

    if (StringUtils.isBlank(System.getenv(ConfigConstants.ENV_FLINK_OPT_DIR))) {
      logWarn(s"Get environment variable ${ConfigConstants.ENV_FLINK_OPT_DIR} fail")
      // Fall back to <flinkHome>/opt when the variable is not provided by the environment.
      val fallbackOptDir = s"${submitRequest.flinkVersion.flinkHome}/opt"
      SystemPropertyUtils.setEnv(ConfigConstants.ENV_FLINK_OPT_DIR, fallbackOptDir)
      logInfo(
        s"Set temporary environment variables ${ConfigConstants.ENV_FLINK_OPT_DIR} = $fallbackOptDir")
    }
  }

  // Any other job type: derive execution parameters from the user jar, if there is one.
  def applyUserJarSettings(): Unit =
    Option(submitRequest.userJarFile).foreach { jarFile =>
      val jarUri = PackagedProgramUtils.resolveURI(jarFile.getAbsolutePath)
      ExecutionConfigAccessor
        .fromProgramOptions(
          ProgramOptions.create(commandLine),
          Collections.singletonList(jarUri.toString))
        .applyToConfiguration(flinkConfig)
    }

  submitRequest.jobType match {
    case FlinkJobType.PYFLINK => applyPyFlinkSettings()
    case _ => applyUserJarSettings()
  }

  // Common parameters shared by every job type.
  flinkConfig
    .safeSet(PipelineOptions.NAME, submitRequest.effectiveAppName)
    .safeSet(DeploymentOptions.TARGET, submitRequest.deployMode.getName)
    .safeSet(SavepointConfigOptions.SAVEPOINT_PATH, submitRequest.savePoint)
    .safeSet(ApplicationConfiguration.APPLICATION_MAIN_CLASS, submitRequest.appMain)
    .safeSet(ApplicationConfiguration.APPLICATION_ARGS, extractProgramArgs(submitRequest))
    .safeSet(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, submitRequest.jobId)

  // state.checkpoints.num-retained: only consult the Flink home defaults when the request
  // does not carry its own value.
  val retainedOption = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS
  if (!submitRequest.hasProp(retainedOption.key())) {
    val homeDefaults = getFlinkDefaultConfiguration(submitRequest.flinkVersion.flinkHome)
    flinkConfig.safeSet(retainedOption, homeDefaults.get(retainedOption))
  }

  // Savepoint restore parameters.
  if (StringUtils.isNotBlank(submitRequest.savePoint)) {
    flinkConfig.safeSet(SavepointConfigOptions.SAVEPOINT_PATH, submitRequest.savePoint)
    flinkConfig.setBoolean(
      SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
      submitRequest.allowNonRestoredState)
    // The version check is only evaluated when a restore mode was actually requested.
    Option(submitRequest.restoreMode)
      .filter(_ => submitRequest.flinkVersion.checkVersion(FlinkRestoreMode.SINCE_FLINK_VERSION))
      .foreach(mode => flinkConfig.setString(FlinkRestoreMode.RESTORE_MODE, mode.getName))
  }

  // env.xx.opts parameters.
  if (MapUtils.isNotEmpty(submitRequest.properties)) {
    // Default file.encoding to UTF-8 when JVM options are given without one. The updated
    // value is written back into the request properties so the loop below picks it up.
    val jvmOptsKey = CoreOptions.FLINK_JVM_OPTIONS.key()
    if (submitRequest.hasProp(jvmOptsKey)) {
      val jvmOpts = submitRequest.getProp(jvmOptsKey).toString
      if (!jvmOpts.contains("-Dfile.encoding=")) {
        submitRequest.properties.put(jvmOptsKey, s"-Dfile.encoding=UTF-8 $jvmOpts")
      }
    }
    submitRequest.properties
      .filter { case (key, _) => key.startsWith("env.") }
      .foreach {
        case (key, value) =>
          logInfo(s"env opts: $key: $value")
          flinkConfig.setString(key, value.toString)
      }
  }

  flinkConfig
}