in streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala [62:133]
/**
 * Submits the Flink job described by `submitRequest`.
 *
 * Steps, in order:
 *   1. log a summary banner of the request;
 *   2. obtain the command line and the Flink configuration from `getCommandLineAndFlinkConfig`;
 *   3. if a user jar is present, apply the program options derived from it to the configuration;
 *   4. set the common options, the retained-checkpoints option and the savepoint options;
 *   5. call `setJvmOptions` and `setConfig`, then hand over to `doSubmit`.
 *
 * @param submitRequest the submission request
 * @return whatever `doSubmit` returns for the prepared configuration
 */
def submit(submitRequest: SubmitRequest): SubmitResponse = {
  // The banner stays inline as the logInfo argument so that it is evaluated exactly
  // when (and if) logInfo evaluates its argument.
  logInfo(
    s"""
       |--------------------------------------- flink job start ---------------------------------------
       | userFlinkHome : ${submitRequest.flinkVersion.flinkHome}
       | flinkVersion : ${submitRequest.flinkVersion.version}
       | appName : ${submitRequest.appName}
       | devMode : ${submitRequest.developmentMode.name()}
       | execMode : ${submitRequest.executionMode.name()}
       | k8sNamespace : ${submitRequest.k8sSubmitParam.kubernetesNamespace}
       | flinkExposedType : ${submitRequest.k8sSubmitParam.flinkRestExposedType}
       | clusterId : ${submitRequest.k8sSubmitParam.clusterId}
       | applicationType : ${submitRequest.applicationType.getName}
       | savePoint : ${submitRequest.savePoint}
       | properties : ${submitRequest.properties.mkString(" ")}
       | args : ${submitRequest.args}
       | appConf : ${submitRequest.appConf}
       | flinkBuildResult : ${submitRequest.buildResult}
       |-------------------------------------------------------------------------------------------
       |""".stripMargin)

  val (cli, configuration) = getCommandLineAndFlinkConfig(submitRequest)

  // Only when a user jar was supplied: derive the execution parameters from the
  // command line plus the jar URI and write them into the configuration.
  Option(submitRequest.userJarFile).foreach { jar =>
    val jarUri = PackagedProgramUtils.resolveURI(jar.getAbsolutePath)
    ExecutionConfigAccessor
      .fromProgramOptions(ProgramOptions.create(cli), Collections.singletonList(jarUri.toString))
      .applyToConfiguration(configuration)
  }

  // Common parameters.
  // NOTE(review): safeSet is defined outside this block; presumably it skips
  // null/empty values - confirm against its definition.
  configuration
    .safeSet(PipelineOptions.NAME, submitRequest.effectiveAppName)
    .safeSet(DeploymentOptions.TARGET, submitRequest.executionMode.getName)
    .safeSet(SavepointConfigOptions.SAVEPOINT_PATH, submitRequest.savePoint)
    .safeSet(ApplicationConfiguration.APPLICATION_MAIN_CLASS, submitRequest.appMain)
    .safeSet(ApplicationConfiguration.APPLICATION_ARGS, extractProgramArgs(submitRequest))
    .safeSet(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, submitRequest.jobId)

  // state.checkpoints.num-retained: when the request properties do not contain this
  // key, take the value from the default configuration loaded for the Flink home.
  val retainedOption = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS
  if (!submitRequest.properties.containsKey(retainedOption.key())) {
    val defaults = getFlinkDefaultConfiguration(submitRequest.flinkVersion.flinkHome)
    configuration.safeSet(retainedOption, defaults.get(retainedOption))
  }

  // Savepoint parameters, applied only when a savepoint is present on the request.
  Option(submitRequest.savePoint).foreach { savepointPath =>
    // NOTE(review): SAVEPOINT_PATH is also passed to safeSet in the common-parameter
    // chain above; this second call looks redundant - confirm before removing.
    configuration.safeSet(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath)
    configuration.setBoolean(
      SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
      submitRequest.allowNonRestoredState)

    // The restore mode is written only if the Flink version passes the
    // SINCE_FLINK_VERSION check and a restore mode was actually requested.
    val restoreModeSupported =
      submitRequest.flinkVersion.checkVersion(RestoreMode.SINCE_FLINK_VERSION)
    if (restoreModeSupported && submitRequest.restoreMode != null) {
      configuration.setString(RestoreMode.RESTORE_MODE, submitRequest.restoreMode.getName)
    }
  }

  // JVM options and remaining configuration, then the actual submission.
  setJvmOptions(submitRequest, configuration)
  setConfig(submitRequest, configuration)
  doSubmit(submitRequest, configuration)
}